Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

如何用workflow实现将分片的数据拼起来的服务器 #1332

Closed
MasterL-min opened this issue Jul 26, 2023 · 31 comments
Closed

如何用workflow实现将分片的数据拼起来的服务器 #1332

MasterL-min opened this issue Jul 26, 2023 · 31 comments

Comments

@MasterL-min
Copy link

大概需求是,类比“hello,world",把这个拆成一个一个字符发送给服务器,然后服务器再想办法拼起来得到完整的数据。
客户端实现我用了workflow的series,把一个字符当作一个任务创建了一个series然后一起发出。
服务器这边也能一个一个按顺序收到,现在比较困惑的是如果我想让服务器能知道数据传输开始,并且能让服务器知道数据传输结束,然后把这期间的数据拼一起的话有没有什么方法,尤其是涉及到多个客户给服务器发的时候或者单个用户发送了两次”hello,world“,需要分别拼凑成两个hello,world感觉很困难。想请教下各位开发大佬有没有什么好的方法...

@Barenboim
Copy link
Contributor

Barenboim commented Jul 26, 2023

不太理解你说的场景。
你现在是把每个字符当作一个request来发送,然后你想统一到一个时刻(收集完所有请求)回复么?

@MasterL-min
Copy link
Author

不太理解你说的场景。 你现在是把每个字符当作一个request来发送,然后你想统一到一个时刻(收集完所有请求)回复么?

差不多是这个意思!每个字符当作一个request来发送,然后服务器能在这个series结束后能准确的把这些单个字符拼凑成最初完整的字符串。就比如发送了'h','e','l','l','o',作为一个series,按序发送了五个任务,服务器需要有办法意识到这个series的开始和结束,然后将这些单个字符重新拼凑成完整的'hello'。假设同一个客户端发送了两个series,每个series都是发送'h','e','l','l','o',我需要能让服务器最终能拼凑生成的是两个单独的'hello','hello',而不是彻底连在一起的'hellohello‘。

@MasterL-min
Copy link
Author

不太理解你说的场景。 你现在是把每个字符当作一个request来发送,然后你想统一到一个时刻(收集完所有请求)回复么?

不知道我这么表述的是否比较清楚...大概就是其实我是想发’hello‘这个字符串,我把发送整个字符串视为逻辑上的一次任务,但出于一些需要我不能直接把整个字符串塞到一个request的payload里,所以我会一个字符一个字符的发,这里在客户端上的实现目前是用了workflow的series,把发送一个字符当作一个request塞进series里,然后最后series->start整体发送,我希望能让服务器能识别这样一个series或者说逻辑上的发送任务的开始与结束,然后从这些按序到达服务器的req里重新拼凑出一个完整的最初的'hello'..

@Barenboim
Copy link
Contributor

Barenboim commented Jul 27, 2023

1、workflow是req->resp的协议,你把5个字母分别作为5个请求在一个series里发送,那么每个请求都必要收到回复才会发下一个,因为收到回复任务才算结束。如果你想让他们都在同一时刻收回复,那么这些请求应该分别发送,所以客户端的代码是:

int f()
{
    create_task('h')->start();
    create_task('e')->start();
    create_task('l')->start();
    create_task('l')->start();
    create_task('o')->start()
}

而server想统一回复,所以需要5个请求都收集完,可以用counter来解决:

void *process(MyTask *task)
{
   WFCounterTask *counter = WFTaskFactory::create_counter_task("name", 1, [task](void *){
        // set resposne here
    }); 
    series_of(task)->push_back(counter);
    if ('hello' is complete)
        WFTaskFactory::count_by_name("name", 5);
}

2、你肯定需要设计一个协议,让server知道这是哪一个批次的第几个请求,以及同一批次一共有多少请求。server不可能凭空知道client的task是不是同一批。

@kyaru92
Copy link

kyaru92 commented Jul 28, 2023

想的太具体了
不要考虑客户端怎么发送怎么安排, 服务器端眼里只有从客户端接收到的有序字节流,
你需要做的就是按照一定协议(规则)处理放到缓冲区的字节, 把一个个逻辑上的任务解析出来

@eatcosmos
Copy link

感觉series直接加个开始和结束标记就好了吧,不知道是不是想简单了。

@MasterL-min
Copy link
Author

感觉series直接加个开始和结束标记就好了吧,不知道是不是想简单了。

emmm 目前我就是这样做的,不过我想着这里有个问题是 如果同一个客户端并行或者以很短的时间内并行发了两个series,这个时候我的服务器接口拼接消息的时候我就不知道该怎么判断接收到的是第一个series里的内容还是第二个series里的内容了(或者说会乱序),如果规定一个客户端只能在一段时间内发一个series,然后只有在上一个series结束后再发下一个的话我认为这个思路没什么问题。

@Barenboim
Copy link
Contributor

感觉series直接加个开始和结束标记就好了吧,不知道是不是想简单了。

emmm 目前我就是这样做的,不过我想着这里有个问题是 如果同一个客户端并行或者以很短的时间内并行发了两个series,这个时候我的服务器接口拼接消息的时候我就不知道该怎么判断接收到的是第一个series里的内容还是第二个series里的内容了(或者说会乱序),如果规定一个客户端只能在一段时间内发一个series,然后只有在上一个series结束后再发下一个的话我认为这个思路没什么问题。

你设计一个协议啊😂你这样做哪怕行得通,也是不好的。你需要让server在消息里,就可以知道你是那个series发出的,是第几个请求。另外,如果要一起回复,无法用同一个series吧?看我上面的示例。
自定义协议可以使用TLVMessage进行扩展,我最近刚刚加进来的:
https://github.com/sogou/workflow/blob/master/docs/about-tlv-message.md

@MasterL-min
Copy link
Author

感觉series直接加个开始和结束标记就好了吧,不知道是不是想简单了。

emmm 目前我就是这样做的,不过我想着这里有个问题是 如果同一个客户端并行或者以很短的时间内并行发了两个series,这个时候我的服务器接口拼接消息的时候我就不知道该怎么判断接收到的是第一个series里的内容还是第二个series里的内容了(或者说会乱序),如果规定一个客户端只能在一段时间内发一个series,然后只有在上一个series结束后再发下一个的话我认为这个思路没什么问题。

你设计一个协议啊😂你这样做哪怕行得通,也是不好的。你需要让server在消息里,就可以知道你是那个series发出的,是第几个请求。另外,如果要一起回复,无法用同一个series吧?看我上面的示例。 自定义协议可以使用TLVMessage进行扩展,我最近刚刚加进来的: https://github.com/sogou/workflow/blob/master/docs/about-tlv-message.md

是是是,目前也是在这么去做,感谢您给的建议与方法!非常感谢您!

@gnblao
Copy link

gnblao commented Jul 30, 2023

Hi,这个时候可以考虑一下,wf的channel功能了😄😄,跟你场景很像☺️
参考
#873

@Barenboim
Copy link
Contributor

Hi,这个时候可以考虑一下,wf的channel功能了😄😄,跟你场景很像☺️ 参考 #873

Workflow里的任何内置消息,都可以在你的channel上正确的连续接收吧?

@gnblao
Copy link

gnblao commented Jul 30, 2023

Hi,这个时候可以考虑一下,wf的channel功能了😄😄,跟你场景很像☺️ 参考 #873

Workflow里的任何内置消息,都可以在你的channel上正确的连续接收吧?

嗯,原则都可以连续接收,只是我还没来得及具体测过😂

@Barenboim
Copy link
Contributor

Hi,这个时候可以考虑一下,wf的channel功能了smilesmile,跟你场景很像relaxed 参考 #873

Workflow里的任何内置消息,都可以在你的channel上正确的连续接收吧?

嗯,原则都可以连续接收,只是我还没来得及具体测过joy

理论上,只要消息又处理截断,就都没问题。所以,tutorial-10的消息解析实现,就会有问题。项目的内置消息都是处理了截断,应该都是正确的。你可以拿TLVMessage试一下。

@gnblao
Copy link

gnblao commented Jul 30, 2023

Hi,这个时候可以考虑一下,wf的channel功能了smilesmile,跟你场景很像relaxed 参考 #873

Workflow里的任何内置消息,都可以在你的channel上正确的连续接收吧?

嗯,原则都可以连续接收,只是我还没来得及具体测过joy

理论上,只要消息又处理截断,就都没问题。所以,tutorial-10的消息解析实现,就会有问题。项目的内置消息都是处理了截断,应该都是正确的。你可以拿TLVMessage试一下。

