Skip to content

Commit

Permalink
fix(windows): Asynchronous callbacks for reading and writing on Windo…
Browse files Browse the repository at this point in the history
…ws (#1313)

Instead of ReadFile and WriteFile, which block and transfer data synchronously, use ReadFileEx and WriteFileEx, which both allow async callbacks. In addition, change how timeouts are used for ReadFile*, using an unlimited timeout for the first byte, and no timeout for the rest of the data in the input buffer. This removes the need to poll entirely, while still retrieving all data available in the input buffer. In both cases, the I/O operations happen in their own threads, since Windows requires IOCompletion callbacks to wait for their calling thread to be in an "alertable wait state".

Fixes #1221
  • Loading branch information
dustmop authored and reconbot committed Jul 24, 2018
1 parent 78e9ea9 commit 250a750
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 111 deletions.
262 changes: 153 additions & 109 deletions packages/serialport-util/packages/node-serialport/src/serialport_win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ typedef BOOL (WINAPI *CancelIoExType)(HANDLE hFile, LPOVERLAPPED lpOverlapped);

std::list<int> g_closingHandles;


void ErrorCodeToString(const char* prefix, int errorCode, char *errorStr) {
switch (errorCode) {
case ERROR_FILE_NOT_FOUND:
Expand Down Expand Up @@ -152,11 +153,10 @@ void EIO_Open(uv_work_t* req) {
return;
}

// Set the timeouts for read and write operations read operation is to return
// immediately with the bytes that have already been received, even if no bytes
// have been received.
COMMTIMEOUTS commTimeouts = {0};
commTimeouts.ReadIntervalTimeout = MAXDWORD; // Never timeout
// Set the timeouts for read and write operations.
// Read operation will wait for at least 1 byte to be received.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0; // Never timeout, always wait for data.
commTimeouts.ReadTotalTimeoutMultiplier = 0; // Do not allow big read timeout when big read buffer used
commTimeouts.ReadTotalTimeoutConstant = 0; // Total read timeout (period of read loop)
commTimeouts.WriteTotalTimeoutConstant = 0; // Const part of write timeout
Expand Down Expand Up @@ -291,58 +291,63 @@ NAN_METHOD(Write) {
baton->bufferLength = bufferLength;
baton->offset = 0;
baton->callback.Reset(info[2].As<v8::Function>());
baton->complete = false;
// WriteFileEx requires a thread that can block. Create a new thread to
// run the write operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, WriteThread, baton, 0, NULL);
}

uv_work_t* req = new uv_work_t();
req->data = baton;

uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
void __stdcall WriteIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
WriteBaton* baton = static_cast<WriteBaton*>(ov->hEvent);
DWORD bytesWritten;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesWritten, TRUE)) {
errorCode = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", errorCode, baton->errorString);
baton->complete = true;
return;
}
if (bytesWritten) {
baton->offset += bytesWritten;
if (baton->offset >= baton->bufferLength) {
baton->complete = true;
}
}
}

