Skip to content

Commit

Permalink
Merge pull request #1727 from rejectedsoftware/issue_1726_tcp_close
Browse files Browse the repository at this point in the history
Fix TCP close behavior in case of cuncurrent read+write operations. Fixes #1726.
(cherry picked from commit 7479a8b)

# Conflicts:
#	source/vibe/core/drivers/libasync.d
  • Loading branch information
s-ludwig committed Apr 10, 2017
1 parent f9f4b62 commit 747b920
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 75 deletions.
124 changes: 57 additions & 67 deletions source/vibe/core/drivers/libasync.d
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ final class LibasyncDriver : EventDriver {
});

if (Task.getThis() != Task())
tcp_connection.acquireWriter();
tcp_connection.m_settings.writer.acquire();

tcp_connection.m_tcpImpl.conn = conn;
//conn.local = bind_addr;
Expand All @@ -295,7 +295,7 @@ final class LibasyncDriver : EventDriver {
tcp_connection.m_tcpImpl.localAddr = conn.local;

if (Task.getThis() != Task())
tcp_connection.releaseWriter();
tcp_connection.m_settings.writer.release();
return tcp_connection;
}

Expand Down Expand Up @@ -1134,8 +1134,8 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {

@property bool dataAvailableForRead(){
logTrace("dataAvailableForRead");
acquireReader();
scope(exit) releaseReader();
m_settings.reader.acquire();
scope(exit) m_settings.reader.release();
return !readEmpty;
}

Expand All @@ -1153,8 +1153,8 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
@property ulong leastSize()
{
logTrace("leastSize TCP");
acquireReader();
scope(exit) releaseReader();
m_settings.reader.acquire();
scope(exit) m_settings.reader.release();

while( m_readBuffer.empty ){
if (!connected)
Expand All @@ -1169,9 +1169,23 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
void close()
{
logTrace("Close TCP enter");

// resume any reader, so that the read operation can be ended with a failure
Task reader = m_settings.reader.task;
while (m_settings.reader.isWaiting && reader.running) {
logTrace("resuming reader first");
getDriverCore().yieldAndResumeTask(reader);
}

// test if the connection is already closed
if (m_closed) {
logTrace("connection already closed.");
return;
}

//logTrace("closing");
acquireWriter();
scope(exit) releaseWriter();
m_settings.writer.acquire();
scope(exit) m_settings.writer.release();

// checkConnected();
m_readBuffer.dispose();
Expand All @@ -1184,13 +1198,13 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
if (timeout == 0.seconds)
timeout = Duration.max;
logTrace("WaitForData enter, timeout %s :: Ptr %s", timeout.toString(), (cast(void*)this).to!string);
acquireReader();
m_settings.reader.acquire();
auto _driver = getEventDriver();
auto tm = _driver.createTimer(null);
scope(exit) {
_driver.stopTimer(tm);
_driver.releaseTimer(tm);
releaseReader();
m_settings.reader.release();
}
_driver.m_timers.getUserData(tm).owner = Task.getThis();
if (timeout != Duration.max) _driver.rearmTimer(tm, timeout, false);
Expand Down Expand Up @@ -1219,8 +1233,8 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
const(ubyte)[] peek()
{
logTrace("Peek TCP enter");
acquireReader();
scope(exit) releaseReader();
m_settings.reader.acquire();
scope(exit) m_settings.reader.release();

if (!readEmpty)
return (m_slice.length > 0) ? cast(const(ubyte)[]) m_slice : m_readBuffer.peek();
Expand All @@ -1233,8 +1247,8 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
if (!dst.length) return;
assert(dst !is null && !m_slice);
logTrace("Read TCP");
acquireReader();
scope(exit) releaseReader();
m_settings.reader.acquire();
scope(exit) m_settings.reader.release();
while( dst.length > 0 ){
while( m_readBuffer.empty ){
checkConnected();
Expand All @@ -1256,8 +1270,8 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
{
assert(bytes_ !is null);
logTrace("%s", "write enter");
acquireWriter();
scope(exit) releaseWriter();
m_settings.writer.acquire();
scope(exit) m_settings.writer.release();
checkConnected();
const(ubyte)[] bytes = bytes_;
logTrace("TCP write with %s bytes called", bytes.length);
Expand All @@ -1283,8 +1297,8 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
void flush()
{
logTrace("%s", "Flush");
acquireWriter();
scope(exit) releaseWriter();
m_settings.writer.acquire();
scope(exit) m_settings.writer.release();

checkConnected();

Expand All @@ -1301,52 +1315,6 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
writeDefault(stream, nbytes);
}

void acquireReader() {
if (Task.getThis() == Task()) {
logTrace("Reading without task");
return;
}
logTrace("%s", "Acquire Reader");
assert(!amReadOwner());
m_settings.reader.task = Task.getThis();
logTrace("Task waiting in: %X", cast(void*) this);
m_settings.reader.isWaiting = true;
}

void releaseReader() {
if (Task.getThis() == Task()) return;
logTrace("%s", "Release Reader");
assert(amReadOwner());
m_settings.reader.isWaiting = false;
}

bool amReadOwner() const {
if (m_settings.reader.isWaiting && m_settings.reader.task == Task.getThis())
return true;
return false;
}

void acquireWriter() {
if (Task.getThis() == Task()) return;
logTrace("%s", "Acquire Writer");
assert(!amWriteOwner(), "Failed to acquire writer in task: " ~ Task.getThis().fiber.to!string ~ ", it was busy with: " ~ m_settings.writer.task.to!string);
m_settings.writer.task = Task.getThis();
m_settings.writer.isWaiting = true;
}

void releaseWriter() {
if (Task.getThis() == Task()) return;
logTrace("%s", "Release Writer");
assert(amWriteOwner());
m_settings.writer.isWaiting = false;
}

bool amWriteOwner() const {
if (m_settings.writer.isWaiting && m_settings.writer.task == Task.getThis())
return true;
return false;
}

private void checkConnected()
{
enforce(connected, "The remote peer has closed the connection.");
Expand Down Expand Up @@ -1471,12 +1439,12 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
bool hasUniqueReader = m_settings.reader.isWaiting;
bool hasUniqueWriter = m_settings.writer.isWaiting && reader != writer;

if (hasUniqueReader && Task.getThis() != reader) {
getDriverCore().resumeTask(reader, m_settings.reader.noExcept?null:ex);
}
if (hasUniqueWriter && Task.getThis() != writer && wake_ex) {
getDriverCore().resumeTask(writer, ex);
}
if (hasUniqueReader && Task.getThis() != reader) {
getDriverCore().resumeTask(reader, m_settings.reader.noExcept?null:ex);
}
}

void onConnect() {
Expand Down Expand Up @@ -1547,6 +1515,28 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
Task task; // we can only have one task waiting for read/write operations
bool isWaiting; // if a task is actively waiting
bool noExcept;

void acquire() {
assert(!this.isWaiting, "Acquiring waiter that is already in use.");
if (Task.getThis() == Task()) return;
logTrace("%s", "Acquire waiter");
assert(!amOwner(), "Failed to acquire waiter in task: " ~ Task.getThis().fiber.to!string ~ ", it was busy with: " ~ this.task.to!string);
this.task = Task.getThis();
this.isWaiting = true;
}

void release() {
if (Task.getThis() == Task()) return;
logTrace("%s", "Release waiter");
assert(amOwner());
this.isWaiting = false;
}

bool amOwner() const {
if (this.isWaiting && this.task == Task.getThis())
return true;
return false;
}
}

struct Settings {
Expand Down
23 changes: 15 additions & 8 deletions source/vibe/core/drivers/libevent2_tcp.d
Original file line number Diff line number Diff line change
Expand Up @@ -751,15 +751,22 @@ package nothrow extern(C)

ctx.core.eventException = ex;

if (ctx.readOwner && ctx.readOwner.running && !ctx.core.isScheduledForResume(ctx.readOwner)) {
logTrace("resuming corresponding task%s...", ex is null ? "" : " with exception");
if (ctx.readOwner.fiber.state == Fiber.State.EXEC) ctx.exception = ex;
else ctx.core.resumeTask(ctx.readOwner, ex);
// ctx can be destroyed after resuming the reader, so get everything that is required from it first
auto reader = ctx.readOwner;
auto writer = ctx.writeOwner;
auto core = ctx.core;

if (ex && (reader && reader.fiber.state == Fiber.State.EXEC || writer && writer.fiber.state == Fiber.State.EXEC))
ctx.exception = ex;

if (writer && writer.running && writer.fiber.state != Fiber.State.EXEC) {
logTrace("resuming corresponding write task%s...", ex is null ? "" : " with exception");
core.resumeTask(writer, ex);
}
if (ctx.writeOwner && ctx.writeOwner != ctx.readOwner && ctx.writeOwner.running) {
logTrace("resuming corresponding task%s...", ex is null ? "" : " with exception");
if (ctx.writeOwner.fiber.state == Fiber.State.EXEC) ctx.exception = ex;
else ctx.core.resumeTask(ctx.writeOwner, ex);

if (reader && writer != reader && reader.running && !core.isScheduledForResume(reader) && reader.fiber.state != Fiber.State.EXEC) {
logTrace("resuming corresponding read task%s...", ex is null ? "" : " with exception");
core.resumeTask(reader, ex);
}
} catch (UncaughtException e) {
logWarn("Got exception when resuming task onSocketEvent: %s", e.msg);
Expand Down
4 changes: 4 additions & 0 deletions tests/vibe.core.net.1726/dub.sdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
name "tests"
description "TCP disconnect task issue"
dependency "vibe-d:core" path="../../"
versions "VibeDefaultMain"
53 changes: 53 additions & 0 deletions tests/vibe.core.net.1726/source/app.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import vibe.core.core;
import vibe.core.net;
import core.time : msecs;
import vibe.core.log;

shared static this()
{
bool done = false;
auto buf = new ubyte[512*1024*1024];

listenTCP(11375,(conn) {
bool read_ex = false;
bool write_ex = false;
auto rt = runTask!TCPConnection((conn) {
try {
conn.read(buf);
assert(false, "Expected read() to throw an exception.");
} catch (Exception) {
read_ex = true;
conn.close();
logInfo("read out");
} // expected
}, conn);
auto wt = runTask!TCPConnection((conn) {
try {
conn.write(buf);
assert(false, "Expected read() to throw an exception.");
} catch (Exception) {
write_ex = true;
conn.close();
logInfo("write out");
} // expected
}, conn);

rt.join();
wt.join();
assert(read_ex, "No read exception thrown");
assert(write_ex, "No write exception thrown");
done = true;
});


runTask({
try {
auto conn = connectTCP("127.0.0.1", 11375);
conn.close();
} catch (Exception e) assert(false, e.msg);
sleep(50.msecs);
assert(done, "Not done");

exitEventLoop();
});
}

0 comments on commit 747b920

Please sign in to comment.