嗯,这里“截断”语义,如果是说某一条用户协议完整消息被拆成几个tcp包发送,这种情况是没问题的。
怕的就是“串包”风险,多个线程写多个协议包在同一条连接上,PA(0-10)协议包有10分片,PB(0-5)协议包5个分片。
发送的时候出现这种情况pa1->pb1->pa2...就问题

@Barenboim
Copy link
Contributor

Hi,这个时候可以考虑一下,wf的channel功能了smilesmile,跟你场景很像relaxed 参考 #873

Workflow里的任何内置消息,都可以在你的channel上正确的连续接收吧?

嗯,原则都可以连续接收,只是我还没来得及具体测过joy

理论上,只要消息又处理截断,就都没问题。所以,tutorial-10的消息解析实现,就会有问题。项目的内置消息都是处理了截断,应该都是正确的。你可以拿TLVMessage试一下。

嗯,这里“截断”语义,如果是说某一条用户协议完整消息被拆成几个tcp包发送,这种情况是没问题的。 怕的就是“串包”风险,多个线程写多个协议包在同一条连接上,PA(0-10)协议包有10分片,PB(0-5)协议包5个分片。 发送的时候出现这种情况pa1->pb1->pa2...就问题

连接在一个消息上必须独占啊,不可能出现pa1->pb1->pa2的情况都。除非你channel代码没有写对。
我的意思是,如果同一个tcp包就包含了许多条完整的消息,这些消息应该可以被正确拆分的。因为内置消息每次append,会在*size参数里返回此次消费字节数,剩余的数据会继续append或产生新的消息。

@gnblao
Copy link

gnblao commented Jul 30, 2023

Hi,这个时候可以考虑一下,wf的channel功能了smilesmile,跟你场景很像relaxed 参考 #873

Workflow里的任何内置消息,都可以在你的channel上正确的连续接收吧?

嗯,原则都可以连续接收,只是我还没来得及具体测过joy

理论上,只要消息又处理截断,就都没问题。所以,tutorial-10的消息解析实现,就会有问题。项目的内置消息都是处理了截断,应该都是正确的。你可以拿TLVMessage试一下。

嗯,这里“截断”语义,如果是说某一条用户协议完整消息被拆成几个tcp包发送,这种情况是没问题的。 怕的就是“串包”风险,多个线程写多个协议包在同一条连接上,PA(0-10)协议包有10分片,PB(0-5)协议包5个分片。 发送的时候出现这种情况pa1->pb1->pa2...就问题

连接在一个消息上必须独占啊,不可能出现pa1->pb1->pa2的情况都。除非你channel代码没有写对。
我的意思是,如果同一个tcp包就包含了许多条完整的消息,这些消息应该可以被正确拆分的。因为内置消息每次append,会在*size参数里返回此次消费字节数,剩余的数据会继续append或产生新的消息。

这样的话完全没问题了。channel里面这几种情况都有相应的处理了

@MasterL-min
Copy link
Author

Hi,这个时候可以考虑一下,wf的channel功能了😄😄,跟你场景很像☺️ 参考 #873

老哥 我服务器是用wfrest的,想确认下这种情况下用您的channel功能会不会有一些影响,这个wf的channel功能我看了一下,我是不是得去您的那个地址里去下载您实现的wf版本,打算自己实际感受一下,有一个stream形式的实现我感觉好像跟我目前这个挺像,也许我可以去看看。

@gnblao
Copy link

gnblao commented Jul 31, 2023

Hi,这个时候可以考虑一下,wf的channel功能了😄😄,跟你场景很像☺️ 参考 #873

老哥 我服务器是用wfrest的,想确认下这种情况下用您的channel功能会不会有一些影响,这个wf的channel功能我看了一下,我是不是得去您的那个地址里去下载您实现的wf版本,打算自己实际感受一下,有一个stream形式的实现我感觉好像跟我目前这个挺像,也许我可以去看看。

感兴趣,可以试试看哈,改的channel功能分支完全兼容wf所有特性

@MasterL-min
Copy link
Author

感觉series直接加个开始和结束标记就好了吧,不知道是不是想简单了。

emmm 目前我就是这样做的,不过我想着这里有个问题是 如果同一个客户端并行或者以很短的时间内并行发了两个series,这个时候我的服务器接口拼接消息的时候我就不知道该怎么判断接收到的是第一个series里的内容还是第二个series里的内容了(或者说会乱序),如果规定一个客户端只能在一段时间内发一个series,然后只有在上一个series结束后再发下一个的话我认为这个思路没什么问题。

