[Technical Question] Messages from edgeHub to IoT Edge modules may be dropped at the time of SAS token update. #1215

Open
hiroyha1 opened this issue Mar 6, 2024 · 3 comments

hiroyha1 commented Mar 6, 2024

By default, the SAS token is renewed every 45 minutes, and each renewal causes the client to disconnect and reconnect. If edgeHub sends a message to the module while the reconnection is in progress, that message is lost. How can I avoid this message drop?

With the C# SDK, IoT Edge modules do not appear to reconnect when the token is renewed, and the same problem does not occur.
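
For reference, the 45-minute figure follows from simple arithmetic: a token is renewed once its remaining lifetime reaches the renewal margin. The sketch below assumes Node.js SDK defaults of a 3600-second lifetime and a 900-second margin (my assumption, chosen because it matches the 45 minutes above). `renewalIntervalSeconds` is a hypothetical helper, not an SDK function.

```javascript
'use strict';

// Hypothetical helper (not part of the SDK): time between token renewals,
// i.e. the token lifetime minus the safety margin before expiry.
function renewalIntervalSeconds(tokenRenewal) {
  return tokenRenewal.tokenValidTimeInSeconds - tokenRenewal.tokenRenewalMarginInSeconds;
}

// Assumed SDK defaults: 1 hour lifetime, 15 minute margin.
const assumedDefaults = { tokenValidTimeInSeconds: 3600, tokenRenewalMarginInSeconds: 900 };
// Values set in the sink module of this issue to force frequent reconnects.
const reproSettings = { tokenValidTimeInSeconds: 10, tokenRenewalMarginInSeconds: 5 };

console.log(renewalIntervalSeconds(assumedDefaults) / 60); // 45 (minutes)
console.log(renewalIntervalSeconds(reproSettings));        // 5 (seconds)
```

With the repro settings, a reconnect is expected roughly every 5 seconds, which makes the drop easy to hit when the source sends a message every 100 ms.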

Code sample exhibiting the issue

Source module (C#)

    async Task SendEvents(CancellationToken cancellationToken)
    {
        int messageDelay = 100;
        int count = 1;
        int dataSize = 10;

        while (!cancellationToken.IsCancellationRequested)
        {
            string messageString = new string('*', dataSize);
            using Message message = new(Encoding.UTF8.GetBytes(messageString));
            message.ContentEncoding = "utf-8";
            message.ContentType = "text/plain";
            message.Properties.Add("sequenceNumber", count.ToString());
            _logger.LogInformation($"Send message: {count}, Size: [{dataSize}]");
            await _moduleClient!.SendEventAsync("output1", message, cancellationToken);
            count++;
            await Task.Delay(messageDelay, cancellationToken);
        }
    }

Sink module (Node.js)

    'use strict';

    var Transport = require('azure-iot-device-mqtt').Mqtt;
    var Client = require('azure-iot-device').ModuleClient;

    Client.fromEnvironment(Transport, function (err, client) {
      if (err) {
        console.error(err.toString());
        process.exit(-1);
      } else {
        // Reuse the options object held in the client's private fields.
        let options = client._methodClient._options;
        // Shorten the token lifetime so that renewal (and the resulting
        // reconnect) happens every few seconds instead of every 45 minutes.
        options.tokenRenewal = {
          tokenValidTimeInSeconds: 10,
          tokenRenewalMarginInSeconds: 5
        };
        client.setOptions(options);
        client.open(function (err) {
          if (err) {
            console.error(err.toString());
            process.exit(-1);
          }
          console.log('IoT Hub module client initialized');
          client.on('inputMessage', function (inputName, msg) {
            pipeMessage(client, inputName, msg);
          });
        });
      }
    });

    // Acknowledge the message, then log the payload size for input1.
    function pipeMessage(client, inputName, msg) {
      client.complete(msg, printResultFor('Receiving message'));

      if (inputName === 'input1') {
        var message = msg.getBytes().toString('utf8');
        console.log('message size: ' + message.length);
      }
    }

    // Build a callback that logs the error or result status of an operation.
    function printResultFor(op) {
      return function printResult(err, res) {
        if (err) {
          console.log(op + ' error: ' + err.toString());
        }
        if (res) {
          console.log(op + ' status: ' + res.constructor.name);
        }
      };
    }
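
To see exactly which messages are dropped, the sink could track the `sequenceNumber` property that the source module attaches to every message. The following is a minimal sketch of such a gap tracker. `createGapTracker` is a hypothetical helper, and reading the value inside `pipeMessage` with something like `parseInt(msg.properties.getValue('sequenceNumber'), 10)` is my assumption about the message API, not something shown in this issue.

```javascript
'use strict';

// Hypothetical helper: remembers the highest sequence number seen so far
// and reports which numbers were skipped when a newer one arrives.
function createGapTracker() {
  let last = null;
  return function track(sequenceNumber) {
    const missing = [];
    if (last !== null) {
      for (let n = last + 1; n < sequenceNumber; n++) {
        missing.push(n);
      }
    }
    // Ignore duplicates and out-of-order arrivals when advancing the marker.
    if (last === null || sequenceNumber > last) {
      last = sequenceNumber;
    }
    return missing;
  };
}

const track = createGapTracker();
console.log(track(1)); // []
console.log(track(2)); // []
console.log(track(5)); // [ 3, 4 ]
```

Logging the returned array next to the reconnect timestamps would show whether each gap lines up with a token renewal.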

Log (Sink Module)

2024-03-06T00:55:49.703Z azure-iot-device-mqtt:MqttTwinClient Message received on a topic we can ignore: devices/...
2024-03-06T00:55:49.804Z azure-iot-mqtt-base:MqttBase connected -> reconnecting (connected.updateSharedAccessSignature)
2024-03-06T00:55:49.805Z azure-iot-mqtt-base:MqttBase disconnecting mqtt client
2024-03-06T00:55:49.805Z azure-iot-mqtt-base:MqttBase removing all listeners
...
2024-03-06T00:55:49.813Z mqttjs:client end :: finish :: calling process.nextTick on closeStores
2024-03-06T00:55:49.813Z mqttjs:client end :: closeStores: closing incoming and outgoing stores
2024-03-06T00:55:49.813Z mqttjs:client end :: closeStores: emitting end
2024-03-06T00:55:49.813Z mqttjs:client end :: closeStores: invoking callback with args
2024-03-06T00:55:49.813Z azure-iot-mqtt-base:MqttBase mqtt client disconnected - reconnecting

Log (edgeHub)

<7> 2024-03-06 00:55:49.808 +00:00 [DBG] [Microsoft.Azure.Devices.Edge.Hub.Core.Routing.ModuleEndpoint] - Sending 1 message(s) to module iotedge1/ModuleC.                                                                                      
<7> 2024-03-06 00:55:49.808 +00:00 [DBG] [Microsoft.Azure.Devices.Edge.Hub.Core.Device.DeviceMessageHandler] - Sent message with correlation ID 0d64f7d1-be58-466e-8429-c09370c9a8c2 to iotedge1/ModuleC
<6> 2024-03-06 00:55:49.808 +00:00 [INF] [EdgeHub] - "Closing connection for device: iotedge1/ModuleC, , "
...
<4> 2024-03-06 00:56:19.809 +00:00 [WRN] [Microsoft.Azure.Devices.Edge.Hub.Core.Device.DeviceMessageHandler] - Did not receive ack for message 0d64f7d1-be58-466e-8429-c09370c9a8c2 from device/module iotedge1/ModuleC
<4> 2024-03-06 00:56:19.810 +00:00 [WRN] [Microsoft.Azure.Devices.Edge.Hub.Core.Routing.ModuleEndpoint] - Error sending messages to module iotedge1/ModuleC
System.TimeoutException: Message completion response not received
   at Microsoft.Azure.Devices.Edge.Hub.Core.Device.DeviceMessageHandler.SendMessageAsync(IMessage message, String input) in /home/hiroyha/iotedge/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs:line 498
   at Microsoft.Azure.Devices.Edge.Hub.Core.Routing.ModuleEndpoint.ModuleMessageProcessor.ProcessAsync(ICollection`1 routingMessages, IDeviceProxy dp, CancellationToken token) in /home/hiroyha/iotedge/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/ModuleEndpoint.cs:line 166
@vishnureddy17
Member

It looks like in your Node sample you are using Mqtt. What transport are you using in the C# case?

@hiroyha1
Author

hiroyha1 commented Apr 8, 2024

I use Mqtt in the C# case, too.

@vishnureddy17
Member

Thank you for bringing this up; we will investigate. If you need to mitigate this issue immediately, it may be worth using Amqp as a workaround for now.
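
A minimal sketch of what that transport switch might look like in the Node.js sink module. `startSink` is a hypothetical wrapper that takes the client class and transport as arguments so the choice is explicit. It omits the shortened `tokenRenewal` settings from the repro, and I have not verified that AMQP avoids the drop.

```javascript
'use strict';

// Hypothetical wrapper: opens a module client on the given transport,
// acknowledges each input message, and hands it to onMessage.
function startSink(ModuleClient, Transport, onMessage) {
  ModuleClient.fromEnvironment(Transport, function (err, client) {
    if (err) {
      console.error(err.toString());
      return;
    }
    client.open(function (err) {
      if (err) {
        console.error(err.toString());
        return;
      }
      client.on('inputMessage', function (inputName, msg) {
        client.complete(msg, function (err) {
          if (err) {
            console.error('complete error: ' + err.toString());
          }
        });
        onMessage(inputName, msg);
      });
    });
  });
}

// Usage inside the module (requires the azure-iot-device and
// azure-iot-device-amqp packages to be installed):
//
//   startSink(
//     require('azure-iot-device').ModuleClient,
//     require('azure-iot-device-amqp').Amqp,   // instead of azure-iot-device-mqtt
//     function (inputName, msg) { console.log(inputName, msg.getBytes().length); }
//   );
```

Only the transport argument differs from the MQTT version, so switching back once a fix is available is a one-line change.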
