Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrates Gateway code to MQTT example #1977

Merged
merged 13 commits into from
Jan 28, 2019
Merged
2 changes: 1 addition & 1 deletion iot/api-client/manager/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cryptography==2.4.2
flaky==3.4.0
flaky==3.5.3
gcp-devrel-py-tools==0.0.15
google-api-python-client==1.7.5
google-auth-httplib2==0.0.3
Expand Down
265 changes: 238 additions & 27 deletions iot/api-client/mqtt_example/cloudiot_mqtt_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# [START iot_mqtt_includes]
import argparse
import datetime
import logging
import os
import random
import ssl
Expand All @@ -33,6 +34,8 @@
import paho.mqtt.client as mqtt
# [END iot_mqtt_includes]

logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.CRITICAL)

# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

Expand Down Expand Up @@ -169,37 +172,193 @@ def get_client(
# [END iot_mqtt_config]


def detach_device(client, device_id):
"""Detach the device from the gateway."""
# [START detach_device]
detach_topic = '/devices/{}/detach'.format(device_id)
print('Detaching: {}'.format(detach_topic))
client.publish(detach_topic, '{}', qos=1)
# [END detach_device]


def attach_device(client, device_id, auth):
"""Attach the device to the gateway."""
# [START attach_device]
attach_topic = '/devices/{}/attach'.format(device_id)
attach_payload = '{{"authorization" : "{}"}}'.format(auth)
client.publish(attach_topic, attach_payload, qos=1)
# [END attach_device]


def listen_for_messages(
service_account_json, project_id, cloud_region, registry_id, device_id,
gateway_id, num_messages, private_key_file, algorithm, ca_certs,
mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, duration,
cb=None):
"""Listens for messages sent to the gateway and bound devices."""
# [START listen_for_messages]
global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

attach_device(client, device_id, '')
print('Waiting for device to attach.')
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = '/devices/{}/config'.format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = '/devices/{}/config'.format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = '/devices/{}/errors'.format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
client.loop()
if cb is not None:
cb(client)

if should_backoff:
# If backoff time is too large, give up.
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
print('Exceeded maximum backoff time. Giving up.')
break

delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
time.sleep(delay)
minimum_backoff_time *= 2
client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print('Refreshing token after {}s').format(seconds_since_issue)
jwt_iat = datetime.datetime.utcnow()
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

time.sleep(1)

detach_device(client, device_id)

print('Finished.')
# [END listen_for_messages]


def send_data_from_bound_device(
service_account_json, project_id, cloud_region, registry_id, device_id,
gateway_id, num_messages, private_key_file, algorithm, ca_certs,
mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, payload):
"""Sends data from a gateway on behalf of a device that is bound to it."""
# [START send_data_from_bound_device]
global minimum_backoff_time

# Publish device events and gateway state.
device_topic = '/devices/{}/{}'.format(device_id, 'state')
gateway_topic = '/devices/{}/{}'.format(gateway_id, 'state')

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

attach_device(client, device_id, '')
print('Waiting for device to attach.')
time.sleep(5)

# Publish state to gateway topic
gateway_state = 'Starting gateway at: {}'.format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state, qos=1)

# Publish num_messages mesages to the MQTT bridge
for i in range(1, num_messages + 1):
client.loop()

if should_backoff:
# If backoff time is too large, give up.
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
print('Exceeded maximum backoff time. Giving up.')
break

delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
time.sleep(delay)
minimum_backoff_time *= 2
client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

payload = '{}/{}-{}-payload-{}'.format(
registry_id, gateway_id, device_id, i)

print('Publishing message {}/{}: \'{}\' to {}'.format(
i, num_messages, payload, device_topic))
client.publish(
device_topic, '{} : {}'.format(device_id, payload), qos=1)

seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print('Refreshing token after {}s').format(seconds_since_issue)
jwt_iat = datetime.datetime.utcnow()
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

time.sleep(5)

detach_device(client, device_id)

print('Finished.')
# [END send_data_from_bound_device]


