Can the bundling behavior of gRPC be disabled for pubsub publishers? #1432

Closed
mattnworb opened this issue Nov 28, 2016 · 27 comments
Labels
api: pubsub Issues related to the Pub/Sub API. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@mattnworb

I've hit a case where an application that publishes pubsub messages (using google-cloud-pubsub:0.5.1) throws OutOfMemoryErrors, which I have traced to an instance of com.google.api.gax.bundling.ThresholdBundler taking up 92% of the heap space. This application purposefully runs with a small heap of ~256 MB.

This application publishes all of its messages to a single topic, so it seems as if all of the requests end up in the same ThresholdBundler instance. In the heap dump that I have, the ThresholdBundler instance has 63984 elements in the closedBundles list.

It looks like the gax library allows for the bundling behavior to be disabled via BundlingSettings$Builder.setIsEnabled(Boolean), but the pubsub layer does not expose any of these options and always leaves the bundling behavior as enabled.

Would it be possible to add flags to PubsubOptions (or wherever is appropriate) to allow for this request batching to be disabled if desired?

@garrettjonesgoogle
Member

This is very unusual - 63k elements is far beyond the limit set in the bundler; that many elements should have triggered a publish. The default limits are 1000 elements, 10 ms, and 10 MB; surpassing any individual limit should trigger a publish.
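To make the "any individual limit triggers a publish" rule concrete, here is a minimal sketch. The class and method names (BundleThresholds, shouldFlush) are hypothetical and are not the gax API; only the three default values come from the comment above. Treating "10 MB" as 10 * 1024 * 1024 bytes and treating a limit as hit once it is reached (rather than strictly exceeded) are both assumptions.

```java
import java.time.Duration;

/** Hypothetical illustration of threshold-based flushing; not the gax ThresholdBundler API. */
final class BundleThresholds {
    // Defaults quoted in the thread: 1000 elements, 10 ms, 10 MB.
    static final int MAX_ELEMENTS = 1000;
    static final Duration MAX_DELAY = Duration.ofMillis(10);
    static final long MAX_BYTES = 10L * 1024 * 1024; // assumption: binary megabytes

    /** Returns true as soon as any single limit is reached. */
    static boolean shouldFlush(int elements, Duration age, long bytes) {
        return elements >= MAX_ELEMENTS
            || age.compareTo(MAX_DELAY) >= 0
            || bytes >= MAX_BYTES;
    }
}
```

With one message published every 10-20 seconds, the 10 ms delay limit would be the one that fires, so under these assumptions each bundle would carry a single message.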

Are you publishing synchronously or asynchronously?

I think it makes sense to expose the controls - we are already working on a design that will do that.

@mattnworb
Author

Hey Garrett, we are publishing asynchronously, in a fire-and-forget manner. Our code adds a listener to the returned future to log whether it succeeded or failed, but we otherwise don't handle the response in any way. Each of our publish requests contains one message.
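For readers unfamiliar with the pattern, a rough sketch of "attach a listener and move on" looks like the following. It uses the JDK's CompletableFuture as a stand-in for whatever future type the client returns, and the names FireAndForget and logOutcome are hypothetical; this is not the application's actual code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical sketch of fire-and-forget publishing with a logging callback. */
final class FireAndForget {
    /** Attaches a callback that only logs; the caller never waits on the result. */
    static void logOutcome(CompletableFuture<String> publishFuture, Consumer<String> log) {
        publishFuture.whenComplete((messageId, error) -> {
            if (error != null) {
                log.accept("publish failed: " + error.getMessage());
            } else {
                log.accept("published " + messageId);
            }
        });
    }
}
```

Because the caller only logs, a failing publish path produces log lines but no backpressure on the code that keeps producing messages.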

I think I forgot to mention earlier that we are using all of the default settings with the client.

Is there any state in particular of the ThresholdBundler or other pubsub-related instances that would be good to know about? Since I have a heap dump of an affected host I can examine anything that might seem relevant for the unexpected Bundler behavior.

@davidxia

davidxia commented Nov 29, 2016

Hi, @garrettjonesgoogle. I see the code for those limits you're talking about. These seem to be settings for each individual bundle and not the ThresholdBundler itself. I think @mattnworb is saying 63K is the number of closedBundles not the number of elements in each bundle.

Is the ThresholdBundler.closedBundles array bounded in any way?

Speculation: What if some exception happens in ThresholdBundlingForwarder before the bundle can be drained? Or what if closedBundles is added to more quickly than the other thread can drain it?

Could this cause our OOM?

@garrettjonesgoogle
Member

Ahh yes you're right - there is no limit on closedBundles. Both explanations (draining isn't fast enough, draining is failing) make sense for an OOM. If the runnable exits (e.g. due to an exception), there isn't currently anything that will restart it, which is a flaw. Do you have any trace of an exception that happened?
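One way to avoid the "runnable exits and nothing restarts it" flaw is to isolate each bundle's failure inside the loop so the drain thread keeps going. The sketch below is only an illustration under that assumption; ResilientDrainer and processOne are hypothetical names and this is not the gax ThresholdBundlingForwarder code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical drain loop that survives per-bundle failures. */
final class ResilientDrainer<T> implements Runnable {
    private final BlockingQueue<T> closedBundles;
    private final Consumer<T> sender;

    ResilientDrainer(BlockingQueue<T> closedBundles, Consumer<T> sender) {
        this.closedBundles = closedBundles;
        this.sender = sender;
    }

    /** Sends one bundle; a RuntimeException is reported and swallowed, never rethrown. */
    boolean processOne(T bundle) {
        try {
            sender.accept(bundle);
            return true;
        } catch (RuntimeException e) {
            System.err.println("bundle failed, continuing: " + e);
            return false;
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                processOne(closedBundles.take()); // blocks until a bundle is ready
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit only on explicit shutdown
        }
    }
}
```

Note that this only keeps the thread alive; it does nothing about a backlog that grows because every send fails, which is the other explanation mentioned above.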

@garrettjonesgoogle garrettjonesgoogle added the type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. label Nov 29, 2016
@davidxia

davidxia commented Nov 29, 2016

@garrettjonesgoogle, this is a stack trace where BundleExecutor.processBundle() throws a RuntimeException. This is from a machine that went OOM. I don't know if this stack trace is the actual cause of the OOM, but it seems likely?

2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]: Unable to send an event to Google PubSub, topic: com.spotify.tingle.helios-events.HeliosTaskStatusEvents
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    com.google.cloud.pubsub.PubSubException: io.grpc.StatusRuntimeException: UNAUTHENTICATED
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.cloud.pubsub.spi.DefaultPubSubRpc$1.apply(DefaultPubSubRpc.java:179)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.cloud.pubsub.spi.DefaultPubSubRpc$1.apply(DefaultPubSubRpc.java:173)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.Futures$CatchingFuture.doFallback(Futures.java:842)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.Futures$CatchingFuture.doFallback(Futures.java:834)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.Futures$AbstractCatchingFuture.run(Futures.java:789)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:634)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:53)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.api.gax.grpc.BundlingFuture.setException(BundlingFuture.java:118)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.api.gax.grpc.BundlingContext.sendResult(BundlingContext.java:99)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.api.gax.grpc.BundleExecutor.processBundle(BundleExecutor.java:91)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.processBundle(ThresholdBundlingForwarder.java:110)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.run(ThresholdBundlingForwarder.java:93)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    at java.lang.Thread.run(Thread.java:745)
2016-11-10T18:50:44.418+00:00 my.host.com helios[2831]:    Caused by: com.google.api.gax.grpc.ApiException: io.grpc.StatusRuntimeException: UNAUTHENTICATED

Let me know if this helps and/or if you need more info.

@garrettjonesgoogle
Member

So have you been able to successfully send any messages? UNAUTHENTICATED implies that you haven't - or another scenario might be that you authenticated once, but for some reason the authentication expired.

@davidxia

@garrettjonesgoogle I think we have been, but isn't the unauthenticated part an orthogonal issue?

@garrettjonesgoogle
Member

Yes it is, but it might be a secondary issue that also needs to be addressed - if authentication expires, it needs to be refreshed somehow.

@davidxia

davidxia commented Nov 29, 2016

Gotcha. I've attached the full stack trace here.
pubsub_oom_stracktrace.txt

@garrettjonesgoogle
Member

The thing I'm curious about is what type of credentials you are using. Are you using the default credentials (which use Application Default Credentials under the hood), or something else?

@davidxia

I think default? @mattnworb will have more details.

@lndbrg

lndbrg commented Nov 30, 2016

We are injecting a path to a credentials file from the GOOGLE_APPLICATION_CREDENTIALS environment variable.

(we use a version of the library before it moved to google-auth-library-java, so it follows this pattern:

  1. Credentials supplied when building the service options
  2. App Engine credentials
  3. Key file pointed to by the GOOGLE_APPLICATION_CREDENTIALS environment variable
  4. Google Cloud SDK credentials
  5. Compute Engine credentials)

It's verified to pick up the correct credentials and to be able to use them without auth issues on other instances and services (and it seems to have no problem re-authing after having seen UNAUTHENTICATED in the logs).

@mattnworb
Author

mattnworb commented Nov 30, 2016

To confirm what @lndbrg mentioned: we are using a service account with the credentials supplied as he described.

Our publisher application runs on 1000s of hosts and we have seen this problem with the OutOfMemoryError on just a handful of hosts.

Looking at the logs of one of the affected hosts, it seems like it begins to get publishing errors almost immediately after startup because it cannot connect to get an access token for the service account:

Unable to send an event to Google PubSub, topic: <the-topic-name>
  com.google.cloud.pubsub.PubSubException: io.grpc.StatusRuntimeException: UNAUTHENTICATED
  at com.google.cloud.pubsub.spi.DefaultPubSubRpc$1.apply(DefaultPubSubRpc.java:179)
  at com.google.cloud.pubsub.spi.DefaultPubSubRpc$1.apply(DefaultPubSubRpc.java:173)
  at com.google.common.util.concurrent.Futures$CatchingFuture.doFallback(Futures.java:842)
  at com.google.common.util.concurrent.Futures$CatchingFuture.doFallback(Futures.java:834)
  at com.google.common.util.concurrent.Futures$AbstractCatchingFuture.run(Futures.java:789)
  at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
  at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
  at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
  at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:634)
  at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:53)
  at com.google.api.gax.grpc.BundlingFuture.setException(BundlingFuture.java:118)
  at com.google.api.gax.grpc.BundlingContext.sendResult(BundlingContext.java:99)
  at com.google.api.gax.grpc.BundleExecutor.processBundle(BundleExecutor.java:91)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.processBundle(ThresholdBundlingForwarder.java:110)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.run(ThresholdBundlingForwarder.java:93)
  at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.gax.grpc.ApiException: io.grpc.StatusRuntimeException: UNAUTHENTICATED
  at com.google.api.gax.grpc.ExceptionTransformingCallable$ExceptionTransformingFuture.onFailure(ExceptionTransformingCallable.java:113)
  at com.google.common.util.concurrent.Futures$6.run(Futures.java:1764)
  at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
  at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
  at com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:595)
  at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
  at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
  at com.google.api.gax.grpc.ExceptionTransformingCallable.futureCall(ExceptionTransformingCallable.java:69)
  at com.google.api.gax.grpc.RetryingCallable$RetryingResultFuture.issueCall(RetryingCallable.java:222)
  at com.google.api.gax.grpc.RetryingCallable.futureCall(RetryingCallable.java:90)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:242)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:253)
  at com.google.api.gax.grpc.UnaryCallable.call(UnaryCallable.java:290)
  at com.google.api.gax.grpc.BundleExecutor.processBundle(BundleExecutor.java:84)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.processBundle(ThresholdBundlingForwarder.java:110)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.run(ThresholdBundlingForwarder.java:93)
  at java.lang.Thread.run(Thread.java:745)
Caused by: io.grpc.StatusRuntimeException: UNAUTHENTICATED
  at io.grpc.Status.asRuntimeException(Status.java:545)
  at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:442)
  at io.grpc.ClientInterceptors$CheckedForwardingClientCall.start(ClientInterceptors.java:203)
  at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:47)
  at com.google.api.gax.grpc.HeaderInterceptor$1.start(HeaderInterceptor.java:64)
  at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:273)
  at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:252)
  at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:189)
  at com.google.api.gax.grpc.DirectCallable.futureCall(DirectCallable.java:58)
  at com.google.api.gax.grpc.ExceptionTransformingCallable.futureCall(ExceptionTransformingCallable.java:66)
  at com.google.api.gax.grpc.RetryingCallable$RetryingResultFuture.issueCall(RetryingCallable.java:222)
  at com.google.api.gax.grpc.RetryingCallable.futureCall(RetryingCallable.java:90)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:242)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:253)
  at com.google.api.gax.grpc.UnaryCallable.call(UnaryCallable.java:290)
  at com.google.api.gax.grpc.BundleExecutor.processBundle(BundleExecutor.java:84)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.processBundle(ThresholdBundlingForwarder.java:110)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.run(ThresholdBundlingForwarder.java:93)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error getting access token for service account: 
  at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:227)
  at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:97)
  at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:74)
  at io.grpc.auth.ClientAuthInterceptor.getRequestMetadata(ClientAuthInterceptor.java:150)
  at io.grpc.auth.ClientAuthInterceptor.access$100(ClientAuthInterceptor.java:64)
  at io.grpc.auth.ClientAuthInterceptor$1.checkedStart(ClientAuthInterceptor.java:96)
  at io.grpc.ClientInterceptors$CheckedForwardingClientCall.start(ClientInterceptors.java:195)
  at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:47)
  at com.google.api.gax.grpc.HeaderInterceptor$1.start(HeaderInterceptor.java:64)
  at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:273)
  at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:252)
  at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:189)
  at com.google.api.gax.grpc.DirectCallable.futureCall(DirectCallable.java:58)
  at com.google.api.gax.grpc.ExceptionTransformingCallable.futureCall(ExceptionTransformingCallable.java:66)
  at com.google.api.gax.grpc.RetryingCallable$RetryingResultFuture.issueCall(RetryingCallable.java:222)
  at com.google.api.gax.grpc.RetryingCallable.futureCall(RetryingCallable.java:90)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:242)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:253)
  at com.google.api.gax.grpc.UnaryCallable.call(UnaryCallable.java:290)
  at com.google.api.gax.grpc.BundleExecutor.processBundle(BundleExecutor.java:84)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.processBundle(ThresholdBundlingForwarder.java:110)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.run(ThresholdBundlingForwarder.java:93)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: connect timed out
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:589)
  at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:668)
  at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
  at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
  at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
  at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
  at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
  at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:191)
  at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104)
  at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998)
  at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:177)
  at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1282)
  at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1257)
  at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:250)
  at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:77)
  at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
  at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:225)
  at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:97)
  at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:74)
  at io.grpc.auth.ClientAuthInterceptor.getRequestMetadata(ClientAuthInterceptor.java:150)
  at io.grpc.auth.ClientAuthInterceptor.access$100(ClientAuthInterceptor.java:64)
  at io.grpc.auth.ClientAuthInterceptor$1.checkedStart(ClientAuthInterceptor.java:96)
  at io.grpc.ClientInterceptors$CheckedForwardingClientCall.start(ClientInterceptors.java:195)
  at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:47)
  at com.google.api.gax.grpc.HeaderInterceptor$1.start(HeaderInterceptor.java:64)
  at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:273)
  at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:252)
  at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:189)
  at com.google.api.gax.grpc.DirectCallable.futureCall(DirectCallable.java:58)
  at com.google.api.gax.grpc.ExceptionTransformingCallable.futureCall(ExceptionTransformingCallable.java:66)
  at com.google.api.gax.grpc.RetryingCallable$RetryingResultFuture.issueCall(RetryingCallable.java:222)
  at com.google.api.gax.grpc.RetryingCallable.futureCall(RetryingCallable.java:90)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:242)
  at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:253)
  at com.google.api.gax.grpc.UnaryCallable.call(UnaryCallable.java:290)
  at com.google.api.gax.grpc.BundleExecutor.processBundle(BundleExecutor.java:84)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.processBundle(ThresholdBundlingForwarder.java:110)
  at com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.run(ThresholdBundlingForwarder.java:93)
  at java.lang.Thread.run(Thread.java:745)

The root of the exception originates in com.google.api.gax.bundling.ThresholdBundlingForwarder$BundleForwardingRunnable.run(ThresholdBundlingForwarder.java:93) as pointed out earlier.

The application is then attempting to publish new messages every 10-20 seconds, and this stacktrace repeats for each attempt (so it seems as if the thread launched by ThresholdBundlingForwarder is not dying, or is being restarted). This continues for 8 days or so until we see an OutOfMemoryError.

Instances of the application that don't have issues connecting to the relevant googleapi to get an access token don't experience any issues publishing and never receive an OOM.

So I think there are three issues going on:

  1. com.google.api.gax.bundling.ThresholdBundler has an unbounded queue for closedBundles that will fill endlessly if an application is publishing faster than the queue can be drained, or if draining/publishing can't occur at all for some reason.

  2. It seems that if the thread started by ThresholdBundlingForwarder dies for any reason, it won't be restarted, and the messages accumulating in the Bundler will never be processed. However, I do not believe that we are seeing this in our own application: the stacktrace seems to propagate from com.google.api.gax.grpc.BundleExecutor.processBundle(BundleExecutor.java:91), where the exception that masquerades as UNAUTHENTICATED is caught when the callable is invoked.

  3. The google-cloud-pubsub library doesn't provide any way to configure the bundling behavior within the gax/grpc layers via PubSub or PubSubOptions. If this existed, then we could simply disable bundling in our application (as we are not publishing very many messages in the normal use case) to sidestep 1 and 2 above for now.
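As an illustration of what a bound on closedBundles (issue 1 above) could look like, here is a minimal sketch built on the JDK's ArrayBlockingQueue. BoundedBundleQueue and its methods are hypothetical names, and rejecting new bundles when full is just one possible policy; this is not how gax implements or fixed the bundler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded backlog of closed bundles. */
final class BoundedBundleQueue<T> {
    private final BlockingQueue<T> closedBundles;

    BoundedBundleQueue(int capacity) {
        this.closedBundles = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns false instead of growing when the backlog is already full. */
    boolean tryEnqueue(T bundle) {
        return closedBundles.offer(bundle);
    }

    /** Returns the next bundle to send, or null if none is waiting. */
    T pollNext() {
        return closedBundles.poll();
    }

    /** Number of bundles currently waiting to be sent. */
    int backlog() {
        return closedBundles.size();
    }
}
```

With a bound like this, a host that can never publish would start rejecting (or failing) new messages once the backlog fills, rather than holding them until the heap is exhausted.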

Since 1 and 2 are really issues within the googleapis/gax-java library, should we open issues there instead?

I believe @davidxia also opened an issue in the Cloud Support Center - we'd be happy to continue the discussion there (or here), whichever seems best for resolving this.

@garrettjonesgoogle
Member

Filing issues 1 & 2 against gax-java makes sense. (The same people work on both repositories.)

One potential way to mitigate failing hosts in your case is to send 1 synchronous "canary" call when the app starts; if it gets UNAUTHENTICATED, then you can recover & fix that then, instead of having the host do nothing useful and eventually OOM. Potentially it doesn't even have to be a publish call - calling getTopic() is probably sufficient to establish that the authentication worked.
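A rough sketch of such a canary check is below. StartupCanary and authLooksHealthy are hypothetical names; the probe is whatever single synchronous call the application chooses (for example a getTopic() call on its own client), and what to do on failure is left to the application.

```java
import java.util.concurrent.Callable;

/** Hypothetical startup check: run one synchronous call before publishing asynchronously. */
final class StartupCanary {
    /** Runs the probe once; returns true only if it completed without throwing. */
    static boolean authLooksHealthy(Callable<?> probe) {
        try {
            probe.call();
            return true;
        } catch (Exception e) {
            // e.g. an UNAUTHENTICATED error surfaced by the client
            System.err.println("Startup canary failed: " + e);
            return false;
        }
    }
}
```

An application could call this once at startup and refuse to start, retry, or alert when it returns false, instead of silently accumulating unsent messages.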

We'll try to get all of these other issues fixed, but be aware that we are currently resource-constrained.

@garrettjonesgoogle
Member

The remaining request here not covered by googleapis/gax-java#157 is to allow bundling to be disabled at the handwritten layer. @pongad , can you consider whether this should be exposed in the Pub/Sub rewrite?

@pongad
Contributor

pongad commented Jan 27, 2017

As currently designed, the bundling can be "turned off" by setting elementCountThreshold to 1, saying "publish the bundle every time the bundle reaches 1 message". @mattnworb do you think it solves your use case?

EDIT: "as currently designed" refers to a new Publisher implementation which is not in master yet
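To show why a count threshold of 1 behaves like "no bundling", here is a toy bundler. CountBundler is a hypothetical class written for this illustration and is not the new Publisher implementation; only the parameter name elementCountThreshold is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical count-based bundler; flushes whenever the threshold is reached. */
final class CountBundler<T> {
    private final int elementCountThreshold;
    private final Consumer<List<T>> flusher;
    private List<T> current = new ArrayList<>();

    CountBundler(int elementCountThreshold, Consumer<List<T>> flusher) {
        this.elementCountThreshold = elementCountThreshold;
        this.flusher = flusher;
    }

    /** Adds an element and hands the bundle to the flusher once it is full. */
    synchronized void add(T element) {
        current.add(element);
        if (current.size() >= elementCountThreshold) {
            List<T> full = current;
            current = new ArrayList<>();
            flusher.accept(full);
        }
    }
}
```

With a threshold of 1, every add() produces a one-element bundle immediately. In this toy version the flusher runs on the calling thread, which is a simplification and not necessarily what a real bundler does.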

@garrettjonesgoogle
Member

@pongad actually the problem is that even with a threshold of 1, the publish is still done in the bundling thread, and the error isn't propagated to the calling thread.

@mattnworb
Author

@pongad is the new Publisher implementation that you refer to the one on the pubsub-hp branch?

It seems like what is in this branch would solve my case, but is there still bundling done at the grpc layer?

@garrettjonesgoogle
Member

Since @pongad is in the Sydney timezone, I will answer your questions:

  • Yes, the new Publisher implementation is on the pubsub-hp branch.
  • Yes, there is still bundling done. Technically the bundling implementation in the branch is distinct from the one in GAX, but they will get merged in short order.

@mattnworb
Author

Thanks for the quick answer. I think the main thing that would solve the original problem here is that bundles that can't be sent (for whatever reason) aren't kept around indefinitely by the bundler or whoever holds them. Publishing the bundle immediately (with threshold=1) is useful too, but the main issue that I recall is the closed bundle list growing without bound.

@garrettjonesgoogle
Member

The rewrite includes https://github.com/googleapis/gax-java/blob/bundling-hp/src/main/java/com/google/api/gax/grpc/FlowControlSettings.java , which allows for explicit limits to be set on the bundler.

@garrettjonesgoogle
Member

The rewrite has been merged to master and released. @mattnworb could you take a look to see if your concerns are addressed?

@mattnworb
Author

mattnworb commented Feb 23, 2017

@garrettjonesgoogle thanks for the update. Are there any docs on how to move from using PubSub and PubSub.publishAsync(message) etc. to the new Publisher class? The changes involved don't seem obvious at first glance - for instance, how to specify the Credentials to use with a Publisher.

@garrettjonesgoogle
Member

@pongad could you point @mattnworb to the information he needs?

@pongad
Contributor

pongad commented Feb 26, 2017

@mattnworb Could you see if this helps? We are still in the process of updating all our code samples.

If you don't specify any Credentials, the application default credentials should be used the way they were before. If you need to specify different credentials, you can create a channel provider with the credentials you want, then use the provider to build the Publisher.

@garrettjonesgoogle
Member

@mattnworb I'm going to close out this issue now - feel free to file a new issue if you're having problems adopting the new Publisher.

@mattnworb
Author

@garrettjonesgoogle I haven't had a chance to try out the new Publisher yet, but thanks overall for following up on this and addressing the issue.
