-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Windows下基于iocp的的异步文件IO #637
Comments
这个help也太难了吧😓 |
https://github.com/sogou/workflow/blob/master/docs/tutorial-09-http_file_server.md |
文件名的接口已经写好了,忘了更新文档。 workflow/src/factory/WFTaskFactory.h Line 190 in b71d744
改std::function不现实。接口太难看了。文件名的话就可以跨平台了。 |
嗯....传文件名字作为参数,那现在在读取前,仍然不知道文件的大小和相关信息,这代表create_pread_task前还是要先打开文件获取到文件大小再调用create_pread_task来读取。这难道不会导致create_pread_task使用文件名字作为参数没有意义嘛 |
如果要读取整个文件,Linux下的做法是先用state函数,得到文件大小。 |
所以说我提出将一个回调作为参数,然后文件名也作为参数 |
我们没有这种形式的接口。我们之前另一个想法是做一个WFFileSystem,用文件名创建任务的时候,由这个类来管理fd或HANDLE。你说的这些功能我更愿意用这个方式来解决。 |
能不能麻烦解释下IOService中的各个成员的作用 |
目前我已经理解了框架中的task如何提交到Service这种运作方式。 |
IOService和series是两个层次的东西。最好不用混着一起看。如果你想实现iocp下的文件aio,只需要实现iocp版IOService和IOSession的接口就可以了。但这个真的挺难的。 |
收到,阅读源码过程中最不好理解的就是这个部分,现在有了你的描述就好说了 |
class __WFFilepreadTask : public WFFilepreadTask
{
public:
__WFFilepreadTask(const std::string& path, void *buf, size_t count,
off_t offset, IOService *service, fio_callback_t&& cb):
WFFilepreadTask(-1, buf, count, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepreadTask::prepare();
}
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepreadTask::done();
}
protected:
std::string pathname;
};
class __WFFilepwriteTask : public WFFilepwriteTask
{
public:
__WFFilepwriteTask(const std::string& path, const void *buf, size_t count,
off_t offset, IOService *service, fio_callback_t&& cb):
WFFilepwriteTask(-1, buf, count, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepwriteTask::prepare();
}
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepwriteTask::done();
}
protected:
std::string pathname;
};
class __WFFilepreadvTask : public WFFilepreadvTask
{
public:
__WFFilepreadvTask(const std::string& path, const struct iovec *iov,
int iovcnt, off_t offset, IOService *service,
fvio_callback_t&& cb) :
WFFilepreadvTask(-1, iov, iovcnt, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepreadvTask::prepare();
}
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepreadvTask::done();
}
protected:
std::string pathname;
};
class __WFFilepwritevTask : public WFFilepwritevTask
{
public:
__WFFilepwritevTask(const std::string& path, const struct iovec *iov,
int iovcnt, off_t offset, IOService *service,
fvio_callback_t&& cb) :
WFFilepwritevTask(-1, iov, iovcnt, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepwritevTask::prepare();
}
protected:
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepwritevTask::done();
}
protected:
std::string pathname;
};
static int __writefile_io(IOCPData *iocp_data, int timeout)
{
WriteContext *ctx = (WriteContext *)iocp_data->data.context;
int ret = WriteFile(iocp_data->data.handle, ctx->entry, ctx->count, NULL,
&iocp_data->overlap);
if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
{
if (ret != 0 && timeout == 0)
CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
return -1;
}
errno = WSAGetLastError();
return 0; // 成功启动异步操作
}
static int __readfile_io(IOCPData *iocp_data, int timeout)
{
ReadContext *ctx = (ReadContext *)iocp_data->data.context;
int ret = ReadFile(iocp_data->data.handle, ctx->entry, ctx->msgsize, NULL,
&iocp_data->overlap);
if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
{
if (ret != 0 && timeout == 0)
CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
return -1;
}
errno = WSAGetLastError();
return 0; // 成功启动异步操作
} 各位大神看看这个可以不 |
Workflow支持异步文件IO任务,具体实现目前在Linux下是由操作系统支持的异步IO系统,在非Linux的系统下是用多线程实现的。而Windows下目前也这个需求,所以欢迎熟悉iocp开发的小伙伴可以积极参与共建~
在此把原异步文件IO的流程大概梳理如下,以供参考:
create_pread_task()
为例子:WFFileIOTask *
类型的task,而内部会根据pread行为创建一个__WFFilepreadTask
:__WFFilepreadTask
需要实现prepare()
,供内部IOService调用,具体是做与异步文件相关的起始操作:IOService是接管所有文件异步IO的服务,IOSession是一次IO请求的上下文,需要根据Windows下iocp的机制来具体实现。
虽然Linux使用了系统的libaio,但大家要做的事情是类似:
CommScheduler::io_bind()
把自己的eventfd和回调绑定到通信器中,同理也需要io_unbind()
。其他内部接口需要根据iocp的机制按需添加。目的是做到当系统有异步事件的时候,会通过注册到通信器的机制来告诉框架,框架调起当时的那片上下文的handle(),即可回到task的逻辑中:__FileIOService
, 从IOService
派生,并且在全局单例中提供接口供调用,这样也可以保证不用异步文件IO的用户不会创建相应资源:以上是整个异步文件IO的基本流程,希望在windows下的实现同时遵循Workflow一如既往的对资源的极度节制以及对高并发的严谨。如有了解iocp的小伙伴愿意尝试欢迎随时交流。
The text was updated successfully, but these errors were encountered: