-
Notifications
You must be signed in to change notification settings - Fork 1
/
asyncio_test.py
84 lines (74 loc) · 2.06 KB
/
asyncio_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import asyncio
import datetime
import threading
async def say(t, msg):
await asyncio.sleep(t)
print(msg, datetime.datetime.now())
async def main1():
print(datetime.datetime.now())
await asyncio.sleep(3)
print('3sleep')
task1 = asyncio.create_task(say(1, '4hello'))
task2 = asyncio.create_task(say(2, '5world'))
await task1
await task2
for i in range(3):
print(5)
await say(1, '6eholl')
async def main2():
print(datetime.datetime.now())
task1 = asyncio.create_task(say(1, '1hello'))
task2 = asyncio.create_task(say(2, '2world'))
await asyncio.sleep(3)
print('3sleep')
await task1
await task2
for i in range(3):
print(3)
await say(1, '4eholl')
async def main3():
# Require Python version >= 3.11
async with asyncio.TaskGroup() as tg:
tg.create_task(say(1, '1hello'))
tg.create_task(say(2, '2hello'))
tg.create_task(asyncio.sleep(3))
print('3sleep')
await say(1, '4eholl')
async def main4():
print(datetime.datetime.now())
task1 = asyncio.create_task(say(1, '1hello'))
task2 = asyncio.create_task(say(2, '2world'))
task3 = asyncio.create_task(asyncio.sleep(3))
await task1
await task2
await task3
print('3sleep')
task1 = asyncio.create_task(say(1, '4hello'))
task2 = asyncio.create_task(say(2, '5world'))
await task1
await task2
async def main5():
print(datetime.datetime.now())
task1 = asyncio.create_task(say(1, '1I'))
task2 = asyncio.create_task(say(2, '2love'))
task3 = asyncio.create_task(asyncio.sleep(3))
await task1
await task2
await task3
print('3sleep')
task1 = asyncio.create_task(say(1, '4you'))
task2 = asyncio.create_task(say(2, '5!!!!'))
await task1
await task2
async def main5_helper():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(main5())
loop.run_forever()
def main4_async():
asyncio.run(main4())
#asyncio.run(main4())
#main4_async()
t = threading.Thread(target=main4_async)
t.start()
print("end")