diff --git a/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr b/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr index a4c302f1f0633..aac0888965b88 100644 --- a/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr +++ b/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr @@ -83,7 +83,7 @@ (SELECT pdi.person_id AS person_id, countIf(timestamp > now() - INTERVAL 2 year AND timestamp < now() - AND event = '$pageview') > 0 AS performed_event_condition_5_level_level_0_level_0_level_0_0 + AND event = '$pageview') > 0 AS performed_event_condition_21_level_level_0_level_0_level_0_0 FROM events e INNER JOIN (SELECT distinct_id, @@ -113,7 +113,7 @@ HAVING max(is_deleted) = 0 AND (((((NOT has(['something1'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), '$some_prop'), '^"|"$', ''))))))))) person ON person.person_id = behavior_query.person_id WHERE 1 = 1 - AND ((((performed_event_condition_5_level_level_0_level_0_level_0_0)))) ) as person + AND ((((performed_event_condition_21_level_level_0_level_0_level_0_0)))) ) as person UNION ALL SELECT person_id, cohort_id, @@ -148,7 +148,7 @@ (SELECT pdi.person_id AS person_id, countIf(timestamp > now() - INTERVAL 2 year AND timestamp < now() - AND event = '$pageview') > 0 AS performed_event_condition_7_level_level_0_level_0_level_0_0 + AND event = '$pageview') > 0 AS performed_event_condition_23_level_level_0_level_0_level_0_0 FROM events e INNER JOIN (SELECT distinct_id, @@ -178,7 +178,7 @@ HAVING max(is_deleted) = 0 AND (((((NOT has(['something1'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), '$some_prop'), '^"|"$', ''))))))))) person ON person.person_id = behavior_query.person_id WHERE 1 = 1 - AND ((((performed_event_condition_7_level_level_0_level_0_level_0_0)))) ) )) + AND ((((performed_event_condition_23_level_level_0_level_0_level_0_0)))) ) )) ' --- # name: TestCohort.test_cohortpeople_with_not_in_cohort_operator_for_behavioural_cohorts @@ -195,7 +195,7 @@ FROM (SELECT pdi.person_id AS person_id, minIf(timestamp, event = 'signup') >= now() - INTERVAL 15 day - AND minIf(timestamp, event = 'signup') < now() as first_time_condition_8_level_level_0_level_0_0 + AND minIf(timestamp, event = 'signup') < now() as first_time_condition_24_level_level_0_level_0_0 FROM events e INNER JOIN (SELECT distinct_id, @@ -208,7 +208,7 @@ AND event IN ['signup'] GROUP BY person_id) behavior_query WHERE 1 = 1 - AND (((first_time_condition_8_level_level_0_level_0_0))) ) as person + AND (((first_time_condition_24_level_level_0_level_0_0))) ) as person UNION ALL SELECT person_id, cohort_id, @@ -237,9 +237,9 @@ (SELECT pdi.person_id AS person_id, countIf(timestamp > now() - INTERVAL 2 year AND timestamp < now() - AND event = '$pageview') > 0 AS performed_event_condition_9_level_level_0_level_0_level_0_0, + AND event = '$pageview') > 0 AS performed_event_condition_25_level_level_0_level_0_level_0_0, minIf(timestamp, event = 'signup') >= now() - INTERVAL 15 day - AND minIf(timestamp, event = 'signup') < now() as first_time_condition_9_level_level_0_level_1_level_0_level_0_level_0_0 + AND minIf(timestamp, event = 'signup') < now() as first_time_condition_25_level_level_0_level_1_level_0_level_0_level_0_0 FROM events e INNER JOIN (SELECT distinct_id, @@ -252,8 +252,8 @@ AND event IN ['$pageview', 'signup'] GROUP BY person_id) behavior_query WHERE 1 = 1 - AND ((((performed_event_condition_9_level_level_0_level_0_level_0_0)) - AND ((((NOT first_time_condition_9_level_level_0_level_1_level_0_level_0_level_0_0)))))) ) as person + AND ((((performed_event_condition_25_level_level_0_level_0_level_0_0)) + AND ((((NOT first_time_condition_25_level_level_0_level_1_level_0_level_0_level_0_0)))))) ) as person UNION ALL SELECT person_id, cohort_id, diff --git a/ee/clickhouse/models/test/__snapshots__/test_property.ambr b/ee/clickhouse/models/test/__snapshots__/test_property.ambr index fb50673941128..61edbc23877de 100644 --- a/ee/clickhouse/models/test/__snapshots__/test_property.ambr +++ b/ee/clickhouse/models/test/__snapshots__/test_property.ambr @@ -146,7 +146,7 @@ )) ', { - 'global_cohort_id_0': 37, + 'global_cohort_id_0': 25, }, ) --- diff --git a/ee/clickhouse/models/test/test_cohort.py b/ee/clickhouse/models/test/test_cohort.py index 48f7a2b1e8540..cef6324e869f6 100644 --- a/ee/clickhouse/models/test/test_cohort.py +++ b/ee/clickhouse/models/test/test_cohort.py @@ -1,5 +1,4 @@ from datetime import datetime, timedelta -from unittest.mock import patch from django.utils import timezone from freezegun import freeze_time @@ -365,32 +364,6 @@ def test_cohortpeople_basic(self): results = self._get_cohortpeople(cohort1) self.assertEqual(len(results), 2) - @patch("time.sleep", return_value=None) - def test_cohortpeople_basic_paginating(self, mock_sleep): - for i in range(15): - Person.objects.create( - team_id=self.team.pk, - distinct_ids=[f"{i}"], - properties={"$some_prop": "something", "$another_prop": "something"}, - ) - - cohort1: Cohort = Cohort.objects.create( - team=self.team, - groups=[ - { - "properties": [ - {"key": "$some_prop", "value": "something", "type": "person"}, - {"key": "$another_prop", "value": "something", "type": "person"}, - ] - } - ], - name="cohort1", - ) - - cohort1.calculate_people(new_version=cohort1.version, batch_size=2, pg_batch_size=1) - - self.assertEqual(len(cohort1.people.all()), 15) - def test_cohortpeople_action_basic(self): action = _create_action(team=self.team, name="$pageview") Person.objects.create( diff --git a/posthog/api/feature_flag.py b/posthog/api/feature_flag.py index c6c71ac432eb3..d5c7c29b59d0f 100644 --- a/posthog/api/feature_flag.py +++ b/posthog/api/feature_flag.py @@ -201,7 +201,6 @@ def create(self, validated_data: Dict, *args: Any, **kwargs: Any) -> FeatureFlag FeatureFlag.objects.filter(key=validated_data["key"], team=self.context["team_id"], deleted=True).delete() instance: FeatureFlag = super().create(validated_data) - instance.update_cohorts() self._attempt_set_tags(tags, instance) @@ -216,7 +215,6 @@ def update(self, instance: FeatureFlag, validated_data: Dict, *args: Any, **kwar FeatureFlag.objects.filter(key=validated_key, team=instance.team, deleted=True).delete() self._update_filters(validated_data) instance = super().update(instance, validated_data) - instance.update_cohorts() report_user_action(request.user, "feature flag updated", instance.get_analytics_metadata()) diff --git a/posthog/api/test/test_cohort.py b/posthog/api/test/test_cohort.py index 34bfdbb1776c5..cb5331b6d2489 100644 --- a/posthog/api/test/test_cohort.py +++ b/posthog/api/test/test_cohort.py @@ -45,7 +45,6 @@ def test_creating_update_and_calculating(self, patch_calculate_cohort, patch_cap "values": [{"type": "AND", "values": [{"key": "team_id", "value": 5, "type": "person"}]}], }, "name_length": 8, - "person_count_precalc": 0, "groups_count": 1, "action_groups_count": 0, "properties_groups_count": 1, @@ -79,7 +78,6 @@ def test_creating_update_and_calculating(self, patch_calculate_cohort, patch_cap "values": [{"type": "AND", "values": [{"key": "team_id", "value": 6, "type": "person"}]}], }, "name_length": 9, - "person_count_precalc": 0, "groups_count": 1, "action_groups_count": 0, "properties_groups_count": 1, diff --git a/posthog/api/test/test_feature_flag.py b/posthog/api/test/test_feature_flag.py index 61f414055274c..302def9eaa892 100644 --- a/posthog/api/test/test_feature_flag.py +++ b/posthog/api/test/test_feature_flag.py @@ -1689,19 +1689,6 @@ def test_creating_feature_flag_with_behavioral_cohort(self): response.json(), ) - @patch("posthog.tasks.calculate_cohort.calculate_cohort_ch.delay") - def test_cohort_is_calculated(self, calculate_cohort_ch): - cohort = Cohort.objects.create( - team=self.team, - groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}], - name="cohort1", - ) - cohort_request = self._create_flag_with_properties( - "cohort-flag", [{"key": "id", "type": "cohort", "value": cohort.pk}] - ) - self.assertEqual(cohort_request.status_code, status.HTTP_201_CREATED) - self.assertEqual(calculate_cohort_ch.call_count, 1) - def test_validation_group_properties(self): groups_request = self._create_flag_with_properties( "groups-flag", diff --git a/posthog/celery.py b/posthog/celery.py index d328b645af778..7f440397c2914 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -132,8 +132,6 @@ def setup_periodic_tasks(sender: Celery, **kwargs): ) sender.add_periodic_task(120, graphile_worker_queue_size.s(), name="Graphile Worker queue size") - sender.add_periodic_task(crontab(minute=0, hour="*"), calculate_cohort_ids_in_feature_flags_task.s()) - sender.add_periodic_task(120, calculate_cohort.s(), name="recalculate cohorts") if settings.ASYNC_EVENT_PROPERTY_USAGE: @@ -560,13 +558,6 @@ def sync_insight_caching_state(team_id: int, insight_id: Optional[int] = None, d sync_insight_caching_state(team_id, insight_id, dashboard_tile_id) -@app.task(ignore_result=True) -def calculate_cohort_ids_in_feature_flags_task(): - from posthog.tasks.cohorts_in_feature_flag import calculate_cohort_ids_in_feature_flags - - calculate_cohort_ids_in_feature_flags() - - @app.task(ignore_result=True, bind=True) def debug_task(self): print(f"Request: {self.request!r}") diff --git a/posthog/models/cohort/cohort.py b/posthog/models/cohort/cohort.py index c21752838cde7..6f704247e7633 100644 --- a/posthog/models/cohort/cohort.py +++ b/posthog/models/cohort/cohort.py @@ -187,61 +187,21 @@ def get_analytics_metadata(self): return { "filters": self.properties.to_dict(), "name_length": len(self.name) if self.name else 0, - "person_count_precalc": self.people.count(), "groups_count": len(self.groups), "action_groups_count": action_groups_count, "properties_groups_count": properties_groups_count, "deleted": self.deleted, } - def calculate_people(self, new_version: int, batch_size=10000, pg_batch_size=1000): - if self.is_static: - return - try: - - # Paginate fetch batch_size from clickhouse and paginate insert pg_batch_size into postgres - cursor = 0 - persons = self._clickhouse_persons_query(batch_size=batch_size, offset=cursor) - while persons: - # TODO: Insert from a subquery instead of pulling retrieving - # then sending large lists of data backwards and forwards. - to_insert = [ - CohortPeople(person_id=person_id, cohort_id=self.pk, version=new_version) - #  Just pull out the person id as we don't need anything - #  else. - for person_id in persons.values_list("id", flat=True) - ] - #  TODO: make sure this bulk_create doesn't actually return anything - CohortPeople.objects.bulk_create(to_insert, batch_size=pg_batch_size) - - cursor += batch_size - persons = self._clickhouse_persons_query(batch_size=batch_size, offset=cursor) - if persons.exists() and not TEST: - time.sleep(5) - - except Exception as err: - # Clear the pending version people if there's an error - batch_delete_cohort_people(self.pk, new_version) - - raise err - def calculate_people_ch(self, pending_version): from posthog.models.cohort.util import recalculate_cohortpeople - from posthog.tasks.cohorts_in_feature_flag import get_cohort_ids_in_feature_flags logger.info("cohort_calculation_started", id=self.pk, current_version=self.version, new_version=pending_version) start_time = time.monotonic() try: count = recalculate_cohortpeople(self, pending_version) - - # only precalculate if used in feature flag - ids = get_cohort_ids_in_feature_flags() - - if self.pk in ids: - self.calculate_people(new_version=pending_version) - else: - self.count = count + self.count = count self.last_calculation = timezone.now() self.errors_calculating = 0 @@ -372,9 +332,3 @@ class CohortPeople(models.Model): class Meta: indexes = [models.Index(fields=["cohort_id", "person_id"])] - - -def batch_delete_cohort_people(cohort_id: int, version: int, batch_size: int = 1000): - while batch := CohortPeople.objects.filter(cohort_id=cohort_id, version=version).values("id")[:batch_size]: - CohortPeople.objects.filter(id__in=batch)._raw_delete(batch.db) # type: ignore - time.sleep(1) diff --git a/posthog/models/feature_flag/feature_flag.py b/posthog/models/feature_flag/feature_flag.py index 2b6b1ee5f2ea2..a18702e07b6e9 100644 --- a/posthog/models/feature_flag/feature_flag.py +++ b/posthog/models/feature_flag/feature_flag.py @@ -213,15 +213,6 @@ def cohort_ids(self) -> List[int]: cohort_ids.append(cohort_id) return cohort_ids - def update_cohorts(self) -> None: - from posthog.tasks.calculate_cohort import update_cohort - from posthog.tasks.cohorts_in_feature_flag import COHORT_ID_IN_FF_KEY - - if self.cohort_ids: - cache.delete(COHORT_ID_IN_FF_KEY) - for cohort in Cohort.objects.filter(pk__in=self.cohort_ids): - update_cohort(cohort) - def __str__(self): return f"{self.key} ({self.pk})" diff --git a/posthog/tasks/cohorts_in_feature_flag.py b/posthog/tasks/cohorts_in_feature_flag.py deleted file mode 100644 index 7298dca7dbf41..0000000000000 --- a/posthog/tasks/cohorts_in_feature_flag.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import List - -from django.core.cache import cache -from django.db.models import TextField -from django.db.models.functions import Cast - -COHORT_ID_IN_FF_KEY = "cohort_ids_in_feature_flag" - - -def calculate_cohort_ids_in_feature_flags() -> List[int]: - from posthog.models.feature_flag import FeatureFlag - - flag: FeatureFlag - cohort_ids = [] - for flag in FeatureFlag.objects.annotate(filters_as_text=Cast("filters", TextField())).filter( - deleted=False, filters_as_text__contains="cohort" - ): - cohort_ids.extend(flag.cohort_ids) - - # dedup - cohort_ids = list(set(cohort_ids)) - - cache.set(COHORT_ID_IN_FF_KEY, cohort_ids, None) # don't expire - return cohort_ids - - -def get_cohort_ids_in_feature_flags() -> List[int]: - try: - ids = cache.get(COHORT_ID_IN_FF_KEY, None) - if ids: - return ids - else: - return calculate_cohort_ids_in_feature_flags() - except: - return calculate_cohort_ids_in_feature_flags() diff --git a/posthog/test/test_cohort_model.py b/posthog/test/test_cohort_model.py index 011b600510823..140f9f7186ed8 100644 --- a/posthog/test/test_cohort_model.py +++ b/posthog/test/test_cohort_model.py @@ -1,10 +1,7 @@ -from unittest.mock import patch - import pytest from posthog.client import sync_execute -from posthog.models import Cohort, FeatureFlag, Person, Team -from posthog.models.cohort import CohortPeople, batch_delete_cohort_people +from posthog.models import Cohort, Person, Team from posthog.models.cohort.sql import GET_COHORTPEOPLE_BY_COHORT_ID from posthog.test.base import BaseTest @@ -68,35 +65,6 @@ def test_empty_query(self): cohort2.calculate_people_ch(pending_version=0) self.assertFalse(Cohort.objects.get().is_calculating) - @patch("time.sleep", return_value=None) - def test_batch_delete_cohort_people(self, patch_sleep): - Person.objects.create(distinct_ids=["person1"], team_id=self.team.pk, properties={"$some_prop": "something"}) - Person.objects.create(distinct_ids=["person2"], team_id=self.team.pk, properties={}) - Person.objects.create(distinct_ids=["person3"], team_id=self.team.pk, properties={"$some_prop": "something"}) - cohort = Cohort.objects.create( - team=self.team, - groups=[{"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}], - name="cohort1", - ) - - cohort.calculate_people_ch(pending_version=0) - - flag: FeatureFlag = FeatureFlag.objects.create( - team=self.team, - filters={ - "groups": [ - {"properties": [{"key": "id", "type": "cohort", "value": cohort.pk}], "rollout_percentage": None} - ] - }, - key="default-flag-1", - created_by=self.user, - ) - flag.update_cohorts() - - self.assertEqual(CohortPeople.objects.count(), 2) - batch_delete_cohort_people(cohort_id=cohort.pk, version=1, batch_size=1) - self.assertEqual(CohortPeople.objects.count(), 0) - def test_group_to_property_conversion(self): cohort = Cohort.objects.create( team=self.team, diff --git a/posthog/test/test_feature_flag.py b/posthog/test/test_feature_flag.py index 97b812677e2e1..2ae5099882bbe 100644 --- a/posthog/test/test_feature_flag.py +++ b/posthog/test/test_feature_flag.py @@ -1364,8 +1364,6 @@ def test_user_in_cohort(self): filters={"groups": [{"properties": [{"key": "id", "value": cohort.pk, "type": "cohort"}]}]} ) - feature_flag.update_cohorts() - self.assertEqual( FeatureFlagMatcher([feature_flag], "example_id_1").get_match(feature_flag), FeatureFlagMatch(True, None, FeatureFlagMatchReason.CONDITION_MATCH, 0), @@ -1412,8 +1410,6 @@ def test_cohort_expansion_returns_same_result_as_regular_flag(self): }, ) - feature_flag.update_cohorts() - self.assertEqual( FeatureFlagMatcher([feature_flag], "example_id_4").get_match(feature_flag), FeatureFlagMatch(False, None, FeatureFlagMatchReason.OUT_OF_ROLLOUT_BOUND, 0), @@ -1458,7 +1454,6 @@ def test_user_in_static_cohort(self): filters={"groups": [{"properties": [{"key": "id", "value": cohort.pk, "type": "cohort"}]}]} ) - feature_flag.update_cohorts() self.assertEqual( FeatureFlagMatcher([feature_flag], "example_id_1").get_match(feature_flag), FeatureFlagMatch(True, None, FeatureFlagMatchReason.CONDITION_MATCH, 0), @@ -1557,8 +1552,6 @@ def test_legacy_user_in_cohort(self): filters={"properties": [{"key": "id", "value": cohort.pk, "type": "cohort"}]} ) - feature_flag.update_cohorts() - self.assertEqual( FeatureFlagMatcher([feature_flag], "example_id_2").get_match(feature_flag), FeatureFlagMatch(True, None, FeatureFlagMatchReason.CONDITION_MATCH, 0),