Skip to content

Commit

Permalink
Add support for arbitrary predicates for multi cursor queues (tempora…
Browse files Browse the repository at this point in the history
…lio#5323)

## What changed?

- Add support for arbitrary predicates for multi cursor queues, adds a
Grouper abstraction for grouping tasks and constructing predicates
- Add a DestinationPredicate (will be used later in multi-destination
queues)
- Add Grouper implementations for namespace ID and namespace ID +
destination.

## Why?

We need this flexibility for Nexus, this is some of the prep work.

## How did you test it?

Add some unit tests, ran existing tests.
  • Loading branch information
bergundy authored Jan 20, 2024
1 parent d9cad45 commit 6b6b0bf
Show file tree
Hide file tree
Showing 28 changed files with 678 additions and 165 deletions.
1 change: 1 addition & 0 deletions api/enums/v1/predicate.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions api/enums/v1/predicate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions api/persistence/v1/predicates.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

198 changes: 144 additions & 54 deletions api/persistence/v1/predicates.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ enum PredicateType {
PREDICATE_TYPE_NOT = 5;
PREDICATE_TYPE_NAMESPACE_ID = 6;
PREDICATE_TYPE_TASK_TYPE = 7;
PREDICATE_TYPE_DESTINATION = 8;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ message Predicate {
NotPredicateAttributes not_predicate_attributes = 6;
NamespaceIdPredicateAttributes namespace_id_predicate_attributes = 7;
TaskTypePredicateAttributes task_type_predicate_attributes = 8;
DestinationPredicateAttributes destination_predicate_attributes = 9;
}
}

Expand Down Expand Up @@ -66,3 +67,8 @@ message NamespaceIdPredicateAttributes {
message TaskTypePredicateAttributes {
repeated temporal.server.api.enums.v1.TaskType task_types = 1;
}

message DestinationPredicateAttributes {
repeated string destinations = 1;
}

92 changes: 45 additions & 47 deletions service/history/queues/action_pending_task_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"golang.org/x/exp/slices"

"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/tasks"
)

const (
Expand All @@ -46,24 +44,28 @@ type (
attributes *AlertAttributesQueuePendingTaskCount
monitor Monitor
maxReaderCount int64

// state of the action, used when running the action
tasksPerNamespace map[namespace.ID]int
pendingTaskPerNamespacePerSlice map[Slice]map[namespace.ID]int
slicesPerNamespace map[namespace.ID][]Slice
namespaceToClearPerSlice map[Slice][]namespace.ID
grouper Grouper

// Fields below this line make up the state of the action. Used when running the action.
// Key type is "any" to support grouping tasks by arbitrary keys.
tasksPerKey map[any]int
pendingTasksPerKeyPerSlice map[Slice]map[any]int
slicesPerKey map[any][]Slice
keysToClearPerSlice map[Slice][]any
}
)

func newQueuePendingTaskAction(
attributes *AlertAttributesQueuePendingTaskCount,
monitor Monitor,
maxReaderCount int,
grouper Grouper,
) *actionQueuePendingTask {
return &actionQueuePendingTask{
attributes: attributes,
monitor: monitor,
maxReaderCount: int64(maxReaderCount),
grouper: grouper,
}
}

Expand Down Expand Up @@ -102,30 +104,31 @@ func (a *actionQueuePendingTask) tryShrinkSlice(
}

func (a *actionQueuePendingTask) init() {
a.tasksPerNamespace = make(map[namespace.ID]int)
a.pendingTaskPerNamespacePerSlice = make(map[Slice]map[namespace.ID]int)
a.slicesPerNamespace = make(map[namespace.ID][]Slice)
a.namespaceToClearPerSlice = make(map[Slice][]namespace.ID)
a.tasksPerKey = make(map[any]int)
a.pendingTasksPerKeyPerSlice = make(map[Slice]map[any]int)
a.slicesPerKey = make(map[any][]Slice)
a.keysToClearPerSlice = make(map[Slice][]any)
}

func (a *actionQueuePendingTask) gatherStatistics(
readers map[int64]Reader,
) {
// gather statistic for
// 1. total # of pending tasks per namespace
// 2. for each slice, # of pending taks per namespace
// 3. for each namespace, a list of slices that contains pending tasks from that namespace,
// 1. total # of pending tasks per key
// 2. for each slice, # of pending taks per key
// 3. for each key, a list of slices that contains pending tasks from that key,
// reversely ordered by slice range. Upon unloading, first unload newer slices.
for _, reader := range readers {
reader.WalkSlices(func(s Slice) {
a.pendingTaskPerNamespacePerSlice[s] = s.TaskStats().PendingPerNamespace
for namespaceID, pendingTaskCount := range a.pendingTaskPerNamespacePerSlice[s] {
a.tasksPerNamespace[namespaceID] += pendingTaskCount
a.slicesPerNamespace[namespaceID] = append(a.slicesPerNamespace[namespaceID], s)
pendingPerKey := s.TaskStats().PendingPerKey
a.pendingTasksPerKeyPerSlice[s] = pendingPerKey
for key, pendingTaskCount := range pendingPerKey {
a.tasksPerKey[key] += pendingTaskCount
a.slicesPerKey[key] = append(a.slicesPerKey[key], s)
}
})
}
for _, sliceList := range a.slicesPerNamespace {
for _, sliceList := range a.slicesPerKey {
slices.SortFunc(sliceList, func(this, that Slice) int {
thisMin := this.Scope().Range.InclusiveMin
thatMin := that.Scope().Range.InclusiveMin
Expand All @@ -139,40 +142,40 @@ func (a *actionQueuePendingTask) findSliceToClear(
targetPendingTasks int,
) {
currentPendingTasks := 0
// order namespace by # of pending tasks
namespaceIDs := make([]namespace.ID, 0, len(a.tasksPerNamespace))
for namespaceID, namespacePendingTasks := range a.tasksPerNamespace {
currentPendingTasks += namespacePendingTasks
namespaceIDs = append(namespaceIDs, namespaceID)
// order key by # of pending tasks
keys := make([]any, 0, len(a.tasksPerKey))
for key, keyPendingTasks := range a.tasksPerKey {
currentPendingTasks += keyPendingTasks
keys = append(keys, key)
}
pq := collection.NewPriorityQueueWithItems(
func(this, that namespace.ID) bool {
return a.tasksPerNamespace[this] > a.tasksPerNamespace[that]
func(this, that any) bool {
return a.tasksPerKey[this] > a.tasksPerKey[that]
},
namespaceIDs,
keys,
)

for currentPendingTasks > targetPendingTasks && !pq.IsEmpty() {
namespaceID := pq.Remove()
key := pq.Remove()

sliceList := a.slicesPerNamespace[namespaceID]
sliceList := a.slicesPerKey[key]
if len(sliceList) == 0 {
panic("Found namespace with non-zero pending task count but has no correspoding Slice")
panic("Found key with non-zero pending task count but has no correspoding Slice")
}

// pop the first slice in the list
sliceToClear := sliceList[0]
sliceList = sliceList[1:]
a.slicesPerNamespace[namespaceID] = sliceList
a.slicesPerKey[key] = sliceList

tasksCleared := a.pendingTaskPerNamespacePerSlice[sliceToClear][namespaceID]
a.tasksPerNamespace[namespaceID] -= tasksCleared
tasksCleared := a.pendingTasksPerKeyPerSlice[sliceToClear][key]
a.tasksPerKey[key] -= tasksCleared
currentPendingTasks -= tasksCleared
if a.tasksPerNamespace[namespaceID] > 0 {
pq.Add(namespaceID)
if a.tasksPerKey[key] > 0 {
pq.Add(key)
}

a.namespaceToClearPerSlice[sliceToClear] = append(a.namespaceToClearPerSlice[sliceToClear], namespaceID)
a.keysToClearPerSlice[sliceToClear] = append(a.keysToClearPerSlice[sliceToClear], key)
}
}

Expand All @@ -189,7 +192,7 @@ func (a *actionQueuePendingTask) splitAndClearSlice(
// we can't do further split, have to clear entire slice
cleared := false
reader.ClearSlices(func(s Slice) bool {
_, ok := a.namespaceToClearPerSlice[s]
_, ok := a.keysToClearPerSlice[s]
cleared = cleared || ok
return ok
})
Expand All @@ -201,17 +204,12 @@ func (a *actionQueuePendingTask) splitAndClearSlice(

var splitSlices []Slice
reader.SplitSlices(func(s Slice) ([]Slice, bool) {
namespaceIDs, ok := a.namespaceToClearPerSlice[s]
keys, ok := a.keysToClearPerSlice[s]
if !ok {
return nil, false
}

namespaceIDStrings := make([]string, 0, len(namespaceIDs))
for _, namespaceID := range namespaceIDs {
namespaceIDStrings = append(namespaceIDStrings, namespaceID.String())
}

split, remain := s.SplitByPredicate(tasks.NewNamespacePredicate(namespaceIDStrings))
split, remain := s.SplitByPredicate(a.grouper.Predicate(keys))
split.Clear()
splitSlices = append(splitSlices, split)
return []Slice{remain}, true
Expand Down Expand Up @@ -250,9 +248,9 @@ func (a *actionQueuePendingTask) ensureNewReaders(

needNewReader := false
reader.WalkSlices(func(s Slice) {
// namespaceToClearPerSlice contains all the slices
// keysToClearPerSlice contains all the slices
// that needs to be split & cleared
_, ok := a.namespaceToClearPerSlice[s]
_, ok := a.keysToClearPerSlice[s]
needNewReader = needNewReader || ok
})

Expand Down
23 changes: 23 additions & 0 deletions service/history/queues/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func ToPersistencePredicate(
return ToPersistenceNamespaceIDPredicate(predicate)
case *tasks.TypePredicate:
return ToPersistenceTaskTypePredicate(predicate)
case *tasks.DestinationPredicate:
return ToPersistenceDestinationPredicate(predicate)
default:
panic(fmt.Sprintf("unknown task predicate type: %T", predicate))
}
Expand All @@ -166,6 +168,8 @@ func FromPersistencePredicate(
return FromPersistenceNamespaceIDPredicate(predicate.GetNamespaceIdPredicateAttributes())
case enumsspb.PREDICATE_TYPE_TASK_TYPE:
return FromPersistenceTaskTypePredicate(predicate.GetTaskTypePredicateAttributes())
case enumsspb.PREDICATE_TYPE_DESTINATION:
return FromPersistenceDestinationPredicate(predicate.GetDestinationPredicateAttributes())
default:
panic(fmt.Sprintf("unknown persistence task predicate type: %v", predicate.GetPredicateType()))
}
Expand Down Expand Up @@ -315,3 +319,22 @@ func FromPersistenceTaskTypePredicate(
) tasks.Predicate {
return tasks.NewTypePredicate(attributes.TaskTypes)
}

func ToPersistenceDestinationPredicate(
taskDestinationPredicate *tasks.DestinationPredicate,
) *persistencespb.Predicate {
return &persistencespb.Predicate{
PredicateType: enumsspb.PREDICATE_TYPE_DESTINATION,
Attributes: &persistencespb.Predicate_DestinationPredicateAttributes{
DestinationPredicateAttributes: &persistencespb.DestinationPredicateAttributes{
Destinations: maps.Keys(taskDestinationPredicate.Destinations),
},
},
}
}

func FromPersistenceDestinationPredicate(
attributes *persistencespb.DestinationPredicateAttributes,
) tasks.Predicate {
return tasks.NewDestinationPredicate(attributes.Destinations)
}
8 changes: 8 additions & 0 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ func (e *executableImpl) SetScheduledTime(t time.Time) {
e.scheduledTime = t
}

// GetDestination returns the embedded task's destination if it exists. Defaults to an empty string.
func (e *executableImpl) GetDestination() string {
if t, ok := e.Task.(tasks.HasDestination); ok {
return t.GetDestination()
}
return ""
}

func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
// this is an optimization for skipping rescheduler and retry the task sooner.
// this is useful for errors like workflow busy, which doesn't have to wait for
Expand Down
Loading

0 comments on commit 6b6b0bf

Please sign in to comment.