From 8816c0159326f219c13e587cd19b87b2f4683978 Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 28 May 2020 13:42:25 +0300 Subject: [PATCH] redis-cluster is being merged into redis styling handle null case styling --- docs/coverage/java/pom.xml | 6 - ingestion/pom.xml | 6 - .../java/feast/ingestion/utils/StoreUtil.java | 3 +- serving/pom.xml | 6 - .../serving/config/ServingServiceConfig.java | 2 +- storage/connectors/pom.xml | 1 - storage/connectors/redis/pom.xml | 7 + .../RedisClusterOnlineRetriever.java | 2 +- .../writer/RedisClusterIngestionClient.java | 2 +- .../redis/writer/RedisCustomIO.java | 28 +- .../redis/writer/RedisFeatureSink.java | 49 ++- .../RedisClusterOnlineRetrieverTest.java | 2 +- .../writer/RedisClusterFeatureSinkTest.java | 10 +- storage/connectors/rediscluster/pom.xml | 81 ----- .../retriever/FeatureRowDecoder.java | 82 ----- .../writer/RedisClusterCustomIO.java | 296 ------------------ .../writer/RedisClusterFeatureSink.java | 75 ----- .../writer/RedisIngestionClient.java | 49 --- 18 files changed, 70 insertions(+), 637 deletions(-) rename storage/connectors/{rediscluster/src/main/java/feast/storage/connectors/rediscluster => redis/src/main/java/feast/storage/connectors/redis}/retriever/RedisClusterOnlineRetriever.java (99%) rename storage/connectors/{rediscluster/src/main/java/feast/storage/connectors/rediscluster => redis/src/main/java/feast/storage/connectors/redis}/writer/RedisClusterIngestionClient.java (98%) rename storage/connectors/{rediscluster/src/test/java/feast/storage/connectors/rediscluster => redis/src/test/java/feast/storage/connectors/redis}/retriever/RedisClusterOnlineRetrieverTest.java (99%) rename storage/connectors/{rediscluster/src/test/java/feast/storage/connectors/rediscluster => redis/src/test/java/feast/storage/connectors/redis}/writer/RedisClusterFeatureSinkTest.java (98%) delete mode 100644 storage/connectors/rediscluster/pom.xml delete mode 100644 storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/FeatureRowDecoder.java delete mode 100644 storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterCustomIO.java delete mode 100644 storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterFeatureSink.java delete mode 100644 storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisIngestionClient.java diff --git a/docs/coverage/java/pom.xml b/docs/coverage/java/pom.xml index 05ffaf374e..9666b7092c 100644 --- a/docs/coverage/java/pom.xml +++ b/docs/coverage/java/pom.xml @@ -59,12 +59,6 @@ ${project.version} - - dev.feast - feast-storage-connector-redis-cluster - ${project.version} - - dev.feast feast-ingestion diff --git a/ingestion/pom.xml b/ingestion/pom.xml index a004c6d9df..36d5cd6269 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -126,12 +126,6 @@ ${project.version} - - dev.feast - feast-storage-connector-redis-cluster - ${project.version} - - dev.feast feast-storage-connector-bigquery diff --git a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java index 54323869cf..4efec994f7 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java @@ -26,7 +26,6 @@ import feast.storage.api.writer.FeatureSink; import feast.storage.connectors.bigquery.writer.BigQueryFeatureSink; import 
feast.storage.connectors.redis.writer.RedisFeatureSink; -import feast.storage.connectors.rediscluster.writer.RedisClusterFeatureSink; import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; @@ -84,7 +83,7 @@ public static FeatureSink getFeatureSink( StoreType storeType = store.getType(); switch (storeType) { case REDIS_CLUSTER: - return RedisClusterFeatureSink.fromConfig(store.getRedisClusterConfig(), featureSetSpecs); + return RedisFeatureSink.fromConfig(store.getRedisClusterConfig(), featureSetSpecs); case REDIS: return RedisFeatureSink.fromConfig(store.getRedisConfig(), featureSetSpecs); case BIGQUERY: diff --git a/serving/pom.xml b/serving/pom.xml index c7a30166f6..8304899b51 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -110,12 +110,6 @@ ${project.version} - - dev.feast - feast-storage-connector-redis-cluster - ${project.version} - - dev.feast feast-storage-connector-bigquery diff --git a/serving/src/main/java/feast/serving/config/ServingServiceConfig.java b/serving/src/main/java/feast/serving/config/ServingServiceConfig.java index 46ffdb2fcc..41a92e4bc6 100644 --- a/serving/src/main/java/feast/serving/config/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/config/ServingServiceConfig.java @@ -28,8 +28,8 @@ import feast.storage.api.retriever.HistoricalRetriever; import feast.storage.api.retriever.OnlineRetriever; import feast.storage.connectors.bigquery.retriever.BigQueryHistoricalRetriever; +import feast.storage.connectors.redis.retriever.RedisClusterOnlineRetriever; import feast.storage.connectors.redis.retriever.RedisOnlineRetriever; -import feast.storage.connectors.rediscluster.retriever.RedisClusterOnlineRetriever; import io.opentracing.Tracer; import java.util.Map; import org.slf4j.Logger; diff --git a/storage/connectors/pom.xml b/storage/connectors/pom.xml index 280b0d40bf..6cd949acfd 100644 --- a/storage/connectors/pom.xml +++ b/storage/connectors/pom.xml @@ -16,7 +16,6 @@ redis - rediscluster bigquery diff --git a/storage/connectors/redis/pom.xml b/storage/connectors/redis/pom.xml index 6c50895bd2..2a19c0e828 100644 --- a/storage/connectors/redis/pom.xml +++ b/storage/connectors/redis/pom.xml @@ -70,6 +70,13 @@ test + + net.ishiis.redis + redis-unit + 1.0.3 + test + + junit diff --git a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/RedisClusterOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java similarity index 99% rename from storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/RedisClusterOnlineRetriever.java rename to storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java index f80de0fc8a..18619252c2 100644 --- a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/RedisClusterOnlineRetriever.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package feast.storage.connectors.rediscluster.retriever; +package feast.storage.connectors.redis.retriever; import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.InvalidProtocolBufferException; diff --git a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterIngestionClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java similarity index 98% rename from storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterIngestionClient.java rename to storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java index 5bd70d08b4..389db4be3a 100644 --- a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterIngestionClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.storage.connectors.rediscluster.writer; +package feast.storage.connectors.redis.writer; import com.google.common.collect.Lists; import feast.proto.core.StoreProto; diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java index eeac5b3d96..b7fd68755c 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java @@ -19,7 +19,7 @@ import feast.proto.core.FeatureSetProto.EntitySpec; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.FeatureSetProto.FeatureSpec; -import feast.proto.core.StoreProto.Store.RedisConfig; +import feast.proto.core.StoreProto.Store.*; import feast.proto.storage.RedisProto.RedisKey; import feast.proto.storage.RedisProto.RedisKey.Builder; import feast.proto.types.FeatureRowProto.FeatureRow; @@ -28,7 +28,7 @@ import feast.storage.api.writer.FailedElement; import feast.storage.api.writer.WriteResult; import feast.storage.common.retry.Retriable; -import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -63,21 +63,22 @@ public class RedisCustomIO { private RedisCustomIO() {} - public static Write write(RedisConfig redisConfig, Map featureSetSpecs) { - return new Write(redisConfig, featureSetSpecs); + public static Write write( + RedisIngestionClient redisIngestionClient, Map featureSetSpecs) { + return new Write(redisIngestionClient, featureSetSpecs); } /** ServingStoreWrite data to a Redis server. 
*/ public static class Write extends PTransform, WriteResult> { private Map featureSetSpecs; - private RedisConfig redisConfig; + private RedisIngestionClient redisIngestionClient; private int batchSize; private int timeout; - public Write(RedisConfig redisConfig, Map featureSetSpecs) { - - this.redisConfig = redisConfig; + public Write( + RedisIngestionClient redisIngestionClient, Map featureSetSpecs) { + this.redisIngestionClient = redisIngestionClient; this.featureSetSpecs = featureSetSpecs; } @@ -95,7 +96,7 @@ public Write withTimeout(int timeout) { public WriteResult expand(PCollection input) { PCollectionTuple redisWrite = input.apply( - ParDo.of(new WriteDoFn(redisConfig, featureSetSpecs)) + ParDo.of(new WriteDoFn(redisIngestionClient, featureSetSpecs)) .withOutputTags(successfulInsertsTag, TupleTagList.of(failedInsertsTupleTag))); return WriteResult.in( input.getPipeline(), @@ -111,9 +112,10 @@ public static class WriteDoFn extends DoFn { private int timeout = DEFAULT_TIMEOUT; private RedisIngestionClient redisIngestionClient; - WriteDoFn(RedisConfig config, Map featureSetSpecs) { + WriteDoFn( + RedisIngestionClient redisIngestionClient, Map featureSetSpecs) { - this.redisIngestionClient = new RedisStandaloneIngestionClient(config); + this.redisIngestionClient = redisIngestionClient; this.featureSetSpecs = featureSetSpecs; } @@ -140,7 +142,7 @@ public void setup() { public void startBundle() { try { redisIngestionClient.connect(); - } catch (RedisConnectionException e) { + } catch (RedisException e) { log.error("Connection to redis cannot be established ", e); } featureRows.clear(); @@ -165,7 +167,7 @@ public void execute() throws ExecutionException, InterruptedException { @Override public Boolean isExceptionRetriable(Exception e) { - return e instanceof RedisConnectionException; + return e instanceof RedisException; } @Override diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java index 171f17daac..3ddaef7779 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java @@ -20,6 +20,7 @@ import feast.proto.core.FeatureSetProto.FeatureSet; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.StoreProto; +import feast.proto.core.StoreProto.Store.RedisClusterConfig; import feast.proto.core.StoreProto.Store.RedisConfig; import feast.proto.types.FeatureRowProto.FeatureRow; import feast.storage.api.writer.FeatureSink; @@ -28,6 +29,7 @@ import io.lettuce.core.RedisConnectionException; import io.lettuce.core.RedisURI; import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; @@ -46,8 +48,17 @@ public static FeatureSink fromConfig( return builder().setFeatureSetSpecs(featureSetSpecs).setRedisConfig(redisConfig).build(); } + public static FeatureSink fromConfig( + RedisClusterConfig redisConfig, Map featureSetSpecs) { + return builder().setFeatureSetSpecs(featureSetSpecs).setRedisClusterConfig(redisConfig).build(); + } + + @Nullable public abstract RedisConfig getRedisConfig(); + @Nullable + public abstract RedisClusterConfig getRedisClusterConfig(); + public abstract Map getFeatureSetSpecs(); public abstract Builder toBuilder(); @@ -60,6 +71,8 @@ public static 
Builder builder() { public abstract static class Builder { public abstract Builder setRedisConfig(RedisConfig redisConfig); + public abstract Builder setRedisClusterConfig(RedisClusterConfig redisConfig); + public abstract Builder setFeatureSetSpecs(Map featureSetSpecs); public abstract RedisFeatureSink build(); @@ -67,22 +80,36 @@ public abstract static class Builder { @Override public void prepareWrite(FeatureSet featureSet) { - - RedisClient redisClient = - RedisClient.create(RedisURI.create(getRedisConfig().getHost(), getRedisConfig().getPort())); - try { - redisClient.connect(); - } catch (RedisConnectionException e) { + if (getRedisConfig() != null) { + RedisClient redisClient = + RedisClient.create( + RedisURI.create(getRedisConfig().getHost(), getRedisConfig().getPort())); + try { + redisClient.connect(); + } catch (RedisConnectionException e) { + throw new RuntimeException( + String.format( + "Failed to connect to Redis at host: '%s' port: '%d'. Please check that your Redis is running and accessible from Feast.", + getRedisConfig().getHost(), getRedisConfig().getPort())); + } + redisClient.shutdown(); + } else if (getRedisClusterConfig() == null) { throw new RuntimeException( - String.format( - "Failed to connect to Redis at host: '%s' port: '%d'. Please check that your Redis is running and accessible from Feast.", - getRedisConfig().getHost(), getRedisConfig().getPort())); + "At least one RedisConfig or RedisClusterConfig must be provided to Redis Sink"); } - redisClient.shutdown(); } @Override public PTransform, WriteResult> writer() { - return new RedisCustomIO.Write(getRedisConfig(), getFeatureSetSpecs()); + if (getRedisClusterConfig() != null) { + return new RedisCustomIO.Write( + new RedisClusterIngestionClient(getRedisClusterConfig()), getFeatureSetSpecs()); + } else if (getRedisConfig() != null) { + return new RedisCustomIO.Write( + new RedisStandaloneIngestionClient(getRedisConfig()), getFeatureSetSpecs()); + } else { + throw new RuntimeException( + "At least one RedisConfig or RedisClusterConfig must be provided to Redis Sink"); + } } } diff --git a/storage/connectors/rediscluster/src/test/java/feast/storage/connectors/rediscluster/retriever/RedisClusterOnlineRetrieverTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetrieverTest.java similarity index 99% rename from storage/connectors/rediscluster/src/test/java/feast/storage/connectors/rediscluster/retriever/RedisClusterOnlineRetrieverTest.java rename to storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetrieverTest.java index deed53e1ae..45842cef6e 100644 --- a/storage/connectors/rediscluster/src/test/java/feast/storage/connectors/rediscluster/retriever/RedisClusterOnlineRetrieverTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetrieverTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package feast.storage.connectors.rediscluster.retriever; +package feast.storage.connectors.redis.retriever; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; diff --git a/storage/connectors/rediscluster/src/test/java/feast/storage/connectors/rediscluster/writer/RedisClusterFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java similarity index 98% rename from storage/connectors/rediscluster/src/test/java/feast/storage/connectors/rediscluster/writer/RedisClusterFeatureSinkTest.java rename to storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java index 8c5afd3d67..7bdc9bba2b 100644 --- a/storage/connectors/rediscluster/src/test/java/feast/storage/connectors/rediscluster/writer/RedisClusterFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.storage.connectors.rediscluster.writer; +package feast.storage.connectors.redis.writer; import static feast.storage.common.testing.TestUtil.field; import static org.hamcrest.CoreMatchers.equalTo; @@ -67,7 +67,7 @@ public class RedisClusterFeatureSinkTest { private RedisClusterClient redisClusterClient; private RedisClusterCommands redisClusterCommands; - private RedisClusterFeatureSink redisClusterFeatureSink; + private RedisFeatureSink redisClusterFeatureSink; @Before public void setUp() throws IOException { @@ -123,7 +123,7 @@ public void setUp() throws IOException { .build(); redisClusterFeatureSink = - RedisClusterFeatureSink.builder() + RedisFeatureSink.builder() .setFeatureSetSpecs(specMap) .setRedisClusterConfig(redisClusterConfig) .build(); @@ -141,8 +141,8 @@ static boolean deleteDirectory(File directoryToBeDeleted) { @After public void teardown() { - redisClusterClient.shutdown(); redisCluster.stop(); + redisClusterClient.shutdown(); deleteDirectory(new File(String.valueOf(Paths.get(System.getProperty("user.dir"), ".redis")))); } @@ -259,7 +259,7 @@ public void shouldProduceFailedElementIfRetryExceeded() { .build(); Map specMap = ImmutableMap.of("myproject/fs", spec1); redisClusterFeatureSink = - RedisClusterFeatureSink.builder() + RedisFeatureSink.builder() .setFeatureSetSpecs(specMap) .setRedisClusterConfig(redisClusterConfig) .build(); diff --git a/storage/connectors/rediscluster/pom.xml b/storage/connectors/rediscluster/pom.xml deleted file mode 100644 index 5c3cb6e42d..0000000000 --- a/storage/connectors/rediscluster/pom.xml +++ /dev/null @@ -1,81 +0,0 @@ - - - - dev.feast - feast-storage-connectors - ${revision} - - - 4.0.0 - feast-storage-connector-redis-cluster - - Feast Storage Connector for Redis Cluster - - - - io.lettuce - lettuce-core - - - - org.apache.commons - commons-lang3 - 3.9 - - - - com.google.auto.value - auto-value-annotations - 1.6.6 - - - - com.google.auto.value - auto-value - 1.6.6 - provided - - - - org.mockito - mockito-core - 2.23.0 - test - - - - org.apache.beam - beam-runners-direct-java - ${org.apache.beam.version} - test - - - - org.hamcrest - hamcrest-core - test - - - - org.hamcrest - hamcrest-library - test - - - - net.ishiis.redis - redis-unit - 1.0.3 - test - - - - junit - junit - 4.12 - test - - - - diff --git 
a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/FeatureRowDecoder.java b/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/FeatureRowDecoder.java deleted file mode 100644 index d398752aa4..0000000000 --- a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/FeatureRowDecoder.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.connectors.rediscluster.retriever; - -import feast.proto.core.FeatureSetProto.FeatureSetSpec; -import feast.proto.core.FeatureSetProto.FeatureSpec; -import feast.proto.types.FeatureRowProto.FeatureRow; -import feast.proto.types.FieldProto.Field; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -public class FeatureRowDecoder { - - private final String featureSetRef; - private final FeatureSetSpec spec; - - public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) { - this.featureSetRef = featureSetRef; - this.spec = spec; - } - - /** - * Validates if an encoded feature row can be decoded without exception. - * - * @param featureRow Feature row - * @return boolean - */ - public Boolean isEncodingValid(FeatureRow featureRow) { - return featureRow.getFieldsList().size() == spec.getFeaturesList().size(); - } - - /** - * Decoding feature row by repopulating the field names based on the corresponding feature set - * spec. 
- * - * @param encodedFeatureRow Feature row - * @return boolean - */ - public FeatureRow decode(FeatureRow encodedFeatureRow) { - final List fieldsWithoutName = encodedFeatureRow.getFieldsList(); - - List featureNames = - spec.getFeaturesList().stream() - .sorted(Comparator.comparing(FeatureSpec::getName)) - .map(FeatureSpec::getName) - .collect(Collectors.toList()); - List fields = - IntStream.range(0, featureNames.size()) - .mapToObj( - featureNameIndex -> { - String featureName = featureNames.get(featureNameIndex); - return fieldsWithoutName - .get(featureNameIndex) - .toBuilder() - .setName(featureName) - .build(); - }) - .collect(Collectors.toList()); - return encodedFeatureRow - .toBuilder() - .clearFields() - .setFeatureSet(featureSetRef) - .addAllFields(fields) - .build(); - } -} diff --git a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterCustomIO.java b/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterCustomIO.java deleted file mode 100644 index 712012a2fb..0000000000 --- a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterCustomIO.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package feast.storage.connectors.rediscluster.writer; - -import feast.proto.core.FeatureSetProto.EntitySpec; -import feast.proto.core.FeatureSetProto.FeatureSetSpec; -import feast.proto.core.FeatureSetProto.FeatureSpec; -import feast.proto.core.StoreProto.Store.RedisClusterConfig; -import feast.proto.storage.RedisProto.RedisKey; -import feast.proto.storage.RedisProto.RedisKey.Builder; -import feast.proto.types.FeatureRowProto.FeatureRow; -import feast.proto.types.FieldProto.Field; -import feast.proto.types.ValueProto; -import feast.storage.api.writer.FailedElement; -import feast.storage.api.writer.WriteResult; -import feast.storage.common.retry.Retriable; -import io.lettuce.core.RedisException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RedisClusterCustomIO { - - private static final int DEFAULT_BATCH_SIZE = 1000; - private static final int DEFAULT_TIMEOUT = 2000; - - private static TupleTag successfulInsertsTag = - new TupleTag("successfulInserts") {}; - private static TupleTag failedInsertsTupleTag = - new TupleTag("failedInserts") {}; - - private static final Logger log = LoggerFactory.getLogger(RedisClusterCustomIO.class); - - private RedisClusterCustomIO() {} - - public static Write write( - RedisClusterConfig redisClusterConfig, Map featureSetSpecs) { - return new Write(redisClusterConfig, featureSetSpecs); - } - - /** ServingStoreWrite data to a Redis server. 
*/ - public static class Write extends PTransform, WriteResult> { - - private Map featureSetSpecs; - private RedisClusterConfig redisClusterConfig; - private int batchSize; - private int timeout; - - public Write( - RedisClusterConfig redisClusterConfig, Map featureSetSpecs) { - - this.redisClusterConfig = redisClusterConfig; - this.featureSetSpecs = featureSetSpecs; - } - - public Write withBatchSize(int batchSize) { - this.batchSize = batchSize; - return this; - } - - public Write withTimeout(int timeout) { - this.timeout = timeout; - return this; - } - - @Override - public WriteResult expand(PCollection input) { - PCollectionTuple redisWrite = - input.apply( - ParDo.of(new WriteDoFn(redisClusterConfig, featureSetSpecs)) - .withOutputTags(successfulInsertsTag, TupleTagList.of(failedInsertsTupleTag))); - return WriteResult.in( - input.getPipeline(), - redisWrite.get(successfulInsertsTag), - redisWrite.get(failedInsertsTupleTag)); - } - - public static class WriteDoFn extends DoFn { - - private final List featureRows = new ArrayList<>(); - private Map featureSetSpecs; - private int batchSize = DEFAULT_BATCH_SIZE; - private int timeout = DEFAULT_TIMEOUT; - private RedisIngestionClient redisIngestionClient; - - WriteDoFn(RedisClusterConfig config, Map featureSetSpecs) { - - this.redisIngestionClient = new RedisClusterIngestionClient(config); - this.featureSetSpecs = featureSetSpecs; - } - - public WriteDoFn withBatchSize(int batchSize) { - if (batchSize > 0) { - this.batchSize = batchSize; - } - return this; - } - - public WriteDoFn withTimeout(int timeout) { - if (timeout > 0) { - this.timeout = timeout; - } - return this; - } - - @Setup - public void setup() { - this.redisIngestionClient.setup(); - } - - @StartBundle - public void startBundle() { - try { - redisIngestionClient.connect(); - } catch (RedisException e) { - log.error("Connection to redis cannot be established ", e); - } - featureRows.clear(); - } - - private void executeBatch() throws Exception { - this.redisIngestionClient - .getBackOffExecutor() - .execute( - new Retriable() { - @Override - public void execute() throws ExecutionException, InterruptedException { - if (!redisIngestionClient.isConnected()) { - redisIngestionClient.connect(); - } - featureRows.forEach( - row -> { - redisIngestionClient.set(getKey(row), getValue(row)); - }); - redisIngestionClient.sync(); - } - - @Override - public Boolean isExceptionRetriable(Exception e) { - return e instanceof RedisException; - } - - @Override - public void cleanUpAfterFailure() {} - }); - } - - private FailedElement toFailedElement( - FeatureRow featureRow, Exception exception, String jobName) { - return FailedElement.newBuilder() - .setJobName(jobName) - .setTransformName("RedisClusterCustomIO") - .setPayload(featureRow.toString()) - .setErrorMessage(exception.getMessage()) - .setStackTrace(ExceptionUtils.getStackTrace(exception)) - .build(); - } - - private byte[] getKey(FeatureRow featureRow) { - FeatureSetSpec featureSetSpec = featureSetSpecs.get(featureRow.getFeatureSet()); - List entityNames = - featureSetSpec.getEntitiesList().stream() - .map(EntitySpec::getName) - .sorted() - .collect(Collectors.toList()); - - Map entityFields = new HashMap<>(); - Builder redisKeyBuilder = RedisKey.newBuilder().setFeatureSet(featureRow.getFeatureSet()); - for (Field field : featureRow.getFieldsList()) { - if (entityNames.contains(field.getName())) { - entityFields.putIfAbsent( - field.getName(), - Field.newBuilder().setName(field.getName()).setValue(field.getValue()).build()); - } - } 
- for (String entityName : entityNames) { - redisKeyBuilder.addEntities(entityFields.get(entityName)); - } - return redisKeyBuilder.build().toByteArray(); - } - - private byte[] getValue(FeatureRow featureRow) { - FeatureSetSpec spec = featureSetSpecs.get(featureRow.getFeatureSet()); - - List featureNames = - spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList()); - Map fieldValueOnlyMap = - featureRow.getFieldsList().stream() - .filter(field -> featureNames.contains(field.getName())) - .distinct() - .collect( - Collectors.toMap( - Field::getName, - field -> Field.newBuilder().setValue(field.getValue()).build())); - - List values = - featureNames.stream() - .sorted() - .map( - featureName -> - fieldValueOnlyMap.getOrDefault( - featureName, - Field.newBuilder() - .setValue(ValueProto.Value.getDefaultInstance()) - .build())) - .collect(Collectors.toList()); - - return FeatureRow.newBuilder() - .setEventTimestamp(featureRow.getEventTimestamp()) - .addAllFields(values) - .build() - .toByteArray(); - } - - @ProcessElement - public void processElement(ProcessContext context) { - FeatureRow featureRow = context.element(); - featureRows.add(featureRow); - if (featureRows.size() >= batchSize) { - try { - executeBatch(); - featureRows.forEach(row -> context.output(successfulInsertsTag, row)); - featureRows.clear(); - } catch (Exception e) { - featureRows.forEach( - failedMutation -> { - FailedElement failedElement = - toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName()); - context.output(failedInsertsTupleTag, failedElement); - }); - featureRows.clear(); - } - } - } - - @FinishBundle - public void finishBundle(FinishBundleContext context) - throws IOException, InterruptedException { - if (featureRows.size() > 0) { - try { - executeBatch(); - featureRows.forEach( - row -> - context.output( - successfulInsertsTag, row, Instant.now(), GlobalWindow.INSTANCE)); - featureRows.clear(); - } catch (Exception e) { - featureRows.forEach( - failedMutation -> { - FailedElement failedElement = - toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName()); - context.output( - failedInsertsTupleTag, failedElement, Instant.now(), GlobalWindow.INSTANCE); - }); - featureRows.clear(); - } - } - } - - @Teardown - public void teardown() { - redisIngestionClient.shutdown(); - } - } - } -} diff --git a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterFeatureSink.java b/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterFeatureSink.java deleted file mode 100644 index 160df6553c..0000000000 --- a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterFeatureSink.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package feast.storage.connectors.rediscluster.writer; - -import com.google.auto.value.AutoValue; -import feast.proto.core.FeatureSetProto; -import feast.proto.core.StoreProto.Store.RedisClusterConfig; -import feast.proto.types.FeatureRowProto; -import feast.storage.api.writer.FeatureSink; -import feast.storage.api.writer.WriteResult; -import java.util.Map; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; - -@AutoValue -public abstract class RedisClusterFeatureSink implements FeatureSink { - - /** - * Initialize a {@link RedisClusterFeatureSink.Builder} from a {@link RedisClusterConfig}. - * - * @param redisClusterConfig {@link RedisClusterConfig} - * @param featureSetSpecs - * @return {@link RedisClusterFeatureSink.Builder} - */ - public static FeatureSink fromConfig( - RedisClusterConfig redisClusterConfig, - Map featureSetSpecs) { - return builder() - .setFeatureSetSpecs(featureSetSpecs) - .setRedisClusterConfig(redisClusterConfig) - .build(); - } - - public abstract RedisClusterConfig getRedisClusterConfig(); - - public abstract Map getFeatureSetSpecs(); - - public abstract Builder toBuilder(); - - public static Builder builder() { - return new AutoValue_RedisClusterFeatureSink.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setRedisClusterConfig(RedisClusterConfig redisClusterConfig); - - public abstract Builder setFeatureSetSpecs( - Map featureSetSpecs); - - public abstract RedisClusterFeatureSink build(); - } - - @Override - public void prepareWrite(FeatureSetProto.FeatureSet featureSet) {} - - @Override - public PTransform, WriteResult> writer() { - return new RedisClusterCustomIO.Write(getRedisClusterConfig(), getFeatureSetSpecs()); - } -} diff --git a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisIngestionClient.java b/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisIngestionClient.java deleted file mode 100644 index 5a0b54e697..0000000000 --- a/storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisIngestionClient.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package feast.storage.connectors.rediscluster.writer; - -import feast.storage.common.retry.BackOffExecutor; -import java.io.Serializable; - -public interface RedisIngestionClient extends Serializable { - - void setup(); - - BackOffExecutor getBackOffExecutor(); - - void shutdown(); - - void connect(); - - boolean isConnected(); - - void sync(); - - void pexpire(byte[] key, Long expiryMillis); - - void append(byte[] key, byte[] value); - - void set(byte[] key, byte[] value); - - void lpush(byte[] key, byte[] value); - - void rpush(byte[] key, byte[] value); - - void sadd(byte[] key, byte[] value); - - void zadd(byte[] key, Long score, byte[] value); -}
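
For reference, after this patch a single RedisFeatureSink covers both standalone Redis and Redis Cluster: fromConfig is overloaded for RedisConfig and RedisClusterConfig, and writer() picks RedisStandaloneIngestionClient or RedisClusterIngestionClient depending on which config is set. Below is a minimal usage sketch, not part of the patch itself; the class name and the host/port values are illustrative placeholders, and the FeatureSetSpec map and cluster config are assumed to be supplied by the caller (e.g. parsed upstream from the Store proto).

import java.util.Map;

import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.StoreProto.Store.RedisClusterConfig;
import feast.proto.core.StoreProto.Store.RedisConfig;
import feast.storage.api.writer.FeatureSink;
import feast.storage.connectors.redis.writer.RedisFeatureSink;

public class RedisSinkUsageSketch {

  // Standalone Redis: host and port are set on the generated RedisConfig proto builder
  // (placeholder values shown here).
  static FeatureSink standaloneSink(Map<String, FeatureSetSpec> specs) {
    RedisConfig config =
        RedisConfig.newBuilder().setHost("localhost").setPort(6379).build();
    return RedisFeatureSink.fromConfig(config, specs);
  }

  // Redis Cluster: the same sink class now accepts a RedisClusterConfig directly,
  // replacing the removed RedisClusterFeatureSink.
  static FeatureSink clusterSink(
      RedisClusterConfig clusterConfig, Map<String, FeatureSetSpec> specs) {
    return RedisFeatureSink.fromConfig(clusterConfig, specs);
  }
}

If neither config is set, prepareWrite() and writer() throw a RuntimeException, per the null checks added in RedisFeatureSink above.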