Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis sink flushes only rows that have more recent eventTimestamp #913

Merged
merged 11 commits into from
Aug 1, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion storage/connectors/redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-protobuf</artifactId>
<version>${org.apache.beam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.redis.writer;

import feast.storage.common.retry.Retriable;
import io.lettuce.core.RedisException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for redis-related DoFns. Assumes that operations will be batched. Prepares the redis
* client in the DoFn.Setup stage and closes it in the DoFn.Teardown stage.
*
* @param <Input>
* @param <Output>
*/
public class BatchDoFnWithRedis<Input, Output> extends DoFn<Input, Output> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be only one extension of this base class (RedisCustomIO.WriteDoFn). Is this base class necessary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to separate redis wrapper code from business logic

private static final Logger log = LoggerFactory.getLogger(BatchDoFnWithRedis.class);

// Wrapper around the Redis connection; its lifecycle (setup / connect / shutdown) is driven
// by the Beam DoFn lifecycle callbacks in this class.
private final RedisIngestionClient redisIngestionClient;

// Package-private: subclasses in this package (e.g. the write DoFn) supply the client.
BatchDoFnWithRedis(RedisIngestionClient redisIngestionClient) {
this.redisIngestionClient = redisIngestionClient;
}

/** Initializes the ingestion client once per DoFn instance (Beam DoFn.Setup stage). */
@Setup
public void setup() {
this.redisIngestionClient.setup();
}

/**
 * Opens the Redis connection at the start of each bundle (Beam DoFn.StartBundle stage).
 *
 * <p>A connection failure here is only logged, not rethrown: {@code executeBatch} reconnects
 * lazily before issuing commands, so a transient failure at bundle start is not fatal.
 */
@StartBundle
public void startBundle() {
try {
redisIngestionClient.connect();
} catch (RedisException e) {
// SLF4J uses "{}" placeholders, not printf-style "%s"; the old format string logged a
// literal "%s". Passing the exception as the last argument logs the full stack trace.
log.error("Connection to redis cannot be established", e);
}
}

/**
 * Runs one batch of Redis commands with retries.
 *
 * <p>The supplied {@code executor} produces the batch of async command futures against the
 * client; this method then blocks until they complete via {@code sync}. The whole attempt
 * (reconnect + produce + sync) is wrapped in the client's {@code BackOffExecutor}, so it is
 * re-executed from scratch on retriable failures.
 *
 * @param executor produces the batch of command futures to await for one attempt
 * @throws Exception if the batch still fails after the executor's retry budget is exhausted
 */
void executeBatch(Function<RedisIngestionClient, Iterable<Future<? extends Object>>> executor)
throws Exception {
this.redisIngestionClient
.getBackOffExecutor()
.execute(
new Retriable() {
@Override
public void execute() throws ExecutionException, InterruptedException {
// Reconnect lazily: the startBundle connect attempt may have failed, or the
// connection may have dropped since a previous batch.
if (!redisIngestionClient.isConnected()) {
redisIngestionClient.connect();
}

Iterable<Future<?>> futures = executor.apply(redisIngestionClient);
redisIngestionClient.sync(futures);
}

@Override
public Boolean isExceptionRetriable(Exception e) {
// Only Redis-level failures are retried; anything else propagates immediately.
return e instanceof RedisException;
}

@Override
public void cleanUpAfterFailure() {}
});
}

/** Releases the Redis client and its connection when the DoFn is discarded (DoFn.Teardown stage). */
@Teardown
public void teardown() {
redisIngestionClient.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import feast.proto.core.StoreProto;
import feast.storage.common.retry.BackOffExecutor;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.joda.time.Duration;
Expand All @@ -39,7 +40,6 @@ public class RedisClusterIngestionClient implements RedisIngestionClient {
private transient RedisClusterClient clusterClient;
private StatefulRedisClusterConnection<byte[], byte[]> connection;
private RedisAdvancedClusterAsyncCommands<byte[], byte[]> commands;
private List<RedisFuture> futures = Lists.newArrayList();

public RedisClusterIngestionClient(StoreProto.Store.RedisClusterConfig redisClusterConfig) {
this.uriList =
Expand All @@ -55,7 +55,6 @@ public RedisClusterIngestionClient(StoreProto.Store.RedisClusterConfig redisClus
redisClusterConfig.getInitialBackoffMs() > 0 ? redisClusterConfig.getInitialBackoffMs() : 1;
this.backOffExecutor =
new BackOffExecutor(redisClusterConfig.getMaxRetries(), Duration.millis(backoffMs));
this.clusterClient = RedisClusterClient.create(uriList);
}

@Override
Expand All @@ -78,6 +77,10 @@ public void connect() {
if (!isConnected()) {
this.connection = clusterClient.connect(new ByteArrayCodec());
this.commands = connection.async();

// Even though we're using the async API, the client still flushes after each command by
// default, which we don't want since we produce all commands in batches.
this.commands.setAutoFlushCommands(false);
}
}

Expand All @@ -87,46 +90,20 @@ public boolean isConnected() {
}

@Override
public void sync() {
try {
LettuceFutures.awaitAll(60, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
} finally {
futures.clear();
}
}

@Override
public void pexpire(byte[] key, Long expiryMillis) {
futures.add(commands.pexpire(key, expiryMillis));
}

@Override
public void append(byte[] key, byte[] value) {
futures.add(commands.append(key, value));
}

@Override
public void set(byte[] key, byte[] value) {
futures.add(commands.set(key, value));
}
public void sync(Iterable<Future<?>> futures) {
this.connection.flushCommands();

@Override
public void lpush(byte[] key, byte[] value) {
futures.add(commands.lpush(key, value));
}

@Override
public void rpush(byte[] key, byte[] value) {
futures.add(commands.rpush(key, value));
LettuceFutures.awaitAll(
60, TimeUnit.SECONDS, Lists.newArrayList(futures).toArray(new Future[0]));
}

@Override
public void sadd(byte[] key, byte[] value) {
futures.add(commands.sadd(key, value));
public CompletableFuture<String> set(byte[] key, byte[] value) {
return commands.set(key, value).toCompletableFuture();
}

@Override
public void zadd(byte[] key, Long score, byte[] value) {
futures.add(commands.zadd(key, score, value));
public CompletableFuture<byte[]> get(byte[] key) {
return commands.get(key).toCompletableFuture();
}
}
Loading