-
Notifications
You must be signed in to change notification settings - Fork 615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SCF multithreading issue #1136
Comments
Composed function is a single function. It is not a chain of things to be executed. |
Thank you for your response. I understand that function composition is seen as a single function. However, I can't understand why requests are serialized. After all, the tcpSupplierFlow is outside the function composition and therefore should handle requests in parallel. Can you give me some explanation about it? |
Perhaps I am missing something. It would be simpler if you create a sample app that reproduces the issue and push it to github somewhere so I can run it and better understand what you are trying to accomplish |
Any followup? |
Sorry for the delayed response, but I wanted to ensure everything was working correctly. I chose another solution to prevent the issue at its root. |
Hi everyone,
I've developed a server application that receives data in parallel from a client. I've noticed that when I send two packets simultaneously, they are processed in parallel up to my function composition. However, I deliberately added a sleep(To simulate the blocking operations present in the real case, such as file writing) at the end of my function composition to verify that the data were actually being processed in parallel. Unfortunately, the logs show that all packets are being processed every 3 seconds, as if they were being executed sequentially.
Could someone help me understand what's happening?
My code
`
`
Output Logs
2024-04-15T15:44:38.755+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Accepted connection from 127.0.0.1:64720 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] o.s.i.i.tcp.connection.TcpNetConnection : New connection 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : tcpSourceConnectionFactory: Added new connection: 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea Reading... 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Accepted connection from 127.0.0.1:64719 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] o.s.i.i.tcp.connection.TcpNetConnection : New connection 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : tcpSourceConnectionFactory: Added new connection: 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a 2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.i.tcp.connection.TcpNetConnection : 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a Reading... 2024-04-15T15:44:38.763+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=8d07d3e2-5008-a609-1987-f1057e262712, ip_hostname=127.0.0.1, timestamp=1713188678763}] 2024-04-15T15:44:38.763+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=0b49f68f-5f2b-eea4-45a1-5cdedecd39fa, ip_hostname=127.0.0.1, timestamp=1713188678763}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=5083e134-7990-7ef2-0379-364bde276e6b, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.t.MessageTransformingHandler : bean 'tcpSupplierFlow.header-filter#1' for component 'tcpSupplierFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow' received message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.t.MessageTransformingHandler : bean 'tcpSupplierFlow.header-filter#1' for component 'tcpSupplierFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow' received message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=5083e134-7990-7ef2-0379-364bde276e6b, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_address=127.0.0.1, id=3693d126-16c9-059d-e820-7af715e1337a, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:38.767+02:00 DEBUG 26152 --- [pool-5-thread-3] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}] to: class org.springframework.messaging.support.GenericMessage 2024-04-15T15:44:38.767+02:00 DEBUG 26152 --- [pool-5-thread-3] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function: messageHandler<org.springframework.messaging.Message<java.lang.String>, org.springframework.messaging.Message<java.lang.String>>with input type: org.springframework.messaging.Message<java.lang.String> 2024-04-15T15:44:38.767+02:00 ERROR 26152 --- [pool-5-thread-3] com.example.demo.Conf$$SpringCGLIB$$0 : RECEIVED MESSAGE: Hello server from threadId 23 | timestamp 1713188678755�n | receivedTimestamp 1713188678767 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplier|messageHandler_integrationflow.channel#0'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.router.MethodInvokingRouter : bean 'tcpSupplier|messageHandler_integrationflow.router#0' for component 'tcpSupplier|messageHandler_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' received message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'bean 'output'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@231e4dda received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=c38b7db9-1727-a2b3-d95c-9a5fcedafaec, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.a.outbound.AmqpOutboundEndpoint : org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4331b295 received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=ae45cb45-95bd-67eb-3de3-7a858b37b37f, timestamp=1713188681769}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.channel.FluxMessageChannel : postSend (sent=true) on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[contentType] WILL be mapped, matched pattern=* 2024-04-15T15:44:41.769+02:00 INFO 26152 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [rabbitmq:5672] 2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] c.r.client.impl.ConsumerWorkService : Creating executor service with 16 thread(s) for consumer work service 2024-04-15T15:44:41.785+02:00 DEBUG 26152 --- [pool-5-thread-2] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755�n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_address=127.0.0.1, id=3693d126-16c9-059d-e820-7af715e1337a, ip_hostname=127.0.0.1, timestamp=1713188678765}] to: class org.springframework.messaging.support.GenericMessage 2024-04-15T15:44:41.785+02:00 DEBUG 26152 --- [pool-5-thread-2] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function: messageHandler<org.springframework.messaging.Message<java.lang.String>, org.springframework.messaging.Message<java.lang.String>>with input type: org.springframework.messaging.Message<java.lang.String> 2024-04-15T15:44:41.785+02:00 ERROR 26152 --- [pool-5-thread-2] com.example.demo.Conf$$SpringCGLIB$$0 : RECEIVED MESSAGE: Hello server from threadId 24 | timestamp 1713188678755�n | receivedTimestamp 1713188681785
In my test, I opened two socket connections on the client and sent a packet for each connection. As seen from the logs, the server processes the packet with threadId 24 with a delay of 3 seconds (from 2024-04-15T15:44:38.767+02:00 to 2024-04-15T15:44:41.785+02:00).
The text was updated successfully, but these errors were encountered: