Skip to content

Commit

Permalink
redis-cluster is being merged into redis
Browse files Browse the repository at this point in the history
styling

handle null case

styling
  • Loading branch information
pyalex committed May 28, 2020
1 parent 75400fe commit 8816c01
Show file tree
Hide file tree
Showing 18 changed files with 70 additions and 637 deletions.
6 changes: 0 additions & 6 deletions docs/coverage/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-connector-redis-cluster</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-ingestion</artifactId>
Expand Down
6 changes: 0 additions & 6 deletions ingestion/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-connector-redis-cluster</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-connector-bigquery</artifactId>
Expand Down
3 changes: 1 addition & 2 deletions ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import feast.storage.api.writer.FeatureSink;
import feast.storage.connectors.bigquery.writer.BigQueryFeatureSink;
import feast.storage.connectors.redis.writer.RedisFeatureSink;
import feast.storage.connectors.rediscluster.writer.RedisClusterFeatureSink;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,7 +83,7 @@ public static FeatureSink getFeatureSink(
StoreType storeType = store.getType();
switch (storeType) {
case REDIS_CLUSTER:
return RedisClusterFeatureSink.fromConfig(store.getRedisClusterConfig(), featureSetSpecs);
return RedisFeatureSink.fromConfig(store.getRedisClusterConfig(), featureSetSpecs);
case REDIS:
return RedisFeatureSink.fromConfig(store.getRedisConfig(), featureSetSpecs);
case BIGQUERY:
Expand Down
6 changes: 0 additions & 6 deletions serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-connector-redis-cluster</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-storage-connector-bigquery</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import feast.storage.api.retriever.HistoricalRetriever;
import feast.storage.api.retriever.OnlineRetriever;
import feast.storage.connectors.bigquery.retriever.BigQueryHistoricalRetriever;
import feast.storage.connectors.redis.retriever.RedisClusterOnlineRetriever;
import feast.storage.connectors.redis.retriever.RedisOnlineRetriever;
import feast.storage.connectors.rediscluster.retriever.RedisClusterOnlineRetriever;
import io.opentracing.Tracer;
import java.util.Map;
import org.slf4j.Logger;
Expand Down
1 change: 0 additions & 1 deletion storage/connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

<modules>
<module>redis</module>
<module>rediscluster</module>
<module>bigquery</module>
</modules>

Expand Down
7 changes: 7 additions & 0 deletions storage/connectors/redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>net.ishiis.redis</groupId>
<artifactId>redis-unit</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.rediscluster.retriever;
package feast.storage.connectors.redis.retriever;

import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.rediscluster.writer;
package feast.storage.connectors.redis.writer;

import com.google.common.collect.Lists;
import feast.proto.core.StoreProto;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import feast.proto.core.FeatureSetProto.EntitySpec;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetProto.FeatureSpec;
import feast.proto.core.StoreProto.Store.RedisConfig;
import feast.proto.core.StoreProto.Store.*;
import feast.proto.storage.RedisProto.RedisKey;
import feast.proto.storage.RedisProto.RedisKey.Builder;
import feast.proto.types.FeatureRowProto.FeatureRow;
Expand All @@ -28,7 +28,7 @@
import feast.storage.api.writer.FailedElement;
import feast.storage.api.writer.WriteResult;
import feast.storage.common.retry.Retriable;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -63,21 +63,22 @@ public class RedisCustomIO {

private RedisCustomIO() {}

public static Write write(RedisConfig redisConfig, Map<String, FeatureSetSpec> featureSetSpecs) {
return new Write(redisConfig, featureSetSpecs);
public static Write write(
RedisIngestionClient redisIngestionClient, Map<String, FeatureSetSpec> featureSetSpecs) {
return new Write(redisIngestionClient, featureSetSpecs);
}

/** ServingStoreWrite data to a Redis server. */
public static class Write extends PTransform<PCollection<FeatureRow>, WriteResult> {

private Map<String, FeatureSetSpec> featureSetSpecs;
private RedisConfig redisConfig;
private RedisIngestionClient redisIngestionClient;
private int batchSize;
private int timeout;

public Write(RedisConfig redisConfig, Map<String, FeatureSetSpec> featureSetSpecs) {

this.redisConfig = redisConfig;
public Write(
RedisIngestionClient redisIngestionClient, Map<String, FeatureSetSpec> featureSetSpecs) {
this.redisIngestionClient = redisIngestionClient;
this.featureSetSpecs = featureSetSpecs;
}

Expand All @@ -95,7 +96,7 @@ public Write withTimeout(int timeout) {
public WriteResult expand(PCollection<FeatureRow> input) {
PCollectionTuple redisWrite =
input.apply(
ParDo.of(new WriteDoFn(redisConfig, featureSetSpecs))
ParDo.of(new WriteDoFn(redisIngestionClient, featureSetSpecs))
.withOutputTags(successfulInsertsTag, TupleTagList.of(failedInsertsTupleTag)));
return WriteResult.in(
input.getPipeline(),
Expand All @@ -111,9 +112,10 @@ public static class WriteDoFn extends DoFn<FeatureRow, FeatureRow> {
private int timeout = DEFAULT_TIMEOUT;
private RedisIngestionClient redisIngestionClient;

WriteDoFn(RedisConfig config, Map<String, FeatureSetSpec> featureSetSpecs) {
WriteDoFn(
RedisIngestionClient redisIngestionClient, Map<String, FeatureSetSpec> featureSetSpecs) {

this.redisIngestionClient = new RedisStandaloneIngestionClient(config);
this.redisIngestionClient = redisIngestionClient;
this.featureSetSpecs = featureSetSpecs;
}

Expand All @@ -140,7 +142,7 @@ public void setup() {
public void startBundle() {
try {
redisIngestionClient.connect();
} catch (RedisConnectionException e) {
} catch (RedisException e) {
log.error("Connection to redis cannot be established ", e);
}
featureRows.clear();
Expand All @@ -165,7 +167,7 @@ public void execute() throws ExecutionException, InterruptedException {

@Override
public Boolean isExceptionRetriable(Exception e) {
return e instanceof RedisConnectionException;
return e instanceof RedisException;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import feast.proto.core.FeatureSetProto.FeatureSet;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.StoreProto;
import feast.proto.core.StoreProto.Store.RedisClusterConfig;
import feast.proto.core.StoreProto.Store.RedisConfig;
import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.storage.api.writer.FeatureSink;
Expand All @@ -28,6 +29,7 @@
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisURI;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

Expand All @@ -46,8 +48,17 @@ public static FeatureSink fromConfig(
return builder().setFeatureSetSpecs(featureSetSpecs).setRedisConfig(redisConfig).build();
}

public static FeatureSink fromConfig(
RedisClusterConfig redisConfig, Map<String, FeatureSetSpec> featureSetSpecs) {
return builder().setFeatureSetSpecs(featureSetSpecs).setRedisClusterConfig(redisConfig).build();
}

@Nullable
public abstract RedisConfig getRedisConfig();

@Nullable
public abstract RedisClusterConfig getRedisClusterConfig();

public abstract Map<String, FeatureSetSpec> getFeatureSetSpecs();

public abstract Builder toBuilder();
Expand All @@ -60,29 +71,45 @@ public static Builder builder() {
public abstract static class Builder {
public abstract Builder setRedisConfig(RedisConfig redisConfig);

public abstract Builder setRedisClusterConfig(RedisClusterConfig redisConfig);

public abstract Builder setFeatureSetSpecs(Map<String, FeatureSetSpec> featureSetSpecs);

public abstract RedisFeatureSink build();
}

@Override
public void prepareWrite(FeatureSet featureSet) {

RedisClient redisClient =
RedisClient.create(RedisURI.create(getRedisConfig().getHost(), getRedisConfig().getPort()));
try {
redisClient.connect();
} catch (RedisConnectionException e) {
if (getRedisConfig() != null) {
RedisClient redisClient =
RedisClient.create(
RedisURI.create(getRedisConfig().getHost(), getRedisConfig().getPort()));
try {
redisClient.connect();
} catch (RedisConnectionException e) {
throw new RuntimeException(
String.format(
"Failed to connect to Redis at host: '%s' port: '%d'. Please check that your Redis is running and accessible from Feast.",
getRedisConfig().getHost(), getRedisConfig().getPort()));
}
redisClient.shutdown();
} else if (getRedisClusterConfig() == null) {
throw new RuntimeException(
String.format(
"Failed to connect to Redis at host: '%s' port: '%d'. Please check that your Redis is running and accessible from Feast.",
getRedisConfig().getHost(), getRedisConfig().getPort()));
"At least one RedisConfig or RedisClusterConfig must be provided to Redis Sink");
}
redisClient.shutdown();
}

@Override
public PTransform<PCollection<FeatureRow>, WriteResult> writer() {
return new RedisCustomIO.Write(getRedisConfig(), getFeatureSetSpecs());
if (getRedisClusterConfig() != null) {
return new RedisCustomIO.Write(
new RedisClusterIngestionClient(getRedisClusterConfig()), getFeatureSetSpecs());
} else if (getRedisConfig() != null) {
return new RedisCustomIO.Write(
new RedisStandaloneIngestionClient(getRedisConfig()), getFeatureSetSpecs());
} else {
throw new RuntimeException(
"At least one RedisConfig or RedisClusterConfig must be provided to Redis Sink");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.rediscluster.retriever;
package feast.storage.connectors.redis.retriever;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.rediscluster.writer;
package feast.storage.connectors.redis.writer;

import static feast.storage.common.testing.TestUtil.field;
import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -67,7 +67,7 @@ public class RedisClusterFeatureSinkTest {
private RedisClusterClient redisClusterClient;
private RedisClusterCommands<byte[], byte[]> redisClusterCommands;

private RedisClusterFeatureSink redisClusterFeatureSink;
private RedisFeatureSink redisClusterFeatureSink;

@Before
public void setUp() throws IOException {
Expand Down Expand Up @@ -123,7 +123,7 @@ public void setUp() throws IOException {
.build();

redisClusterFeatureSink =
RedisClusterFeatureSink.builder()
RedisFeatureSink.builder()
.setFeatureSetSpecs(specMap)
.setRedisClusterConfig(redisClusterConfig)
.build();
Expand All @@ -141,8 +141,8 @@ static boolean deleteDirectory(File directoryToBeDeleted) {

@After
public void teardown() {
redisClusterClient.shutdown();
redisCluster.stop();
redisClusterClient.shutdown();
deleteDirectory(new File(String.valueOf(Paths.get(System.getProperty("user.dir"), ".redis"))));
}

Expand Down Expand Up @@ -259,7 +259,7 @@ public void shouldProduceFailedElementIfRetryExceeded() {
.build();
Map<String, FeatureSetSpec> specMap = ImmutableMap.of("myproject/fs", spec1);
redisClusterFeatureSink =
RedisClusterFeatureSink.builder()
RedisFeatureSink.builder()
.setFeatureSetSpecs(specMap)
.setRedisClusterConfig(redisClusterConfig)
.build();
Expand Down
Loading

0 comments on commit 8816c01

Please sign in to comment.