
PubSub: Subscriber Client stops acking messages after several seconds #4274

Closed
anorth2 opened this issue Oct 27, 2017 · 25 comments
Assignees: lukesneeringer
Labels: api: pubsub (Issues related to the Pub/Sub API), priority: p1 (Important issue which blocks shipping the next release. Will be fixed prior to next release.)

Comments

@anorth2

anorth2 commented Oct 27, 2017

  1. OS type and version
     python:3-alpine Docker image running in GKE

  2. Python version and virtual environment information
     Python 3 (the interpreter shipped in the python:3-alpine image)

  3. google-cloud-python version
     google-cloud-pubsub==0.28.4
     grpcio==1.6.3

  4. Stacktrace if available
     (none)

  5. Steps to reproduce
     Run the example code below and watch the Stackdriver metrics for acks vs. modify_ack_deadline.

  6. Code example

import os
import sys
import logging
import traceback
import json
import time
import argparse
import grequests
import grpc

from item_store import ItemStore

from google.cloud.pubsub_v1.subscriber.policy.thread import Policy
from google.cloud import pubsub_v1

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Values below are placeholders; in our service they come from configuration.
address = 'data-insertion-host'   # host for the data insertion service
token = 'auth-token'              # token appended to the request URL
sleep_interval = 5                # seconds between grequests.map() batches
live_forever = True               # keep the outer restart loop running

requests = ItemStore()

class OurPolicy(Policy):
    """
    We occasionally see errors that google code doesn't
    recover from, set a flag that let's the outer thread respond
    by restarting the client.
    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated
    with (StatusCode.UNAVAILABLE, OS Error)>
    """
    _exception_caught = None
    def __init__(self, *args, **kws):
        return super(OurPolicy, self).__init__(*args, **kws)
    def on_exception(self, exc):
        # If this is DEADLINE_EXCEEDED, then we want to retry by returning
        # None instead of raise-ing
        deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
        code_value = getattr(exc, 'code', lambda: None)()
        if code_value == deadline_exceeded:
            return
        OurPolicy._exception_caught = exc
        # will just raise exc
        return super(OurPolicy, self).on_exception(exc)

class InvalidSchemaException(Exception):
    pass

def log_unhandled_exception(exc_type, exc_value, exc_traceback):
    logger.error('Unhandled exception', exc_info=(exc_type, exc_value, exc_traceback))

sys.excepthook = log_unhandled_exception

def send_to_data_insertion(message):
    requests.add(grequests.post('http://%s:8080/url_here=%s' % (
        address, token),
        data=message.data.decode('utf-8')))


###############  Subscriber logic here ####################
def receive_messages(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        message.ack()
        try:
            send_to_data_insertion(message)

        except InvalidSchemaException as e:
            return
        except Exception as e:
            return

    while live_forever:
        if subscriber is None:
            logger.warning('Starting pubsub subscriber client')
            subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)

        subscriber.subscribe(subscription_path).open(callback=callback)

        try:
            while True:
                grequests.map(requests.getAll())
                time.sleep(sleep_interval)

                if OurPolicy._exception_caught:
                    exc = OurPolicy._exception_caught
                    OurPolicy._exception_caught = None
                    raise exc
        except KeyboardInterrupt:
            break
        except Exception as e:
            subscriber = None

    # otherwise, sleep for one interval and exit
    time.sleep(sleep_interval)

#####################################################


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='PubSub subscriber example')
    parser.add_argument('project', help='Google Cloud project ID')
    parser.add_argument('subscription', help='Google Cloud subscription name')
    args = parser.parse_args()
    receive_messages(args.project, args.subscription)

[Screenshot: Stackdriver acks vs. ModifyAckDeadline metrics, 2017-10-27]


@markcwhitfield

markcwhitfield commented Oct 27, 2017

Don't have any solutions for you, but I'm also trying out the new API and seeing similar issues.

I'm also seeing StatusCode.UNAVAILABLE killing the consumer thread. My approach was just to let the policy itself retry by returning None from on_exception, the same as for DEADLINE_EXCEEDED. I don't understand why this (or something similar) isn't the behavior of the default policy: occasional UNAVAILABLE responses seem expected and inevitable, yet the default policy lets the thread die without any way to detect or recover it from the main thread.

In any case, that modification of on_exception definitely helps. I am still seeing a very large number of repeat messages, though I haven't done thorough enough testing to even pin it down to the library or my own code.
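
A minimal sketch of that approach (treating UNAVAILABLE like DEADLINE_EXCEEDED so the policy retries instead of letting the consumer thread die; the class name here is made up, and it assumes the same thread Policy base class used in the report above):

import grpc
from google.cloud.pubsub_v1.subscriber.policy.thread import Policy

class RetryingPolicy(Policy):
    # Codes we treat as transient: swallow them so the consumer keeps running.
    _RETRYABLE_CODES = (
        grpc.StatusCode.DEADLINE_EXCEEDED,
        grpc.StatusCode.UNAVAILABLE,
    )

    def on_exception(self, exc):
        code = getattr(exc, 'code', lambda: None)()
        if code in self._RETRYABLE_CODES:
            # Returning None asks the policy to reopen the stream and retry.
            return
        # Everything else falls through to the default behavior (raise).
        return super(RetryingPolicy, self).on_exception(exc)

It is then passed to the client the same way as in the original report: pubsub_v1.SubscriberClient(policy_class=RetryingPolicy).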

EDIT: I see there are several similar issues already opened. Between this, #4186, and #3886 upgrading Pub/Sub has been a real pain, but the old API holds back every other client :/

@lukesneeringer
Contributor

I think this issue is a duplicate of #4238, but leaving open just in case (and because this is a really useful bug report).

I think I have a handle on the root cause of this and am working on it this week. I actually got a fix in for the exception handling issue last week, but we have not released it to PyPI yet.

@lukesneeringer lukesneeringer self-assigned this Oct 30, 2017
@lukesneeringer lukesneeringer added api: pubsub Issues related to the Pub/Sub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. labels Oct 30, 2017
@tseaver tseaver changed the title PubSub: Pubsub Python Client stops acking messages after several seconds Subscriber Client stops acking messages after several seconds Oct 30, 2017
@anorth2
Author

anorth2 commented Oct 30, 2017

More info on this:

What it appears to do is drop the ack rate to between 0/s and 10/s while StreamingPullOperations climbs into the thousands, along with thousands of ModifyAckDeadline events. So it seems to keep re-pulling (?) the same events and extending their deadline so they don't get resent to other subscribers.

@eoltean

eoltean commented Nov 1, 2017

Any updates on this?

@chemelnucfin chemelnucfin changed the title Subscriber Client stops acking messages after several seconds Pubsub: Subscriber Client stops acking messages after several seconds Nov 3, 2017
@chemelnucfin chemelnucfin changed the title Pubsub: Subscriber Client stops acking messages after several seconds PubSub: Subscriber Client stops acking messages after several seconds Nov 3, 2017
@lukesneeringer
Contributor

@eoltean Still working on it. :-/

@lukesneeringer
Contributor

What it appears to do is drop the ack rate to between 0/s and 10/s while StreamingPullOperations climbs into the thousands, along with thousands of ModifyAckDeadline events. So it seems to keep re-pulling (?) the same events and extending their deadline so they don't get resent to other subscribers.

Let me make sure I understand this correctly. You are saying that your acks cease going through (except in small amounts) while the modacks do?

@anorth2
Author

anorth2 commented Nov 3, 2017

Unless I misunderstand the graph, then yes that's what is going on. These are all the same time period. The slow burndown in unacked messages is the python client. When the unacked messages dropped suddenly, that is when we switched to the Go client for testing.
For most of the period of these graphs the ack rate is from 0 to 10/s with a few random spikes where it hit several hundred per second before dropping.

[Screenshots: Stackdriver graphs of unacked messages and ack rate, 2017-11-03]
(Ignore from-data-insertion-svc; that is another topic/subscription using the same client code.)

Zoomed (two time periods, one with 0 acks, one with acks):
[Screenshot: ModifyAckDeadline decreasing at 67 acks/second]
[Screenshot: ModifyAckDeadline with 0 acks/second]

@adamlofts

adamlofts commented Nov 6, 2017

Could someone recommend a version of the pubsub client which doesn't have this bug? I've just started using pubsub and this bug makes it unusable. Thanks.

Note for those debugging this issue: I only see this problem on GKE and not when running locally.

@eoltean

eoltean commented Nov 6, 2017

This is happening both locally and on GKE for us

@anorth2
Author

anorth2 commented Nov 7, 2017

@adamlofts The last version that appears to work correctly was before the API rewrite. So the 0.27.x releases.
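
For example, pinning to that line (the trailing .* matches any 0.27.x patch release):

$ pip install 'google-cloud-pubsub==0.27.*'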

@adamlofts

@anorth2 I've downgraded and the subscriber is working nicely. Thanks.

@lukesneeringer
Contributor

(Status update: Work still in progress trying to fix this.)

@anorth2
Author

anorth2 commented Nov 14, 2017

Any updates on this? Our team is trying to decide if the production version of our project can rely on the Python client or if we need to switch to Go/Java.

@eoltean

eoltean commented Nov 30, 2017

@lukesneeringer I see a couple of the other issues are being resolved; do those affect this issue at all? Are there any other updates? It's been over three weeks since the last one.

@dhermes
Contributor

dhermes commented Nov 30, 2017

@eoltean I'll try to get to this soon. Hopefully it'll be reproducible (since @anorth2 provided so much code, it seems there are plenty of details).

@AndreCimander

AndreCimander commented Dec 5, 2017

We might be running into a related problem. We are currently rewriting the psq package to suit our needs, and I am having trouble acking messages after an exception occurs. Is the message somehow auto-nacked after an exception is thrown?

If I start a multi-process worker, I can watch the task get redelivered to the other processes, only to fail there as well.

google-cloud==0.30.0
google-cloud-pubsub==0.29.2

    @classmethod
    def restore(cls, message):
        """Restore task from dumped data.

        Args:
            message (google.cloud.pubsub_v1.subscriber.message.Message):

        Returns:
            psq.task.Task: task instance for worker
        """
        try:
            # todo: psq: implement TaskRegistry and a sane json serialization to avoid import problems
            task = unpickle(message.data)  # type: Task
            task.message = message
            return task
        except UnpickleError:
            #
            # import broken, it's dead, Jim.
            #
            message.ack()  # <-- this should discard the poor message, shouldn't it?
            
            logger.exception('Failed to unpickle task {}.'.format(message.message_id))

The resulting log output:

ERROR    2017-12-05 10:07:15,998 psq.task Failed to unpickle task 101585298217325.
Traceback (most recent call last):
  File "/home/andre/Projects/psq/src/psq/utils.py", line 47, in unpickle
    obj = loads(pickled_string)
AttributeError: Can't get attribute 'mark_done' on <module 'psq.tests.worker_test' from '/home/andre/Projects/psq/src/psq/tests/worker_test.py'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/andre/Projects/psq/src/psq/task.py", line 99, in restore
    task = unpickle(message.data)
  File "/home/andre/Projects/psq/src/psq/utils.py", line 49, in unpickle
    raise UnpickleError('Could not unpickle', pickled_string, e)
psq.utils.UnpickleError: ('Could not unpickle', b'\x80\x04\x95\x96\x00\x00\x00\x00\x00\x00\x00\x8c\x08psq.task\x94\x8c\x04Task\x94\x93\x94)\x81\x94}\x94(\x8c\x06kwargs\x94}\x94\x8c\x01f\x94\x8c\x15psq.tests.worker_test\x94\x8c\tmark_done\x94\x93\x94\x8c\x06result\x94N\x8c\x04args\x94)\x8c\x02id\x94\x8c\x011\x94\x8c\x07retries\x94K\x00\x8c\x07message\x94N\x8c\x06status\x94\x8c\x06queued\x94ub.', AttributeError("Can't get attribute 'mark_done' on <module 'psq.tests.worker_test' from '/home/andre/Projects/psq/src/psq/tests/worker_test.py'>",))

@theacodes
Contributor

theacodes commented Dec 5, 2017 via email

@skycoop

skycoop commented Dec 6, 2017

Any updates on this?

@dmontag

dmontag commented Dec 11, 2017

I might be seeing this issue as well. I have a single subscriber listening to infrequent changes in a GCS bucket using a PubSub topic. My subscriber callback looks like this:

import queue
message_queue = queue.Queue()

def add_to_queue(message):
  message_queue.put(message)
  message.ack()

Old messages keep getting delivered over and over, and eventually I think the un-ack()'d backlog blocks new messages from being processed.
I'm using pubsub 0.29.3, cloud 0.31.0, core 0.28.0, grpcio 1.7.3 and Python 3.6.3.

Edit: I rewrote my script using google-cloud-pubsub==0.27.0 as mentioned by @anorth2, and it works correctly. I'll also try the newly released 0.29.4 to see how it works.

Edit 2: 0.29.4 seemed to handle message pulling a bit better but not perfectly (once it hung on receiving messages, but they seemed to be acknowledged when I restarted the app). However, it exhibits the separate behavior of eventually consuming 100% CPU that I reported in #4563, so that forced me to use 0.27.0 anyway.

@anorth2
Author

anorth2 commented Dec 14, 2017

To provide an update, we're currently on 0.29.4 and seeing the same (if not worse) behavior as before. Hitting 14 GB of memory usage after about 10 minutes, only acking at around 5-10/s, 2000-7000+ streamingpulloperations/second:

[Screenshot: Stackdriver metrics on 0.29.4, 2017-12-14]

@dhermes
Contributor

dhermes commented Dec 14, 2017

@anorth2, there is a spinlock bug in gRPC that I think may be the culprit. A PR (grpc/grpc#13665) is out as a potential fix and I am going to install grpcio from source based on this PR to see if it resolves the issue.

Can you run a strace on the impacted pthread? For example, when 25626 was my parent process:

$ ps auxw -L
USER       PID   LWP %CPU NLWP %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
...
${USER}  25626 25626  0.2    9  0.3 816352 53276 pts/2    Sl+  20:30   0:13 .../bin/python no-messages-too/script.py
${USER}  25626 25631  0.1    9  0.3 816352 53276 pts/2    Sl+  20:30   0:06 .../bin/python no-messages-too/script.py
${USER}  25626 25632  0.0    9  0.3 816352 53276 pts/2    Sl+  20:30   0:00 .../bin/python no-messages-too/script.py
${USER}  25626 25639  0.1    9  0.3 816352 53276 pts/2    Sl+  20:30   0:05 .../bin/python no-messages-too/script.py
${USER}  25626 25657  0.0    9  0.3 816352 53276 pts/2    Sl+  20:30   0:00 .../bin/python no-messages-too/script.py
${USER}  25626 25658  0.0    9  0.3 816352 53276 pts/2    Sl+  20:30   0:00 .../bin/python no-messages-too/script.py
${USER}  25626 25659  0.0    9  0.3 816352 53276 pts/2    Sl+  20:30   0:00 .../bin/python no-messages-too/script.py
${USER}  25626  5033 99.1    9  0.3 816352 53276 pts/2    Rl+  21:46   0:35 .../bin/python no-messages-too/script.py
${USER}  25626  5034  0.0    9  0.3 816352 53276 pts/2    Sl+  21:46   0:00 .../bin/python no-messages-too/script.py
...

I identified 5033 as the runaway pthread and attaching to it shows the POLLIN/POLLHUP issue:

$ sudo strace -p 5033
...
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254451018}) = 0
write(2, "D1213 21:47:11.254451018    5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254488163}) = 0
write(2, "D1213 21:47:11.254488163    5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 373944159}) = 0
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254563047}) = 0
write(2, "D1213 21:47:11.254563047    5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254597548}) = 0
write(2, "D1213 21:47:11.254597548    5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 374077123}) = 0
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254708226}) = 0
write(2, "D1213 21:47:11.254708226    5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254742499}) = 0
write(2, "D1213 21:47:11.254742499    5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 374197185}) = 0
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254828449}) = 0
write(2, "D1213 21:47:11.254828449    5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254868180}) = 0
write(2, "D1213 21:47:11.254868180    5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 374335746}) = 0
...

The current example I have for reproducing the CPU spike usually takes about an hour to appear; I'd love a case that spikes within 10 minutes so I could run it more often. Ping me on Hangouts and maybe we can figure something out?

@dhermes
Contributor

dhermes commented Dec 14, 2017

I have backported that fix to the v1.7.3 tag and will be installing from source and running it now to see if it solves the spinlock issue.

@dhermes
Contributor

dhermes commented Dec 14, 2017

So I just ran my "do-nothing" reproducible case (thanks to @dmontag) that reliably thrashes the CPU after 65 minutes. I can confirm that after installing a custom grpcio with grpc/grpc#13665, the thrashing goes away!

I have created a manylinux 64-bit wheel that includes the backport: https://github.com/dhermes/google-cloud-pubsub-performance/blob/master/grpcio-1.7.4.dev1-cp36-cp36m-manylinux1_x86_64.whl


Feel free to install it, but buyer beware: this isn't a wheel endorsed by the grpcio folks. The wheel was created via:

$ docker run \
>   --rm \
>   --tty \
>   --interactive \
>   --volume $(pwd):/var/wheels/ \
>   quay.io/pypa/manylinux1_x86_64:latest \
>   /bin/bash
% cd tmp/
% git clone https://github.com/dhermes/grpc
% cd grpc
% git checkout 1.7.3-with-13665
% git submodule update --init
% /opt/python/cp36-cp36m/bin/python3.6 -m pip install --upgrade pip wheel
% /opt/python/cp36-cp36m/bin/python3.6 -m pip install --requirement requirements.txt
% export REPO_ROOT=$(pwd)
% export GRPC_PYTHON_BUILD_WITH_CYTHON=1
% /opt/python/cp36-cp36m/bin/python3.6 -m pip wheel . --wheel-dir $(pwd)
% auditwheel repair grpcio-1.7.4.dev1-cp36-cp36m-linux_x86_64.whl --wheel-dir /var/wheels/

All of these commands will work on your target machine except for auditwheel, but if you build a custom wheel on your target machine (i.e. a wheel not intended for "general distribution") then you don't need to use auditwheel. Also note you'll need to change the relevant Python path from /opt/python/cp36-cp36m/bin/python3.6.
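
If you just want to try the patched build rather than rebuild it yourself, you can install the downloaded wheel directly (it only fits CPython 3.6 on 64-bit Linux, and --force-reinstall replaces the grpcio you already have):

$ pip install --upgrade --force-reinstall ./grpcio-1.7.4.dev1-cp36-cp36m-manylinux1_x86_64.whl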

@dhermes
Contributor

dhermes commented Dec 15, 2017

@anorth2 Would you mind trying with the patched grpcio==1.7.4.dev1?

@dhermes
Contributor

dhermes commented Dec 15, 2017

I'm closing this issue in favor of #4600. The original report "PubSub: Subscriber Client stops acking messages after several seconds" has been resolved by the addition of Consumer.pause() and Consumer.resume() in #4558.

@anorth2 Can we move future discussion there? I'll move my last pending question there for "completeness".
