Skip to content

Commit

Permalink
refactor: remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Oct 23, 2020
1 parent f6779fa commit 9242079
Show file tree
Hide file tree
Showing 7 changed files with 6 additions and 539 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,6 @@ public void shouldNotUpdateMetaStoreDuringTryExecute() {

// Then:
assertThat(metaStore.getSource(SourceName.of("TEST3")), is(notNullValue()));
assertThat(metaStore.getQueriesWithSource(SourceName.of("TEST2")), is(empty()));
assertThat(metaStore.getSource(SourceName.of("BAR")), is(nullValue()));
assertThat(metaStore.getSource(SourceName.of("FOO")), is(nullValue()));
assertThat("live", ksqlEngine.numberOfLiveQueries(), is(numberOfLiveQueries));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import java.util.Map;
import java.util.Set;

public interface MetaStore extends FunctionRegistry, TypeRegistry {

DataSource getSource(SourceName sourceName);

Map<SourceName, DataSource> getAllDataSources();

Set<String> getQueriesWithSource(SourceName sourceName);

Set<String> getQueriesWithSink(SourceName sourceName);

MetaStore copy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public final class MetaStoreImpl implements MutableMetaStore {

private final Map<SourceName, SourceInfo> dataSources = new ConcurrentHashMap<>();
private final Object referentialIntegrityLock = new Object();
private final Object metaStoreLock = new Object();
private final FunctionRegistry functionRegistry;
private final TypeRegistry typeRegistry;

Expand Down Expand Up @@ -100,7 +100,7 @@ public void putSource(final DataSource dataSource, final boolean allowReplace) {

@Override
public void deleteSource(final SourceName sourceName) {
synchronized (referentialIntegrityLock) {
synchronized (metaStoreLock) {
dataSources.compute(sourceName, (ignored, sourceInfo) -> {
if (sourceInfo == null) {
throw new KsqlException(String.format("No data source with name %s exists.",
Expand Down Expand Up @@ -133,7 +133,7 @@ public void deleteSource(final SourceName sourceName) {

@Override
public void linkSource(final SourceName sourceName, final SourceName withSourceName) {
synchronized (referentialIntegrityLock) {
synchronized (metaStoreLock) {
if (sourceName.equals(withSourceName)) {
throw new KsqlException(String.format("Source name '%s' is equals to linked source.",
sourceName.text()));
Expand Down Expand Up @@ -182,69 +182,9 @@ public Map<SourceName, DataSource> getAllDataSources() {
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().source));
}

@Override
public void updateForPersistentQuery(
final String queryId,
final Set<SourceName> sourceNames,
final Set<SourceName> sinkNames
) {
synchronized (referentialIntegrityLock) {
final String sourceAlreadyRegistered = streamSources(sourceNames)
.filter(source -> source.referentialIntegrity.getSourceForQueries().contains(queryId))
.map(source -> source.source.getName())
.map(Object::toString)
.collect(Collectors.joining(","));

final String sinkAlreadyRegistered = streamSources(sinkNames)
.filter(source -> source.referentialIntegrity.getSinkForQueries().contains(queryId))
.map(source -> source.source.getName())
.map(Object::toString)
.collect(Collectors.joining(","));

if (!sourceAlreadyRegistered.isEmpty() || !sinkAlreadyRegistered.isEmpty()) {
throw new KsqlException("query already registered."
+ " queryId: " + queryId
+ ", registeredAgainstSource: " + sourceAlreadyRegistered
+ ", registeredAgainstSink: " + sinkAlreadyRegistered);
}

streamSources(sourceNames)
.forEach(source -> source.referentialIntegrity.addSourceForQueries(queryId));
streamSources(sinkNames)
.forEach(source -> source.referentialIntegrity.addSinkForQueries(queryId));
}
}

@Override
public void removePersistentQuery(final String queryId) {
synchronized (referentialIntegrityLock) {
for (final SourceInfo sourceInfo : dataSources.values()) {
sourceInfo.referentialIntegrity.removeQuery(queryId);
}
}
}

@Override
public Set<String> getQueriesWithSource(final SourceName sourceName) {
final SourceInfo sourceInfo = dataSources.get(sourceName);
if (sourceInfo == null) {
return Collections.emptySet();
}
return sourceInfo.referentialIntegrity.getSourceForQueries();
}

@Override
public Set<String> getQueriesWithSink(final SourceName sourceName) {
final SourceInfo sourceInfo = dataSources.get(sourceName);
if (sourceInfo == null) {
return Collections.emptySet();
}
return sourceInfo.referentialIntegrity.getSinkForQueries();
}

@Override
public MutableMetaStore copy() {
synchronized (referentialIntegrityLock) {
synchronized (metaStoreLock) {
return new MetaStoreImpl(dataSources, functionRegistry, typeRegistry);
}
}
Expand Down Expand Up @@ -337,37 +277,33 @@ public Iterator<CustomType> types() {
private static final class SourceInfo {

private final DataSource source;
private final ReferentialIntegrityTableEntry referentialIntegrity;
private final Set<SourceName> linkedSources = new ConcurrentHashSet<>();
private final Set<SourceName> referentialSources = new ConcurrentHashSet<>();

private SourceInfo(
final DataSource source
) {
this.source = Objects.requireNonNull(source, "source");
this.referentialIntegrity = new ReferentialIntegrityTableEntry();
}

private SourceInfo(
final DataSource source,
final ReferentialIntegrityTableEntry referentialIntegrity,
final Set<SourceName> linkedSources,
final Set<SourceName> referentialSources
) {
this.source = Objects.requireNonNull(source, "source");
this.referentialIntegrity = referentialIntegrity.copy();
this.linkedSources.addAll(
Objects.requireNonNull(linkedSources, "linkedSources"));
this.referentialSources.addAll(
Objects.requireNonNull(referentialSources, "referentialSources"));
}

public SourceInfo copy() {
return new SourceInfo(source, referentialIntegrity, linkedSources, referentialSources);
return new SourceInfo(source, linkedSources, referentialSources);
}

public SourceInfo copyWith(final DataSource source) {
return new SourceInfo(source, referentialIntegrity, linkedSources, referentialSources);
return new SourceInfo(source, linkedSources, referentialSources);
}

public void linkSource(final SourceName sourceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import java.util.Set;

public interface MutableMetaStore extends MetaStore {

Expand All @@ -27,12 +26,5 @@ public interface MutableMetaStore extends MetaStore {

void linkSource(SourceName sourceName, SourceName withSourceName);

void updateForPersistentQuery(
String queryId,
Set<SourceName> sourceNames,
Set<SourceName> sinkNames);

void removePersistentQuery(String queryId);

MutableMetaStore copy();
}

This file was deleted.

Loading

0 comments on commit 9242079

Please sign in to comment.