diff --git a/iot/api-client/manager/requirements.txt b/iot/api-client/manager/requirements.txt index d8aaefea5805..21bf382a51dd 100644 --- a/iot/api-client/manager/requirements.txt +++ b/iot/api-client/manager/requirements.txt @@ -1,5 +1,5 @@ cryptography==2.4.2 -flaky==3.4.0 +flaky==3.5.3 gcp-devrel-py-tools==0.0.15 google-api-python-client==1.7.5 google-auth-httplib2==0.0.3 diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index d4469554af68..ed273d7f7804 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -24,6 +24,7 @@ # [START iot_mqtt_includes] import argparse import datetime +import logging import os import random import ssl @@ -33,6 +34,8 @@ import paho.mqtt.client as mqtt # [END iot_mqtt_includes] +logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.CRITICAL) + # The initial backoff time after a disconnection occurs, in seconds. minimum_backoff_time = 1 @@ -169,37 +172,193 @@ def get_client( # [END iot_mqtt_config] +def detach_device(client, device_id): + """Detach the device from the gateway.""" + # [START detach_device] + detach_topic = '/devices/{}/detach'.format(device_id) + print('Detaching: {}'.format(detach_topic)) + client.publish(detach_topic, '{}', qos=1) + # [END detach_device] + + +def attach_device(client, device_id, auth): + """Attach the device to the gateway.""" + # [START attach_device] + attach_topic = '/devices/{}/attach'.format(device_id) + attach_payload = '{{"authorization" : "{}"}}'.format(auth) + client.publish(attach_topic, attach_payload, qos=1) + # [END attach_device] + + +def listen_for_messages( + service_account_json, project_id, cloud_region, registry_id, device_id, + gateway_id, num_messages, private_key_file, algorithm, ca_certs, + mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, duration, + cb=None): + """Listens for messages sent to the gateway and bound devices.""" + # [START listen_for_messages] + global minimum_backoff_time + + jwt_iat = datetime.datetime.utcnow() + jwt_exp_mins = jwt_expires_minutes + # Use gateway to connect to server + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + attach_device(client, device_id, '') + print('Waiting for device to attach.') + time.sleep(5) + + # The topic devices receive configuration updates on. + device_config_topic = '/devices/{}/config'.format(device_id) + client.subscribe(device_config_topic, qos=1) + + # The topic gateways receive configuration updates on. + gateway_config_topic = '/devices/{}/config'.format(gateway_id) + client.subscribe(gateway_config_topic, qos=1) + + # The topic gateways receive error updates on. QoS must be 0. + error_topic = '/devices/{}/errors'.format(gateway_id) + client.subscribe(error_topic, qos=0) + + # Wait for about a minute for config messages. + for i in range(1, duration): + client.loop() + if cb is not None: + cb(client) + + if should_backoff: + # If backoff time is too large, give up. + if minimum_backoff_time > MAXIMUM_BACKOFF_TIME: + print('Exceeded maximum backoff time. Giving up.') + break + + delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0 + time.sleep(delay) + minimum_backoff_time *= 2 + client.connect(mqtt_bridge_hostname, mqtt_bridge_port) + + seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds + if seconds_since_issue > 60 * jwt_exp_mins: + print('Refreshing token after {}s').format(seconds_since_issue) + jwt_iat = datetime.datetime.utcnow() + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + time.sleep(1) + + detach_device(client, device_id) + + print('Finished.') + # [END listen_for_messages] + + +def send_data_from_bound_device( + service_account_json, project_id, cloud_region, registry_id, device_id, + gateway_id, num_messages, private_key_file, algorithm, ca_certs, + mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, payload): + """Sends data from a gateway on behalf of a device that is bound to it.""" + # [START send_data_from_bound_device] + global minimum_backoff_time + + # Publish device events and gateway state. + device_topic = '/devices/{}/{}'.format(device_id, 'state') + gateway_topic = '/devices/{}/{}'.format(gateway_id, 'state') + + jwt_iat = datetime.datetime.utcnow() + jwt_exp_mins = jwt_expires_minutes + # Use gateway to connect to server + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + attach_device(client, device_id, '') + print('Waiting for device to attach.') + time.sleep(5) + + # Publish state to gateway topic + gateway_state = 'Starting gateway at: {}'.format(time.time()) + print(gateway_state) + client.publish(gateway_topic, gateway_state, qos=1) + + # Publish num_messages mesages to the MQTT bridge + for i in range(1, num_messages + 1): + client.loop() + + if should_backoff: + # If backoff time is too large, give up. + if minimum_backoff_time > MAXIMUM_BACKOFF_TIME: + print('Exceeded maximum backoff time. Giving up.') + break + + delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0 + time.sleep(delay) + minimum_backoff_time *= 2 + client.connect(mqtt_bridge_hostname, mqtt_bridge_port) + + payload = '{}/{}-{}-payload-{}'.format( + registry_id, gateway_id, device_id, i) + + print('Publishing message {}/{}: \'{}\' to {}'.format( + i, num_messages, payload, device_topic)) + client.publish( + device_topic, '{} : {}'.format(device_id, payload), qos=1) + + seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds + if seconds_since_issue > 60 * jwt_exp_mins: + print('Refreshing token after {}s').format(seconds_since_issue) + jwt_iat = datetime.datetime.utcnow() + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + time.sleep(5) + + detach_device(client, device_id) + + print('Finished.') + # [END send_data_from_bound_device] + + def parse_command_line_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description=( 'Example Google Cloud IoT Core MQTT device connection code.')) - parser.add_argument( - '--project_id', - default=os.environ.get('GOOGLE_CLOUD_PROJECT'), - help='GCP cloud project name') - parser.add_argument( - '--registry_id', required=True, help='Cloud IoT Core registry id') - parser.add_argument( - '--device_id', required=True, help='Cloud IoT Core device id') - parser.add_argument( - '--private_key_file', - required=True, help='Path to private key file.') parser.add_argument( '--algorithm', choices=('RS256', 'ES256'), required=True, help='Which encryption algorithm to use to generate the JWT.') - parser.add_argument( - '--cloud_region', default='us-central1', help='GCP cloud region') parser.add_argument( '--ca_certs', default='roots.pem', help=('CA root from https://pki.google.com/roots.pem')) parser.add_argument( - '--num_messages', + '--cloud_region', default='us-central1', help='GCP cloud region') + parser.add_argument( + '--data', + default='Hello there', + help='The telemetry data sent on behalf of a device') + parser.add_argument( + '--device_id', required=True, help='Cloud IoT Core device id') + parser.add_argument( + '--gateway_id', required=False, help='Gateway identifier.') + parser.add_argument( + '--jwt_expires_minutes', + default=20, type=int, - default=100, - help='Number of messages to publish.') + help=('Expiration time, in minutes, for JWT tokens.')) + parser.add_argument( + '--listen_dur', + default=60, + type=int, + help='Duration (seconds) to listen for configuration messages') parser.add_argument( '--message_type', choices=('event', 'state'), @@ -217,19 +376,48 @@ def parse_command_line_args(): type=int, help='MQTT bridge port.') parser.add_argument( - '--jwt_expires_minutes', - default=20, + '--num_messages', type=int, - help=('Expiration time, in minutes, for JWT tokens.')) + default=100, + help='Number of messages to publish.') + parser.add_argument( + '--private_key_file', + required=True, + help='Path to private key file.') + parser.add_argument( + '--project_id', + default=os.environ.get('GOOGLE_CLOUD_PROJECT'), + help='GCP cloud project name') + parser.add_argument( + '--registry_id', required=True, help='Cloud IoT Core registry id') + parser.add_argument( + '--service_account_json', + default=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"), + help='Path to service account json file.') + + # Command subparser + command = parser.add_subparsers(dest='command') + + command.add_parser( + 'device_demo', + help=mqtt_device_demo.__doc__) + + command.add_parser( + 'gateway_send', + help=send_data_from_bound_device.__doc__) + + command.add_parser( + 'gateway_listen', + help=listen_for_messages.__doc__) return parser.parse_args() -# [START iot_mqtt_run] -def main(): +def mqtt_device_demo(args): + """Connects a device, sends data, and receives data.""" + # [START iot_mqtt_run] global minimum_backoff_time - - args = parse_command_line_args() + global MAXIMUM_BACKOFF_TIME # Publish to the events or state topic based on the flag. sub_topic = 'events' if args.message_type == 'event' else 'state' @@ -239,9 +427,9 @@ def main(): jwt_iat = datetime.datetime.utcnow() jwt_exp_mins = args.jwt_expires_minutes client = get_client( - args.project_id, args.cloud_region, args.registry_id, args.device_id, - args.private_key_file, args.algorithm, args.ca_certs, - args.mqtt_bridge_hostname, args.mqtt_bridge_port) + args.project_id, args.cloud_region, args.registry_id, + args.device_id, args.private_key_file, args.algorithm, + args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port) # Publish num_messages mesages to the MQTT bridge once per second. for i in range(1, args.num_messages + 1): @@ -284,9 +472,32 @@ def main(): # Send events every second. State should not be updated as often time.sleep(1 if args.message_type == 'event' else 5) + # [END iot_mqtt_run] + +def main(): + args = parse_command_line_args() + + if args.command == 'gateway_listen': + listen_for_messages( + args.service_account_json, args.project_id, + args.cloud_region, args.registry_id, args.device_id, + args.gateway_id, args.num_messages, args.private_key_file, + args.algorithm, args.ca_certs, args.mqtt_bridge_hostname, + args.mqtt_bridge_port, args.jwt_expires_minutes, + args.listen_dur) + return + elif args.command == 'gateway_send': + send_data_from_bound_device( + args.service_account_json, args.project_id, + args.cloud_region, args.registry_id, args.device_id, + args.gateway_id, args.num_messages, args.private_key_file, + args.algorithm, args.ca_certs, args.mqtt_bridge_hostname, + args.mqtt_bridge_port, args.jwt_expires_minutes, args.data) + return + else: + mqtt_device_demo(args) print('Finished.') -# [END iot_mqtt_run] if __name__ == '__main__': diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index a11f1f3efdf3..fa48ac88dad8 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -41,6 +41,9 @@ pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id) registry_id = 'test-registry-{}'.format(int(time.time())) +mqtt_bridge_hostname = 'mqtt.googleapis.com' +mqtt_bridge_port = 443 + @pytest.fixture(scope='module') def test_topic(): @@ -229,3 +232,142 @@ def test_receive_command(capsys): out, _ = capsys.readouterr() assert 'on_connect' in out # Verify can connect assert '\'me want cookies\'' in out # Verify can receive command + + +@flaky +def test_gateway_listen_for_bound_device_configs(test_topic, capsys): + gateway_id = device_id_template.format('RS256') + device_id = device_id_template.format('noauthbind') + manager.create_registry( + service_account_json, project_id, cloud_region, pubsub_topic, + registry_id) + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + + # Setup for listening for config messages + num_messages = 0 + jwt_exp_time = 60 + listen_time = 30 + + # Connect the gateway + cloudiot_mqtt_example.listen_for_messages( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id, num_messages, rsa_private_path, + 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, + jwt_exp_time, listen_time, None) + + # Clean up + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + gateway_id) + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + + out, _ = capsys.readouterr() + assert 'Received message' in out + + +def test_gateway_send_data_for_device(test_topic, capsys): + gateway_id = device_id_template.format('RS256') + device_id = device_id_template.format('noauthbind') + manager.create_registry( + service_account_json, project_id, cloud_region, pubsub_topic, + registry_id) + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + + # Setup for listening for config messages + num_messages = 5 + jwt_exp_time = 60 + listen_time = 20 + + # Connect the gateway + cloudiot_mqtt_example.send_data_from_bound_device( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id, num_messages, rsa_private_path, + 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, + jwt_exp_time, listen_time) + + # Clean up + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + gateway_id) + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + + out, _ = capsys.readouterr() + assert 'Publishing message 5/5' in out + assert 'Out of memory' not in out # Indicates could not connect + + +def test_gateway_trigger_error_topic(test_topic, capsys): + gateway_id = device_id_template.format('RS256-err') + device_id = device_id_template.format('noauthbind') + manager.create_registry( + service_account_json, project_id, cloud_region, pubsub_topic, + registry_id) + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + + # Setup for listening for config messages + num_messages = 4 + + # Hardcoded callback for causing an error + def trigger_error(client): + cloudiot_mqtt_example.attach_device(client, 'invalid_device_id', '') + + # Connect the gateway + cloudiot_mqtt_example.listen_for_messages( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id, num_messages, rsa_private_path, + 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443, + 20, 15, trigger_error) + + # Clean up + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + gateway_id) + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + + out, _ = capsys.readouterr() + assert 'GATEWAY_ATTACHMENT_ERROR' in out diff --git a/iot/api-client/mqtt_example/requirements.txt b/iot/api-client/mqtt_example/requirements.txt index 15887683155c..e8dd59ae7661 100644 --- a/iot/api-client/mqtt_example/requirements.txt +++ b/iot/api-client/mqtt_example/requirements.txt @@ -1,5 +1,5 @@ cryptography==2.4.2 -flaky==3.4.0 +flaky==3.5.3 gcp-devrel-py-tools==0.0.15 google-api-python-client==1.7.5 google-auth-httplib2==0.0.3