Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weird behavior when pulling messages and using modifyAckDeadline #1017

Closed
mziccard opened this issue May 20, 2016 · 3 comments
Closed

Weird behavior when pulling messages and using modifyAckDeadline #1017

mziccard opened this issue May 20, 2016 · 3 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API.

Comments

@mziccard
Copy link
Contributor

mziccard commented May 20, 2016

The scenario is the following: 1 topic and 1 subscription with default ack deadline (10 seconds). I publish 2 messages (message1 and message2) to the topic and then pull messages from the subscription.

After the first pull I set the ack deadline of message1 to a big value (let's say 100 seconds). Then I pull messages from the subscription every 20 seconds. The expected behavior is that pulls return only message2 until ~100 seconds are passed, and then return both message1 and message2.

When I run this scenario against the actual service I get the weird behavior: pull requests made before that ~100 seconds are passed after the deadline modification return no messages (as if both messages where affected by modifyAckDeadline).

The veneer toolkit code that reproduces the error is the following:

PublisherApi publisher = PublisherApi.create(publisherSettings);
SubscriberApi subscriber = SubscriberApi.create(subscriberSettings);
String topicName = PublisherApi.formatTopicName("gcloud-devel", "test-topic");
String subscriptionName =
    SubscriberApi.formatSubscriptionName("gcloud-devel", "test-subscription");
publisher.createTopic(topicName);
subscriber.createSubscription(Subscription.newBuilder()
    .setName(subscriptionName)
    .setTopic(topicName)
    .setAckDeadlineSeconds(10)
    .build());
publisher.publish(topicName, ImmutableList.of(
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("message1")).build(),
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("message2")).build()));
PullResponse pullResponse = subscriber.pull(subscriptionName, true, 2);
List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessagesList();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
  System.out.printf("[%d][PULLED MESSAGE %s]%n",
      System.currentTimeMillis(), message.getMessage().getData().toStringUtf8());
}
// Set the deadline of the first message far away
subscriber.modifyAckDeadline(subscriptionName,
    ImmutableList.of(receivedMessages.get(0).getAckId()), 100);

// Second message should be again available for pulling after .sleep()
Thread.sleep(20000);
for (int i = 0; i < 10; i++) {
  pullResponse = subscriber.pull(subscriptionName, true, 2);
  receivedMessages = pullResponse.getReceivedMessagesList();
  if (receivedMessages.isEmpty()) {
    System.out.printf("[%d][PULLED NO MESSAGES]%n", System.currentTimeMillis());
  } else {
    for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
      System.out.printf("[%d][PULLED MESSAGE %s]%n",
          System.currentTimeMillis(), message.getMessage().getData().toStringUtf8());
    }
  }
  Thread.sleep(20000);
}
publisher.deleteTopic(topicName);
subscriber.deleteSubscription(subscriptionName);

When I run it against the actual service I get the following output:

[1463750536877][PULLED MESSAGE message1]
[1463750536877][PULLED MESSAGE message2]
[1463750561558][PULLED NO MESSAGES] // <-- here and in the following I would expect message2
[1463750584638][PULLED NO MESSAGES]
[1463750608178][PULLED NO MESSAGES]
[1463750631240][PULLED NO MESSAGES]
[1463750653656][PULLED MESSAGE message1]
[1463750653657][PULLED MESSAGE message2]
[1463750676241][PULLED MESSAGE message1]
[1463750676241][PULLED MESSAGE message2]
[1463750698302][PULLED MESSAGE message1]
[1463750698302][PULLED MESSAGE message2]
[1463750720269][PULLED MESSAGE message1]
[1463750720270][PULLED MESSAGE message2]
[1463750742847][PULLED MESSAGE message1]
[1463750742847][PULLED MESSAGE message2]

Instead when I run it agains the emulator the output is as expected:

[1463749875238][PULLED MESSAGE message1]
[1463749875238][PULLED MESSAGE message2]
[1463749895270][PULLED MESSAGE message2]
[1463749915289][PULLED MESSAGE message2]
[1463749935308][PULLED MESSAGE message2]
[1463749955329][PULLED MESSAGE message2]
[1463749975347][PULLED MESSAGE message1]
[1463749975347][PULLED MESSAGE message2]
[1463749995363][PULLED MESSAGE message1]
[1463749995363][PULLED MESSAGE message2]
[1463750015384][PULLED MESSAGE message2]
[1463750015384][PULLED MESSAGE message1]
[1463750035403][PULLED MESSAGE message2]
[1463750035403][PULLED MESSAGE message1]
[1463750055421][PULLED MESSAGE message2]
[1463750055422][PULLED MESSAGE message1]

@eschapira @garrettjonesgoogle Any idea about what's going on?

@mziccard mziccard added the api: pubsub Issues related to the Pub/Sub API. label May 20, 2016
@garrettjonesgoogle
Copy link
Member

I tried this out myself, but adding one thing to the printout, the ack id:

      System.out.printf("[%d][PULLED MESSAGE %s, ackId = %s]%n",
          System.currentTimeMillis(), message.getMessage().getData().toStringUtf8(),
          message.getAckId());

What I discovered is that the ack id was the same for both message1 and message2.

@mziccard
Copy link
Contributor Author

What I discovered is that the ack id was the same for both message1 and message2.

Actually they are very similar but different (they differ of a couple of letters).

I managed to reproduce this issue (thanks to @garrettjonesgoogle) also with gcloud-node (code can be found here).

