Skip to content

Commit

Permalink
[#109] Fix migration test
Browse files Browse the repository at this point in the history
  • Loading branch information
javrasya committed Nov 17, 2019
1 parent 67e07b3 commit 3ee2951
Showing 1 changed file with 17 additions and 20 deletions.
37 changes: 17 additions & 20 deletions river/tests/tmigrations/test__migrations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import os
import sqlite3
import sys
from unittest import skipUnless

from django.contrib.contenttypes.models import ContentType
from django.db import connection, transaction
from django.db import connection
from django.test.utils import override_settings
from hamcrest import assert_that, equal_to, has_length

Expand Down Expand Up @@ -88,12 +86,9 @@ def test__shouldNotKeepRecreatingMigrationsWhenNoChange(self):
assert_that(self.migrations_after, has_length(len(self.migrations_before)))

@override_settings(MIGRATION_MODULES={"tests": "river.tests.volatile.river_tests"})
@skipUnless(sqlite3.sqlite_version <= "3.24.0", "This test is not able to run with newer version of sqlite")
def test__shouldMigrateTransitionApprovalStatusToStringInDB(self):
out = StringIO()
sys.stout = out
cur = connection.cursor()

state1 = StateObjectFactory(label="state1")
state2 = StateObjectFactory(label="state2")
workflow = WorkflowFactory(initial_state=state1, content_type=ContentType.objects.get_for_model(BasicTestModel), field_name="my_field")
Expand All @@ -105,24 +100,26 @@ def test__shouldMigrateTransitionApprovalStatusToStringInDB(self):
)
workflow_object = BasicTestModelObjectFactory()

result = cur.execute("select status from river_transitionapproval where object_id=%s;" % workflow_object.model.pk).fetchall()
assert_that(result[0][0], equal_to("pending"))
with connection.cursor() as cur:
result = cur.execute("select status from river_transitionapproval where object_id=%s;" % workflow_object.model.pk).fetchall()
assert_that(result[0][0], equal_to("pending"))

call_command('migrate', 'river', '0004', stdout=out)
schema = cur.execute("PRAGMA table_info('river_transitionapproval');").fetchall()
self.migrations_after = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/volatile/river/')))
status_col_type = list(filter(lambda column: column[1] == 'status', schema))[0][2]
assert_that(status_col_type, equal_to("integer"))

result = cur.execute("select status from river_transitionapproval where object_id=%s;" % workflow_object.model.pk).fetchall()
assert_that(result[0][0], equal_to(0))
with connection.cursor() as cur:
schema = cur.execute("PRAGMA table_info('river_transitionapproval');").fetchall()
status_col_type = list(filter(lambda column: column[1] == 'status', schema))[0][2]
assert_that(status_col_type, equal_to("integer"))

result = cur.execute("select status from river_transitionapproval where object_id=%s;" % workflow_object.model.pk).fetchall()
assert_that(result[0][0], equal_to(0))

call_command('migrate', 'river', stdout=out)
schema = cur.execute("PRAGMA table_info('river_transitionapproval');").fetchall()

self.migrations_after = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/volatile/river/')))
status_col_type = list(filter(lambda column: column[1] == 'status', schema))[0][2]
assert_that(status_col_type, equal_to("varchar(100)"))
with connection.cursor() as cur:
schema = cur.execute("PRAGMA table_info('river_transitionapproval');").fetchall()
status_col_type = list(filter(lambda column: column[1] == 'status', schema))[0][2]
assert_that(status_col_type, equal_to("varchar(100)"))

result = cur.execute("select status from river_transitionapproval where object_id=%s;" % workflow_object.model.pk).fetchall()
assert_that(result[0][0], equal_to("pending"))
result = cur.execute("select status from river_transitionapproval where object_id=%s;" % workflow_object.model.pk).fetchall()
assert_that(result[0][0], equal_to("pending"))

0 comments on commit 3ee2951

Please sign in to comment.