Skip to content

Commit

Permalink
Implement group matching functionality for jobs and triggers
Browse files Browse the repository at this point in the history
The commit adds a new matching function in RavenJobStore that allows getting jobs or triggers based on group patterns. The function supports a variety of string operations including equality, start/ends with and contains. Changes were required in Job and Trigger entities to include group in their serialized data. The system's behavior is further validated through corresponding unit tests.
  • Loading branch information
JezhikLaas committed Dec 20, 2023
1 parent 43f139a commit ab88e1d
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 15 deletions.
46 changes: 46 additions & 0 deletions Quartz.Impl.RavenJobStore.UnitTests/ImplementationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,52 @@ public async Task If_a_group_matcher_starts_with_is_used_Then_GetJobKeys_finds_e
.ContainSingle(x => x.Group == "2Group" && x.Name == "Job");
}

[Fact(DisplayName = "If a group matcher contains is used Then GetJobKeys finds entries")]
public async Task If_a_group_matcher_contains_is_used_Then_GetJobKeys_finds_entries()
{
await Target.SchedulerStartedAsync(CancellationToken.None);

var job = new JobDetailImpl("Job", "1Group!", typeof(NoOpJob));
await Target.StoreJobAsync(job, false, CancellationToken.None);

job = new JobDetailImpl("Job", "2Group!", typeof(NoOpJob));
await Target.StoreJobAsync(job, false, CancellationToken.None);

job = new JobDetailImpl("Job", "3Group!", typeof(NoOpJob));
await Target.StoreJobAsync(job, false, CancellationToken.None);

var result = await Target.GetJobKeysAsync
(
GroupMatcher<JobKey>.GroupContains("Group"), CancellationToken.None
);

result.Should()
.HaveCount(3);
}

[Fact(DisplayName = "If a group matcher any is used Then GetJobKeys finds entries")]
public async Task If_a_group_matcher_any_is_used_Then_GetJobKeys_finds_entries()
{
await Target.SchedulerStartedAsync(CancellationToken.None);

var job = new JobDetailImpl("Job", "1Group!", typeof(NoOpJob));
await Target.StoreJobAsync(job, false, CancellationToken.None);

job = new JobDetailImpl("Job", "2Group!", typeof(NoOpJob));
await Target.StoreJobAsync(job, false, CancellationToken.None);

job = new JobDetailImpl("Job", "3Group!", typeof(NoOpJob));
await Target.StoreJobAsync(job, false, CancellationToken.None);

var result = await Target.GetJobKeysAsync
(
GroupMatcher<JobKey>.AnyGroup(), CancellationToken.None
);

result.Should()
.HaveCount(3);
}

[Fact(DisplayName = "If a group matcher equals is used Then GetTriggerKeys finds entries")]
public async Task If_a_group_matcher_equals_is_used_Then_GetTriggerKeys_finds_entries()
{
Expand Down
7 changes: 6 additions & 1 deletion Quartz.Impl.RavenJobStore/Entities/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Domla.Quartz.Raven.Entities;

internal class Job : SerializeQuartzData
internal class Job : SerializeQuartzData, IGroupedElement
{
public Job(IJobDetail? job, string schedulerInstanceName)
{
Expand Down Expand Up @@ -59,6 +59,11 @@ public IJobDetail? Item
}
}

internal interface IGroupedElement
{
string Group { get; set; }
}

internal static class JobKeyExtensions
{
public static string GetDatabaseId(this JobKey? jobKey, string schedulerName) =>
Expand Down
2 changes: 1 addition & 1 deletion Quartz.Impl.RavenJobStore/Entities/Trigger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Domla.Quartz.Raven.Entities;

internal class Trigger : SerializeQuartzData
internal class Trigger : SerializeQuartzData, IGroupedElement
{
public Trigger(IOperableTrigger? trigger, string schedulerInstanceName)
{
Expand Down
3 changes: 2 additions & 1 deletion Quartz.Impl.RavenJobStore/Indexes/JobIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ internal JobIndex()
select new
{
job.Scheduler,
job.RequestsRecovery
job.RequestsRecovery,
job.Group
};
}
}
3 changes: 2 additions & 1 deletion Quartz.Impl.RavenJobStore/Indexes/TriggerIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ internal TriggerIndex()
trigger.JobId,
trigger.State,
trigger.NextFireTimeUtc,
trigger.Priority
trigger.Priority,
trigger.Group
};
}
}
46 changes: 35 additions & 11 deletions Quartz.Impl.RavenJobStore/RavenJobStore.Implementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -885,24 +885,36 @@ internal async Task<IReadOnlyCollection<JobKey>> GetJobKeysAsync(
TraceEnter(Logger);

await WaitForIndexingAsync(nameof(JobIndex)).ConfigureAwait(false);

using var session = GetNonWaitingSession();

var query = session
.Query<Job>(nameof(JobIndex))
.Where(x => x.Scheduler == InstanceName)
.ProjectInto<JobKey>();
.Where(x => x.Scheduler == InstanceName);

(var clientMatch, query) = GetMatcherWhereClause(query, matcher);
var projection = query.ProjectInto<JobKey>();

await using var stream = await session
.Advanced
.StreamAsync(query, token)
.StreamAsync(projection, token)
.ConfigureAwait(false);

var result = new HashSet<JobKey>();

while (await stream.MoveNextAsync().ConfigureAwait(false))
if (clientMatch)
{
if (matcher.IsMatch(stream.Current.Document)) result.Add(stream.Current.Document);
while (await stream.MoveNextAsync().ConfigureAwait(false))
{
if (matcher.IsMatch(stream.Current.Document)) result.Add(stream.Current.Document);
}
}
else
{
while (await stream.MoveNextAsync().ConfigureAwait(false))
{
result.Add(stream.Current.Document);
}
}

TraceExit(Logger, result);
Expand All @@ -922,19 +934,31 @@ internal async Task<IReadOnlyCollection<TriggerKey>> GetTriggerKeysAsync(

var query = session
.Query<Trigger>(nameof(TriggerIndex))
.Where(x => x.Scheduler == InstanceName)
.ProjectInto<TriggerKey>();
.Where(x => x.Scheduler == InstanceName);

(var clientMatch, query) = GetMatcherWhereClause(query, matcher);
var projection = query.ProjectInto<TriggerKey>();

await using var stream = await session
.Advanced
.StreamAsync(query, token)
.StreamAsync(projection, token)
.ConfigureAwait(false);

var result = new HashSet<TriggerKey>();

while (await stream.MoveNextAsync().ConfigureAwait(false))
if (clientMatch)
{
while (await stream.MoveNextAsync().ConfigureAwait(false))
{
if (matcher.IsMatch(stream.Current.Document)) result.Add(stream.Current.Document);
}
}
else
{
if (matcher.IsMatch(stream.Current.Document)) result.Add(stream.Current.Document);
while (await stream.MoveNextAsync().ConfigureAwait(false))
{
result.Add(stream.Current.Document);
}
}

TraceExit(Logger, result);
Expand Down
31 changes: 31 additions & 0 deletions Quartz.Impl.RavenJobStore/RavenJobStore.Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
using Domla.Quartz.Raven.Indexes;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Impl.Matchers;
using Quartz.Simpl;
using Quartz.Spi;
using Quartz.Util;
using Raven.Client.Documents;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Linq;
Expand Down Expand Up @@ -401,6 +403,35 @@ select trigger
result.ForEach(x => buffer.Enqueue(x, -x.Priority));
}

private (bool clientMatch, IRavenQueryable<T>) GetMatcherWhereClause<T, TKey>(

Check notice on line 406 in Quartz.Impl.RavenJobStore/RavenJobStore.Utils.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

RoslynAnalyzers Mark members as static

Member 'GetMatcherWhereClause' does not access instance data and can be marked as static

Check notice on line 406 in Quartz.Impl.RavenJobStore/RavenJobStore.Utils.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Member can be made static (shared) (private accessibility)

Method 'GetMatcherWhereClause' can be made static
IRavenQueryable<T> source,
GroupMatcher<TKey> matcher) where T : IGroupedElement where TKey : Key<TKey>
{
if (matcher.CompareWithOperator.Equals(StringOperator.Equality))
{
return (false, source.Where(x => x.Group == matcher.CompareToValue));
}

if (matcher.CompareWithOperator.Equals(StringOperator.StartsWith))
{
return (false, source.Where(x => x.Group.StartsWith(matcher.CompareToValue)));
}

if (matcher.CompareWithOperator.Equals(StringOperator.EndsWith))
{
return (false, source.Where(x => x.Group.EndsWith(matcher.CompareToValue)));
}

if (matcher.CompareWithOperator.Equals(StringOperator.Anything))
{
return (false, source);
}

// Contains
// We cannot execute a 'contains', at least without an FTS index.
return (true, source);
}

private async Task WaitForIndexingAsync(params string[] names)
{
var operationExecutor = DocumentStore!.Maintenance.ForDatabase(DocumentStore!.Database);
Expand Down

0 comments on commit ab88e1d

Please sign in to comment.