def parse_command_line_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description=(
'Example Google Cloud IoT Core MQTT device connection code.'))
parser.add_argument(
'--project_id',
default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
help='GCP cloud project name')
parser.add_argument(
'--registry_id', required=True, help='Cloud IoT Core registry id')
parser.add_argument(
'--device_id', required=True, help='Cloud IoT Core device id')
parser.add_argument(
'--private_key_file',
required=True, help='Path to private key file.')
parser.add_argument(
'--algorithm',
choices=('RS256', 'ES256'),
required=True,
help='Which encryption algorithm to use to generate the JWT.')
parser.add_argument(
'--cloud_region', default='us-central1', help='GCP cloud region')
parser.add_argument(
'--ca_certs',
default='roots.pem',
help=('CA root from https://pki.google.com/roots.pem'))
parser.add_argument(
'--num_messages',
'--cloud_region', default='us-central1', help='GCP cloud region')
parser.add_argument(
'--data',
default='Hello there',
help='The telemetry data sent on behalf of a device')
parser.add_argument(
'--device_id', required=True, help='Cloud IoT Core device id')
parser.add_argument(
'--gateway_id', required=False, help='Gateway identifier.')
parser.add_argument(
'--jwt_expires_minutes',
default=20,
type=int,
default=100,
help='Number of messages to publish.')
help=('Expiration time, in minutes, for JWT tokens.'))
parser.add_argument(
'--listen_dur',
default=60,
type=int,
help='Duration (seconds) to listen for configuration messages')
parser.add_argument(
'--message_type',
choices=('event', 'state'),
Expand All @@ -217,19 +376,48 @@ def parse_command_line_args():
type=int,
help='MQTT bridge port.')
parser.add_argument(
'--jwt_expires_minutes',
default=20,
'--num_messages',
type=int,
help=('Expiration time, in minutes, for JWT tokens.'))
default=100,
help='Number of messages to publish.')
parser.add_argument(
'--private_key_file',
required=True,
help='Path to private key file.')
parser.add_argument(
'--project_id',
default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
help='GCP cloud project name')
parser.add_argument(
'--registry_id', required=True, help='Cloud IoT Core registry id')
parser.add_argument(
'--service_account_json',
default=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
help='Path to service account json file.')

# Command subparser
command = parser.add_subparsers(dest='command')

command.add_parser(
'device_demo',
help=mqtt_device_demo.__doc__)

command.add_parser(
'gateway_send',
help=send_data_from_bound_device.__doc__)

command.add_parser(
'gateway_listen',
help=listen_for_messages.__doc__)

return parser.parse_args()


# [START iot_mqtt_run]
def main():
def mqtt_device_demo(args):
"""Connects a device, sends data, and receives data."""
# [START iot_mqtt_run]
global minimum_backoff_time

args = parse_command_line_args()
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = 'events' if args.message_type == 'event' else 'state'
Expand All @@ -239,9 +427,9 @@ def main():
jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
args.project_id, args.cloud_region, args.registry_id, args.device_id,
args.private_key_file, args.algorithm, args.ca_certs,
args.mqtt_bridge_hostname, args.mqtt_bridge_port)
args.project_id, args.cloud_region, args.registry_id,
args.device_id, args.private_key_file, args.algorithm,
args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port)

# Publish num_messages mesages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
Expand Down Expand Up @@ -284,9 +472,32 @@ def main():

# Send events every second. State should not be updated as often
time.sleep(1 if args.message_type == 'event' else 5)
# [END iot_mqtt_run]


def main():
args = parse_command_line_args()

if args.command == 'gateway_listen':
listen_for_messages(
args.service_account_json, args.project_id,
args.cloud_region, args.registry_id, args.device_id,
args.gateway_id, args.num_messages, args.private_key_file,
args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
args.mqtt_bridge_port, args.jwt_expires_minutes,
args.listen_dur)
return
elif args.command == 'gateway_send':
send_data_from_bound_device(
args.service_account_json, args.project_id,
args.cloud_region, args.registry_id, args.device_id,
args.gateway_id, args.num_messages, args.private_key_file,
args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
args.mqtt_bridge_port, args.jwt_expires_minutes, args.data)
return
else:
mqtt_device_demo(args)
print('Finished.')
# [END iot_mqtt_run]


if __name__ == '__main__':
Expand Down
Loading