Skip to content

Commit

Permalink
Asynchronous callbacks for reading and writing on Windows, attempt 2.
Browse files Browse the repository at this point in the history
Instead of ReadFile and WriteFile, which block and transfer data synchronously, use ReadFileEx and WriteFileEx, which both allow async callbacks. In addition, change how timeouts are used for ReadFile*, using an unlimited timeout for the first byte, and no timeout for the rest of the data in the input buffer. This removes the need to poll entirely, while still retrieving all data available in the input buffer. In both cases, the I/O operations happen in their own threads, since Windows requires IOCompletion callbacks to wait for their calling thread to be in an "alertable wait state".

Fixes serialport#1221
  • Loading branch information
dustmop committed Sep 6, 2017
1 parent c7a3be4 commit 4cc435c
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 113 deletions.
266 changes: 155 additions & 111 deletions src/serialport_win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ void EIO_Open(uv_work_t* req) {
dcb.Parity = NOPARITY;
dcb.ByteSize = 8;
dcb.StopBits = ONESTOPBIT;


dcb.fOutxDsrFlow = FALSE;
dcb.fOutxCtsFlow = FALSE;

Expand Down Expand Up @@ -152,11 +152,10 @@ void EIO_Open(uv_work_t* req) {
return;
}

// Set the timeouts for read and write operations read operation is to return
// immediately with the bytes that have already been received, even if no bytes
// have been received.
COMMTIMEOUTS commTimeouts = {0};
commTimeouts.ReadIntervalTimeout = MAXDWORD; // Never timeout
// Set the timeouts for read and write operations.
// Read operation will wait for at least 1 byte to be received.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0; // Never timeout, always wait for data.
commTimeouts.ReadTotalTimeoutMultiplier = 0; // Do not allow big read timeout when big read buffer used
commTimeouts.ReadTotalTimeoutConstant = 0; // Total read timeout (period of read loop)
commTimeouts.WriteTotalTimeoutConstant = 0; // Const part of write timeout
Expand Down Expand Up @@ -291,58 +290,63 @@ NAN_METHOD(Write) {
baton->bufferLength = bufferLength;
baton->offset = 0;
baton->callback.Reset(info[2].As<v8::Function>());
baton->complete = false;
// WriteFileEx requires a thread that can block. Create a new thread to
// run the write operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, WriteThread, baton, 0, NULL);
}

uv_work_t* req = new uv_work_t();
req->data = baton;

uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
void __stdcall WriteIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
WriteBaton* baton = static_cast<WriteBaton*>(ov->hEvent);
DWORD bytesWritten;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesWritten, TRUE)) {
errorCode = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", errorCode, baton->errorString);
baton->complete = true;
return;
}
if (bytesWritten) {
baton->offset += bytesWritten;
if (baton->offset >= baton->bufferLength) {
baton->complete = true;
}
}
}