你设计一个协议啊😂你这样做哪怕行得通,也是不好的。你需要让server在消息里,就可以知道你是那个series发出的,是第几个请求。另外,如果要一起回复,无法用同一个series吧?看我上面的示例。 自定义协议可以使用TLVMessage进行扩展,我最近刚刚加进来的: https://github.com/sogou/workflow/blob/master/docs/about-tlv-message.md

大佬好,拜读了您的TLVMessage介绍以及给出的服务器示例代码,我大概想了下,结合到我现在的这个需求,也许我需要在TLVMessage上加入两个变量,一个区分series,一个区分是series的第几个请求,区分是series的第几个请求我觉得在TLVMessage的基础上加一个seq也许就可以,但是我不太清楚不同series本身是不是有什么区分或者可以唯一标识的变量去做区分?我看了下TLVRequest以及TLVResponse的设置都是TLVMessage,感觉如果那样做的话服务器可以每接收一个series的小任务给客户端发接收了第几个series的第几个任务,不知道我的想法合不合理。。我的这个应该不需要一起回复,只要可以让客户端知道他的series全部发送完毕即可,或者发生异常报错。

@Barenboim
Copy link
Contributor

Barenboim commented Aug 1, 2023

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。
服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。
另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

@MasterL-min
Copy link
Author

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。 服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。 另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

好的好的 谢谢您的建议!

@MasterL-min
Copy link
Author

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。 服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。 另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

还想请教大佬一个问题,假设在某个series中发送发生异常,在客户端的回调函数里发现错误,希望在回调函数里重新创建整个series然后重新发送给服务器,这里是不是也涉及到您这里所说的series context,在这种情况下唯一的解决方法是不是把本身要发送的完整的字符串存在这个series context里。

@Barenboim
Copy link
Contributor

Barenboim commented Aug 2, 2023

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。 服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。 另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

还想请教大佬一个问题,假设在某个series中发送发生异常,在客户端的回调函数里发现错误,希望在回调函数里重新创建整个series然后重新发送给服务器,这里是不是也涉及到您这里所说的series context,在这种情况下唯一的解决方法是不是把本身要发送的完整的字符串存在这个series context里。

这个随便你怎么实现了。series context就是一个void *,你可以保存你的所有上下文。所以,如果需要完整的字符串当然可以放在series context里。
你可以把series理解成一个协程,series context就是协程的局部变量。

@MasterL-min
Copy link
Author

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。 服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。 另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

还想请教大佬一个问题,假设在某个series中发送发生异常,在客户端的回调函数里发现错误,希望在回调函数里重新创建整个series然后重新发送给服务器,这里是不是也涉及到您这里所说的series context,在这种情况下唯一的解决方法是不是把本身要发送的完整的字符串存在这个series context里。

这个随便你怎么实现了。series context就是一个void *,你可以保存你的所有上下文。所以,如果需要完整的字符串当然可以放在series context里。 你可以把series理解成一个协程,series context就是协程的局部变量。

老哥还想请教您一下,现在客户端给服务器发送分片数据啥的没问题了,现在希望客户端能从服务器中把字符一个一个的读取回来,假设服务器存储“h e l l o \n”,以字符‘\n’作为结束标记。
这个时候由于两个原因:
1.http是拉取式的,没办法客户端发送一个读取请求,服务器就一次性的把‘h’,‘e','l','l','o','\n'每个字符当作一次resp返回回去,需要客户端一个请求一个请求的读回来
2.条件需求限制我们,客户端不允许知道他要读取的数据长度,导致只能不断发请求,直到回调函数中读出来的回复里有'\n',才会停止创建新的httptask,这种情况下导致我不知道怎么事先创建好series按长度一个一个提前创建好启动http任务然后series->start(),我只能创了个
while(true)
{
create_http_task();
wait_group.wait();
}
然后在回调函数里读出来'\n'的时候退出这个循环我发现我的思路很蠢,而且由于http任务是异步的,也不会等第一个字符读出来才创建第二个,会有一些问题,想请教下老哥这种情况有没有什么好的解决办法。。

@MasterL-min
Copy link
Author

我现在这个的解决方法就是每次循环里sleep10ms,但感觉这个顶不了大事,只能算勉强实现,就想请教下这种情况不知道有没有更好的办法。。