void EIO_Write(uv_work_t* req) {
WriteBaton* data = static_cast<WriteBaton*>(req->data);
data->result = 0;

do {
OVERLAPPED ov = {0};
// Event used by GetOverlappedResult(..., TRUE) to wait for outgoing data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

// Start write operation - synchronous or asynchronous
DWORD bytesWritten = 0;
if (!WriteFile((HANDLE)data->fd, data->bufferData, static_cast<DWORD>(data->bufferLength), &bytesWritten, &ov)) {
DWORD lastError = GetLastError();
if (lastError != ERROR_IO_PENDING) {
// Write operation error
ErrorCodeToString("Writing to COM port (WriteFile)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
// Write operation is completing asynchronously
// We MUST wait for the operation completion before deallocation of OVERLAPPED struct
// or write data buffer

// block for async write operation completion
bytesWritten = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesWritten, TRUE)) {
// Write operation error
DWORD lastError = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
DWORD WriteThread(void* param) {
WriteBaton* baton = static_cast<WriteBaton*>(param);

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(ov));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
char* offsetPtr = baton->bufferData + baton->offset;
// WriteFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
WriteFileEx((HANDLE)baton->fd, offsetPtr, static_cast<DWORD>(baton->bufferLength), ov, WriteIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
DWORD lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Writing to COM port (WriteFileEx)", lastError, baton->errorString);
break;
}
// Write operation completed synchronously
data->result = bytesWritten;
data->offset += data->result;
CloseHandle(ov.hEvent);
} while (data->bufferLength > data->offset);
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterWrite);
async->data = baton;
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterWrite(uv_work_t* req) {
void EIO_AfterWrite(uv_async_t* req) {
Nan::HandleScope scope;
WriteBaton* baton = static_cast<WriteBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[1];
Expand Down Expand Up @@ -404,81 +409,120 @@ NAN_METHOD(Read) {
baton->bufferLength = bufferLength;
baton->bufferData = node::Buffer::Data(buffer);
baton->callback.Reset(info[4].As<v8::Function>());

uv_work_t* req = new uv_work_t();
req->data = baton;
uv_queue_work(uv_default_loop(), req, EIO_Read, (uv_after_work_cb)EIO_AfterRead);
baton->complete = false;
// ReadFileEx requires a thread that can block. Create a new thread to
// run the read operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, ReadThread, baton, 0, NULL);
}

void EIO_Read(uv_work_t* req) {
ReadBaton* data = static_cast<ReadBaton*>(req->data);
data->bytesRead = 0;
int errorCode = ERROR_SUCCESS;
void __stdcall ReadIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
ReadBaton* baton = static_cast<ReadBaton*>(ov->hEvent);

if (errorCode) {
ErrorCodeToString("Reading from COM port (ReadIOCompletion)", errorCode, baton->errorString);
baton->complete = true;
return;
}

DWORD lastError;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
return;
}
if (bytesTransferred) {
baton->bytesRead += bytesTransferred;
baton->offset += bytesTransferred;
}

char* offsetPtr = data->bufferData;
offsetPtr += data->offset;
// ReadFileEx and GetOverlappedResult retrieved only 1 byte. Read any additional data in the input
// buffer. Set the timeout to MAXDWORD in order to disable timeouts, so the read operation will
// return immediately no matter how much data is available.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = MAXDWORD;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
baton->complete = true;
return;
}

// Event used by GetOverlappedResult(..., TRUE) to wait for incoming data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// Store additional data after whatever data has already been read.
char* offsetPtr = baton->bufferData + baton->offset;

while (true) {
OVERLAPPED ov = {0};
ov.hEvent = hEvent;

// Start read operation - synchrounous or asynchronous
DWORD bytesReadSync = 0;
if (!ReadFile((HANDLE)data->fd, offsetPtr, data->bytesToRead, &bytesReadSync, &ov)) {
errorCode = GetLastError();
if (errorCode != ERROR_IO_PENDING) {
// Read operation error
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
}
// ReadFile, unlike ReadFileEx, needs an event in the overlapped structure.
ov->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!ReadFile((HANDLE)baton->fd, offsetPtr, baton->bufferLength - baton->bytesRead, &bytesTransferred, ov)) {
errorCode = GetLastError();

// Read operation is asynchronous and is pending
// We MUST wait for operation completion before deallocation of OVERLAPPED struct
// or read data buffer

// Wait for async read operation completion or timeout
DWORD bytesReadAsync = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesReadAsync, TRUE)) {
// Read operation error
errorCode = GetLastError();
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
} else {
// Read operation completed asynchronously
data->bytesRead = bytesReadAsync;
}
} else {
// Read operation completed synchronously
data->bytesRead = bytesReadSync;
if (errorCode != ERROR_IO_PENDING) {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
}

// Return data received if any
if (data->bytesRead > 0) {
break;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
}
}
CloseHandle(ov->hEvent);

baton->bytesRead += bytesTransferred;
baton->complete = true;
}

CloseHandle(hEvent);
DWORD __stdcall ReadThread(void* param) {
ReadBaton* baton = static_cast<ReadBaton*>(param);
DWORD lastError;

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(OVERLAPPED));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
// Reset the read timeout to 0, so that it will block until more data arrives.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
break;
}
// ReadFileEx doesn't use overlapped's hEvent, so it is reserved for user data.
ov->hEvent = static_cast<HANDLE>(baton);
char* offsetPtr = baton->bufferData + baton->offset;
// ReadFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
// Only read 1 byte, so that the callback will be triggered once any data arrives.
ReadFileEx((HANDLE)baton->fd, offsetPtr, 1, ov, ReadIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Reading from COM port (ReadFileEx)", lastError, baton->errorString);
break;
}
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterRead);
async->data = baton;
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterRead(uv_work_t* req) {
void EIO_AfterRead(uv_async_t* req) {
Nan::HandleScope scope;
ReadBaton* baton = static_cast<ReadBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[2];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct WriteBaton {
size_t bufferLength;
size_t offset;
size_t bytesWritten;
void* hThread;
bool complete;
Nan::Persistent<v8::Object> buffer;
Nan::Callback callback;
int result;
Expand All @@ -23,7 +25,9 @@ struct WriteBaton {

NAN_METHOD(Write);
void EIO_Write(uv_work_t* req);
void EIO_AfterWrite(uv_work_t* req);
void EIO_AfterWrite(uv_async_t* req);
DWORD WriteThread(void* param);


struct ReadBaton {
int fd;
Expand All @@ -32,13 +36,17 @@ struct ReadBaton {
size_t bytesRead;
size_t bytesToRead;
size_t offset;
void* hThread;
bool complete;
char errorString[ERROR_STRING_SIZE];
Nan::Callback callback;
};

NAN_METHOD(Read);
void EIO_Read(uv_work_t* req);
void EIO_AfterRead(uv_work_t* req);
void EIO_AfterRead(uv_async_t* req);
DWORD ReadThread(void* param);


NAN_METHOD(List);
void EIO_List(uv_work_t* req);
Expand Down

0 comments on commit 250a750

Please sign in to comment.