Skip to content

Commit

Permalink
fix: Removing reverted configuration org.apache.kafka.streams.Streams… (
Browse files Browse the repository at this point in the history
#9248)

* fix: Removing reverted configuration org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG

The configuration org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG was reverted in Apache Kafka.
  • Loading branch information
jnh5y authored Jul 9, 2022
1 parent 8fdd56f commit bde8f40
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.api.auth;

import io.confluent.ksql.api.server.Server;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.security.KsqlPrincipal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ public static List<String> makePullQueryWsRequest(
sql,
Optional.of(mediaType),
Optional.of(contentType),
Optional.of(credentials)
Optional.of(credentials),
Optional.empty(),
Optional.empty()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,38 +375,6 @@ public void shouldQueryWS() throws Exception {
assertThat(rows_0, hasSize(HEADER + 2));
}

@Test
public void shouldQueryWS() throws Exception {
// Given:
ClusterFormation clusterFormation = findClusterFormation(TEST_APP_0, TEST_APP_1, TEST_APP_2);
waitForClusterToBeDiscovered(clusterFormation.standBy.getApp(), 3, USER_CREDS);
waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(),
clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3), USER_CREDS);

waitForRemoteServerToChangeStatus(
clusterFormation.standBy.getApp(),
clusterFormation.active.getHost(),
HighAvailabilityTestUtil::remoteServerIsUp,
USER_CREDS);

waitForRemoteServerToChangeStatus(
clusterFormation.router.getApp(),
clusterFormation.standBy.getHost(),
HighAvailabilityTestUtil::remoteServerIsUp,
USER_CREDS);


final Credentials credentials = new Credentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD);
// When:
List<String> rows_0 =
makePullQueryWsRequest(clusterFormation.router.getApp().getWsListener(), sql, "", "", credentials, Optional.empty(), Optional.empty());

// Then:
// websocket pull query returns a header, the value row, and an error row indicating that it is done
assertThat(rows_0, hasSize(HEADER + 2));
}


public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() {
// Given:
ClusterFormation clusterFormation = findClusterFormation(TEST_APP_0, TEST_APP_1, TEST_APP_2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.vertx.core.http.HttpHeaders.AUTHORIZATION;
import static io.vertx.core.http.HttpMethod.POST;
import static io.vertx.core.http.HttpVersion.HTTP_1_1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static org.apache.kafka.streams.StreamsConfig.POLL_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.WINDOW_SIZE_MS_CONFIG;

Expand Down Expand Up @@ -87,7 +86,6 @@ public class PropertiesList extends KsqlEntity {
KSQL_QUERY_PULL_TABLE_SCAN_ENABLED,
KSQL_TIMESTAMP_THROW_ON_INVALID,
MAX_TASK_IDLE_MS_CONFIG,
STATESTORE_CACHE_MAX_BYTES_CONFIG,
TASK_TIMEOUT_MS_CONFIG
);

Expand Down

0 comments on commit bde8f40

Please sign in to comment.