@Barenboim
Copy link
Contributor

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。 服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。 另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

还想请教大佬一个问题,假设在某个series中发送发生异常,在客户端的回调函数里发现错误,希望在回调函数里重新创建整个series然后重新发送给服务器,这里是不是也涉及到您这里所说的series context,在这种情况下唯一的解决方法是不是把本身要发送的完整的字符串存在这个series context里。

这个随便你怎么实现了。series context就是一个void *,你可以保存你的所有上下文。所以,如果需要完整的字符串当然可以放在series context里。 你可以把series理解成一个协程,series context就是协程的局部变量。

老哥还想请教您一下,现在客户端给服务器发送分片数据啥的没问题了,现在希望客户端能从服务器中把字符一个一个的读取回来,假设服务器存储“h e l l o \n”,以字符‘\n’作为结束标记。 这个时候由于两个原因: 1.http是拉取式的,没办法客户端发送一个读取请求,服务器就一次性的把‘h’,‘e','l','l','o','\n'每个字符当作一次resp返回回去,需要客户端一个请求一个请求的读回来 2.条件需求限制我们,客户端不允许知道他要读取的数据长度,导致只能不断发请求,直到回调函数中读出来的回复里有'\n',才会停止创建新的httptask,这种情况下导致我不知道怎么事先创建好series按长度一个一个提前创建好启动http任务然后series->start(),我只能创了个 while(true) { create_http_task(); wait_group.wait(); } 然后在回调函数里读出来'\n'的时候退出这个循环我发现我的思路很蠢,而且由于http任务是异步的,也不会等第一个字符读出来才创建第二个,会有一些问题,想请教下老哥这种情况有没有什么好的解决办法。。

既然你需要串行拉取,为啥不是在task的callback里创建下一个task?

@MasterL-min
Copy link
Author

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。 服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。 另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

还想请教大佬一个问题,假设在某个series中发送发生异常,在客户端的回调函数里发现错误,希望在回调函数里重新创建整个series然后重新发送给服务器,这里是不是也涉及到您这里所说的series context,在这种情况下唯一的解决方法是不是把本身要发送的完整的字符串存在这个series context里。

这个随便你怎么实现了。series context就是一个void *,你可以保存你的所有上下文。所以,如果需要完整的字符串当然可以放在series context里。 你可以把series理解成一个协程,series context就是协程的局部变量。

老哥还想请教您一下,现在客户端给服务器发送分片数据啥的没问题了,现在希望客户端能从服务器中把字符一个一个的读取回来,假设服务器存储“h e l l o \n”,以字符‘\n’作为结束标记。 这个时候由于两个原因: 1.http是拉取式的,没办法客户端发送一个读取请求,服务器就一次性的把‘h’,‘e','l','l','o','\n'每个字符当作一次resp返回回去,需要客户端一个请求一个请求的读回来 2.条件需求限制我们,客户端不允许知道他要读取的数据长度,导致只能不断发请求,直到回调函数中读出来的回复里有'\n',才会停止创建新的httptask,这种情况下导致我不知道怎么事先创建好series按长度一个一个提前创建好启动http任务然后series->start(),我只能创了个 while(true) { create_http_task(); wait_group.wait(); } 然后在回调函数里读出来'\n'的时候退出这个循环我发现我的思路很蠢,而且由于http任务是异步的,也不会等第一个字符读出来才创建第二个,会有一些问题,想请教下老哥这种情况有没有什么好的解决办法。。

既然你需要串行拉取,为啥不是在task的callback里创建下一个task?

感谢老哥指教,我悟了

@MasterL-min
Copy link
Author

MasterL-min commented Aug 9, 2023

TLVMessage你可以随便派生的,你可以加上你需要的接口,定义你自己的request和respsone。这个和原始TLVRequest是什么没有关系。 服务器要区分client请求是哪个series里的,这些信息需要包含在你的协议里。client那边怎么确保同一个任务里的request,填写了自己的series信息,需要你程序来保证。比如你在series context里放点什么作为唯一标识,然后每个request发送之前你先拿一下这个标识,填写请求内容。 另外,你的请求是在同一个series里依次发送的话,server也需要依次回复的,没有办法做到你说的,最后统一回复的效果。当然,你可以用别方法来实现这个目的。比如每个消息收到之后都先回复一个确认,然后所有消息收集完成,server再依次请求client发送最终处理结果,也就是说client本身也是一个server,可以用来接收最终结果。可以认为是actor模式。

