Skip to content

Commit

Permalink
misc(event): Remove ordering of event queries when not required
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-pochet committed Sep 23, 2024
1 parent a1cea61 commit 7abfffc
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 104 deletions.
6 changes: 3 additions & 3 deletions app/services/events/stores/clickhouse/unique_count_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def events_cte_sql
<<-SQL
WITH events_data AS (
(#{
events
events(ordered: true)
.select(
"toDateTime64(timestamp, 5, 'UTC') as timestamp, \
#{sanitized_property_name} AS property, \
Expand All @@ -224,7 +224,7 @@ def grouped_events_cte_sql

<<-SQL
WITH events_data AS (#{
events
events(ordered: true)
.select(
"#{groups.join(", ")}, \
toDateTime64(timestamp, 5, 'UTC') as timestamp, \
Expand Down Expand Up @@ -340,7 +340,7 @@ def grouped_period_ratio_sql
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(", ")
end
end
end
Expand Down
14 changes: 7 additions & 7 deletions app/services/events/stores/clickhouse/weighted_sum_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def events_cte_sql
(#{initial_value_sql})
UNION ALL
(#{
events
events(ordered: true)
.select("timestamp, #{sanitized_numeric_property} AS difference")
.group(Events::Stores::ClickhouseStore::DEDUPLICATION_GROUP)
.to_sql
Expand Down Expand Up @@ -124,7 +124,7 @@ def grouped_events_cte_sql(initial_values)
(#{grouped_initial_value_sql(initial_values)})
UNION ALL
(#{
events
events(ordered: true)
.select("#{groups.join(", ")}, timestamp, #{sanitized_numeric_property} AS difference")
.group(Events::Stores::ClickhouseStore::DEDUPLICATION_GROUP)
.to_sql
Expand All @@ -143,9 +143,9 @@ def grouped_initial_value_sql(initial_values)

[
groups,
'toDateTime64(:from_datetime, 5, \'UTC\') AS timestamp',
"toDateTime64(:from_datetime, 5, 'UTC') AS timestamp",
"toDecimal128(#{initial_value[:value]}, :decimal_scale) AS difference"
].flatten.join(', ')
].flatten.join(", ")
end

<<-SQL
Expand All @@ -166,8 +166,8 @@ def grouped_end_of_period_value_sql(initial_values)
[
groups,
"toDateTime64(:to_datetime, 5, 'UTC') AS timestamp",
'toDecimal128(0, :decimal_scale) AS difference'
].flatten.join(', ')
"toDecimal128(0, :decimal_scale) AS difference"
].flatten.join(", ")
end

<<-SQL
Expand Down Expand Up @@ -202,7 +202,7 @@ def grouped_period_ratio_sql
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(", ")
end
end
end
Expand Down
91 changes: 45 additions & 46 deletions app/services/events/stores/clickhouse_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,46 @@ module Events
module Stores
class ClickhouseStore < BaseStore
DECIMAL_SCALE = 26
DEDUPLICATION_GROUP = 'events_raw.transaction_id, events_raw.properties, events_raw.timestamp'
DEDUPLICATION_GROUP = "events_raw.transaction_id, events_raw.properties, events_raw.timestamp"

# NOTE: keep in mind that events could contain duplicated transaction_ids
# and should be deduplicated depending on the aggregation logic
def events(force_from: false)
def events(force_from: false, ordered: false)
scope = ::Clickhouse::EventsRaw.where(external_subscription_id: subscription.external_id)
.where(organization_id: subscription.organization.id)
.where(code:)
.order(timestamp: :asc)

scope = scope.where('events_raw.timestamp >= ?', from_datetime) if force_from || use_from_boundary
scope = scope.where('events_raw.timestamp <= ?', to_datetime) if to_datetime
scope = scope.order(timestamp: :asc) if ordered

scope = scope.where("events_raw.timestamp >= ?", from_datetime) if force_from || use_from_boundary
scope = scope.where("events_raw.timestamp <= ?", to_datetime) if to_datetime
scope = scope.where(numeric_condition) if numeric_property

scope = with_grouped_by_values(scope) if grouped_by_values?
filters_scope(scope)
end

def events_values(limit: nil, force_from: false, exclude_event: false)
scope = events(force_from:).group(DEDUPLICATION_GROUP)
scope = events(force_from:, ordered: true).group(DEDUPLICATION_GROUP)

scope = scope.where('events_raw.transaction_id != ?', filters[:event].transaction_id) if exclude_event
scope = scope.where("events_raw.transaction_id != ?", filters[:event].transaction_id) if exclude_event
scope = scope.limit(limit) if limit

scope.pluck(Arel.sql(sanitized_numeric_property))
end

def last_event
events.last
events(ordered: true).last
end

def grouped_last_event
groups = grouped_by.map { |group| sanitized_property_name(group) }
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
cte_sql = events(ordered: true).group(DEDUPLICATION_GROUP)
.select(Arel.sql(
(groups.map.with_index { |group, index| "#{group} AS g_#{index}" } +
["#{sanitized_numeric_property} AS property", 'events_raw.timestamp']).join(', ')
["#{sanitized_numeric_property} AS property", "events_raw.timestamp"]).join(", ")
))
.to_sql

Expand All @@ -61,15 +62,15 @@ def grouped_last_event
end

def prorated_events_values(total_duration)
ratio_sql = duration_ratio_sql('events_raw.timestamp', to_datetime, total_duration)
ratio_sql = duration_ratio_sql("events_raw.timestamp", to_datetime, total_duration)

events.group(DEDUPLICATION_GROUP)
events(ordered: true).group(DEDUPLICATION_GROUP)
.pluck(Arel.sql("#{sanitized_numeric_property} * (#{ratio_sql})"))
end

def count
sql = events.reorder('')
.select('uniqExact(events_raw.transaction_id) AS event_count')
sql = events
.select("uniqExact(events_raw.transaction_id) AS event_count")
.to_sql

::Clickhouse::EventsRaw.connection.select_value(sql).to_i
Expand All @@ -82,7 +83,7 @@ def grouped_count
group_names = groups.map.with_index { |_, index| "g_#{index}" }

cte_sql = events.group(DEDUPLICATION_GROUP)
.select((groups + ['events_raw.transaction_id']).join(', '))
.select((groups + ["events_raw.transaction_id"]).join(", "))

sql = <<-SQL
with events as (#{cte_sql.to_sql})
Expand All @@ -101,14 +102,14 @@ def grouped_count
# unique property
def active_unique_property?(event)
previous_event = events
.where('events_raw.properties[?] = ?', aggregation_property, event.properties[aggregation_property])
.where('events_raw.timestamp < ?', event.timestamp)
.reorder(timestamp: :desc)
.where("events_raw.properties[?] = ?", aggregation_property, event.properties[aggregation_property])
.where("events_raw.timestamp < ?", event.timestamp)
.order(timestamp: :desc)
.first

previous_event && (
previous_event.properties['operation_type'].nil? ||
previous_event.properties['operation_type'] == 'add'
previous_event.properties["operation_type"].nil? ||
previous_event.properties["operation_type"] == "add"
)
end

Expand All @@ -122,7 +123,7 @@ def unique_count
)
result = ::Clickhouse::EventsRaw.connection.select_one(sql)

result['aggregation']
result["aggregation"]
end

# NOTE: not used in production, only for debug purpose to check the computed values before aggregation
Expand Down Expand Up @@ -153,7 +154,7 @@ def prorated_unique_count
)
result = ::Clickhouse::EventsRaw.connection.select_one(sql)

result['aggregation']
result["aggregation"]
end

def prorated_unique_count_breakdown(with_remove: false)
Expand Down Expand Up @@ -210,12 +211,12 @@ def max

def grouped_max
groups = grouped_by.map { |group| sanitized_property_name(group) }
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
.select(Arel.sql(
(groups.map.with_index { |group, index| "#{group} AS g_#{index}" } +
["#{sanitized_numeric_property} AS property", 'events_raw.timestamp']).join(', ')
["#{sanitized_numeric_property} AS property", "events_raw.timestamp"]).join(", ")
))
.to_sql

Expand All @@ -233,20 +234,20 @@ def grouped_max
end

def last
value = events.last&.properties&.[](aggregation_property)
value = events(ordered: true).last&.properties&.[](aggregation_property)
return value unless value

BigDecimal(value)
end

def grouped_last
groups = grouped_by.map { |group| sanitized_property_name(group) }
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
cte_sql = events(ordered: true).group(DEDUPLICATION_GROUP)
.select(Arel.sql(
(groups.map.with_index { |group, index| "#{group} AS g_#{index}" } +
["#{sanitized_numeric_property} AS property", 'events_raw.timestamp']).join(', ')
["#{sanitized_numeric_property} AS property", "events_raw.timestamp"]).join(", ")
))
.to_sql

Expand Down Expand Up @@ -282,10 +283,10 @@ def grouped_sum
groups = grouped_by.map.with_index do |group, index|
"#{sanitized_property_name(group)} AS g_#{index}"
end
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

cte_sql = events.group(DEDUPLICATION_GROUP)
.select((groups + [Arel.sql("#{sanitized_numeric_property} AS property")]).join(', '))
.select((groups + [Arel.sql("#{sanitized_numeric_property} AS property")]).join(", "))

sql = <<-SQL
with events as (#{cte_sql.to_sql})
Expand All @@ -304,11 +305,10 @@ def prorated_sum(period_duration:, persisted_duration: nil)
ratio = if persisted_duration
persisted_duration.fdiv(period_duration)
else
duration_ratio_sql('events_raw.timestamp', to_datetime, period_duration)
duration_ratio_sql("events_raw.timestamp", to_datetime, period_duration)
end

cte_sql = events
.reorder('')
.group(DEDUPLICATION_GROUP)
.select(Arel.sql("(#{sanitized_numeric_property}) * (#{ratio}) AS prorated_value"))
.to_sql
Expand All @@ -327,18 +327,17 @@ def grouped_prorated_sum(period_duration:, persisted_duration: nil)
groups = grouped_by.map.with_index do |group, index|
"#{sanitized_property_name(group)} AS g_#{index}"
end
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')
group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(", ")

ratio = if persisted_duration
persisted_duration.fdiv(period_duration)
else
duration_ratio_sql('events_raw.timestamp', to_datetime, period_duration)
duration_ratio_sql("events_raw.timestamp", to_datetime, period_duration)
end

cte_sql = events
.reorder('')
.group(DEDUPLICATION_GROUP)
.select((groups + [Arel.sql("(#{sanitized_numeric_property}) * (#{ratio}) AS prorated_value")]).join(', '))
.select((groups + [Arel.sql("(#{sanitized_numeric_property}) * (#{ratio}) AS prorated_value")]).join(", "))
.to_sql

sql = <<-SQL
Expand All @@ -355,7 +354,7 @@ def grouped_prorated_sum(period_duration:, persisted_duration: nil)
end

def sum_date_breakdown
date_field = date_in_customer_timezone_sql('events_raw.timestamp')
date_field = date_in_customer_timezone_sql("events_raw.timestamp")

cte_sql = events.group(DEDUPLICATION_GROUP)
.select("toDate(#{date_field}) as day, #{sanitized_numeric_property} as property")
Expand Down Expand Up @@ -393,7 +392,7 @@ def weighted_sum(initial_value: 0)
)

result = ::Clickhouse::EventsRaw.connection.select_one(sql)
result['aggregation']
result["aggregation"]
end

def grouped_weighted_sum(initial_values: [])
Expand Down Expand Up @@ -451,17 +450,17 @@ def weighted_sum_breakdown(initial_value: 0)

def filters_scope(scope)
matching_filters.each do |key, values|
scope = scope.where('events_raw.properties[?] IN ?', key.to_s, values)
scope = scope.where("events_raw.properties[?] IN ?", key.to_s, values)
end

conditions = ignored_filters.map do |filters|
filters.map do |key, values|
ActiveRecord::Base.sanitize_sql_for_conditions(
["(coalesce(events_raw.properties[?], '') IN (?))", key.to_s, values.map(&:to_s)]
)
end.join(' AND ')
end.join(" AND ")
end
sql = conditions.map { "(#{_1})" }.join(' OR ')
sql = conditions.map { "(#{_1})" }.join(" OR ")
scope = scope.where.not(sql) if sql.present?

scope
Expand All @@ -470,7 +469,7 @@ def filters_scope(scope)
def with_grouped_by_values(scope)
grouped_by_values.each do |grouped_by, grouped_by_value|
scope = if grouped_by_value.present?
scope.where('events_raw.properties[?] = ?', grouped_by, grouped_by_value)
scope.where("events_raw.properties[?] = ?", grouped_by, grouped_by_value)
else
scope.where("COALESCE(events_raw.properties[?], '') = ''", grouped_by)
end
Expand All @@ -481,14 +480,14 @@ def with_grouped_by_values(scope)

def sanitized_property_name(property = aggregation_property)
ActiveRecord::Base.sanitize_sql_for_conditions(
['events_raw.properties[?]', property]
["events_raw.properties[?]", property]
)
end

def numeric_condition
ActiveRecord::Base.sanitize_sql_for_conditions(
[
'toDecimal128OrNull(events_raw.properties[?], ?) IS NOT NULL',
"toDecimal128OrNull(events_raw.properties[?], ?) IS NOT NULL",
aggregation_property,
DECIMAL_SCALE
]
Expand All @@ -497,7 +496,7 @@ def numeric_condition

def sanitized_numeric_property
ActiveRecord::Base.sanitize_sql_for_conditions(
['toDecimal128(events_raw.properties[?], ?)', aggregation_property, DECIMAL_SCALE]
["toDecimal128(events_raw.properties[?], ?)", aggregation_property, DECIMAL_SCALE]
)
end

Expand Down
6 changes: 3 additions & 3 deletions app/services/events/stores/postgres/unique_count_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def events_cte_sql
# NOTE: Common table expression returning event's timestamp, property name and operation type.
<<-SQL
WITH events_data AS (#{
events
events(ordered: true)
.select(
"timestamp, \
#{sanitized_property_name} AS property, \
Expand All @@ -220,7 +220,7 @@ def grouped_events_cte_sql

<<-SQL
WITH events_data AS (#{
events
events(ordered: true)
.select(
"#{groups.join(", ")}, \
timestamp, \
Expand Down Expand Up @@ -331,7 +331,7 @@ def grouped_period_ratio_sql
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(", ")
end
end
end
Expand Down
Loading

0 comments on commit 7abfffc

Please sign in to comment.