
Source Kafka: polling records cause OutOfMemory #14382

Open
tunguyen-12 opened this issue Jul 4, 2022 · 10 comments
Labels
area/databases · community · connectors/source/kafka · frozen (Not being actively worked on) · team/db-dw-sources (Backlog for Database and Data Warehouse Sources team) · type/bug (Something isn't working)

Comments

@tunguyen-12

Environment

  • Airbyte version: 0.39.29-alpha
  • Deployment: Kubernetes
  • Source Connector and version: airbyte kafka 0.1.7
  • Destination Connector and version: airbyte bigquery 1.1.11
  • Step where error happened: Sync job

Current Behavior

Kafka Source keeps polling records, which causes OutOfMemory.
See the polling logic here:

Expected Behavior

The source should limit each batch by number of records or by total bytes.

Logs

Steps to Reproduce

  1. Batch records by number of records or by bytes (see the sketch below).
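
A minimal sketch, not the connector's actual code, of what bounding each poll loop by record count and total bytes could look like; MAX_RECORDS, MAX_BYTES, and processBatch are illustrative names, and consumer/polling_time are assumed from the existing connector snippet quoted later in this thread:

final List<ConsumerRecord<String, JsonNode>> batch = new ArrayList<>();
long totalBytes = 0;
while (batch.size() < MAX_RECORDS && totalBytes < MAX_BYTES) {
  final ConsumerRecords<String, JsonNode> polled =
      consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS));
  if (polled.isEmpty()) {
    break; // fall back to the existing empty-poll retry logic here
  }
  for (final ConsumerRecord<String, JsonNode> record : polled) {
    batch.add(record);
    // serialized*Size() returns -1 for null keys/values, hence the guards.
    totalBytes += Math.max(0, record.serializedKeySize())
        + Math.max(0, record.serializedValueSize());
  }
}
processBatch(batch); // flush to the destination, then poll the next batch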

Are you willing to submit a PR?

Not Now

@marcosmarxm
Member

@tunguyen-12 I agree with your suggestion. It looks like the connector uses other logic to stop the sync, which can cause the problem. I added this issue to the connector team roadmap for future implementation.

final ConsumerRecords<String, JsonNode> consumerRecords =
    consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS));
if (consumerRecords.count() == 0) {
  // Only consecutive empty polls count toward stopping; the number of
  // records read while the topic still has data is unbounded.
  pollCount++;
  if (pollCount > retry) {
    break;
  }
}

Can you give more information about the server instance you're running Airbyte on and the collection size in Kafka, so the team can reproduce it easily?

@tunguyen-12
Author

tunguyen-12 commented Jul 5, 2022

Hi @marcosmarxm, Here is the information about my deployment

  • Airbyte version: 0.39.29-alpha
  • Deployment: Kubernetes
  • Source Connector and version: airbyte kafka 0.1.7
  • Destination Connector and version: airbyte bigquery 1.1.11
  • Step where error happened: Sync data from Kafka
  • Kafka source: produces 190 msg/sec, ~4 MB/sec => causes OutOfMemory
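
For scale: at ~4 MB/sec of incoming messages, a sync that keeps polling until the topic is drained buffers roughly 4 MB × 600 s ≈ 2.4 GB after just ten minutes, which already exceeds a typical default JVM heap.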

@tunguyen-12
Author

@marcosmarxm Could you give me more details about the problem you mentioned: "It looks like the connector uses other logic to stop the sync, which can cause the problem"?
My questions:

  • Can we batch here by number of records or by bytes?
  • Does the Worker batch data received from the source pod and then flush it to the destination pod?

Thanks

@marcosmarxm
Member

Does the Worker batch data received from the source pod and then flush it to the destination pod?

From the code pasted here, it looks like the connector tries to read all the records in the collection, stops only after that, and then flushes to the destination. That is not going to work for your use case with fast message generation; this is probably the reason for the OOM.

@tunguyen-12
Author

tunguyen-12 commented Jul 5, 2022

My idea is reading by micro-batch: implement a LazyIterator with an abstract List load() function. When hasNext() is called, return true if there are messages available in the global queue; if not, call the load() function.

My concern here is: will this cause OOM in the worker?
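
A minimal sketch of the LazyIterator idea described above, assuming a plain in-memory queue; everything beyond the load()/hasNext()/next() contract is illustrative:

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

public abstract class LazyIterator<T> implements Iterator<T> {

  // Buffers only the current micro-batch, keeping memory usage bounded.
  private final Queue<T> queue = new ArrayDeque<>();

  // Loads the next micro-batch from the source; an empty list means exhausted.
  protected abstract List<T> load();

  @Override
  public boolean hasNext() {
    if (queue.isEmpty()) {
      queue.addAll(load()); // lazily fetch the next micro-batch
    }
    return !queue.isEmpty();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return queue.poll();
  }
}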


@marcosmarxm
Member

I think it's safer compared to the logic implemented today.

@tunguyen-12
Author

I think it's safer compared to the logic implemented today.

Yeah, I have read the Debezium flow, applied the same logic to the Kafka flow, and it works like a charm. I'll consider submitting a PR in the near future.

@grishick grishick added the team/db-dw-sources Backlog for Database and Data Warehouse Sources team label Sep 27, 2022
@obastemur

IMHO, a record-count limit has a few shortcomings that prevent making full use of an expectedly long-running task.

It feels like this could be managed by tracking the total fetch size (not the number of records) if the problem is OOM? Still, cutting the job short is an issue.

@alexnikitchuk
Contributor

Is there any workaround for the issue? I have a 10 GB topic which Airbyte fails to read with the error:

Terminating due to java.lang.OutOfMemoryError: Java heap space

Moreover, it commits offsets but does not write the data into the destination.

@alexnikitchuk
Contributor

alexnikitchuk commented Oct 29, 2022


I thought that limiting the max records per run could help, but it looks like this parameter does not work properly with the JSON format: the record_counter increment is present in the AVRO path but missing in the JSON path (see the sketch below).
FYI @sivankumar86
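
A minimal sketch of the per-run cap implied above; maxRecords, recordCounter, and emit are illustrative names, not the connector's actual identifiers. The point is that the JSON read path must increment the counter exactly as the AVRO path does:

int recordCounter = 0;
for (final ConsumerRecord<String, JsonNode> record : consumerRecords) {
  emit(record);      // hand the record to the Airbyte message stream
  recordCounter++;   // per the report above, this increment is missing for JSON
  if (recordCounter >= maxRecords) {
    return;          // stop the run once the cap is reached
  }
}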

@marcosmarxm marcosmarxm changed the title Kafka Source Cause OutOfMemory Source Kafka: polling records cause OutOfMemory Nov 30, 2022
@bleonard bleonard added the frozen Not being actively worked on label Mar 22, 2024