Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous callbacks for reading and writing on Windows, attempt 2. #1328

Merged
merged 1 commit into from
Sep 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 155 additions & 111 deletions src/serialport_win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ void EIO_Open(uv_work_t* req) {
dcb.Parity = NOPARITY;
dcb.ByteSize = 8;
dcb.StopBits = ONESTOPBIT;


dcb.fOutxDsrFlow = FALSE;
dcb.fOutxCtsFlow = FALSE;

Expand Down Expand Up @@ -152,11 +152,10 @@ void EIO_Open(uv_work_t* req) {
return;
}

// Set the timeouts for read and write operations read operation is to return
// immediately with the bytes that have already been received, even if no bytes
// have been received.
COMMTIMEOUTS commTimeouts = {0};
commTimeouts.ReadIntervalTimeout = MAXDWORD; // Never timeout
// Set the timeouts for read and write operations.
// Read operation will wait for at least 1 byte to be received.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0; // Never timeout, always wait for data.
commTimeouts.ReadTotalTimeoutMultiplier = 0; // Do not allow big read timeout when big read buffer used
commTimeouts.ReadTotalTimeoutConstant = 0; // Total read timeout (period of read loop)
commTimeouts.WriteTotalTimeoutConstant = 0; // Const part of write timeout
Expand Down Expand Up @@ -291,58 +290,63 @@ NAN_METHOD(Write) {
baton->bufferLength = bufferLength;
baton->offset = 0;
baton->callback.Reset(info[2].As<v8::Function>());
baton->complete = false;
// WriteFileEx requires a thread that can block. Create a new thread to
// run the write operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, WriteThread, baton, 0, NULL);
}

uv_work_t* req = new uv_work_t();
req->data = baton;

uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
void __stdcall WriteIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
WriteBaton* baton = static_cast<WriteBaton*>(ov->hEvent);
DWORD bytesWritten;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesWritten, TRUE)) {
errorCode = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", errorCode, baton->errorString);
baton->complete = true;
return;
}
if (bytesWritten) {
baton->offset += bytesWritten;
if (baton->offset >= baton->bufferLength) {
baton->complete = true;
}
}
}

