Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-4776 Migrate to new Elasticsearch client #4777

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 2 additions & 24 deletions compliance/elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.test</groupId>
<artifactId>framework</artifactId>
<version>${elasticsearch.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
Expand Down Expand Up @@ -100,16 +84,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.vividsolutions</groupId>
<artifactId>jts</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
27 changes: 2 additions & 25 deletions core/sail/elasticsearch-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,15 @@
<description>Store for utilizing Elasticsearch as a triplestore.</description>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch.version}</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- org.elasticsearch.client:transport pulls in JCL -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
*******************************************************************************/
package org.eclipse.rdf4j.sail.elasticsearchstore;

import org.elasticsearch.client.Client;
import co.elastic.clients.elasticsearch.ElasticsearchClient;

/**
* @author Håvard Mikkelsen Ottestad
*/
interface ClientProvider extends AutoCloseable {

Client getClient();
ElasticsearchClient getClient();

boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.stream.Stream;

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.BNode;
Expand All @@ -37,27 +39,19 @@
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.extensiblestore.DataStructureInterface;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatement;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.client.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import co.elastic.clients.elasticsearch._types.Conflicts;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;

/**
* @author Håvard Mikkelsen Ottestad
*/
Expand Down Expand Up @@ -146,14 +140,18 @@ public void addStatement(Collection<ExtensibleStatement> statements) {
@Override
synchronized public void clear(boolean inferred, Resource[] contexts) {

BulkByScrollResponse response = new DeleteByQueryRequestBuilder(clientProvider.getClient(),
DeleteByQueryAction.INSTANCE)
.filter(getQueryBuilder(null, null, null, inferred, contexts))
.abortOnVersionConflict(false)
.source(index)
.get();
DeleteByQueryRequest.Builder builder = new DeleteByQueryRequest.Builder();
DeleteByQueryRequest build = builder.index(index)
.query(getQuery(null, null, null, inferred, contexts))
.conflicts(Conflicts.Proceed)
.build();

try {
Long deleted = clientProvider.getClient().deleteByQuery(build).deleted();
} catch (IOException e) {
throw new SailException(e);
}

long deleted = response.getDeleted();
}

@Override
Expand All @@ -166,7 +164,7 @@ public CloseableIteration<? extends ExtensibleStatement> getStatements(Resource
IRI predicate,
Value object, boolean inferred, Resource... context) {

QueryBuilder queryBuilder = getQueryBuilder(subject, predicate, object, inferred, context);
Query queryBuilder = getQuery(subject, predicate, object, inferred, context);

return new LookAheadIteration<>() {

Expand Down Expand Up @@ -221,73 +219,64 @@ protected void handleClose() throws SailException {

}

private QueryBuilder getQueryBuilder(Resource subject, IRI predicate, Value object, boolean inferred,
Resource[] contexts) {
private Query getQuery(Resource subject, IRI predicate, Value object, boolean inferred, Resource[] contexts) {

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
BoolQuery.Builder mainQuery = new BoolQuery.Builder();

if (subject != null) {
boolQueryBuilder.must(QueryBuilders.termQuery("subject", subject.stringValue()));
mainQuery.must(b -> b.term(t -> t.field("subject").value(subject.stringValue())));

if (subject instanceof IRI) {
boolQueryBuilder.must(QueryBuilders.termQuery("subject_IRI", true));
mainQuery.must(b -> b.term(t -> t.field("subject_IRI").value(true)));
} else {
boolQueryBuilder.must(QueryBuilders.termQuery("subject_BNode", true));
mainQuery.must(b -> b.term(t -> t.field("subject_BNode").value(true)));
}
}

if (predicate != null) {
boolQueryBuilder.must(QueryBuilders.termQuery("predicate", predicate.stringValue()));
mainQuery.must(b -> b.term(t -> t.field("predicate").value(predicate.stringValue())));
}

if (object != null) {
boolQueryBuilder.must(QueryBuilders.termQuery("object_Hash", object.stringValue().hashCode()));
mainQuery.must(b -> b.term(t -> t.field("object_Hash").value(object.stringValue().hashCode())));

if (object instanceof IRI) {
boolQueryBuilder.must(QueryBuilders.termQuery("object_IRI", true));
mainQuery.must(b -> b.term(t -> t.field("object_IRI").value(true)));
} else if (object instanceof BNode) {
boolQueryBuilder.must(QueryBuilders.termQuery("object_BNode", true));
mainQuery.must(b -> b.term(t -> t.field("object_BNode").value(true)));
} else {
boolQueryBuilder.must(
QueryBuilders.termQuery("object_Datatype", ((Literal) object).getDatatype().stringValue()));
mainQuery.must(b -> b
.term(t -> t.field("object_Datatype").value(((Literal) object).getDatatype().stringValue())));

if (((Literal) object).getLanguage().isPresent()) {
boolQueryBuilder
.must(QueryBuilders.termQuery("object_Lang", ((Literal) object).getLanguage().get()));
mainQuery.must(
b -> b.term(t -> t.field("object_Lang").value(((Literal) object).getLanguage().get())));
}
}
}

if (contexts != null && contexts.length > 0) {

BoolQueryBuilder contextQueryBuilder = new BoolQueryBuilder();

for (Resource context : contexts) {

if (context == null) {

contextQueryBuilder.should(new BoolQueryBuilder().mustNot(QueryBuilders.existsQuery("context")));

mainQuery.should(b -> b.bool(bb -> bb.mustNot(mb -> mb.exists(a -> a.field("context")))));
} else if (context instanceof IRI) {

contextQueryBuilder.should(
new BoolQueryBuilder()
.must(QueryBuilders.termQuery("context", context.stringValue()))
.must(QueryBuilders.termQuery("context_IRI", true)));

mainQuery.should(b -> b.bool(bb -> {
bb.must(mb -> mb.term(t -> t.field("context").value(context.stringValue())));
bb.must(mb -> mb.term(t -> t.field("context_IRI").value(true)));
}));
} else { // BNode
contextQueryBuilder.should(
new BoolQueryBuilder()
.must(QueryBuilders.termQuery("context", context.stringValue()))
.must(QueryBuilders.termQuery("context_BNode", true)));
mainQuery.should(b -> b.bool(bb -> {
bb.must(mb -> mb.term(t -> t.field("context").value(context.stringValue())));
bb.must(mb -> mb.term(t -> t.field("context_BNode").value(true)));
}));
}

}

boolQueryBuilder.must(contextQueryBuilder);

}

boolQueryBuilder.must(QueryBuilders.termQuery("inferred", inferred));
mainQuery.must(b -> b.term(t -> t.field("inferred").value(inferred)));

return QueryBuilders.constantScoreQuery(boolQueryBuilder);
return mainQuery.build()._toQuery();
}

@Override
Expand Down Expand Up @@ -604,7 +593,7 @@ public synchronized boolean removeStatementsByQuery(Resource subj, IRI pred, Val

BulkByScrollResponse response = new DeleteByQueryRequestBuilder(clientProvider.getClient(),
DeleteByQueryAction.INSTANCE)
.filter(getQueryBuilder(subj, pred, obj, inferred, contexts))
.filter(getQuery(subj, pred, obj, inferred, contexts))
.source(index)
.abortOnVersionConflict(false)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.extensiblestore.ExtensibleStore;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatementHelper;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.HealthStatus;

/**
* <p>
* An RDF4J SailStore persisted to Elasticsearch.
Expand All @@ -49,9 +48,8 @@
* There is no write-ahead logging, so a failure during a transaction may result in partially persisted changes.
* </p>
*
* @see <a href="https://www.elastic.co/licensing/elastic-license/faq">Elastic License FAQ</a>
*
* @author Håvard Mikkelsen Ottestad
* @see <a href="https://www.elastic.co/licensing/elastic-license/faq">Elastic License FAQ</a>
*/
@Experimental
public class ElasticsearchStore extends ExtensibleStore<ElasticsearchDataStructure, ElasticsearchNamespaceStore> {
Expand Down Expand Up @@ -101,11 +99,11 @@ public ElasticsearchStore(ClientProvider clientPool, String index, Cache cache)

}

public ElasticsearchStore(Client client, String index) {
public ElasticsearchStore(ElasticsearchClient client, String index) {
this(client, index, Cache.EAGER);
}

public ElasticsearchStore(Client client, String index, Cache cache) {
public ElasticsearchStore(ElasticsearchClient client, String index, Cache cache) {
this(new UnclosableClientProvider(new UserProvidedClientProvider(client)), index, cache);
}

Expand Down Expand Up @@ -152,16 +150,10 @@ public void waitForElasticsearch(int time, TemporalUnit timeUnit) {

}
try {
Client client = clientProvider.getClient();

ClusterHealthResponse clusterHealthResponse = client.admin()
.cluster()
.health(new ClusterHealthRequest())
.actionGet();
ClusterHealthStatus status = clusterHealthResponse.getStatus();
logger.info("Cluster status: {}", status.name());
ElasticsearchClient client = clientProvider.getClient();

if (status.equals(ClusterHealthStatus.GREEN) || status.equals(ClusterHealthStatus.YELLOW)) {
HealthStatus status = client.cluster().health().status();
if (status.equals(HealthStatus.Green) || status.equals(HealthStatus.Yellow)) {
logger.info("Elasticsearch started!");

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*******************************************************************************/
package org.eclipse.rdf4j.sail.elasticsearchstore;

import org.elasticsearch.client.Client;
import co.elastic.clients.elasticsearch.ElasticsearchClient;

/**
* Used by the user to provide an Elasticsearch Client to the ElasticsearchStore instead of providing host, port,
Expand All @@ -20,16 +20,16 @@
*/
public class UserProvidedClientProvider implements ClientProvider {

final private Client client;
final private ElasticsearchClient client;

transient boolean closed;

public UserProvidedClientProvider(Client client) {
public UserProvidedClientProvider(ElasticsearchClient client) {
this.client = client;
}

@Override
public Client getClient() {
public ElasticsearchClient getClient() {
return client;
}

Expand Down
Loading
Loading