From 3a511820471729b59835980ab9eaa56368a5b0e2 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Thu, 24 Jan 2019 16:00:50 -0800 Subject: [PATCH 1/5] Migrates Gateway code to MQTT example --- iot/api-client/manager/requirements.txt | 2 +- .../mqtt_example/cloudiot_mqtt_example.py | 275 ++++++++++++++++-- .../cloudiot_mqtt_example_test.py | 149 ++++++++++ iot/api-client/mqtt_example/requirements.txt | 2 +- 4 files changed, 399 insertions(+), 29 deletions(-) diff --git a/iot/api-client/manager/requirements.txt b/iot/api-client/manager/requirements.txt index d8aaefea5805..21bf382a51dd 100644 --- a/iot/api-client/manager/requirements.txt +++ b/iot/api-client/manager/requirements.txt @@ -1,5 +1,5 @@ cryptography==2.4.2 -flaky==3.4.0 +flaky==3.5.3 gcp-devrel-py-tools==0.0.15 google-api-python-client==1.7.5 google-auth-httplib2==0.0.3 diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index d4469554af68..9726e0564a16 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -169,37 +169,200 @@ def get_client( # [END iot_mqtt_config] +def detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port): + """Detach the device from the gateway.""" + # [START detach_device] + detach_topic = '/devices/{}/detach'.format(device_id) + print('Detaching: {}'.format(detach_topic)) + client.loop() + client.connect(mqtt_bridge_hostname, mqtt_bridge_port) + client.publish(detach_topic, '{}', qos=1) + time.sleep(5) # wait for the server to respond / will trigger callback + # [END detach_device] + + +def attach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port): + """Attach the device to the gateway.""" + # [START attach_device] + attach_topic = '/devices/{}/attach'.format(device_id) + print('Attaching: {}'.format(attach_topic)) + # TODO {'authorization': ''} + attach_payload = '{}' + client.loop() + client.publish(attach_topic, attach_payload, qos=1) + time.sleep(5) + # [END attach_device] + + +def listen_for_messages( + service_account_json, project_id, cloud_region, registry_id, device_id, + gateway_id, num_messages, private_key_file, algorithm, ca_certs, + mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, duration, + cb=None): + """Listens for messages sent to the gateway and bound devices.""" + # [START listen_for_config_messages] + global minimum_backoff_time + + jwt_iat = datetime.datetime.utcnow() + jwt_exp_mins = jwt_expires_minutes + # Use gateway to connect to server + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + attach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + print('Waiting for device to attach.') + time.sleep(5) + + # The topic devices receive configuration updates on. + device_config_topic = '/devices/{}/config'.format(device_id) + client.subscribe(device_config_topic, qos=1) + + # The topic gateways receive configuration updates on. + gateway_config_topic = '/devices/{}/config'.format(gateway_id) + client.subscribe(gateway_config_topic, qos=1) + + # The topic gateways receive error updates on. QoS must be 0. + error_topic = '/devices/{}/errors'.format(gateway_id) + client.subscribe(error_topic, qos=0) + + # Wait for about a minute for config messages. + for i in range(1, duration): + client.loop() + if cb is not None: + cb(client) + + if should_backoff: + # If backoff time is too large, give up. + if minimum_backoff_time > MAXIMUM_BACKOFF_TIME: + print('Exceeded maximum backoff time. Giving up.') + break + + delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0 + time.sleep(delay) + minimum_backoff_time *= 2 + client.connect(mqtt_bridge_hostname, mqtt_bridge_port) + + seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds + if seconds_since_issue > 60 * jwt_exp_mins: + print('Refreshing token after {}s').format(seconds_since_issue) + jwt_iat = datetime.datetime.utcnow() + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + time.sleep(1) + + detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + + print('Finished.') + # [END listen_for_config_messages] + + +def send_data_from_bound_device( + service_account_json, project_id, cloud_region, registry_id, device_id, + gateway_id, num_messages, private_key_file, algorithm, ca_certs, + mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, payload): + """Sends data from a gateway on behalf of a device that is bound to it.""" + # [START send_data_from_bound_device] + global minimum_backoff_time + + # Publish device events and gateway state. + device_topic = '/devices/{}/{}'.format(device_id, 'state') + gateway_topic = '/devices/{}/{}'.format(gateway_id, 'state') + + jwt_iat = datetime.datetime.utcnow() + jwt_exp_mins = jwt_expires_minutes + # Use gateway to connect to server + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + attach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + print('Waiting for device to attach.') + time.sleep(5) + + # Publish state to gateway topic + gateway_state = 'Starting HUB at: {}'.format(time.time()) + print(gateway_state) + client.publish(gateway_topic, gateway_state, qos=1) + + # Publish num_messages mesages to the MQTT bridge + for i in range(1, num_messages + 1): + client.loop() + + if should_backoff: + # If backoff time is too large, give up. + if minimum_backoff_time > MAXIMUM_BACKOFF_TIME: + print('Exceeded maximum backoff time. Giving up.') + break + + delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0 + time.sleep(delay) + minimum_backoff_time *= 2 + client.connect(mqtt_bridge_hostname, mqtt_bridge_port) + + payload = '{}/{}-{}-payload-{}'.format( + registry_id, gateway_id, device_id, i) + + print('Publishing message {}/{}: \'{}\' to {}'.format( + i, num_messages, payload, device_topic)) + client.publish( + device_topic, '{} : {}'.format(device_id, payload), qos=1) + + seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds + if seconds_since_issue > 60 * jwt_exp_mins: + print('Refreshing token after {}s').format(seconds_since_issue) + jwt_iat = datetime.datetime.utcnow() + client = get_client( + project_id, cloud_region, registry_id, gateway_id, + private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, + mqtt_bridge_port) + + time.sleep(5) + + detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + + print('Finished.') + # [END send_data_from_bound_device] + + def parse_command_line_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description=( 'Example Google Cloud IoT Core MQTT device connection code.')) - parser.add_argument( - '--project_id', - default=os.environ.get('GOOGLE_CLOUD_PROJECT'), - help='GCP cloud project name') - parser.add_argument( - '--registry_id', required=True, help='Cloud IoT Core registry id') - parser.add_argument( - '--device_id', required=True, help='Cloud IoT Core device id') - parser.add_argument( - '--private_key_file', - required=True, help='Path to private key file.') parser.add_argument( '--algorithm', choices=('RS256', 'ES256'), required=True, help='Which encryption algorithm to use to generate the JWT.') - parser.add_argument( - '--cloud_region', default='us-central1', help='GCP cloud region') parser.add_argument( '--ca_certs', default='roots.pem', help=('CA root from https://pki.google.com/roots.pem')) parser.add_argument( - '--num_messages', + '--cloud_region', default='us-central1', help='GCP cloud region') + parser.add_argument( + '--data', + default='Hello there', + help='The telemetry data sent on behalf of a device') + parser.add_argument( + '--device_id', required=True, help='Cloud IoT Core device id') + parser.add_argument( + '--gateway_id', required=False, help='Gateway identifier.') + parser.add_argument( + '--jwt_expires_minutes', + default=20, type=int, - default=100, - help='Number of messages to publish.') + help=('Expiration time, in minutes, for JWT tokens.')) + parser.add_argument( + '--listen_dur', + default=60, + type=int, + help='Duration (seconds) to listen for configuration messages') parser.add_argument( '--message_type', choices=('event', 'state'), @@ -217,19 +380,48 @@ def parse_command_line_args(): type=int, help='MQTT bridge port.') parser.add_argument( - '--jwt_expires_minutes', - default=20, + '--num_messages', type=int, - help=('Expiration time, in minutes, for JWT tokens.')) + default=100, + help='Number of messages to publish.') + parser.add_argument( + '--private_key_file', + required=True, + help='Path to private key file.') + parser.add_argument( + '--project_id', + default=os.environ.get('GOOGLE_CLOUD_PROJECT'), + help='GCP cloud project name') + parser.add_argument( + '--registry_id', required=True, help='Cloud IoT Core registry id') + parser.add_argument( + '--service_account_json', + default=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"), + help='Path to service account json file.') + + # Command subparser + command = parser.add_subparsers(dest='command') + + command.add_parser( + 'device_demo', + help=mqtt_device_demo.__doc__) + + command.add_parser( + 'gateway_send', + help=send_data_from_bound_device.__doc__) + + command.add_parser( + 'gateway_listen', + help=listen_for_messages.__doc__) return parser.parse_args() -# [START iot_mqtt_run] -def main(): +def mqtt_device_demo(args): + """Connects a device, sends data, and receives data.""" + # [START iot_mqtt_run] global minimum_backoff_time - - args = parse_command_line_args() + global MAXIMUM_BACKOFF_TIME # Publish to the events or state topic based on the flag. sub_topic = 'events' if args.message_type == 'event' else 'state' @@ -239,9 +431,9 @@ def main(): jwt_iat = datetime.datetime.utcnow() jwt_exp_mins = args.jwt_expires_minutes client = get_client( - args.project_id, args.cloud_region, args.registry_id, args.device_id, - args.private_key_file, args.algorithm, args.ca_certs, - args.mqtt_bridge_hostname, args.mqtt_bridge_port) + args.project_id, args.cloud_region, args.registry_id, + args.device_id, args.private_key_file, args.algorithm, + args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port) # Publish num_messages mesages to the MQTT bridge once per second. for i in range(1, args.num_messages + 1): @@ -284,9 +476,38 @@ def main(): # Send events every second. State should not be updated as often time.sleep(1 if args.message_type == 'event' else 5) + # [END iot_mqtt_run] + +def main(): + args = parse_command_line_args() + + def trigger_error(client): + attach_device( + client, + 'invalid_device_id', + 'mqtt.googleapis.com', + 443) + if args.command == 'gateway_listen': + listen_for_messages( + args.service_account_json, args.project_id, + args.cloud_region, args.registry_id, args.device_id, + args.gateway_id, args.num_messages, args.private_key_file, + args.algorithm, args.ca_certs, args.mqtt_bridge_hostname, + args.mqtt_bridge_port, args.jwt_expires_minutes, + args.listen_dur, trigger_error) + return + elif args.command == 'gateway_send': + send_data_from_bound_device( + args.service_account_json, args.project_id, + args.cloud_region, args.registry_id, args.device_id, + args.gateway_id, args.num_messages, args.private_key_file, + args.algorithm, args.ca_certs, args.mqtt_bridge_hostname, + args.mqtt_bridge_port, args.jwt_expires_minutes, args.data) + return + else: + mqtt_device_demo(args) print('Finished.') -# [END iot_mqtt_run] if __name__ == '__main__': diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index a11f1f3efdf3..e2c0d93f92a7 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -41,6 +41,9 @@ pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id) registry_id = 'test-registry-{}'.format(int(time.time())) +mqtt_bridge_hostname = 'mqtt.googleapis.com' +mqtt_bridge_port = 443 + @pytest.fixture(scope='module') def test_topic(): @@ -229,3 +232,149 @@ def test_receive_command(capsys): out, _ = capsys.readouterr() assert 'on_connect' in out # Verify can connect assert '\'me want cookies\'' in out # Verify can receive command + + +@flaky +def test_gateway_listen_for_bound_device_configs(test_topic, capsys): + gateway_id = device_id_template.format('RS256') + device_id = device_id_template.format('noauthbind') + manager.create_registry( + service_account_json, project_id, cloud_region, pubsub_topic, + registry_id) + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + + # Setup for listening for config messages + num_messages = 0 + jwt_exp_time = 60 + listen_time = 30 + + # Connect the gateway + cloudiot_mqtt_example.listen_for_messages( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id, num_messages, rsa_private_path, + 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, + jwt_exp_time, listen_time, None) + + # Clean up + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + gateway_id) + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + + out, _ = capsys.readouterr() + assert 'Received message' in out + + +def test_gateway_send_data_for_device(test_topic, capsys): + gateway_id = device_id_template.format('RS256') + device_id = device_id_template.format('noauthbind') + manager.create_registry( + service_account_json, project_id, cloud_region, pubsub_topic, + registry_id) + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + + # Setup for listening for config messages + num_messages = 5 + jwt_exp_time = 60 + listen_time = 20 + + # Connect the gateway + cloudiot_mqtt_example.send_data_from_bound_device( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id, num_messages, rsa_private_path, + 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, + jwt_exp_time, listen_time) + + # Clean up + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + gateway_id) + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + + out, _ = capsys.readouterr() + assert 'Publishing message 5/5' in out + assert 'Out of memory' not in out # Indicates could not connect + + +def test_gateway_trigger_error_topic(test_topic, capsys): + gateway_id = device_id_template.format('RS256-err') + device_id = device_id_template.format('noauthbind') + manager.create_registry( + service_account_json, project_id, cloud_region, pubsub_topic, + registry_id) + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + + # Setup for listening for config messages + num_messages = 4 + jwt_exp_time = 30 + listen_time = 20 + + # Hardcoded callback for causing an error + def trigger_error(client): + cloudiot_mqtt_example.attach_device( + client, + 'invalid_device_id', + 'mqtt.googleapis.com', + 443) + + # Connect the gateway + cloudiot_mqtt_example.listen_for_messages( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id, num_messages, rsa_private_path, + 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, + jwt_exp_time, listen_time, trigger_error) + + # Clean up + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + gateway_id) + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + + out, _ = capsys.readouterr() + assert 'GATEWAY_ATTACHMENT_ERROR' in out + assert 'Out of memory' not in out # Indicates could not connect diff --git a/iot/api-client/mqtt_example/requirements.txt b/iot/api-client/mqtt_example/requirements.txt index 15887683155c..e8dd59ae7661 100644 --- a/iot/api-client/mqtt_example/requirements.txt +++ b/iot/api-client/mqtt_example/requirements.txt @@ -1,5 +1,5 @@ cryptography==2.4.2 -flaky==3.4.0 +flaky==3.5.3 gcp-devrel-py-tools==0.0.15 google-api-python-client==1.7.5 google-auth-httplib2==0.0.3 From 8406249fd5d46f8707d68820190497d1622d8ac4 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Fri, 25 Jan 2019 14:58:17 -0800 Subject: [PATCH 2/5] Adds connect to detach, removes unintended error trigger, updates region tag --- .../mqtt_example/cloudiot_mqtt_example.py | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index 9726e0564a16..bb222d731df4 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -174,20 +174,21 @@ def detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port): # [START detach_device] detach_topic = '/devices/{}/detach'.format(device_id) print('Detaching: {}'.format(detach_topic)) - client.loop() client.connect(mqtt_bridge_hostname, mqtt_bridge_port) + client.loop() client.publish(detach_topic, '{}', qos=1) time.sleep(5) # wait for the server to respond / will trigger callback # [END detach_device] -def attach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port): +def attach_device( + client, device_id, mqtt_bridge_hostname, mqtt_bridge_port, auth): """Attach the device to the gateway.""" # [START attach_device] attach_topic = '/devices/{}/attach'.format(device_id) print('Attaching: {}'.format(attach_topic)) - # TODO {'authorization': ''} - attach_payload = '{}' + attach_payload = '{{"authorization" : "{}"}}'.format(auth) + client.connect(mqtt_bridge_hostname, mqtt_bridge_port) client.loop() client.publish(attach_topic, attach_payload, qos=1) time.sleep(5) @@ -200,7 +201,7 @@ def listen_for_messages( mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, duration, cb=None): """Listens for messages sent to the gateway and bound devices.""" - # [START listen_for_config_messages] + # [START listen_for_messages] global minimum_backoff_time jwt_iat = datetime.datetime.utcnow() @@ -211,7 +212,8 @@ def listen_for_messages( private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port) - attach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + attach_device( + client, device_id, mqtt_bridge_hostname, mqtt_bridge_port, '') print('Waiting for device to attach.') time.sleep(5) @@ -258,7 +260,7 @@ def listen_for_messages( detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) print('Finished.') - # [END listen_for_config_messages] + # [END listen_for_messages] def send_data_from_bound_device( @@ -286,7 +288,7 @@ def send_data_from_bound_device( time.sleep(5) # Publish state to gateway topic - gateway_state = 'Starting HUB at: {}'.format(time.time()) + gateway_state = 'Starting gateway at: {}'.format(time.time()) print(gateway_state) client.publish(gateway_topic, gateway_state, qos=1) @@ -482,12 +484,6 @@ def mqtt_device_demo(args): def main(): args = parse_command_line_args() - def trigger_error(client): - attach_device( - client, - 'invalid_device_id', - 'mqtt.googleapis.com', - 443) if args.command == 'gateway_listen': listen_for_messages( args.service_account_json, args.project_id, @@ -495,7 +491,7 @@ def trigger_error(client): args.gateway_id, args.num_messages, args.private_key_file, args.algorithm, args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port, args.jwt_expires_minutes, - args.listen_dur, trigger_error) + args.listen_dur) return elif args.command == 'gateway_send': send_data_from_bound_device( From cc2cb64f45665caeb8b81497687ed726ce5d2513 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Fri, 25 Jan 2019 15:46:04 -0800 Subject: [PATCH 3/5] Updates calls to attach --- iot/api-client/mqtt_example/cloudiot_mqtt_example.py | 3 ++- iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index bb222d731df4..4adaac83ff9a 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -283,7 +283,8 @@ def send_data_from_bound_device( private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port) - attach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + attach_device( + client, device_id, mqtt_bridge_hostname, mqtt_bridge_port, '') print('Waiting for device to attach.') time.sleep(5) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index e2c0d93f92a7..e88f3ba32a45 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -353,7 +353,8 @@ def trigger_error(client): client, 'invalid_device_id', 'mqtt.googleapis.com', - 443) + 443, + '') # Connect the gateway cloudiot_mqtt_example.listen_for_messages( From 7aed0331c73c10b03389b431f1887a4e9ab42cc9 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Mon, 28 Jan 2019 09:57:32 -0800 Subject: [PATCH 4/5] Refactors attach device and updates tests --- .../mqtt_example/cloudiot_mqtt_example.py | 18 ++++-------------- .../mqtt_example/cloudiot_mqtt_example_test.py | 14 +++----------- 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index 4adaac83ff9a..bbb33d343974 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -169,29 +169,21 @@ def get_client( # [END iot_mqtt_config] -def detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port): +def detach_device(client, device_id): """Detach the device from the gateway.""" # [START detach_device] detach_topic = '/devices/{}/detach'.format(device_id) print('Detaching: {}'.format(detach_topic)) - client.connect(mqtt_bridge_hostname, mqtt_bridge_port) - client.loop() client.publish(detach_topic, '{}', qos=1) - time.sleep(5) # wait for the server to respond / will trigger callback # [END detach_device] -def attach_device( - client, device_id, mqtt_bridge_hostname, mqtt_bridge_port, auth): +def attach_device(client, device_id, auth): """Attach the device to the gateway.""" # [START attach_device] attach_topic = '/devices/{}/attach'.format(device_id) - print('Attaching: {}'.format(attach_topic)) attach_payload = '{{"authorization" : "{}"}}'.format(auth) - client.connect(mqtt_bridge_hostname, mqtt_bridge_port) - client.loop() client.publish(attach_topic, attach_payload, qos=1) - time.sleep(5) # [END attach_device] @@ -212,8 +204,7 @@ def listen_for_messages( private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port) - attach_device( - client, device_id, mqtt_bridge_hostname, mqtt_bridge_port, '') + attach_device(client, device_id, '') print('Waiting for device to attach.') time.sleep(5) @@ -283,8 +274,7 @@ def send_data_from_bound_device( private_key_file, algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port) - attach_device( - client, device_id, mqtt_bridge_hostname, mqtt_bridge_port, '') + attach_device(client, device_id, '') print('Waiting for device to attach.') time.sleep(5) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index e88f3ba32a45..fa48ac88dad8 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -344,24 +344,17 @@ def test_gateway_trigger_error_topic(test_topic, capsys): # Setup for listening for config messages num_messages = 4 - jwt_exp_time = 30 - listen_time = 20 # Hardcoded callback for causing an error def trigger_error(client): - cloudiot_mqtt_example.attach_device( - client, - 'invalid_device_id', - 'mqtt.googleapis.com', - 443, - '') + cloudiot_mqtt_example.attach_device(client, 'invalid_device_id', '') # Connect the gateway cloudiot_mqtt_example.listen_for_messages( service_account_json, project_id, cloud_region, registry_id, device_id, gateway_id, num_messages, rsa_private_path, - 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, - jwt_exp_time, listen_time, trigger_error) + 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443, + 20, 15, trigger_error) # Clean up manager.unbind_device_from_gateway( @@ -378,4 +371,3 @@ def trigger_error(client): out, _ = capsys.readouterr() assert 'GATEWAY_ATTACHMENT_ERROR' in out - assert 'Out of memory' not in out # Indicates could not connect From 9e96fe92c904686c1ca37b3b1af657a9be1fec39 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Mon, 28 Jan 2019 12:15:09 -0800 Subject: [PATCH 5/5] Fixes changes for function signature on detach --- iot/api-client/mqtt_example/cloudiot_mqtt_example.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py index bbb33d343974..ed273d7f7804 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example.py @@ -24,6 +24,7 @@ # [START iot_mqtt_includes] import argparse import datetime +import logging import os import random import ssl @@ -33,6 +34,8 @@ import paho.mqtt.client as mqtt # [END iot_mqtt_includes] +logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.CRITICAL) + # The initial backoff time after a disconnection occurs, in seconds. minimum_backoff_time = 1 @@ -248,7 +251,7 @@ def listen_for_messages( time.sleep(1) - detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + detach_device(client, device_id) print('Finished.') # [END listen_for_messages] @@ -317,7 +320,7 @@ def send_data_from_bound_device( time.sleep(5) - detach_device(client, device_id, mqtt_bridge_hostname, mqtt_bridge_port) + detach_device(client, device_id) print('Finished.') # [END send_data_from_bound_device]