diff --git a/ee/clickhouse/models/test/test_cohort.py b/ee/clickhouse/models/test/test_cohort.py index b1c547c792c2c..cef6324e869f6 100644 --- a/ee/clickhouse/models/test/test_cohort.py +++ b/ee/clickhouse/models/test/test_cohort.py @@ -571,9 +571,12 @@ def test_cohortpeople_prop_changed(self): cohort1.calculate_people_ch(pending_version=0) with freeze_time((datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d")): - p2.version = 1 - p2.properties = ({"$some_prop": "another", "$another_prop": "another"},) - p2.save() + _create_person( + uuid=p2.uuid, + team_id=self.team.pk, + version=1, + properties={"$some_prop": "another", "$another_prop": "another"}, + ) cohort1.calculate_people_ch(pending_version=1) diff --git a/posthog/management/commands/setup_test_environment.py b/posthog/management/commands/setup_test_environment.py index 2f0b73034f5e5..7a108c9bd043c 100644 --- a/posthog/management/commands/setup_test_environment.py +++ b/posthog/management/commands/setup_test_environment.py @@ -85,9 +85,19 @@ class MigrateSilentCommand(migrate.Command): def handle(self, *args, **kwargs): from django.db import connection + from posthog.models.person.person import CREATE_FUNCTION_FOR_CONSTRAINT_SQL + + create_function = ";".join( + ( + "SET check_function_bodies=off", + CREATE_FUNCTION_FOR_CONSTRAINT_SQL, + ) + ) + # :TRICKY: Create extension and function depended on by models. 
with connection.cursor() as cursor: cursor.execute("CREATE EXTENSION pg_trgm") + cursor.execute(create_function) return super().handle(*args, **kwargs) diff --git a/posthog/management/commands/test/test_migrate_kafka_data.py b/posthog/management/commands/test/test_migrate_kafka_data.py new file mode 100644 index 0000000000000..0053c7201b876 --- /dev/null +++ b/posthog/management/commands/test/test_migrate_kafka_data.py @@ -0,0 +1,347 @@ +from unittest import mock +from uuid import uuid4 + +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka.admin.new_topic import NewTopic +from kafka.errors import KafkaError +from kafka.producer.future import FutureProduceResult, FutureRecordMetadata +from kafka.structs import TopicPartition + +from bin.migrate_kafka_data import run as migrate_kafka_data + + +def test_can_migrate_data_from_one_topic_to_another_on_a_different_cluster(): + """ + Importantly, we want to make sure: + + 1. we commit offsets to the old cluster, such that we do not produce + duplicates on e.g. multiple runs + 2. we do not commit offsets to the new cluster + 3. we do not produce to the old cluster + 4. we copy over not just the values of the messages, but also the keys + + """ + old_events_topic = str(uuid4()) + new_events_topic = str(uuid4()) + consumer_group_id = "events-ingestion-consumer" + message_key = str(uuid4()) + + # The command will fail if we don't have a consumer group ID that has + # already committed offsets to the old topic, so we need to commit some + # offsets first. 
+ _commit_offsets_for_topic(old_events_topic, consumer_group_id) + + _create_topic(new_events_topic) + + # Put some data to the old topic + _send_message(old_events_topic, b'{ "event": "test" }', key=message_key.encode("utf-8"), headers=[("foo", b"bar")]) + + migrate_kafka_data( + "--from-topic", + old_events_topic, + "--to-topic", + new_events_topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + consumer_group_id, + # Include all the options so we check they can be passed in + "--linger-ms", + "0", + "--batch-size", + "100", + "--timeout-ms", + "1000", + ) + + # We should have produced a message to the new topic + found_message = _wait_for_message(new_events_topic, message_key) + + assert found_message and found_message.value == b'{ "event": "test" }', "Did not find message in new topic" + assert found_message and found_message.headers == [("foo", b"bar")], "Did not find headers in new topic" + + # Try running the command again, and we shouldn't see a new message produced + migrate_kafka_data( + "--from-topic", + old_events_topic, + "--to-topic", + new_events_topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + consumer_group_id, + ) + + found_message = _wait_for_message(new_events_topic, message_key) + assert not found_message + + +def test_we_do_not_migrate_when_dry_run_is_set(): + """ + We want to make sure that we do not migrate data when the dry run flag is + set. 
+ """ + old_events_topic = str(uuid4()) + new_events_topic = str(uuid4()) + consumer_group_id = "events-ingestion-consumer" + message_key = str(uuid4()) + + _commit_offsets_for_topic(old_events_topic, consumer_group_id) + + _create_topic(new_events_topic) + + # Put some data to the old topic + _send_message(old_events_topic, b'{ "event": "test" }', key=message_key.encode("utf-8"), headers=[("foo", b"bar")]) + + migrate_kafka_data( + "--from-topic", + old_events_topic, + "--to-topic", + new_events_topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + consumer_group_id, + "--dry-run", + ) + + # We should not have produced a message to the new topic + found_message = _wait_for_message(new_events_topic, message_key) + assert not found_message + + +def test_cannot_send_data_back_into_same_topic_on_same_cluster(): + """ + We want to make sure that we do not send data back into the same topic on + the same cluster, as that would cause duplicates. + """ + topic = str(uuid4()) + consumer_group_id = "events-ingestion-consumer" + message_key = str(uuid4()) + + _commit_offsets_for_topic(topic, consumer_group_id) + + # Put some data to the topic + _send_message(topic, b'{ "event": "test" }', key=message_key.encode("utf-8"), headers=[("foo", b"bar")]) + + try: + migrate_kafka_data( + "--from-topic", + topic, + "--to-topic", + topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + consumer_group_id, + ) + except ValueError as e: + assert str(e) == "You must specify a different topic and cluster to migrate data to" + else: + assert False, "Expected ValueError to be raised" + + +def test_that_the_command_fails_if_the_specified_consumer_group_does_not_exist(): + """ + We want to make sure that the command fails if the specified consumer group + does not exist for the topic. 
+ """ + old_topic = str(uuid4()) + new_topic = str(uuid4()) + message_key = str(uuid4()) + + _create_topic(new_topic) + + # Put some data to the topic + _send_message(old_topic, b'{ "event": "test" }', key=message_key.encode("utf-8"), headers=[("foo", b"bar")]) + + try: + migrate_kafka_data( + "--from-topic", + old_topic, + "--to-topic", + new_topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + "nonexistent-consumer-group", + ) + except ValueError as e: + assert str(e) == "Consumer group nonexistent-consumer-group has no committed offsets" + else: + assert False, "Expected ValueError to be raised" + + +def test_that_we_error_if_the_target_topic_doesnt_exist(): + """ + We want to make sure that the command fails if the target topic does not + exist. + """ + old_topic = str(uuid4()) + new_topic = str(uuid4()) + consumer_group_id = "events-ingestion-consumer" + message_key = str(uuid4()) + + _commit_offsets_for_topic(old_topic, consumer_group_id) + + # Put some data to the topic + _send_message(old_topic, b'{ "event": "test" }', key=message_key.encode("utf-8"), headers=[("foo", b"bar")]) + + try: + migrate_kafka_data( + "--from-topic", + old_topic, + "--to-topic", + new_topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + consumer_group_id, + ) + except ValueError as e: + assert str(e) == f"Topic {new_topic} does not exist" + else: + assert False, "Expected ValueError to be raised" + + +def test_we_fail_on_send_errors_to_new_topic(): + """ + We want to make sure that we fail if we get an error when sending data to + the new topic. 
+ """ + old_topic = str(uuid4()) + new_topic = str(uuid4()) + consumer_group_id = "events-ingestion-consumer" + message_key = str(uuid4()) + + _create_topic(new_topic) + + _commit_offsets_for_topic(old_topic, consumer_group_id) + + # Put some data to the topic + _send_message(old_topic, b'{ "event": "test" }', key=message_key.encode("utf-8"), headers=[("foo", b"bar")]) + + with mock.patch("kafka.KafkaProducer.send") as mock_send: + produce_future = FutureProduceResult(topic_partition=TopicPartition(new_topic, 1)) + future = FutureRecordMetadata( + produce_future=produce_future, + relative_offset=0, + timestamp_ms=0, + checksum=0, + serialized_key_size=0, + serialized_value_size=0, + serialized_header_size=0, + ) + future.failure(KafkaError("Failed to produce")) + mock_send.return_value = future + + try: + migrate_kafka_data( + "--from-topic", + old_topic, + "--to-topic", + new_topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + consumer_group_id, + ) + except KafkaError as e: + assert str(e) == "KafkaError: Failed to produce" + else: + assert False, "Expected KafkaError to be raised" + + # Ensure that if we run the command again, it will not fail + # and will re-consume and produce the message to the new topic. 
+ migrate_kafka_data( + "--from-topic", + old_topic, + "--to-topic", + new_topic, + "--from-cluster", + "localhost:9092", + "--to-cluster", + "localhost:9092", + "--consumer-group-id", + consumer_group_id, + ) + + found_message = _wait_for_message(new_topic, message_key) + + assert found_message, "Did not find message in new topic" + + +def _commit_offsets_for_topic(topic, consumer_group_id): + kafka_consumer = KafkaConsumer( + topic, + bootstrap_servers="localhost:9092", + auto_offset_reset="latest", + group_id=consumer_group_id, + ) + + try: + kafka_consumer.poll(timeout_ms=1000) + kafka_consumer.commit() + + finally: + kafka_consumer.close() + + +def _wait_for_message(topic: str, key: str): + """ + Wait for a message to appear in the topic with the specified key. + """ + new_kafka_consumer = KafkaConsumer( + topic, + bootstrap_servers="localhost:9092", + auto_offset_reset="earliest", + group_id="test", + ) + + try: + messages_by_topic = new_kafka_consumer.poll(timeout_ms=1000) + + if not messages_by_topic: + return + + for _, messages in messages_by_topic.items(): + for message in messages: + if message.key.decode("utf-8") == key: + return message + + finally: + new_kafka_consumer.close() + + +def _send_message(topic, value, key, headers): + producer = KafkaProducer(bootstrap_servers="localhost:9092") + + try: + producer.send(topic, value, key, headers).get() + + finally: + producer.close() + + +def _create_topic(topic): + admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092") + + try: + admin_client.create_topics([NewTopic(topic, num_partitions=1, replication_factor=1)]) + + finally: + admin_client.close() diff --git a/posthog/migrations/0300_add_constraints_to_person_override.py b/posthog/migrations/0300_add_constraints_to_person_override.py index 3ad68aef4d116..6a3f5e1511757 100644 --- a/posthog/migrations/0300_add_constraints_to_person_override.py +++ b/posthog/migrations/0300_add_constraints_to_person_override.py @@ -3,30 +3,7 @@ import 
django.db.models.expressions from django.db import migrations, models -# NOTE: I've moved these here to make sure that the migration is self-contained -# such that the state of the database is predictable. -# -# This function checks two things: -# 1. A new override_person_id must not match an existing old_person_id -# 2. A new old_person_id must not match an existing override_person_id -CREATE_FUNCTION_FOR_CONSTRAINT_SQL = f""" -CREATE OR REPLACE FUNCTION is_override_person_not_used_as_old_person(team_id bigint, override_person_id uuid, old_person_id uuid) -RETURNS BOOLEAN AS $$ - SELECT NOT EXISTS ( - SELECT 1 - FROM "posthog_personoverride" - WHERE team_id = $1 - AND override_person_id = $3 - ) AND NOT EXISTS ( - SELECT 1 - FROM "posthog_personoverride" - WHERE team_id = $1 - AND old_person_id = $2 - ); -$$ LANGUAGE SQL; -""" - -DROP_FUNCTION_FOR_CONSTRAINT_SQL = "DROP FUNCTION is_override_person_not_used_as_old_person" +from posthog.models.person.person import CREATE_FUNCTION_FOR_CONSTRAINT_SQL, DROP_FUNCTION_FOR_CONSTRAINT_SQL class Migration(migrations.Migration): diff --git a/posthog/migrations/0305_rework_person_overrides.py b/posthog/migrations/0305_rework_person_overrides.py index 24526b18e62a8..3afd6d4154b54 100644 --- a/posthog/migrations/0305_rework_person_overrides.py +++ b/posthog/migrations/0305_rework_person_overrides.py @@ -1,20 +1,4 @@ -# Generated by Django 3.2.16 on 2023-02-16 11:47 - -import django.db.models.deletion -from django.db import migrations, models - -# The previous migration attempted to add a constraint to the personoverride -# table. We want to remove that constraint, as the ForeignKey replaces this. 
-# The function was never used, but to make migrations work we need these two -DROP_FUNCTION_FOR_CONSTRAINT_SQL = "DROP FUNCTION is_override_person_not_used_as_old_person" -CREATE_FUNCTION_FOR_CONSTRAINT_SQL = f""" -CREATE OR REPLACE FUNCTION is_override_person_not_used_as_old_person(team_id bigint, override_person_id uuid, old_person_id uuid) -RETURNS BOOLEAN AS $$ - SELECT false; -$$ LANGUAGE SQL; -""" - -DROP_FUNCTION_FOR_CONSTRAINT_SQL = "DROP FUNCTION is_override_person_not_used_as_old_person" +from django.db import migrations class Migration(migrations.Migration): @@ -23,47 +7,4 @@ class Migration(migrations.Migration): ("posthog", "0304_store_dashboard_template_in_db"), ] - operations = [ - migrations.DeleteModel("personoverride"), - migrations.RunSQL(DROP_FUNCTION_FOR_CONSTRAINT_SQL, CREATE_FUNCTION_FOR_CONSTRAINT_SQL), - migrations.CreateModel( - name="PersonOverride", - fields=[ - ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), - ("old_person_id", models.UUIDField(db_index=True)), - ("oldest_event", models.DateTimeField()), - ("version", models.BigIntegerField(blank=True, null=True)), - ], - ), - migrations.AddConstraint( - model_name="person", - constraint=models.UniqueConstraint(fields=("uuid",), name="unique uuid for person"), - ), - migrations.AddField( - model_name="personoverride", - name="override_person", - field=models.ForeignKey( - on_delete=django.db.models.deletion.DO_NOTHING, to="posthog.person", to_field="uuid" - ), - ), - migrations.AddField( - model_name="personoverride", - name="team", - field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team"), - ), - migrations.AddConstraint( - model_name="personoverride", - constraint=models.UniqueConstraint( - fields=("team", "old_person_id"), name="unique override per old_person_id" - ), - ), - migrations.AddConstraint( - model_name="personoverride", - constraint=models.CheckConstraint( - check=models.Q( - 
("old_person_id__exact", django.db.models.expressions.F("override_person_id")), _negated=True - ), - name="old_person_id_different_from_override_person_id", - ), - ), - ] + operations = [] # type: ignore diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 04bf9059e0af4..745a944bd2ac8 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -2,6 +2,8 @@ from django.db import models, transaction from django.db.models import F, Q +from django.db.models.expressions import Func +from django.db.models.fields import BooleanField from posthog.models.utils import UUIDT @@ -89,18 +91,6 @@ def split_person(self, main_distinct_id: Optional[str]): # Has an index on properties -> email from migration 0121, (team_id, id DESC) from migration 0164 - class Meta: - constraints = [ - models.UniqueConstraint( - # This was added to enable the overrides table to reference this - # table using the uuid. Ideally we'd put this on (team_id, uuid) - # but I couldn't see if Django could handle SQL `REFERENCES` on - # a composite key. - fields=["uuid"], - name="unique uuid for person", - ), - ] - class PersonDistinctId(models.Model): class Meta: @@ -120,16 +110,10 @@ class PersonOverride(models.Model): This model has a set of constraints to ensure correctness: 1. Unique constraint on (team_id, old_person_id) pairs. 2. Check that old_person_id is different to override_person_id for every row. - 3. Same person id cannot be used as an old_person_id and an override_person_id (per team) - (e.g. if a row exists with old_person_id=123 then we would not allow a row with + 3. Exclude rows that overlap across old_person_id and override_person_id (e.g. if + a row exists with old_person_id=123 then we would not allow a row with override_person_id=123 to exist, as that would require a self join to figure out the ultimate override_person_id required for old_person_id=123). - To accomplish this: - 3.1. 
Ensuring old_person_id doesn't exist in person table (assumption about code) - - during person merges we update the override_ids to point to the new merged person - - during person deletions we delete the overide table entries (they aren't needed anymore) - 3.2. Ensuring any override_person_id exists in person table (db level check) - Override person field is a foreign key into the person table. """ class Meta: @@ -139,15 +123,50 @@ class Meta: check=~Q(old_person_id__exact=F("override_person_id")), name="old_person_id_different_from_override_person_id", ), + models.CheckConstraint( + check=Q( + Func( + F("team_id"), + F("override_person_id"), + F("old_person_id"), + function="is_override_person_not_used_as_old_person", + output_field=BooleanField(), + ) + ), + name="old_person_id_is_not_override_person_id", + ), ] id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") team: models.ForeignKey = models.ForeignKey("Team", on_delete=models.CASCADE) + # We don't want to delete rows before we had a chance to propagate updates to the events table. + # To reduce potential side-effects, these are not ForeignKeys. old_person_id = models.UUIDField(db_index=True) - # During person deletion we need to manually delete overrides, we can't cascade here as during - # merges we want to make sure we didn't miss any concurrently added entries - override_person = models.ForeignKey(Person, to_field="uuid", on_delete=models.DO_NOTHING) + override_person_id = models.UUIDField(db_index=True) oldest_event: models.DateTimeField = models.DateTimeField() version: models.BigIntegerField = models.BigIntegerField(null=True, blank=True) + + +# This function checks two things: +# 1. A new override_person_id must not match an existing old_person_id +# 2. 
A new old_person_id must not match an existing override_person_id +CREATE_FUNCTION_FOR_CONSTRAINT_SQL = f""" +CREATE OR REPLACE FUNCTION is_override_person_not_used_as_old_person(team_id bigint, override_person_id uuid, old_person_id uuid) +RETURNS BOOLEAN AS $$ + SELECT NOT EXISTS ( + SELECT 1 + FROM "{PersonOverride._meta.db_table}" + WHERE team_id = $1 + AND override_person_id = $3 + ) AND NOT EXISTS ( + SELECT 1 + FROM "{PersonOverride._meta.db_table}" + WHERE team_id = $1 + AND old_person_id = $2 + ); +$$ LANGUAGE SQL; +""" + +DROP_FUNCTION_FOR_CONSTRAINT_SQL = "DROP FUNCTION is_override_person_not_used_as_old_person" diff --git a/posthog/models/test/test_person_override_model.py b/posthog/models/test/test_person_override_model.py index 181c5b97046b0..6f373120f4e58 100644 --- a/posthog/models/test/test_person_override_model.py +++ b/posthog/models/test/test_person_override_model.py @@ -1,223 +1,166 @@ -import contextlib import datetime as dt from uuid import uuid4 import pytest -from django.db.utils import DEFAULT_DB_ALIAS, ConnectionHandler, IntegrityError +from django.db.utils import IntegrityError -from posthog.api.test.test_organization import create_organization -from posthog.api.test.test_team import create_team from posthog.models import PersonOverride, Team -from posthog.models.person.person import Person +from posthog.test.base import BaseTest -pytestmark = pytest.mark.django_db +class TestPersonOverride(BaseTest): + def setUp(self, *args, **kwargs): + super().setUp(*args, **kwargs) -def test_person_override_disallows_overriding_to_itself(): - """Test old_person_id cannot match override_person_id. 
+ PersonOverride.objects.all().delete() - This is enforced by a CHECK constraint old_person_id_different_from_override_person_id - """ - organization = create_organization(name="test") - team = create_team(organization=organization) + def test_person_override_disallows_same_old_person_id(self): + """Test a new old_person_id cannot match an existing old_person_id. - oldest_event = dt.datetime.now(dt.timezone.utc) - person_id = uuid4() + This is enforced by a UNIQUE constraint on (team_id, old_person_id) + """ + oldest_event = dt.datetime.now(dt.timezone.utc) + old_person_id = uuid4() + override_person_id = uuid4() + new_override_person_id = uuid4() + + person_override = PersonOverride.objects.create( + team=self.team, + old_person_id=old_person_id, + override_person_id=override_person_id, + oldest_event=oldest_event, + version=1, + ) + person_override.save() + + assert person_override.old_person_id == old_person_id + assert person_override.override_person_id == override_person_id - Person.objects.create( - team_id=team.pk, - uuid=person_id, - ) + with pytest.raises(IntegrityError): + PersonOverride.objects.create( + team=self.team, + old_person_id=old_person_id, + override_person_id=new_override_person_id, + oldest_event=oldest_event, + version=1, + ).save() + + def test_person_override_same_old_person_id_in_different_teams(self): + """Test a new old_person_id can match an existing from a different team.""" + oldest_event = dt.datetime.now(dt.timezone.utc) + old_person_id = uuid4() + override_person_id = uuid4() + new_team = Team.objects.create( + organization=self.organization, + api_token="a different token", + ) - with pytest.raises(IntegrityError): - PersonOverride.objects.create( - team=team, - old_person_id=person_id, - override_person_id=person_id, + p1 = PersonOverride.objects.create( + team=self.team, + old_person_id=old_person_id, + override_person_id=override_person_id, oldest_event=oldest_event, - version=0, - ).save() - - -def 
test_person_override_disallows_same_old_person_id(): - """Test a new old_person_id cannot match an existing old_person_id. - - This is enforced by a UNIQUE constraint on (team_id, old_person_id) - """ - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - new_override_person_id = uuid4() - - Person.objects.create( - team_id=team.pk, - uuid=override_person_id, - ) - - person_override = PersonOverride.objects.create( - team=team, - old_person_id=old_person_id, - override_person_id=override_person_id, - oldest_event=oldest_event, - version=1, - ) - person_override.save() - - assert person_override.old_person_id == old_person_id - assert person_override.override_person_id == override_person_id - - Person.objects.create( - team_id=team.pk, - uuid=new_override_person_id, - ) - - with pytest.raises(IntegrityError): - PersonOverride.objects.create( - team=team, + version=1, + ) + p1.save() + + assert p1.old_person_id == old_person_id + assert p1.override_person_id == override_person_id + + p2 = PersonOverride.objects.create( + team=new_team, old_person_id=old_person_id, - override_person_id=new_override_person_id, + override_person_id=override_person_id, oldest_event=oldest_event, version=1, - ).save() - - -def test_person_override_allow_same_old_person_id_in_different_teams(): - """Test a new old_person_id can match an existing from a different team.""" - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - new_team = Team.objects.create( - organization=organization, - api_token="a different token", - ) - - Person.objects.create( - team_id=team.pk, - uuid=override_person_id, - ) - - p1 = PersonOverride.objects.create( - team=team, - old_person_id=old_person_id, - 
override_person_id=override_person_id, - oldest_event=oldest_event, - version=1, - ) - p1.save() - - assert p1.old_person_id == old_person_id - assert p1.override_person_id == override_person_id - - p2 = PersonOverride.objects.create( - team=new_team, - old_person_id=old_person_id, - override_person_id=override_person_id, - oldest_event=oldest_event, - version=1, - ) - p2.save() - - assert p1.old_person_id == p2.old_person_id - assert p1.override_person_id == p2.override_person_id - assert p1.team != p2.team - - -def test_person_override_allows_override_person_id_as_old_person_id_in_different_teams(): - """Test a new old_person_id can match an override in a different team.""" - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - new_override_person_id = uuid4() - new_team = Team.objects.create( - organization=organization, - api_token="a much different token", - ) - - Person.objects.create( - team_id=team.pk, - uuid=override_person_id, - ) - - p1 = PersonOverride.objects.create( - team=team, - old_person_id=old_person_id, - override_person_id=override_person_id, - oldest_event=oldest_event, - version=1, - ) - p1.save() - - assert p1.old_person_id == old_person_id - assert p1.override_person_id == override_person_id - - Person.objects.create( - team_id=team.pk, - uuid=new_override_person_id, - ) - - p2 = PersonOverride.objects.create( - team=new_team, - old_person_id=override_person_id, - override_person_id=new_override_person_id, - oldest_event=oldest_event, - version=1, - ) - p2.save() - - assert p1.override_person_id == p2.old_person_id - assert p2.override_person_id == new_override_person_id - assert p1.team != p2.team - - -def test_person_override_creation_disallowed_for_non_existing_person(): - """This is guaranteed by the foreign key constraint.""" - organization = create_organization(name="test") - team = 
create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - person_id = uuid4() - - Person.objects.create( - team_id=team.pk, - uuid=person_id, - ) - - with pytest.raises(IntegrityError): - PersonOverride.objects.create( - team=team, - old_person_id=person_id, - override_person_id=person_id, + ) + p2.save() + + assert p1.old_person_id == p2.old_person_id + assert p1.override_person_id == p2.override_person_id + assert p1.team != p2.team + + def test_person_override_disallows_override_person_id_as_old_person_id(self): + """Test a new old_person_id cannot match an existing override_person_id. + + We re-use the override_person_id from the first model created as the old_person_id + of the second model. We expect an exception on saving this second model. + """ + oldest_event = dt.datetime.now(dt.timezone.utc) + old_person_id = uuid4() + override_person_id = uuid4() + new_override_person_id = uuid4() + + person_override = PersonOverride.objects.create( + team=self.team, + old_person_id=old_person_id, + override_person_id=override_person_id, + oldest_event=oldest_event, + version=1, + ) + person_override.save() + + assert person_override.old_person_id == old_person_id + assert person_override.override_person_id == override_person_id + + with pytest.raises(IntegrityError): + PersonOverride.objects.create( + team=self.team, + old_person_id=override_person_id, + override_person_id=new_override_person_id, + oldest_event=oldest_event, + version=1, + ).save() + + def test_person_override_allows_override_person_id_as_old_person_id_in_different_teams(self): + """Test a new old_person_id can match an override in a different team.""" + oldest_event = dt.datetime.now(dt.timezone.utc) + old_person_id = uuid4() + override_person_id = uuid4() + new_override_person_id = uuid4() + new_team = Team.objects.create( + organization=self.organization, + api_token="a much different token", + ) + + p1 = PersonOverride.objects.create( + team=self.team, + 
old_person_id=old_person_id, + override_person_id=override_person_id, oldest_event=oldest_event, - version=0, - ).save() + version=1, + ) + p1.save() + assert p1.old_person_id == old_person_id + assert p1.override_person_id == override_person_id -def test_person_override_allows_duplicate_override_person_id(): - """Test duplicate override_person_ids with different old_person_ids are allowed.""" - organization = create_organization(name="test") - team = create_team(organization=organization) + p2 = PersonOverride.objects.create( + team=new_team, + old_person_id=override_person_id, + override_person_id=new_override_person_id, + oldest_event=oldest_event, + version=1, + ) + p2.save() - oldest_event = dt.datetime.now(dt.timezone.utc) - override_person_id = uuid4() - n_person_overrides = 2 - created = [] + assert p1.override_person_id == p2.old_person_id + assert p2.override_person_id == new_override_person_id + assert p1.team != p2.team - Person.objects.create(uuid=override_person_id, team=team) + def test_person_override_disallows_old_person_id_as_override_person_id(self): + """Test a new override_person_id cannot match an existing old_person_id. - for _ in range(n_person_overrides): + We re-use the old_person_id from the first model created as the override_person_id + of the second model. We expect an exception on saving this second model. 
+ """ + oldest_event = dt.datetime.now(dt.timezone.utc) old_person_id = uuid4() + override_person_id = uuid4() + new_old_person_id = uuid4() person_override = PersonOverride.objects.create( - team=team, + team=self.team, old_person_id=old_person_id, override_person_id=override_person_id, oldest_event=oldest_event, @@ -225,337 +168,74 @@ def test_person_override_allows_duplicate_override_person_id(): ) person_override.save() - created.append(person_override) - - assert all(p.override_person_id == override_person_id for p in created) - assert len(set(p.old_person_id for p in created)) == n_person_overrides - - -def test_person_override_allows_old_person_id_as_override_person_id_in_different_teams(): - """Test a new override_person_id can match an old in a different team.""" - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - new_old_person_id = uuid4() - new_team = Team.objects.create( - organization=organization, - api_token="a significantly different token", - ) - - Person.objects.create(uuid=old_person_id, team=team) - Person.objects.create(uuid=override_person_id, team=team) - Person.objects.create(uuid=new_old_person_id, team=team) - - p1 = PersonOverride.objects.create( - team=team, - old_person_id=old_person_id, - override_person_id=override_person_id, - oldest_event=oldest_event, - version=1, - ) - p1.save() - - assert p1.old_person_id == old_person_id - assert p1.override_person_id == override_person_id - - p2 = PersonOverride.objects.create( - team=new_team, - old_person_id=new_old_person_id, - override_person_id=old_person_id, - oldest_event=oldest_event, - version=1, - ) - p2.save() - - assert p1.old_person_id == p2.override_person_id - assert p2.old_person_id == new_old_person_id - assert p1.team != p2.team - - -@pytest.mark.django_db(transaction=True) -def 
test_person_deletion_disallowed_when_override_exists(): - """Person deletion would result in an error if the override exists""" - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - - override_person = Person.objects.create( - team_id=team.pk, - uuid=override_person_id, - ) - PersonOverride.objects.create( - team=team, - old_person_id=old_person_id, - override_person_id=override_person_id, - oldest_event=oldest_event, - version=0, - ).save() - - with pytest.raises(IntegrityError): - override_person.delete() - - -""" -Concurrency tests for person overrides table -Goal: verify that we don't end up in a situation with the same uuid is both -an old person id and an override person id - -- there are two cases that we want to check for - - concurrent merges - - concurrent merge and person deletion - -In both cases one of the transactions will wait on the lock, -so they can only complete in one order (which is tested below). - -Note that to test the race condition scenario we need to: - 1. create multiple concurrent transactions, such that we can verify - constraints are enforced at COMMIT time. - 2. enable transactions for the Django test. This is more so we can see data - from the main Django PostgreSQL connection session in the other - concurrent transactions. Not 100% required but makes things a little - easier to write. 
-""" - - -@pytest.mark.django_db(transaction=True) -def test_person_override_allow_merge_first_then_delete(): - # This essentially just verifies our merge and delete functions work as expected - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - - Person.objects.create(uuid=old_person_id, team=team) - Person.objects.create(uuid=override_person_id, team=team) - with create_connection() as merge_cursor: - merge_cursor.execute("BEGIN") - _merge_people(team, merge_cursor, old_person_id, override_person_id, oldest_event) - merge_cursor.execute("COMMIT") - - assert list(PersonOverride.objects.all().values_list("old_person_id", "override_person_id")) == [ - (old_person_id, override_person_id), - ] # type: ignore - - with create_connection() as delete_cursor: - delete_cursor.execute("BEGIN") - _delete_person(team, delete_cursor, override_person_id) - delete_cursor.execute("COMMIT") - - assert list(PersonOverride.objects.filter(team=team).all()) == [] - - -@pytest.mark.django_db(transaction=True) -def test_person_override_disallow_merge_if_delete_ran_concurrently(): - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - - Person.objects.create(uuid=old_person_id, team=team) - Person.objects.create(uuid=override_person_id, team=team) - with create_connection() as merge_cursor, create_connection() as delete_cursor: - # each transaction gets a "copy" of the DB state - merge_cursor.execute("BEGIN") - delete_cursor.execute("BEGIN") - # merge and delete - _merge_people(team, merge_cursor, old_person_id, override_person_id, oldest_event) - _delete_person(team, delete_cursor, override_person_id) - - # finish delete first, then merge fails - delete_cursor.execute("COMMIT") - 
with pytest.raises(IntegrityError): - merge_cursor.execute("COMMIT") - - -@pytest.mark.django_db(transaction=True) -def test_person_override_allow_concequitive_merges(): - # This essentially just verifies our merge function works as expected - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - new_override_person_id = uuid4() - - Person.objects.create(uuid=old_person_id, team=team) - Person.objects.create(uuid=override_person_id, team=team) - Person.objects.create(uuid=new_override_person_id, team=team) - - with create_connection() as first_cursor: - first_cursor.execute("BEGIN") - _merge_people(team, first_cursor, old_person_id, override_person_id, oldest_event) - first_cursor.execute("COMMIT") - - with create_connection() as second_cursor: - second_cursor.execute("BEGIN") - _merge_people(team, second_cursor, override_person_id, new_override_person_id, oldest_event) - second_cursor.execute("COMMIT") - - mappings = list(PersonOverride.objects.all().values_list("old_person_id", "override_person_id")) - - assert sorted(mappings) == sorted( - [ - (override_person_id, new_override_person_id), - (old_person_id, new_override_person_id), - ] - ), f"{mappings=} {old_person_id=}, {override_person_id=}, {new_override_person_id=}" # type: ignore - - -@pytest.mark.django_db(transaction=True) -def test_person_override_disallows_concurrent_merge(): - """ - Running two merges: - A: old_person -> override_person - B: override_person -> new_override_person - - Running them concurrently when B is committed first, then A should raise an exception, - because override_person has been deleted, so it can't be used as override_id. 
- """ - organization = create_organization(name="test") - team = create_team(organization=organization) - - oldest_event = dt.datetime.now(dt.timezone.utc) - old_person_id = uuid4() - override_person_id = uuid4() - new_override_person_id = uuid4() - - Person.objects.create(uuid=old_person_id, team=team) - Person.objects.create(uuid=override_person_id, team=team) - Person.objects.create(uuid=new_override_person_id, team=team) - - with create_connection() as first_cursor, create_connection() as second_cursor: - # each transaction gets a "copy" of the DB state - first_cursor.execute("BEGIN") - second_cursor.execute("BEGIN") - - # try to do the merges - _merge_people(team, first_cursor, old_person_id, override_person_id, oldest_event) - _merge_people(team, second_cursor, override_person_id, new_override_person_id, oldest_event) - - # the one finishing first succeeds, the one finishing second fails - second_cursor.execute("COMMIT") + assert person_override.old_person_id == old_person_id + assert person_override.override_person_id == override_person_id + with pytest.raises(IntegrityError): - first_cursor.execute("COMMIT") - - assert list(PersonOverride.objects.all().values_list("old_person_id", "override_person_id")) == [ - (override_person_id, new_override_person_id), - ] # type: ignore - - -@contextlib.contextmanager -def create_connection(alias=DEFAULT_DB_ALIAS): - connection = ConnectionHandler().create_connection(alias) # type: ignore - try: - with connection.cursor() as cursor: - cursor.execute("SET lock_timeout TO '10s'") - try: - yield cursor - finally: - # Make sure that it there was a transaction still open, then roll it - # back. - cursor.execute("ROLLBACK") - cursor.close() - - finally: - connection.close() - - -def _merge_people(team, cursor, old_person_id, override_person_id, oldest_event): - """ - Merge two people together, using the override_person_id as the canonical - person. 
- - This function is meant to be run in a separate thread, so that we can test - that the transaction is rolled back if there is a conflict. - - This mimics how we expect the code to do person merges, i.e. in a transaction - that deletes the old person, adds old person->override person override and updates - all old person as override person rows to now point to the new override person. - - Note that we don't actually handle the merge on the posthog_person table, - but rather simply DELETE the record associated with `old_person_id`. It may - be that we remove the implmentation of deleting merged persons, in which - case we'll need to update the constraint to also include e.g. the - `is_deleted` flag we may add. - """ - cursor.execute( - """ - DELETE FROM - posthog_person - WHERE - uuid = %(old_person_id)s - AND team_id = %(team_id)s; - - INSERT INTO posthog_personoverride( - team_id, - old_person_id, - override_person_id, - oldest_event, - version + PersonOverride.objects.create( + team=self.team, + old_person_id=new_old_person_id, + override_person_id=old_person_id, + oldest_event=oldest_event, + version=1, + ).save() + + def test_person_override_old_person_id_as_override_person_id_in_different_teams(self): + """Test a new override_person_id can match an old in a different team.""" + oldest_event = dt.datetime.now(dt.timezone.utc) + old_person_id = uuid4() + override_person_id = uuid4() + new_old_person_id = uuid4() + new_team = Team.objects.create( + organization=self.organization, + api_token="a significantly different token", + ) + + p1 = PersonOverride.objects.create( + team=self.team, + old_person_id=old_person_id, + override_person_id=override_person_id, + oldest_event=oldest_event, + version=1, + ) + p1.save() + + assert p1.old_person_id == old_person_id + assert p1.override_person_id == override_person_id + + p2 = PersonOverride.objects.create( + team=new_team, + old_person_id=new_old_person_id, + override_person_id=old_person_id, + 
oldest_event=oldest_event, + version=1, + ) + p2.save() + + assert p1.old_person_id == p2.override_person_id + assert p2.old_person_id == new_old_person_id + assert p1.team != p2.team + + def test_person_override_allows_duplicate_override_person_id(self): + """Test duplicate override_person_ids with different old_person_ids are allowed.""" + oldest_event = dt.datetime.now(dt.timezone.utc) + override_person_id = uuid4() + n_person_overrides = 2 + created = [] + + for _ in range(n_person_overrides): + old_person_id = uuid4() + + person_override = PersonOverride.objects.create( + team=self.team, + old_person_id=old_person_id, + override_person_id=override_person_id, + oldest_event=oldest_event, + version=1, ) - VALUES ( - %(team_id)s, - %(old_person_id)s, - %(override_person_id)s, - %(oldest_event)s, - 1 - ); - - UPDATE - posthog_personoverride - SET - override_person_id = %(override_person_id)s, - version = version + 1 - WHERE override_person_id = %(old_person_id)s - AND team_id = %(team_id)s; - """, - { - "team_id": team.id, - "old_person_id": old_person_id, - "override_person_id": override_person_id, - "oldest_event": oldest_event, - }, - ) - - -def _delete_person(team, cursor, person_id): - """ - Delete the person. - - This mimics how we expect the code to do person deletions, i.e. in a transaction - that deletes the person_overrides in addition to deleting from the person table. - - It may be that we change to using soft deletes in the person table instead in that - case we'll need to update the constraint to also include e.g. the - `is_deleted` flag we may add. 
- """ - cursor.execute( - """ - DELETE FROM - posthog_personoverride - WHERE - override_person_id = %(person_id)s - AND team_id = %(team_id)s; - - DELETE FROM - posthog_person - WHERE - uuid = %(person_id)s - AND team_id = %(team_id)s; - """, - { - "team_id": team.id, - "person_id": person_id, - }, - ) + person_override.save() + + created.append(person_override) + + assert all(p.override_person_id == override_person_id for p in created) + assert len(set(p.old_person_id for p in created)) == n_person_overrides