handle onetoone conflicts and deletes
This commit is contained in:
parent
750061ebd6
commit
bd01e38f35
@ -11,7 +11,7 @@ class OneToOneConflict(MergeException):
|
|||||||
|
|
||||||
ERROR = 0
|
ERROR = 0
|
||||||
KEEP = 1
|
KEEP = 1
|
||||||
UPDATE = 2
|
DELETE = 2
|
||||||
|
|
||||||
|
|
||||||
def merge(from_obj, to_obj, one_to_one_conflict=ERROR):
|
def merge(from_obj, to_obj, one_to_one_conflict=ERROR):
|
||||||
@ -32,8 +32,18 @@ def merge(from_obj, to_obj, one_to_one_conflict=ERROR):
|
|||||||
try:
|
try:
|
||||||
field = getattr(from_obj, accessor_name)
|
field = getattr(from_obj, accessor_name)
|
||||||
try:
|
try:
|
||||||
to_obj_field = getattr(to_obj, accessor_name)
|
to_field = getattr(to_obj, accessor_name)
|
||||||
raise OneToOneConflict("both fields have an attribute set for {}".format(accessor_name))
|
if one_to_one_conflict == KEEP:
|
||||||
|
pass # do nothing
|
||||||
|
elif one_to_one_conflict == DELETE:
|
||||||
|
# if null:
|
||||||
|
# setattr(to_field, varname, None)
|
||||||
|
# to_field.save()
|
||||||
|
to_field.delete()
|
||||||
|
setattr(field, varname, to_obj)
|
||||||
|
field.save()
|
||||||
|
else:
|
||||||
|
raise OneToOneConflict("both fields have an attribute set for {}".format(accessor_name))
|
||||||
except ObjectDoesNotExist:
|
except ObjectDoesNotExist:
|
||||||
# doesn't exist, safe to overwrite
|
# doesn't exist, safe to overwrite
|
||||||
setattr(field, varname, to_obj)
|
setattr(field, varname, to_obj)
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
from .models import Person, Number, SSN, Group
|
from .models import Person, Number, SSN, Group
|
||||||
from fkreplace import merge, OneToOneConflict
|
from fkreplace import merge, OneToOneConflict, KEEP, DELETE
|
||||||
|
|
||||||
class MergeTests(TestCase):
|
class MergeTests(TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -43,14 +43,31 @@ class MergeTests(TestCase):
|
|||||||
assert Person.objects.get(pk=self.b.pk).ssn.number == '1'
|
assert Person.objects.get(pk=self.b.pk).ssn.number == '1'
|
||||||
assert SSN.objects.count() == 1
|
assert SSN.objects.count() == 1
|
||||||
|
|
||||||
# TODO: test one2one when there's a conflict
|
|
||||||
def test_one2one_conflict(self):
|
def test_one2one_conflict(self):
|
||||||
SSN.objects.create(person=self.a, number='1')
|
SSN.objects.create(person=self.a, number='1')
|
||||||
SSN.objects.create(person=self.b, number='1')
|
SSN.objects.create(person=self.b, number='2')
|
||||||
|
|
||||||
with self.assertRaises(OneToOneConflict):
|
with self.assertRaises(OneToOneConflict):
|
||||||
merge(self.a, self.b)
|
merge(self.a, self.b)
|
||||||
|
|
||||||
|
def test_one2one_conflict_keep(self):
|
||||||
|
SSN.objects.create(person=self.a, number='1')
|
||||||
|
SSN.objects.create(person=self.b, number='2')
|
||||||
|
|
||||||
|
merge(self.a, self.b, KEEP)
|
||||||
|
|
||||||
|
assert Person.objects.get(pk=self.a.pk).ssn.number == '1'
|
||||||
|
assert Person.objects.get(pk=self.b.pk).ssn.number == '2'
|
||||||
|
|
||||||
|
def test_one2one_conflict_delete(self):
|
||||||
|
SSN.objects.create(person=self.a, number='1')
|
||||||
|
SSN.objects.create(person=self.b, number='2')
|
||||||
|
|
||||||
|
merge(self.a, self.b, DELETE)
|
||||||
|
|
||||||
|
assert Person.objects.get(pk=self.b.pk).ssn.number == '1'
|
||||||
|
assert SSN.objects.count() == 1
|
||||||
|
|
||||||
def test_many2many_simple(self):
|
def test_many2many_simple(self):
|
||||||
self.g.people.add(self.a)
|
self.g.people.add(self.a)
|
||||||
merge(self.a, self.b)
|
merge(self.a, self.b)
|
||||||
|
Loading…
Reference in New Issue
Block a user