void EIO_Write(uv_work_t* req) {
WriteBaton* data = static_cast<WriteBaton*>(req->data);
data->result = 0;

do {
OVERLAPPED ov = {0};
// Event used by GetOverlappedResult(..., TRUE) to wait for outgoing data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

// Start write operation - synchronous or asynchronous
DWORD bytesWritten = 0;
if (!WriteFile((HANDLE)data->fd, data->bufferData, static_cast<DWORD>(data->bufferLength), &bytesWritten, &ov)) {
DWORD lastError = GetLastError();
if (lastError != ERROR_IO_PENDING) {
// Write operation error
ErrorCodeToString("Writing to COM port (WriteFile)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
// Write operation is completing asynchronously
// We MUST wait for the operation completion before deallocation of OVERLAPPED struct
// or write data buffer

// block for async write operation completion
bytesWritten = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesWritten, TRUE)) {
// Write operation error
DWORD lastError = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
DWORD __stdcall WriteThread(LPVOID param) {
WriteBaton* baton = static_cast<WriteBaton*>(param);

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(ov));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
char* offsetPtr = baton->bufferData + baton->offset;
// WriteFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
WriteFileEx((HANDLE)baton->fd, offsetPtr, static_cast<DWORD>(baton->bufferLength), ov, WriteIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
DWORD lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Writing to COM port (WriteFileEx)", lastError, baton->errorString);
break;
}
// Write operation completed synchronously
data->result = bytesWritten;
data->offset += data->result;
CloseHandle(ov.hEvent);
} while (data->bufferLength > data->offset);
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterWrite);
async->data = baton;
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterWrite(uv_work_t* req) {
void EIO_AfterWrite(uv_async_t* req) {
Nan::HandleScope scope;
WriteBaton* baton = static_cast<WriteBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[1];
Expand Down Expand Up @@ -404,81 +408,121 @@ NAN_METHOD(Read) {
baton->bufferLength = bufferLength;
baton->bufferData = node::Buffer::Data(buffer);
baton->callback.Reset(info[4].As<v8::Function>());

uv_work_t* req = new uv_work_t();
req->data = baton;
uv_queue_work(uv_default_loop(), req, EIO_Read, (uv_after_work_cb)EIO_AfterRead);
baton->complete = false;
// ReadFileEx requires a thread that can block. Create a new thread to
// run the read operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, ReadThread, baton, 0, NULL);
}

void EIO_Read(uv_work_t* req) {
ReadBaton* data = static_cast<ReadBaton*>(req->data);
data->bytesRead = 0;
int errorCode = ERROR_SUCCESS;
void __stdcall ReadIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
ReadBaton* baton = static_cast<ReadBaton*>(ov->hEvent);

char* offsetPtr = data->bufferData;
offsetPtr += data->offset;
if (errorCode) {
ErrorCodeToString("Reading from COM port (ReadIOCompletion)", errorCode, baton->errorString);
baton->complete = true;
return;
}

// Event used by GetOverlappedResult(..., TRUE) to wait for incoming data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
DWORD lastError;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
return;
}
if (bytesTransferred) {
baton->bytesRead += bytesTransferred;
baton->offset += bytesTransferred;
}

while (true) {
OVERLAPPED ov = {0};
ov.hEvent = hEvent;

// Start read operation - synchrounous or asynchronous
DWORD bytesReadSync = 0;
if (!ReadFile((HANDLE)data->fd, offsetPtr, data->bytesToRead, &bytesReadSync, &ov)) {
errorCode = GetLastError();
if (errorCode != ERROR_IO_PENDING) {
// Read operation error
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
}
// ReadFileEx and GetOverlappedResult retrieved only 1 byte. Read any additional data in the input
// buffer. Set the timeout to MAXDWORD in order to disable timeouts, so the read operation will
// return immediately no matter how much data is available.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = MAXDWORD;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
baton->complete = true;
return;
}

// Read operation is asynchronous and is pending
// We MUST wait for operation completion before deallocation of OVERLAPPED struct
// or read data buffer

// Wait for async read operation completion or timeout
DWORD bytesReadAsync = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesReadAsync, TRUE)) {
// Read operation error
errorCode = GetLastError();
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
} else {
// Read operation completed asynchronously
data->bytesRead = bytesReadAsync;
}
} else {
// Read operation completed synchronously
data->bytesRead = bytesReadSync;
// Store additional data after whatever data has already been read.
char* offsetPtr = baton->bufferData + baton->offset;

// ReadFile, unlike ReadFileEx, needs an event in the overlapped structure.
ov->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!ReadFile((HANDLE)baton->fd, offsetPtr, baton->bytesToRead, &bytesTransferred, ov)) {
errorCode = GetLastError();

if (errorCode != ERROR_IO_PENDING) {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
}

// Return data received if any
if (data->bytesRead > 0) {
break;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
}
}
CloseHandle(ov->hEvent);

baton->bytesToRead -= bytesTransferred;
baton->bytesRead += bytesTransferred;
baton->complete = true;
}

CloseHandle(hEvent);
DWORD __stdcall ReadThread(LPVOID param) {
ReadBaton* baton = static_cast<ReadBaton*>(param);
DWORD lastError;

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(OVERLAPPED));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
// Reset the read timeout to 0, so that it will block until more data arrives.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
break;
}
// ReadFileEx doesn't use overlapped's hEvent, so it is reserved for user data.
ov->hEvent = static_cast<HANDLE>(baton);
char* offsetPtr = baton->bufferData + baton->offset;
// ReadFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
// Only read 1 byte, so that the callback will be triggered once any data arrives.
ReadFileEx((HANDLE)baton->fd, offsetPtr, 1, ov, ReadIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Reading from COM port (ReadFileEx)", lastError, baton->errorString);
break;
}
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterRead);
async->data = baton;
uv_async_send(async);
ExitThread(0);
}

void EIO_AfterRead(uv_work_t* req) {
void EIO_AfterRead(uv_async_t* req) {
Nan::HandleScope scope;
ReadBaton* baton = static_cast<ReadBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[2];
Expand Down
12 changes: 10 additions & 2 deletions src/serialport_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct WriteBaton {
size_t bufferLength;
size_t offset;
size_t bytesWritten;
void* hThread;
bool complete;
Nan::Persistent<v8::Object> buffer;
Nan::Callback callback;
int result;
Expand All @@ -23,7 +25,9 @@ struct WriteBaton {

NAN_METHOD(Write);
void EIO_Write(uv_work_t* req);
void EIO_AfterWrite(uv_work_t* req);
void EIO_AfterWrite(uv_async_t* req);
DWORD __stdcall WriteThread(LPVOID param);


struct ReadBaton {
int fd;
Expand All @@ -32,13 +36,17 @@ struct ReadBaton {
size_t bytesRead;
size_t bytesToRead;
size_t offset;
void* hThread;
bool complete;
char errorString[ERROR_STRING_SIZE];
Nan::Callback callback;
};

NAN_METHOD(Read);
void EIO_Read(uv_work_t* req);
void EIO_AfterRead(uv_work_t* req);
void EIO_AfterRead(uv_async_t* req);
DWORD __stdcall ReadThread(LPVOID param);


NAN_METHOD(List);
void EIO_List(uv_work_t* req);
Expand Down