From 4cc435ce3885d73c57fdeccf3dd875be9e84b802 Mon Sep 17 00:00:00 2001 From: Dustin Long Date: Tue, 5 Sep 2017 22:33:26 -0400 Subject: [PATCH] Asynchronous callbacks for reading and writing on Windows, attempt 2. Instead of ReadFile and WriteFile, which block and transfer data synchronously, use ReadFileEx and WriteFileEx, which both allow async callbacks. In addition, change how timeouts are used for ReadFile*, using an unlimited timeout for the first byte, and no timeout for the rest of the data in the input buffer. This removes the need to poll entirely, while still retrieving all data available in the input buffer. In both cases, the I/O operations happen in their own threads, since Windows requires IOCompletion callbacks to wait for their calling thread to be in an "alertable wait state". Fixes #1221 --- src/serialport_win.cpp | 266 ++++++++++++++++++++++++----------------- src/serialport_win.h | 12 +- 2 files changed, 165 insertions(+), 113 deletions(-) diff --git a/src/serialport_win.cpp b/src/serialport_win.cpp index 9213bae60..d252f5ddb 100644 --- a/src/serialport_win.cpp +++ b/src/serialport_win.cpp @@ -89,8 +89,8 @@ void EIO_Open(uv_work_t* req) { dcb.Parity = NOPARITY; dcb.ByteSize = 8; dcb.StopBits = ONESTOPBIT; - - + + dcb.fOutxDsrFlow = FALSE; dcb.fOutxCtsFlow = FALSE; @@ -152,11 +152,10 @@ void EIO_Open(uv_work_t* req) { return; } - // Set the timeouts for read and write operations read operation is to return - // immediately with the bytes that have already been received, even if no bytes - // have been received. - COMMTIMEOUTS commTimeouts = {0}; - commTimeouts.ReadIntervalTimeout = MAXDWORD; // Never timeout + // Set the timeouts for read and write operations. + // Read operation will wait for at least 1 byte to be received. + COMMTIMEOUTS commTimeouts = {}; + commTimeouts.ReadIntervalTimeout = 0; // Never timeout, always wait for data. commTimeouts.ReadTotalTimeoutMultiplier = 0; // Do not allow big read timeout when big read buffer used commTimeouts.ReadTotalTimeoutConstant = 0; // Total read timeout (period of read loop) commTimeouts.WriteTotalTimeoutConstant = 0; // Const part of write timeout @@ -291,58 +290,63 @@ NAN_METHOD(Write) { baton->bufferLength = bufferLength; baton->offset = 0; baton->callback.Reset(info[2].As()); + baton->complete = false; + // WriteFileEx requires a thread that can block. Create a new thread to + // run the write operation, saving the handle so it can be deallocated later. + baton->hThread = CreateThread(NULL, 0, WriteThread, baton, 0, NULL); +} - uv_work_t* req = new uv_work_t(); - req->data = baton; - - uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite); +void __stdcall WriteIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) { + WriteBaton* baton = static_cast(ov->hEvent); + DWORD bytesWritten; + if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesWritten, TRUE)) { + errorCode = GetLastError(); + ErrorCodeToString("Writing to COM port (GetOverlappedResult)", errorCode, baton->errorString); + baton->complete = true; + return; + } + if (bytesWritten) { + baton->offset += bytesWritten; + if (baton->offset >= baton->bufferLength) { + baton->complete = true; + } + } } -void EIO_Write(uv_work_t* req) { - WriteBaton* data = static_cast(req->data); - data->result = 0; - - do { - OVERLAPPED ov = {0}; - // Event used by GetOverlappedResult(..., TRUE) to wait for outgoing data or timeout - // Event MUST be used if program has several simultaneous asynchronous operations - // on the same handle (i.e. ReadFile and WriteFile) - ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - - // Start write operation - synchronous or asynchronous - DWORD bytesWritten = 0; - if (!WriteFile((HANDLE)data->fd, data->bufferData, static_cast(data->bufferLength), &bytesWritten, &ov)) { - DWORD lastError = GetLastError(); - if (lastError != ERROR_IO_PENDING) { - // Write operation error - ErrorCodeToString("Writing to COM port (WriteFile)", lastError, data->errorString); - CloseHandle(ov.hEvent); - return; - } - // Write operation is completing asynchronously - // We MUST wait for the operation completion before deallocation of OVERLAPPED struct - // or write data buffer - - // block for async write operation completion - bytesWritten = 0; - if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesWritten, TRUE)) { - // Write operation error - DWORD lastError = GetLastError(); - ErrorCodeToString("Writing to COM port (GetOverlappedResult)", lastError, data->errorString); - CloseHandle(ov.hEvent); - return; - } +DWORD __stdcall WriteThread(LPVOID param) { + WriteBaton* baton = static_cast(param); + + OVERLAPPED* ov = new OVERLAPPED; + memset(ov, 0, sizeof(ov)); + ov->hEvent = static_cast(baton); + + while (!baton->complete) { + char* offsetPtr = baton->bufferData + baton->offset; + // WriteFileEx requires calling GetLastError even upon success. Clear the error beforehand. + SetLastError(0); + WriteFileEx((HANDLE)baton->fd, offsetPtr, static_cast(baton->bufferLength), ov, WriteIOCompletion); + // Error codes when call is successful, such as ERROR_MORE_DATA. + DWORD lastError = GetLastError(); + if (lastError != ERROR_SUCCESS) { + ErrorCodeToString("Writing to COM port (WriteFileEx)", lastError, baton->errorString); + break; } - // Write operation completed synchronously - data->result = bytesWritten; - data->offset += data->result; - CloseHandle(ov.hEvent); - } while (data->bufferLength > data->offset); + // IOCompletion routine is only called once this thread is in an alertable wait state. + SleepEx(INFINITE, TRUE); + } + delete ov; + // Signal the main thread to run the callback. + uv_async_t* async = new uv_async_t; + uv_async_init(uv_default_loop(), async, EIO_AfterWrite); + async->data = baton; + uv_async_send(async); + ExitThread(0); } -void EIO_AfterWrite(uv_work_t* req) { +void EIO_AfterWrite(uv_async_t* req) { Nan::HandleScope scope; WriteBaton* baton = static_cast(req->data); + WaitForSingleObject(baton->hThread, INFINITE); delete req; v8::Local argv[1]; @@ -404,81 +408,121 @@ NAN_METHOD(Read) { baton->bufferLength = bufferLength; baton->bufferData = node::Buffer::Data(buffer); baton->callback.Reset(info[4].As()); - - uv_work_t* req = new uv_work_t(); - req->data = baton; - uv_queue_work(uv_default_loop(), req, EIO_Read, (uv_after_work_cb)EIO_AfterRead); + baton->complete = false; + // ReadFileEx requires a thread that can block. Create a new thread to + // run the read operation, saving the handle so it can be deallocated later. + baton->hThread = CreateThread(NULL, 0, ReadThread, baton, 0, NULL); } -void EIO_Read(uv_work_t* req) { - ReadBaton* data = static_cast(req->data); - data->bytesRead = 0; - int errorCode = ERROR_SUCCESS; +void __stdcall ReadIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) { + ReadBaton* baton = static_cast(ov->hEvent); - char* offsetPtr = data->bufferData; - offsetPtr += data->offset; + if (errorCode) { + ErrorCodeToString("Reading from COM port (ReadIOCompletion)", errorCode, baton->errorString); + baton->complete = true; + return; + } - // Event used by GetOverlappedResult(..., TRUE) to wait for incoming data or timeout - // Event MUST be used if program has several simultaneous asynchronous operations - // on the same handle (i.e. ReadFile and WriteFile) - HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + DWORD lastError; + if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) { + lastError = GetLastError(); + ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString); + baton->complete = true; + return; + } + if (bytesTransferred) { + baton->bytesRead += bytesTransferred; + baton->offset += bytesTransferred; + } - while (true) { - OVERLAPPED ov = {0}; - ov.hEvent = hEvent; - - // Start read operation - synchrounous or asynchronous - DWORD bytesReadSync = 0; - if (!ReadFile((HANDLE)data->fd, offsetPtr, data->bytesToRead, &bytesReadSync, &ov)) { - errorCode = GetLastError(); - if (errorCode != ERROR_IO_PENDING) { - // Read operation error - if (errorCode == ERROR_OPERATION_ABORTED) { - } else { - ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, data->errorString); - CloseHandle(hEvent); - return; - } - break; - } + // ReadFileEx and GetOverlappedResult retrieved only 1 byte. Read any additional data in the input + // buffer. Set the timeout to MAXDWORD in order to disable timeouts, so the read operation will + // return immediately no matter how much data is available. + COMMTIMEOUTS commTimeouts = {}; + commTimeouts.ReadIntervalTimeout = MAXDWORD; + if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) { + lastError = GetLastError(); + ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString); + baton->complete = true; + return; + } - // Read operation is asynchronous and is pending - // We MUST wait for operation completion before deallocation of OVERLAPPED struct - // or read data buffer - - // Wait for async read operation completion or timeout - DWORD bytesReadAsync = 0; - if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesReadAsync, TRUE)) { - // Read operation error - errorCode = GetLastError(); - if (errorCode == ERROR_OPERATION_ABORTED) { - } else { - ErrorCodeToString("Reading from COM port (GetOverlappedResult)", errorCode, data->errorString); - CloseHandle(hEvent); - return; - } - break; - } else { - // Read operation completed asynchronously - data->bytesRead = bytesReadAsync; - } - } else { - // Read operation completed synchronously - data->bytesRead = bytesReadSync; + // Store additional data after whatever data has already been read. + char* offsetPtr = baton->bufferData + baton->offset; + + // ReadFile, unlike ReadFileEx, needs an event in the overlapped structure. + ov->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + if (!ReadFile((HANDLE)baton->fd, offsetPtr, baton->bytesToRead, &bytesTransferred, ov)) { + errorCode = GetLastError(); + + if (errorCode != ERROR_IO_PENDING) { + ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, baton->errorString); + baton->complete = true; + CloseHandle(ov->hEvent); + return; } - // Return data received if any - if (data->bytesRead > 0) { - break; + if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) { + lastError = GetLastError(); + ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString); + baton->complete = true; + CloseHandle(ov->hEvent); + return; } } + CloseHandle(ov->hEvent); + + baton->bytesToRead -= bytesTransferred; + baton->bytesRead += bytesTransferred; + baton->complete = true; +} - CloseHandle(hEvent); +DWORD __stdcall ReadThread(LPVOID param) { + ReadBaton* baton = static_cast(param); + DWORD lastError; + + OVERLAPPED* ov = new OVERLAPPED; + memset(ov, 0, sizeof(OVERLAPPED)); + ov->hEvent = static_cast(baton); + + while (!baton->complete) { + // Reset the read timeout to 0, so that it will block until more data arrives. + COMMTIMEOUTS commTimeouts = {}; + commTimeouts.ReadIntervalTimeout = 0; + if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) { + lastError = GetLastError(); + ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString); + break; + } + // ReadFileEx doesn't use overlapped's hEvent, so it is reserved for user data. + ov->hEvent = static_cast(baton); + char* offsetPtr = baton->bufferData + baton->offset; + // ReadFileEx requires calling GetLastError even upon success. Clear the error beforehand. + SetLastError(0); + // Only read 1 byte, so that the callback will be triggered once any data arrives. + ReadFileEx((HANDLE)baton->fd, offsetPtr, 1, ov, ReadIOCompletion); + // Error codes when call is successful, such as ERROR_MORE_DATA. + lastError = GetLastError(); + if (lastError != ERROR_SUCCESS) { + ErrorCodeToString("Reading from COM port (ReadFileEx)", lastError, baton->errorString); + break; + } + // IOCompletion routine is only called once this thread is in an alertable wait state. + SleepEx(INFINITE, TRUE); + } + delete ov; + // Signal the main thread to run the callback. + uv_async_t* async = new uv_async_t; + uv_async_init(uv_default_loop(), async, EIO_AfterRead); + async->data = baton; + uv_async_send(async); + ExitThread(0); } -void EIO_AfterRead(uv_work_t* req) { +void EIO_AfterRead(uv_async_t* req) { Nan::HandleScope scope; ReadBaton* baton = static_cast(req->data); + WaitForSingleObject(baton->hThread, INFINITE); delete req; v8::Local argv[2]; diff --git a/src/serialport_win.h b/src/serialport_win.h index 2eb194efc..6ae2fea02 100644 --- a/src/serialport_win.h +++ b/src/serialport_win.h @@ -15,6 +15,8 @@ struct WriteBaton { size_t bufferLength; size_t offset; size_t bytesWritten; + void* hThread; + bool complete; Nan::Persistent buffer; Nan::Callback callback; int result; @@ -23,7 +25,9 @@ struct WriteBaton { NAN_METHOD(Write); void EIO_Write(uv_work_t* req); -void EIO_AfterWrite(uv_work_t* req); +void EIO_AfterWrite(uv_async_t* req); +DWORD __stdcall WriteThread(LPVOID param); + struct ReadBaton { int fd; @@ -32,13 +36,17 @@ struct ReadBaton { size_t bytesRead; size_t bytesToRead; size_t offset; + void* hThread; + bool complete; char errorString[ERROR_STRING_SIZE]; Nan::Callback callback; }; NAN_METHOD(Read); void EIO_Read(uv_work_t* req); -void EIO_AfterRead(uv_work_t* req); +void EIO_AfterRead(uv_async_t* req); +DWORD __stdcall ReadThread(LPVOID param); + NAN_METHOD(List); void EIO_List(uv_work_t* req);