Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT Exponential backoff and manager updates #1345

Merged
merged 3 commits into from
Feb 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions iot/api-client/manager/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ To run this sample:
[--project_id PROJECT_ID] [--registry_id REGISTRY_ID]
[--rsa_certificate_file RSA_CERTIFICATE_FILE]
[--service_account_json SERVICE_ACCOUNT_JSON]
[--version VERSION]
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
[--version VERSION] [--member MEMBER] [--role ROLE]
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
...
Example of using the Google Cloud IoT Core device manager to administer
Expand All @@ -95,7 +95,7 @@ To run this sample:
list
positional arguments:
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config}
{create-es256,create-registry,create-rsa256,create-topic,create-unauth,delete-device,delete-registry,get,get-config-versions,get-iam-permissions,get-registry,get-state,list,list-registries,patch-es256,patch-rs256,set-config,set-iam-permissions}
create-es256 Create a new device with the given id, using ES256 for
authentication.
create-registry Gets or creates a device registry.
Expand All @@ -107,6 +107,11 @@ To run this sample:
delete-device Delete the device with the given id.
delete-registry Deletes the specified registry.
get Retrieve the device with the given id.
get-config-versions
Lists versions of a device config in descending order
(newest first).
get-iam-permissions
Retrieves IAM permissions for the given registry.
get-registry Retrieves a device registry.
get-state Retrieve a device's state blobs.
list List all devices in the registry.
Expand All @@ -117,6 +122,9 @@ To run this sample:
device.
set-config Patch the device to add an RSA256 public key to the
device.
set-iam-permissions
Sets IAM permissions for the given registry to a
single role/member.
optional arguments:
-h, --help show this help message and exit
Expand All @@ -139,6 +147,8 @@ To run this sample:
--service_account_json SERVICE_ACCOUNT_JSON
Path to service account json file.
--version VERSION Version number for setting device configuration.
--member MEMBER Member used for IAM commands.
--role ROLE Role used for IAM commands.
Expand Down
2 changes: 1 addition & 1 deletion iot/api-client/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ def get_iam_permissions(
def set_iam_permissions(
service_account_json, project_id, cloud_region, registry_id, role,
member):
"""Retrieves IAM permissions for the given registry."""
"""Sets IAM permissions for the given registry to a single role/member."""
client = get_client(service_account_json)

registry_path = 'projects/{}/locations/{}/registries/{}'.format(
Expand Down
47 changes: 41 additions & 6 deletions iot/api-client/mqtt_example/cloudiot_mqtt_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,23 @@
import argparse
import datetime
import os
import random
import time

import jwt
import paho.mqtt.client as mqtt


# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32

# Whether to wait with exponential backoff before publishing.
should_backoff = False


# [START iot_mqtt_jwt]
def create_jwt(project_id, private_key_file, algorithm):
"""Creates a JWT (https://jwt.io) to establish an MQTT connection.
Expand Down Expand Up @@ -76,11 +87,22 @@ def on_connect(unused_client, unused_userdata, unused_flags, rc):
"""Callback for when a device connects."""
print('on_connect', mqtt.connack_string(rc))

# After a successful connect, reset backoff time and stop backing off.
global should_backoff
global minimum_backoff_time
should_backoff = False
minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
"""Paho callback for when a device disconnects."""
print('on_disconnect', error_str(rc))

# Since a disconnect occurred, the next loop iteration will wait with
# exponential backoff.
global should_backoff
should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
"""Paho callback when a message is sent to the broker."""
Expand Down Expand Up @@ -134,9 +156,6 @@ def get_client(
# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)

# Start the network loop.
client.loop_start()

return client
# [END iot_mqtt_config]

Expand Down Expand Up @@ -199,6 +218,8 @@ def parse_command_line_args():

# [START iot_mqtt_run]
def main():
global minimum_backoff_time

args = parse_command_line_args()

# Publish to the events or state topic based on the flag.
Expand All @@ -215,6 +236,23 @@ def main():

# Publish num_messages mesages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
# Process network events.
client.loop()

# Wait if backoff is required.
if should_backoff:
# If backoff time is too large, give up.
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
print('Exceeded maximum backoff time. Giving up.')
break

# Otherwise, wait and connect again.
delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
print('Waiting for {} before reconnecting.'.format(delay))
time.sleep(delay)
minimum_backoff_time *= 2
client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

payload = '{}/{}-payload-{}'.format(
args.registry_id, args.device_id, i)
print('Publishing message {}/{}: \'{}\''.format(
Expand All @@ -223,7 +261,6 @@ def main():
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print('Refreshing token after {}s').format(seconds_since_issue)
client.loop_stop()
jwt_iat = datetime.datetime.utcnow()
client = get_client(
args.project_id, args.cloud_region,
Expand All @@ -239,8 +276,6 @@ def main():
# Send events every second. State should not be updated as often
time.sleep(1 if args.message_type == 'event' else 5)

# End the network loop and finish.
client.loop_stop()
print('Finished.')
# [END iot_mqtt_run]

Expand Down
7 changes: 7 additions & 0 deletions iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def test_event(test_topic, capsys):
rsa_private_path, 'RS256', ca_cert_path,
'mqtt.googleapis.com', 443)

client.loop_start()
client.publish(mqtt_topic, 'just test', qos=1)
time.sleep(2)
client.loop_stop()
Expand Down Expand Up @@ -115,7 +116,10 @@ def test_state(test_topic, capsys):
rsa_private_path, 'RS256', ca_cert_path,
'mqtt.googleapis.com', 443)
client.publish(mqtt_topic, 'state test', qos=1)
client.loop_start()

time.sleep(3)

client.loop_stop()

manager.get_state(
Expand Down Expand Up @@ -152,7 +156,10 @@ def test_config(test_topic, capsys):
project_id, cloud_region, registry_id, device_id,
rsa_private_path, 'RS256', ca_cert_path,
'mqtt.googleapis.com', 443)
client.loop_start()

time.sleep(5)

client.loop_stop()

manager.get_state(
Expand Down