Skip to content

Commit

Permalink
Support schemas for MSSQL drop server default + FK constraint
Browse files Browse the repository at this point in the history
Fixed bug in MSSQL dialect where the drop constraint execution steps used
to remove server default or implicit foreign key constraint failed to take
into account the schema name of the target table.

Change-Id: Ia95b043ee6289efbb90d6f21392f4ce622748611
Fixes: #621
  • Loading branch information
zzzeek committed Nov 12, 2019
1 parent ca77102 commit 6e30795
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 19 deletions.
48 changes: 31 additions & 17 deletions alembic/ddl/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ def alter_column(
if existing_server_default is not False or server_default is None:
self._exec(
_ExecDropConstraint(
table_name, column_name, "sys.default_constraints"
table_name,
column_name,
"sys.default_constraints",
schema,
)
)
if server_default is not None:
Expand Down Expand Up @@ -129,72 +132,83 @@ def bulk_insert(self, table, rows, **kw):
else:
super(MSSQLImpl, self).bulk_insert(table, rows, **kw)

def drop_column(self, table_name, column, **kw):
def drop_column(self, table_name, column, schema=None, **kw):
drop_default = kw.pop("mssql_drop_default", False)
if drop_default:
self._exec(
_ExecDropConstraint(
table_name, column, "sys.default_constraints"
table_name, column, "sys.default_constraints", schema
)
)
drop_check = kw.pop("mssql_drop_check", False)
if drop_check:
self._exec(
_ExecDropConstraint(
table_name, column, "sys.check_constraints"
table_name, column, "sys.check_constraints", schema
)
)
drop_fks = kw.pop("mssql_drop_foreign_key", False)
if drop_fks:
self._exec(_ExecDropFKConstraint(table_name, column))
super(MSSQLImpl, self).drop_column(table_name, column, **kw)
self._exec(_ExecDropFKConstraint(table_name, column, schema))
super(MSSQLImpl, self).drop_column(
table_name, column, schema=schema, **kw
)


class _ExecDropConstraint(Executable, ClauseElement):
def __init__(self, tname, colname, type_):
def __init__(self, tname, colname, type_, schema):
self.tname = tname
self.colname = colname
self.type_ = type_
self.schema = schema


class _ExecDropFKConstraint(Executable, ClauseElement):
def __init__(self, tname, colname):
def __init__(self, tname, colname, schema):
self.tname = tname
self.colname = colname
self.schema = schema


@compiles(_ExecDropConstraint, "mssql")
def _exec_drop_col_constraint(element, compiler, **kw):
tname, colname, type_ = element.tname, element.colname, element.type_
schema, tname, colname, type_ = (
element.schema,
element.tname,
element.colname,
element.type_,
)
# from http://www.mssqltips.com/sqlservertip/1425/\
# working-with-default-constraints-in-sql-server/
# TODO: needs table formatting, etc.
return """declare @const_name varchar(256)
select @const_name = [name] from %(type)s
where parent_object_id = object_id('%(tname)s')
where parent_object_id = object_id('%(schema_dot)s%(tname)s')
and col_name(parent_object_id, parent_column_id) = '%(colname)s'
exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
"type": type_,
"tname": tname,
"colname": colname,
"tname_quoted": format_table_name(compiler, tname, None),
"tname_quoted": format_table_name(compiler, tname, schema),
"schema_dot": schema + "." if schema else "",
}


@compiles(_ExecDropFKConstraint, "mssql")
def _exec_drop_col_fk_constraint(element, compiler, **kw):
tname, colname = element.tname, element.colname
schema, tname, colname = element.schema, element.tname, element.colname

return """declare @const_name varchar(256)
select @const_name = [name] from
sys.foreign_keys fk join sys.foreign_key_columns fkc
on fk.object_id=fkc.constraint_object_id
where fkc.parent_object_id = object_id('%(tname)s')
and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s'
sys.foreign_keys fk join sys.foreign_key_columns fkc
on fk.object_id=fkc.constraint_object_id
where fkc.parent_object_id = object_id('%(schema_dot)s%(tname)s')
`and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s'
exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
"tname": tname,
"colname": colname,
"tname_quoted": format_table_name(compiler, tname, None),
"tname_quoted": format_table_name(compiler, tname, schema),
"schema_dot": schema + "." if schema else "",
}


Expand Down
7 changes: 5 additions & 2 deletions alembic/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,20 @@ def flush(self):
buf = buffer_()

class ctx(MigrationContext):
def get_buf(self):
return buf

def clear_assertions(self):
buf.lines[:] = []

def assert_(self, *sql):
# TODO: make this more flexible about
# whitespace and such
eq_(buf.lines, list(sql))
eq_(buf.lines, [re.sub(r"[\n\t]", "", s) for s in sql])

def assert_contains(self, sql):
for stmt in buf.lines:
if sql in stmt:
if re.sub(r"[\n\t]", "", sql) in stmt:
return
else:
assert False, "Could not locate fragment %r in %r" % (
Expand Down
8 changes: 8 additions & 0 deletions docs/build/unreleased/621.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.. change::
:tags: bug, mssql
:tickets: 621

Fixed bug in MSSQL dialect where the drop constraint execution steps used
to remove server default or implicit foreign key constraint failed to take
into account the schema name of the target table.

45 changes: 45 additions & 0 deletions tests/test_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,29 @@ def test_drop_column_w_default_in_batch(self):
def test_alter_column_drop_default(self):
context = op_fixture("mssql")
op.alter_column("t", "c", server_default=None)
context.assert_contains(
"declare @const_name varchar(256)select @const_name = [name] "
"from sys.default_constraintswhere parent_object_id = "
"object_id('t')and col_name(parent_object_id, "
"parent_column_id) = 'c'"
)
context.assert_contains(
"exec('alter table t drop constraint ' + @const_name)"
)

def test_alter_column_drop_default_w_schema(self):
context = op_fixture("mssql")
op.alter_column("t", "c", server_default=None, schema="xyz")
context.assert_contains(
"declare @const_name varchar(256)select @const_name = [name] "
"from sys.default_constraintswhere parent_object_id = "
"object_id('xyz.t')and col_name(parent_object_id, "
"parent_column_id) = 'c'"
)
context.assert_contains(
"exec('alter table xyz.t drop constraint ' + @const_name)"
)

def test_alter_column_dont_drop_default(self):
context = op_fixture("mssql")
op.alter_column("t", "c", server_default=False)
Expand Down Expand Up @@ -165,11 +184,37 @@ def test_alter_column_nullable_w_existing_type(self):
def test_drop_column_w_fk(self):
context = op_fixture("mssql")
op.drop_column("t1", "c1", mssql_drop_foreign_key=True)
context.assert_contains(
"declare @const_name varchar(256)\n"
"select @const_name = [name] from\n"
"sys.foreign_keys fk join sys.foreign_key_columns fkcon "
"fk.object_id=fkc.constraint_object_id\n"
"where fkc.parent_object_id = object_id('t1')`and "
"col_name(fkc.parent_object_id, fkc.parent_column_id) = 'c1'\n"
"exec('alter table t1 drop constraint ' + @const_name)"
)
context.assert_contains(
"exec('alter table t1 drop constraint ' + @const_name)"
)
context.assert_contains("ALTER TABLE t1 DROP COLUMN c1")

def test_drop_column_w_fk_schema(self):
context = op_fixture("mssql")
op.drop_column("t1", "c1", schema="xyz", mssql_drop_foreign_key=True)
context.assert_contains(
"declare @const_name varchar(256)\n"
"select @const_name = [name] from\n"
"sys.foreign_keys fk join sys.foreign_key_columns fkcon "
"fk.object_id=fkc.constraint_object_id\n"
"where fkc.parent_object_id = object_id('xyz.t1')`and "
"col_name(fkc.parent_object_id, fkc.parent_column_id) = 'c1'\n"
"exec('alter table xyz.t1 drop constraint ' + @const_name)"
)
context.assert_contains(
"exec('alter table xyz.t1 drop constraint ' + @const_name)"
)
context.assert_contains("ALTER TABLE xyz.t1 DROP COLUMN c1")

def test_drop_column_w_fk_in_batch(self):
context = op_fixture("mssql")
with op.batch_alter_table("t1", schema=None) as batch_op:
Expand Down

0 comments on commit 6e30795

Please sign in to comment.