Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

from-to and fix table name #138

Merged
merged 11 commits into from
Jun 1, 2023
25 changes: 18 additions & 7 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,22 +306,33 @@ func (h *apiHandler) queryUsage() http.HandlerFunc {
creatorId := qs.Get("creatorId")

query := usage.QuerySpec{
From: from,
To: to,
From: from,
To: to,
TimeStep: qs.Get("timeStep"),
Filter: usage.QueryFilter{
UserID: userId,
CreatorID: qs.Get("creatorId"),
},
}

usage, err := h.usage.QuerySummary(r.Context(), userId, creatorId, query)
if qs.Get("timeStep") == "" {
usage, err := h.usage.QuerySummary(r.Context(), userId, creatorId, query)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
respondJson(rw, http.StatusOK, usage)
} else {
usage, err := h.usage.QuerySummaryWithTimestep(r.Context(), userId, creatorId, query)
gioelecerati marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

respondJson(rw, http.StatusOK, usage)
}

respondJson(rw, http.StatusOK, usage)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func parseFlags(version string) cliFlags {
fs.StringVar(&cli.viewsOpts.BigQueryCredentialsJSON, "bigquery-credentials-json", "", "Google Cloud service account credentials JSON with access to BigQuery")
fs.StringVar(&cli.viewsOpts.ViewershipEventsTable, "viewership-events-table", "livepeer-analytics.viewership.staging_viewership_events", "BigQuery table to read viewership events from")
fs.StringVar(&cli.viewsOpts.ViewershipSummaryTable, "viewership-summary-table", "livepeer-analytics.viewership.staging_viewership_summary_by_video", "BigQuery table to read viewership summarized metrics from")
fs.StringVar(&cli.usageOpts.HourlyUsageTable, "hourly-usage-table", "livepeer-analytics.usage.staging_hourly_usage", "BigQuery table to read hourly usage metrics from")
fs.StringVar(&cli.usageOpts.HourlyUsageTable, "hourly-usage-table", "livepeer-analytics.staging.hourly_billing_usage", "BigQuery table to read hourly usage metrics from")
fs.Int64Var(&cli.viewsOpts.MaxBytesBilledPerBigQuery, "max-bytes-billed-per-big-query", 50*1024*1024 /* 50 MB */, "Max bytes billed configuration to use for the queries to BigQuery")

flag.Set("logtostderr", "true")
Expand Down
57 changes: 51 additions & 6 deletions usage/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,28 @@ type QueryFilter struct {
}

// QuerySpec describes a usage query: an optional time window, an optional
// grouping granularity and the filters to apply.
type QuerySpec struct {
	// TimeStep is the optional grouping granularity. When non-empty it must
	// be a key of allowedTimeSteps, otherwise building the query fails.
	TimeStep string
	// From is the inclusive lower bound applied to usage_hour_ts; nil leaves
	// the window open at the start.
	From *time.Time
	// To is the exclusive upper bound applied to usage_hour_ts; nil leaves
	// the window open at the end.
	To *time.Time
	// Filter narrows the query to specific identifiers.
	Filter QueryFilter
}

// allowedTimeSteps is the set of values accepted for QuerySpec.TimeStep.
// Any other non-empty time step is rejected when the query is built, which
// matters because the value is interpolated directly into the SQL text.
var allowedTimeSteps = map[string]bool{
	"day":  true,
	"hour": true,
}

type UsageSummaryRow struct {
UserID string `bigquery:"user_id"`
CreatorID string `bigquery:"creator_id"`

ViewCount int64 `bigquery:"view_count"`
gioelecerati marked this conversation as resolved.
Show resolved Hide resolved
LegacyViewCount bigquery.NullInt64 `bigquery:"legacy_view_count"`
PlaytimeMins float64 `bigquery:"playtime_mins"`
DeliveryUsageGbs float64 `bigquery:"delivery_usage_gbs"`
gioelecerati marked this conversation as resolved.
Show resolved Hide resolved
TotalUsageMins float64 `bigquery:"transcode_total_usage_mins"`
StorageUsageMins float64 `bigquery:"storage_usage_mins"`
}

// BigQuery is the set of usage queries that can be run against BigQuery.
type BigQuery interface {
	// QueryUsageSummary runs the usage summary query for the given user and
	// creator and returns a single summary row.
	QueryUsageSummary(ctx context.Context, userID, creatorID string, spec QuerySpec) (*UsageSummaryRow, error)

	// QueryUsageSummaryWithTimestep runs the usage summary query and returns
	// every resulting row. The bigqueryHandler implementation returns
	// (nil, nil) when the query yields no rows.
	QueryUsageSummaryWithTimestep(ctx context.Context, userID, creatorID string, spec QuerySpec) (*[]UsageSummaryRow, error)
}

type BigQueryOptions struct {
Expand All @@ -40,6 +47,8 @@ type BigQueryOptions struct {
MaxBytesBilledPerBigQuery int64
}

// maxBigQueryResultRows is the row limit applied to the usage summary query.
// It is left untyped so it can be passed wherever an integer limit is needed.
const maxBigQueryResultRows = 10_000

func NewBigQuery(opts BigQueryOptions) (BigQuery, error) {
bigquery, err := bigquery.NewClient(context.Background(),
bigquery.DetectProjectID,
Expand Down Expand Up @@ -82,23 +91,59 @@ func (bq *bigqueryHandler) QueryUsageSummary(ctx context.Context, userID string,
return &bqRows[0], nil
}

// QueryUsageSummaryWithTimestep builds the usage summary query against the
// configured hourly usage table, runs it, and returns all resulting rows.
//
// It returns a wrapped error when the query cannot be built or when BigQuery
// fails, and (nil, nil) when the query succeeds but produces no rows.
func (bq *bigqueryHandler) QueryUsageSummaryWithTimestep(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*[]UsageSummaryRow, error) {
	stmt, params, buildErr := buildUsageSummaryQuery(bq.opts.HourlyUsageTable, userID, creatorID, spec)
	if buildErr != nil {
		return nil, fmt.Errorf("error building usage summary query: %w", buildErr)
	}

	rows, queryErr := doBigQuery[UsageSummaryRow](bq, ctx, stmt, params)
	switch {
	case queryErr != nil:
		return nil, fmt.Errorf("bigquery error: %w", queryErr)
	case len(rows) == 0:
		// No data for the requested window: report "nothing" rather than an
		// empty slice.
		return nil, nil
	}

	return &rows, nil
}

func buildUsageSummaryQuery(table string, userID string, creatorID string, spec QuerySpec) (string, []interface{}, error) {
if userID == "" {
return "", nil, fmt.Errorf("userID cannot be empty")
}

query := squirrel.Select(
"cast(sum(transcode_total_usage_minutes) as FLOAT64) as transcode_total_usage_minutes",
"cast(sum(transcode_total_usage_mins) as FLOAT64) as transcode_total_usage_mins",
"cast(sum(delivery_usage_gbs) as FLOAT64) as delivery_usage_gbs",
"cast(avg(storage_usage_gbs) as FLOAT64) as storage_usage_gbs").
"cast((sum(storage_usage_mins) / count(distinct usage_hour_ts)) as FLOAT64) as storage_usage_mins").
From(table).
Limit(2)
Limit(maxBigQueryResultRows)
gioelecerati marked this conversation as resolved.
Show resolved Hide resolved

if creatorId := spec.Filter.CreatorID; creatorId != "" {
query = query.Where("creator_id_type = ?", "unverified")
query = query.Where("creator_id = ?", creatorID)
}

if from := spec.From; from != nil {
query = query.Where("usage_hour_ts >= timestamp_millis(?)", from.UnixMilli())
}
if to := spec.To; to != nil {
query = query.Where("usage_hour_ts < timestamp_millis(?)", to.UnixMilli())
}

if timeStep := spec.TimeStep; timeStep != "" {
if !allowedTimeSteps[timeStep] {
return "", nil, fmt.Errorf("invalid time step: %s", timeStep)
}

query = query.
Columns(fmt.Sprintf("timestamp_trunc(usage_hour_ts, %s) as time_interval", timeStep)).
GroupBy("time_interval").
OrderBy("time_interval")
}

query = withUserIdFilter(query, userID)

sql, args, err := query.ToSql()
Expand Down
9 changes: 9 additions & 0 deletions usage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,12 @@ func (c *Client) QuerySummary(ctx context.Context, userID string, creatorID stri

return summary, nil
}

// QuerySummaryWithTimestep forwards the request to the underlying BigQuery
// backend and returns its rows. Any backend error is returned unchanged, with
// a nil result.
func (c *Client) QuerySummaryWithTimestep(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*[]UsageSummaryRow, error) {
	rows, err := c.bigquery.QueryUsageSummaryWithTimestep(ctx, userID, creatorID, spec)
	if err != nil {
		// Never hand back a partial result alongside an error.
		return nil, err
	}
	return rows, nil
}