void EIO_Write(uv_work_t* req) {
WriteBaton* data = static_cast<WriteBaton*>(req->data);
data->result = 0;

do {
OVERLAPPED ov = {0};
// Event used by GetOverlappedResult(..., TRUE) to wait for outgoing data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

// Start write operation - synchronous or asynchronous
DWORD bytesWritten = 0;
if (!WriteFile((HANDLE)data->fd, data->bufferData, static_cast<DWORD>(data->bufferLength), &bytesWritten, &ov)) {
DWORD lastError = GetLastError();
if (lastError != ERROR_IO_PENDING) {
// Write operation error
ErrorCodeToString("Writing to COM port (WriteFile)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
// Write operation is completing asynchronously
// We MUST wait for the operation completion before deallocation of OVERLAPPED struct
// or write data buffer

// block for async write operation completion
bytesWritten = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesWritten, TRUE)) {
// Write operation error
DWORD lastError = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
DWORD __stdcall WriteThread(LPVOID param) {
WriteBaton* baton = static_cast<WriteBaton*>(param);

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(ov));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
char* offsetPtr = baton->bufferData + baton->offset;
// WriteFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
WriteFileEx((HANDLE)baton->fd, offsetPtr, static_cast<DWORD>(baton->bufferLength), ov, WriteIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
DWORD lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Writing to COM port (WriteFileEx)", lastError, baton->errorString);
break;
}
// Write operation completed synchronously
data->result = bytesWritten;
data->offset += data->result;
CloseHandle(ov.hEvent);
} while (data->bufferLength > data->offset);
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterWrite);
async->data = baton;
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterWrite(uv_work_t* req) {
void EIO_AfterWrite(uv_async_t* req) {
Nan::HandleScope scope;
WriteBaton* baton = static_cast<WriteBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[1];
Expand Down Expand Up @@ -404,81 +408,121 @@ NAN_METHOD(Read) {
baton->bufferLength = bufferLength;
baton->bufferData = node::Buffer::Data(buffer);
baton->callback.Reset(info[4].As<v8::Function>());

uv_work_t* req = new uv_work_t();
req->data = baton;
uv_queue_work(uv_default_loop(), req, EIO_Read, (uv_after_work_cb)EIO_AfterRead);
baton->complete = false;
// ReadFileEx requires a thread that can block. Create a new thread to
// run the read operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, ReadThread, baton, 0, NULL);
}

void EIO_Read(uv_work_t* req) {
ReadBaton* data = static_cast<ReadBaton*>(req->data);
data->bytesRead = 0;
int errorCode = ERROR_SUCCESS;
void __stdcall ReadIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
ReadBaton* baton = static_cast<ReadBaton*>(ov->hEvent);

char* offsetPtr = data->bufferData;
offsetPtr += data->offset;
if (errorCode) {
ErrorCodeToString("Reading from COM port (ReadIOCompletion)", errorCode, baton->errorString);
baton->complete = true;
return;
}

// Event used by GetOverlappedResult(..., TRUE) to wait for incoming data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
DWORD lastError;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
return;
}
if (bytesTransferred) {
baton->bytesRead += bytesTransferred;
baton->offset += bytesTransferred;
}

while (true) {
OVERLAPPED ov = {0};
ov.hEvent = hEvent;

// Start read operation - synchrounous or asynchronous
DWORD bytesReadSync = 0;
if (!ReadFile((HANDLE)data->fd, offsetPtr, data->bytesToRead, &bytesReadSync, &ov)) {
errorCode = GetLastError();
if (errorCode != ERROR_IO_PENDING) {
// Read operation error
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
}
// ReadFileEx and GetOverlappedResult retrieved only 1 byte. Read any additional data in the input
// buffer. Set the timeout to MAXDWORD in order to disable timeouts, so the read operation will
// return immediately no matter how much data is available.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = MAXDWORD;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
baton->complete = true;
return;
}

// Read operation is asynchronous and is pending
// We MUST wait for operation completion before deallocation of OVERLAPPED struct
// or read data buffer

// Wait for async read operation completion or timeout
DWORD bytesReadAsync = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesReadAsync, TRUE)) {
// Read operation error
errorCode = GetLastError();
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
} else {
// Read operation completed asynchronously
data->bytesRead = bytesReadAsync;
}
} else {
// Read operation completed synchronously
data->bytesRead = bytesReadSync;
// Store additional data after whatever data has already been read.
char* offsetPtr = baton->bufferData + baton->offset;

// ReadFile, unlike ReadFileEx, needs an event in the overlapped structure.
ov->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!ReadFile((HANDLE)baton->fd, offsetPtr, baton->bytesToRead, &bytesTransferred, ov)) {
errorCode = GetLastError();

if (errorCode != ERROR_IO_PENDING) {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
}

// Return data received if any
if (data->bytesRead > 0) {
break;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
}
}
CloseHandle(ov->hEvent);

baton->bytesToRead -= bytesTransferred;
baton->bytesRead += bytesTransferred;
baton->complete = true;
}

CloseHandle(hEvent);
DWORD __stdcall ReadThread(LPVOID param) {
ReadBaton* baton = static_cast<ReadBaton*>(param);
DWORD lastError;

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(OVERLAPPED));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
// Reset the read timeout to 0, so that it will block until more data arrives.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
break;
}
// ReadFileEx doesn't use overlapped's hEvent, so it is reserved for user data.
ov->hEvent = static_cast<HANDLE>(baton);
char* offsetPtr = baton->bufferData + baton->offset;
// ReadFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
// Only read 1 byte, so that the callback will be triggered once any data arrives.
ReadFileEx((HANDLE)baton->fd, offsetPtr, 1, ov, ReadIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Reading from COM port (ReadFileEx)", lastError, baton->errorString);
break;
}
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterRead);
async->data = baton;
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterRead(uv_work_t* req) {
void EIO_AfterRead(uv_async_t* req) {
Nan::HandleScope scope;
ReadBaton* baton = static_cast<ReadBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[2];
Expand Down
12 changes: 10 additions & 2 deletions src/serialport_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct WriteBaton {
size_t bufferLength;
size_t offset;
size_t bytesWritten;
void* hThread;
bool complete;
Nan::Persistent<v8::Object> buffer;
Nan::Callback callback;
int result;
Expand All @@ -23,7 +25,9 @@ struct WriteBaton {

NAN_METHOD(Write);
void EIO_Write(uv_work_t* req);
void EIO_AfterWrite(uv_work_t* req);
void EIO_AfterWrite(uv_async_t* req);
DWORD __stdcall WriteThread(LPVOID param);


struct ReadBaton {
int fd;
Expand All @@ -32,13 +36,17 @@ struct ReadBaton {
size_t bytesRead;
size_t bytesToRead;
size_t offset;
void* hThread;
bool complete;
char errorString[ERROR_STRING_SIZE];
Nan::Callback callback;
};

NAN_METHOD(Read);
void EIO_Read(uv_work_t* req);
void EIO_AfterRead(uv_work_t* req);
void EIO_AfterRead(uv_async_t* req);
DWORD __stdcall ReadThread(LPVOID param);


NAN_METHOD(List);
void EIO_List(uv_work_t* req);
Expand Down

0 comments on commit 4cc435c

Please sign in to comment.