Skip to content

Commit

Permalink
Changes ListClosedWorkflow result ordering for SQL-based visibility stores to be based off of workflow close time (#578)
Browse files Browse the repository at this point in the history

This PR fixes the following issues:

SQL stores were using the Workflow Open Time instead of the Close Time for pagination, which is not aligned with Cassandra and is not the intention.

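The SQL store pagination logic had a bug where the run_id tie-breaker for two rows with the same close_time was not implemented properly: the old condition `(run_id > ? OR start_time < ?)` applied the run_id comparison regardless of timestamp, rather than only to rows whose time equals the page token's time.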

The SQL store pagination logic had a bug where it was using the MinStart time instead of the MaxStart time for pagination purposes (this appears to be a bug, but that has yet to be confirmed).

Fixes Makefile so that schema installation for MySQL / Postgres functions properly.

Fixes Postgres username / password in the Postgres development environment.

Adds appropriate MySQL/Postgres indexes for querying by close_time.
  • Loading branch information
mastermanu authored Jul 21, 2020
1 parent 7c6df6d commit 9d81f1d
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 50 deletions.
23 changes: 11 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -328,21 +328,20 @@ install-schema-mysql-pre5720: temporal-sql-tool

install-schema-mysql: temporal-sql-tool
@printf $(COLOR) "Install MySQL schema..."
./temporal-sql-tool --ep 127.0.0.1 create --db temporal
./temporal-sql-tool --ep 127.0.0.1 --db temporal setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 --db temporal update-schema -d ./schema/mysql/v57/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 --db temporal_visibility update-schema -d ./schema/mysql/v57/visibility/versioned
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root create --db temporal
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal update-schema -d ./schema/mysql/v57/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal_visibility update-schema -d ./schema/mysql/v57/visibility/versioned

install-schema-postgres: temporal-sql-tool
@printf $(COLOR) "Install Postgres schema..."
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres create --db temporal
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal setup -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal update-schema -d ./schema/postgres/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal_visibility update-schema -d ./schema/postgres/visibility/versioned
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal setup -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal update-schema -d ./schema/postgres/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal_visibility update-schema -d ./schema/postgres/visibility/versioned

install-schema-cdc: temporal-cassandra-tool
@printf $(COLOR) "Set up temporal_active key space..."
Expand Down
32 changes: 21 additions & 11 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *sqlVisibilityStore) UpsertWorkflowExecution(request *p.InternalUpsertWo
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(request *p.ListWorkflowExecutionsRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListOpenWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListOpenWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, false,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -132,7 +132,7 @@ func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(request *p.ListWorkflowE
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutions(request *p.ListWorkflowExecutionsRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -146,7 +146,7 @@ func (s *sqlVisibilityStore) ListClosedWorkflowExecutions(request *p.ListWorkflo
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByType(request *p.ListWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, false,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -162,7 +162,7 @@ func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByType(request *p.ListWor
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByType(request *p.ListWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -177,7 +177,7 @@ func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByType(request *p.ListW
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(request *p.ListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, false,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -193,7 +193,7 @@ func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(request *p.L
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(request *p.ListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -208,7 +208,7 @@ func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(request *p
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByStatus(request *p.ListClosedWorkflowExecutionsByStatusRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByStatus", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByStatus", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand Down Expand Up @@ -287,7 +287,13 @@ func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.Visibili
return info
}

func (s *sqlVisibilityStore) listWorkflowExecutions(opName string, pageToken []byte, earliestTime int64, latestTime int64, selectOp func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error)) (*p.InternalListWorkflowExecutionsResponse, error) {
func (s *sqlVisibilityStore) listWorkflowExecutions(
opName string,
pageToken []byte,
earliestTime int64,
latestTime int64,
closeQuery bool,
selectOp func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error)) (*p.InternalListWorkflowExecutionsResponse, error) {
var readLevel *visibilityPageToken
var err error
if len(pageToken) > 0 {
Expand All @@ -312,10 +318,14 @@ func (s *sqlVisibilityStore) listWorkflowExecutions(opName string, pageToken []b
}
var nextPageToken []byte
lastRow := rows[len(rows)-1]
lastStartTime := lastRow.StartTime
if lastStartTime.Sub(time.Unix(0, earliestTime)).Nanoseconds() > 0 {
lastTime := lastRow.StartTime
if closeQuery {
lastTime = *lastRow.CloseTime
}

if lastTime.Sub(time.Unix(0, earliestTime)).Nanoseconds() > 0 {
nextPageToken, err = s.serializePageToken(&visibilityPageToken{
Time: lastStartTime,
Time: lastTime,
RunID: lastRow.RunID,
})
if err != nil {
Expand Down
25 changes: 18 additions & 7 deletions common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,16 @@ const (
templateConditions = ` AND namespace_id = ?
AND start_time >= ?
AND start_time <= ?
AND (run_id > ? OR start_time < ?)
AND ((run_id > ? and start_time = ?) OR (start_time < ?))
ORDER BY start_time DESC, run_id
LIMIT ?`
LIMIT ?`

templateConditionsClosedWorkflows = ` AND namespace_id = ?
AND close_time >= ?
AND close_time <= ?
AND ((run_id > ? and close_time = ?) OR (close_time < ?))
ORDER BY close_time DESC, run_id
LIMIT ?`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 `
Expand All @@ -57,17 +64,17 @@ const (

templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions

templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditions
templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = ?` + templateConditions

templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditions
templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = ?` + templateConditions

templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditions
templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditionsClosedWorkflows

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditions
templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditionsClosedWorkflows

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length
FROM executions_visibility
Expand Down Expand Up @@ -153,7 +160,8 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MinStartTime),
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MinStartTime,
*filter.MaxStartTime,
*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.WorkflowTypeName != nil:
qry := templateGetOpenWorkflowExecutionsByType
Expand All @@ -168,6 +176,7 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MaxStartTime,
*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING
err = mdb.conn.Select(&rows,
Expand All @@ -178,6 +187,7 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.PageSize)
case filter.MinStartTime != nil:
qry := templateGetOpenWorkflowExecutions
Expand All @@ -191,6 +201,7 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.PageSize)
default:
return nil, fmt.Errorf("invalid query filter")
Expand Down
34 changes: 26 additions & 8 deletions common/persistence/sql/sqlplugin/postgres/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,31 @@ const (
templateConditions1 = ` AND namespace_id = $1
AND start_time >= $2
AND start_time <= $3
AND (run_id > $4 OR start_time < $5)
AND ((run_id > $4 and start_time = $5) OR (start_time < $6))
ORDER BY start_time DESC, run_id
LIMIT $6`
LIMIT $7`

templateConditions2 = ` AND namespace_id = $2
AND start_time >= $3
AND start_time <= $4
AND (run_id > $5 OR start_time < $6)
AND ((run_id > $5 and start_time = $6) OR (start_time < $7))
ORDER BY start_time DESC, run_id
LIMIT $8`

templateConditionsClosedWorkflow1 = ` AND namespace_id = $1
AND close_time >= $2
AND close_time <= $3
AND ((run_id > $4 and close_time = $5) OR (close_time < $6))
ORDER BY close_time DESC, run_id
LIMIT $7`

templateConditionsClosedWorkflow2 = ` AND namespace_id = $2
AND close_time >= $3
AND close_time <= $4
AND ((run_id > $5 and close_time = $6) OR (close_time < $7))
ORDER BY close_time DESC, run_id
LIMIT $8`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 `

Expand All @@ -76,17 +90,17 @@ const (

templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions1

templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditions1
templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflow1

templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = $1` + templateConditions2

templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = $1` + templateConditions2
templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = $1` + templateConditionsClosedWorkflow2

templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = $1` + templateConditions2

templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = $1` + templateConditions2
templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = $1` + templateConditionsClosedWorkflow2

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = $1` + templateConditions2
templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = $1` + templateConditionsClosedWorkflow2

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length
FROM executions_visibility
Expand Down Expand Up @@ -172,7 +186,8 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
pdb.converter.ToPostgresDateTime(*filter.MinStartTime),
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MinStartTime,
*filter.MaxStartTime,
*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.WorkflowTypeName != nil:
qry := templateGetOpenWorkflowExecutionsByType
Expand All @@ -187,6 +202,7 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MaxStartTime,
*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING
err = pdb.conn.Select(&rows,
Expand All @@ -197,6 +213,7 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.RunID,
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.PageSize)
case filter.MinStartTime != nil:
qry := templateGetOpenWorkflowExecutions
Expand All @@ -212,6 +229,7 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
maxSt,
*filter.RunID,
maxSt,
maxSt,
*filter.PageSize)
default:
return nil, fmt.Errorf("invalid query filter")
Expand Down
4 changes: 2 additions & 2 deletions config/development_postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ persistence:
databaseName: "temporal"
connectAddr: "127.0.0.1:5432"
connectProtocol: "tcp"
user: "postgres"
user: "temporal"
password: "temporal"
maxConns: 20
maxIdleConns: 20
Expand All @@ -20,7 +20,7 @@ persistence:
databaseName: "temporal_visibility"
connectAddr: "127.0.0.1:5432"
connectProtocol: "tcp"
user: "postgres"
user: "temporal"
password: "temporal"
maxConns: 2
maxIdleConns: 2
Expand Down
9 changes: 6 additions & 3 deletions schema/mysql/v57/visibility/schema.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE executions_visibility (
namespace_id CHAR(64) NOT NULL,
namespace_id CHAR(64) NOT NULL,
run_id CHAR(64) NOT NULL,
start_time DATETIME(6) NOT NULL,
execution_time DATETIME(6) NOT NULL,
Expand All @@ -10,11 +10,14 @@ CREATE TABLE executions_visibility (
history_length BIGINT,
memo BLOB,
encoding VARCHAR(64) NOT NULL,
task_queue VARCHAR(255) DEFAULT '' NOT NULL,
task_queue VARCHAR(255) DEFAULT '' NOT NULL,

PRIMARY KEY (namespace_id, run_id)
);

CREATE INDEX by_type_start_time ON executions_visibility (namespace_id, workflow_type_name, status, start_time DESC, run_id);
CREATE INDEX by_workflow_id_start_time ON executions_visibility (namespace_id, workflow_id, status, start_time DESC, run_id);
CREATE INDEX by_status_by_close_time ON executions_visibility (namespace_id, status, start_time DESC, run_id);
CREATE INDEX by_status_by_start_time ON executions_visibility (namespace_id, status, start_time DESC, run_id);
CREATE INDEX by_type_close_time ON executions_visibility (namespace_id, workflow_type_name, status, close_time DESC, run_id);
CREATE INDEX by_workflow_id_close_time ON executions_visibility (namespace_id, workflow_id, status, close_time DESC, run_id);
CREATE INDEX by_status_by_close_time ON executions_visibility (namespace_id, status, close_time DESC, run_id);
Loading

0 comments on commit 9d81f1d

Please sign in to comment.