Restart Ingestion Job on code version update (feast-dev#949)
* blacklist -> whitelist

* add whitelisted property

* coordinator properties in e2e tests

* move labels to job & jobcoordinator

* it test for version update

* version label constant

* fix version label
pyalex authored and zhangchi1 committed Aug 18, 2020
1 parent 222644c commit 149accc
Showing 8 changed files with 91 additions and 20 deletions.
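In short, this commit labels every ingestion job with the Feast version that launched it, and the JobCoordinator now restarts any job whose label no longer matches the running version. A condensed, illustrative sketch of that check (the constant name and comparison follow JobCoordinatorService in the diff below; the wrapper class and main method are only for demonstration):

    import java.util.Map;
    import java.util.Objects;

    // Illustrative only: the real check lives in JobCoordinatorService#jobRequiresUpgrade,
    // which also restarts a job when its set of stores has changed.
    class VersionUpgradeCheckSketch {
      static final String VERSION_LABEL = "feast_version";

      static boolean jobRequiresUpgrade(Map<String, String> jobLabels, String currentVersion) {
        // A job started by an older (or unlabeled) Feast release must be restarted.
        return !Objects.equals(currentVersion, jobLabels.get(VERSION_LABEL));
      }

      public static void main(String[] args) {
        System.out.println(jobRequiresUpgrade(Map.of(VERSION_LABEL, "0.9.9"), "1.0.0")); // true  -> restart
        System.out.println(jobRequiresUpgrade(Map.of(VERSION_LABEL, "1.0.0"), "1.0.0")); // false -> keep running
      }
    }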
@@ -43,7 +43,8 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) {
}

@Override
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
public Job getOrCreateJob(
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels) {
return jobRepository
.findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source, null, JobStatus.getTerminalStates())
@@ -55,6 +56,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> store
.setStores(
stores.stream()
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
.setLabels(labels)
.build());
}

4 changes: 3 additions & 1 deletion core/src/main/java/feast/core/job/JobGroupingStrategy.java
@@ -19,6 +19,7 @@
import feast.core.model.Job;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
@@ -29,7 +30,8 @@
*/
public interface JobGroupingStrategy {
/** Get the non terminated ingestion job ingesting for given source and stores. */
Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores);
Job getOrCreateJob(
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels);
/** Create unique JobId that would be used as key in communications with JobRunner */
String createJobId(Job job);
/* Distribute given sources and stores across jobs. One yielded Pair - one created Job **/
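A hedged usage sketch of the extended interface above: callers now pass the labels to stamp on a newly created job. The label keys and values here are illustrative; in this commit they come from the coordinator's job selector plus the feast_version label.

    import feast.core.job.JobGroupingStrategy;
    import feast.core.model.Job;
    import feast.proto.core.SourceProto;
    import feast.proto.core.StoreProto;
    import java.util.Map;
    import java.util.Set;

    // Assumes the Feast core module is on the classpath; label values are examples only.
    class GetOrCreateJobSketch {
      static Job getOrCreate(
          JobGroupingStrategy strategy,
          SourceProto.Source source,
          Set<StoreProto.Store> stores,
          String currentVersion) {
        Map<String, String> labels =
            Map.of("application", "feast", "feast_version", currentVersion);
        return strategy.getOrCreateJob(source, stores, labels);
      }
    }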
5 changes: 4 additions & 1 deletion core/src/main/java/feast/core/job/JobPerStoreStrategy.java
@@ -23,6 +23,7 @@
import feast.proto.core.StoreProto;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,7 +43,8 @@ public JobPerStoreStrategy(JobRepository jobRepository) {
}

@Override
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
public Job getOrCreateJob(
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels) {
ArrayList<StoreProto.Store> storesList = Lists.newArrayList(stores);
if (storesList.size() != 1) {
throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy");
@@ -60,6 +62,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> store
.setStores(
stores.stream()
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
.setLabels(labels)
.build());
}

32 changes: 22 additions & 10 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -94,12 +94,6 @@ public static DataflowJobManager of(
Map<String, String> jobSelector,
Dataflow dataflow) {

// Retrieve labels to extend them with jobSelector
Map<String, String> jobLabels = new HashMap<>(runnerConfigOptions.getLabelsMap());
// Merge Job Selector Labels into runner options
jobSelector.forEach(jobLabels::put);
runnerConfigOptions = runnerConfigOptions.toBuilder().putAllLabels(jobLabels).build();

defaultOptions = new DataflowRunnerConfig(runnerConfigOptions);
this.dataflow = dataflow;
this.metrics = metricsProperties;
@@ -130,7 +124,11 @@ public Job startJob(Job job) {
try {
String extId =
submitDataflowJob(
job.getId(), job.getSource(), new HashSet<>(job.getStores().values()), false);
job.getId(),
job.getSource(),
new HashSet<>(job.getStores().values()),
job.getLabels(),
false);
job.setExtId(extId);
return job;

@@ -315,9 +313,13 @@ public List<Job> listRunningJobs() {
}

private String submitDataflowJob(
String jobName, SourceProto.Source source, Set<StoreProto.Store> sinks, boolean update) {
String jobName,
SourceProto.Source source,
Set<StoreProto.Store> sinks,
Map<String, String> labels,
boolean update) {
try {
ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, update);
ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, labels, update);
DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions);
String jobId = waitForJobToRun(pipelineResult);
return jobId;
@@ -328,7 +330,11 @@ private String submitDataflowJob(
}

private ImportOptions getPipelineOptions(
String jobName, SourceProto.Source source, Set<StoreProto.Store> sinks, boolean update)
String jobName,
SourceProto.Source source,
Set<StoreProto.Store> sinks,
Map<String, String> labels,
boolean update)
throws IOException, IllegalAccessException {
ImportOptions pipelineOptions =
PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);
@@ -347,6 +353,12 @@ private ImportOptions getPipelineOptions(
pipelineOptions.setJobName(jobName);
pipelineOptions.setFilesToStage(
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader()));

// Merge common labels with job's labels
Map<String, String> mergedLabels = new HashMap<>(defaultOptions.getLabels());
labels.forEach(mergedLabels::put);
pipelineOptions.setLabels(mergedLabels);

if (metrics.isEnabled()) {
pipelineOptions.setMetricsExporterType(metrics.getType());
if (metrics.getType().equals("statsd")) {
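A design note on the DataflowJobManager change above: label merging moved out of the static factory and now happens per submission in getPipelineOptions, with the job's labels written last so they override the runner-wide defaults on key collisions. A tiny standalone sketch of that merge order (map contents are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative merge order: runner-wide defaults first, then per-job labels override them.
    class LabelMergeSketch {
      static Map<String, String> merge(Map<String, String> runnerDefaults, Map<String, String> jobLabels) {
        Map<String, String> merged = new HashMap<>(runnerDefaults);
        jobLabels.forEach(merged::put); // job labels (e.g. feast_version) take precedence
        return merged;
      }

      public static void main(String[] args) {
        System.out.println(merge(Map.of("team", "feature-store"), Map.of("feast_version", "1.0.0")));
        // e.g. {feast_version=1.0.0, team=feature-store} (iteration order may vary)
      }
    }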
11 changes: 9 additions & 2 deletions core/src/main/java/feast/core/model/Job.java
@@ -53,10 +53,14 @@ public JobStatus getStatus() {
public abstract Map<FeatureSetReference, FeatureSetDeliveryStatus>
getFeatureSetDeliveryStatuses();

// Job's labels
public abstract Map<String, String> getLabels();

public static Builder builder() {
return new AutoValue_Job.Builder()
.setFeatureSetDeliveryStatuses(new HashMap<>())
.setStores(new HashMap<>());
.setStores(new HashMap<>())
.setLabels(new HashMap<>());
}

@AutoValue.Builder
@@ -70,6 +74,8 @@ public interface Builder {
Builder setFeatureSetDeliveryStatuses(
Map<FeatureSetReference, FeatureSetDeliveryStatus> statuses);

Builder setLabels(Map<String, String> labels);

Job build();
}

@@ -164,12 +170,13 @@ public IngestionJobProto.IngestionJob toProto() {
return ingestJob;
}

public Job cloneWithId(String newJobId) {
public Job cloneWithIdAndLabels(String newJobId, Map<String, String> labels) {
return Job.builder()
.setSource(this.getSource())
.setFeatureSetDeliveryStatuses(new HashMap<>(this.getFeatureSetDeliveryStatuses()))
.setStores(new HashMap<>(this.getStores()))
.setId(newJobId)
.setLabels(labels)
.build();
}

23 changes: 18 additions & 5 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
@@ -59,6 +59,7 @@
public class JobCoordinatorService {

private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5;
public static final String VERSION_LABEL = "feast_version";

private final JobRepository jobRepository;
private final SpecService specService;
@@ -68,6 +69,8 @@ public class JobCoordinatorService {
private final KafkaTemplate<String, FeatureSetSpec> specPublisher;
private final List<Store.Subscription> featureSetSubscriptions;
private final List<String> whitelistedStores;
private final Map<String, String> jobLabels;
private final String currentVersion;

@Autowired
public JobCoordinatorService(
@@ -88,6 +91,9 @@ public JobCoordinatorService(
.map(JobProperties.CoordinatorProperties.FeatureSetSelector::toSubscription)
.collect(Collectors.toList());
this.whitelistedStores = feastProperties.getJobs().getCoordinator().getWhitelistedStores();
this.currentVersion = feastProperties.getVersion();
this.jobLabels = new HashMap<>(feastProperties.getJobs().getCoordinator().getJobSelector());
this.jobLabels.put(VERSION_LABEL, this.currentVersion);
}

/**
@@ -161,7 +167,7 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
Source source = mapping.getKey();
Set<Store> stores = mapping.getValue();

Job job = groupingStrategy.getOrCreateJob(source, stores);
Job job = groupingStrategy.getOrCreateJob(source, stores, this.jobLabels);

if (job.isDeployed()) {
if (!job.isRunning()) {
@@ -177,7 +183,7 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
// it would make sense to spawn clone of current job
// and terminate old version on the next Poll.
// Both jobs should be in the same consumer group and not conflict with each other
job = job.cloneWithId(groupingStrategy.createJobId(job));
job = job.cloneWithIdAndLabels(groupingStrategy.createJobId(job), this.jobLabels);
job.addAllStores(stores);

isSafeToStopJobs = false;
@@ -214,8 +220,9 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
/**
* Decides whether we need to upgrade (restart) given job. Since we send updated FeatureSets to
* IngestionJob via Kafka, and there's only one source per job (if it changes, a new job would be
* created) the only things that can cause upgrade here are stores: new stores can be added, or
* existing stores will change subscriptions.
* created), the main trigger for an upgrade here is the stores: new stores can be added, or
* existing stores may change their subscriptions. Another trigger is the release of a new version: the
* current version is compared against the job's version stored in its labels.
*
* @param job {@link Job} to check
* @param stores Set of {@link Store} new version of stores (vs current version job.getStores())
@@ -227,6 +234,10 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
return true;
}

if (!this.currentVersion.equals(job.getLabels().get(VERSION_LABEL))) {
return true;
}

return false;
}

Expand Down Expand Up @@ -257,7 +268,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {

// Add featureSet to allocated job if not allocated before
for (Pair<Source, Set<Store>> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) {
Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight());
Job job =
groupingStrategy.getOrCreateJob(
jobArgs.getLeft(), jobArgs.getRight(), Collections.emptyMap());
if (!job.isRunning()) {
continue;
}
30 changes: 30 additions & 0 deletions core/src/test/java/feast/core/service/JobCoordinatorIT.java
@@ -22,6 +22,7 @@
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
import static org.hamcrest.core.AllOf.allOf;

@@ -60,6 +61,7 @@
"feast.jobs.coordinator.feature-set-selector[0].project=default",
"feast.jobs.coordinator.whitelisted-stores[0]=test-store",
"feast.jobs.coordinator.whitelisted-stores[1]=new-store",
"feast.version=1.0.0"
})
public class JobCoordinatorIT extends BaseIT {
@Autowired private FakeJobManager jobManager;
@@ -188,6 +190,33 @@ public void shouldNotCreateJobForUnwantedFeatureSet() {
assertThat(jobManager.getAllJobs(), hasSize(0));
}

@Test
@SneakyThrows
public void shouldRestartJobWithOldVersion() {
apiClient.simpleApplyFeatureSet(
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"));

Job job =
Job.builder()
.setSource(DataGenerator.getDefaultSource())
.setStores(
ImmutableMap.of(
DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore()))
.setId("some-running-id")
.setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "0.9.9"))
.build();

jobManager.startJob(job);
jobRepository.add(job);

await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED));

Job replacement = jobRepository.findByStatus(JobStatus.RUNNING).get(0);
assertThat(replacement.getSource(), equalTo(job.getSource()));
assertThat(replacement.getStores(), equalTo(job.getStores()));
assertThat(replacement.getLabels(), hasEntry(JobCoordinatorService.VERSION_LABEL, "1.0.0"));
}

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Nested
class SpecNotificationFlow extends SequentialFlow {
@@ -212,6 +241,7 @@ public void shouldSendNewSpec() {
ImmutableMap.of(
DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore()))
.setId("some-running-id")
.setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "1.0.0"))
.build();

jobManager.startJob(job);
@@ -85,9 +85,11 @@ public void setUp() {
coordinatorProperties.setFeatureSetSelector(ImmutableList.of(selector));
coordinatorProperties.setWhitelistedStores(
ImmutableList.of("test-store", "test", "test-1", "test-2", "normal-store"));
coordinatorProperties.setJobSelector(ImmutableMap.of("application", "feast"));

jobProperties.setCoordinator(coordinatorProperties);
feastProperties.setJobs(jobProperties);
feastProperties.setVersion("1.0.0");

TestUtil.setupAuditLogger();

Expand Down