还想请教大佬一个问题,假设在某个series中发送发生异常,在客户端的回调函数里发现错误,希望在回调函数里重新创建整个series然后重新发送给服务器,这里是不是也涉及到您这里所说的series context,在这种情况下唯一的解决方法是不是把本身要发送的完整的字符串存在这个series context里。

这个随便你怎么实现了。series context就是一个void *,你可以保存你的所有上下文。所以,如果需要完整的字符串当然可以放在series context里。 你可以把series理解成一个协程,series context就是协程的局部变量。

老哥还想请教您一下,现在客户端给服务器发送分片数据啥的没问题了,现在希望客户端能从服务器中把字符一个一个的读取回来,假设服务器存储“h e l l o \n”,以字符‘\n’作为结束标记。 这个时候由于两个原因: 1.http是拉取式的,没办法客户端发送一个读取请求,服务器就一次性的把‘h’,‘e','l','l','o','\n'每个字符当作一次resp返回回去,需要客户端一个请求一个请求的读回来 2.条件需求限制我们,客户端不允许知道他要读取的数据长度,导致只能不断发请求,直到回调函数中读出来的回复里有'\n',才会停止创建新的httptask,这种情况下导致我不知道怎么事先创建好series按长度一个一个提前创建好启动http任务然后series->start(),我只能创了个 while(true) { create_http_task(); wait_group.wait(); } 然后在回调函数里读出来'\n'的时候退出这个循环我发现我的思路很蠢,而且由于http任务是异步的,也不会等第一个字符读出来才创建第二个,会有一些问题,想请教下老哥这种情况有没有什么好的解决办法。。

既然你需要串行拉取,为啥不是在task的callback里创建下一个task?

想请教下老哥,我现在串行拉取在task的callback里创建下一个task没问题了,但是我想请教下wait_group的布局问题,假设我在主线程创建了一个http task,然后task->start(),然后这个http task的回调函数里会创建新的直到回调函数读取到的数据是结束标记才停止创建新的http任务,http_task回调函数的wait_group布局如下:

            auto help=(WFFacilities::WaitGroup *)task->user_data;
            help->done();
            WFHttpTask *task = WFTaskFactory::create_http_task(url, 4, 2, receive_callback);
            task->user_data= (void *)&wait_group;
            protocol::HttpRequest *req = task->get_req();
            req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
            req->add_header_pair("Connection", "close");
            req->add_header_pair("Content-Length",std::to_string(newreqlength));
            req->set_method("POST");
            std::string myString = createString(newreqlength, '1');
            req->append_output_body(myString);
            task->start();
            wait_group.wait();

这里help就是http task本身的wait_group,他在回调函数里先wait_group.done(),然后再创建新的http task,然后新的http task->start(),wait_group.wait(),但是现在有个问题就是由于help->done()后我发现主线程就开始运行了,他类似于与回调函数创建的新的http task并行运行了,有没有什么办法能让主线程等待所有回调函数结束后才开始运行,我尝试变换了下wait_group.done()的位置但是好像程序运行出问题了,想请教下老哥这种情况该如何处理。。

@Barenboim
Copy link
Contributor

Barenboim commented Aug 9, 2023

我感觉你好像不太会用我们的框架啊。你在callback里,什么要调用task->start()呢?正确的用法是:

void http_callback(WFHttpTask *task)
{
    ...
    next = WFTaskFactory::create_http_task(..., http_callback);
    series_of(task)->push_back(next);
}

我们的正确用法,是全程无等待的,不应该在callback里wait。唯一要wait的地方,只可能是main函数里。

@Barenboim
Copy link
Contributor

建议你看一下我们的tutorial-02和tutorial-03,理解一下我们series的用法。

@MasterL-min
Copy link
Author

我感觉你好像不太会用我们的框架啊。你在callback里,什么要调用task->start()呢?正确的用法是:

void http_callback(WFHttpTask *task)
{
    ...
    next = WFTaskFactory::create_http_task(..., http_callback);
    series_of(task)->push_back(next);
}

我们的正确用法,是全程无等待的,不应该在callback里wait。唯一要wait的地方,只可能是main函数里。

明白了,看来这里是要效仿http_proxy那个示例里的用法,我以为情况不太一样。。。感谢您的指教,给您填麻烦了

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants