Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cohorts): Remove postgres calculations for flags #14272

Merged
merged 6 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions ee/clickhouse/models/test/__snapshots__/test_cohort.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
(SELECT pdi.person_id AS person_id,
countIf(timestamp > now() - INTERVAL 2 year
AND timestamp < now()
AND event = '$pageview') > 0 AS performed_event_condition_5_level_level_0_level_0_level_0_0
AND event = '$pageview') > 0 AS performed_event_condition_21_level_level_0_level_0_level_0_0
FROM events e
INNER JOIN
(SELECT distinct_id,
Expand Down Expand Up @@ -113,7 +113,7 @@
HAVING max(is_deleted) = 0
AND (((((NOT has(['something1'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), '$some_prop'), '^"|"$', ''))))))))) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((performed_event_condition_5_level_level_0_level_0_level_0_0)))) ) as person
AND ((((performed_event_condition_21_level_level_0_level_0_level_0_0)))) ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand Down Expand Up @@ -148,7 +148,7 @@
(SELECT pdi.person_id AS person_id,
countIf(timestamp > now() - INTERVAL 2 year
AND timestamp < now()
AND event = '$pageview') > 0 AS performed_event_condition_7_level_level_0_level_0_level_0_0
AND event = '$pageview') > 0 AS performed_event_condition_23_level_level_0_level_0_level_0_0
FROM events e
INNER JOIN
(SELECT distinct_id,
Expand Down Expand Up @@ -178,7 +178,7 @@
HAVING max(is_deleted) = 0
AND (((((NOT has(['something1'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), '$some_prop'), '^"|"$', ''))))))))) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((performed_event_condition_7_level_level_0_level_0_level_0_0)))) ) ))
AND ((((performed_event_condition_23_level_level_0_level_0_level_0_0)))) ) ))
'
---
# name: TestCohort.test_cohortpeople_with_not_in_cohort_operator_for_behavioural_cohorts
Expand All @@ -195,7 +195,7 @@
FROM
(SELECT pdi.person_id AS person_id,
minIf(timestamp, event = 'signup') >= now() - INTERVAL 15 day
AND minIf(timestamp, event = 'signup') < now() as first_time_condition_8_level_level_0_level_0_0
AND minIf(timestamp, event = 'signup') < now() as first_time_condition_24_level_level_0_level_0_0
FROM events e
INNER JOIN
(SELECT distinct_id,
Expand All @@ -208,7 +208,7 @@
AND event IN ['signup']
GROUP BY person_id) behavior_query
WHERE 1 = 1
AND (((first_time_condition_8_level_level_0_level_0_0))) ) as person
AND (((first_time_condition_24_level_level_0_level_0_0))) ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand Down Expand Up @@ -237,9 +237,9 @@
(SELECT pdi.person_id AS person_id,
countIf(timestamp > now() - INTERVAL 2 year
AND timestamp < now()
AND event = '$pageview') > 0 AS performed_event_condition_9_level_level_0_level_0_level_0_0,
AND event = '$pageview') > 0 AS performed_event_condition_25_level_level_0_level_0_level_0_0,
minIf(timestamp, event = 'signup') >= now() - INTERVAL 15 day
AND minIf(timestamp, event = 'signup') < now() as first_time_condition_9_level_level_0_level_1_level_0_level_0_level_0_0
AND minIf(timestamp, event = 'signup') < now() as first_time_condition_25_level_level_0_level_1_level_0_level_0_level_0_0
FROM events e
INNER JOIN
(SELECT distinct_id,
Expand All @@ -252,8 +252,8 @@
AND event IN ['$pageview', 'signup']
GROUP BY person_id) behavior_query
WHERE 1 = 1
AND ((((performed_event_condition_9_level_level_0_level_0_level_0_0))
AND ((((NOT first_time_condition_9_level_level_0_level_1_level_0_level_0_level_0_0)))))) ) as person
AND ((((performed_event_condition_25_level_level_0_level_0_level_0_0))
AND ((((NOT first_time_condition_25_level_level_0_level_1_level_0_level_0_level_0_0)))))) ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand Down
2 changes: 1 addition & 1 deletion ee/clickhouse/models/test/__snapshots__/test_property.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
))
',
<class 'dict'> {
'global_cohort_id_0': 37,
'global_cohort_id_0': 25,
},
)
---
27 changes: 0 additions & 27 deletions ee/clickhouse/models/test/test_cohort.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import datetime, timedelta
from unittest.mock import patch

from django.utils import timezone
from freezegun import freeze_time
Expand Down Expand Up @@ -365,32 +364,6 @@ def test_cohortpeople_basic(self):
results = self._get_cohortpeople(cohort1)
self.assertEqual(len(results), 2)

@patch("time.sleep", return_value=None)
def test_cohortpeople_basic_paginating(self, mock_sleep):
for i in range(15):
Person.objects.create(
team_id=self.team.pk,
distinct_ids=[f"{i}"],
properties={"$some_prop": "something", "$another_prop": "something"},
)

cohort1: Cohort = Cohort.objects.create(
team=self.team,
groups=[
{
"properties": [
{"key": "$some_prop", "value": "something", "type": "person"},
{"key": "$another_prop", "value": "something", "type": "person"},
]
}
],
name="cohort1",
)

cohort1.calculate_people(new_version=cohort1.version, batch_size=2, pg_batch_size=1)

self.assertEqual(len(cohort1.people.all()), 15)

def test_cohortpeople_action_basic(self):
action = _create_action(team=self.team, name="$pageview")
Person.objects.create(
Expand Down
2 changes: 0 additions & 2 deletions posthog/api/feature_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ def create(self, validated_data: Dict, *args: Any, **kwargs: Any) -> FeatureFlag

FeatureFlag.objects.filter(key=validated_data["key"], team=self.context["team_id"], deleted=True).delete()
instance: FeatureFlag = super().create(validated_data)
instance.update_cohorts()

self._attempt_set_tags(tags, instance)

Expand All @@ -216,7 +215,6 @@ def update(self, instance: FeatureFlag, validated_data: Dict, *args: Any, **kwar
FeatureFlag.objects.filter(key=validated_key, team=instance.team, deleted=True).delete()
self._update_filters(validated_data)
instance = super().update(instance, validated_data)
instance.update_cohorts()

report_user_action(request.user, "feature flag updated", instance.get_analytics_metadata())

Expand Down
2 changes: 0 additions & 2 deletions posthog/api/test/test_cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def test_creating_update_and_calculating(self, patch_calculate_cohort, patch_cap
"values": [{"type": "AND", "values": [{"key": "team_id", "value": 5, "type": "person"}]}],
},
"name_length": 8,
"person_count_precalc": 0,
"groups_count": 1,
"action_groups_count": 0,
"properties_groups_count": 1,
Expand Down Expand Up @@ -79,7 +78,6 @@ def test_creating_update_and_calculating(self, patch_calculate_cohort, patch_cap
"values": [{"type": "AND", "values": [{"key": "team_id", "value": 6, "type": "person"}]}],
},
"name_length": 9,
"person_count_precalc": 0,
"groups_count": 1,
"action_groups_count": 0,
"properties_groups_count": 1,
Expand Down
13 changes: 0 additions & 13 deletions posthog/api/test/test_feature_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1689,19 +1689,6 @@ def test_creating_feature_flag_with_behavioral_cohort(self):
response.json(),
)

@patch("posthog.tasks.calculate_cohort.calculate_cohort_ch.delay")
def test_cohort_is_calculated(self, calculate_cohort_ch):
cohort = Cohort.objects.create(
team=self.team,
groups=[{"properties": {"$some_prop": "something", "$another_prop": "something"}}],
name="cohort1",
)
cohort_request = self._create_flag_with_properties(
"cohort-flag", [{"key": "id", "type": "cohort", "value": cohort.pk}]
)
self.assertEqual(cohort_request.status_code, status.HTTP_201_CREATED)
self.assertEqual(calculate_cohort_ch.call_count, 1)

def test_validation_group_properties(self):
groups_request = self._create_flag_with_properties(
"groups-flag",
Expand Down
9 changes: 0 additions & 9 deletions posthog/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ def setup_periodic_tasks(sender: Celery, **kwargs):
)
sender.add_periodic_task(120, graphile_worker_queue_size.s(), name="Graphile Worker queue size")

sender.add_periodic_task(crontab(minute=0, hour="*"), calculate_cohort_ids_in_feature_flags_task.s())

sender.add_periodic_task(120, calculate_cohort.s(), name="recalculate cohorts")

if settings.ASYNC_EVENT_PROPERTY_USAGE:
Expand Down Expand Up @@ -560,13 +558,6 @@ def sync_insight_caching_state(team_id: int, insight_id: Optional[int] = None, d
sync_insight_caching_state(team_id, insight_id, dashboard_tile_id)


@app.task(ignore_result=True)
def calculate_cohort_ids_in_feature_flags_task():
from posthog.tasks.cohorts_in_feature_flag import calculate_cohort_ids_in_feature_flags

calculate_cohort_ids_in_feature_flags()


@app.task(ignore_result=True, bind=True)
def debug_task(self):
print(f"Request: {self.request!r}")
Expand Down
48 changes: 1 addition & 47 deletions posthog/models/cohort/cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,61 +187,21 @@ def get_analytics_metadata(self):
return {
"filters": self.properties.to_dict(),
"name_length": len(self.name) if self.name else 0,
"person_count_precalc": self.people.count(),
"groups_count": len(self.groups),
"action_groups_count": action_groups_count,
"properties_groups_count": properties_groups_count,
"deleted": self.deleted,
}

def calculate_people(self, new_version: int, batch_size=10000, pg_batch_size=1000):
if self.is_static:
return
try:

# Paginate fetch batch_size from clickhouse and paginate insert pg_batch_size into postgres
cursor = 0
persons = self._clickhouse_persons_query(batch_size=batch_size, offset=cursor)
while persons:
# TODO: Insert from a subquery instead of pulling retrieving
# then sending large lists of data backwards and forwards.
to_insert = [
CohortPeople(person_id=person_id, cohort_id=self.pk, version=new_version)
#  Just pull out the person id as we don't need anything
#  else.
for person_id in persons.values_list("id", flat=True)
]
#  TODO: make sure this bulk_create doesn't actually return anything
CohortPeople.objects.bulk_create(to_insert, batch_size=pg_batch_size)

cursor += batch_size
persons = self._clickhouse_persons_query(batch_size=batch_size, offset=cursor)
if persons.exists() and not TEST:
time.sleep(5)

except Exception as err:
# Clear the pending version people if there's an error
batch_delete_cohort_people(self.pk, new_version)

raise err

def calculate_people_ch(self, pending_version):
from posthog.models.cohort.util import recalculate_cohortpeople
from posthog.tasks.cohorts_in_feature_flag import get_cohort_ids_in_feature_flags

logger.info("cohort_calculation_started", id=self.pk, current_version=self.version, new_version=pending_version)
start_time = time.monotonic()

try:
count = recalculate_cohortpeople(self, pending_version)

# only precalculate if used in feature flag
ids = get_cohort_ids_in_feature_flags()

if self.pk in ids:
self.calculate_people(new_version=pending_version)
else:
self.count = count
self.count = count

self.last_calculation = timezone.now()
self.errors_calculating = 0
Expand Down Expand Up @@ -372,9 +332,3 @@ class CohortPeople(models.Model):

class Meta:
indexes = [models.Index(fields=["cohort_id", "person_id"])]


def batch_delete_cohort_people(cohort_id: int, version: int, batch_size: int = 1000):
while batch := CohortPeople.objects.filter(cohort_id=cohort_id, version=version).values("id")[:batch_size]:
CohortPeople.objects.filter(id__in=batch)._raw_delete(batch.db) # type: ignore
time.sleep(1)
9 changes: 0 additions & 9 deletions posthog/models/feature_flag/feature_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,6 @@ def cohort_ids(self) -> List[int]:
cohort_ids.append(cohort_id)
return cohort_ids

def update_cohorts(self) -> None:
from posthog.tasks.calculate_cohort import update_cohort
from posthog.tasks.cohorts_in_feature_flag import COHORT_ID_IN_FF_KEY

if self.cohort_ids:
cache.delete(COHORT_ID_IN_FF_KEY)
for cohort in Cohort.objects.filter(pk__in=self.cohort_ids):
update_cohort(cohort)

def __str__(self):
return f"{self.key} ({self.pk})"

Expand Down
35 changes: 0 additions & 35 deletions posthog/tasks/cohorts_in_feature_flag.py

This file was deleted.

34 changes: 1 addition & 33 deletions posthog/test/test_cohort_model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from unittest.mock import patch

import pytest

from posthog.client import sync_execute
from posthog.models import Cohort, FeatureFlag, Person, Team
from posthog.models.cohort import CohortPeople, batch_delete_cohort_people
from posthog.models import Cohort, Person, Team
from posthog.models.cohort.sql import GET_COHORTPEOPLE_BY_COHORT_ID
from posthog.test.base import BaseTest

Expand Down Expand Up @@ -68,35 +65,6 @@ def test_empty_query(self):
cohort2.calculate_people_ch(pending_version=0)
self.assertFalse(Cohort.objects.get().is_calculating)

@patch("time.sleep", return_value=None)
def test_batch_delete_cohort_people(self, patch_sleep):
Person.objects.create(distinct_ids=["person1"], team_id=self.team.pk, properties={"$some_prop": "something"})
Person.objects.create(distinct_ids=["person2"], team_id=self.team.pk, properties={})
Person.objects.create(distinct_ids=["person3"], team_id=self.team.pk, properties={"$some_prop": "something"})
cohort = Cohort.objects.create(
team=self.team,
groups=[{"properties": [{"key": "$some_prop", "value": "something", "type": "person"}]}],
name="cohort1",
)

cohort.calculate_people_ch(pending_version=0)

flag: FeatureFlag = FeatureFlag.objects.create(
team=self.team,
filters={
"groups": [
{"properties": [{"key": "id", "type": "cohort", "value": cohort.pk}], "rollout_percentage": None}
]
},
key="default-flag-1",
created_by=self.user,
)
flag.update_cohorts()

self.assertEqual(CohortPeople.objects.count(), 2)
batch_delete_cohort_people(cohort_id=cohort.pk, version=1, batch_size=1)
self.assertEqual(CohortPeople.objects.count(), 0)

def test_group_to_property_conversion(self):
cohort = Cohort.objects.create(
team=self.team,
Expand Down
Loading