-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt_pub.py
270 lines (257 loc) · 12.6 KB
/
mqtt_pub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import paho.mqtt.client as mqtt
import random
import os
import time
import threading
import datetime
from optparse import OptionParser
from optparse import OptionGroup
import configparser
# Broker credentials (section "account") and the broker address (section
# "server") are stored in mqtt_config.ini, looked up relative to the current
# working directory.
cp_mqtt = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so an empty result means the configuration is missing.
# Fail with a clear message instead of a later, opaque KeyError.
if not cp_mqtt.read("mqtt_config.ini"):
    raise SystemExit("Cannot read mqtt_config.ini (expected in the current working directory)")
username = cp_mqtt["account"]['username']
password = cp_mqtt["account"]['password']
# Completion flag: on_publish sets it to 1 once the expected number of
# acknowledgements has arrived; pub() polls it before disconnecting.
flag_pub = 0
# Build the command-line interface with optparse.
parser = OptionParser()
# Options the script refuses to run without (enforced after parsing, because
# optparse itself has no notion of a required option).
group = OptionGroup(parser, "Mandatory Options")
group.add_option("-q", "--qos",
                 action="store", type="int", dest="qos",
                 help="MQTT qos value from 0 to 2")
group.add_option("-t", "--topic",
                 action="store", type="string", dest="topic",
                 help="topic where to publish")
group.add_option("-n", "--number",
                 action="store", type="long", dest="number_message",
                 help="Number of message to be publish on the topic")
group.add_option("-m", "--message_length",
                 action="store", type="int", dest="message_length",
                 help="Length of the message provide (in oct)")
group.add_option("-c", "--clean_session",
                 action="store", dest="clean_session", default=True,
                 help="True or false")
parser.add_option_group(group)
# Options with usable defaults.
group = OptionGroup(parser, "Optional Options")
group.add_option("-u", "--username",
                 action="store", type="string", dest="username", default=username,
                 help="MQTT username")
group.add_option("-p", "--password",
                 action="store", type="string", dest="password", default=password,
                 help="MQTT password")
group.add_option("-M", "--message_length_2",
                 action="store", type="int", dest="message_length_2", default=0,
                 help="if provide need a sleep time and set the length of the message after the sleep time")
group.add_option("-a", action="store", type="int", dest="active_time", default=0,
                 help="active time before sleep")
group.add_option("-s", action="store", type="int", dest="sleeping_time", default=0,
                 help="sleeping time")
group.add_option("-T", action="store", type="int", dest="n_topic", default=1,
                 help="Number of desired topic")
group.add_option("-N", action="store", type="int", dest="n_run", default=0,
                 help="number of runs")
parser.add_option_group(group)


