Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer drops messages under heavy workload with batch_send=True #170

Closed
yongkun opened this issue May 13, 2014 · 6 comments
Closed

Producer drops messages under heavy workload with batch_send=True #170

yongkun opened this issue May 13, 2014 · 6 comments
Labels

Comments

@yongkun
Copy link

yongkun commented May 13, 2014

The SimpleProducer dropped about 2/3 messages in my test when using producer sent to broker on another machine with Kafka 0.8.1.
I set batch size to 100, each message in my test is about 800 bytes.
buffer.size is default (100KB).

Test code:

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

kafka = KafkaClient("host:9092")

producer = SimpleProducer(kafka,
            async=False,
            #req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
            #ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, # 1000ms
            batch_send=True,
            batch_send_every_n=100,
            batch_send_every_t=1000)

data_file = "mylog.log"

count = 0
with open(data_file) as f:
    for line in f:
        producer.send_messages("my-topic", str(count) + line)
        count += 1

There is no problem when the batch_send is disabled.
I also tested the Java client, no messages dropped.

@dpkp
Copy link
Owner

dpkp commented May 13, 2014

i haven't tested, but hunch is that the main body of your test script is ending before the async thread is done. You might try at the very end something like calling producer.stop(timeout=None) or some timeout that seems reasonable given the data.

@wizzat
Copy link
Collaborator

wizzat commented May 13, 2014

I remember seeing behavior like this in some of the tests if you don't stop the async producer. It might be worth putting an atexit hook in to stop it for us.

-Mark

On May 12, 2014, at 22:35, Dana Powers [email protected] wrote:

i haven't tested, but hunch is that the main body of your test script is ending before the async thread is done. You might try at the very end something like calling producer.stop(timeout=None) or some timeout that seems reasonable given the data.


Reply to this email directly or view it on GitHub.

@yongkun
Copy link
Author

yongkun commented May 14, 2014

It works well after adding producer.stop(timeout=None), thanks @dpkp . But I feel that it's confusing since I have already specified async=False. A hook may be better, as proposed by @wizzat .

@dpkp
Copy link
Owner

dpkp commented May 14, 2014

sure, though you are using async if you specify batch_send=True. There is no way to do batch synchronously :)

https://github.com/mumrah/kafka-python/blob/master/kafka/producer.py#L109-L110

@wizzat
Copy link
Collaborator

wizzat commented May 14, 2014

Depending on how you think about batching, it's possible to manually batch messages because producer.send_messages takes many messages and creates a single message set and dispatches it. I've found you have to be careful about max message size limits though.

@dpkp dpkp added the producer label Aug 22, 2014
@dpkp
Copy link
Owner

dpkp commented Apr 6, 2015

producers now register atexit handlers - #360

@dpkp dpkp closed this as completed Apr 6, 2015
wbarnha added a commit to mkromer-tc/kafka-python that referenced this issue Mar 18, 2024
* Support custom SASL mechanisms

There is some interest in supporting various SASL mechanisms not
currently included in the library:

* dpkp#2110 (DMS)
* dpkp#2204 (SSPI)
* dpkp#2232 (AWS_MSK_IAM)

Adding these mechanisms in the core library may be undesirable due to:

* Increased maintenance burden.
* Unavailable testing environments.
* Vendor specificity.

This commit provides a quick prototype for a pluggable SASL system.

---

**Example**

To define a custom SASL mechanism a module must implement two methods:

```py

def validate_config(conn):
    # Check configuration values, available libraries, etc.
    assert conn.config['vendor_specific_setting'] is not None, (
        'vendor_specific_setting required when sasl_mechanism=MY_SASL'
    )

def try_authenticate(conn, future):
    # Do authentication routine and return resolved Future with failed
    # or succeeded state.
```

And then the custom mechanism should be registered before initializing
a KafkaAdminClient, KafkaConsumer, or KafkaProducer:

```py

import kafka.sasl
from kafka import KafkaProducer

import my_sasl

kafka.sasl.register_mechanism('MY_SASL', my_sasl)

producer = KafkaProducer(sasl_mechanism='MY_SASL')
```

---

**Notes**

**ABCs**

This prototype does not implement an ABC for custom SASL mechanisms.
Using an ABC would reduce a few of the explicit assertions involved with
registering a mechanism and is a viable option. Due to differing feature
sets between py2/py3 this option was not explored, but shouldn't be
difficult.

**Private Methods**

This prototype relies on some methods that are currently marked as
**private** in `BrokerConnection`.

* `._can_send_recv`
* `._lock`
* `._recv_bytes_blocking`
* `._send_bytes_blocking`

A pluggable system would require stable interfaces for these actions.

**Alternative Approach**

If the module-scoped dict modification in `register_mechanism` feels too
clunky maybe the addtional mechanisms can be specified via an argument
when initializing one of the `Kafka*` classes?

* Add test_msk.py by @mattoberle

* add msk to __init__ and check for extension in conn.py

* rename try_authenticate in msk.py

* fix imports

* fix imports

* add botocore to requirements-dev.txt

* add boto3 to requirements-dev.txt

* add awscli to requirements-dev.txt

* add awscli to workflow since it takes too long to install normally

* just install botocore i guess

* just install boto3 i guess

* force reinstall awscli

* try something weird

* ok now the dang tests should work and if they don't i'll cry

* skip the msk test for now...

* Revert "skip the msk test for now..."

This reverts commit 1c29667.

* skip the msk test for now...

* nvm just needed to update tox lol

* Update kafka/sasl/gssapi.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/oauthbearer.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/plain.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/scram.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/msk.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

---------

Co-authored-by: Matt Oberle <[email protected]>
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
bradenneal1 pushed a commit to bradenneal1/kafka-python that referenced this issue May 16, 2024
* Support custom SASL mechanisms

There is some interest in supporting various SASL mechanisms not
currently included in the library:

* dpkp#2110 (DMS)
* dpkp#2204 (SSPI)
* dpkp#2232 (AWS_MSK_IAM)

Adding these mechanisms in the core library may be undesirable due to:

* Increased maintenance burden.
* Unavailable testing environments.
* Vendor specificity.

This commit provides a quick prototype for a pluggable SASL system.

---

**Example**

To define a custom SASL mechanism a module must implement two methods:

```py

def validate_config(conn):
    # Check configuration values, available libraries, etc.
    assert conn.config['vendor_specific_setting'] is not None, (
        'vendor_specific_setting required when sasl_mechanism=MY_SASL'
    )

def try_authenticate(conn, future):
    # Do authentication routine and return resolved Future with failed
    # or succeeded state.
```

And then the custom mechanism should be registered before initializing
a KafkaAdminClient, KafkaConsumer, or KafkaProducer:

```py

import kafka.sasl
from kafka import KafkaProducer

import my_sasl

kafka.sasl.register_mechanism('MY_SASL', my_sasl)

producer = KafkaProducer(sasl_mechanism='MY_SASL')
```

---

**Notes**

**ABCs**

This prototype does not implement an ABC for custom SASL mechanisms.
Using an ABC would reduce a few of the explicit assertions involved with
registering a mechanism and is a viable option. Due to differing feature
sets between py2/py3 this option was not explored, but shouldn't be
difficult.

**Private Methods**

This prototype relies on some methods that are currently marked as
**private** in `BrokerConnection`.

* `._can_send_recv`
* `._lock`
* `._recv_bytes_blocking`
* `._send_bytes_blocking`

A pluggable system would require stable interfaces for these actions.

**Alternative Approach**

If the module-scoped dict modification in `register_mechanism` feels too
clunky maybe the addtional mechanisms can be specified via an argument
when initializing one of the `Kafka*` classes?

* Add test_msk.py by @mattoberle

* add msk to __init__ and check for extension in conn.py

* rename try_authenticate in msk.py

* fix imports

* fix imports

* add botocore to requirements-dev.txt

* add boto3 to requirements-dev.txt

* add awscli to requirements-dev.txt

* add awscli to workflow since it takes too long to install normally

* just install botocore i guess

* just install boto3 i guess

* force reinstall awscli

* try something weird

* ok now the dang tests should work and if they don't i'll cry

* skip the msk test for now...

* Revert "skip the msk test for now..."

This reverts commit 1c29667.

* skip the msk test for now...

* nvm just needed to update tox lol

* Update kafka/sasl/gssapi.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/oauthbearer.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/plain.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/scram.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/msk.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

---------

Co-authored-by: Matt Oberle <[email protected]>
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants