Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskQueue Schema Collapse #500

Merged
merged 16 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/persistence/sql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (f *Factory) NewTaskStore() (p.TaskStore, error) {
if err != nil {
return nil, err
}
return newTaskPersistence(conn, f.cfg.NumShards, f.logger)
return newTaskPersistence(conn, f.cfg.TaskScanPartitions, f.logger)
}

// NewShardStore returns a new shard store
Expand Down
14 changes: 7 additions & 7 deletions common/persistence/sql/sqlPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func NewTestCluster(pluginName, dbName, username, password, host string, port in
}
result.schemaDir = schemaDir
result.cfg = config.SQL{
User: username,
Password: password,
ConnectAddr: fmt.Sprintf("%v:%v", host, port),
ConnectProtocol: "tcp",
PluginName: pluginName,
DatabaseName: dbName,
NumShards: 4,
User: username,
Password: password,
ConnectAddr: fmt.Sprintf("%v:%v", host, port),
ConnectProtocol: "tcp",
PluginName: pluginName,
DatabaseName: dbName,
TaskScanPartitions: 4,
}
return &result
}
Expand Down
55 changes: 38 additions & 17 deletions common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ import (

type sqlTaskManager struct {
sqlStore
nShards int
taskScanPartitions int
}

var (
minUUID = "00000000-0000-0000-0000-000000000000"
)

// newTaskPersistence creates a new instance of TaskManager
func newTaskPersistence(db sqlplugin.DB, nShards int, log log.Logger) (persistence.TaskManager, error) {
func newTaskPersistence(db sqlplugin.DB, taskScanPartitions int, log log.Logger) (persistence.TaskManager, error) {
return &sqlTaskManager{
sqlStore: sqlStore{
db: db,
logger: log,
},
nShards: nShards,
taskScanPartitions: taskScanPartitions,
}, nil
}

Expand All @@ -71,7 +71,7 @@ func (m *sqlTaskManager) LeaseTaskQueue(request *persistence.LeaseTaskQueueReque
if err != nil {
return nil, serviceerror.NewInternal(err.Error())
}
shardID := m.shardID(nidBytes, request.TaskQueue)
shardID := m.shardID(nidBytes, request.TaskQueue, request.TaskType)
namespaceID := request.NamespaceID
rows, err := m.db.SelectFromTaskQueues(&sqlplugin.TaskQueuesFilter{
ShardID: shardID,
Expand Down Expand Up @@ -175,7 +175,7 @@ func (m *sqlTaskManager) UpdateTaskQueue(request *persistence.UpdateTaskQueueReq
return nil, serviceerror.NewInternal(err.Error())
}

shardID := m.shardID(nidBytes, request.TaskQueueInfo.Name)
shardID := m.shardID(nidBytes, request.TaskQueueInfo.Name, request.TaskQueueInfo.TaskType)

tq := request.TaskQueueInfo
tq.LastUpdated = types.TimestampNow()
Expand Down Expand Up @@ -239,7 +239,7 @@ func (m *sqlTaskManager) UpdateTaskQueue(request *persistence.UpdateTaskQueueReq
}

type taskQueuePageToken struct {
ShardID int
ShardID uint32
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
NamespaceID string
Name string
TaskType int64
Expand All @@ -255,9 +255,26 @@ func (m *sqlTaskManager) ListTaskQueue(request *persistence.ListTaskQueueRequest
var err error
var rows []sqlplugin.TaskQueuesRow
namespaceID := primitives.MustParseUUID(pageToken.NamespaceID)
for pageToken.ShardID < m.nShards {
var shardGreaterThan uint32
var shardLessThan uint32
for i := 0; i < m.taskScanPartitions; i++ {
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
shardGreaterThan = uint32((float32(i)/float32(m.taskScanPartitions))*math.MaxUint32)

if i + 1 == m.taskScanPartitions {
shardLessThan = math.MaxUint32
} else {
shardLessThan = uint32((float32(i + 1) / float32(m.taskScanPartitions)) * math.MaxUint32)
}

if pageToken.ShardID > shardLessThan {
continue
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
} else if pageToken.ShardID > shardGreaterThan {
shardGreaterThan = pageToken.ShardID
}

rows, err = m.db.SelectFromTaskQueues(&sqlplugin.TaskQueuesFilter{
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
ShardID: pageToken.ShardID,
ShardIDGreaterThanEqualTo: shardGreaterThan,
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
ShardIDLessThanEqualTo: shardLessThan,
NamespaceIDGreaterThan: &namespaceID,
NameGreaterThan: &pageToken.Name,
TaskTypeGreaterThan: &pageToken.TaskType,
Expand All @@ -269,7 +286,6 @@ func (m *sqlTaskManager) ListTaskQueue(request *persistence.ListTaskQueueRequest
if len(rows) > 0 {
break
}
pageToken = taskQueuePageToken{ShardID: pageToken.ShardID + 1, TaskType: math.MinInt16, NamespaceID: minUUID}
}

var nextPageToken []byte
Expand All @@ -282,8 +298,14 @@ func (m *sqlTaskManager) ListTaskQueue(request *persistence.ListTaskQueueRequest
Name: lastRow.Name,
TaskType: lastRow.TaskType,
})
case pageToken.ShardID+1 < m.nShards:
nextPageToken, err = gobSerialize(&taskQueuePageToken{ShardID: pageToken.ShardID + 1, TaskType: math.MinInt16})
case shardLessThan < math.MaxUint32:
var lastShardId uint32
if len(rows) == 0 {
lastShardId = shardLessThan
} else {
lastShardId = rows[len(rows)-1].ShardID
}
nextPageToken, err = gobSerialize(&taskQueuePageToken{ShardID: lastShardId+1, TaskType: math.MinInt16})
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil {
Expand Down Expand Up @@ -316,7 +338,7 @@ func (m *sqlTaskManager) DeleteTaskQueue(request *persistence.DeleteTaskQueueReq
}

result, err := m.db.DeleteFromTaskQueues(&sqlplugin.TaskQueuesFilter{
ShardID: m.shardID(nidBytes, request.TaskQueue.Name),
ShardID: m.shardID(nidBytes, request.TaskQueue.Name, request.TaskQueue.TaskType),
NamespaceID: &nidBytes,
Name: &request.TaskQueue.Name,
TaskType: convert.Int64Ptr(int64(request.TaskQueue.TaskType)),
Expand Down Expand Up @@ -369,7 +391,7 @@ func (m *sqlTaskManager) CreateTasks(request *persistence.CreateTasksRequest) (*
}
// Lock task queue before committing.
err1 := lockTaskQueue(tx,
m.shardID(nidBytes, request.TaskQueueInfo.Data.Name),
m.shardID(nidBytes, request.TaskQueueInfo.Data.Name, request.TaskQueueInfo.Data.TaskType),
nidBytes,
request.TaskQueueInfo.Data.Name,
request.TaskQueueInfo.Data.TaskType,
Expand Down Expand Up @@ -454,12 +476,11 @@ func (m *sqlTaskManager) CompleteTasksLessThan(request *persistence.CompleteTask
return int(nRows), nil
}

func (m *sqlTaskManager) shardID(namespaceID primitives.UUID, name string) int {
id := farm.Hash32(append(namespaceID, []byte("_"+name)...)) % uint32(m.nShards)
return int(id)
func (m *sqlTaskManager) shardID(namespaceID primitives.UUID, name string, taskType enumspb.TaskQueueType) uint32 {
return farm.Hash32(append(namespaceID, []byte("_"+name+"_"+string(int(taskType)))...))
}

func lockTaskQueue(tx sqlplugin.Tx, shardID int, namespaceID primitives.UUID, name string, taskQueueType enumspb.TaskQueueType, oldRangeID int64) error {
func lockTaskQueue(tx sqlplugin.Tx, shardID uint32, namespaceID primitives.UUID, name string, taskQueueType enumspb.TaskQueueType, oldRangeID int64) error {
rangeID, err := tx.LockTaskQueues(&sqlplugin.TaskQueuesFilter{
ShardID: shardID, NamespaceID: &namespaceID, Name: &name, TaskType: convert.Int64Ptr(int64(taskQueueType))})
if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ type (

// TaskQueuesRow represents a row in task_queues table
TaskQueuesRow struct {
ShardID int
ShardID uint32
NamespaceID primitives.UUID
Name string
TaskType int64
Expand All @@ -241,15 +241,17 @@ type (
// TaskQueuesFilter contains the column names within task_queues table that
// can be used to filter results through a WHERE clause
TaskQueuesFilter struct {
ShardID int
NamespaceID *primitives.UUID
Name *string
TaskType *int64
NamespaceIDGreaterThan *primitives.UUID
NameGreaterThan *string
TaskTypeGreaterThan *int64
RangeID *int64
PageSize *int
ShardID uint32
ShardIDGreaterThanEqualTo uint32
ShardIDLessThanEqualTo uint32
NamespaceID *primitives.UUID
Name *string
TaskType *int64
NamespaceIDGreaterThan *primitives.UUID
NameGreaterThan *string
TaskTypeGreaterThan *int64
RangeID *int64
PageSize *int
}

// ReplicationTasksRow represents a row in replication_tasks table
Expand Down
26 changes: 19 additions & 7 deletions common/persistence/sql/sqlplugin/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ name = :name AND
task_type = :task_type
`

listTaskQueueQry = `SELECT namespace_id, range_id, name, task_type, data, data_encoding ` +
listTaskQueueWithShardRangeQry = `SELECT shard_id, namespace_id, range_id, name, task_type, data, data_encoding ` +
`FROM task_queues ` +
`WHERE shard_id = ? AND namespace_id > ? AND name > ? AND task_type > ? ORDER BY namespace_id,name,task_type LIMIT ?`
`WHERE shard_id >= ? AND shard_id <= ? AND namespace_id > ? AND name > ? AND task_type > ? ORDER BY shard_id, namespace_id,name,task_type LIMIT ?`

listTaskQueueQry = `SELECT shard_id, namespace_id, range_id, name, task_type, data, data_encoding ` +
`FROM task_queues ` +
`WHERE shard_id = ? AND namespace_id > ? AND name > ? AND task_type > ? ORDER BY shard_id,namespace_id,name,task_type LIMIT ?`

getTaskQueueQry = `SELECT namespace_id, range_id, name, task_type, data, data_encoding ` +
`FROM task_queues ` +
Expand Down Expand Up @@ -139,6 +143,9 @@ func (mdb *db) UpdateTaskQueues(row *sqlplugin.TaskQueuesRow) (sql.Result, error
func (mdb *db) SelectFromTaskQueues(filter *sqlplugin.TaskQueuesFilter) ([]sqlplugin.TaskQueuesRow, error) {
switch {
case filter.NamespaceID != nil && filter.Name != nil && filter.TaskType != nil:
if filter.ShardIDLessThanEqualTo != 0 || filter.ShardIDGreaterThanEqualTo != 0 {
return nil, fmt.Errorf("shardID range not supported for specific selection")
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
}
return mdb.selectFromTaskQueues(filter)
case filter.NamespaceIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil:
return mdb.rangeSelectFromTaskQueues(filter)
Expand All @@ -160,14 +167,19 @@ func (mdb *db) selectFromTaskQueues(filter *sqlplugin.TaskQueuesFilter) ([]sqlpl
func (mdb *db) rangeSelectFromTaskQueues(filter *sqlplugin.TaskQueuesFilter) ([]sqlplugin.TaskQueuesRow, error) {
var err error
var rows []sqlplugin.TaskQueuesRow
err = mdb.conn.Select(&rows, listTaskQueueQry,
filter.ShardID, *filter.NamespaceIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)

if filter.ShardIDLessThanEqualTo != 0 {
err = mdb.conn.Select(&rows, listTaskQueueWithShardRangeQry,
filter.ShardIDGreaterThanEqualTo, filter.ShardIDLessThanEqualTo, *filter.NamespaceIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)
} else {
err = mdb.conn.Select(&rows, listTaskQueueQry,
filter.ShardID, *filter.NamespaceIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)
}

if err != nil {
return nil, err
}
for i := range rows {
rows[i].ShardID = filter.ShardID
}

return rows, nil
}

Expand Down
23 changes: 16 additions & 7 deletions common/persistence/sql/sqlplugin/postgres/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ namespace_id = :namespace_id AND
name = :name AND
task_type = :task_type
`
listTaskQueueWithShardRangeQry = `SELECT shard_id, namespace_id, range_id, name, task_type, data, data_encoding ` +
`FROM task_queues ` +
`WHERE shard_id >= $1 AND shard_id <= $2 AND namespace_id > $3 AND name > $4 AND task_type > $5 ORDER BY shard_id, namespace_id,name,task_type LIMIT $6`

listTaskQueueQry = `SELECT namespace_id, range_id, name, task_type, data, data_encoding ` +
listTaskQueueQry = `SELECT shard_id, namespace_id, range_id, name, task_type, data, data_encoding ` +
`FROM task_queues ` +
`WHERE shard_id = $1 AND namespace_id > $2 AND name > $3 AND task_type > $4 ORDER BY namespace_id,name,task_type LIMIT $5`
`WHERE shard_id = $1 AND namespace_id > $2 AND name > $3 AND task_type > $4 ORDER BY shard_id, namespace_id,name,task_type LIMIT $5`

getTaskQueueQry = `SELECT namespace_id, range_id, name, task_type, data, data_encoding ` +
`FROM task_queues ` +
Expand Down Expand Up @@ -144,6 +147,9 @@ func (pdb *db) UpdateTaskQueues(row *sqlplugin.TaskQueuesRow) (sql.Result, error
func (pdb *db) SelectFromTaskQueues(filter *sqlplugin.TaskQueuesFilter) ([]sqlplugin.TaskQueuesRow, error) {
switch {
case filter.NamespaceID != nil && filter.Name != nil && filter.TaskType != nil:
if filter.ShardIDLessThanEqualTo != 0 || filter.ShardIDGreaterThanEqualTo != 0 {
return nil, fmt.Errorf("shardID range not supported for specific selection")
}
return pdb.selectFromTaskQueues(filter)
case filter.NamespaceIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil:
return pdb.rangeSelectFromTaskQueues(filter)
Expand All @@ -165,14 +171,17 @@ func (pdb *db) selectFromTaskQueues(filter *sqlplugin.TaskQueuesFilter) ([]sqlpl
func (pdb *db) rangeSelectFromTaskQueues(filter *sqlplugin.TaskQueuesFilter) ([]sqlplugin.TaskQueuesRow, error) {
var err error
var rows []sqlplugin.TaskQueuesRow
err = pdb.conn.Select(&rows, listTaskQueueQry,
filter.ShardID, *filter.NamespaceIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)
if filter.ShardIDLessThanEqualTo > 0 {
err = pdb.conn.Select(&rows, listTaskQueueWithShardRangeQry,
filter.ShardIDGreaterThanEqualTo, filter.ShardIDLessThanEqualTo, *filter.NamespaceIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)
} else {
err = pdb.conn.Select(&rows, listTaskQueueQry,
filter.ShardID, *filter.NamespaceIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)
}
if err != nil {
return nil, err
}
for i := range rows {
rows[i].ShardID = filter.ShardID
}

return rows, nil
}

Expand Down
7 changes: 4 additions & 3 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ type (
MaxIdleConns int `yaml:"maxIdleConns"`
// MaxConnLifetime is the maximum time a connection can be alive
MaxConnLifetime time.Duration `yaml:"maxConnLifetime"`
// NumShards is the number of storage shards to use for tables
// in a sharded sql database. The default value for this param is 1
NumShards int `yaml:"nShards"`
// EXPERIMENTAL - TaskScanPartitions is the number of partitions to sequentially scan during ListTaskQueue operations.
// This is used for in a sharded sql database such as Vitess for heavy task workloads to minimize scatter gather.
// The default value for this param is 1, and should not be configured without a thorough understanding of what this does.
TaskScanPartitions int `yaml:"taskScanPartitions"`
// TLS is the configuration for TLS connections
TLS *auth.TLS `yaml:"tls"`
}
Expand Down
4 changes: 2 additions & 2 deletions common/service/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (c *Persistence) Validate() error {
if ds.SQL != nil && ds.Cassandra != nil {
return fmt.Errorf("persistence config: datastore %v: only one of SQL or cassandra can be specified", st)
}
if ds.SQL != nil && ds.SQL.NumShards == 0 {
ds.SQL.NumShards = 1
if ds.SQL != nil && ds.SQL.TaskScanPartitions == 0 {
ds.SQL.TaskScanPartitions = 1
}
}
return nil
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ require (
github.com/golang/mock v1.4.3
github.com/golang/protobuf v1.4.2
github.com/google/uuid v1.1.1
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
shawnhathaway marked this conversation as resolved.
Show resolved Hide resolved
github.com/hashicorp/go-version v1.2.0
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/jcmturner/gokrb5/v8 v8.3.0 // indirect
Expand Down Expand Up @@ -76,7 +75,6 @@ require (
google.golang.org/api v0.26.0
google.golang.org/grpc v1.30.0
google.golang.org/grpc/examples v0.0.0-20200625174016-7a808837ae92
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/validator.v2 v2.0.0-20200605151824-2b28d334fa05
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
Expand Down
2 changes: 1 addition & 1 deletion schema/mysql/v57/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ CREATE TABLE tasks (
);

CREATE TABLE task_queues (
shard_id INT NOT NULL,
shard_id INT UNSIGNED NOT NULL,
namespace_id BINARY(16) NOT NULL,
name VARCHAR(255) NOT NULL,
task_type TINYINT NOT NULL, -- {Activity, Decision}
Expand Down
2 changes: 1 addition & 1 deletion schema/mysql/v57/temporal/versioned/v1.0/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ CREATE TABLE tasks (
);

CREATE TABLE task_queues (
shard_id INT NOT NULL,
shard_id INT UNSIGNED NOT NULL,
namespace_id BINARY(16) NOT NULL,
name VARCHAR(255) NOT NULL,
task_type TINYINT NOT NULL, -- {Activity, Decision}
Expand Down
2 changes: 1 addition & 1 deletion schema/postgres/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ CREATE TABLE tasks (
);

CREATE TABLE task_queues (
shard_id INTEGER NOT NULL,
shard_id BIGINT NOT NULL,
namespace_id BYTEA NOT NULL,
name VARCHAR(255) NOT NULL,
task_type SMALLINT NOT NULL, -- {Activity, Decision}
Expand Down
2 changes: 1 addition & 1 deletion schema/postgres/temporal/versioned/v1.0/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ CREATE TABLE tasks (
);

CREATE TABLE task_queues (
shard_id INTEGER NOT NULL,
shard_id BIGINT NOT NULL,
namespace_id BYTEA NOT NULL,
name VARCHAR(255) NOT NULL,
task_type SMALLINT NOT NULL, -- {Activity, Decision}
Expand Down