From ddad1c4b0992fd07541db300d6e8c9e6be48ee9f Mon Sep 17 00:00:00 2001
From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com>
Date: Fri, 14 Feb 2020 13:56:37 +0800
Subject: [PATCH] Use bzip2 compressed feature set json as pipeline option
 (#466)

* Use bzip2 compressed feature set json as pipeline option

* Make decompressor and compressor more generic and extensible

* Avoid code duplication in test
---
 .../core/job/dataflow/DataflowJobManager.java | 40 +++++----
 .../job/direct/DirectRunnerJobManager.java    | 21 +++--
 .../option/FeatureSetJsonByteConverter.java   | 47 +++++++++++
 .../job/dataflow/DataflowJobManagerTest.java  | 36 +++++---
 .../direct/DirectRunnerJobManagerTest.java    | 25 +++++-
 .../FeatureSetJsonByteConverterTest.java      | 83 +++++++++++++++++++
 .../main/java/feast/ingestion/ImportJob.java  | 14 ++--
 .../ingestion/options/BZip2Compressor.java    | 47 +++++++++++
 .../ingestion/options/BZip2Decompressor.java  | 38 +++++++++
 .../ingestion/options/ImportOptions.java      |  6 +-
 .../options/InputStreamConverter.java         | 31 +++++++
 .../options/OptionByteConverter.java          | 30 +++++++
 .../ingestion/options/OptionCompressor.java   | 31 +++++++
 .../ingestion/options/OptionDecompressor.java | 30 +++++++
 .../options/StringListStreamConverter.java    | 41 +++++++++
 .../java/feast/ingestion/ImportJobTest.java   | 17 ++--
 .../options/BZip2CompressorTest.java          | 40 +++++++++
 .../options/BZip2DecompressorTest.java        | 48 +++++++++++
 .../StringListStreamConverterTest.java        | 36 ++++++++
 .../{util => utils}/DateUtilTest.java         |  7 +-
 .../{util => utils}/JsonUtilTest.java         |  3 +-
 .../{util => utils}/StoreUtilTest.java        | 18 +---
 22 files changed, 609 insertions(+), 80 deletions(-)
 create mode 100644 core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java
 create mode 100644 core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java
 create mode 100644 ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java
 create mode 100644 ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java
 create mode 100644 ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java
 create mode 100644 ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java
 create mode 100644 ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java
 create mode 100644 ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java
 create mode 100644 ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java
 create mode 100644 ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java
 create mode 100644 ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java
 create mode 100644 ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java
 rename ingestion/src/test/java/feast/ingestion/{util => utils}/DateUtilTest.java (92%)
 rename ingestion/src/test/java/feast/ingestion/{util => utils}/JsonUtilTest.java (95%)
 rename ingestion/src/test/java/feast/ingestion/{util => utils}/StoreUtilTest.java (91%)

diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
index 2de46ae1f2..d80d654718 100644
--- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
+++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -22,7 +22,6 @@
 import com.google.common.base.Strings;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
-import com.google.protobuf.util.JsonFormat.Printer; import feast.core.FeatureSetProto; import feast.core.SourceProto; import feast.core.StoreProto; @@ -30,15 +29,13 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Project; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.job.option.FeatureSetJsonByteConverter; +import feast.core.model.*; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -88,7 +85,12 @@ public Job startJob(Job job) { job.getStore().toProto(), false); } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e); + log.error(e.getMessage()); + throw new IllegalArgumentException( + String.format( + "DataflowJobManager failed to START job with id '%s' because the job" + + "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s", + job.getId(), e.getMessage())); } } @@ -103,12 +105,15 @@ public Job updateJob(Job job) { try { List featureSetProtos = job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList()); - return submitDataflowJob( job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(String.format("Unable to update job %s", job.getId()), e); + log.error(e.getMessage()); + throw new IllegalArgumentException( + String.format( + "DataflowJobManager failed to UPDATE job with id '%s' because the job" + + "has an invalid spec. Please check the FeatureSet, Source and Store specs. 
Actual error message: %s", + job.getId(), e.getMessage())); } } @@ -210,13 +215,12 @@ private ImportOptions getPipelineOptions( throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - Printer printer = JsonFormat.printer(); - List featureSetsJson = new ArrayList<>(); - for (FeatureSetProto.FeatureSet featureSet : featureSets) { - featureSetsJson.add(printer.print(featureSet.getSpec())); - } - pipelineOptions.setFeatureSetJson(featureSetsJson); - pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); + + pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); + pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setProject(projectId); pipelineOptions.setUpdate(update); pipelineOptions.setRunner(DataflowRunner.class); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index fdf3aad9bc..35ab45e630 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -17,9 +17,7 @@ package feast.core.job.direct; import com.google.common.base.Strings; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.Printer; import feast.core.FeatureSetProto; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.StoreProto; @@ -27,12 +25,15 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; +import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -92,17 +93,15 @@ public Job startJob(Job job) { } private ImportOptions getPipelineOptions( - List featureSets, StoreProto.Store sink) - throws InvalidProtocolBufferException { + List featureSets, StoreProto.Store sink) throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - Printer printer = JsonFormat.printer(); - List featureSetsJson = new ArrayList<>(); - for (FeatureSetProto.FeatureSet featureSet : featureSets) { - featureSetsJson.add(printer.print(featureSet.getSpec())); - } - pipelineOptions.setFeatureSetJson(featureSetsJson); - pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); + + pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); + pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setRunner(DirectRunner.class); pipelineOptions.setProject(""); // set to default value to satisfy validation if (metrics.isEnabled()) { 
diff --git a/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java b/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java new file mode 100644 index 0000000000..dbd04d668f --- /dev/null +++ b/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job.option; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import feast.core.FeatureSetProto; +import feast.ingestion.options.OptionByteConverter; +import java.util.ArrayList; +import java.util.List; + +public class FeatureSetJsonByteConverter + implements OptionByteConverter> { + + /** + * Convert list of feature sets to json strings joined by new line, represented as byte arrays + * + * @param featureSets List of feature set protobufs + * @return Byte array representation of the json strings + * @throws InvalidProtocolBufferException + */ + @Override + public byte[] toByte(List featureSets) + throws InvalidProtocolBufferException { + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + List featureSetsJson = new ArrayList<>(); + for (FeatureSetProto.FeatureSet featureSet : featureSets) { + featureSetsJson.add(printer.print(featureSet.getSpec())); + } + return String.join("\n", featureSetsJson).getBytes(); + } +} diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index c263515ed0..9f26c6919e 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -19,11 +19,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; import com.google.api.services.dataflow.Dataflow; @@ -44,14 +40,15 @@ import feast.core.config.FeastProperties.MetricsProperties; import feast.core.exception.JobExecutionException; import feast.core.job.Runner; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.job.option.FeatureSetJsonByteConverter; +import feast.core.model.*; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; +import java.util.Collections; import 
java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -131,8 +128,11 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { expectedPipelineOptions.setAppName("DataflowJobManager"); expectedPipelineOptions.setJobName(jobName); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); expectedPipelineOptions.setFeatureSetJson( - Lists.newArrayList(printer.print(featureSet.getSpec()))); + featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); ArgumentCaptor captor = ArgumentCaptor.forClass(ImportOptions.class); @@ -170,7 +170,19 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { // Assume the files that are staged are correct expectedPipelineOptions.setFilesToStage(actualPipelineOptions.getFilesToStage()); - assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString())); + assertThat( + actualPipelineOptions.getFeatureSetJson(), + equalTo(expectedPipelineOptions.getFeatureSetJson())); + assertThat( + actualPipelineOptions.getDeadLetterTableSpec(), + equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); + assertThat( + actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost())); + assertThat( + actualPipelineOptions.getMetricsExporterType(), + equalTo(expectedPipelineOptions.getMetricsExporterType())); + assertThat( + actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); assertThat(actual.getExtId(), equalTo(expectedExtJobId)); } diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 2dd87cfc6e..64412f4391 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -40,14 +40,19 @@ import feast.core.StoreProto.Store.Subscription; import feast.core.config.FeastProperties.MetricsProperties; import feast.core.job.Runner; +import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.model.Source; import feast.core.model.Store; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -121,8 +126,11 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setProject(""); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setProject(""); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); expectedPipelineOptions.setFeatureSetJson( - Lists.newArrayList(printer.print(featureSet.getSpec()))); + featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); String expectedJobId = "feast-job-0"; ArgumentCaptor pipelineOptionsCaptor = @@ -150,7 +158,20 @@ public void 
shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setOptionsId( actualPipelineOptions.getOptionsId()); // avoid comparing this value - assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString())); + assertThat( + actualPipelineOptions.getFeatureSetJson(), + equalTo(expectedPipelineOptions.getFeatureSetJson())); + assertThat( + actualPipelineOptions.getDeadLetterTableSpec(), + equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); + assertThat( + actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost())); + assertThat( + actualPipelineOptions.getMetricsExporterType(), + equalTo(expectedPipelineOptions.getMetricsExporterType())); + assertThat( + actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); + assertThat(jobStarted.getPipelineResult(), equalTo(mockPipelineResult)); assertThat(jobStarted.getJobId(), equalTo(expectedJobId)); assertThat(actual.getExtId(), equalTo(expectedJobId)); diff --git a/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java b/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java new file mode 100644 index 0000000000..2dfeef1d96 --- /dev/null +++ b/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.core.job.option; + +import static org.junit.Assert.*; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.FeatureSetProto; +import feast.core.SourceProto; +import feast.types.ValueProto; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Test; + +public class FeatureSetJsonByteConverterTest { + + private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer numberOfFeatures) { + List features = + IntStream.range(1, numberOfFeatures + 1) + .mapToObj( + i -> + FeatureSetProto.FeatureSpec.newBuilder() + .setValueType(ValueProto.ValueType.Enum.FLOAT) + .setName("feature".concat(Integer.toString(i))) + .build()) + .collect(Collectors.toList()); + + return FeatureSetProto.FeatureSet.newBuilder() + .setSpec( + FeatureSetProto.FeatureSetSpec.newBuilder() + .setSource( + SourceProto.Source.newBuilder() + .setType(SourceProto.SourceType.KAFKA) + .setKafkaSourceConfig( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers("somebrokers:9092") + .setTopic("sometopic"))) + .addAllFeatures(features) + .setVersion(version) + .addEntities( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity") + .setValueType(ValueProto.ValueType.Enum.STRING))) + .build(); + } + + @Test + public void shouldConvertFeatureSetsAsJsonStringBytes() throws InvalidProtocolBufferException { + int nrOfFeatureSet = 1; + int nrOfFeatures = 1; + List featureSets = + IntStream.range(1, nrOfFeatureSet + 1) + .mapToObj(i -> newFeatureSet(i, nrOfFeatures)) + .collect(Collectors.toList()); + + String expectedOutputString = + "{\"version\":1," + + "\"entities\":[{\"name\":\"entity\",\"valueType\":2}]," + + "\"features\":[{\"name\":\"feature1\",\"valueType\":6}]," + + "\"source\":{" + + "\"type\":1," + + "\"kafkaSourceConfig\":{" + + "\"bootstrapServers\":\"somebrokers:9092\"," + + "\"topic\":\"sometopic\"}}}"; + FeatureSetJsonByteConverter byteConverter = new FeatureSetJsonByteConverter(); + assertEquals(expectedOutputString, new String(byteConverter.toByte(featureSets))); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 41af5f9bb4..c4973ce3ca 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -22,7 +22,9 @@ import feast.core.FeatureSetProto.FeatureSet; import feast.core.SourceProto.Source; import feast.core.StoreProto.Store; +import feast.ingestion.options.BZip2Decompressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.StringListStreamConverter; import feast.ingestion.transform.ReadFromSource; import feast.ingestion.transform.ValidateFeatureRows; import feast.ingestion.transform.WriteFailedElementToBigQuery; @@ -33,6 +35,7 @@ import feast.ingestion.utils.StoreUtil; import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,15 +60,14 @@ public class ImportJob { * @param args arguments to be passed to Beam pipeline * @throws InvalidProtocolBufferException if options passed to the pipeline are invalid */ - public static void main(String[] args) throws InvalidProtocolBufferException { + public static void main(String[] args) throws IOException { ImportOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(ImportOptions.class); 
runPipeline(options); } @SuppressWarnings("UnusedReturnValue") - public static PipelineResult runPipeline(ImportOptions options) - throws InvalidProtocolBufferException { + public static PipelineResult runPipeline(ImportOptions options) throws IOException { /* * Steps: * 1. Read messages from Feast Source as FeatureRow @@ -80,8 +82,10 @@ public static PipelineResult runPipeline(ImportOptions options) log.info("Starting import job with settings: \n{}", options.toString()); - List featureSets = - SpecUtil.parseFeatureSetSpecJsonList(options.getFeatureSetJson()); + BZip2Decompressor> decompressor = + new BZip2Decompressor<>(new StringListStreamConverter()); + List featureSetJson = decompressor.decompress(options.getFeatureSetJson()); + List featureSets = SpecUtil.parseFeatureSetSpecJsonList(featureSetJson); List stores = SpecUtil.parseStoreJsonList(options.getStoreJson()); for (Store store : stores) { diff --git a/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java b/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java new file mode 100644 index 0000000000..b7e4e6ee0a --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; + +public class BZip2Compressor implements OptionCompressor { + + private final OptionByteConverter byteConverter; + + public BZip2Compressor(OptionByteConverter byteConverter) { + this.byteConverter = byteConverter; + } + /** + * Compress pipeline option using BZip2 + * + * @param option Pipeline option value + * @return BZip2 compressed option value + * @throws IOException + */ + @Override + public byte[] compress(T option) throws IOException { + ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + try (BZip2CompressorOutputStream bzip2Output = + new BZip2CompressorOutputStream(compressedStream)) { + bzip2Output.write(byteConverter.toByte(option)); + } + + return compressedStream.toByteArray(); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java b/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java new file mode 100644 index 0000000000..ce49c1be6e --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.options;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+
+public class BZip2Decompressor<T> implements OptionDecompressor<T> {
+
+  private final InputStreamConverter<T> inputStreamConverter;
+
+  public BZip2Decompressor(InputStreamConverter<T> inputStreamConverter) {
+    this.inputStreamConverter = inputStreamConverter;
+  }
+
+  @Override
+  public T decompress(byte[] compressed) throws IOException {
+    try (ByteArrayInputStream inputStream = new ByteArrayInputStream(compressed);
+        BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream)) {
+      return inputStreamConverter.readStream(bzip2Input);
+    }
+  }
+}
diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
index b299bb47e5..6afdd80dd7 100644
--- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
+++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
@@ -28,16 +28,16 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions {
   @Required
   @Description(
-      "JSON string representation of the FeatureSet that the import job will process."
+      "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format."
           + "FeatureSet follows the format in feast.core.FeatureSet proto."
           + "Multiple FeatureSetSpec can be passed by specifying '--featureSet={...}' multiple times"
           + "The conversion of Proto message to JSON should follow this mapping:"
           + "https://developers.google.com/protocol-buffers/docs/proto3#json"
           + "Please minify and remove all insignificant whitespace such as newline in the JSON string"
           + "to prevent error when parsing the options")
-  List<String> getFeatureSetJson();
+  byte[] getFeatureSetJson();
 
-  void setFeatureSetJson(List<String> featureSetJson);
+  void setFeatureSetJson(byte[] featureSetJson);
 
   @Required
   @Description(
diff --git a/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java b/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java
new file mode 100644
index 0000000000..e2fef73236
--- /dev/null
+++ b/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java
@@ -0,0 +1,31 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package feast.ingestion.options; + +import java.io.IOException; +import java.io.InputStream; + +public interface InputStreamConverter { + + /** + * Used in conjunction with {@link OptionDecompressor} to decompress the pipeline option + * + * @param inputStream Input byte stream in compressed format + * @return Decompressed pipeline option value + */ + T readStream(InputStream inputStream) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java b/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java new file mode 100644 index 0000000000..ff5a41a627 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionByteConverter { + + /** + * Used in conjunction with {@link OptionCompressor} to compress the pipeline option + * + * @param option Pipeline option value + * @return byte representation of the pipeline option value, without compression. + */ + byte[] toByte(T option) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java b/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java new file mode 100644 index 0000000000..b2345fc3eb --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionCompressor { + + /** + * Compress pipeline option into bytes format. This is necessary as some Beam runner has + * limitation in terms of pipeline option size. 
+ * + * @param option Pipeline option value + * @return Compressed values of the option, as byte array + */ + byte[] compress(T option) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java b/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java new file mode 100644 index 0000000000..affeafdaa0 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionDecompressor { + + /** + * Decompress pipeline option from byte array. + * + * @param compressed Compressed pipeline option value + * @return Decompressed pipeline option + */ + T decompress(byte[] compressed) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java b/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java new file mode 100644 index 0000000000..d7277f3c7d --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.options; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.stream.Collectors; + +public class StringListStreamConverter implements InputStreamConverter> { + + /** + * Convert Input byte stream to newline separated strings + * + * @param inputStream Input byte stream + * @return List of string + */ + @Override + public List readStream(InputStream inputStream) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + List stringList = reader.lines().collect(Collectors.toList()); + reader.close(); + return stringList; + } +} diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 290b38dabe..58ecae8f04 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -30,13 +30,16 @@ import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionByteConverter; import feast.storage.RedisProto.RedisKey; import feast.test.TestUtil; import feast.test.TestUtil.LocalKafka; import feast.test.TestUtil.LocalRedis; import feast.types.FeatureRowProto.FeatureRow; import feast.types.ValueProto.ValueType.Enum; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -48,6 +51,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.joda.time.Duration; import org.junit.AfterClass; @@ -162,12 +166,13 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() .build(); ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class); - options.setFeatureSetJson( - Collections.singletonList( - JsonFormat.printer().omittingInsignificantWhitespace().print(featureSet.getSpec()))); - options.setStoreJson( - Collections.singletonList( - JsonFormat.printer().omittingInsignificantWhitespace().print(redis))); + BZip2Compressor compressor = new BZip2Compressor<>(option -> { + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + return printer.print(option).getBytes(); + }); + options.setFeatureSetJson(compressor.compress(spec)); + options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); options.setProject(""); options.setBlockOnRun(false); diff --git a/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java b/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java new file mode 100644 index 0000000000..cd03b18c79 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.options;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BZip2CompressorTest {
+
+  @Test
+  public void shouldHaveBZip2CompatibleOutput() throws IOException {
+    BZip2Compressor<String> compressor = new BZip2Compressor<>(String::getBytes);
+    String origString = "somestring";
+    try (ByteArrayInputStream inputStream =
+            new ByteArrayInputStream(compressor.compress(origString));
+        BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(bzip2Input))) {
+      Assert.assertEquals(origString, reader.readLine());
+    }
+  }
+}
diff --git a/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java b/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java
new file mode 100644
index 0000000000..fe7cc789d8
--- /dev/null
+++ b/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java
@@ -0,0 +1,48 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package feast.ingestion.options; + +import static org.junit.Assert.*; + +import java.io.*; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.junit.Test; + +public class BZip2DecompressorTest { + + @Test + public void shouldDecompressBZip2Stream() throws IOException { + BZip2Decompressor decompressor = + new BZip2Decompressor<>( + inputStream -> { + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String output = reader.readLine(); + reader.close(); + return output; + }); + + String originalString = "abc"; + ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + try (BZip2CompressorOutputStream bzip2Output = + new BZip2CompressorOutputStream(compressedStream)) { + bzip2Output.write(originalString.getBytes()); + } + + String decompressedString = decompressor.decompress(compressedStream.toByteArray()); + assertEquals(originalString, decompressedString); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java b/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java new file mode 100644 index 0000000000..5ce9f054bc --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import org.junit.Test; + +public class StringListStreamConverterTest { + + @Test + public void shouldReadStreamAsNewlineSeparatedStrings() throws IOException { + StringListStreamConverter converter = new StringListStreamConverter(); + String originalString = "abc\ndef"; + InputStream stringStream = new ByteArrayInputStream(originalString.getBytes()); + assertEquals(Arrays.asList("abc", "def"), converter.readStream(stringStream)); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java similarity index 92% rename from ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java index 71d4e67bea..151d501a59 100644 --- a/ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java @@ -14,15 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package feast.ingestion.util; +package feast.ingestion.utils; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.*; import com.google.protobuf.Timestamp; -import feast.ingestion.utils.DateUtil; import junit.framework.TestCase; import org.joda.time.DateTime; diff --git a/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java similarity index 95% rename from ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java index 02af4d819f..62c74dfc34 100644 --- a/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java @@ -14,12 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.ingestion.util; +package feast.ingestion.utils; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import feast.ingestion.utils.JsonUtil; import java.util.Collections; import java.util.HashMap; import java.util.Map; diff --git a/ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java similarity index 91% rename from ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java index 4e2297e405..82988121bc 100644 --- a/ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java @@ -14,22 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.ingestion.util; +package feast.ingestion.utils; -import static feast.types.ValueProto.ValueType.Enum.BOOL; -import static feast.types.ValueProto.ValueType.Enum.BOOL_LIST; -import static feast.types.ValueProto.ValueType.Enum.BYTES; -import static feast.types.ValueProto.ValueType.Enum.BYTES_LIST; -import static feast.types.ValueProto.ValueType.Enum.DOUBLE; -import static feast.types.ValueProto.ValueType.Enum.DOUBLE_LIST; -import static feast.types.ValueProto.ValueType.Enum.FLOAT; -import static feast.types.ValueProto.ValueType.Enum.FLOAT_LIST; -import static feast.types.ValueProto.ValueType.Enum.INT32; -import static feast.types.ValueProto.ValueType.Enum.INT32_LIST; -import static feast.types.ValueProto.ValueType.Enum.INT64; -import static feast.types.ValueProto.ValueType.Enum.INT64_LIST; -import static feast.types.ValueProto.ValueType.Enum.STRING; -import static feast.types.ValueProto.ValueType.Enum.STRING_LIST; +import static feast.types.ValueProto.ValueType.Enum.*; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Field; @@ -40,7 +27,6 @@ import feast.core.FeatureSetProto.FeatureSet; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; -import feast.ingestion.utils.StoreUtil; import java.util.Arrays; import org.junit.Assert; import org.junit.Test;
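
Editor's note: a minimal usage sketch of how the pieces introduced by this patch fit together; it is not part of the patch itself. Feast Core serializes the feature set specs to newline-delimited JSON, BZip2-compresses the bytes, and sets them as the pipeline option; the ingestion job reverses the process before parsing the specs. The variables featureSets, pipelineOptions, and options are placeholders standing in for the values used in DataflowJobManager, DirectRunnerJobManager, and ImportJob.

// Core side (DataflowJobManager / DirectRunnerJobManager): convert the feature set
// protos to newline-joined JSON bytes and BZip2-compress them before setting the option.
OptionCompressor<List<FeatureSetProto.FeatureSet>> compressor =
    new BZip2Compressor<>(new FeatureSetJsonByteConverter());
pipelineOptions.setFeatureSetJson(compressor.compress(featureSets));

// Ingestion side (ImportJob): decompress the option back into one JSON string per
// feature set spec; ImportJob then parses these with SpecUtil.parseFeatureSetSpecJsonList.
BZip2Decompressor<List<String>> decompressor =
    new BZip2Decompressor<>(new StringListStreamConverter());
List<String> featureSetJson = decompressor.decompress(options.getFeatureSetJson());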