-
Notifications
You must be signed in to change notification settings - Fork 825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for arbitrary predicates for multi cursor queues #5323
Add support for arbitrary predicates for multi cursor queues #5323
Conversation
ff13872
to
9a95fa7
Compare
Predicate(keys []any) tasks.Predicate | ||
} | ||
|
||
type GrouperNamespaceID struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: What about GroupByNamespaceID
instead? It reads a bit better imo
9a95fa7
to
2e7183a
Compare
@@ -531,6 +531,14 @@ func (e *executableImpl) SetScheduledTime(t time.Time) { | |||
e.scheduledTime = t | |||
} | |||
|
|||
// GetDestination returns the embedded task's destination if it exists. Defaults to an empty string. | |||
func (e *executableImpl) GetDestination() string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: feels like it should be a method on tasks.Task, not executable. But I don't have strong opinion.
@@ -135,6 +135,7 @@ func NewScheduledQueue( | |||
options, | |||
hostRateLimiter, | |||
readerCompletionFn, | |||
GrouperNamespaceID{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be a parameter as well...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's wait for a use case, so far I don't see one.
service/history/queues/slice.go
Outdated
if len(s.executableTracker.pendingPerNamespace) > shrinkPredicateMaxPendingNamespaces { | ||
// TODO: this should be generic enough to shrink any predicate type, probably doesn't belong here. | ||
pendingPerKey := s.executableTracker.pendingPerKey | ||
if len(pendingPerKey) > shrinkPredicateMaxPendingKeys { | ||
// only shrink predicate if there're few namespaces left |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// only shrink predicate if there're few namespaces left | |
// only shrink predicate if there're few keys left |
service/history/queues/slice.go
Outdated
namespacePredicate := tasks.NewNamespacePredicate(pendingNamespaceIDs) | ||
s.scope.Predicate = tasks.AndPredicates(s.scope.Predicate, namespacePredicate) | ||
minimalPredicate := s.grouper.Predicate(maps.Keys(pendingPerKey)) | ||
s.scope.Predicate = tasks.AndPredicates(s.scope.Predicate, minimalPredicate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now that I think about it, the tasks.AndPredicates
looks unnecessary to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, good catch!
What changed?
Why?
We need this flexibility for Nexus, this is some of the prep work.
How did you test it?
Add some unit tests, ran existing tests.
Potential risks
We should look closely at this as it may mess up task processing multi-cursor logic.
Is hotfix candidate?
No.