From 2466d530750b9247569e1bb8e8436e62c9265ad1 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Mon, 5 Feb 2018 14:13:45 -0800 Subject: [PATCH 1/3] Adds exponential backoff to MQTT, fixes docstring in manager, updates README. --- iot/api-client/manager/README.rst | 16 +++++-- iot/api-client/manager/manager.py | 2 +- .../mqtt_example/cloudiot_mqtt_example.py | 46 ++++++++++++++++--- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/iot/api-client/manager/README.rst b/iot/api-client/manager/README.rst index e432b8d38654..75ce31c196be 100644 --- a/iot/api-client/manager/README.rst +++ b/iot/api-client/manager/README.rst @@ -77,8 +77,8 @@ To run this sample: [--project_id PROJECT_ID] [--registry_id REGISTRY_ID] [--rsa_certificate_file RSA_CERTIFICATE_FILE] [--service_account_json SERVICE_ACCOUNT_JSON] - [--version VERSION] - {create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config} + [--version VERSION] [--member MEMBER] [--role ROLE] + {create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions} ... Example of using the Google Cloud IoT Core device manager to administer @@ -95,7 +95,7 @@ To run this sample: list positional arguments: - {create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config} + {create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions} create-es256 Create a new device with the given id, using ES256 for authentication. create-registry Gets or creates a device registry. @@ -107,6 +107,11 @@ To run this sample: delete-device Delete the device with the given id. delete-registry Deletes the specified registry. get Retrieve the device with the given id. + get-config-versions + Lists versions of a device config in descending order + (newest first). + get-iam-permissions + Retrieves IAM permissions for the given registry. get-registry Retrieves a device registry. get-state Retrieve a device's state blobs. list List all devices in the registry. @@ -117,6 +122,9 @@ To run this sample: device. set-config Patch the device to add an RSA256 public key to the device. + set-iam-permissions + Sets IAM permissions for the given registry to a + single role/member. optional arguments: -h, --help show this help message and exit @@ -139,6 +147,8 @@ To run this sample: --service_account_json SERVICE_ACCOUNT_JSON Path to service account json file. --version VERSION Version number for setting device configuration. + --member MEMBER Member used for IAM commands. + --role ROLE Role used for IAM commands. diff --git a/iot/api-client/manager/manager.py b/iot/api-client/manager/manager.py index 08137139f538..f8b954ca0bb2 100644 --- a/iot/api-client/manager/manager.py +++ b/iot/api-client/manager/manager.py @@ -438,7 +438,7 @@ def get_iam_permissions( def set_iam_permissions( service_account_json, project_id, cloud_region, registry_id, role, member): - """Retrieves IAM permissions for the given registry.""" + """Sets IAM permissions for the given registry to a single role/member.""" client = get_client(service_account_json) registry_path = 'projects/{}/locations/{}/registries/{}'.format( diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index e0450f6e943d..33425b13182a 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -24,12 +24,22 @@ import argparse import datetime import os +import random import time import jwt import paho.mqtt.client as mqtt +# The initial backoff time after a disconnection occurs, in seconds. +minimum_backoff_time = 1 + +# The maximum backoff time before giving up, in seconds. +MAXIMUM_BACKOFF_TIME = 32 + +# Whether to wait with exponential backoff before publishing. +should_backoff = False + # [START iot_mqtt_jwt] def create_jwt(project_id, private_key_file, algorithm): """Creates a JWT (https://jwt.io) to establish an MQTT connection. @@ -76,11 +86,22 @@ def on_connect(unused_client, unused_userdata, unused_flags, rc): """Callback for when a device connects.""" print('on_connect', mqtt.connack_string(rc)) + # After a successful connect, reset backoff time and stop backing off. + global should_backoff + global minimum_backoff_time + should_backoff = False + minimum_backoff_time = 1 + def on_disconnect(unused_client, unused_userdata, rc): """Paho callback for when a device disconnects.""" print('on_disconnect', error_str(rc)) + # Since a disconnect occurred, the next loop iteration will wait with + # exponential backoff. + global should_backoff + should_backoff = True + def on_publish(unused_client, unused_userdata, unused_mid): """Paho callback when a message is sent to the broker.""" @@ -134,9 +155,6 @@ def get_client( # Subscribe to the config topic. client.subscribe(mqtt_config_topic, qos=1) - # Start the network loop. - client.loop_start() - return client # [END iot_mqtt_config] @@ -199,6 +217,8 @@ def parse_command_line_args(): # [START iot_mqtt_run] def main(): + global minimum_backoff_time + args = parse_command_line_args() # Publish to the events or state topic based on the flag. @@ -215,6 +235,23 @@ def main(): # Publish num_messages mesages to the MQTT bridge once per second. for i in range(1, args.num_messages + 1): + # Process network events. + client.loop() + + # Wait if backoff is required. + if should_backoff: + # If backoff time is too large, give up. + if minimum_backoff_time > MAXIMUM_BACKOFF_TIME: + print('Exceeded maximum backoff time. Giving up.') + break + + # Otherwise, wait and connect again. + delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0 + print('Waiting for ', str(delay), ' before reconnecting.') + time.sleep(delay) + minimum_backoff_time *= 2 + client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port) + payload = '{}/{}-payload-{}'.format( args.registry_id, args.device_id, i) print('Publishing message {}/{}: \'{}\''.format( @@ -223,7 +260,6 @@ def main(): seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds if seconds_since_issue > 60 * jwt_exp_mins: print('Refreshing token after {}s').format(seconds_since_issue) - client.loop_stop() jwt_iat = datetime.datetime.utcnow() client = get_client( args.project_id, args.cloud_region, @@ -239,8 +275,6 @@ def main(): # Send events every second. State should not be updated as often time.sleep(1 if args.message_type == 'event' else 5) - # End the network loop and finish. - client.loop_stop() print('Finished.') # [END iot_mqtt_run] From d6b3efcb02721002be4cee18db9cb27cb45940cd Mon Sep 17 00:00:00 2001 From: Gus Class Date: Tue, 6 Feb 2018 09:54:08 -0800 Subject: [PATCH 2/3] Lint and string format > concat --- iot/api-client/mqtt_example/cloudiot_mqtt_example.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index 33425b13182a..8302aec59fa2 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -40,6 +40,7 @@ # Whether to wait with exponential backoff before publishing. should_backoff = False + # [START iot_mqtt_jwt] def create_jwt(project_id, private_key_file, algorithm): """Creates a JWT (https://jwt.io) to establish an MQTT connection. @@ -247,7 +248,7 @@ def main(): # Otherwise, wait and connect again. delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0 - print('Waiting for ', str(delay), ' before reconnecting.') + print('Waiting for {} before reconnecting.'.format(delay)) time.sleep(delay) minimum_backoff_time *= 2 client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port) From 567d9527d439408f615fa36693dd223d970fee7e Mon Sep 17 00:00:00 2001 From: Gus Class Date: Tue, 6 Feb 2018 11:08:10 -0800 Subject: [PATCH 3/3] Fixes tests to start the client loop and actually do stuff again. --- iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index 8c5d9d0ae4b1..770f306e6423 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -74,6 +74,7 @@ def test_event(test_topic, capsys): rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) + client.loop_start() client.publish(mqtt_topic, 'just test', qos=1) time.sleep(2) client.loop_stop() @@ -115,7 +116,10 @@ def test_state(test_topic, capsys): rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) client.publish(mqtt_topic, 'state test', qos=1) + client.loop_start() + time.sleep(3) + client.loop_stop() manager.get_state( @@ -152,7 +156,10 @@ def test_config(test_topic, capsys): project_id, cloud_region, registry_id, device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) + client.loop_start() + time.sleep(5) + client.loop_stop() manager.get_state(