Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds ability to bypass cache for pull queries #6891

Merged
merged 23 commits into from
Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@
],
"expectedException": {
"type": "io.confluent.ksql.parser.exception.ParseFailedException",
"message": "mismatched input 'AND' expecting ';'"
"message": "mismatched input 'AND' expecting {';'"
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@
],
"expectedException": {
"type": "io.confluent.ksql.parser.exception.ParseFailedException",
"message": "mismatched input 'AND' expecting ';'"
"message": "mismatched input 'AND' expecting {';'"
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,15 @@ public MaterializedWindowedTable windowed() {
final WindowType wndType = wndInfo.getType();
switch (wndType) {
case SESSION:
return new KsMaterializedSessionTable(stateStore, SessionStoreCacheBypass::fetch);
return new KsMaterializedSessionTable(stateStore,
SessionStoreCacheBypass::fetch, SessionStoreCacheBypass::fetchRange);

case HOPPING:
case TUMBLING:
return new KsMaterializedWindowTable(stateStore, wndInfo.getSize().get(),
WindowStoreCacheBypass::fetch);
WindowStoreCacheBypass::fetch,
WindowStoreCacheBypass::fetchAll,
WindowStoreCacheBypass::fetchRange);

default:
throw new UnsupportedOperationException("Unknown window type: " + wndInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcher;
import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcherRange;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
Expand All @@ -42,11 +43,15 @@ class KsMaterializedSessionTable implements MaterializedWindowedTable {

private final KsStateStore stateStore;
private final SessionStoreCacheBypassFetcher cacheBypassFetcher;
private final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange;

KsMaterializedSessionTable(final KsStateStore store,
final SessionStoreCacheBypassFetcher cacheBypassFetcher) {
final SessionStoreCacheBypassFetcher cacheBypassFetcher,
final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange) {
this.stateStore = Objects.requireNonNull(store, "store");
this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher");
this.cacheBypassFetcherRange = Objects.requireNonNull(cacheBypassFetcherRange,
"cacheBypassFetcherRange");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher;
import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll;
import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherRange;
import io.confluent.ksql.util.IteratorUtil;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -48,12 +50,20 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable {
private final KsStateStore stateStore;
private final Duration windowSize;
private final WindowStoreCacheBypassFetcher cacheBypassFetcher;
private final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll;
private final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange;

KsMaterializedWindowTable(final KsStateStore store, final Duration windowSize,
final WindowStoreCacheBypassFetcher cacheBypassFetcher) {
final WindowStoreCacheBypassFetcher cacheBypassFetcher,
final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll,
final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange) {
this.stateStore = Objects.requireNonNull(store, "store");
this.windowSize = Objects.requireNonNull(windowSize, "windowSize");
this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher");
this.cacheBypassFetcherAll = Objects.requireNonNull(
cacheBypassFetcherAll, "cacheBypassFetcherAll");
this.cacheBypassFetcherRange = Objects.requireNonNull(
cprasad1 marked this conversation as resolved.
Show resolved Hide resolved
cacheBypassFetcherRange, "cacheBypassFetcherRange");
}

@Override
Expand Down Expand Up @@ -122,7 +132,7 @@ public Iterator<WindowedRow> get(
final Instant upper = calculateUpperBound(windowStartBounds, windowEndBounds);

final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator
= store.fetchAll(lower, upper);
= cacheBypassFetcherAll.fetchAll(store, lower, upper);
return Streams.stream(IteratorUtil.onComplete(iterator, iterator::close)).map(next -> {
final Instant windowStart = next.key.window().startTime();
if (!windowStartBounds.contains(windowStart)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
Expand All @@ -42,6 +43,9 @@ public final class SessionStoreCacheBypass {
private static final Field STORE_NAME_FIELD;
private static final Field STORE_TYPE_FIELD;
static final Field SERDES_FIELD;
private static final String STORE_UNAVAILABLE_MESSAGE = "State store is not available anymore "
+ "and may have been migrated to another instance; "
+ "please re-discover its location from the state metadata.";

static {
try {
Expand Down Expand Up @@ -69,77 +73,154 @@ KeyValueIterator<Windowed<GenericKey>, GenericRow> fetch(
);
}

@SuppressWarnings("unchecked")
interface SessionStoreCacheBypassFetcherRange {

KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRange(
ReadOnlySessionStore<GenericKey, GenericRow> store,
GenericKey keyFrom,
GenericKey keyTo
);
}

/*
This method is used for single key lookups. It calls the fetchUncached method.
*/
public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetch(
final ReadOnlySessionStore<GenericKey, GenericRow> store,
final GenericKey key
) {
Objects.requireNonNull(key, "key can't be null");
final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores = getStores(store);
final Function<ReadOnlySessionStore<GenericKey, GenericRow>,
KeyValueIterator<Windowed<GenericKey>, GenericRow>> fetchFunc
= sessionStore -> fetchUncached(sessionStore, key);
return findFirstNonEmptyIterator(stores, fetchFunc);
}

final StateStoreProvider provider;
final String storeName;
final QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>> storeType;
try {
provider = (StateStoreProvider) PROVIDER_FIELD.get(store);
storeName = (String) STORE_NAME_FIELD.get(store);
storeType = (QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>>)
STORE_TYPE_FIELD.get(store);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
/*
This method is used for single key lookups. It is invoked by the fetch method
*/
private static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchUncached(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore,
final GenericKey key
) {
if (!(sessionStore instanceof MeteredSessionStore)) {
throw new IllegalStateException("Expecting a MeteredSessionStore");
} else {
final StateSerdes<GenericKey, GenericRow> serdes = getSerdes(sessionStore);
final Bytes rawKey = Bytes.wrap(serdes.rawKey(key));
final SessionStore<Bytes, byte[]> wrapped = getInnermostStore(sessionStore);
final KeyValueIterator<Windowed<Bytes>, byte[]> fetch = wrapped.fetch(rawKey);
return new DeserializingIterator(fetch, serdes);
}
}

/*
This method is used for range queries. It calls the fetchRangeUncached method.
*/
public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRange(
final ReadOnlySessionStore<GenericKey, GenericRow> store,
final GenericKey keyFrom,
final GenericKey keyTo
) {
Objects.requireNonNull(keyFrom, "lower key can't be null");
Objects.requireNonNull(keyTo, "upper key can't be null");

final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores = getStores(store);
final Function<ReadOnlySessionStore<GenericKey, GenericRow>,
KeyValueIterator<Windowed<GenericKey>, GenericRow>> fetchFunc
= sessionStore -> fetchRangeUncached(sessionStore, keyFrom, keyTo);
return findFirstNonEmptyIterator(stores, fetchFunc);
}

/*
This method is used for range queries. It is invoked by the fetchRange method
*/
private static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRangeUncached(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore,
final GenericKey keyFrom,
final GenericKey keyTo
) {
if (!(sessionStore instanceof MeteredSessionStore)) {
throw new IllegalStateException("Expecting a MeteredSessionStore");
} else {

final StateSerdes<GenericKey, GenericRow> serdes = getSerdes(sessionStore);
final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom));
final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo));
final SessionStore<Bytes, byte[]> wrapped = getInnermostStore(sessionStore);
final KeyValueIterator<Windowed<Bytes>, byte[]> fetch = wrapped.fetch(rawKeyFrom, rawKeyTo);
return new DeserializingIterator(fetch, serdes);
}
final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores
= provider.stores(storeName, storeType);
}

private static KeyValueIterator<Windowed<GenericKey>, GenericRow> findFirstNonEmptyIterator(
final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores,
final Function<ReadOnlySessionStore<GenericKey, GenericRow>,
KeyValueIterator<Windowed<GenericKey>, GenericRow>> fetchFunc
) {
for (final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore : stores) {
try {
final KeyValueIterator<Windowed<GenericKey>, GenericRow> result
= fetchUncached(sessionStore, key);
= fetchFunc.apply(sessionStore);
// returns the first non-empty iterator
if (!result.hasNext()) {
result.close();
} else {
return result;
}
} catch (final InvalidStateStoreException e) {
throw new InvalidStateStoreException(
"State store is not available anymore and may have been migrated to another instance; "
+ "please re-discover its location from the state metadata.", e);
throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e);
}
}
return new EmptyKeyValueIterator();
}

@SuppressWarnings("unchecked")
private static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchUncached(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore,
final GenericKey key
private static StateSerdes<GenericKey, GenericRow> getSerdes(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore
) throws RuntimeException {
try {
return (StateSerdes<GenericKey, GenericRow>) SERDES_FIELD.get(sessionStore);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
}
}

@SuppressWarnings("unchecked")
private static SessionStore<Bytes, byte[]> getInnermostStore(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore
) {
if (sessionStore instanceof MeteredSessionStore) {
final StateSerdes<GenericKey, GenericRow> serdes;
try {
serdes = (StateSerdes<GenericKey, GenericRow>) SERDES_FIELD.get(sessionStore);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
SessionStore<Bytes, byte[]> wrapped
= ((MeteredSessionStore<GenericKey, GenericRow>) sessionStore).wrapped();
// Unwrap state stores until we get to the last SessionStore, which is past the caching
// layer.
while (wrapped instanceof WrappedStateStore) {
final StateStore store = ((WrappedStateStore<?, ?, ?>) wrapped).wrapped();
// A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so
// we just store there.
if (!(store instanceof SessionStore)) {
break;
}
wrapped = (SessionStore<Bytes, byte[]>) store;
}
// now we have the innermost layer of the store.
return wrapped;
}

final Bytes rawKey = Bytes.wrap(serdes.rawKey(key));
SessionStore<Bytes, byte[]> wrapped
= ((MeteredSessionStore<GenericKey, GenericRow>) sessionStore).wrapped();
// Unwrap state stores until we get to the last SessionStore, which is past the caching
// layer.
while (wrapped instanceof WrappedStateStore) {
final StateStore store = ((WrappedStateStore<?, ?, ?>) wrapped).wrapped();
// A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so
// we just store there.
if (!(store instanceof SessionStore)) {
break;
}
wrapped = (SessionStore<Bytes, byte[]>) store;
}
// now we have the innermost layer of the store.
final KeyValueIterator<Windowed<Bytes>, byte[]> fetch = wrapped.fetch(rawKey);
return new DeserializingIterator(fetch, serdes);
} else {
throw new IllegalStateException("Expecting a MeteredSessionStore");
@SuppressWarnings("unchecked")
private static List<ReadOnlySessionStore<GenericKey, GenericRow>> getStores(
final ReadOnlySessionStore<GenericKey, GenericRow> store
) {
final QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>> storeType;
try {
final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store);
final String storeName = (String) STORE_NAME_FIELD.get(store);
storeType = (QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>>)
STORE_TYPE_FIELD.get(store);
return provider.stores(storeName, storeType);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
}
}

Expand Down
Loading