From 9242079cf0fd6913088f59570055426a53541d93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Fri, 23 Oct 2020 14:54:39 -0500 Subject: [PATCH] refactor: remove unused code --- .../confluent/ksql/engine/KsqlEngineTest.java | 1 - .../confluent/ksql/metastore/MetaStore.java | 5 - .../ksql/metastore/MetaStoreImpl.java | 76 +----- .../ksql/metastore/MutableMetaStore.java | 8 - .../ReferentialIntegrityTableEntry.java | 68 ----- .../ksql/metastore/MetaStoreImplTest.java | 244 ------------------ .../ReferentialIntegrityTableEntryTest.java | 143 ---------- 7 files changed, 6 insertions(+), 539 deletions(-) delete mode 100644 ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntry.java delete mode 100644 ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntryTest.java diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index dfa60e68ba84..56321bbead56 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -1495,7 +1495,6 @@ public void shouldNotUpdateMetaStoreDuringTryExecute() { // Then: assertThat(metaStore.getSource(SourceName.of("TEST3")), is(notNullValue())); - assertThat(metaStore.getQueriesWithSource(SourceName.of("TEST2")), is(empty())); assertThat(metaStore.getSource(SourceName.of("BAR")), is(nullValue())); assertThat(metaStore.getSource(SourceName.of("FOO")), is(nullValue())); assertThat("live", ksqlEngine.numberOfLiveQueries(), is(numberOfLiveQueries)); diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStore.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStore.java index 029e2b4c4463..3f0b7a9b65f0 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStore.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStore.java @@ -19,7 +19,6 @@ import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.SourceName; import java.util.Map; -import java.util.Set; public interface MetaStore extends FunctionRegistry, TypeRegistry { @@ -27,9 +26,5 @@ public interface MetaStore extends FunctionRegistry, TypeRegistry { Map getAllDataSources(); - Set getQueriesWithSource(SourceName sourceName); - - Set getQueriesWithSink(SourceName sourceName); - MetaStore copy(); } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java index 931c79066257..c40ecbb6ed5b 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java @@ -46,7 +46,7 @@ public final class MetaStoreImpl implements MutableMetaStore { private final Map dataSources = new ConcurrentHashMap<>(); - private final Object referentialIntegrityLock = new Object(); + private final Object metaStoreLock = new Object(); private final FunctionRegistry functionRegistry; private final TypeRegistry typeRegistry; @@ -100,7 +100,7 @@ public void putSource(final DataSource dataSource, final boolean allowReplace) { @Override public void deleteSource(final SourceName sourceName) { - synchronized (referentialIntegrityLock) { + synchronized (metaStoreLock) { dataSources.compute(sourceName, (ignored, sourceInfo) -> { if (sourceInfo == null) { throw new KsqlException(String.format("No data source with name %s exists.", @@ -133,7 +133,7 @@ public void deleteSource(final SourceName sourceName) { @Override public void linkSource(final SourceName sourceName, final SourceName withSourceName) { - synchronized (referentialIntegrityLock) { + synchronized (metaStoreLock) { if (sourceName.equals(withSourceName)) { throw new KsqlException(String.format("Source name '%s' is equals to linked source.", sourceName.text())); @@ -182,69 +182,9 @@ public Map getAllDataSources() { .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().source)); } - @Override - public void updateForPersistentQuery( - final String queryId, - final Set sourceNames, - final Set sinkNames - ) { - synchronized (referentialIntegrityLock) { - final String sourceAlreadyRegistered = streamSources(sourceNames) - .filter(source -> source.referentialIntegrity.getSourceForQueries().contains(queryId)) - .map(source -> source.source.getName()) - .map(Object::toString) - .collect(Collectors.joining(",")); - - final String sinkAlreadyRegistered = streamSources(sinkNames) - .filter(source -> source.referentialIntegrity.getSinkForQueries().contains(queryId)) - .map(source -> source.source.getName()) - .map(Object::toString) - .collect(Collectors.joining(",")); - - if (!sourceAlreadyRegistered.isEmpty() || !sinkAlreadyRegistered.isEmpty()) { - throw new KsqlException("query already registered." - + " queryId: " + queryId - + ", registeredAgainstSource: " + sourceAlreadyRegistered - + ", registeredAgainstSink: " + sinkAlreadyRegistered); - } - - streamSources(sourceNames) - .forEach(source -> source.referentialIntegrity.addSourceForQueries(queryId)); - streamSources(sinkNames) - .forEach(source -> source.referentialIntegrity.addSinkForQueries(queryId)); - } - } - - @Override - public void removePersistentQuery(final String queryId) { - synchronized (referentialIntegrityLock) { - for (final SourceInfo sourceInfo : dataSources.values()) { - sourceInfo.referentialIntegrity.removeQuery(queryId); - } - } - } - - @Override - public Set getQueriesWithSource(final SourceName sourceName) { - final SourceInfo sourceInfo = dataSources.get(sourceName); - if (sourceInfo == null) { - return Collections.emptySet(); - } - return sourceInfo.referentialIntegrity.getSourceForQueries(); - } - - @Override - public Set getQueriesWithSink(final SourceName sourceName) { - final SourceInfo sourceInfo = dataSources.get(sourceName); - if (sourceInfo == null) { - return Collections.emptySet(); - } - return sourceInfo.referentialIntegrity.getSinkForQueries(); - } - @Override public MutableMetaStore copy() { - synchronized (referentialIntegrityLock) { + synchronized (metaStoreLock) { return new MetaStoreImpl(dataSources, functionRegistry, typeRegistry); } } @@ -337,7 +277,6 @@ public Iterator types() { private static final class SourceInfo { private final DataSource source; - private final ReferentialIntegrityTableEntry referentialIntegrity; private final Set linkedSources = new ConcurrentHashSet<>(); private final Set referentialSources = new ConcurrentHashSet<>(); @@ -345,17 +284,14 @@ private SourceInfo( final DataSource source ) { this.source = Objects.requireNonNull(source, "source"); - this.referentialIntegrity = new ReferentialIntegrityTableEntry(); } private SourceInfo( final DataSource source, - final ReferentialIntegrityTableEntry referentialIntegrity, final Set linkedSources, final Set referentialSources ) { this.source = Objects.requireNonNull(source, "source"); - this.referentialIntegrity = referentialIntegrity.copy(); this.linkedSources.addAll( Objects.requireNonNull(linkedSources, "linkedSources")); this.referentialSources.addAll( @@ -363,11 +299,11 @@ private SourceInfo( } public SourceInfo copy() { - return new SourceInfo(source, referentialIntegrity, linkedSources, referentialSources); + return new SourceInfo(source, linkedSources, referentialSources); } public SourceInfo copyWith(final DataSource source) { - return new SourceInfo(source, referentialIntegrity, linkedSources, referentialSources); + return new SourceInfo(source, linkedSources, referentialSources); } public void linkSource(final SourceName sourceName) { diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MutableMetaStore.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MutableMetaStore.java index b164dee1ba5b..b4147b76ef75 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MutableMetaStore.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/MutableMetaStore.java @@ -17,7 +17,6 @@ import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.SourceName; -import java.util.Set; public interface MutableMetaStore extends MetaStore { @@ -27,12 +26,5 @@ public interface MutableMetaStore extends MetaStore { void linkSource(SourceName sourceName, SourceName withSourceName); - void updateForPersistentQuery( - String queryId, - Set sourceNames, - Set sinkNames); - - void removePersistentQuery(String queryId); - MutableMetaStore copy(); } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntry.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntry.java deleted file mode 100644 index e97d08b222b1..000000000000 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntry.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.metastore; - -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.concurrent.ThreadSafe; - -@ThreadSafe -final class ReferentialIntegrityTableEntry { - - private final Set sourceForQueries = ConcurrentHashMap.newKeySet(); - private final Set sinkForQueries = ConcurrentHashMap.newKeySet(); - - ReferentialIntegrityTableEntry() { - } - - private ReferentialIntegrityTableEntry( - final Set sourceForQueries, - final Set sinkForQueries - ) { - this.sourceForQueries.addAll(sourceForQueries); - this.sinkForQueries.addAll(sinkForQueries); - } - - Set getSourceForQueries() { - return Collections.unmodifiableSet(sourceForQueries); - } - - Set getSinkForQueries() { - return Collections.unmodifiableSet(sinkForQueries); - } - - void addSourceForQueries(final String queryId) { - if (!sourceForQueries.add(queryId)) { - throw new IllegalStateException("Already source for query: " + queryId); - } - } - - void addSinkForQueries(final String queryId) { - if (!sinkForQueries.add(queryId)) { - throw new IllegalStateException("Already sink for query: " + queryId); - } - } - - void removeQuery(final String queryId) { - sourceForQueries.remove(queryId); - sinkForQueries.remove(queryId); - } - - public ReferentialIntegrityTableEntry copy() { - return new ReferentialIntegrityTableEntry(sourceForQueries, sinkForQueries); - } -} diff --git a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/MetaStoreImplTest.java b/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/MetaStoreImplTest.java index d40ab46fe2b1..77384937e2f6 100644 --- a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/MetaStoreImplTest.java +++ b/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/MetaStoreImplTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.DataSource; @@ -38,14 +37,11 @@ import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.KsqlReferentialIntegrityException; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.junit.After; import org.junit.Before; @@ -116,33 +112,6 @@ public void shouldDeepCopyTypesOnCopy() { assertThat("Expected no types in the original", !metaStore.types().hasNext()); } - @Test - public void shouldDeepCopySourceReferentialIntegrityDataOnCopy() { - // Given: - metaStore.putSource(dataSource, false); - - metaStore.updateForPersistentQuery( - "source query", - ImmutableSet.of(dataSource.getName()), - ImmutableSet.of()); - - metaStore.updateForPersistentQuery( - "sink query", - ImmutableSet.of(), - ImmutableSet.of(dataSource.getName())); - - // When: - final MetaStore copy = metaStore.copy(); - metaStore.removePersistentQuery("source query"); - metaStore.removePersistentQuery("sink query"); - - // Then: - assertThat(copy.getQueriesWithSource(dataSource.getName()), contains("source query")); - assertThat(metaStore.getQueriesWithSource(dataSource.getName()), is(empty())); - assertThat(copy.getQueriesWithSink(dataSource.getName()), contains("sink query")); - assertThat(metaStore.getQueriesWithSink(dataSource.getName()), is(empty())); - } - @Test public void shouldDeepCopyLinkedAndReferentialSourcesOnCopy() { // Given: @@ -308,148 +277,6 @@ public void shouldThrowOnRemoveUnknownSource() { assertThat(e.getMessage(), containsString("No data source with name bob exists")); } - @Test - public void shouldThrowOnDropSourceIfUsedAsSourceOfQueries() { - // Given: - metaStore.putSource(dataSource, false); - metaStore.updateForPersistentQuery( - "source query", - ImmutableSet.of(dataSource.getName()), - ImmutableSet.of()); - - // When: - final Exception e = assertThrows( - KsqlReferentialIntegrityException.class, - () -> metaStore.deleteSource(dataSource.getName()) - ); - - // Then: - assertThat(e.getMessage(), containsString("The following queries read from this source: [source query]")); - } - - @Test - public void shouldThrowOnUpdateForPersistentQueryOnUnknownSource() { - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> metaStore.updateForPersistentQuery( - "source query", - ImmutableSet.of(UNKNOWN_SOURCE), - ImmutableSet.of()) - ); - - // Then: - assertThat(e.getMessage(), containsString("Unknown source: unknown")); - } - - @Test - public void shouldThrowOnUpdateForPersistentQueryOnUnknownSink() { - // When: - final Exception e = assertThrows( - KsqlException.class, - () -> metaStore.updateForPersistentQuery( - "sink query", - ImmutableSet.of(), - ImmutableSet.of(UNKNOWN_SOURCE)) - ); - - // Then: - assertThat(e.getMessage(), containsString("Unknown source: unknown")); - } - - @Test - public void shouldThrowOnDropSourceIfUsedAsSinkOfQueries() { - // Given: - metaStore.putSource(dataSource, false); - metaStore.updateForPersistentQuery( - "sink query", - ImmutableSet.of(), - ImmutableSet.of(dataSource.getName())); - - // When: - final Exception e = assertThrows( - KsqlReferentialIntegrityException.class, - () -> metaStore.deleteSource(dataSource.getName()) - ); - - // Then: - assertThat(e.getMessage(), containsString("The following queries write into this source: [sink query]")); - } - - @Test - public void shouldFailToUpdateForPersistentQueryAtomicallyForUnknownSource() { - // When: - try { - metaStore.updateForPersistentQuery( - "some query", - ImmutableSet.of(dataSource.getName(), UNKNOWN_SOURCE), - ImmutableSet.of(dataSource.getName())); - } catch (final KsqlException e) { - // Expected - } - - // Then: - assertThat(metaStore.getQueriesWithSource(dataSource.getName()), is(empty())); - assertThat(metaStore.getQueriesWithSink(dataSource.getName()), is(empty())); - } - - @Test - public void shouldFailToUpdateForPersistentQueryAtomicallyForUnknownSink() { - // When: - try { - metaStore.updateForPersistentQuery( - "some query", - ImmutableSet.of(dataSource.getName()), - ImmutableSet.of(dataSource.getName(), UNKNOWN_SOURCE)); - } catch (final KsqlException e) { - // Expected - } - - // Then: - assertThat(metaStore.getQueriesWithSource(dataSource.getName()), is(empty())); - assertThat(metaStore.getQueriesWithSink(dataSource.getName()), is(empty())); - } - - @Test - public void shouldDefaultToEmptySetOfQueriesForUnknownSource() { - assertThat(metaStore.getQueriesWithSource(UNKNOWN_SOURCE), is(empty())); - } - - @Test - public void shouldDefaultToEmptySetOfQueriesForUnknownSink() { - assertThat(metaStore.getQueriesWithSink(UNKNOWN_SOURCE), is(empty())); - } - - @Test - public void shouldRegisterQuerySources() { - // Given: - metaStore.putSource(dataSource, false); - - // When: - metaStore.updateForPersistentQuery( - "some query", - ImmutableSet.of(dataSource.getName()), - ImmutableSet.of()); - - // Then: - assertThat(metaStore.getQueriesWithSource(dataSource.getName()), contains("some query")); - } - - @Test - public void shouldRegisterQuerySinks() { - // Given: - metaStore.putSource(dataSource, false); - - // When: - metaStore.updateForPersistentQuery( - "some query", - ImmutableSet.of(), - ImmutableSet.of(dataSource.getName())); - - // Then: - assertThat(metaStore.getQueriesWithSink(dataSource.getName()), contains("some query")); - } - @Test public void shouldRegisterType() { // Given: @@ -482,82 +309,11 @@ public void shouldBeThreadSafe() { when(source.getName()).thenReturn(SourceName.of("source" + idx)); metaStore.putSource(source, false); metaStore.getSource(source.getName()); - metaStore.getAllDataSources(); - - final String queryId = "query" + idx; - metaStore.updateForPersistentQuery( - queryId, - ImmutableSet.of(source.getName()), - ImmutableSet.of(source.getName())); - - metaStore.getQueriesWithSource(source.getName()); - metaStore.getQueriesWithSink(source.getName()); - metaStore.copy(); - - metaStore.removePersistentQuery(queryId); metaStore.deleteSource(source.getName()); }); assertThat(metaStore.getAllDataSources().keySet(), is(empty())); } - - @Test(timeout = 10_000) - public void shouldBeThreadSafeAroundRefIntegrity() throws Exception { - // Given: - final int iterations = 1_000; - final AtomicInteger remaining = new AtomicInteger(iterations); - final ImmutableSet sources = ImmutableSet.of(dataSource1.getName(), dataSource.getName()); - - metaStore.putSource(dataSource1, false); - - final Future mainThread = executor.submit(() -> { - while (remaining.get() > 0) { - metaStore.putSource(dataSource, false); - - while (true) { - try { - metaStore.deleteSource(dataSource.getName()); - break; - } catch (final KsqlReferentialIntegrityException e) { - // Expected - } - } - } - }); - - // When: - IntStream.range(0, iterations) - .parallel() - .forEach(idx -> { - try { - final String queryId = "query" + idx; - - while (true) { - try { - metaStore.updateForPersistentQuery(queryId, sources, sources); - break; - } catch (final KsqlException e) { - // Expected - } - } - - assertThat(metaStore.getQueriesWithSource(dataSource.getName()), hasItem(queryId)); - assertThat(metaStore.getQueriesWithSink(dataSource.getName()), hasItem(queryId)); - - metaStore.removePersistentQuery(queryId); - - remaining.decrementAndGet(); - } catch (final Throwable t) { - remaining.set(0); - throw t; - } - }); - - // Then: - assertThat(metaStore.getQueriesWithSource(dataSource1.getName()), is(empty())); - assertThat(metaStore.getQueriesWithSink(dataSource1.getName()), is(empty())); - mainThread.get(1, TimeUnit.MINUTES); - } } \ No newline at end of file diff --git a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntryTest.java b/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntryTest.java deleted file mode 100644 index b5c6c590e0fc..000000000000 --- a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/ReferentialIntegrityTableEntryTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.metastore; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThrows; - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.stream.IntStream; -import org.junit.Before; -import org.junit.Test; - -@SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", "RV_RETURN_VALUE_IGNORED_INFERRED"}) -public class ReferentialIntegrityTableEntryTest { - - private ReferentialIntegrityTableEntry entry; - - @Before - public void setUp() { - entry = new ReferentialIntegrityTableEntry(); - } - - @Test - public void shouldAddSourceForQuery() { - // When: - entry.addSourceForQueries("someId"); - - // Then: - assertThat(entry.getSourceForQueries(), contains("someId")); - } - - @Test - public void shouldAddSinkForQuery() { - // When: - entry.addSinkForQueries("someId"); - - // Then: - assertThat(entry.getSinkForQueries(), contains("someId")); - } - - @Test - public void shouldRemoveQuery() { - // Given: - entry.addSourceForQueries("someId"); - entry.addSourceForQueries("otherId"); - entry.addSinkForQueries("someId"); - entry.addSinkForQueries("anotherId"); - - // When: - entry.removeQuery("someId"); - - // Then: - assertThat(entry.getSourceForQueries(), contains("otherId")); - assertThat(entry.getSinkForQueries(), contains("anotherId")); - } - - @Test - public void shouldDeepCopy() { - // Given: - entry.addSourceForQueries("sourceId"); - entry.addSinkForQueries("sinkId"); - final ReferentialIntegrityTableEntry copy = entry.copy(); - - // When: - entry.removeQuery("sourceId"); - entry.removeQuery("sinkId"); - - // Then: - assertThat(copy.getSourceForQueries(), contains("sourceId")); - assertThat(copy.getSinkForQueries(), contains("sinkId")); - } - - @Test - public void shouldThrowIfAlreadyRegisteredAsSource() { - // Given: - entry.addSourceForQueries("id"); - - // When: - final IllegalStateException e = assertThrows( - IllegalStateException.class, - () -> entry.addSourceForQueries("id") - ); - - // Then: - assertThat(e.getMessage(), containsString("Already source for query: id")); - } - - @Test - public void shouldThrowIfAlreadyRegisteredAsSink() { - // Given: - entry.addSinkForQueries("id"); - - // When: - final IllegalStateException e = assertThrows( - IllegalStateException.class, - () -> entry.addSinkForQueries("id") - ); - - // Then: - assertThat(e.getMessage(), containsString("Already sink for query: id")); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - public void shouldBeThreadSafe() { - IntStream.range(0, 5_000) - .parallel() - .forEach(idx -> { - final String sourceQueryId = "source" + idx; - final String sinkQueryId = "sink" + idx; - entry.addSourceForQueries(sourceQueryId); - entry.addSinkForQueries(sinkQueryId); - - entry.getSourceForQueries(); - entry.getSinkForQueries(); - - entry.copy(); - - entry.removeQuery(sourceQueryId); - entry.removeQuery(sinkQueryId); - }); - - assertThat(entry.getSourceForQueries(), is(empty())); - assertThat(entry.getSinkForQueries(), is(empty())); - } -} \ No newline at end of file