Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds IEventExecutor.Schedule, proper cancellation of scheduled tasks #80

Merged
merged 1 commit into from
Apr 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DotNetty.Buffers/ByteBufferUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ public static string DecodeString(IByteBuffer src, int readerIndex, int len, Enc

if (src.HasArray)
{
return encoding.GetString(src.Array, readerIndex, len);
return encoding.GetString(src.Array, src.ArrayOffset + readerIndex, len);
}
else
{
Expand Down
15 changes: 15 additions & 0 deletions src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ public void Execute(Action action)
this.Execute(new ActionTaskQueueNode(action));
}

public virtual IScheduledTask Schedule(Action action, TimeSpan delay)
{
throw new NotSupportedException();
}

public virtual IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
throw new NotSupportedException();
}

public virtual IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
throw new NotSupportedException();
}

public virtual Task ScheduleAsync(Action action, TimeSpan delay)
{
return this.ScheduleAsync(action, delay, CancellationToken.None);
Expand Down
163 changes: 39 additions & 124 deletions src/DotNetty.Common/Concurrency/AbstractScheduledEventExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
Expand All @@ -15,8 +15,6 @@ namespace DotNetty.Common.Concurrency
/// </summary>
public abstract class AbstractScheduledEventExecutor : AbstractEventExecutor
{
static readonly Action<object, object> AddScheduledTaskAction = (e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t);

protected readonly PriorityQueue<IScheduledRunnable> ScheduledTaskQueue = new PriorityQueue<IScheduledRunnable>();

// TODO: support for EventExecutorGroup
Expand Down Expand Up @@ -94,172 +92,89 @@ protected bool HasScheduledTasks()
return scheduledTask != null && scheduledTask.Deadline <= PreciseTimeSpan.FromStart;
}

public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
public override IScheduledTask Schedule(Action action, TimeSpan delay)
{
var scheduledTask = new ActionScheduledTask(action, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
}
else
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
}
return scheduledTask.Completion;
}

public override Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
var scheduledTask = new StateActionScheduledTask(action, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
}
else
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
}
return scheduledTask.Completion;
return this.Schedule(new ActionScheduledTask(this, action, PreciseTimeSpan.Deadline(delay)));
}

public override Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
public override IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
var scheduledTask = new StateActionWithContextScheduledTask(action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
}
else
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
}
return scheduledTask.Completion;
return this.Schedule(new StateActionScheduledTask(this, action, state, PreciseTimeSpan.Deadline(delay)));
}

#region Scheduled task data structures

protected interface IScheduledRunnable : IRunnable, IComparable<IScheduledRunnable>
public override IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
PreciseTimeSpan Deadline { get; }

bool Cancel();
return this.Schedule(new StateActionWithContextScheduledTask(this, action, context, state, PreciseTimeSpan.Deadline(delay)));
}

protected abstract class ScheduledTaskBase : MpscLinkedQueueNode<IRunnable>, IScheduledRunnable
public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
{
readonly TaskCompletionSource promise;

protected ScheduledTaskBase(PreciseTimeSpan deadline, TaskCompletionSource promise, CancellationToken cancellationToken)
{
this.promise = promise;
this.Deadline = deadline;
this.CancellationToken = cancellationToken;
}

public PreciseTimeSpan Deadline { get; private set; }

public bool Cancel()
if (cancellationToken.IsCancellationRequested)
{
return this.promise.TrySetCanceled();
return TaskEx.Cancelled;
}

public Task Completion
if (!cancellationToken.CanBeCanceled)
{
get { return this.promise.Task; }
return this.Schedule(action, delay).Completion;
}

public CancellationToken CancellationToken { get; private set; }

int IComparable<IScheduledRunnable>.CompareTo(IScheduledRunnable other)
{
Contract.Requires(other != null);

return this.Deadline.CompareTo(other.Deadline);
}
return this.Schedule(new ActionScheduledAsyncTask(this, action, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}

public override IRunnable Value
public override Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
get { return this; }
return TaskEx.Cancelled;
}

public void Run()
if (!cancellationToken.CanBeCanceled)
{
if (this.CancellationToken.IsCancellationRequested)
{
this.promise.TrySetCanceled();
return;
}
if (this.Completion.IsCanceled)
{
return;
}
try
{
this.Execute();
this.promise.TryComplete();
}
catch (Exception ex)
{
// todo: check for fatal
this.promise.TrySetException(ex);
}
return this.Schedule(action, state, delay).Completion;
}

protected abstract void Execute();
return this.Schedule(new StateActionScheduledAsyncTask(this, action, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}

sealed class ActionScheduledTask : ScheduledTaskBase
public override Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
{
readonly Action action;

public ActionScheduledTask(Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(), cancellationToken)
if (cancellationToken.IsCancellationRequested)
{
this.action = action;
return TaskEx.Cancelled;
}

protected override void Execute()
if (!cancellationToken.CanBeCanceled)
{
this.action();
return this.Schedule(action, context, state, delay).Completion;
}

return this.Schedule(new StateActionWithContextScheduledAsyncTask(this, action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}

sealed class StateActionScheduledTask : ScheduledTaskBase
protected IScheduledRunnable Schedule(IScheduledRunnable task)
{
readonly Action<object> action;

public StateActionScheduledTask(Action<object> action, object state, PreciseTimeSpan deadline,
CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(state), cancellationToken)
if (this.InEventLoop)
{
this.action = action;
this.ScheduledTaskQueue.Enqueue(task);
}

protected override void Execute()
else
{
this.action(this.Completion.AsyncState);
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t), this, task);
}
return task;
}

sealed class StateActionWithContextScheduledTask : ScheduledTaskBase
internal void RemoveScheduled(IScheduledRunnable task)
{
readonly Action<object, object> action;
readonly object context;

public StateActionWithContextScheduledTask(Action<object, object> action, object context, object state,
PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(state), cancellationToken)
if (this.InEventLoop)
{
this.action = action;
this.context = context;
this.ScheduledTaskQueue.Remove(task);
}

protected override void Execute()
else
{
this.action(this.context, this.Completion.AsyncState);
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Remove((IScheduledRunnable)t), this, task);
}
}

#endregion
}
}
24 changes: 24 additions & 0 deletions src/DotNetty.Common/Concurrency/ActionScheduledAsyncTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;

sealed class ActionScheduledAsyncTask : ScheduledAsyncTask
{
readonly Action action;

public ActionScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(executor, deadline, new TaskCompletionSource(), cancellationToken)
{
this.action = action;
}

protected override void Execute()
{
this.action();
}
}
}
23 changes: 23 additions & 0 deletions src/DotNetty.Common/Concurrency/ActionScheduledTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System;

sealed class ActionScheduledTask : ScheduledTask
{
readonly Action action;

public ActionScheduledTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline)
: base(executor, deadline, new TaskCompletionSource())
{
this.action = action;
}

protected override void Execute()
{
this.action();
}
}
}
28 changes: 28 additions & 0 deletions src/DotNetty.Common/Concurrency/IEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,34 @@ public interface IEventExecutor
/// </remarks>
void Execute(Action<object, object> action, object context, object state);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action action, TimeSpan delay);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state"/> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context"/> and <paramref name="state"/> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
Expand Down
11 changes: 11 additions & 0 deletions src/DotNetty.Common/Concurrency/IScheduledRunnable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System;

public interface IScheduledRunnable : IRunnable, IScheduledTask, IComparable<IScheduledRunnable>
{
}
}
19 changes: 19 additions & 0 deletions src/DotNetty.Common/Concurrency/IScheduledTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public interface IScheduledTask
{
bool Cancel();

PreciseTimeSpan Deadline { get; }

Task Completion { get; }

TaskAwaiter GetAwaiter();
}
}
Loading