The problem seems to be related to the fact that the two messages are published using a single call to publish. If we use two publish calls:

publisher.publish(topicName, ImmutableList.of(
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("message1")).build()));
publisher.publish(topicName, ImmutableList.of(
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("message2")).build()));

The issues does not occur and the example programs works as expected. Any clues?
`

@mziccard
Copy link
Contributor Author

mziccard commented Sep 6, 2016

This issue was reported internally, can be closed here

@mziccard mziccard closed this as completed Sep 6, 2016
github-actions bot pushed a commit that referenced this issue Aug 9, 2022
🤖 I have created a release *beep* *boop*
---


## [3.2.0](googleapis/java-aiplatform@v3.1.0...v3.2.0) (2022-08-09)


### Features

* add a DeploymentResourcePool API resource_definition ([#997](googleapis/java-aiplatform#997)) ([82551d8](googleapis/java-aiplatform@82551d8))
* add DeploymentResourcePool in aiplatform v1beta1 deployment_resource_pool.proto ([#998](googleapis/java-aiplatform#998)) ([76dc64f](googleapis/java-aiplatform@76dc64f))
* add DeploymentResourcePoolService in aiplatform v1beta1 deployment_resource_pool_service.proto ([76dc64f](googleapis/java-aiplatform@76dc64f))
* add shared_resources for supported prediction_resources ([82551d8](googleapis/java-aiplatform@82551d8))
* add SHARED_RESOURCES to DeploymentResourcesType in aiplatform v1beta1 model.proto ([76dc64f](googleapis/java-aiplatform@76dc64f))
* added SHARED_RESOURCES enum to aiplatform v1 model.proto ([301cfb0](googleapis/java-aiplatform@301cfb0))
* DeploymentResourcePool and DeployementResourcePoolService added to aiplatform v1beta1 model.proto (cl/463147866) ([301cfb0](googleapis/java-aiplatform@301cfb0))
* making network arg optional in aiplatform v1 custom_job.proto ([#999](googleapis/java-aiplatform#999)) ([301cfb0](googleapis/java-aiplatform@301cfb0))
* making network arg optional in aiplatform v1beta1 custom_job.proto ([301cfb0](googleapis/java-aiplatform@301cfb0))
* **samples:** add all feature samples ([#980](googleapis/java-aiplatform#980)) ([8c2a485](googleapis/java-aiplatform@8c2a485))
* **samples:** add all feature values samples ([#981](googleapis/java-aiplatform#981)) ([2d4e6fe](googleapis/java-aiplatform@2d4e6fe))


### Bug Fixes

* declaring test-scope artifact as runtime ([#1014](googleapis/java-aiplatform#1014)) ([6c47c65](googleapis/java-aiplatform@6c47c65))


### Documentation

* doc edits to aiplatform v1 dataset_service.proto, job_service.proto, model_service.proto, pipeline_service.proto, saved_query.proto, study.proto, types.proto ([301cfb0](googleapis/java-aiplatform@301cfb0))
* doc edits to aiplatform v1beta1 job_service.proto, model_service.proto, pipeline_service.proto, saved_query.proto, study.proto, types.proto ([301cfb0](googleapis/java-aiplatform@301cfb0))


### Dependencies

* update dependency com.google.api.grpc:proto-google-cloud-aiplatform-v1beta1 to v0.17.0 ([#1003](googleapis/java-aiplatform#1003)) ([b793732](googleapis/java-aiplatform@b793732))
* update dependency com.google.cloud:google-cloud-bigquery to v2.14.1 ([#1006](googleapis/java-aiplatform#1006)) ([6bb8982](googleapis/java-aiplatform@6bb8982))
* update dependency com.google.cloud:google-cloud-bigquery to v2.14.3 ([#1009](googleapis/java-aiplatform#1009)) ([8cca8b5](googleapis/java-aiplatform@8cca8b5))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v3 ([#1000](googleapis/java-aiplatform#1000)) ([c93de30](googleapis/java-aiplatform@c93de30))
* update dependency com.google.cloud:google-cloud-storage to v2.10.0 ([#1004](googleapis/java-aiplatform#1004)) ([dd52cad](googleapis/java-aiplatform@dd52cad))
* update dependency com.google.cloud:google-cloud-storage to v2.11.0 ([#1005](googleapis/java-aiplatform#1005)) ([60e2f76](googleapis/java-aiplatform@60e2f76))
* update dependency com.google.cloud:google-cloud-storage to v2.11.1 ([#1008](googleapis/java-aiplatform#1008)) ([9a2fe64](googleapis/java-aiplatform@9a2fe64))
* update dependency com.google.cloud:google-cloud-storage to v2.11.2 ([#1010](googleapis/java-aiplatform#1010)) ([3c2ac16](googleapis/java-aiplatform@3c2ac16))
* update dependency com.google.code.gson:gson to v2.9.1 ([#1001](googleapis/java-aiplatform#1001)) ([a6ffed4](googleapis/java-aiplatform@a6ffed4))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
suztomo pushed a commit that referenced this issue Feb 1, 2023
…1017)

* chore(java): update gcp-releasetool and cryptography in java requirements file
Source-Link: https://togithub.com/googleapis/synthtool/commit/74d0956884c1bb9dc901b52de35ca2bca025a74e
Post-Processor: gcr.io/cloud-devrel-public-resources/owlbot-java:latest@sha256:142286d973c7b6d58186070f203b50058a20a7d7b42147996db24921a18da1b0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API.
Projects
None yet
Development

No branches or pull requests

2 participants