
[Service Bus] receiveMessages in queue receiver in peekLock mode will cause message lost(lock) near error msg "Received transfer when credit was 0". #15606

Closed
enoeden opened this issue Jun 8, 2021 · 4 comments · Fixed by #15989


enoeden commented Jun 8, 2021

  • Package Name: @azure/service-bus
  • Package Version: 7.1.0
  • Operating system:
  • nodejs
    • version: 10.13.0
  • browser
    • name/version:
  • typescript
    • version:
  • Is the bug related to documentation in

Describe the bug
Similar to the test scenario in #12711, we found that messages are lost (their lock is lost) when calling receiveMessages on a queue receiver in peekLock mode. The message "Received transfer when credit was 0" appears in the log around the point of loss, which seems related.

To Reproduce
Steps to reproduce the behavior:
package.json

{
  "name": "azure_sbtest",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@azure/service-bus": "7.1.0",
    "applicationinsights": "^1.8.8",
    "async-mutex": "^0.2.6",
    "ws": "^7.4.0"
  }
}

ext-store.js

const { ServiceBusClient, ServiceBusAdministrationClient } = require("@azure/service-bus");
const { AzureLogger, setLogLevel } = require("@azure/logger");
const { AbortController } = require("@azure/abort-controller");
//const { ServiceBusClient, ReceiveMode } = require("azuresb1");

//setLogLevel("error");
const WebSocket = require("ws");

// No-op stand-in for async-mutex's Mutex: acquire() resolves immediately with a
// no-op release function and isLocked() always reports false (locking disabled).
class _MutexMock {
  constructor(){}
  acquire() { return Promise.resolve(() => {}); }
  isLocked() { return false; }
}

//const Mutex = require('async-mutex').Mutex;
const Mutex = _MutexMock;

class QueueServiceBus {
  constructor(config, logger) {
    this.config = config;
    this.logger = logger || { debug: ()=>{}, info: ()=>{}, error: ()=>{}};
    let _config = JSON.parse(JSON.stringify(config));
    this.serviceBusClient = new ServiceBusClient(this.config.connectionString, {webSocketOptions:{
        webSocket: WebSocket
      }});

    this.queuePrefix = _config.queuePrefix;
    this.provider = 'azure';
    this.mutex = new Mutex();
    this.queuePool = {};
  }
  getReceiver(queueName) {
    // Cache one receiver per queue (used by the receiver-reuse variant discussed below).
    if(!this.queuePool[queueName]){
      this.queuePool[queueName] = this.serviceBusClient.createReceiver(`${queueName}`, { maxAutoLockRenewalDurationInMs: 0});
    }
    return this.queuePool[queueName];
  }

  async receiveMessage (queueName, num, pullDuration, options) {
    // If another receive is already in flight, wait out the pull duration and return nothing.
    if (this.mutex.isLocked()) {
      await new Promise(resolve => { setTimeout(() => { resolve(); }, pullDuration * 1000); });
      return [];
    }
    const release = await this.mutex.acquire();
    try {
      // One receiver per call: create, receive once, then close.
      let queueReceiver = this.serviceBusClient.createReceiver(`${queueName}`, { maxAutoLockRenewalDurationInMs: 0 });
      //let queueReceiver = this.getReceiver(queueName);
      let msgs = await queueReceiver.receiveMessages(parseInt(num), { maxWaitTimeInMs: pullDuration * 1000 });
      await queueReceiver.close();
      return msgs;
    } catch (e) {
      this.logger.error(`${e}`);
      return [];
    } finally {
      release();
    }
  }

  async deleteMessage (queueName, msg) {
    const release = await this.mutex.acquire();
    try {
      this.logger.debug(`azure delMessage ${msg} from ${queueName}`);
      // Same pattern as receiveMessage: create a receiver, complete the message, close it.
      let queueReceiver = this.serviceBusClient.createReceiver(`${queueName}`, { maxAutoLockRenewalDurationInMs: 0 });
      //let queueReceiver = this.getReceiver(queueName);
      await queueReceiver.completeMessage(msg);
      await queueReceiver.close();
    } catch (error) {
      this.logger.info(error);
    } finally {
      release();
    }
  }
}

const HIGHWATERMARK = 50; // not used in this repro

const Queue = {
  azure: QueueServiceBus,
  azurev1: QueueServiceBus
};

let _queue = null;

module.exports.getQueue = (config, logger) => {
  if (_queue) {
    return _queue;
  } else {
    const QueueImpl = Queue[config.provider.queue];
    _queue = new QueueImpl(config.queue[config.provider.queue], logger);
    return _queue
  }
};

testqueue.js

const config = {
  "provider": {
    "queue": "azurev1"
  },
  "queue": {
    "azurev1" : {
      "connectionString": "",
      "queuePrefix" : ""
    }
  },
};

const queue = require('./ext-store').getQueue(config);

function getRandomInt(max) {return Math.floor(Math.random() * Math.floor(max));}

const sleep = async (ts) => {
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(true);
    }, ts || 2000);
  });
};

let sum = 0;
async function main(tn) {
  console.log(`enter task::${tn}`);
  while (true){
    let messages = await queue.receiveMessage('bbb', getRandomInt(10)+1, 5);
    if (messages &&  messages.length) {
      sum += messages.length;

      console.log(`${process.pid} ${new Date().toISOString()} task::${tn}  received::${messages.length}, current sum:: ${sum}`);
      for(let message of messages){
        console.log(`===> got ${message.body}`);
        await queue.deleteMessage('bbb', message)
      }
    }

    await sleep(20)
  }

}
// Start 80 concurrent polling tasks.
for (let idx = 0; idx < 80; idx++){
  main(idx);
}

sender.js

const { ServiceBusClient, ServiceBusAdministrationClient } = require("@azure/service-bus");
const WebSocket = require("ws");
let numMessagesToSend = 2000;
let connectionString = "";
let queueName = 'bbb';
const serviceBusClient = new ServiceBusClient(connectionString, {webSocketOptions:{
    webSocket: WebSocket
  }});

(async () => {
    try {
        const sender = serviceBusClient.createSender(queueName);
        const messages = await sender.createMessageBatch();
        for (let i = 0; i < numMessagesToSend; ++i) {
            messages.tryAddMessage({
                body: `Message ${i}`
            });
        }
        await sender.sendMessages(messages);
    } finally {
        await serviceBusClient.close();
    }
})();

Steps to reproduce the behavior:

We have a thin wrapper around the SDK for multi-platform support, which we believe is simple enough.
In the test case we send 2000 messages via sender.js and then run testqueue.js to receive them. The queue is configured with a 2-minute lock duration and a max delivery count of 1 to make the issue easier to reproduce.
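For reference, the queue can also be configured programmatically; a minimal sketch using ServiceBusAdministrationClient (placeholder connection string, queue name 'bbb' as used below):

const { ServiceBusAdministrationClient } = require("@azure/service-bus");

(async () => {
  const adminClient = new ServiceBusAdministrationClient("<connection string>");
  // 2-minute lock duration (ISO 8601 duration) and a max delivery count of 1,
  // matching the repro configuration described above.
  await adminClient.createQueue("bbb", {
    lockDuration: "PT2M",
    maxDeliveryCount: 1
  });
})();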
In our test run we found the following:

cat 1623131203.new.log | grep -e === -e sum

......
187 2021-06-08T05:47:51.735Z task::2  received::1, current sum:: 1991
===> got Message 1992
===> got Message 1985
===> got Message 1979
187 2021-06-08T05:47:51.894Z task::68  received::1, current sum:: 1992
===> got Message 1993
===> got Message 1986
===> got Message 1980
187 2021-06-08T05:47:52.054Z task::14  received::4, current sum:: 1996
===> got Message 1994
===> got Message 1987
===> got Message 1981
===> got Message 1995
===> got Message 1988
===> got Message 1982
===> got Message 1996
===> got Message 1989
===> got Message 1997
===> got Message 1990
187 2021-06-08T05:47:53.376Z task::66  received::2, current sum:: 1998
===> got Message 1998
===> got Message 1999

It looks like 2 messages were lost in this round (every message body is numbered, Message 0 through Message 1999), and we set DEBUG=* for a detailed log. After a brief log analysis:

  • messages 189 and 190 are lost from the application's point of view
  • the raw message info for 189 can still be found in the SDK's detailed log
  • "Received transfer when credit was 0" appears before message 191 is received

Expected behavior
We expect no messages to be lost (or to lose their lock) in this scenario.


Additional context
1623131203.new.zip

@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Jun 8, 2021
@ramya-rao-a ramya-rao-a added Client This issue points to a problem in the data-plane of the library. Service Bus labels Jun 8, 2021
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Jun 8, 2021
@ghost ghost added the needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team label Jun 8, 2021
@richardpark-msft richardpark-msft added this to the [2021] July milestone Jun 18, 2021
@richardpark-msft
Member

Hello @enoeden, I apologize for the late reply on this issue.

That error message is a big tell and it's given me a good area to look at, so I appreciate you providing so much information here. I will be looking at this more tomorrow morning.

Just a couple of observations/questions:

  • Is it possible to hold onto links longer than you currently do? I notice that you tend to open a link (i.e., a receiver), do one thing, then close it. Is there any particular reason you don't hold onto the link? (I would note there is nothing invalid about what you are doing, but an alternate design where you keep the link open is a potential workaround.)
  • The provided sending code has a possible bug where you do not check the result of tryAddMessage(). It's probably not the root cause of this issue, but I wanted to point out that tryAddMessage returns false (and does not throw) when a message cannot be added to the batch because the batch is full; a sketch of that check follows below.
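A minimal sketch of checking that return value, adapted from the sender.js above (same assumptions; flush the batch when it fills up and start a new one):

const sender = serviceBusClient.createSender(queueName);
let batch = await sender.createMessageBatch();
for (let i = 0; i < numMessagesToSend; ++i) {
  const message = { body: `Message ${i}` };
  if (!batch.tryAddMessage(message)) {
    // Batch is full: send what we have, start a fresh batch, and retry this message.
    await sender.sendMessages(batch);
    batch = await sender.createMessageBatch();
    if (!batch.tryAddMessage(message)) {
      throw new Error(`Message ${i} is too large to fit in an empty batch`);
    }
  }
}
await sender.sendMessages(batch);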


enoeden commented Jun 23, 2021

  • Is it possible to hold onto links longer than you do? I notice that you tend to open a link (ie, receiver), do one thing, then close it. Is there any particular reason you don't hold onto the link? (I would note - there is nothing invalid about what you are doing but an alternate design where you keep the link open is a potential workaround).

Hi @richardpark-msft, for question 1: our use case is a task-dispatch server that stores tasks in a message queue. We built a thin wrapper around Service Bus and AWS SQS (for cross-platform support) and expose a task-retrieval API with features like long polling (maxWaitTime) and batch receive.

We create one receiver per request and close it after a single receive simply to keep the server-side implementation simple; when it comes to reusing receivers in our case, we have concerns about fault isolation and receive performance.

With the patch below to ext-store.js we do reuse the receiver, but some odd behavior stopped us - please correct me if I've misunderstood anything.

@@ -43,10 +43,10 @@
     }
     const release = await this.mutex.acquire();
     try {
-      let queueReceiver =  this.serviceBusClient.createReceiver(`${queueName}`, { maxAutoLockRenewalDurationInMs: 0});
-      //let queueReceiver =  this.getReceiver(queueName);
+      //let queueReceiver =  this.serviceBusClient.createReceiver(`${queueName}`, { maxAutoLockRenewalDurationInMs: 0});
+      let queueReceiver =  this.getReceiver(queueName);
       let msgs = await queueReceiver.receiveMessages(parseInt(num), {maxWaitTimeInMs: pullDuration * 1000});
-      await queueReceiver.close();
+      //await queueReceiver.close();
       return msgs;
     }catch (e) {
       this.logger.error(`${e}`);
@@ -59,10 +59,10 @@
     const release = await this.mutex.acquire();
     try{
       this.logger.debug(`azure delMessage ${msg} from ${queueName}`);
-      let queueReceiver = this.serviceBusClient.createReceiver(`${queueName}`, { maxAutoLockRenewalDurationInMs: 0});
-      //let queueReceiver =  this.getReceiver(queueName);
+      //let queueReceiver = this.serviceBusClient.createReceiver(`${queueName}`, { maxAutoLockRenewalDurationInMs: 0});
+      let queueReceiver =  this.getReceiver(queueName);
       await queueReceiver.completeMessage(msg);
-      await queueReceiver.close();
+      //await queueReceiver.close();
     }catch(error){
       this.logger.info(error);
     } finally {
  1. For the receiver-reuse version of the code, if we run testqueue.js before sender.js we can receive messages successfully. However, it seems that calling receiveMessages concurrently on a single receiver is not possible, so we would have to add a lock around the receiver in order to reuse it. That serializes all receiving within a single server, which hurts performance, not to mention the control over the long-polling duration (see the serialization sketch after this list).
        azure:service-bus:receiver:warning [connection-1|receiver:bbb] is already receiving : Error: The receiver for "bbb" is already receiving messages.
            at ServiceBusReceiverImpl._throwIfAlreadyReceiving (/node_modules/@azure/service-bus/dist/index.js:8049:27)
            at ServiceBusReceiverImpl.<anonymous> (/node_modules/@azure/service-bus/dist/index.js:8069:18)
            at Generator.next (<anonymous>)
            at /node_modules/tslib/tslib.js:114:75
            at new Promise (<anonymous>)
            at Object.__awaiter (/node_modules/tslib/tslib.js:110:16)
            at ServiceBusReceiverImpl.receiveMessages (/node_modules/@azure/service-bus/dist/index.js:8067:22)
            at QueueServiceBus.receiveMessage (/src/ext-store.js:48:38)

recv_before_send.zip

  2. For the receiver-reuse version of the code, if we run testqueue.js after sender.js we lose almost all messages. It seems that when there are already messages in the queue, we see "Received transfer when credit was 0" many, many times and almost all messages are lost as a result. Since the SDK already raises an exception we may not treat this as a normal use case, but this strange behavior and the difference may still be worth a look.

recv_after_send.zip
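As mentioned in item 1, reusing a receiver means serializing calls to it. A minimal sketch of that serialization using async-mutex (already in the dependencies above), with one lock per queue; the helper names are hypothetical, this is not the actual fix:

const { Mutex } = require("async-mutex");

// One mutex per queue so only one receiveMessages/completeMessage call runs
// against a shared receiver at a time.
const receiverLocks = {};

function getReceiverLock(queueName) {
  if (!receiverLocks[queueName]) {
    receiverLocks[queueName] = new Mutex();
  }
  return receiverLocks[queueName];
}

async function receiveMessageSerialized(receiver, queueName, num, pullDuration) {
  const lock = getReceiverLock(queueName);
  // runExclusive queues callers up and runs them one at a time.
  return lock.runExclusive(() =>
    receiver.receiveMessages(parseInt(num), { maxWaitTimeInMs: pullDuration * 1000 })
  );
}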

@richardpark-msft
Member

@enoeden - now that I understand your use case, your model makes a lot of sense.

As a status update - I'm still trying to narrow down the bug somewhat. I was able to reproduce the condition that leads to the Received transfer when credit was 0 message but that doesn't always lead to messages being missing. I'll keep updating this issue as I find out more.

richardpark-msft added a commit that referenced this issue Jul 1, 2021
…ete (#15989)

Fixing an issue where we could lose messages or provoke an alarming message from rhea (`Received transfer when credit was 0`).

The message-loss issue is related to how we trigger 'drain' using 'addCredit(1)'. Our 'receiver.drain; receiver.addCredit(1)' pattern actually does add a credit, which shows up in the flow frame that gets sent for our drain. This has led to occasionally receiving more messages than we intended.

The second part of this was that we were masking this error because we had code that specifically threw out messages if more arrived than were requested. If the message was being auto-renewed it's possible for the message to appear to be missing, and if we were in receiveAndDelete the message is effectively lost at that point. That code is now removed (we defer to just allowing the extra message, should a bug arise that causes that) and we log an error indicating it did happen.

The rhea error message appeared to be triggered by our accidentally allowing multiple overlapping 'drain's to occur (finalAction did not check to see if we were _already_ draining and would allow it to happen multiple times). Removing the concurrent drains fixed this issue, but I didn't fully investigate why.

Fixes #15606, #15115
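For context, the overlapping-drain part of the fix described in the commit message above boils down to a re-entrancy guard; a generic sketch of that pattern (illustrative only, not the SDK's internal code):

// Guard so that only one drain can be in flight at a time; overlapping drains
// were what provoked "Received transfer when credit was 0".
let draining = false;

async function drainOnce(startDrain) {
  if (draining) {
    return; // a drain is already in progress, don't start another
  }
  draining = true;
  try {
    await startDrain();
  } finally {
    draining = false;
  }
}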
@richardpark-msft
Member

Hi @enoeden, a fix for this has been released as part of https://www.npmjs.com/package/@azure/service-bus/v/7.3.0
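Upgrading is a matter of bumping the dependency, e.g.:

npm install @azure/service-bus@7.3.0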

@github-actions github-actions bot locked and limited conversation to collaborators Apr 12, 2023