Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream wrapper functionality #27

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

kisaga
Copy link

@kisaga kisaga commented Jun 19, 2022

Problem

Recently I've started getting myself familiar with the open source software and how to contribute. I found the apache beam project and while I was trying to get familiar with the project I found an example that was using twitter4j as a client. I tried running this example and then I realized that v1 endpoint of twitter stream API has been deprecated recently. I tried to find a way to contribute on the twitter4j project and I saw that it has not updated for the last four years. This project seemed the correct choice for the any required enhancement for integrating with this apache beam example . I needed a mechanism/API that will wrap the stream methods (implemented the sampleStream, but the same can be done for the searchStream) and will provide an interface that to retrieve the results of the stream.

Solution

Some components of the solution were already in place, but in the examples folder. I've moved the executor and the listener to a new package named stream and I've added one more thread to deserialize the lines.

Permormance improvements

I've done several comparisons for the performance of the stream processing.
You can find the results here: https://docs.google.com/spreadsheets/d/1yt6uofcbBNOj_o34A_LhhRhv7Lg7-6o0B9ts4u1m87o/edit?usp=sharing
The results contain the durations in seconds of parsing 250.000 tweets served from wiremock running locally. (I can share the file with the wiremock config if you want).

The initial solution (with 2 threads - without deserialization layer) corresponds to the No Deserial Layer (Gson) line/column.
By changing the deserialization provider from gson to jackson 'No Deserial Layer (ObjectMapper) the performance was improved enough.
After that. I added the third (deserialization) layer to decouple 'getting the next line' from 'transforming from String to the Model object'. This has also an impact in the performance.
Adding more that one threads in the third layer did not improve the performance but it made things worse.
Finally I changed the type of the response from InputStream to okio.BufferedSource and it made the application perform faster.

Structural changes

I've introduced some enums for the parameters of the request for sampleStream and searchStream. They can be used in many other places as I've seen. I've also introduced a parameter object along with the builder. The StreamQueryParameter object was introduced to decouple the signatures of all the low level calls inside TweetsApi from all these arguments. The builder object was created as a way of avoiding setting all the values if not required and having some default values. Also I've overridden the method for setting the field with var args to be able to set params with comma separation as in the API.

While rebasing I saw that something similar had been introduced but it was not easy to create a wrapper of the sampleStream with parametrized input without adding every parameter in the signature of the wrapper method.

public void startStream(Integer backfillMinutes, Set<String> tweetFields, Set<String> expansions, Set<String> mediaFields, Set<String> pollFields, Set<String> userFields, Set<String> placeFields) {
    apiInstancesampleStream().backfillMinutes(backfillMinutes).tweetFields(tweetFields).expansions(expansions).mediaFields(mediaFields).pollFields(pollFields).userFields(userFields).placeFields(placeFields).execute();
}

Towards avoiding the above, I've modified the builder of the request object and the request object itself to contain the StreamQueryParameter as a field and delegated the creation of the http call parameters to the QueryParameters object leaving the sampleSearch with only the high level composition of actions.

public void startStream(StreamQueryParameters) {
    apiInstance.sampleStream().parameters(parameters).execute();
}

Other changes

I've added some logging using slf4 in some places and in the pom of the examples module I've added the slf4j-simple implementation to be able to see the logs. It can be removed if you think that it should not be there.

Also, a timeout mechanism has been implemented as mentioned here (after 20 seconds of empty stream results). Re-connection is mentioned in the page but it can be added in a future contribution.

Result

An API for handling the stream internally and providing the results to the users of this library through the listeners has been implemented.
And a new contributor to this repo 🙂 . I have also some other improvements/recommendations for the main structure of this repo towards increased maintainability.

@CLAassistant
Copy link

CLAassistant commented Jun 19, 2022

CLA assistant check
All committers have signed the CLA.

@kisaga
Copy link
Author

kisaga commented Jun 20, 2022

I've also mentioned this PR in this apache beam issue and I am waiting for their feedback on the approach. I'll keep you posted

@kisaga kisaga marked this pull request as draft June 20, 2022 21:59
@kisaga
Copy link
Author

kisaga commented Jun 20, 2022

Changed to draft since jackson was not set properly for parsing every field. This will reduce the performance mentioned in the PR description. I'll check the parsing with all the required jackson configuration and the performance compared to gson and I'll post the results.

@kisaga
Copy link
Author

kisaga commented Jun 21, 2022

Switched back to gson for the deserialization thread because lots of config is required for jackson to work as expected for all the fields, nevertheless the performance improvement from the addition of the mid/deserialization layer is significant.
The permormance would be even better if jackson was added.
Let's proceed with the current solution and discuss any concerns, disagreements or improvements.

@kisaga kisaga marked this pull request as ready for review June 21, 2022 20:46
@zacmos
Copy link
Collaborator

zacmos commented Jun 22, 2022

Hi @kisaga , thank you for your PR.
I really appreciate your effort and willingness to improve the Twitter APIs Java SDK.
After checking your PR these are my comments:

  1. The SDK code is auto generated from the Twitter OpenAPI spec using the openapi-generator lib. It uses template files which at the moment are maintained by Twitter. Therefore PRs can not be merged directly to the main branch, they will have to be implemented in the code generation process.

  2. Gson to Jackson - I see that you rolled back to Gson.

  3. InputStream to okio.BufferedSource

    a. InputStream is a JDK class, it’s more popular and used by developers and was the obvious option for the SDK. There has to be a good reason for changing it.

    b. While running a few tests of the streaming example using the okio.BufferedSource, after 45 - 60 minutes I got a SocketTimeoutException, maybe I missed some configurations, I didn’t check it yet. Did you have this issue?

    c. It would be nice to know more about the performance differences between InputStream and okio.BufferedSource

  4. Streaming example changes + structural changes - the example is just an example and there is no intention at the moment to add the TweetsStreamExecutor.java to the SDK. If you want, you are welcome to open a repo with Twitter APIs examples and publish the streaming flow there.

Again, thank you for the effort and sharing your ideas.

@zacmos zacmos marked this pull request as draft June 30, 2022 02:33
@rzo1
Copy link

rzo1 commented Jul 15, 2022

If you want, you are welcome to open a repo with Twitter APIs examples and publish the streaming flow there.

@kisaga I would appreciate it as I was also working on a multi-threaded version of the queuer / consumer scenario including automatic reconnect, etc.

I also encountered the "empty stream" thing after a certain amount of time, which (atm?) requires a reconnect to solve.

I think, that other devs would benefit from a more exhaustive example repo in dealing with the SDK.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants