Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc(event): Remove ordering of event queries when not required #2618

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def perform_custom_aggregation(target_result: result, grouped_by_values: nil)

# NOTE: Loop over events by batch
(1..total_batches).each do |batch|
events_properties = store.events.page(batch).per(BATCH_SIZE)
events_properties = store.events(ordered: true).page(batch).per(BATCH_SIZE)
.map { |event| {timestamp: event.timestamp, properties: event.properties} }

state = sandboxed_aggregation(events_properties, state)
Expand Down
6 changes: 3 additions & 3 deletions app/services/events/stores/clickhouse/unique_count_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def events_cte_sql
<<-SQL
WITH events_data AS (
(#{
events
events(ordered: true)
.select(
"toDateTime64(timestamp, 5, 'UTC') as timestamp, \
#{sanitized_property_name} AS property, \
Expand All @@ -224,7 +224,7 @@ def grouped_events_cte_sql

<<-SQL
WITH events_data AS (#{
events
events(ordered: true)
.select(
"#{groups.join(", ")}, \
toDateTime64(timestamp, 5, 'UTC') as timestamp, \
Expand Down Expand Up @@ -340,7 +340,7 @@ def grouped_period_ratio_sql
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(", ")
end
end
end
Expand Down
14 changes: 7 additions & 7 deletions app/services/events/stores/clickhouse/weighted_sum_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def events_cte_sql
(#{initial_value_sql})
UNION ALL
(#{
events
events(ordered: true)
.select("timestamp, #{sanitized_numeric_property} AS difference")
.group(Events::Stores::ClickhouseStore::DEDUPLICATION_GROUP)
.to_sql
Expand Down Expand Up @@ -124,7 +124,7 @@ def grouped_events_cte_sql(initial_values)
(#{grouped_initial_value_sql(initial_values)})
UNION ALL
(#{
events
events(ordered: true)
.select("#{groups.join(", ")}, timestamp, #{sanitized_numeric_property} AS difference")
.group(Events::Stores::ClickhouseStore::DEDUPLICATION_GROUP)
.to_sql
Expand All @@ -143,9 +143,9 @@ def grouped_initial_value_sql(initial_values)

[
groups,
'toDateTime64(:from_datetime, 5, \'UTC\') AS timestamp',
"toDateTime64(:from_datetime, 5, 'UTC') AS timestamp",
"toDecimal128(#{initial_value[:value]}, :decimal_scale) AS difference"
].flatten.join(', ')
].flatten.join(", ")
end

<<-SQL
Expand All @@ -166,8 +166,8 @@ def grouped_end_of_period_value_sql(initial_values)
[
groups,
"toDateTime64(:to_datetime, 5, 'UTC') AS timestamp",
'toDecimal128(0, :decimal_scale) AS difference'
].flatten.join(', ')
"toDecimal128(0, :decimal_scale) AS difference"
].flatten.join(", ")
end

<<-SQL
Expand Down Expand Up @@ -202,7 +202,7 @@ def grouped_period_ratio_sql
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(", ")
end
end
end
Expand Down
91 changes: 45 additions & 46 deletions app/services/events/stores/clickhouse_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,46 @@ module Events
module Stores
class ClickhouseStore < BaseStore
DECIMAL_SCALE = 26
DEDUPLICATION_GROUP = 'events_raw.transaction_id, events_raw.properties, events_raw.timestamp'
DEDUPLICATION_GROUP = "events_raw.transaction_id, events_raw.properties, events_raw.timestamp"

# NOTE: keep in mind that events could contain duplicated transaction_id
# and should be deduplicated depending on the aggregation logic
def events(force_from: false)
def events(force_from: false, ordered: false)
scope = ::Clickhouse::EventsRaw.where(external_subscription_id: subscription.external_id)
.where(organization_id: subscription.organization.id)
.where(code:)
.order(timestamp: :asc)

scope = scope.where('events_raw.timestamp >= ?', from_datetime) if force_from || use_from_boundary
scope = scope.where('events_raw.timestamp <= ?', to_datetime) if to_datetime
scope = scope.order(timestamp: :asc) if ordered

scope = scope.where("events_raw.timestamp >= ?", from_datetime) if force_from || use_from_boundary
scope = scope.where("events_raw.timestamp <= ?", to_datetime) if to_datetime
scope = scope.where(numeric_condition) if numeric_property

scope = with_grouped_by_values(scope) if grouped_by_values?
filters_scope(scope)
end

def events_values(limit: nil, force_from: false, exclude_event: false)
scope = events(force_from:).group(DEDUPLICATION_GROUP)
scope = events(force_from:, ordered: true).group(DEDUPLICATION_GROUP)

scope = scope.where('events_raw.transaction_id != ?', filters[:event].transaction_id) if exclude_event
scope = scope.where("events_raw.transaction_id != ?", filters[:event].transaction_id) if exclude_event
scope = scope.limit(limit) if limit

scope.pluck(Arel.sql(sanitized_numeric_property))
end

def last_event
events.last
events(ordered: true).last
end

def grouped_last_event
groups = grouped_by.map { |group| sanitized_property_name(group) }
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
cte_sql = events(ordered: true).group(DEDUPLICATION_GROUP)
.select(Arel.sql(
(groups.map.with_index { |group, index| "#{group} AS g_#{index}" } +
["#{sanitized_numeric_property} AS property", 'events_raw.timestamp']).join(', ')
["#{sanitized_numeric_property} AS property", "events_raw.timestamp"]).join(", ")
))
.to_sql

Expand All @@ -61,15 +62,15 @@ def grouped_last_event
end

def prorated_events_values(total_duration)
ratio_sql = duration_ratio_sql('events_raw.timestamp', to_datetime, total_duration)
ratio_sql = duration_ratio_sql("events_raw.timestamp", to_datetime, total_duration)

events.group(DEDUPLICATION_GROUP)
events(ordered: true).group(DEDUPLICATION_GROUP)
.pluck(Arel.sql("#{sanitized_numeric_property} * (#{ratio_sql})"))
end

def count
sql = events.reorder('')
.select('uniqExact(events_raw.transaction_id) AS event_count')
sql = events
.select("uniqExact(events_raw.transaction_id) AS event_count")
.to_sql

::Clickhouse::EventsRaw.connection.select_value(sql).to_i
Expand All @@ -82,7 +83,7 @@ def grouped_count
group_names = groups.map.with_index { |_, index| "g_#{index}" }

cte_sql = events.group(DEDUPLICATION_GROUP)
.select((groups + ['events_raw.transaction_id']).join(', '))
.select((groups + ["events_raw.transaction_id"]).join(", "))

sql = <<-SQL
with events as (#{cte_sql.to_sql})
Expand All @@ -101,14 +102,14 @@ def grouped_count
# unique property
def active_unique_property?(event)
previous_event = events
.where('events_raw.properties[?] = ?', aggregation_property, event.properties[aggregation_property])
.where('events_raw.timestamp < ?', event.timestamp)
.reorder(timestamp: :desc)
.where("events_raw.properties[?] = ?", aggregation_property, event.properties[aggregation_property])
.where("events_raw.timestamp < ?", event.timestamp)
.order(timestamp: :desc)
.first

previous_event && (
previous_event.properties['operation_type'].nil? ||
previous_event.properties['operation_type'] == 'add'
previous_event.properties["operation_type"].nil? ||
previous_event.properties["operation_type"] == "add"
)
end

Expand All @@ -122,7 +123,7 @@ def unique_count
)
result = ::Clickhouse::EventsRaw.connection.select_one(sql)

result['aggregation']
result["aggregation"]
end

# NOTE: not used in production, only for debug purpose to check the computed values before aggregation
Expand Down Expand Up @@ -153,7 +154,7 @@ def prorated_unique_count
)
result = ::Clickhouse::EventsRaw.connection.select_one(sql)

result['aggregation']
result["aggregation"]
end

def prorated_unique_count_breakdown(with_remove: false)
Expand Down Expand Up @@ -210,12 +211,12 @@ def max

def grouped_max
groups = grouped_by.map { |group| sanitized_property_name(group) }
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
.select(Arel.sql(
(groups.map.with_index { |group, index| "#{group} AS g_#{index}" } +
["#{sanitized_numeric_property} AS property", 'events_raw.timestamp']).join(', ')
["#{sanitized_numeric_property} AS property", "events_raw.timestamp"]).join(", ")
))
.to_sql

Expand All @@ -233,20 +234,20 @@ def grouped_max
end

def last
value = events.last&.properties&.[](aggregation_property)
value = events(ordered: true).last&.properties&.[](aggregation_property)
return value unless value

BigDecimal(value)
end

def grouped_last
groups = grouped_by.map { |group| sanitized_property_name(group) }
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
cte_sql = events(ordered: true).group(DEDUPLICATION_GROUP)
.select(Arel.sql(
(groups.map.with_index { |group, index| "#{group} AS g_#{index}" } +
["#{sanitized_numeric_property} AS property", 'events_raw.timestamp']).join(', ')
["#{sanitized_numeric_property} AS property", "events_raw.timestamp"]).join(", ")
))
.to_sql

Expand Down Expand Up @@ -282,10 +283,10 @@ def grouped_sum
groups = grouped_by.map.with_index do |group, index|
"#{sanitized_property_name(group)} AS g_#{index}"
end
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
.select((groups + [Arel.sql("#{sanitized_numeric_property} AS property")]).join(', '))
.select((groups + [Arel.sql("#{sanitized_numeric_property} AS property")]).join(", "))

sql = <<-SQL
with events as (#{cte_sql.to_sql})
Expand All @@ -304,11 +305,10 @@ def prorated_sum(period_duration:, persisted_duration: nil)
ratio = if persisted_duration
persisted_duration.fdiv(period_duration)
else
duration_ratio_sql('events_raw.timestamp', to_datetime, period_duration)
duration_ratio_sql("events_raw.timestamp", to_datetime, period_duration)
end

cte_sql = events
.reorder('')
.group(DEDUPLICATION_GROUP)
.select(Arel.sql("(#{sanitized_numeric_property}) * (#{ratio}) AS prorated_value"))
.to_sql
Expand All @@ -327,18 +327,17 @@ def grouped_prorated_sum(period_duration:, persisted_duration: nil)
groups = grouped_by.map.with_index do |group, index|
"#{sanitized_property_name(group)} AS g_#{index}"
end
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

ratio = if persisted_duration
persisted_duration.fdiv(period_duration)
else
duration_ratio_sql('events_raw.timestamp', to_datetime, period_duration)
duration_ratio_sql("events_raw.timestamp", to_datetime, period_duration)
end

cte_sql = events
.reorder('')
.group(DEDUPLICATION_GROUP)
.select((groups + [Arel.sql("(#{sanitized_numeric_property}) * (#{ratio}) AS prorated_value")]).join(', '))
.select((groups + [Arel.sql("(#{sanitized_numeric_property}) * (#{ratio}) AS prorated_value")]).join(", "))
.to_sql

sql = <<-SQL
Expand All @@ -355,7 +354,7 @@ def grouped_prorated_sum(period_duration:, persisted_duration: nil)
end

def sum_date_breakdown
date_field = date_in_customer_timezone_sql('events_raw.timestamp')
date_field = date_in_customer_timezone_sql("events_raw.timestamp")

cte_sql = events.group(DEDUPLICATION_GROUP)
.select("toDate(#{date_field}) as day, #{sanitized_numeric_property} as property")
Expand Down Expand Up @@ -393,7 +392,7 @@ def weighted_sum(initial_value: 0)
)

result = ::Clickhouse::EventsRaw.connection.select_one(sql)
result['aggregation']
result["aggregation"]
end

def grouped_weighted_sum(initial_values: [])
Expand Down Expand Up @@ -451,17 +450,17 @@ def weighted_sum_breakdown(initial_value: 0)

def filters_scope(scope)
matching_filters.each do |key, values|
scope = scope.where('events_raw.properties[?] IN ?', key.to_s, values)
scope = scope.where("events_raw.properties[?] IN ?", key.to_s, values)
end

conditions = ignored_filters.map do |filters|
filters.map do |key, values|
ActiveRecord::Base.sanitize_sql_for_conditions(
["(coalesce(events_raw.properties[?], '') IN (?))", key.to_s, values.map(&:to_s)]
)
end.join(' AND ')
end.join(" AND ")
end
sql = conditions.map { "(#{_1})" }.join(' OR ')
sql = conditions.map { "(#{_1})" }.join(" OR ")
scope = scope.where.not(sql) if sql.present?

scope
Expand All @@ -470,7 +469,7 @@ def filters_scope(scope)
def with_grouped_by_values(scope)
grouped_by_values.each do |grouped_by, grouped_by_value|
scope = if grouped_by_value.present?
scope.where('events_raw.properties[?] = ?', grouped_by, grouped_by_value)
scope.where("events_raw.properties[?] = ?", grouped_by, grouped_by_value)
else
scope.where("COALESCE(events_raw.properties[?], '') = ''", grouped_by)
end
Expand All @@ -481,14 +480,14 @@ def with_grouped_by_values(scope)

def sanitized_property_name(property = aggregation_property)
ActiveRecord::Base.sanitize_sql_for_conditions(
['events_raw.properties[?]', property]
["events_raw.properties[?]", property]
)
end

def numeric_condition
ActiveRecord::Base.sanitize_sql_for_conditions(
[
'toDecimal128OrNull(events_raw.properties[?], ?) IS NOT NULL',
"toDecimal128OrNull(events_raw.properties[?], ?) IS NOT NULL",
aggregation_property,
DECIMAL_SCALE
]
Expand All @@ -497,7 +496,7 @@ def numeric_condition

def sanitized_numeric_property
ActiveRecord::Base.sanitize_sql_for_conditions(
['toDecimal128(events_raw.properties[?], ?)', aggregation_property, DECIMAL_SCALE]
["toDecimal128(events_raw.properties[?], ?)", aggregation_property, DECIMAL_SCALE]
)
end

Expand Down
6 changes: 3 additions & 3 deletions app/services/events/stores/postgres/unique_count_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def events_cte_sql
# NOTE: Common table expression returning event's timestamp, property name and operation type.
<<-SQL
WITH events_data AS (#{
events
events(ordered: true)
.select(
"timestamp, \
#{sanitized_property_name} AS property, \
Expand All @@ -220,7 +220,7 @@ def grouped_events_cte_sql

<<-SQL
WITH events_data AS (#{
events
events(ordered: true)
.select(
"#{groups.join(", ")}, \
timestamp, \
Expand Down Expand Up @@ -331,7 +331,7 @@ def grouped_period_ratio_sql
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(", ")
end
end
end
Expand Down
Loading