Skip to content

Commit

Permalink
MQTT Exponential backoff and manager updates (#1345)
Browse files Browse the repository at this point in the history
  • Loading branch information
gguuss authored and andrewsg committed Feb 6, 2018
1 parent 9240951 commit b8f73b6
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
16 changes: 13 additions & 3 deletions iot/api-client/manager/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ To run this sample:
[--project_id PROJECT_ID] [--registry_id REGISTRY_ID]
[--rsa_certificate_file RSA_CERTIFICATE_FILE]
[--service_account_json SERVICE_ACCOUNT_JSON]
[--version VERSION]
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
[--version VERSION] [--member MEMBER] [--role ROLE]
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
...
Example of using the Google Cloud IoT Core device manager to administer
Expand All @@ -95,7 +95,7 @@ To run this sample:
list
positional arguments:
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
create-es256 Create a new device with the given id, using ES256 for
authentication.
create-registry Gets or creates a device registry.
Expand All @@ -107,6 +107,11 @@ To run this sample:
delete-device Delete the device with the given id.
delete-registry Deletes the specified registry.
get Retrieve the device with the given id.
get-config-versions
Lists versions of a device config in descending order
(newest first).
get-iam-permissions
Retrieves IAM permissions for the given registry.
get-registry Retrieves a device registry.
get-state Retrieve a device's state blobs.
list List all devices in the registry.
Expand All @@ -117,6 +122,9 @@ To run this sample:
device.
set-config Patch the device to add an RSA256 public key to the
device.
set-iam-permissions
Sets IAM permissions for the given registry to a
single role/member.
optional arguments:
-h, --help show this help message and exit
Expand All @@ -139,6 +147,8 @@ To run this sample:
--service_account_json SERVICE_ACCOUNT_JSON
Path to service account json file.
--version VERSION Version number for setting device configuration.
--member MEMBER Member used for IAM commands.
--role ROLE Role used for IAM commands.
Expand Down
2 changes: 1 addition & 1 deletion iot/api-client/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ def get_iam_permissions(
def set_iam_permissions(
service_account_json, project_id, cloud_region, registry_id, role,
member):
"""Retrieves IAM permissions for the given registry."""
"""Sets IAM permissions for the given registry to a single role/member."""
client = get_client(service_account_json)

registry_path = 'projects/{}/locations/{}/registries/{}'.format(
Expand Down
47 changes: 41 additions & 6 deletions iot/api-client/mqtt_example/cloudiot_mqtt_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,23 @@
import argparse
import datetime
import os
import random
import time

import jwt
import paho.mqtt.client as mqtt


# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32

# Whether to wait with exponential backoff before publishing.
should_backoff = False


# [START iot_mqtt_jwt]
def create_jwt(project_id, private_key_file, algorithm):
"""Creates a JWT (https://jwt.io) to establish an MQTT connection.
Expand Down Expand Up @@ -76,11 +87,22 @@ def on_connect(unused_client, unused_userdata, unused_flags, rc):
"""Callback for when a device connects."""
print('on_connect', mqtt.connack_string(rc))

# After a successful connect, reset backoff time and stop backing off.
global should_backoff
global minimum_backoff_time
should_backoff = False
minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
"""Paho callback for when a device disconnects."""
print('on_disconnect', error_str(rc))

# Since a disconnect occurred, the next loop iteration will wait with
# exponential backoff.
global should_backoff
should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
"""Paho callback when a message is sent to the broker."""
Expand Down Expand Up @@ -134,9 +156,6 @@ def get_client(
# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)

# Start the network loop.
client.loop_start()

return client
# [END iot_mqtt_config]

Expand Down Expand Up @@ -199,6 +218,8 @@ def parse_command_line_args():

# [START iot_mqtt_run]
def main():
global minimum_backoff_time

args = parse_command_line_args()

# Publish to the events or state topic based on the flag.
Expand All @@ -215,6 +236,23 @@ def main():

# Publish num_messages mesages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
# Process network events.
client.loop()

# Wait if backoff is required.
if should_backoff:
# If backoff time is too large, give up.
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
print('Exceeded maximum backoff time. Giving up.')
break

# Otherwise, wait and connect again.
delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
print('Waiting for {} before reconnecting.'.format(delay))
time.sleep(delay)
minimum_backoff_time *= 2
client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

payload = '{}/{}-payload-{}'.format(
args.registry_id, args.device_id, i)
print('Publishing message {}/{}: \'{}\''.format(
Expand All @@ -223,7 +261,6 @@ def main():
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print('Refreshing token after {}s').format(seconds_since_issue)
client.loop_stop()
jwt_iat = datetime.datetime.utcnow()
client = get_client(
args.project_id, args.cloud_region,
Expand All @@ -239,8 +276,6 @@ def main():
# Send events every second. State should not be updated as often
time.sleep(1 if args.message_type == 'event' else 5)

# End the network loop and finish.
client.loop_stop()
print('Finished.')
# [END iot_mqtt_run]

Expand Down
7 changes: 7 additions & 0 deletions iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def test_event(test_topic, capsys):
rsa_private_path, 'RS256', ca_cert_path,
'mqtt.googleapis.com', 443)

client.loop_start()
client.publish(mqtt_topic, 'just test', qos=1)
time.sleep(2)
client.loop_stop()
Expand Down Expand Up @@ -115,7 +116,10 @@ def test_state(test_topic, capsys):
rsa_private_path, 'RS256', ca_cert_path,
'mqtt.googleapis.com', 443)
client.publish(mqtt_topic, 'state test', qos=1)
client.loop_start()

time.sleep(3)

client.loop_stop()

manager.get_state(
Expand Down Expand Up @@ -152,7 +156,10 @@ def test_config(test_topic, capsys):
project_id, cloud_region, registry_id, device_id,
rsa_private_path, 'RS256', ca_cert_path,
'mqtt.googleapis.com', 443)
client.loop_start()

time.sleep(5)

client.loop_stop()

manager.get_state(
Expand Down

0 comments on commit b8f73b6

Please sign in to comment.