Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix threadpool load induced delayed responses to plugin requests #3141

Merged
merged 10 commits into from
Dec 4, 2019
91 changes: 51 additions & 40 deletions src/NuGet.Core/NuGet.Protocol/Plugins/InboundRequestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public sealed class InboundRequestContext : IDisposable
private readonly IConnection _connection;
private bool _isDisposed;
private readonly IPluginLogger _logger;
private readonly InboundRequestProcessingHandler _inboundRequestProcessingHandler;

/// <summary>
/// Gets the request ID.
Expand All @@ -37,7 +38,7 @@ public InboundRequestContext(
IConnection connection,
string requestId,
CancellationToken cancellationToken)
: this(connection, requestId, cancellationToken, PluginLogger.DefaultInstance)
: this(connection, requestId, cancellationToken, new InboundRequestProcessingHandler(), PluginLogger.DefaultInstance)
nkolev92 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there any decision about extracting this to an Interface? Not sure if it should be something configurable where different Clients could want their own Request Handler. Maybe some want to restrict threads and don't care if it takes 5 minutes to begin processing a request in a CI, and others want to prioritize user experience?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these types are internal.
So no one but NuGet can use a dedicated handler.
I did that on purpose because others do not need to make that decision, only the client does.

{
}

Expand All @@ -52,12 +53,15 @@ public InboundRequestContext(
/// is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown if <paramref name="requestId" />
/// is either <c>null</c> or an empty string.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="logger" />
/// <exception cref="ArgumentNullException">Thrown if <paramref name="inboundRequestProcessingHandler" />
/// is <c>null</c>.</exception>
/// /// <exception cref="ArgumentNullException">Thrown if <paramref name="logger" />
nkolev92 marked this conversation as resolved.
Show resolved Hide resolved
/// is <c>null</c>.</exception>
internal InboundRequestContext(
IConnection connection,
string requestId,
CancellationToken cancellationToken,
InboundRequestProcessingHandler inboundRequestProcessingHandler,
IPluginLogger logger)
{
if (connection == null)
Expand All @@ -75,6 +79,11 @@ internal InboundRequestContext(
throw new ArgumentNullException(nameof(logger));
}

if (inboundRequestProcessingHandler == null)
{
throw new ArgumentNullException(nameof(inboundRequestProcessingHandler));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it is important that this handler be specified. I'd expect the default constructor to be disabled so that nobody can create a Context without one:

private InboundRequestContext() { //disabled }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no default constructor.
The only public constructor creates a handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind you're right

}

_connection = connection;
RequestId = requestId;

Expand All @@ -85,6 +94,42 @@ internal InboundRequestContext(
_cancellationToken = _cancellationTokenSource.Token;

_logger = logger;

_inboundRequestProcessingHandler = inboundRequestProcessingHandler;
}

private async Task ProcessResponseAsync(IRequestHandler requestHandler, Message request, IResponseHandler responseHandler)
{
try
{
if (_logger.IsEnabled)
{
_logger.Write(new TaskLogMessage(_logger.Now, request.RequestId, request.Method, request.Type, TaskState.Executing));
}

await requestHandler.HandleResponseAsync(
_connection,
request,
responseHandler,
_cancellationToken);
}
catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested)
{
var response = MessageUtilities.Create(request.RequestId, MessageType.Cancel, request.Method);

await _connection.SendAsync(response, CancellationToken.None);
}
catch (Exception ex)
{
BeginFaultAsync(request, ex);
}
finally
{
if (_logger.IsEnabled)
{
_logger.Write(new TaskLogMessage(_logger.Now, request.RequestId, request.Method, request.Type, TaskState.Completed));
}
}
}

/// <summary>
Expand All @@ -108,8 +153,7 @@ public void Dispose()
{
}

// Do not dispose of _connection or _logger. This context does not own them.

// Do not dispose of the _connection, _logger or _requestProcessingContext. This context does not own them.
GC.SuppressFinalize(this);

_isDisposed = true;
Expand Down Expand Up @@ -210,42 +254,9 @@ public void BeginResponseAsync(
{
_logger.Write(new TaskLogMessage(_logger.Now, request.RequestId, request.Method, request.Type, TaskState.Queued));
}
Func<Task> task = async () => await ProcessResponseAsync(requestHandler, request, responseHandler);
nkolev92 marked this conversation as resolved.
Show resolved Hide resolved

Task.Run(async () =>
{
// Top-level exception handler for a worker pool thread.
try
{
if (_logger.IsEnabled)
{
_logger.Write(new TaskLogMessage(_logger.Now, request.RequestId, request.Method, request.Type, TaskState.Executing));
}

await requestHandler.HandleResponseAsync(
_connection,
request,
responseHandler,
_cancellationToken);
}
catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested)
{
var response = MessageUtilities.Create(request.RequestId, MessageType.Cancel, request.Method);

await _connection.SendAsync(response, CancellationToken.None);
}
catch (Exception ex)
{
BeginFaultAsync(request, ex);
}
finally
{
if (_logger.IsEnabled)
{
_logger.Write(new TaskLogMessage(_logger.Now, request.RequestId, request.Method, request.Type, TaskState.Completed));
}
}
},
_cancellationToken);
_inboundRequestProcessingHandler.Handle(request.Method, task, _cancellationToken);
}

/// <summary>
Expand All @@ -262,4 +273,4 @@ public void Cancel()
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace NuGet.Protocol.Plugins
{
internal class InboundRequestProcessingHandler : IDisposable
nkolev92 marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly ISet<MessageMethod> _fastProccessingMethods;
private readonly Lazy<DedicatedAsynchronousProcessingThread> _processingThread;
private bool _isDisposed;

public InboundRequestProcessingHandler() :
this(new HashSet<MessageMethod>())
{
}

/// <summary>
/// Requests from the processing methods provided in this set are handled on a dedicated thread.
/// </summary>
/// <param name="fastProcessingMethods"></param>
public InboundRequestProcessingHandler(ISet<MessageMethod> fastProcessingMethods)
nkolev92 marked this conversation as resolved.
Show resolved Hide resolved
{
_fastProccessingMethods = fastProcessingMethods ?? throw new ArgumentNullException(nameof(fastProcessingMethods));
// Lazily initialize the processing thread. It is not needed if there are no time critical methods.
_processingThread = new Lazy<DedicatedAsynchronousProcessingThread>(() =>
{
var thread = new DedicatedAsynchronousProcessingThread();
thread.Start();
return thread;
});

}

/// <summary>
/// Methods that are in the fast processing method list will be handled on a separate thread. Everything else will be queued on the threadpool.
/// </summary>
/// <param name="messageMethod"></param>
/// <param name="task"></param>
/// <param name="cancellationToken"></param>
internal void Handle(MessageMethod messageMethod, Func<Task> task, CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (_fastProccessingMethods.Contains(messageMethod))
{
_processingThread.Value.Enqueue(task);
}
else
{
Task.Run(async () =>
nkolev92 marked this conversation as resolved.
Show resolved Hide resolved
{
await task();
}, cancellationToken);
}
}

public void Dispose()
{
if (_isDisposed)
{
return;
}
_processingThread.Value.Dispose();
nkolev92 marked this conversation as resolved.
Show resolved Hide resolved
GC.SuppressFinalize(this);

_isDisposed = true;
}

private void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ internal sealed class AssemblyLogMessage : PluginLogMessage
private readonly string _fileVersion;
private readonly string _fullName;
private readonly string _informationalVersion;
private readonly string _entryAssemblyFullName;

internal AssemblyLogMessage(DateTimeOffset now)
: base(now)
{
var assembly = typeof(PluginFactory).Assembly;
var entryAssembly = Assembly.GetEntryAssembly();
var informationalVersionAttribute = assembly.GetCustomAttribute<AssemblyInformationalVersionAttribute>();
var fileVersionAttribute = assembly.GetCustomAttribute<AssemblyFileVersionAttribute>();

_fullName = assembly.FullName;
_entryAssemblyFullName = entryAssembly?.FullName;

if (fileVersionAttribute != null)
{
Expand All @@ -35,7 +38,9 @@ internal AssemblyLogMessage(DateTimeOffset now)

public override string ToString()
{
var message = new JObject(new JProperty("assembly full name", _fullName));
var message = new JObject(
new JProperty("assembly full name", _fullName),
new JProperty("entry assembly full name", _entryAssemblyFullName));

if (!string.IsNullOrEmpty(_fileVersion))
{
Expand All @@ -50,4 +55,4 @@ public override string ToString()
return ToString("assembly", message);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal sealed class ProcessLogMessage : PluginLogMessage
{
private readonly int _processId;
private readonly string _processName;
private readonly DateTime _processStartTime;

internal ProcessLogMessage(DateTimeOffset now)
: base(now)
Expand All @@ -19,16 +20,18 @@ internal ProcessLogMessage(DateTimeOffset now)
{
_processId = process.Id;
_processName = process.ProcessName;
_processStartTime = process.StartTime.ToUniversalTime();
}
}

public override string ToString()
{
var message = new JObject(
new JProperty("process ID", _processId),
new JProperty("process name", _processName));
new JProperty("process name", _processName),
new JProperty("process start time", _processStartTime.ToString("O")));

return ToString("process", message);
}
}
}
}
32 changes: 14 additions & 18 deletions src/NuGet.Core/NuGet.Protocol/Plugins/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public sealed class MessageDispatcher : IMessageDispatcher, IResponseHandler
private readonly ConcurrentDictionary<string, InboundRequestContext> _inboundRequestContexts;
private readonly IPluginLogger _logger;
private readonly ConcurrentDictionary<string, OutboundRequestContext> _outboundRequestContexts;
private readonly InboundRequestProcessingHandler _inboundRequestProcessingContext;

/// <summary>
/// Gets the request handlers for use by the dispatcher.
Expand All @@ -38,7 +39,7 @@ public sealed class MessageDispatcher : IMessageDispatcher, IResponseHandler
/// <exception cref="ArgumentNullException">Thrown if <paramref name="idGenerator" />
/// is <c>null</c>.</exception>
public MessageDispatcher(IRequestHandlers requestHandlers, IIdGenerator idGenerator)
: this(requestHandlers, idGenerator, PluginLogger.DefaultInstance)
: this(requestHandlers, idGenerator, new InboundRequestProcessingHandler(), PluginLogger.DefaultInstance)
{
}

Expand All @@ -51,9 +52,11 @@ public MessageDispatcher(IRequestHandlers requestHandlers, IIdGenerator idGenera
/// is <c>null</c>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="idGenerator" />
/// is <c>null</c>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="logger" />
/// <exception cref="ArgumentNullException">Thrown if <paramref name="inboundRequestProcessingHandler" />
/// is <c>null</c>.</exception>
internal MessageDispatcher(IRequestHandlers requestHandlers, IIdGenerator idGenerator, IPluginLogger logger)
/// /// <exception cref="ArgumentNullException">Thrown if <paramref name="logger" />
/// is <c>null</c>.</exception>
internal MessageDispatcher(IRequestHandlers requestHandlers, IIdGenerator idGenerator, InboundRequestProcessingHandler inboundRequestProcessingHandler, IPluginLogger logger)
{
if (requestHandlers == null)
{
Expand All @@ -65,6 +68,10 @@ internal MessageDispatcher(IRequestHandlers requestHandlers, IIdGenerator idGene
throw new ArgumentNullException(nameof(idGenerator));
}

if(inboundRequestProcessingHandler == null)
{
throw new ArgumentNullException(nameof(inboundRequestProcessingHandler));
}
if (logger == null)
{
throw new ArgumentNullException(nameof(logger));
Expand All @@ -76,6 +83,7 @@ internal MessageDispatcher(IRequestHandlers requestHandlers, IIdGenerator idGene

_inboundRequestContexts = new ConcurrentDictionary<string, InboundRequestContext>();
_outboundRequestContexts = new ConcurrentDictionary<string, OutboundRequestContext>();
_inboundRequestProcessingContext = inboundRequestProcessingHandler;
}

/// <summary>
Expand All @@ -89,7 +97,7 @@ public void Dispose()
}

Close();

_inboundRequestProcessingContext.Dispose();
SetConnection(connection: null);

// Do not dispose of _logger. This message dispatcher does not own it.
Expand Down Expand Up @@ -645,19 +653,6 @@ private IRequestHandler GetInboundRequestHandler(MessageMethod method)
return handler;
}

private InboundRequestContext GetInboundRequestContext(string requestId)
{
InboundRequestContext requestContext;

if (!_inboundRequestContexts.TryGetValue(requestId, out requestContext))
{
throw new ProtocolException(
string.Format(CultureInfo.CurrentCulture, Strings.Plugin_RequestContextDoesNotExist, requestId));
}

return requestContext;
}

private OutboundRequestContext GetOutboundRequestContext(string requestId)
{
OutboundRequestContext requestContext;
Expand Down Expand Up @@ -699,6 +694,7 @@ private InboundRequestContext CreateInboundRequestContext(
_connection,
message.RequestId,
cancellationToken,
_inboundRequestProcessingContext,
_logger);
}

Expand Down Expand Up @@ -741,4 +737,4 @@ private sealed class NullPayload
{
}
}
}
}
Loading