From ad2cdf094a2137cd92331a240007a928b9e6f865 Mon Sep 17 00:00:00 2001 From: Tim Paine <3105306+timkpaine@users.noreply.github.com> Date: Fri, 13 Sep 2024 15:31:43 -0400 Subject: [PATCH] Introduce small delay to avoid race between publisher and subscriber Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com> --- csp/tests/adapters/test_kafka.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/csp/tests/adapters/test_kafka.py b/csp/tests/adapters/test_kafka.py index 334d6cb43..0e640d712 100644 --- a/csp/tests/adapters/test_kafka.py +++ b/csp/tests/adapters/test_kafka.py @@ -127,13 +127,18 @@ def curtime(x: ts[object]) -> ts[datetime]: return csp.now() def graph(symbols: list, count: int): - b = csp.merge( - csp.timer(timedelta(seconds=0.2), True), - csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)), + delay = timedelta(seconds=1) + b = csp.delay( + csp.merge( + csp.timer(timedelta(seconds=0.2), True), + csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)), + ), + delay=delay, ) - i = csp.count(csp.timer(timedelta(seconds=0.15))) - d = csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0 - s = csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING")) + + i = csp.delay(csp.count(csp.timer(timedelta(seconds=0.15))), delay=delay) + d = csp.delay(csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0, delay=delay) + s = csp.delay(csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING")), delay=delay) dt = curtime(b) struct = MyData.collectts(b=b, i=i, d=d, s=s, dt=dt) @@ -157,8 +162,6 @@ def graph(symbols: list, count: int): ) csp.add_graph_output(f"pall_{symbol}", pub_data) - # csp.print('status', kafkaadapter.status()) - sub_data = kafkaadapter.subscribe( ts_type=SubData, msg_mapper=msg_mapper,