diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json index 7fc7417072f9..911ea63398e5 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json @@ -805,7 +805,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'AND' expecting ';'" + "message": "mismatched input 'AND' expecting {';'" } }, { diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json index e59e1c121265..904513bf8f02 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -397,7 +397,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'AND' expecting ';'" + "message": "mismatched input 'AND' expecting {';'" } }, { diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java index db3e85ce9c54..cfd351472baa 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java @@ -79,12 +79,15 @@ public MaterializedWindowedTable windowed() { final WindowType wndType = wndInfo.getType(); switch (wndType) { case SESSION: - return new KsMaterializedSessionTable(stateStore, SessionStoreCacheBypass::fetch); + return new KsMaterializedSessionTable(stateStore, + SessionStoreCacheBypass::fetch, SessionStoreCacheBypass::fetchRange); case HOPPING: case TUMBLING: return new KsMaterializedWindowTable(stateStore, wndInfo.getSize().get(), - WindowStoreCacheBypass::fetch); + WindowStoreCacheBypass::fetch, + WindowStoreCacheBypass::fetchAll, + WindowStoreCacheBypass::fetchRange); default: throw new UnsupportedOperationException("Unknown window type: " + wndInfo); diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java index b9c190d82c32..de90d34b3e8e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java @@ -24,6 +24,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcherRange; import java.time.Instant; import java.util.Iterator; import java.util.List; @@ -42,11 +43,15 @@ class KsMaterializedSessionTable implements MaterializedWindowedTable { private final KsStateStore stateStore; private final SessionStoreCacheBypassFetcher cacheBypassFetcher; + private final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange; KsMaterializedSessionTable(final KsStateStore store, - final SessionStoreCacheBypassFetcher cacheBypassFetcher) { + final SessionStoreCacheBypassFetcher cacheBypassFetcher, + final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange) { this.stateStore = Objects.requireNonNull(store, "store"); this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher"); + this.cacheBypassFetcherRange = Objects.requireNonNull(cacheBypassFetcherRange, + "cacheBypassFetcherRange"); } @Override diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java index 2233ac0112a7..c5476bd67d26 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java @@ -25,6 +25,8 @@ import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherRange; import io.confluent.ksql.util.IteratorUtil; import java.time.Duration; import java.time.Instant; @@ -48,12 +50,20 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable { private final KsStateStore stateStore; private final Duration windowSize; private final WindowStoreCacheBypassFetcher cacheBypassFetcher; + private final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll; + private final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange; KsMaterializedWindowTable(final KsStateStore store, final Duration windowSize, - final WindowStoreCacheBypassFetcher cacheBypassFetcher) { + final WindowStoreCacheBypassFetcher cacheBypassFetcher, + final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll, + final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange) { this.stateStore = Objects.requireNonNull(store, "store"); this.windowSize = Objects.requireNonNull(windowSize, "windowSize"); this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher"); + this.cacheBypassFetcherAll = Objects.requireNonNull( + cacheBypassFetcherAll, "cacheBypassFetcherAll"); + this.cacheBypassFetcherRange = Objects.requireNonNull( + cacheBypassFetcherRange, "cacheBypassFetcherRange"); } @Override @@ -122,7 +132,7 @@ public Iterator get( final Instant upper = calculateUpperBound(windowStartBounds, windowEndBounds); final KeyValueIterator, ValueAndTimestamp> iterator - = store.fetchAll(lower, upper); + = cacheBypassFetcherAll.fetchAll(store, lower, upper); return Streams.stream(IteratorUtil.onComplete(iterator, iterator::close)).map(next -> { final Instant windowStart = next.key.window().startTime(); if (!windowStartBounds.contains(windowStart)) { diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java index 3812e97fd236..b0c575b6bddb 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.function.Function; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -42,6 +43,9 @@ public final class SessionStoreCacheBypass { private static final Field STORE_NAME_FIELD; private static final Field STORE_TYPE_FIELD; static final Field SERDES_FIELD; + private static final String STORE_UNAVAILABLE_MESSAGE = "State store is not available anymore " + + "and may have been migrated to another instance; " + + "please re-discover its location from the state metadata."; static { try { @@ -69,30 +73,96 @@ KeyValueIterator, GenericRow> fetch( ); } - @SuppressWarnings("unchecked") + interface SessionStoreCacheBypassFetcherRange { + + KeyValueIterator, GenericRow> fetchRange( + ReadOnlySessionStore store, + GenericKey keyFrom, + GenericKey keyTo + ); + } + + /* + This method is used for single key lookups. It calls the fetchUncached method. + */ public static KeyValueIterator, GenericRow> fetch( final ReadOnlySessionStore store, final GenericKey key ) { Objects.requireNonNull(key, "key can't be null"); + final List> stores = getStores(store); + final Function, + KeyValueIterator, GenericRow>> fetchFunc + = sessionStore -> fetchUncached(sessionStore, key); + return findFirstNonEmptyIterator(stores, fetchFunc); + } - final StateStoreProvider provider; - final String storeName; - final QueryableStoreType> storeType; - try { - provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - storeName = (String) STORE_NAME_FIELD.get(store); - storeType = (QueryableStoreType>) - STORE_TYPE_FIELD.get(store); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); + /* + This method is used for single key lookups. It is invoked by the fetch method + */ + private static KeyValueIterator, GenericRow> fetchUncached( + final ReadOnlySessionStore sessionStore, + final GenericKey key + ) { + if (!(sessionStore instanceof MeteredSessionStore)) { + throw new IllegalStateException("Expecting a MeteredSessionStore"); + } else { + final StateSerdes serdes = getSerdes(sessionStore); + final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); + final SessionStore wrapped = getInnermostStore(sessionStore); + final KeyValueIterator, byte[]> fetch = wrapped.fetch(rawKey); + return new DeserializingIterator(fetch, serdes); + } + } + + /* + This method is used for range queries. It calls the fetchRangeUncached method. + */ + public static KeyValueIterator, GenericRow> fetchRange( + final ReadOnlySessionStore store, + final GenericKey keyFrom, + final GenericKey keyTo + ) { + Objects.requireNonNull(keyFrom, "lower key can't be null"); + Objects.requireNonNull(keyTo, "upper key can't be null"); + + final List> stores = getStores(store); + final Function, + KeyValueIterator, GenericRow>> fetchFunc + = sessionStore -> fetchRangeUncached(sessionStore, keyFrom, keyTo); + return findFirstNonEmptyIterator(stores, fetchFunc); + } + + /* + This method is used for range queries. It is invoked by the fetchRange method + */ + private static KeyValueIterator, GenericRow> fetchRangeUncached( + final ReadOnlySessionStore sessionStore, + final GenericKey keyFrom, + final GenericKey keyTo + ) { + if (!(sessionStore instanceof MeteredSessionStore)) { + throw new IllegalStateException("Expecting a MeteredSessionStore"); + } else { + + final StateSerdes serdes = getSerdes(sessionStore); + final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom)); + final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo)); + final SessionStore wrapped = getInnermostStore(sessionStore); + final KeyValueIterator, byte[]> fetch = wrapped.fetch(rawKeyFrom, rawKeyTo); + return new DeserializingIterator(fetch, serdes); } - final List> stores - = provider.stores(storeName, storeType); + } + + private static KeyValueIterator, GenericRow> findFirstNonEmptyIterator( + final List> stores, + final Function, + KeyValueIterator, GenericRow>> fetchFunc + ) { for (final ReadOnlySessionStore sessionStore : stores) { try { final KeyValueIterator, GenericRow> result - = fetchUncached(sessionStore, key); + = fetchFunc.apply(sessionStore); // returns the first non-empty iterator if (!result.hasNext()) { result.close(); @@ -100,46 +170,57 @@ public static KeyValueIterator, GenericRow> fetch( return result; } } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException( - "State store is not available anymore and may have been migrated to another instance; " - + "please re-discover its location from the state metadata.", e); + throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); } } return new EmptyKeyValueIterator(); } @SuppressWarnings("unchecked") - private static KeyValueIterator, GenericRow> fetchUncached( - final ReadOnlySessionStore sessionStore, - final GenericKey key + private static StateSerdes getSerdes( + final ReadOnlySessionStore sessionStore + ) throws RuntimeException { + try { + return (StateSerdes) SERDES_FIELD.get(sessionStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } + + @SuppressWarnings("unchecked") + private static SessionStore getInnermostStore( + final ReadOnlySessionStore sessionStore ) { - if (sessionStore instanceof MeteredSessionStore) { - final StateSerdes serdes; - try { - serdes = (StateSerdes) SERDES_FIELD.get(sessionStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); + SessionStore wrapped + = ((MeteredSessionStore) sessionStore).wrapped(); + // Unwrap state stores until we get to the last SessionStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof SessionStore)) { + break; } + wrapped = (SessionStore) store; + } + // now we have the innermost layer of the store. + return wrapped; + } - final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); - SessionStore wrapped - = ((MeteredSessionStore) sessionStore).wrapped(); - // Unwrap state stores until we get to the last SessionStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof SessionStore)) { - break; - } - wrapped = (SessionStore) store; - } - // now we have the innermost layer of the store. - final KeyValueIterator, byte[]> fetch = wrapped.fetch(rawKey); - return new DeserializingIterator(fetch, serdes); - } else { - throw new IllegalStateException("Expecting a MeteredSessionStore"); + @SuppressWarnings("unchecked") + private static List> getStores( + final ReadOnlySessionStore store + ) { + final QueryableStoreType> storeType; + try { + final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + final String storeName = (String) STORE_NAME_FIELD.get(store); + storeType = (QueryableStoreType>) + STORE_TYPE_FIELD.get(store); + return provider.stores(storeName, storeType); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); } } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index c049b373a24f..3cf8f2033a80 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -17,15 +17,19 @@ import io.confluent.ksql.GenericKey; import io.confluent.ksql.GenericRow; +import java.io.Closeable; import java.lang.reflect.Field; import java.time.Instant; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.function.Function; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.StateSerdes; @@ -42,6 +46,9 @@ public final class WindowStoreCacheBypass { private static final Field STORE_NAME_FIELD; private static final Field WINDOW_STORE_TYPE_FIELD; static final Field SERDES_FIELD; + private static final String STORE_UNAVAILABLE_MESSAGE = "State store is not available anymore " + + "and may have been migrated to another instance; " + + "please re-discover its location from the state metadata."; static { try { @@ -69,9 +76,34 @@ WindowStoreIterator> fetch( Instant lower, Instant upper ); + } - @SuppressWarnings("unchecked") + interface WindowStoreCacheBypassFetcherAll { + + KeyValueIterator, ValueAndTimestamp> fetchAll( + ReadOnlyWindowStore> store, + Instant lower, + Instant upper + ); + + } + + interface WindowStoreCacheBypassFetcherRange { + + KeyValueIterator, ValueAndTimestamp> fetchRange( + ReadOnlyWindowStore> store, + GenericKey keyFrom, + GenericKey keyTo, + Instant lower, + Instant upper + ); + + } + + /* + This method is used for single key lookups. It calls the fetchUncached method. + */ public static WindowStoreIterator> fetch( final ReadOnlyWindowStore> store, final GenericKey key, @@ -79,26 +111,119 @@ public static WindowStoreIterator> fetch( final Instant upper ) { Objects.requireNonNull(key, "key can't be null"); + final List>> stores + = getStores(store); + final Function>, + WindowStoreIterator>> fetchFunc = windowStore -> + fetchUncached(windowStore, key, lower, upper); + return findFirstNonEmptyIterator(stores, fetchFunc); + } - final StateStoreProvider provider; - final String storeName; - final QueryableStoreType>> - windowStoreType; - try { - provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - storeName = (String) STORE_NAME_FIELD.get(store); - windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); + /* + This method is used for single key lookups. It is invoked by the fetch method + */ + private static WindowStoreIterator> fetchUncached( + final ReadOnlyWindowStore> windowStore, + final GenericKey key, + final Instant lower, + final Instant upper + ) { + if (!(windowStore instanceof MeteredWindowStore)) { + throw new IllegalStateException("Expecting a MeteredWindowStore"); + } + final StateSerdes> serdes = getSerdes(windowStore); + final WindowStore wrapped = getInnermostStore(windowStore); + final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); + final WindowStoreIterator fetch = wrapped.fetch(rawKey, lower, upper); + return new DeserializingIterator(fetch, serdes); + } + + /* + This method is used for range queries. It calls the fetchRangeUncached method. + */ + public static KeyValueIterator, ValueAndTimestamp> fetchRange( + final ReadOnlyWindowStore> store, + final GenericKey keyFrom, + final GenericKey keyTo, + final Instant lower, + final Instant upper + ) { + Objects.requireNonNull(keyFrom, "lower key can't be null"); + Objects.requireNonNull(keyTo, "upper key can't be null"); + final List>> stores + = getStores(store); + final Function>, + KeyValueIterator, ValueAndTimestamp>> fetchFunc + = windowStore -> fetchRangeUncached(windowStore, keyFrom, keyTo, lower, upper); + return findFirstNonEmptyIterator(stores, fetchFunc); + } + + /* + This method is used for range queries. It is invoked by the fetchRange method + */ + private static KeyValueIterator, ValueAndTimestamp> + fetchRangeUncached( + final ReadOnlyWindowStore> windowStore, + final GenericKey keyFrom, + final GenericKey keyTo, + final Instant lower, + final Instant upper + ) { + if (!(windowStore instanceof MeteredWindowStore)) { + throw new IllegalStateException("Expecting a MeteredWindowStore"); } + final StateSerdes> serdes = getSerdes(windowStore); + final WindowStore wrapped = getInnermostStore(windowStore); + final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom)); + final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo)); + final KeyValueIterator, byte[]> fetch = wrapped + .fetch(rawKeyFrom, rawKeyTo, lower, upper); + return new DeserializingKeyValueIterator(fetch, serdes); + } + + /* + This method is used for table scans. It calls the fetchAllUncached method. + */ + public static KeyValueIterator, ValueAndTimestamp> fetchAll( + final ReadOnlyWindowStore> store, + final Instant lower, + final Instant upper + ) { final List>> stores - = provider.stores(storeName, windowStoreType); - for (final ReadOnlyWindowStore> windowStore - : stores) { + = getStores(store); + final Function>, + KeyValueIterator, ValueAndTimestamp>> fetchFunc = + windowStore -> fetchAllUncached(windowStore, lower, upper); + return findFirstNonEmptyIterator(stores, fetchFunc); + } + + /* + This method is used for table scans. It is invoked by the fetchAll method. + */ + private static KeyValueIterator, ValueAndTimestamp> + fetchAllUncached( + final ReadOnlyWindowStore> windowStore, + final Instant lower, + final Instant upper + ) { + if (!(windowStore instanceof MeteredWindowStore)) { + throw new IllegalStateException("Expecting a MeteredWindowStore"); + } + final StateSerdes> serdes = getSerdes(windowStore); + final WindowStore wrapped = getInnermostStore(windowStore); + final KeyValueIterator, byte[]> fetch = wrapped.fetchAll(lower, upper); + return new DeserializingKeyValueIterator(fetch, serdes); + } + + @SuppressWarnings("unchecked") + private static T findFirstNonEmptyIterator( + final List>> stores, + final Function>, T> func + ) { + T result = null; + for (final ReadOnlyWindowStore> store : stores) { try { - final WindowStoreIterator> result - = fetchUncached(windowStore, key, lower, upper); + result = func.apply(store); // returns the first non-empty iterator if (!result.hasNext()) { result.close(); @@ -106,59 +231,114 @@ public static WindowStoreIterator> fetch( return result; } } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException( - "State store is not available anymore and may have been migrated to another instance; " - + "please re-discover its location from the state metadata.", e); + throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); } } - return new EmptyKeyValueIterator(); + return (T) (result instanceof WindowStoreIterator + ? new EmptyWindowStoreIterator() : new EmptyKeyValueIterator()); } @SuppressWarnings("unchecked") - private static WindowStoreIterator> fetchUncached( - final ReadOnlyWindowStore> windowStore, - final GenericKey key, - final Instant lower, - final Instant upper + private static StateSerdes> getSerdes( + final ReadOnlyWindowStore> windowStore + ) throws RuntimeException { + try { + return (StateSerdes>) SERDES_FIELD.get(windowStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } + + @SuppressWarnings("unchecked") + private static WindowStore getInnermostStore( + final ReadOnlyWindowStore> windowStore ) { - if (windowStore instanceof MeteredWindowStore) { - final StateSerdes> serdes; - try { - serdes = (StateSerdes>) SERDES_FIELD - .get(windowStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); + WindowStore wrapped + = ((MeteredWindowStore>) windowStore) + .wrapped(); + // Unwrap state stores until we get to the last WindowStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof WindowStore)) { + break; } + wrapped = (WindowStore) store; + } + // now we have the innermost layer of the store. + return wrapped; + } - final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); - WindowStore wrapped - = ((MeteredWindowStore>) windowStore).wrapped(); - // Unwrap state stores until we get to the last WindowStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof WindowStore)) { - break; - } - wrapped = (WindowStore) store; - } - // now we have the innermost layer of the store. - final WindowStoreIterator fetch = wrapped.fetch(rawKey, lower, upper); - return new DeserializingIterator(fetch, serdes); - } else { - throw new IllegalStateException("Expecting a MeteredWindowStore"); + @SuppressWarnings("unchecked") + private static List>> getStores( + final ReadOnlyWindowStore> store + ) { + try { + final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + final String storeName = (String) STORE_NAME_FIELD.get(store); + final QueryableStoreType>> + windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); + return provider.stores(storeName, windowStoreType); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } + + /* + This iterator is used for range queries/table scans in the fetchRangeUncached + and fetchAllUncached methods + */ + private static final class DeserializingKeyValueIterator + implements KeyValueIterator, ValueAndTimestamp> { + private final KeyValueIterator, byte[]> fetch; + private final StateSerdes> serdes; + + private DeserializingKeyValueIterator( + final KeyValueIterator, byte[]> fetch, + final StateSerdes> serdes) { + this.fetch = fetch; + this.serdes = serdes; + } + + @Override + public void close() { + fetch.close(); + } + + @Override + public Windowed peekNextKey() { + final Windowed nextKey = fetch.peekNextKey(); + return new Windowed<>(serdes.keyFrom(nextKey.key().get()), nextKey.window()); + } + + @Override + public boolean hasNext() { + return fetch.hasNext(); + } + + @Override + public KeyValue, ValueAndTimestamp> next() { + final KeyValue, byte[]> next = fetch.next(); + final Windowed windowedKey = new Windowed<>( + serdes.keyFrom(next.key.key().get()), next.key.window()); + return KeyValue.pair(windowedKey, serdes.valueFrom(next.value)); } } + /* + This iterator is used for key lookups in the fetchUncached method + */ private static final class DeserializingIterator - implements WindowStoreIterator> { + implements WindowStoreIterator> { private final WindowStoreIterator fetch; private final StateSerdes> serdes; - private DeserializingIterator(final WindowStoreIterator fetch, - final StateSerdes> serdes) { + private DeserializingIterator( + final WindowStoreIterator fetch, + final StateSerdes> serdes) { this.fetch = fetch; this.serdes = serdes; } @@ -185,8 +365,8 @@ public KeyValue> next() { } } - private static class EmptyKeyValueIterator - implements WindowStoreIterator> { + private static class EmptyWindowStoreIterator + implements WindowStoreIterator> { @Override public void close() { @@ -207,4 +387,27 @@ public KeyValue> next() { throw new NoSuchElementException(); } } + + private static class EmptyKeyValueIterator + implements KeyValueIterator, ValueAndTimestamp> { + + @Override + public void close() { + } + + @Override + public Windowed peekNextKey() { + throw new NoSuchElementException(); + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public KeyValue, ValueAndTimestamp> next() { + throw new NoSuchElementException(); + } + } } \ No newline at end of file diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java index 1efb1e93314d..4429913ec6d3 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcherRange; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -88,13 +89,15 @@ public class KsMaterializedSessionTableTest { private KeyValueIterator, GenericRow> fetchIterator; @Mock private SessionStoreCacheBypassFetcher cacheBypassFetcher; + @Mock + private SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange; private KsMaterializedSessionTable table; private final List, GenericRow>> sessions = new ArrayList<>(); private int sessionIdx; @Before public void setUp() { - table = new KsMaterializedSessionTable(stateStore, cacheBypassFetcher); + table = new KsMaterializedSessionTable(stateStore, cacheBypassFetcher, cacheBypassFetcherRange); when(stateStore.store(any(), anyInt())).thenReturn(sessionStore); when(stateStore.schema()).thenReturn(SCHEMA); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java index 2a3061e30ead..7028bcebc42e 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java @@ -38,6 +38,8 @@ import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherRange; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -108,17 +110,22 @@ public class KsMaterializedWindowTableTest { private ArgumentCaptor> storeTypeCaptor; @Mock private WindowStoreCacheBypassFetcher cacheBypassFetcher; + @Mock + private WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll; + @Mock + private WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange; private KsMaterializedWindowTable table; @Before public void setUp() { - table = new KsMaterializedWindowTable(stateStore, WINDOW_SIZE, cacheBypassFetcher); + table = new KsMaterializedWindowTable(stateStore, WINDOW_SIZE, + cacheBypassFetcher, cacheBypassFetcherAll, cacheBypassFetcherRange); when(stateStore.store(any(), anyInt())).thenReturn(tableStore); when(stateStore.schema()).thenReturn(SCHEMA); when(cacheBypassFetcher.fetch(any(), any(), any(), any())).thenReturn(fetchIterator); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); + when(cacheBypassFetcherAll.fetchAll(any(), any(), any())).thenReturn(keyValueIterator); } @SuppressWarnings("UnstableApiUsage") @@ -184,7 +191,7 @@ public void shouldThrowIfStoreFetchFails() { @Test public void shouldThrowIfStoreFetchFails_fetchAll() { // Given: - when(tableStore.fetchAll(any(), any())) + when(cacheBypassFetcherAll.fetchAll(any(), any(), any())) .thenThrow(new MaterializationTimeOutException("Boom")); // When: @@ -251,7 +258,8 @@ public void shouldFetchWithNoBounds_fetchAll() { table.get(PARTITION, Range.all(), Range.all()); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + eq(tableStore), eq(Instant.ofEpochMilli(0)), eq(Instant.ofEpochMilli(Long.MAX_VALUE)) ); @@ -277,7 +285,8 @@ public void shouldFetchWithOnlyStartBounds_fetchAll() { table.get(PARTITION, WINDOW_START_BOUNDS, Range.all()); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + eq(tableStore), eq(WINDOW_START_BOUNDS.lowerEndpoint()), eq(WINDOW_START_BOUNDS.upperEndpoint()) ); @@ -303,7 +312,8 @@ public void shouldFetchWithOnlyEndBounds_fetchAll() { table.get(PARTITION, Range.all(), WINDOW_END_BOUNDS); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + eq(tableStore), eq(WINDOW_END_BOUNDS.lowerEndpoint().minus(WINDOW_SIZE)), eq(WINDOW_END_BOUNDS.upperEndpoint().minus(WINDOW_SIZE)) ); @@ -480,7 +490,6 @@ public void shouldReturnValuesForClosedStartBounds_fetchAll() { start.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_2)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); // When: final Iterator result = table.get(PARTITION, start, Range.all()); @@ -573,7 +582,6 @@ public void shouldReturnValuesForClosedEndBounds_fetchAll() { startEqiv.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_2)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); // When: final Iterator result = table.get(PARTITION, Range.all(), end); @@ -655,8 +663,6 @@ public void shouldReturnValuesForOpenStartBounds_fetchAll() { start.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_3)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); - // When: final Iterator result = table.get(PARTITION, start, Range.all()); @@ -742,7 +748,6 @@ public void shouldReturnValuesForOpenEndBounds_fetchAll() { startEquiv.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_3)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); // When: final Iterator result = table.get(PARTITION, Range.all(), end); @@ -823,7 +828,8 @@ public void shouldSupportRangeAll_fetchAll() { table.get(PARTITION, Range.all(), Range.all()); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + tableStore, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE) ); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java index b2f2acec340a..be19195685e2 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java @@ -50,7 +50,10 @@ public class SessionStoreCacheBypassTest { private static final GenericKey SOME_KEY = GenericKey.genericKey(1); + private static final GenericKey SOME_OTHER_KEY = GenericKey.genericKey(2); + private static final byte[] BYTES = new byte[] {'a', 'b'}; + private static final byte[] OTHER_BYTES = new byte[] {'c', 'd'}; @Mock private QueryableStoreType> @@ -78,7 +81,7 @@ public void setUp() { } @Test - public void shouldCallUnderlyingStore() throws IllegalAccessException { + public void shouldCallUnderlyingStoreSingleKey() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredSessionStore)); SERDES_FIELD.set(meteredSessionStore, serdes); when(serdes.rawKey(any())).thenReturn(BYTES); @@ -91,6 +94,20 @@ public void shouldCallUnderlyingStore() throws IllegalAccessException { verify(sessionStore).fetch(new Bytes(BYTES)); } + @Test + public void shouldCallUnderlyingStoreRangeQuery() throws IllegalAccessException { + when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredSessionStore)); + SERDES_FIELD.set(meteredSessionStore, serdes); + when(serdes.rawKey(any())).thenReturn(BYTES, OTHER_BYTES); + when(meteredSessionStore.wrapped()).thenReturn(wrappedSessionStore); + when(wrappedSessionStore.wrapped()).thenReturn(sessionStore); + when(sessionStore.fetch(any(), any())).thenReturn(storeIterator); + when(storeIterator.hasNext()).thenReturn(false); + + SessionStoreCacheBypass.fetchRange(store, SOME_KEY, SOME_OTHER_KEY); + verify(sessionStore).fetch(new Bytes(BYTES), new Bytes(OTHER_BYTES)); + } + @Test public void shouldAvoidNonSessionStore() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredSessionStore)); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java index 2d19bb34421b..b8624f77c540 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java @@ -16,6 +16,8 @@ package io.confluent.ksql.execution.streams.materialization.ks; import static io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.SERDES_FIELD; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; @@ -50,7 +52,9 @@ public class WindowStoreCacheBypassTest { private static final GenericKey SOME_KEY = GenericKey.genericKey(1); + private static final GenericKey SOME_OTHER_KEY = GenericKey.genericKey(2); private static final byte[] BYTES = new byte[] {'a', 'b'}; + private static final byte[] OTHER_BYTES = new byte[] {'c', 'd'}; @Mock private QueryableStoreType>> @@ -68,6 +72,8 @@ public class WindowStoreCacheBypassTest { @Mock private WindowStoreIterator windowStoreIterator; @Mock + private KeyValueIterator, byte[]> keyValueIterator; + @Mock private StateSerdes> serdes; private CompositeReadOnlyWindowStore> store; @@ -78,7 +84,7 @@ public void setUp() { } @Test - public void shouldCallUnderlyingStore() throws IllegalAccessException { + public void shouldCallUnderlyingStoreSingleKey() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore)); SERDES_FIELD.set(meteredWindowStore, serdes); when(serdes.rawKey(any())).thenReturn(BYTES); @@ -91,6 +97,33 @@ public void shouldCallUnderlyingStore() throws IllegalAccessException { verify(windowStore).fetch(new Bytes(BYTES), Instant.ofEpochMilli(100L), Instant.ofEpochMilli(200L)); } + @Test + public void shouldCallUnderlyingStoreForRangeQuery() throws IllegalAccessException { + when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore)); + SERDES_FIELD.set(meteredWindowStore, serdes); + when(serdes.rawKey(any())).thenReturn(BYTES, OTHER_BYTES); + when(meteredWindowStore.wrapped()).thenReturn(wrappedWindowStore); + when(wrappedWindowStore.wrapped()).thenReturn(windowStore); + when(windowStore.fetch(any(), any(), any(), any())).thenReturn(keyValueIterator); + when(keyValueIterator.hasNext()).thenReturn(false); + + WindowStoreCacheBypass.fetchRange(store, SOME_KEY, SOME_OTHER_KEY, Instant.ofEpochMilli(100), Instant.ofEpochMilli(200)); + verify(windowStore).fetch(new Bytes(BYTES), new Bytes(OTHER_BYTES), Instant.ofEpochMilli(100L), Instant.ofEpochMilli(200L)); + } + + @Test + public void shouldCallUnderlyingStoreForTableScans() throws IllegalAccessException { + when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore)); + SERDES_FIELD.set(meteredWindowStore, serdes); + when(meteredWindowStore.wrapped()).thenReturn(wrappedWindowStore); + when(wrappedWindowStore.wrapped()).thenReturn(windowStore); + when(windowStore.fetchAll(any(), any())).thenReturn(keyValueIterator); + when(keyValueIterator.hasNext()).thenReturn(false); + + WindowStoreCacheBypass.fetchAll(store, Instant.ofEpochMilli(100), Instant.ofEpochMilli(200)); + verify(windowStore).fetchAll(Instant.ofEpochMilli(100L), Instant.ofEpochMilli(200L)); + } + @Test public void shouldAvoidNonWindowStore() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore));