From 958555b7ba44f383c5ba3acfd40098732739a54e Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Sun, 24 Jan 2021 20:31:18 -0800 Subject: [PATCH 01/23] initial draft --- .../materialization/ks/KsMaterialization.java | 2 +- .../ks/KsMaterializedWindowTable.java | 9 +- .../ks/WindowStoreCacheBypass.java | 124 ++++++++++++++++++ .../ks/KsMaterializedWindowTableTest.java | 5 +- 4 files changed, 136 insertions(+), 4 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java index db3e85ce9c54..78f01abefe18 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java @@ -84,7 +84,7 @@ public MaterializedWindowedTable windowed() { case HOPPING: case TUMBLING: return new KsMaterializedWindowTable(stateStore, wndInfo.getSize().get(), - WindowStoreCacheBypass::fetch); + WindowStoreCacheBypass::fetch, WindowStoreCacheBypass::fetchAll); default: throw new UnsupportedOperationException("Unknown window type: " + wndInfo); diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java index 2233ac0112a7..068e79f92655 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java @@ -25,6 +25,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll; import io.confluent.ksql.util.IteratorUtil; import java.time.Duration; import java.time.Instant; @@ -48,12 +49,16 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable { private final KsStateStore stateStore; private final Duration windowSize; private final WindowStoreCacheBypassFetcher cacheBypassFetcher; + private final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll; KsMaterializedWindowTable(final KsStateStore store, final Duration windowSize, - final WindowStoreCacheBypassFetcher cacheBypassFetcher) { + final WindowStoreCacheBypassFetcher cacheBypassFetcher, + final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll) { this.stateStore = Objects.requireNonNull(store, "store"); this.windowSize = Objects.requireNonNull(windowSize, "windowSize"); this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher"); + this.cacheBypassFetcherAll = Objects.requireNonNull( + cacheBypassFetcherAll, "cacheBypassFetcherAll"); } @Override @@ -122,7 +127,7 @@ public Iterator get( final Instant upper = calculateUpperBound(windowStartBounds, windowEndBounds); final KeyValueIterator, ValueAndTimestamp> iterator - = store.fetchAll(lower, upper); + = cacheBypassFetcherAll.fetchAll(store, lower, upper); return Streams.stream(IteratorUtil.onComplete(iterator, iterator::close)).map(next -> { final Instant windowStart = next.key.window().startTime(); if (!windowStartBounds.contains(windowStart)) { diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index c049b373a24f..9bc34b8d4ba4 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -25,7 +25,9 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.StateSerdes; @@ -69,6 +71,17 @@ WindowStoreIterator> fetch( Instant lower, Instant upper ); + + } + + interface WindowStoreCacheBypassFetcherAll { + + KeyValueIterator, ValueAndTimestamp> fetchAll( + ReadOnlyWindowStore> store, + Instant lower, + Instant upper + ); + } @SuppressWarnings("unchecked") @@ -152,6 +165,117 @@ private static WindowStoreIterator> fetchUncached( } } + @SuppressWarnings("unchecked") + static KeyValueIterator, ValueAndTimestamp> fetchAll( + final ReadOnlyWindowStore> store, + final Instant lower, + final Instant upper + ) { + final StateStoreProvider provider; + final String storeName; + final QueryableStoreType>> + windowStoreType; + try { + provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + storeName = (String) STORE_NAME_FIELD.get(store); + windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + final List>> stores + = provider.stores(storeName, windowStoreType); + for (final ReadOnlyWindowStore> windowStore + : stores) { + try { + final KeyValueIterator, ValueAndTimestamp> result + = fetchAllUncached(windowStore, lower, upper); + // returns the first non-empty iterator + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException e) { + throw new InvalidStateStoreException( + "State store is not available anymore and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", e); + } + } + return null; + } + + @SuppressWarnings("unchecked") + private static KeyValueIterator, ValueAndTimestamp> fetchAllUncached( + final ReadOnlyWindowStore> windowStore, + final Instant lower, + final Instant upper + ) { + if (windowStore instanceof MeteredWindowStore) { + final StateSerdes> serdes; + try { + serdes = (StateSerdes>) SERDES_FIELD + .get(windowStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + + WindowStore wrapped + = ((MeteredWindowStore>) windowStore).wrapped(); + // Unwrap state stores until we get to the last WindowStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof WindowStore)) { + break; + } + wrapped = (WindowStore) store; + } + // now we have the innermost layer of the store. + final KeyValueIterator, byte[]> fetch = wrapped.fetchAll(lower, upper); + return new DeserializingKeyValueIterator(fetch, serdes); + } else { + throw new IllegalStateException("Expecting a MeteredWindowStore"); + } + } + + private static final class DeserializingKeyValueIterator + implements KeyValueIterator, ValueAndTimestamp> { + private final KeyValueIterator, byte[]> fetch; + private final StateSerdes> serdes; + + private DeserializingKeyValueIterator(final KeyValueIterator, byte[]> fetch, + final StateSerdes> serdes) { + this.fetch = fetch; + this.serdes = serdes; + } + + + @Override + public void close() { + fetch.close(); + } + + @Override + public Windowed peekNextKey() { + final Windowed peekNext = fetch.peekNextKey(); + return new Windowed(peekNext, peekNext.window()); + } + + @Override + public boolean hasNext() { + return fetch.hasNext(); + } + + @Override + public KeyValue, ValueAndTimestamp> next() { + final KeyValue, byte[]> next = fetch.next(); + return KeyValue.pair(new Windowed(next.key, next.key.window()), + serdes.valueFrom(next.value)); + } + } private static final class DeserializingIterator implements WindowStoreIterator> { private final WindowStoreIterator fetch; diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java index 2a3061e30ead..1aa8ce467ad7 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java @@ -38,6 +38,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -108,12 +109,14 @@ public class KsMaterializedWindowTableTest { private ArgumentCaptor> storeTypeCaptor; @Mock private WindowStoreCacheBypassFetcher cacheBypassFetcher; + @Mock + private WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll; private KsMaterializedWindowTable table; @Before public void setUp() { - table = new KsMaterializedWindowTable(stateStore, WINDOW_SIZE, cacheBypassFetcher); + table = new KsMaterializedWindowTable(stateStore, WINDOW_SIZE, cacheBypassFetcher, cacheBypassFetcherAll); when(stateStore.store(any(), anyInt())).thenReturn(tableStore); when(stateStore.schema()).thenReturn(SCHEMA); From 46367b5677b923c1fdcc00503a052aeec6364c7b Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Sun, 24 Jan 2021 21:01:06 -0800 Subject: [PATCH 02/23] added window range --- .../materialization/ks/KsMaterialization.java | 4 +- .../ks/KsMaterializedWindowTable.java | 7 +- .../ks/WindowStoreCacheBypass.java | 121 +++++++++++++++++- .../ks/KsMaterializedWindowTableTest.java | 6 +- 4 files changed, 128 insertions(+), 10 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java index 78f01abefe18..68c25fa9b0d5 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java @@ -84,7 +84,9 @@ public MaterializedWindowedTable windowed() { case HOPPING: case TUMBLING: return new KsMaterializedWindowTable(stateStore, wndInfo.getSize().get(), - WindowStoreCacheBypass::fetch, WindowStoreCacheBypass::fetchAll); + WindowStoreCacheBypass::fetch, + WindowStoreCacheBypass::fetchAll, + WindowStoreCacheBypass::fetchRange); default: throw new UnsupportedOperationException("Unknown window type: " + wndInfo); diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java index 068e79f92655..c5476bd67d26 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java @@ -26,6 +26,7 @@ import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherRange; import io.confluent.ksql.util.IteratorUtil; import java.time.Duration; import java.time.Instant; @@ -50,15 +51,19 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable { private final Duration windowSize; private final WindowStoreCacheBypassFetcher cacheBypassFetcher; private final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll; + private final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange; KsMaterializedWindowTable(final KsStateStore store, final Duration windowSize, final WindowStoreCacheBypassFetcher cacheBypassFetcher, - final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll) { + final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll, + final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange) { this.stateStore = Objects.requireNonNull(store, "store"); this.windowSize = Objects.requireNonNull(windowSize, "windowSize"); this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher"); this.cacheBypassFetcherAll = Objects.requireNonNull( cacheBypassFetcherAll, "cacheBypassFetcherAll"); + this.cacheBypassFetcherRange = Objects.requireNonNull( + cacheBypassFetcherRange, "cacheBypassFetcherRange"); } @Override diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 9bc34b8d4ba4..542ddc63bf8e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -84,6 +84,18 @@ KeyValueIterator, ValueAndTimestamp> fetchAll( } + interface WindowStoreCacheBypassFetcherRange { + + KeyValueIterator, ValueAndTimestamp> fetchRange( + ReadOnlyWindowStore> store, + GenericKey keyFrom, + GenericKey keyTo, + Instant lower, + Instant upper + ); + + } + @SuppressWarnings("unchecked") public static WindowStoreIterator> fetch( final ReadOnlyWindowStore> store, @@ -166,11 +178,16 @@ private static WindowStoreIterator> fetchUncached( } @SuppressWarnings("unchecked") - static KeyValueIterator, ValueAndTimestamp> fetchAll( + public static KeyValueIterator, ValueAndTimestamp> fetchRange( final ReadOnlyWindowStore> store, + final GenericKey keyFrom, + final GenericKey keyTo, final Instant lower, final Instant upper ) { + Objects.requireNonNull(keyFrom, "lower key can't be null"); + Objects.requireNonNull(keyTo, "upper key can't be null"); + final StateStoreProvider provider; final String storeName; final QueryableStoreType>> @@ -189,7 +206,7 @@ static KeyValueIterator, ValueAndTimestamp> fet : stores) { try { final KeyValueIterator, ValueAndTimestamp> result - = fetchAllUncached(windowStore, lower, upper); + = fetchRangeUncached(windowStore, keyFrom, keyTo, lower, upper); // returns the first non-empty iterator if (!result.hasNext()) { result.close(); @@ -198,7 +215,8 @@ static KeyValueIterator, ValueAndTimestamp> fet } } catch (final InvalidStateStoreException e) { throw new InvalidStateStoreException( - "State store is not available anymore and may have been migrated to another instance; " + "State store is not available anymore" + + " and may have been migrated to another instance; " + "please re-discover its location from the state metadata.", e); } } @@ -206,8 +224,11 @@ static KeyValueIterator, ValueAndTimestamp> fet } @SuppressWarnings("unchecked") - private static KeyValueIterator, ValueAndTimestamp> fetchAllUncached( + private static KeyValueIterator, ValueAndTimestamp> + fetchRangeUncached( final ReadOnlyWindowStore> windowStore, + final GenericKey keyFrom, + final GenericKey keyTo, final Instant lower, final Instant upper ) { @@ -220,8 +241,93 @@ private static KeyValueIterator, ValueAndTimestamp wrapped - = ((MeteredWindowStore>) windowStore).wrapped(); + = ((MeteredWindowStore>) windowStore) + .wrapped(); + // Unwrap state stores until we get to the last WindowStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof WindowStore)) { + break; + } + wrapped = (WindowStore) store; + } + // now we have the innermost layer of the store. + final KeyValueIterator, byte[]> fetch = wrapped + .fetch(rawKeyFrom, rawKeyTo, lower, upper); + return new DeserializingKeyValueIterator(fetch, serdes); + } else { + throw new IllegalStateException("Expecting a MeteredWindowStore"); + } + } + + @SuppressWarnings("unchecked") + static KeyValueIterator, ValueAndTimestamp> + fetchAll( + final ReadOnlyWindowStore> store, + final Instant lower, + final Instant upper + ) { + final StateStoreProvider provider; + final String storeName; + final QueryableStoreType>> + windowStoreType; + try { + provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + storeName = (String) STORE_NAME_FIELD.get(store); + windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + final List>> stores + = provider.stores(storeName, windowStoreType); + for (final ReadOnlyWindowStore> windowStore + : stores) { + try { + final KeyValueIterator, ValueAndTimestamp> result + = fetchAllUncached(windowStore, lower, upper); + // returns the first non-empty iterator + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException e) { + throw new InvalidStateStoreException( + "State store is not available anymore " + + "and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", e); + } + } + return null; + } + + @SuppressWarnings("unchecked") + private static KeyValueIterator, ValueAndTimestamp> + fetchAllUncached( + final ReadOnlyWindowStore> windowStore, + final Instant lower, + final Instant upper + ) { + if (windowStore instanceof MeteredWindowStore) { + final StateSerdes> serdes; + try { + serdes = (StateSerdes>) SERDES_FIELD + .get(windowStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + + WindowStore wrapped + = ((MeteredWindowStore>) windowStore) + .wrapped(); // Unwrap state stores until we get to the last WindowStore, which is past the caching // layer. while (wrapped instanceof WrappedStateStore) { @@ -246,8 +352,8 @@ private static final class DeserializingKeyValueIterator private final KeyValueIterator, byte[]> fetch; private final StateSerdes> serdes; - private DeserializingKeyValueIterator(final KeyValueIterator, byte[]> fetch, - final StateSerdes> serdes) { + private DeserializingKeyValueIterator(final KeyValueIterator, + byte[]> fetch, final StateSerdes> serdes) { this.fetch = fetch; this.serdes = serdes; } @@ -276,6 +382,7 @@ public KeyValue, ValueAndTimestamp> next() { serdes.valueFrom(next.value)); } } + private static final class DeserializingIterator implements WindowStoreIterator> { private final WindowStoreIterator fetch; diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java index 1aa8ce467ad7..8698af417d89 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java @@ -39,6 +39,7 @@ import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher; import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll; +import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherRange; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -111,12 +112,15 @@ public class KsMaterializedWindowTableTest { private WindowStoreCacheBypassFetcher cacheBypassFetcher; @Mock private WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll; + @Mock + private WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange; private KsMaterializedWindowTable table; @Before public void setUp() { - table = new KsMaterializedWindowTable(stateStore, WINDOW_SIZE, cacheBypassFetcher, cacheBypassFetcherAll); + table = new KsMaterializedWindowTable(stateStore, WINDOW_SIZE, + cacheBypassFetcher, cacheBypassFetcherAll, cacheBypassFetcherRange); when(stateStore.store(any(), anyInt())).thenReturn(tableStore); when(stateStore.schema()).thenReturn(SCHEMA); From e40e96458e4ddc54f51b5f599fa2cc18798f10c3 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Sun, 24 Jan 2021 21:03:46 -0800 Subject: [PATCH 03/23] added window range checkstyle --- .../streams/materialization/ks/WindowStoreCacheBypass.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 542ddc63bf8e..afe3d7f2df92 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -353,7 +353,7 @@ private static final class DeserializingKeyValueIterator private final StateSerdes> serdes; private DeserializingKeyValueIterator(final KeyValueIterator, - byte[]> fetch, final StateSerdes> serdes) { + byte[]> fetch, final StateSerdes> serdes) { this.fetch = fetch; this.serdes = serdes; } From dcfecad97c098bf6a3f28f9c6dfd3a18520ae131 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Sun, 24 Jan 2021 21:09:33 -0800 Subject: [PATCH 04/23] added emptyiterator --- .../ks/WindowStoreCacheBypass.java | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index afe3d7f2df92..2e569d2e419a 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -136,7 +136,7 @@ public static WindowStoreIterator> fetch( + "please re-discover its location from the state metadata.", e); } } - return new EmptyKeyValueIterator(); + return new EmptyWindowStoreIterator(); } @SuppressWarnings("unchecked") @@ -220,7 +220,7 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> next() { } } - private static class EmptyKeyValueIterator + private static class EmptyWindowStoreIterator implements WindowStoreIterator> { @Override @@ -438,4 +438,27 @@ public KeyValue> next() { throw new NoSuchElementException(); } } + + private static class EmptyKeyValueIterator + implements KeyValueIterator, ValueAndTimestamp> { + + @Override + public void close() { + } + + @Override + public Windowed peekNextKey() { + throw new NoSuchElementException(); + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public KeyValue, ValueAndTimestamp> next() { + throw new NoSuchElementException(); + } + } } \ No newline at end of file From 559d63e3e224f997ab4c51ab450d49fe19bedfe9 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Sun, 24 Jan 2021 21:29:32 -0800 Subject: [PATCH 05/23] added session store cache bypass for range --- .../materialization/ks/KsMaterialization.java | 3 +- .../ks/KsMaterializedSessionTable.java | 8 +- .../ks/SessionStoreCacheBypass.java | 88 +++++++++++++++++++ .../ks/KsMaterializedSessionTableTest.java | 5 +- 4 files changed, 101 insertions(+), 3 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java index 68c25fa9b0d5..cfd351472baa 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java @@ -79,7 +79,8 @@ public MaterializedWindowedTable windowed() { final WindowType wndType = wndInfo.getType(); switch (wndType) { case SESSION: - return new KsMaterializedSessionTable(stateStore, SessionStoreCacheBypass::fetch); + return new KsMaterializedSessionTable(stateStore, + SessionStoreCacheBypass::fetch, SessionStoreCacheBypass::fetchRange); case HOPPING: case TUMBLING: diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java index b9c190d82c32..b4c8ca8b9066 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java @@ -24,6 +24,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcherRange; import java.time.Instant; import java.util.Iterator; import java.util.List; @@ -42,11 +43,16 @@ class KsMaterializedSessionTable implements MaterializedWindowedTable { private final KsStateStore stateStore; private final SessionStoreCacheBypassFetcher cacheBypassFetcher; + private final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange; KsMaterializedSessionTable(final KsStateStore store, - final SessionStoreCacheBypassFetcher cacheBypassFetcher) { + final SessionStoreCacheBypassFetcher cacheBypassFetcher, + final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange) { this.stateStore = Objects.requireNonNull(store, "store"); this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher"); + this.cacheBypassFetcherRange = Objects.requireNonNull( + cacheBypassFetcherRange, + "cacheBypassFetcherRange"); } @Override diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java index 3812e97fd236..e4172eecb037 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java @@ -69,6 +69,15 @@ KeyValueIterator, GenericRow> fetch( ); } + interface SessionStoreCacheBypassFetcherRange { + + KeyValueIterator, GenericRow> fetchRange( + ReadOnlySessionStore store, + GenericKey keyFrom, + GenericKey keyTo + ); + } + @SuppressWarnings("unchecked") public static KeyValueIterator, GenericRow> fetch( final ReadOnlySessionStore store, @@ -143,6 +152,85 @@ private static KeyValueIterator, GenericRow> fetchUncached( } } + @SuppressWarnings("unchecked") + public static KeyValueIterator, GenericRow> fetchRange( + final ReadOnlySessionStore store, + final GenericKey keyFrom, + final GenericKey keyTo + ) { + Objects.requireNonNull(keyFrom, "lower key can't be null"); + Objects.requireNonNull(keyTo, "upper key can't be null"); + + final StateStoreProvider provider; + final String storeName; + final QueryableStoreType> storeType; + try { + provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + storeName = (String) STORE_NAME_FIELD.get(store); + storeType = (QueryableStoreType>) + STORE_TYPE_FIELD.get(store); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + final List> stores + = provider.stores(storeName, storeType); + for (final ReadOnlySessionStore sessionStore : stores) { + try { + final KeyValueIterator, GenericRow> result + = fetchRangeUncached(sessionStore, keyFrom, keyTo); + // returns the first non-empty iterator + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException e) { + throw new InvalidStateStoreException( + "State store is not available anymore " + + "and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", e); + } + } + return new EmptyKeyValueIterator(); + } + + @SuppressWarnings("unchecked") + private static KeyValueIterator, GenericRow> fetchRangeUncached( + final ReadOnlySessionStore sessionStore, + final GenericKey keyFrom, + final GenericKey keyTo + ) { + if (sessionStore instanceof MeteredSessionStore) { + final StateSerdes serdes; + try { + serdes = (StateSerdes) SERDES_FIELD.get(sessionStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + + final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom)); + final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo)); + SessionStore wrapped + = ((MeteredSessionStore) sessionStore).wrapped(); + // Unwrap state stores until we get to the last SessionStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof SessionStore)) { + break; + } + wrapped = (SessionStore) store; + } + // now we have the innermost layer of the store. + final KeyValueIterator, byte[]> fetch = wrapped.fetch(rawKeyFrom, rawKeyTo); + return new DeserializingIterator(fetch, serdes); + } else { + throw new IllegalStateException("Expecting a MeteredSessionStore"); + } + } + private static final class DeserializingIterator implements KeyValueIterator, GenericRow> { private final KeyValueIterator, byte[]> fetch; diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java index 1efb1e93314d..4429913ec6d3 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcher; +import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcherRange; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -88,13 +89,15 @@ public class KsMaterializedSessionTableTest { private KeyValueIterator, GenericRow> fetchIterator; @Mock private SessionStoreCacheBypassFetcher cacheBypassFetcher; + @Mock + private SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange; private KsMaterializedSessionTable table; private final List, GenericRow>> sessions = new ArrayList<>(); private int sessionIdx; @Before public void setUp() { - table = new KsMaterializedSessionTable(stateStore, cacheBypassFetcher); + table = new KsMaterializedSessionTable(stateStore, cacheBypassFetcher, cacheBypassFetcherRange); when(stateStore.store(any(), anyInt())).thenReturn(sessionStore); when(stateStore.schema()).thenReturn(SCHEMA); From 0a0353753f205722dd50909e96c0c3b740a93c0e Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 00:35:32 -0800 Subject: [PATCH 06/23] rewrite iterator --- .../ks/WindowStoreCacheBypass.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 2e569d2e419a..54a8e6689bb0 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -268,8 +268,7 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> - fetchAll( + public static KeyValueIterator, ValueAndTimestamp> fetchAll( final ReadOnlyWindowStore> store, final Instant lower, final Instant upper @@ -301,8 +300,8 @@ public static KeyValueIterator, ValueAndTimestamp, byte[]> fetch; private final StateSerdes> serdes; - private DeserializingKeyValueIterator(final KeyValueIterator, - byte[]> fetch, final StateSerdes> serdes) { + private DeserializingKeyValueIterator( + final KeyValueIterator, byte[]> fetch, + final StateSerdes> serdes) { this.fetch = fetch; this.serdes = serdes; } - @Override public void close() { fetch.close(); @@ -366,8 +365,8 @@ public void close() { @Override public Windowed peekNextKey() { - final Windowed peekNext = fetch.peekNextKey(); - return new Windowed(peekNext, peekNext.window()); + final Windowed nextKey = fetch.peekNextKey(); + return new Windowed<>(serdes.keyFrom(nextKey.key().get()), nextKey.window()); } @Override @@ -378,8 +377,9 @@ public boolean hasNext() { @Override public KeyValue, ValueAndTimestamp> next() { final KeyValue, byte[]> next = fetch.next(); - return KeyValue.pair(new Windowed(next.key, next.key.window()), - serdes.valueFrom(next.value)); + final Windowed windowedKey = new Windowed<>( + serdes.keyFrom(next.key.key().get()), next.key.window()); + return KeyValue.pair(windowedKey, serdes.valueFrom(next.value)); } } From 3c824b31ce17cba02296f7375b8929110f9ff804 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 00:59:14 -0800 Subject: [PATCH 07/23] rewrite iterator 2 --- .../streams/materialization/ks/WindowStoreCacheBypass.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 54a8e6689bb0..be3ab5c6e69e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -366,7 +366,8 @@ public void close() { @Override public Windowed peekNextKey() { final Windowed nextKey = fetch.peekNextKey(); - return new Windowed<>(serdes.keyFrom(nextKey.key().get()), nextKey.window()); + return nextKey == null ? null : new Windowed<>( + serdes.keyFrom(nextKey.key().get()), nextKey.window()); } @Override From 87a21e09acfddb7fb8643b6c760d9a46eb9179ee Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 16:03:15 -0800 Subject: [PATCH 08/23] fix tests --- .../ks/KsMaterializedWindowTableTest.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java index 8698af417d89..7028bcebc42e 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java @@ -125,7 +125,7 @@ public void setUp() { when(stateStore.store(any(), anyInt())).thenReturn(tableStore); when(stateStore.schema()).thenReturn(SCHEMA); when(cacheBypassFetcher.fetch(any(), any(), any(), any())).thenReturn(fetchIterator); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); + when(cacheBypassFetcherAll.fetchAll(any(), any(), any())).thenReturn(keyValueIterator); } @SuppressWarnings("UnstableApiUsage") @@ -191,7 +191,7 @@ public void shouldThrowIfStoreFetchFails() { @Test public void shouldThrowIfStoreFetchFails_fetchAll() { // Given: - when(tableStore.fetchAll(any(), any())) + when(cacheBypassFetcherAll.fetchAll(any(), any(), any())) .thenThrow(new MaterializationTimeOutException("Boom")); // When: @@ -258,7 +258,8 @@ public void shouldFetchWithNoBounds_fetchAll() { table.get(PARTITION, Range.all(), Range.all()); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + eq(tableStore), eq(Instant.ofEpochMilli(0)), eq(Instant.ofEpochMilli(Long.MAX_VALUE)) ); @@ -284,7 +285,8 @@ public void shouldFetchWithOnlyStartBounds_fetchAll() { table.get(PARTITION, WINDOW_START_BOUNDS, Range.all()); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + eq(tableStore), eq(WINDOW_START_BOUNDS.lowerEndpoint()), eq(WINDOW_START_BOUNDS.upperEndpoint()) ); @@ -310,7 +312,8 @@ public void shouldFetchWithOnlyEndBounds_fetchAll() { table.get(PARTITION, Range.all(), WINDOW_END_BOUNDS); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + eq(tableStore), eq(WINDOW_END_BOUNDS.lowerEndpoint().minus(WINDOW_SIZE)), eq(WINDOW_END_BOUNDS.upperEndpoint().minus(WINDOW_SIZE)) ); @@ -487,7 +490,6 @@ public void shouldReturnValuesForClosedStartBounds_fetchAll() { start.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_2)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); // When: final Iterator result = table.get(PARTITION, start, Range.all()); @@ -580,7 +582,6 @@ public void shouldReturnValuesForClosedEndBounds_fetchAll() { startEqiv.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_2)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); // When: final Iterator result = table.get(PARTITION, Range.all(), end); @@ -662,8 +663,6 @@ public void shouldReturnValuesForOpenStartBounds_fetchAll() { start.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_3)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); - // When: final Iterator result = table.get(PARTITION, start, Range.all()); @@ -749,7 +748,6 @@ public void shouldReturnValuesForOpenEndBounds_fetchAll() { startEquiv.upperEndpoint().toEpochMilli() + WINDOW_SIZE.toMillis())), VALUE_3)) .thenThrow(new AssertionError()); - when(tableStore.fetchAll(any(), any())).thenReturn(keyValueIterator); // When: final Iterator result = table.get(PARTITION, Range.all(), end); @@ -830,7 +828,8 @@ public void shouldSupportRangeAll_fetchAll() { table.get(PARTITION, Range.all(), Range.all()); // Then: - verify(tableStore).fetchAll( + verify(cacheBypassFetcherAll).fetchAll( + tableStore, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE) ); From 6d0f5122d61df8738b5778fd2e3137c59e26ad45 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 18:27:08 -0800 Subject: [PATCH 09/23] clean up iterator code --- .../streams/materialization/ks/WindowStoreCacheBypass.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index be3ab5c6e69e..54a8e6689bb0 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -366,8 +366,7 @@ public void close() { @Override public Windowed peekNextKey() { final Windowed nextKey = fetch.peekNextKey(); - return nextKey == null ? null : new Windowed<>( - serdes.keyFrom(nextKey.key().get()), nextKey.window()); + return new Windowed<>(serdes.keyFrom(nextKey.key().get()), nextKey.window()); } @Override From 88f8a8b8d0a283067e713292687dbb6bbd9bd559 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 19:01:47 -0800 Subject: [PATCH 10/23] refactor fetchUncached et al --- .../ks/WindowStoreCacheBypass.java | 141 +++++++----------- 1 file changed, 52 insertions(+), 89 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 54a8e6689bb0..1ffa287fdeb0 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -139,42 +139,20 @@ public static WindowStoreIterator> fetch( return new EmptyWindowStoreIterator(); } - @SuppressWarnings("unchecked") private static WindowStoreIterator> fetchUncached( final ReadOnlyWindowStore> windowStore, final GenericKey key, final Instant lower, final Instant upper ) { - if (windowStore instanceof MeteredWindowStore) { - final StateSerdes> serdes; - try { - serdes = (StateSerdes>) SERDES_FIELD - .get(windowStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - - final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); - WindowStore wrapped - = ((MeteredWindowStore>) windowStore).wrapped(); - // Unwrap state stores until we get to the last WindowStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof WindowStore)) { - break; - } - wrapped = (WindowStore) store; - } - // now we have the innermost layer of the store. - final WindowStoreIterator fetch = wrapped.fetch(rawKey, lower, upper); - return new DeserializingIterator(fetch, serdes); - } else { + if (!(windowStore instanceof MeteredWindowStore)) { throw new IllegalStateException("Expecting a MeteredWindowStore"); } + final StateSerdes> serdes = getSerdes(windowStore); + WindowStore wrapped = getInnermostStore(windowStore); + final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); + final WindowStoreIterator fetch = wrapped.fetch(rawKey, lower, upper); + return new DeserializingIterator(fetch, serdes); } @SuppressWarnings("unchecked") @@ -223,7 +201,6 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> fetchRangeUncached( final ReadOnlyWindowStore> windowStore, @@ -232,39 +209,16 @@ public static KeyValueIterator, ValueAndTimestamp> serdes; - try { - serdes = (StateSerdes>) SERDES_FIELD - .get(windowStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - - final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom)); - final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo)); - - WindowStore wrapped - = ((MeteredWindowStore>) windowStore) - .wrapped(); - // Unwrap state stores until we get to the last WindowStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof WindowStore)) { - break; - } - wrapped = (WindowStore) store; - } - // now we have the innermost layer of the store. - final KeyValueIterator, byte[]> fetch = wrapped - .fetch(rawKeyFrom, rawKeyTo, lower, upper); - return new DeserializingKeyValueIterator(fetch, serdes); - } else { + if (!(windowStore instanceof MeteredWindowStore)) { throw new IllegalStateException("Expecting a MeteredWindowStore"); } + final StateSerdes> serdes = getSerdes(windowStore); + WindowStore wrapped = getInnermostStore(windowStore); + final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom)); + final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo)); + final KeyValueIterator, byte[]> fetch = wrapped + .fetch(rawKeyFrom, rawKeyTo, lower, upper); + return new DeserializingKeyValueIterator(fetch, serdes); } @SuppressWarnings("unchecked") @@ -308,42 +262,19 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> fetchAllUncached( final ReadOnlyWindowStore> windowStore, final Instant lower, final Instant upper ) { - if (windowStore instanceof MeteredWindowStore) { - final StateSerdes> serdes; - try { - serdes = (StateSerdes>) SERDES_FIELD - .get(windowStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - - WindowStore wrapped - = ((MeteredWindowStore>) windowStore) - .wrapped(); - // Unwrap state stores until we get to the last WindowStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof WindowStore)) { - break; - } - wrapped = (WindowStore) store; - } - // now we have the innermost layer of the store. - final KeyValueIterator, byte[]> fetch = wrapped.fetchAll(lower, upper); - return new DeserializingKeyValueIterator(fetch, serdes); - } else { + if (!(windowStore instanceof MeteredWindowStore)) { throw new IllegalStateException("Expecting a MeteredWindowStore"); } + final StateSerdes> serdes = getSerdes(windowStore); + WindowStore wrapped = getInnermostStore(windowStore); + final KeyValueIterator, byte[]> fetch = wrapped.fetchAll(lower, upper); + return new DeserializingKeyValueIterator(fetch, serdes); } private static final class DeserializingKeyValueIterator @@ -461,4 +392,36 @@ public KeyValue, ValueAndTimestamp> next() { throw new NoSuchElementException(); } } -} \ No newline at end of file + + private static StateSerdes> getSerdes( + ReadOnlyWindowStore> windowStore + ) throws RuntimeException { + try { + return (StateSerdes>) SERDES_FIELD + .get(windowStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } + + private static WindowStore getInnermostStore( + ReadOnlyWindowStore> windowStore) { + WindowStore wrapped + = ((MeteredWindowStore>) windowStore) + .wrapped(); + // Unwrap state stores until we get to the last WindowStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof WindowStore)) { + break; + } + wrapped = (WindowStore) store; + } + // now we have the innermost layer of the store. + return wrapped; + } +} + From f64d605253a61272d1cfbbc6e7ce2a137909f77f Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 19:35:23 -0800 Subject: [PATCH 11/23] refactor fetch et al --- .../ks/WindowStoreCacheBypass.java | 88 ++++++------------- 1 file changed, 29 insertions(+), 59 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 1ffa287fdeb0..6740dfa68682 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -44,6 +44,9 @@ public final class WindowStoreCacheBypass { private static final Field STORE_NAME_FIELD; private static final Field WINDOW_STORE_TYPE_FIELD; static final Field SERDES_FIELD; + private static final String STORE_UNAVAILABLE_MESSAGE = "State store is not available anymore " + + "and may have been migrated to another instance; " + + "please re-discover its location from the state metadata."; static { try { @@ -96,7 +99,6 @@ KeyValueIterator, ValueAndTimestamp> fetchRange } - @SuppressWarnings("unchecked") public static WindowStoreIterator> fetch( final ReadOnlyWindowStore> store, final GenericKey key, @@ -104,21 +106,8 @@ public static WindowStoreIterator> fetch( final Instant upper ) { Objects.requireNonNull(key, "key can't be null"); - - final StateStoreProvider provider; - final String storeName; - final QueryableStoreType>> - windowStoreType; - try { - provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - storeName = (String) STORE_NAME_FIELD.get(store); - windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } final List>> stores - = provider.stores(storeName, windowStoreType); + = getStores(store); for (final ReadOnlyWindowStore> windowStore : stores) { try { @@ -131,9 +120,7 @@ public static WindowStoreIterator> fetch( return result; } } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException( - "State store is not available anymore and may have been migrated to another instance; " - + "please re-discover its location from the state metadata.", e); + throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); } } return new EmptyWindowStoreIterator(); @@ -149,13 +136,12 @@ private static WindowStoreIterator> fetchUncached( throw new IllegalStateException("Expecting a MeteredWindowStore"); } final StateSerdes> serdes = getSerdes(windowStore); - WindowStore wrapped = getInnermostStore(windowStore); + final WindowStore wrapped = getInnermostStore(windowStore); final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); final WindowStoreIterator fetch = wrapped.fetch(rawKey, lower, upper); return new DeserializingIterator(fetch, serdes); } - @SuppressWarnings("unchecked") public static KeyValueIterator, ValueAndTimestamp> fetchRange( final ReadOnlyWindowStore> store, final GenericKey keyFrom, @@ -165,21 +151,8 @@ public static KeyValueIterator, ValueAndTimestamp>> - windowStoreType; - try { - provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - storeName = (String) STORE_NAME_FIELD.get(store); - windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } final List>> stores - = provider.stores(storeName, windowStoreType); + = getStores(store); for (final ReadOnlyWindowStore> windowStore : stores) { try { @@ -192,10 +165,7 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> serdes = getSerdes(windowStore); - WindowStore wrapped = getInnermostStore(windowStore); + final WindowStore wrapped = getInnermostStore(windowStore); final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom)); final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo)); final KeyValueIterator, byte[]> fetch = wrapped @@ -221,26 +191,13 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> fetchAll( final ReadOnlyWindowStore> store, final Instant lower, final Instant upper ) { - final StateStoreProvider provider; - final String storeName; - final QueryableStoreType>> - windowStoreType; - try { - provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - storeName = (String) STORE_NAME_FIELD.get(store); - windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } final List>> stores - = provider.stores(storeName, windowStoreType); + = getStores(store); for (final ReadOnlyWindowStore> windowStore : stores) { try { @@ -253,10 +210,7 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> serdes = getSerdes(windowStore); - WindowStore wrapped = getInnermostStore(windowStore); + final WindowStore wrapped = getInnermostStore(windowStore); final KeyValueIterator, byte[]> fetch = wrapped.fetchAll(lower, upper); return new DeserializingKeyValueIterator(fetch, serdes); } @@ -405,7 +359,8 @@ private static StateSerdes> getSerdes( } private static WindowStore getInnermostStore( - ReadOnlyWindowStore> windowStore) { + ReadOnlyWindowStore> windowStore + ) { WindowStore wrapped = ((MeteredWindowStore>) windowStore) .wrapped(); @@ -423,5 +378,20 @@ private static WindowStore getInnermostStore( // now we have the innermost layer of the store. return wrapped; } + + private static List>> getStores( + ReadOnlyWindowStore> store + ) { + try { + final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + final String storeName = (String) STORE_NAME_FIELD.get(store); + final QueryableStoreType>> + windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); + return provider.stores(storeName, windowStoreType); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } } From 09bd6126cdcafdbca8f9e925912abf53d3fc581a Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 20:20:56 -0800 Subject: [PATCH 12/23] change structure of windowstorecachebypass class --- .../ks/WindowStoreCacheBypass.java | 105 +++++++++--------- 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 6740dfa68682..35d7907d599e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -231,6 +231,52 @@ public static KeyValueIterator, ValueAndTimestamp> getSerdes( + ReadOnlyWindowStore> windowStore + ) throws RuntimeException { + try { + return (StateSerdes>) SERDES_FIELD.get(windowStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } + + private static WindowStore getInnermostStore( + ReadOnlyWindowStore> windowStore + ) { + WindowStore wrapped + = ((MeteredWindowStore>) windowStore) + .wrapped(); + // Unwrap state stores until we get to the last WindowStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof WindowStore)) { + break; + } + wrapped = (WindowStore) store; + } + // now we have the innermost layer of the store. + return wrapped; + } + + private static List>> getStores( + ReadOnlyWindowStore> store + ) { + try { + final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + final String storeName = (String) STORE_NAME_FIELD.get(store); + final QueryableStoreType>> + windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); + return provider.stores(storeName, windowStoreType); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } + private static final class DeserializingKeyValueIterator implements KeyValueIterator, ValueAndTimestamp> { private final KeyValueIterator, byte[]> fetch; @@ -269,12 +315,13 @@ public KeyValue, ValueAndTimestamp> next() { } private static final class DeserializingIterator - implements WindowStoreIterator> { + implements WindowStoreIterator> { private final WindowStoreIterator fetch; private final StateSerdes> serdes; - private DeserializingIterator(final WindowStoreIterator fetch, - final StateSerdes> serdes) { + private DeserializingIterator( + final WindowStoreIterator fetch, + final StateSerdes> serdes) { this.fetch = fetch; this.serdes = serdes; } @@ -302,7 +349,7 @@ public KeyValue> next() { } private static class EmptyWindowStoreIterator - implements WindowStoreIterator> { + implements WindowStoreIterator> { @Override public void close() { @@ -346,52 +393,4 @@ public KeyValue, ValueAndTimestamp> next() { throw new NoSuchElementException(); } } - - private static StateSerdes> getSerdes( - ReadOnlyWindowStore> windowStore - ) throws RuntimeException { - try { - return (StateSerdes>) SERDES_FIELD - .get(windowStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - } - - private static WindowStore getInnermostStore( - ReadOnlyWindowStore> windowStore - ) { - WindowStore wrapped - = ((MeteredWindowStore>) windowStore) - .wrapped(); - // Unwrap state stores until we get to the last WindowStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBWindowStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof WindowStore)) { - break; - } - wrapped = (WindowStore) store; - } - // now we have the innermost layer of the store. - return wrapped; - } - - private static List>> getStores( - ReadOnlyWindowStore> store - ) { - try { - final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - final String storeName = (String) STORE_NAME_FIELD.get(store); - final QueryableStoreType>> - windowStoreType = (QueryableStoreType>>) WINDOW_STORE_TYPE_FIELD.get(store); - return provider.stores(storeName, windowStoreType); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - } -} - +} \ No newline at end of file From 4bce1672b2c4e378c7ec84ea4ac4227c641ac4dd Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 20:55:15 -0800 Subject: [PATCH 13/23] clean up --- .../streams/materialization/ks/WindowStoreCacheBypass.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 35d7907d599e..b9dc01b4f5ec 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -231,6 +231,7 @@ public static KeyValueIterator, ValueAndTimestamp> getSerdes( ReadOnlyWindowStore> windowStore ) throws RuntimeException { @@ -241,6 +242,7 @@ private static StateSerdes> getSerdes( } } + @SuppressWarnings("unchecked") private static WindowStore getInnermostStore( ReadOnlyWindowStore> windowStore ) { @@ -262,6 +264,7 @@ private static WindowStore getInnermostStore( return wrapped; } + @SuppressWarnings("unchecked") private static List>> getStores( ReadOnlyWindowStore> store ) { From c33e946a5f113427d81aa90a43e90d8aaf7ecb1e Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Mon, 25 Jan 2021 21:04:54 -0800 Subject: [PATCH 14/23] pass checkstyles --- .../ks/WindowStoreCacheBypass.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index b9dc01b4f5ec..ba5c8a1b54e3 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -108,8 +108,7 @@ public static WindowStoreIterator> fetch( Objects.requireNonNull(key, "key can't be null"); final List>> stores = getStores(store); - for (final ReadOnlyWindowStore> windowStore - : stores) { + for (final ReadOnlyWindowStore> windowStore :stores) { try { final WindowStoreIterator> result = fetchUncached(windowStore, key, lower, upper); @@ -153,8 +152,7 @@ public static KeyValueIterator, ValueAndTimestamp>> stores = getStores(store); - for (final ReadOnlyWindowStore> windowStore - : stores) { + for (final ReadOnlyWindowStore> windowStore :stores) { try { final KeyValueIterator, ValueAndTimestamp> result = fetchRangeUncached(windowStore, keyFrom, keyTo, lower, upper); @@ -198,8 +196,7 @@ public static KeyValueIterator, ValueAndTimestamp>> stores = getStores(store); - for (final ReadOnlyWindowStore> windowStore - : stores) { + for (final ReadOnlyWindowStore> windowStore :stores) { try { final KeyValueIterator, ValueAndTimestamp> result = fetchAllUncached(windowStore, lower, upper); @@ -233,7 +230,7 @@ public static KeyValueIterator, ValueAndTimestamp> getSerdes( - ReadOnlyWindowStore> windowStore + final ReadOnlyWindowStore> windowStore ) throws RuntimeException { try { return (StateSerdes>) SERDES_FIELD.get(windowStore); @@ -244,7 +241,7 @@ private static StateSerdes> getSerdes( @SuppressWarnings("unchecked") private static WindowStore getInnermostStore( - ReadOnlyWindowStore> windowStore + final ReadOnlyWindowStore> windowStore ) { WindowStore wrapped = ((MeteredWindowStore>) windowStore) @@ -266,7 +263,7 @@ private static WindowStore getInnermostStore( @SuppressWarnings("unchecked") private static List>> getStores( - ReadOnlyWindowStore> store + final ReadOnlyWindowStore> store ) { try { final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store); From ee5bfd578bf2d9067b648094064a12257626cbd1 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Tue, 2 Feb 2021 18:46:00 -0800 Subject: [PATCH 15/23] refactor fetches --- .../ks/SessionStoreCacheBypass.java | 150 ++++++++---------- .../ks/WindowStoreCacheBypass.java | 82 +++++----- 2 files changed, 102 insertions(+), 130 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java index e4172eecb037..7581e3bc557a 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java @@ -42,6 +42,9 @@ public final class SessionStoreCacheBypass { private static final Field STORE_NAME_FIELD; private static final Field STORE_TYPE_FIELD; static final Field SERDES_FIELD; + private static final String STORE_UNAVAILABLE_MESSAGE = "State store is not available anymore " + + "and may have been migrated to another instance; " + + "please re-discover its location from the state metadata."; static { try { @@ -78,26 +81,12 @@ KeyValueIterator, GenericRow> fetchRange( ); } - @SuppressWarnings("unchecked") public static KeyValueIterator, GenericRow> fetch( final ReadOnlySessionStore store, final GenericKey key ) { Objects.requireNonNull(key, "key can't be null"); - - final StateStoreProvider provider; - final String storeName; - final QueryableStoreType> storeType; - try { - provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - storeName = (String) STORE_NAME_FIELD.get(store); - storeType = (QueryableStoreType>) - STORE_TYPE_FIELD.get(store); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - final List> stores - = provider.stores(storeName, storeType); + final List> stores = getStores(store); for (final ReadOnlySessionStore sessionStore : stores) { try { final KeyValueIterator, GenericRow> result @@ -109,50 +98,27 @@ public static KeyValueIterator, GenericRow> fetch( return result; } } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException( - "State store is not available anymore and may have been migrated to another instance; " - + "please re-discover its location from the state metadata.", e); + throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); } } return new EmptyKeyValueIterator(); } - @SuppressWarnings("unchecked") private static KeyValueIterator, GenericRow> fetchUncached( final ReadOnlySessionStore sessionStore, final GenericKey key ) { - if (sessionStore instanceof MeteredSessionStore) { - final StateSerdes serdes; - try { - serdes = (StateSerdes) SERDES_FIELD.get(sessionStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - + if (!(sessionStore instanceof MeteredSessionStore)) { + throw new IllegalStateException("Expecting a MeteredSessionStore"); + } else { + final StateSerdes serdes = getSerdes(sessionStore); final Bytes rawKey = Bytes.wrap(serdes.rawKey(key)); - SessionStore wrapped - = ((MeteredSessionStore) sessionStore).wrapped(); - // Unwrap state stores until we get to the last SessionStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof SessionStore)) { - break; - } - wrapped = (SessionStore) store; - } - // now we have the innermost layer of the store. + final SessionStore wrapped = getInnermostStore(sessionStore); final KeyValueIterator, byte[]> fetch = wrapped.fetch(rawKey); return new DeserializingIterator(fetch, serdes); - } else { - throw new IllegalStateException("Expecting a MeteredSessionStore"); } } - @SuppressWarnings("unchecked") public static KeyValueIterator, GenericRow> fetchRange( final ReadOnlySessionStore store, final GenericKey keyFrom, @@ -161,19 +127,7 @@ public static KeyValueIterator, GenericRow> fetchRange( Objects.requireNonNull(keyFrom, "lower key can't be null"); Objects.requireNonNull(keyTo, "upper key can't be null"); - final StateStoreProvider provider; - final String storeName; - final QueryableStoreType> storeType; - try { - provider = (StateStoreProvider) PROVIDER_FIELD.get(store); - storeName = (String) STORE_NAME_FIELD.get(store); - storeType = (QueryableStoreType>) - STORE_TYPE_FIELD.get(store); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } - final List> stores - = provider.stores(storeName, storeType); + final List> stores = getStores(store); for (final ReadOnlySessionStore sessionStore : stores) { try { final KeyValueIterator, GenericRow> result @@ -185,49 +139,75 @@ public static KeyValueIterator, GenericRow> fetchRange( return result; } } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException( - "State store is not available anymore " - + "and may have been migrated to another instance; " - + "please re-discover its location from the state metadata.", e); + throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); } } return new EmptyKeyValueIterator(); } - @SuppressWarnings("unchecked") private static KeyValueIterator, GenericRow> fetchRangeUncached( final ReadOnlySessionStore sessionStore, final GenericKey keyFrom, final GenericKey keyTo ) { - if (sessionStore instanceof MeteredSessionStore) { - final StateSerdes serdes; - try { - serdes = (StateSerdes) SERDES_FIELD.get(sessionStore); - } catch (final IllegalAccessException e) { - throw new RuntimeException("Stream internals changed unexpectedly!", e); - } + if (!(sessionStore instanceof MeteredSessionStore)) { + throw new IllegalStateException("Expecting a MeteredSessionStore"); + } else { + final StateSerdes serdes = getSerdes(sessionStore); final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom)); final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo)); - SessionStore wrapped - = ((MeteredSessionStore) sessionStore).wrapped(); - // Unwrap state stores until we get to the last SessionStore, which is past the caching - // layer. - while (wrapped instanceof WrappedStateStore) { - final StateStore store = ((WrappedStateStore) wrapped).wrapped(); - // A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so - // we just store there. - if (!(store instanceof SessionStore)) { - break; - } - wrapped = (SessionStore) store; - } - // now we have the innermost layer of the store. + final SessionStore wrapped = getInnermostStore(sessionStore); final KeyValueIterator, byte[]> fetch = wrapped.fetch(rawKeyFrom, rawKeyTo); return new DeserializingIterator(fetch, serdes); - } else { - throw new IllegalStateException("Expecting a MeteredSessionStore"); + } + } + + @SuppressWarnings("unchecked") + private static StateSerdes getSerdes( + final ReadOnlySessionStore sessionStore + ) throws RuntimeException { + try { + return (StateSerdes) SERDES_FIELD.get(sessionStore); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); + } + } + + @SuppressWarnings("unchecked") + private static SessionStore getInnermostStore( + final ReadOnlySessionStore sessionStore + ) { + SessionStore wrapped + = ((MeteredSessionStore) sessionStore).wrapped(); + // Unwrap state stores until we get to the last SessionStore, which is past the caching + // layer. + while (wrapped instanceof WrappedStateStore) { + final StateStore store = ((WrappedStateStore) wrapped).wrapped(); + // A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so + // we just store there. + if (!(store instanceof SessionStore)) { + break; + } + wrapped = (SessionStore) store; + } + // now we have the innermost layer of the store. + return wrapped; + } + + @SuppressWarnings("unchecked") + private static List> getStores( + final ReadOnlySessionStore store + ) { + final QueryableStoreType> storeType; + try { + final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store); + final String storeName = (String) STORE_NAME_FIELD.get(store); + storeType = (QueryableStoreType>) + STORE_TYPE_FIELD.get(store); + return provider.stores(storeName, storeType); + } catch (final IllegalAccessException e) { + throw new RuntimeException("Stream internals changed unexpectedly!", e); } } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index ba5c8a1b54e3..1a4fdf28ee84 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -17,11 +17,13 @@ import io.confluent.ksql.GenericKey; import io.confluent.ksql.GenericRow; +import java.io.Closeable; import java.lang.reflect.Field; import java.time.Instant; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.function.Function; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -108,21 +110,10 @@ public static WindowStoreIterator> fetch( Objects.requireNonNull(key, "key can't be null"); final List>> stores = getStores(store); - for (final ReadOnlyWindowStore> windowStore :stores) { - try { - final WindowStoreIterator> result - = fetchUncached(windowStore, key, lower, upper); - // returns the first non-empty iterator - if (!result.hasNext()) { - result.close(); - } else { - return result; - } - } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); - } - } - return new EmptyWindowStoreIterator(); + final Function>, + WindowStoreIterator>> fetchFunc = windowStore -> + fetchUncached(windowStore, key, lower, upper); + return findFirstNonEmptyIterator(stores, fetchFunc); } private static WindowStoreIterator> fetchUncached( @@ -152,21 +143,10 @@ public static KeyValueIterator, ValueAndTimestamp>> stores = getStores(store); - for (final ReadOnlyWindowStore> windowStore :stores) { - try { - final KeyValueIterator, ValueAndTimestamp> result - = fetchRangeUncached(windowStore, keyFrom, keyTo, lower, upper); - // returns the first non-empty iterator - if (!result.hasNext()) { - result.close(); - } else { - return result; - } - } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); - } - } - return new EmptyKeyValueIterator(); + final Function>, + KeyValueIterator, ValueAndTimestamp>> fetchFunc + = windowStore -> fetchRangeUncached(windowStore, keyFrom, keyTo, lower, upper); + return findFirstNonEmptyIterator(stores, fetchFunc); } private static KeyValueIterator, ValueAndTimestamp> @@ -196,21 +176,10 @@ public static KeyValueIterator, ValueAndTimestamp>> stores = getStores(store); - for (final ReadOnlyWindowStore> windowStore :stores) { - try { - final KeyValueIterator, ValueAndTimestamp> result - = fetchAllUncached(windowStore, lower, upper); - // returns the first non-empty iterator - if (!result.hasNext()) { - result.close(); - } else { - return result; - } - } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); - } - } - return new EmptyKeyValueIterator(); + final Function>, + KeyValueIterator, ValueAndTimestamp>> fetchFunc = + windowStore -> fetchAllUncached(windowStore, lower, upper); + return findFirstNonEmptyIterator(stores, fetchFunc); } private static KeyValueIterator, ValueAndTimestamp> @@ -228,6 +197,29 @@ public static KeyValueIterator, ValueAndTimestamp T findFirstNonEmptyIterator( + final List>> stores, + final Function>, T> func + ) { + T result = null; + for (final ReadOnlyWindowStore> store : stores) { + try { + result = func.apply(store); + // returns the first non-empty iterator + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException e) { + throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); + } + } + return (T) (result instanceof WindowStoreIterator + ? new EmptyWindowStoreIterator() : new EmptyKeyValueIterator()); + } + @SuppressWarnings("unchecked") private static StateSerdes> getSerdes( final ReadOnlyWindowStore> windowStore From 4beccae5a1aded371ab900ca1762c454bf345d9d Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Tue, 2 Feb 2021 19:09:11 -0800 Subject: [PATCH 16/23] refactor fetches sessionStore --- .../ks/SessionStoreCacheBypass.java | 61 ++++++++++--------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java index 7581e3bc557a..de050cba0c9b 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.function.Function; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -87,21 +88,10 @@ public static KeyValueIterator, GenericRow> fetch( ) { Objects.requireNonNull(key, "key can't be null"); final List> stores = getStores(store); - for (final ReadOnlySessionStore sessionStore : stores) { - try { - final KeyValueIterator, GenericRow> result - = fetchUncached(sessionStore, key); - // returns the first non-empty iterator - if (!result.hasNext()) { - result.close(); - } else { - return result; - } - } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); - } - } - return new EmptyKeyValueIterator(); + final Function, + KeyValueIterator, GenericRow>> fetchFunc + = sessionStore -> fetchUncached(sessionStore, key); + return findFirstNonEmptyIterator(stores, fetchFunc); } private static KeyValueIterator, GenericRow> fetchUncached( @@ -128,21 +118,10 @@ public static KeyValueIterator, GenericRow> fetchRange( Objects.requireNonNull(keyTo, "upper key can't be null"); final List> stores = getStores(store); - for (final ReadOnlySessionStore sessionStore : stores) { - try { - final KeyValueIterator, GenericRow> result - = fetchRangeUncached(sessionStore, keyFrom, keyTo); - // returns the first non-empty iterator - if (!result.hasNext()) { - result.close(); - } else { - return result; - } - } catch (final InvalidStateStoreException e) { - throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); - } - } - return new EmptyKeyValueIterator(); + final Function, + KeyValueIterator, GenericRow>> fetchFunc + = sessionStore -> fetchRangeUncached(sessionStore, keyFrom, keyTo); + return findFirstNonEmptyIterator(stores, fetchFunc); } private static KeyValueIterator, GenericRow> fetchRangeUncached( @@ -163,6 +142,28 @@ private static KeyValueIterator, GenericRow> fetchRangeUnca } } + private static KeyValueIterator, GenericRow> findFirstNonEmptyIterator( + final List> stores, + final Function, + KeyValueIterator, GenericRow>> fetchFunc + ) { + for (final ReadOnlySessionStore sessionStore : stores) { + try { + final KeyValueIterator, GenericRow> result + = fetchFunc.apply(sessionStore); + // returns the first non-empty iterator + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException e) { + throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e); + } + } + return new EmptyKeyValueIterator(); + } + @SuppressWarnings("unchecked") private static StateSerdes getSerdes( final ReadOnlySessionStore sessionStore From 846c9b456388054daaf8059eb4b0212c38438365 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Tue, 2 Feb 2021 21:25:46 -0800 Subject: [PATCH 17/23] add some tests --- .../ks/KsMaterializedSessionTable.java | 4 +-- .../ks/SessionStoreCacheBypassTest.java | 19 +++++++++- .../ks/WindowStoreCacheBypassTest.java | 35 ++++++++++++++++++- 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java index b4c8ca8b9066..c2759768ed5e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java @@ -50,9 +50,7 @@ class KsMaterializedSessionTable implements MaterializedWindowedTable { final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange) { this.stateStore = Objects.requireNonNull(store, "store"); this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher"); - this.cacheBypassFetcherRange = Objects.requireNonNull( - cacheBypassFetcherRange, - "cacheBypassFetcherRange"); + this.cacheBypassFetcherRange = Objects.requireNonNull(cacheBypassFetcherRange, "cacheBypassFetcherRange"); } @Override diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java index b2f2acec340a..be19195685e2 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypassTest.java @@ -50,7 +50,10 @@ public class SessionStoreCacheBypassTest { private static final GenericKey SOME_KEY = GenericKey.genericKey(1); + private static final GenericKey SOME_OTHER_KEY = GenericKey.genericKey(2); + private static final byte[] BYTES = new byte[] {'a', 'b'}; + private static final byte[] OTHER_BYTES = new byte[] {'c', 'd'}; @Mock private QueryableStoreType> @@ -78,7 +81,7 @@ public void setUp() { } @Test - public void shouldCallUnderlyingStore() throws IllegalAccessException { + public void shouldCallUnderlyingStoreSingleKey() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredSessionStore)); SERDES_FIELD.set(meteredSessionStore, serdes); when(serdes.rawKey(any())).thenReturn(BYTES); @@ -91,6 +94,20 @@ public void shouldCallUnderlyingStore() throws IllegalAccessException { verify(sessionStore).fetch(new Bytes(BYTES)); } + @Test + public void shouldCallUnderlyingStoreRangeQuery() throws IllegalAccessException { + when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredSessionStore)); + SERDES_FIELD.set(meteredSessionStore, serdes); + when(serdes.rawKey(any())).thenReturn(BYTES, OTHER_BYTES); + when(meteredSessionStore.wrapped()).thenReturn(wrappedSessionStore); + when(wrappedSessionStore.wrapped()).thenReturn(sessionStore); + when(sessionStore.fetch(any(), any())).thenReturn(storeIterator); + when(storeIterator.hasNext()).thenReturn(false); + + SessionStoreCacheBypass.fetchRange(store, SOME_KEY, SOME_OTHER_KEY); + verify(sessionStore).fetch(new Bytes(BYTES), new Bytes(OTHER_BYTES)); + } + @Test public void shouldAvoidNonSessionStore() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredSessionStore)); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java index 2d19bb34421b..b8624f77c540 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypassTest.java @@ -16,6 +16,8 @@ package io.confluent.ksql.execution.streams.materialization.ks; import static io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.SERDES_FIELD; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; @@ -50,7 +52,9 @@ public class WindowStoreCacheBypassTest { private static final GenericKey SOME_KEY = GenericKey.genericKey(1); + private static final GenericKey SOME_OTHER_KEY = GenericKey.genericKey(2); private static final byte[] BYTES = new byte[] {'a', 'b'}; + private static final byte[] OTHER_BYTES = new byte[] {'c', 'd'}; @Mock private QueryableStoreType>> @@ -68,6 +72,8 @@ public class WindowStoreCacheBypassTest { @Mock private WindowStoreIterator windowStoreIterator; @Mock + private KeyValueIterator, byte[]> keyValueIterator; + @Mock private StateSerdes> serdes; private CompositeReadOnlyWindowStore> store; @@ -78,7 +84,7 @@ public void setUp() { } @Test - public void shouldCallUnderlyingStore() throws IllegalAccessException { + public void shouldCallUnderlyingStoreSingleKey() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore)); SERDES_FIELD.set(meteredWindowStore, serdes); when(serdes.rawKey(any())).thenReturn(BYTES); @@ -91,6 +97,33 @@ public void shouldCallUnderlyingStore() throws IllegalAccessException { verify(windowStore).fetch(new Bytes(BYTES), Instant.ofEpochMilli(100L), Instant.ofEpochMilli(200L)); } + @Test + public void shouldCallUnderlyingStoreForRangeQuery() throws IllegalAccessException { + when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore)); + SERDES_FIELD.set(meteredWindowStore, serdes); + when(serdes.rawKey(any())).thenReturn(BYTES, OTHER_BYTES); + when(meteredWindowStore.wrapped()).thenReturn(wrappedWindowStore); + when(wrappedWindowStore.wrapped()).thenReturn(windowStore); + when(windowStore.fetch(any(), any(), any(), any())).thenReturn(keyValueIterator); + when(keyValueIterator.hasNext()).thenReturn(false); + + WindowStoreCacheBypass.fetchRange(store, SOME_KEY, SOME_OTHER_KEY, Instant.ofEpochMilli(100), Instant.ofEpochMilli(200)); + verify(windowStore).fetch(new Bytes(BYTES), new Bytes(OTHER_BYTES), Instant.ofEpochMilli(100L), Instant.ofEpochMilli(200L)); + } + + @Test + public void shouldCallUnderlyingStoreForTableScans() throws IllegalAccessException { + when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore)); + SERDES_FIELD.set(meteredWindowStore, serdes); + when(meteredWindowStore.wrapped()).thenReturn(wrappedWindowStore); + when(wrappedWindowStore.wrapped()).thenReturn(windowStore); + when(windowStore.fetchAll(any(), any())).thenReturn(keyValueIterator); + when(keyValueIterator.hasNext()).thenReturn(false); + + WindowStoreCacheBypass.fetchAll(store, Instant.ofEpochMilli(100), Instant.ofEpochMilli(200)); + verify(windowStore).fetchAll(Instant.ofEpochMilli(100L), Instant.ofEpochMilli(200L)); + } + @Test public void shouldAvoidNonWindowStore() throws IllegalAccessException { when(provider.stores(any(), any())).thenReturn(ImmutableList.of(meteredWindowStore)); From 8c2237c68d7fe5406176c5a00f290d660ae71d59 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Tue, 2 Feb 2021 21:55:49 -0800 Subject: [PATCH 18/23] add some comments --- .../ks/SessionStoreCacheBypass.java | 12 +++++++++ .../ks/WindowStoreCacheBypass.java | 25 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java index de050cba0c9b..b0c575b6bddb 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java @@ -82,6 +82,9 @@ KeyValueIterator, GenericRow> fetchRange( ); } + /* + This method is used for single key lookups. It calls the fetchUncached method. + */ public static KeyValueIterator, GenericRow> fetch( final ReadOnlySessionStore store, final GenericKey key @@ -94,6 +97,9 @@ public static KeyValueIterator, GenericRow> fetch( return findFirstNonEmptyIterator(stores, fetchFunc); } + /* + This method is used for single key lookups. It is invoked by the fetch method + */ private static KeyValueIterator, GenericRow> fetchUncached( final ReadOnlySessionStore sessionStore, final GenericKey key @@ -109,6 +115,9 @@ private static KeyValueIterator, GenericRow> fetchUncached( } } + /* + This method is used for range queries. It calls the fetchRangeUncached method. + */ public static KeyValueIterator, GenericRow> fetchRange( final ReadOnlySessionStore store, final GenericKey keyFrom, @@ -124,6 +133,9 @@ public static KeyValueIterator, GenericRow> fetchRange( return findFirstNonEmptyIterator(stores, fetchFunc); } + /* + This method is used for range queries. It is invoked by the fetchRange method + */ private static KeyValueIterator, GenericRow> fetchRangeUncached( final ReadOnlySessionStore sessionStore, final GenericKey keyFrom, diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index 1a4fdf28ee84..a34848bc4764 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -101,6 +101,9 @@ KeyValueIterator, ValueAndTimestamp> fetchRange } + /* + This method is used for single key lookups. It calls the fetchUncached method. + */ public static WindowStoreIterator> fetch( final ReadOnlyWindowStore> store, final GenericKey key, @@ -116,6 +119,9 @@ public static WindowStoreIterator> fetch( return findFirstNonEmptyIterator(stores, fetchFunc); } + /* + This method is used for single key lookups. It is invoked by the fetch method + */ private static WindowStoreIterator> fetchUncached( final ReadOnlyWindowStore> windowStore, final GenericKey key, @@ -132,6 +138,9 @@ private static WindowStoreIterator> fetchUncached( return new DeserializingIterator(fetch, serdes); } + /* + This method is used for range queries. It calls the fetchRangeUncached method. + */ public static KeyValueIterator, ValueAndTimestamp> fetchRange( final ReadOnlyWindowStore> store, final GenericKey keyFrom, @@ -149,6 +158,9 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> fetchRangeUncached( final ReadOnlyWindowStore> windowStore, @@ -169,6 +181,9 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> fetchAll( final ReadOnlyWindowStore> store, final Instant lower, @@ -182,6 +197,9 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> fetchAllUncached( final ReadOnlyWindowStore> windowStore, @@ -269,6 +287,10 @@ private static List, ValueAndTimestamp> { private final KeyValueIterator, byte[]> fetch; @@ -306,6 +328,9 @@ public KeyValue, ValueAndTimestamp> next() { } } + /* + This iterator is used for key lookups in the fetchUncached method + */ private static final class DeserializingIterator implements WindowStoreIterator> { private final WindowStoreIterator fetch; From 7df1b6ce9796044c9264fc7aac460e16970c9ab9 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Tue, 2 Feb 2021 22:01:59 -0800 Subject: [PATCH 19/23] FIX CHECKSTYLE --- .../streams/materialization/ks/KsMaterializedSessionTable.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java index c2759768ed5e..de90d34b3e8e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java @@ -50,7 +50,8 @@ class KsMaterializedSessionTable implements MaterializedWindowedTable { final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange) { this.stateStore = Objects.requireNonNull(store, "store"); this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher"); - this.cacheBypassFetcherRange = Objects.requireNonNull(cacheBypassFetcherRange, "cacheBypassFetcherRange"); + this.cacheBypassFetcherRange = Objects.requireNonNull(cacheBypassFetcherRange, + "cacheBypassFetcherRange"); } @Override From 782a50f9091270ca144b067f5fd0c97c647a77f9 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Tue, 2 Feb 2021 23:39:28 -0800 Subject: [PATCH 20/23] experimental message change --- .../src/test/resources/query-validation-tests/group-by.json | 2 +- .../src/test/resources/query-validation-tests/partition-by.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json index 7fc7417072f9..2c2653713f43 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json @@ -805,7 +805,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'AND' expecting ';'" + "message": "mismatched input 'AND' expecting {';', ',', '[', 'EMIT', 'HAVING', 'LIMIT', 'AT', '+', '-', '*', '/', '%', '||', '->'}" } }, { diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json index e59e1c121265..102ea0b93b59 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -397,7 +397,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'AND' expecting ';'" + "message": "mismatched input 'AND' expecting {';', ',', '[', 'EMIT', 'HAVING', 'LIMIT', 'AT', '+', '-', '*', '/', '%', '||', '->'}" } }, { From 1b445cdf4b260d230b71537c44166c77f349765b Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Wed, 3 Feb 2021 00:52:34 -0800 Subject: [PATCH 21/23] experimental message change 2 --- .../src/test/resources/query-validation-tests/group-by.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json index 2c2653713f43..fb5613789c8c 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json @@ -805,7 +805,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'AND' expecting {';', ',', '[', 'EMIT', 'HAVING', 'LIMIT', 'AT', '+', '-', '*', '/', '%', '||', '->'}" + "message": "mismatched input 'AND' expecting {';', ',', '[', 'EMIT', 'HAVING', 'LIMIT', 'AT', 'PARTITION', '+', '-', '*', '/', '%', '||', '->'}" } }, { From 1d488d0d3b7e9853db096404c86f49a8e20e3320 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Wed, 10 Feb 2021 15:15:50 -0800 Subject: [PATCH 22/23] fix merge conflict --- .../src/test/resources/query-validation-tests/group-by.json | 2 +- .../src/test/resources/query-validation-tests/partition-by.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json index fb5613789c8c..911ea63398e5 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json @@ -805,7 +805,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'AND' expecting {';', ',', '[', 'EMIT', 'HAVING', 'LIMIT', 'AT', 'PARTITION', '+', '-', '*', '/', '%', '||', '->'}" + "message": "mismatched input 'AND' expecting {';'" } }, { diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json index 102ea0b93b59..904513bf8f02 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -397,7 +397,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'AND' expecting {';', ',', '[', 'EMIT', 'HAVING', 'LIMIT', 'AT', '+', '-', '*', '/', '%', '||', '->'}" + "message": "mismatched input 'AND' expecting {';'" } }, { From 0cf19ddb721b8ba26c0aaa25597aef2004f26291 Mon Sep 17 00:00:00 2001 From: Chittaranjan Prasad Date: Wed, 10 Feb 2021 16:27:58 -0800 Subject: [PATCH 23/23] kick off another build --- .../streams/materialization/ks/WindowStoreCacheBypass.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java index a34848bc4764..3cf8f2033a80 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java @@ -198,7 +198,7 @@ public static KeyValueIterator, ValueAndTimestamp, ValueAndTimestamp> fetchAllUncached(