def _parse_clean_session(value):
    """Convert the raw -c/--clean_session value into a real boolean.

    The option is stored without a type, so a value given on the command line
    arrives as a string; the string "False" is truthy and would otherwise
    always request a clean session.  The untouched default (``True``) is
    passed through unchanged.  Unrecognised text aborts with a usage error.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("true", "t", "1", "yes", "y"):
        return True
    if text in ("false", "f", "0", "no", "n"):
        return False
    # parser.error() prints the usage text plus this message and exits.
    parser.error("option -c/--clean_session: expected True or False, got %r" % (value,))


(options, args) = parser.parse_args()
options.clean_session = _parse_clean_session(options.clean_session)
# Countdown over the active time, run in a background thread by the publishers
def countdown():
    """Count ``options.active_time`` down to zero, one second per step.

    The remaining whole seconds are exposed through the global ``my_timer``,
    which the publishing loops poll (``my_timer > 0``).  Setting the global
    ``stop_threads`` to a true value makes the countdown stop at its next
    one-second step.

    The second is waited *before* ``my_timer`` is decremented, so the timer
    stays positive for the full ``active_time`` seconds; decrementing first
    would end the publishing window one second early (and leave no window at
    all for an active time of 1).
    """
    global my_timer
    global stop_threads
    my_timer = options.active_time
    for _ in range(options.active_time):
        if stop_threads:
            break
        time.sleep(1)
        my_timer -= 1


# Cooperative stop flag for the countdown thread; the publishers reset it to
# False before each run and set it to True to end a run early.
stop_threads = False
# Connection callback: invoked once the broker has answered the connect request
def on_connect(client, userdata, flags, rc):
    """Report the result code of the connection attempt on stdout.

    Args:
        client: the client instance that triggered the callback (unused).
        userdata: user data attached to the client (unused).
        flags: response flags passed by the client library (unused).
        rc: connection result code; printed as-is.
    """
    status_line = "Connected with result code " + str(rc)
    print(status_line)
# Number of publish callbacks received so far; incremented by on_publish.
ack = 0


# Publish callback: count acknowledgements and signal completion
def on_publish(client, userdata, mid):
    """Log one completed publish and raise ``flag_pub`` when all are in.

    Prints a timestamp together with the message id, bumps the global ``ack``
    counter and, when the counter equals ``options.number_message``, prints a
    summary and sets the global ``flag_pub`` to 1.

    NOTE(review): the target is ``options.number_message`` only; the publishers
    send one message per topic, so with more than one topic the flag is raised
    before every message has been acknowledged -- confirm whether intended.

    Args:
        client: the client instance that triggered the callback (unused).
        userdata: user data attached to the client (unused).
        mid: message id of the publish being reported.
    """
    global ack, flag_pub
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(timestamp + "|" + "mid : " + str(mid))
    ack += 1
    if ack != options.number_message:
        return
    print("all ack recieved : " + str(ack))
    flag_pub = 1
def pub_now(client, number_message, n_topic, topic, message_length, qos):
    """Publish ``number_message`` rounds of one payload without any time limit.

    A single payload is built once with ``length_conv(message_length)`` and
    sent, in every round, to ``topic1`` .. ``topic<n_topic>``.

    Args:
        client: connected MQTT client used for publishing.
        number_message: number of rounds to publish.
        n_topic: number of topics; topic names are ``topic`` + 1-based index.
        topic: topic name prefix.
        message_length: length passed to ``length_conv`` for the payload.
        qos: MQTT quality-of-service level handed to ``client.publish``.
    """
    payload = length_conv(message_length)
    round_no = 1.0
    while round_no <= number_message:
        for suffix in range(1, n_topic + 1):
            client.publish(topic + str(suffix), payload, qos)
            # 1 ms pause after each publish.
            # NOTE(review): source indentation was lost; per-publish placement
            # is assumed (identical to per-round when n_topic is 1).
            time.sleep(0.001)
        round_no += 1
# Publish in a fixed number of timed runs (active time, then optional sleep)
def pub_active(client, number_message, n_topic, topic, message_length, message_length_2, qos, n_run, sleeping_time):
    """Publish ``number_message`` messages split over ``n_run`` timed runs.

    Every run starts a ``countdown`` thread and publishes its share of the
    messages to ``topic1`` .. ``topic<n_topic>`` until the share is used up or
    the countdown (``my_timer``) reaches zero.  Payload sizes alternate
    between ``message_length`` and ``message_length_2``, starting with
    ``message_length``; a fresh payload is generated for each run.

    The per-run share is counted from zero and the remainder of
    ``number_message / n_run`` is spread over the first runs, so every run
    sends exactly its share and the shares add up to ``number_message``.

    Args:
        client: connected MQTT client used for publishing.
        number_message: total number of messages (per topic) over all runs.
        n_topic: number of topics; topic names are ``topic`` + 1-based index.
        topic: topic name prefix.
        message_length: payload length for the 1st, 3rd, ... run.
        message_length_2: payload length for the 2nd, 4th, ... run.
        qos: MQTT quality-of-service level handed to ``client.publish``.
        n_run: number of runs (callers pass a non-zero value).
        sleeping_time: seconds to sleep after each run; 0 disables the pause.
    """
    global my_timer, stop_threads

    print("Starting publishing for " + str(options.active_time) + "sec")
    # Even split; the first `extra` runs carry one additional message.
    base_quota, extra = divmod(number_message, n_run)
    lengths = (message_length, message_length_2)
    for run_index in range(n_run):
        quota = base_quota + (1 if run_index < extra else 0)
        msg = length_conv(lengths[run_index % 2])
        sent = 0
        stop_threads = False
        # Seed the timer here so the loop below never reads an unset or stale
        # value before the countdown thread has performed its first write.
        my_timer = options.active_time
        countdown_thread = threading.Thread(target=countdown)
        countdown_thread.start()
        while sent < quota and my_timer > 0:
            for i in range(n_topic):
                client.publish(topic + str(i + 1), msg, qos)  # add + str(sent) after msg to see the message number in the sub
                # NOTE(review): source indentation was lost; a 1 ms pause per
                # publish is assumed.
                time.sleep(0.001)
            sent += 1
        if sent >= quota:
            print("All messages was send in : " + str(options.active_time - my_timer) + "sec")
            # Lets the countdown end at its next one-second step.
            stop_threads = True
        # Wait for the countdown thread instead of spinning on is_alive().
        countdown_thread.join()
        if sent < quota:
            print("Only published : " + str(sent) + " Messages on " + str(n_topic) + " Topic(s)")
        else:
            print("Message sends : " + str(sent) + " Messages on " + str(n_topic) + " Topic(s)")
        if sleeping_time != 0:
            print("Waiting : " + str(sleeping_time) + " sec")
            time.sleep(sleeping_time)
    stop_threads = True
# Publish in timed runs until every message is out (used when no run count is set)
def pub_run_0(client, number_message, n_topic, topic, message_length, message_length_2, qos, n_run, sleeping_time):
    """Publish ``number_message`` messages in as many timed runs as needed.

    Every run starts a ``countdown`` thread and publishes to ``topic1`` ..
    ``topic<n_topic>`` until all messages are out or the countdown
    (``my_timer``) reaches zero; unfinished work continues in the next run.
    Payload sizes alternate between ``message_length`` and
    ``message_length_2``, starting with ``message_length``.

    Messages are counted from zero, so the function only reports completion
    (and stops) once all ``number_message`` messages have really been sent.

    Args:
        client: connected MQTT client used for publishing.
        number_message: total number of messages (per topic) to publish.
        n_topic: number of topics; topic names are ``topic`` + 1-based index.
        topic: topic name prefix.
        message_length: payload length for the 1st, 3rd, ... run.
        message_length_2: payload length for the 2nd, 4th, ... run.
        qos: MQTT quality-of-service level handed to ``client.publish``.
        n_run: unused here; kept for a signature parallel to ``pub_active``.
        sleeping_time: seconds to sleep between runs; 0 disables the pause.
    """
    global my_timer, stop_threads

    print("Starting publishing for " + str(options.active_time) + "sec")
    runs_height = number_message
    lengths = (message_length, message_length_2)
    sent = 0       # cumulative over all runs
    run_index = 0
    while sent < runs_height:
        msg = length_conv(lengths[run_index % 2])
        run_index += 1
        stop_threads = False
        # Seed the timer here so the loop below never reads an unset or stale
        # value before the countdown thread has performed its first write.
        my_timer = options.active_time
        countdown_thread = threading.Thread(target=countdown)
        countdown_thread.start()
        while sent < runs_height and my_timer > 0:
            for i in range(n_topic):
                client.publish(topic + str(i + 1), msg, qos)  # add + str(sent) after msg to see the message number in the sub
                # NOTE(review): source indentation was lost; a 1 ms pause per
                # publish is assumed.
                time.sleep(0.001)
            sent += 1
        if sent >= runs_height:
            print("All messages was send in the last run in : " + str(options.active_time - my_timer) + "sec")
            # Lets the countdown end at its next one-second step.
            stop_threads = True
        # Wait for the countdown thread instead of spinning on is_alive().
        countdown_thread.join()
        if sent < runs_height:
            print("Only published : " + str(sent) + " Messages on " + str(n_topic) + " Topic(s)")
        else:
            print("All sends : " + str(sent) + " Messages on " + str(n_topic) + " Topic(s)")
            break
        if sleeping_time != 0:
            print("Waiting : " + str(sleeping_time) + " sec")
            time.sleep(sleeping_time)
    stop_threads = True
# Create a random string with a specific byte length
def length_conv(message_length):
    """Return a random ASCII string of exactly ``message_length`` characters.

    The text is built from hexadecimal digits, so every character encodes to a
    single byte and the payload size on the wire equals ``message_length``.
    (Converting the raw random bytes with ``str()`` would instead yield their
    ``b'...'`` representation, whose length differs from the requested one.)

    Args:
        message_length: desired payload length in bytes; must be >= 0.

    Returns:
        A ``str`` of ``message_length`` random hexadecimal digits.

    Raises:
        ValueError: if ``message_length`` is negative.
    """
    if message_length < 0:
        raise ValueError("negative argument not allowed")
    # Each random byte yields two hex digits; round up, then trim odd lengths.
    rand_string = os.urandom((message_length + 1) // 2).hex()[:message_length]
    return rand_string
# Dispatch to the publishing strategy selected by the timing options
def publisher(client, topic, qos, number_message, message_length, message_length_2, active_time, sleeping_time, n_topic, n_run):
    """Run the publishing strategy that matches the given options.

    * ``active_time == 0``: publish everything at once with ``pub_now``; when a
      sleeping time is set, sleep and then publish a second batch using
      ``message_length_2`` (the length meant for "after the sleep time").
    * ``active_time != 0`` and ``n_run == 0``: ``pub_run_0``.
    * ``active_time != 0`` and ``n_run != 0``: ``pub_active``.

    A ``message_length_2`` of 0 means "not set" and falls back to
    ``message_length``.  The end of publishing and the current time are
    printed afterwards.

    Args:
        client: connected MQTT client used for publishing.
        topic: topic name prefix.
        qos: MQTT quality-of-service level.
        number_message: number of messages to publish.
        message_length: primary payload length.
        message_length_2: secondary payload length; 0 reuses ``message_length``.
        active_time: length of an active window in seconds; 0 disables timing.
        sleeping_time: pause in seconds; 0 disables the pause.
        n_topic: number of topics to publish to.
        n_run: number of timed runs; 0 means "as many as needed".
    """
    if message_length_2 == 0:
        message_length_2 = message_length
    if active_time == 0:
        pub_now(client, number_message, n_topic, topic, message_length, qos)
        if sleeping_time != 0:
            time.sleep(sleeping_time)
            # The batch after the sleep uses the second length; without -M it
            # equals message_length, so earlier invocations behave as before.
            pub_now(client, number_message, n_topic, topic, message_length_2, qos)
    elif n_run == 0:
        pub_run_0(client, number_message, n_topic, topic, message_length, message_length_2, qos, n_run, sleeping_time)
    else:
        pub_active(client, number_message, n_topic, topic, message_length, message_length_2, qos, n_run, sleeping_time)
    print("End of pub")
    now = datetime.datetime.now()
    print(now.strftime("%Y-%m-%d %H:%M:%S"))  # show time at the end
# Log callback: only surface messages whose level lies in [0x04, 0x10)
def on_log(client, userdata, level, buf):
    """Print a client log record when its level is at least 0x04 and below 0x10.

    Args:
        client: the client instance that triggered the callback (unused).
        userdata: user data attached to the client (unused).
        level: numeric log level of the record.
        buf: log message text.
    """
    if not (0x04 <= level < 0x10):
        return
    print("log: ", level, buf)
# Configure the MQTT client, connect to the broker, publish and shut down
def pub(qos, topic, username, password, clean_session, number_message, message_length, message_length_2, active_time, sleeping_time, n_topic, n_run):
    """Create a client, connect it, publish the messages and finish.

    The client id is ``<username>_pub_id:<random number>``.  Broker host and
    port come from the "server" section of the configuration; the keep-alive
    is 3600 seconds.  For QoS >= 1 the network loop runs in the background,
    the in-flight window is set to ``number_message``, and the function waits
    for ``flag_pub`` (set by ``on_publish``) before stopping the loop and
    disconnecting.  For QoS 0 it ends in ``client.loop_forever()``.

    Args:
        qos: MQTT quality-of-service level (0 to 2).
        topic: topic name prefix.
        username: MQTT user name; also used in the client id.
        password: MQTT password.
        clean_session: clean-session setting handed to the client constructor.
        number_message: number of messages to publish.
        message_length: primary payload length.
        message_length_2: secondary payload length (0 = same as primary).
        active_time: active window in seconds; 0 disables timing.
        sleeping_time: pause in seconds; 0 disables the pause.
        n_topic: number of topics to publish to.
        n_run: number of timed runs; 0 means "as many as needed".
    """
    started_at = datetime.datetime.now()  # show time at the beginning of the execution
    print(started_at.strftime("%Y-%m-%d %H:%M:%S"))
    # Random suffix so that several publishers can share one account.
    nbr = random.randint(0, 10000000)
    client_id = username + "_pub_id:" + str(nbr)
    client = mqtt.Client(client_id, clean_session, userdata=None, transport="tcp")
    client.username_pw_set(username, password)
    if qos >= 1:
        client.max_inflight_messages_set(number_message)  # to send the messages in parallel
    client.on_connect = on_connect
    client.on_publish = on_publish
    # Use the public callback property rather than the private _on_log slot.
    client.on_log = on_log
    client.connect(cp_mqtt["server"]['host'], int(cp_mqtt['server']['port']), 3600)  # keep alive 3600 sec - 1h
    if qos >= 1:
        client.loop_start()
    print("will publish")
    publisher(client, topic, qos, number_message, message_length, message_length_2, active_time, sleeping_time, n_topic, n_run)
    if qos >= 1:
        # Keep the main thread alive until on_publish reports completion.
        while flag_pub == 0:
            time.sleep(1)
        client.loop_stop()
        client.disconnect()
    else:
        client.loop_forever()
# optparse has no required options, so the mandatory ones are checked here.
if (options.qos is None or options.clean_session is None or options.topic is None
        or options.number_message is None or options.message_length is None):
    print("Pls provide all the mandatory options (for help -h)")
# Overriding the configured account needs both -u and -p: an option still equal
# to the configured value is treated as "not given", so ask for the one that
# is actually missing.
elif options.username == username and options.password != password:
    print("Pls provide a username")
elif options.username != username and options.password == password:
    print("Pls provide a password")
else:
    pub(options.qos, options.topic, options.username, options.password, options.clean_session, options.number_message, options.message_length, options.message_length_2, options.active_time, options.sleeping_time, options.n_topic, options.n_run)