Skip to content

Commit

Permalink
Fix network edge cases after switching to GetQueuedCompletionStatusEx.
Browse files Browse the repository at this point in the history
  • Loading branch information
amacal committed Apr 3, 2017
1 parent 278d68a commit f131435
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 63 deletions.
2 changes: 0 additions & 2 deletions sources/Leak.Completion/CompletionCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ namespace Leak.Completion
public interface CompletionCallback
{
unsafe void Complete(NativeOverlapped* overlapped, int affected);

unsafe void Fail(NativeOverlapped* overlapped);
}
}
10 changes: 1 addition & 9 deletions sources/Leak.Completion/CompletionThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,7 @@ CompletionInterop.OverlappedEntry[] entries
Overlapped overlapped = Overlapped.Unpack(entry.lpOverlapped);
CompletionCallback callback = overlapped.AsyncResult as CompletionCallback;

if (result)
{
callback?.Complete(entry.lpOverlapped, (int)entry.dwNumberOfBytesTransferred);
}
else
{
callback?.Fail(entry.lpOverlapped);
}

callback?.Complete(entry.lpOverlapped, (int)entry.dwNumberOfBytesTransferred);
Overlapped.Free(entry.lpOverlapped);
}
}
Expand Down
7 changes: 7 additions & 0 deletions sources/Leak.Files/FileInterop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public static extern unsafe int WriteFile(
public static extern bool FlushFileBuffers(
[In] IntPtr handle);

[DllImport("kernel32.dll", SetLastError = true)]
public static extern unsafe uint GetOverlappedResult(
[In] IntPtr handle,
[In] NativeOverlapped* lpOverlapped,
[Out] out uint ptrBytesTransferred,
[In] bool wait);

public static uint GetLastError()
{
return (uint)Marshal.GetLastWin32Error();
Expand Down
30 changes: 18 additions & 12 deletions sources/Leak.Files/FileResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,21 @@ public void Pin(object instance)

public unsafe void Complete(NativeOverlapped* overlapped, int affected)
{
uint ignore;
uint result = FileInterop.GetOverlappedResult(Handle, overlapped, out ignore, false);

Affected = affected;
IsCompleted = true;

Event?.Set();
Event?.Dispose();
Pinned?.Free();

Complete();
}

unsafe void CompletionCallback.Fail(NativeOverlapped* overlapped)
{
Fail();
if (result != 0 || affected > 0)
{
Release();
Complete();
}
else
{
Fail();
}
}

public void Fail()
Expand All @@ -66,11 +68,15 @@ public void Fail(uint code)
Status = (FileStatus)code;
IsCompleted = true;

Release();
Complete();
}

private void Release()
{
Event?.Set();
Event?.Dispose();
Pinned?.Free();

Complete();
}

protected abstract void Complete();
Expand Down
3 changes: 1 addition & 2 deletions sources/Leak.Sockets.Tests/ReceiveTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public async Task CanHandleTerminatedStream()
byte[] buffer = new byte[10];
TcpSocketReceive received = await socket.Receive(buffer);

Assert.That(received.Status, Is.EqualTo(SocketStatus.OK));
Assert.That(received.Count, Is.Zero);
Assert.That(received.Status, Is.Not.EqualTo(SocketStatus.OK));
}
}
}
Expand Down
48 changes: 24 additions & 24 deletions sources/Leak.Sockets/SocketResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,49 +49,49 @@ public void Pin(object instance)

public unsafe void Complete(NativeOverlapped* overlapped, int affected)
{
Affected = affected;
IsCompleted = true;

Event?.Set();
Event?.Dispose();

Pinned1?.Free();
Pinned2?.Free();

OnCompleted(affected);
}

unsafe void CompletionCallback.Fail(NativeOverlapped* overlapped)
{
uint affected;
uint ignore;
uint flags;

TcpSocketInterop.WSAGetOverlappedResult(Handle, overlapped, out affected, false, out flags);
uint result = TcpSocketInterop.WSAGetOverlappedResult(Handle, overlapped, out ignore, false, out flags);

Fail();
Affected = affected;
IsCompleted = true;

if (result != 0 || affected > 0)
{
Release();
OnCompleted();
}
else
{
Fail();
}
}

public void Fail()
{
Fail(TcpSocketInterop.GetLastError());
}

public void Fail(uint code)
public void Fail(uint reason)
{
Status = (SocketStatus)code;
IsCompleted = true;
Status = (SocketStatus)reason;

Release();
OnFailed();
}

private void Release()
{
Event?.Set();
Event?.Dispose();

Pinned1?.Free();
Pinned2?.Free();

OnFailed(Status);
}

protected abstract void OnCompleted(int affected);
protected abstract void OnCompleted();

protected abstract void OnFailed(SocketStatus status);
protected abstract void OnFailed();
}
}
4 changes: 2 additions & 2 deletions sources/Leak.Sockets/TcpSocketAcceptResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ public TcpSocketAccept Unpack(IAsyncResult result)
return new TcpSocketAccept(Status, Socket, Connection, GetEndpoint);
}

protected override void OnCompleted(int affected)
protected override void OnCompleted()
{
OnAccepted?.Invoke(new TcpSocketAccept(Status, Socket, Connection, GetEndpoint));
}

protected override void OnFailed(SocketStatus status)
protected override void OnFailed()
{
OnAccepted?.Invoke(new TcpSocketAccept(Status, Socket, Connection, null));
}
Expand Down
4 changes: 2 additions & 2 deletions sources/Leak.Sockets/TcpSocketConnectResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ public TcpSocketConnect Unpack(IAsyncResult result)
return new TcpSocketConnect(Status, Socket, Endpoint);
}

protected override void OnCompleted(int affected)
protected override void OnCompleted()
{
OnConnected?.Invoke(new TcpSocketConnect(Status, Socket, Endpoint));
}

protected override void OnFailed(SocketStatus status)
protected override void OnFailed()
{
OnConnected?.Invoke(new TcpSocketConnect(Status, Socket, Endpoint));
}
Expand Down
4 changes: 2 additions & 2 deletions sources/Leak.Sockets/TcpSocketDisconnectResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ public TcpSocketDisconnect CreateData()
return new TcpSocketDisconnect(Status, Socket);
}

protected override void OnCompleted(int affected)
protected override void OnCompleted()
{
OnDisconnected?.Invoke(new TcpSocketDisconnect(Status, Socket));
}

protected override void OnFailed(SocketStatus status)
protected override void OnFailed()
{
OnDisconnected?.Invoke(new TcpSocketDisconnect(Status, Socket));
}
Expand Down
4 changes: 2 additions & 2 deletions sources/Leak.Sockets/TcpSocketReceiveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ public TcpSocketReceive CreateData()
return new TcpSocketReceive(Status, Affected, Socket, Buffer);
}

protected override void OnCompleted(int affected)
protected override void OnCompleted()
{
OnReceived?.Invoke(new TcpSocketReceive(Status, Affected, Socket, Buffer));
}

protected override void OnFailed(SocketStatus status)
protected override void OnFailed()
{
OnReceived?.Invoke(new TcpSocketReceive(Status, Affected, Socket, Buffer));
}
Expand Down
4 changes: 2 additions & 2 deletions sources/Leak.Sockets/TcpSocketSendResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ public TcpSocketSend CreateData()
return new TcpSocketSend(Status, Affected, Socket, Buffer);
}

protected override void OnCompleted(int affected)
protected override void OnCompleted()
{
OnSent?.Invoke(new TcpSocketSend(Status, Affected, Socket, Buffer));
}

protected override void OnFailed(SocketStatus status)
protected override void OnFailed()
{
OnSent?.Invoke(new TcpSocketSend(Status, Affected, Socket, Buffer));
}
Expand Down
4 changes: 2 additions & 2 deletions sources/Leak.Sockets/UdpSocketReceiveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ public UdpSocketReceive CreateData()
return new UdpSocketReceive(Status, Affected, Socket, Buffer, Address);
}

protected override void OnCompleted(int affected)
protected override void OnCompleted()
{
OnReceived?.Invoke(new UdpSocketReceive(Status, Affected, Socket, Buffer, Address));
}

protected override void OnFailed(SocketStatus status)
protected override void OnFailed()
{
OnReceived?.Invoke(new UdpSocketReceive(Status, Affected, Socket, Buffer, Address));
}
Expand Down
4 changes: 2 additions & 2 deletions sources/Leak.Sockets/UdpSocketSendResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ public UdpSocketSend CreateData()
return new UdpSocketSend(Status, Affected, Socket, Buffer, Endpoint);
}

protected override void OnCompleted(int affected)
protected override void OnCompleted()
{
OnSent?.Invoke(new UdpSocketSend(Status, Affected, Socket, Buffer, Endpoint));
}

protected override void OnFailed(SocketStatus status)
protected override void OnFailed()
{
OnSent?.Invoke(new UdpSocketSend(Status, Affected, Socket, Buffer, Endpoint));
}
Expand Down

0 comments on commit f131435

Please sign in to comment.