Skip to content

Commit

Permalink
Introduce small delay to avoid race between publisher and subscriber
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Paine <[email protected]>
  • Loading branch information
timkpaine committed Nov 5, 2024
1 parent 8b7a6bf commit ad2cdf0
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions csp/tests/adapters/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,18 @@ def curtime(x: ts[object]) -> ts[datetime]:
return csp.now()

def graph(symbols: list, count: int):
b = csp.merge(
csp.timer(timedelta(seconds=0.2), True),
csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)),
delay = timedelta(seconds=1)
b = csp.delay(
csp.merge(
csp.timer(timedelta(seconds=0.2), True),
csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)),
),
delay=delay,
)
i = csp.count(csp.timer(timedelta(seconds=0.15)))
d = csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0
s = csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING"))

i = csp.delay(csp.count(csp.timer(timedelta(seconds=0.15))), delay=delay)
d = csp.delay(csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0, delay=delay)
s = csp.delay(csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING")), delay=delay)
dt = curtime(b)
struct = MyData.collectts(b=b, i=i, d=d, s=s, dt=dt)

Expand All @@ -157,8 +162,6 @@ def graph(symbols: list, count: int):
)
csp.add_graph_output(f"pall_{symbol}", pub_data)

# csp.print('status', kafkaadapter.status())

sub_data = kafkaadapter.subscribe(
ts_type=SubData,
msg_mapper=msg_mapper,
Expand Down

0 comments on commit ad2cdf0

Please sign in to comment.