Feature Request: Implement Reactive Java Client #27679

Closed
joshfix opened this issue Dec 5, 2017 · 9 comments
Labels
:Search/Search Search-related issues that do not fall into other categories

Comments

@joshfix

joshfix commented Dec 5, 2017

Hi all,

I found this proposal for streaming responses which was closed due to lack of interest:

#12188

Java 9 (through the java.util.concurrent.Flow interfaces) and Spring 5/Spring Boot 2 natively support reactive streams.

https://docs.oracle.com/javase/9/docs/api/java/util/stream/package-summary.html
https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html
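For reference, the reactive-streams interfaces that Java 9 ships live in `java.util.concurrent.Flow`. The following is a minimal, stdlib-only sketch (Java 9+) of the backpressure idea this request is about: the subscriber signals demand for one item at a time, and `SubmissionPublisher` buffers (and eventually blocks the producer) rather than pushing faster than the consumer asks. `OneAtATimeSubscriber` and `FlowDemo` are illustrative names, not part of any Elasticsearch or Spring API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Illustrative subscriber: it signals demand for exactly one item at a time.
final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> received = new ArrayList<>();
    private final CompletableFuture<List<T>> done = new CompletableFuture<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // initial demand: one item
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        subscription.request(1); // ask for the next item only after handling this one
    }

    @Override
    public void onError(Throwable throwable) {
        done.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        done.complete(new ArrayList<>(received));
    }

    CompletableFuture<List<T>> result() {
        return done;
    }
}

final class FlowDemo {
    // Publishes the items and waits (up to 5 s) for the subscriber to receive them all.
    static List<String> publishAll(List<String> items) {
        OneAtATimeSubscriber<String> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            // submit() blocks while the subscriber's buffer is full,
            // so a slow consumer throttles the producer
            items.forEach(publisher::submit);
        } // close() signals onComplete after the buffered items are delivered
        return subscriber.result().orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

For example, `FlowDemo.publishAll(List.of("hit-1", "hit-2"))` returns the items in order once the publisher is closed; in a real client each item could be a hit or a page of hits.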

I would be very interested in seeing reactive capabilities added to the ES client. I currently have a fully reactive workflow in a Spring Boot project, except for the Elasticsearch REST client, which I have to block on until the entire SearchResponse object has been returned. It would be very interesting to see a reactive REST client that streams results.

I noticed that under the hood, the ES REST client uses Apache HttpAsyncClient (built on Apache HttpCore NIO) for async requests. HttpCore NIO is itself built around an event-driven, reactor-based I/O model:

https://hc.apache.org/httpcomponents-core-ga/tutorial/html/nio.html

@cbuescher cbuescher added :Search/Search Search-related issues that do not fall into other categories discuss labels Dec 7, 2017
@colings86
Contributor

@joshfix could you elaborate a bit more on what you mean by this feature?

Since a search aims to return the top N hits (and/or aggregated information about all the hits) I'm not sure how a streaming API would work in practice because we would need to do all of the work before we can return any of the search response.

Is your use-case only concerned with the scroll API where you are iterating through all the documents matching the query? If so, is the advantage of a streaming API mainly due to convenience or do you see other advantages over using the Scroll API as it is today?
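A small illustration of the top-N point above: to return the best N hits, every candidate must be scored first, because the very last document examined could outrank anything seen so far, so there is no partial answer to stream early. This sketch uses a bounded min-heap; `TopN` and the plain `double` scores are illustrative and are not a description of how Elasticsearch collects hits on each shard.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class TopN {
    // Returns the n highest scores in descending order.
    // Nothing can be returned early: the last score seen may still belong in the result.
    static List<Double> topN(Iterable<Double> scores, int n) {
        // Min-heap holding the best n scores seen so far; its root is the weakest of them.
        PriorityQueue<Double> heap = new PriorityQueue<>(Comparator.naturalOrder());
        for (double score : scores) {
            if (heap.size() < n) {
                heap.offer(score);
            } else if (n > 0 && score > heap.peek()) {
                heap.poll(); // evict the current weakest candidate
                heap.offer(score);
            }
        }
        List<Double> result = new ArrayList<>(heap);
        result.sort(Comparator.reverseOrder());
        return result;
    }
}
```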

@joshfix
Author

joshfix commented Dec 14, 2017

Hi @colings86, and thank you for your response and follow-up questions. It sounds like you may be thinking about my question in terms of the Elasticsearch server itself rather than the client. My request is simply for a reactive client; however, what you bring up is interesting, and a reactive client might benefit from using the scroll API under the hood.

So if you're not already familiar with reactive programming and the differences between reactive and async, there are several resources available. Here is one:

http://projectreactor.io/docs/core/release/reference/#intro-reactive

Reactive calls to a remote service do not block while waiting for a response, but they don't use callbacks either. Observables (or Monos and Fluxes in Reactor) act somewhat like futures that provide streams of data. You write code describing how to handle data when it is published, and nothing happens until something subscribes to the Observable.
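The "nothing happens until something subscribes" property can be illustrated with the JDK's own `Flow` interfaces. In this minimal sketch (Java 9+, stdlib only), `ColdPublisher` and `ColdDemo` are hypothetical names and a `Supplier` stands in for the remote call; the call runs only when a subscriber requests data, and again for every new subscription. By contrast, `CompletableFuture.supplyAsync` starts the work as soon as it is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical single-value "cold" publisher: the supplier (standing in for a remote call)
// runs once per subscription, and only after the subscriber signals demand.
final class ColdPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<T> call;

    ColdPublisher(Supplier<T> call) {
        this.call = call;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        AtomicBoolean done = new AtomicBoolean(false);
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                if (!done.compareAndSet(false, true)) {
                    return; // already emitted, failed or cancelled
                }
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                T value;
                try {
                    value = call.get(); // the "remote call" happens here, on demand
                } catch (RuntimeException e) {
                    subscriber.onError(e);
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}

final class ColdDemo {
    // Subscribes, requests one item and returns it. This works synchronously only because
    // ColdPublisher emits on the calling thread.
    static <T> T subscribeAndGet(Flow.Publisher<T> publisher) {
        List<T> items = new ArrayList<>();
        List<Throwable> errors = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(T item) { items.add(item); }
            @Override public void onError(Throwable t) { errors.add(t); }
            @Override public void onComplete() { }
        });
        if (!errors.isEmpty()) {
            throw new IllegalStateException(errors.get(0));
        }
        return items.isEmpty() ? null : items.get(0);
    }
}
```

Creating a `ColdPublisher` performs no call at all; each `subscribeAndGet` triggers exactly one.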

For comparison, the Lettuce client for Redis provides a synchronous, asynchronous, and reactive client capability. Here is the documentation on their reactive client:

https://github.com/lettuce-io/lettuce-core/wiki/Reactive-API-(5.0)

Back to the scroll API: if a very large response is returned from ES (let's say up to the default 10,000-hit limit set by index.max_result_window) and the service that requested it is under heavy load, there is no way to slow things down if the service can't keep up with the incoming response. A reactive client could break the request up into pieces and only request the next piece when it's ready for it, publishing each chunk of the response to the stream as it becomes available. I don't know much about what the responses from the scroll API look like, but one potential issue is how ES wraps response documents with metadata about the request/response. Is it possible to request only a list of results rather than a single wrapped document? Does that make sense?
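The idea of requesting the next piece only when the consumer is ready maps naturally onto `Flow.Subscription.request(n)`: each unit of demand triggers at most one page fetch. Below is a hypothetical, single-threaded sketch; `PagedPublisher`, `PagedDemo`, and the `IntFunction<List<T>>` page fetcher (a stand-in for a scroll or `from`/`size` request) are assumptions for illustration, not Elasticsearch client APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.IntFunction;

// Hypothetical publisher that emits one page per unit of demand. fetchPage stands in for a
// scroll (or from/size) request; an empty page means the results are exhausted.
final class PagedPublisher<T> implements Flow.Publisher<List<T>> {
    private final IntFunction<List<T>> fetchPage;

    PagedPublisher(IntFunction<List<T>> fetchPage) {
        this.fetchPage = fetchPage;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super List<T>> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            // Single-threaded sketch: no synchronization between request() callers.
            private long demand = 0;
            private boolean emitting = false;
            private boolean finished = false;
            private int nextPage = 0;

            @Override
            public void request(long n) {
                if (finished) {
                    return;
                }
                if (n <= 0) {
                    finished = true;
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return; // re-entrant call from onNext: the loop below picks up the new demand
                }
                emitting = true;
                while (demand > 0 && !finished) {
                    demand--;
                    List<T> page = fetchPage.apply(nextPage++); // one fetch per requested page
                    if (page.isEmpty()) {
                        finished = true;
                        subscriber.onComplete();
                    } else {
                        subscriber.onNext(page);
                    }
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }
}

final class PagedDemo {
    // Pulls pages one at a time and stops asking for more after maxPages.
    static <T> List<List<T>> take(Flow.Publisher<List<T>> publisher, int maxPages) {
        List<List<T>> pages = new ArrayList<>();
        List<Throwable> errors = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<List<T>>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                if (maxPages > 0) s.request(1);
            }
            @Override public void onNext(List<T> page) {
                pages.add(page);
                if (pages.size() < maxPages) subscription.request(1); // ready for the next page
            }
            @Override public void onError(Throwable t) { errors.add(t); }
            @Override public void onComplete() { }
        });
        if (!errors.isEmpty()) {
            throw new IllegalStateException(errors.get(0));
        }
        return pages;
    }
}
```

The re-entrancy guard (`emitting`) matters because a subscriber typically calls `request(1)` from inside `onNext`; without it, each page would add another stack frame.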

To wrap things up, I'll include a full example of one of my methods to query ES in my Spring Boot 2 application that uses WebFlux and is otherwise fully reactive.

    public Mono<ItemCollection> search(
            List<String> indices, String type, QueryBuilder queryBuilder, int limit, int offset) {
        SearchSourceBuilder searchSourceBuilder =
                new SearchSourceBuilder().query(queryBuilder).size(limit).from(offset * limit);
        SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder)
                .indices(indices.toArray(new String[indices.size()])).types(type);

        log.debug("Searching ES with the following request: \n" + searchRequest.source().toString());
        // offset is a page index (from = offset * limit), so the next page token is offset + 1
        String nextOffset = String.valueOf(offset + 1);

        return Mono
                // use fromCallable to make the blocking request to ES
                .fromCallable(() -> submitSearch(searchRequest))
                // switch to managed thread pool for blocking process
                .subscribeOn(scheduler)
                // switch back to regular thread for the rest of the processes
                .publishOn(Schedulers.single())
                // build a Flux from the hits
                .flatMapIterable(response -> Arrays.asList(response.getHits().getHits()))
                // map each hit to its source string
                .map(SearchHit::getSourceAsString)
                // deserialize each string
                .map(this::deserializeItem)
                // if the deserializer failed for an item, the Optional is empty; exclude it
                .filter(Optional::isPresent)
                // unwrap to a Flux of the actual Item objects
                .map(Optional::get)
                // build the "self" link for each item
                .map(this::buildSelfLink)
                // collect all the items into a list
                .collectList()
                // wrap the items list in an ItemCollection that carries the next page token
                .map(itemList -> new ItemCollection()
                        .features(itemList)
                        .type(ItemCollection.TypeEnum.FEATURECOLLECTION)
                        .nextPageToken(nextOffset));
    }

A lot of care is taken in the previous example to handle which thread the call to ES is made in. I have to manage my own thread pool (which is the "scheduler" object) for the blocking requests to ES, then switch back to the default threading system.
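Outside Reactor, the same pattern (run the blocking call on a dedicated pool, then continue on other threads) can be sketched with plain `CompletableFuture` and two executors. This is only a JDK analogue of `subscribeOn`/`publishOn`, not Reactor's API; the `blockingSearch` supplier is a hypothetical stand-in for the blocking `submitSearch` call, and the pool sizes and thread names are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class OffloadDemo {
    // Names threads "<prefix>-1", "<prefix>-2", ... so it is visible which pool ran a stage.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Fixed-size pool whose threads carry the given name prefix.
    static ExecutorService pool(String prefix, int threads) {
        return Executors.newFixedThreadPool(threads, named(prefix));
    }

    // Runs the blocking call on blockingPool (roughly what subscribeOn(scheduler) does above)
    // and the follow-up processing on workerPool (roughly what publishOn does).
    static CompletableFuture<String> search(Supplier<String> blockingSearch,
                                            ExecutorService blockingPool,
                                            ExecutorService workerPool) {
        return CompletableFuture
                .supplyAsync(blockingSearch, blockingPool)
                .thenApplyAsync(body -> body + " processed on " + Thread.currentThread().getName(),
                        workerPool);
    }
}
```

As with a dedicated Reactor scheduler, the point is that the blocking call never occupies the threads that the rest of the pipeline runs on.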

I'm also considering dropping the ES REST client altogether and just sending the JSON produced by the QueryBuilder with the reactive WebClient provided by Spring 5, so that I don't need to manage threads myself:

http://www.baeldung.com/spring-5-webclient

@colings86
Contributor

@joshfix Thanks for the additional explanation. We currently don't have any plans to implement this type of client, since the current High Level REST client still has a lot of work to go before it is where we would like it to be. If you are interested in building this type of client, we would welcome a contribution, but it's not something we would be looking to do at this time.

@DaveCTurner
Contributor

If it helps, here is a blog post that talks about building an adapter in order to use the existing client in a reactive setting:

http://www.nurkiewicz.com/2018/01/spring-reactor-and-elasticsearch-from.html
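At its core, such an adapter bridges a callback-style async API to a future or a `Mono` (Reactor provides `Mono.fromFuture` and `Mono.create` for this). The following JDK-only sketch shows the bridge; `Listener` and `AsyncCallbackClient` are hypothetical interfaces shaped like a success/failure response listener, not the Elasticsearch client's actual types.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback interface, shaped like an async HTTP client's response listener.
interface Listener<T> {
    void onSuccess(T response);

    void onFailure(Exception exception);
}

// Hypothetical callback-based client: sends the request and notifies the listener later.
interface AsyncCallbackClient<T> {
    void send(String endpoint, Listener<T> listener);
}

final class CallbackAdapter {
    // Bridges the callback API to a CompletableFuture, which Reactor code could then
    // wrap with Mono.fromFuture(...).
    static <T> CompletableFuture<T> toFuture(AsyncCallbackClient<T> client, String endpoint) {
        CompletableFuture<T> future = new CompletableFuture<>();
        client.send(endpoint, new Listener<T>() {
            @Override
            public void onSuccess(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception exception) {
                future.completeExceptionally(exception);
            }
        });
        return future;
    }
}
```

One design difference worth noting: a `CompletableFuture` is eager, so the request is sent as soon as `toFuture` is called, whereas wrapping the same callback in `Mono.create` would defer it until subscription.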

@joshfix
Author

joshfix commented Jan 10, 2018

Thanks for the link! I ended up using the Spring reactive WebClient to submit my requests to Elasticsearch. It's pretty straightforward...

    private Mono<SearchResponse> searchInternal(String body, String uri) {
        return client
                .post()
                .uri(uri)
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(body), String.class)
                .retrieve()
                .onStatus(status -> !status.is2xxSuccessful(),
                        response -> Mono.error(new RuntimeException(SEARCH_EXCEPTION)))
                .bodyToMono(String.class)
                .map(this::deserializeSearchResponse)
                .filter(Optional::isPresent)
                .map(Optional::get);
    }

    private Optional<SearchResponse> deserializeSearchResponse(String bodyString) {
        // try-with-resources closes the parser. NamedXContentRegistry.EMPTY has no aggregation
        // or suggestion parsers registered, so this only suits responses without them.
        try (XContentParser parser = XContentFactory
                .xContent(XContentType.JSON)
                .createParser(NamedXContentRegistry.EMPTY, bodyString)) {
            return Optional.of(SearchResponse.fromXContent(parser));
        } catch (IOException ioe) {
            log.error("Error deserializing Elasticsearch response.", ioe);
            return Optional.empty();
        }
    }
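For readers not using Spring, the same approach (send the search JSON yourself with a non-blocking HTTP client) can be sketched with the JDK's `java.net.http.HttpClient`, which arrived in Java 11, after this discussion. The base URI, index name, and query body are assumptions for illustration; the returned body would still need to be parsed, for example as `deserializeSearchResponse` does above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

final class JdkSearchClient {
    private final HttpClient client = HttpClient.newHttpClient();
    private final URI baseUri;

    JdkSearchClient(URI baseUri) {
        this.baseUri = baseUri;
    }

    // Builds a POST /{index}/_search request carrying the search body JSON.
    HttpRequest buildSearchRequest(String index, String searchJson) {
        return HttpRequest.newBuilder(baseUri.resolve("/" + index + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(searchJson))
                .build();
    }

    // sendAsync does not block the caller; a non-2xx status fails the returned future.
    CompletableFuture<String> search(String index, String searchJson) {
        return client
                .sendAsync(buildSearchRequest(index, searchJson), HttpResponse.BodyHandlers.ofString())
                .thenApply(response -> {
                    if (response.statusCode() / 100 != 2) {
                        throw new IllegalStateException("search failed with HTTP " + response.statusCode());
                    }
                    return response.body();
                });
    }
}
```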

@colings86
Contributor

Looks like there is a good workaround for this, and we are not planning to implement this kind of client at this stage, so I'm going to close this issue for now.

@javanna
Member

javanna commented Mar 15, 2018

Closing: we are not going to address this for the time being, but we can always reopen once we are ready to get back to it. It is unfortunate that people end up not using our client because of this, as it offers retries on failure, sniffing support, etc.

@gonciarz

Major databases expose a reactive API. Spring Data already supports reactive Redis, MongoDB, Cassandra, and Couchbase. I know that ES is a distributed search engine, but in some respects it's another type of database. In any case, reactiveness does not just mean asynchronous. It's not just a callback. It offers non-blocking operations by changing the concurrency model from a thread pool to an event loop. Whenever you have a partial result, you expose it to the client.
Please let us know whether the current architecture allows reactiveness to be introduced and what is required to achieve it. As I understand it, the folks at Pivotal don't want to support reactive ES unless its driver becomes reactive.

@dadoonet
Member

@gonciarz You might be interested in this: https://github.com/reactiverse/elasticsearch-client
