Deallocate featureSet from job when source changed #844

Merged · 2 commits · Jun 30, 2020
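In brief: allocation of FeatureSets to ingestion jobs becomes a reconciled, persisted relationship. A new FeatureSetJobStatusRepository stores the job/feature-set links; the job-centric allocateFeatureSets is replaced by allocateFeatureSetToJobs, which re-derives the jobs a feature set belongs to from store subscriptions and deletes stale links (for example, when the feature set's source changed); Job.clone() now carries the original job's feature sets over to the replacement; and composite primary keys are added to the jobs_feature_sets and jobs_stores tables.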
core/src/main/java/feast/core/dao/FeatureSetJobStatusRepository.java (new file)
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.dao;

import feast.core.model.FeatureSetJobStatus;
import org.springframework.data.jpa.repository.JpaRepository;

public interface FeatureSetJobStatusRepository
extends JpaRepository<FeatureSetJobStatus, FeatureSetJobStatus.FeatureSetJobStatusKey> {
long count();
}
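For context, the repository's key type FeatureSetJobStatus.FeatureSetJobStatusKey is the composite identifier of the job/feature-set link entity. A minimal sketch of what such an @Embeddable key conventionally looks like — the field names and types here are illustrative assumptions, not Feast's actual definition:

import java.io.Serializable;
import java.util.Objects;
import javax.persistence.Column;
import javax.persistence.Embeddable;

@Embeddable
public class FeatureSetJobStatusKey implements Serializable {
  // Assumed fields: the same pair the new jobs_feature_sets primary key
  // (job_id, feature_sets_id) enforces at the database level.
  @Column(name = "job_id")
  private String jobId;

  @Column(name = "feature_sets_id")
  private long featureSetId;

  // JPA composite keys must define equals/hashCode over all key fields.
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FeatureSetJobStatusKey)) return false;
    FeatureSetJobStatusKey k = (FeatureSetJobStatusKey) o;
    return featureSetId == k.featureSetId && Objects.equals(jobId, k.jobId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(jobId, featureSetId);
  }
}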
core/src/main/java/feast/core/model/FeatureSetJobStatus.java
@@ -79,6 +79,6 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
-   return Objects.hashCode(job, featureSet, deliveryStatus, version);
+   return Objects.hashCode(job, featureSet);
}
}
41 changes: 30 additions & 11 deletions core/src/main/java/feast/core/model/Job.java
@@ -115,10 +115,10 @@ public boolean isDeployed() {
return getExtId() != null && !getExtId().isEmpty();
}

- public List<FeatureSet> getFeatureSets() {
+ public Set<FeatureSet> getFeatureSets() {
return this.featureSetJobStatuses.stream()
.map(FeatureSetJobStatus::getFeatureSet)
-       .collect(Collectors.toList());
+       .collect(Collectors.toSet());
}

public Source getSource() {
@@ -128,6 +128,22 @@ public Source getSource() {
return source;
}

+ public void addAllFeatureSets(Set<FeatureSet> featureSets) {
+   for (FeatureSet fs : featureSets) {
+     FeatureSetJobStatus status = new FeatureSetJobStatus();
+     status.setFeatureSet(fs);
+     status.setJob(this);
+     if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) {
+       // Feature Set was already delivered to a previous generation of the job
+       // (in other words, it exists in Kafka),
+       // so we expect the Job to ack the latest version based on the Kafka topic history
+       status.setVersion(fs.getVersion());
+     }
+     status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
+     this.getFeatureSetJobStatuses().add(status);
+   }
+ }

/**
* Convert a job model to ingestion job proto
*
@@ -159,15 +175,18 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferException {
}

public Job clone() {
-   return Job.builder()
-       .setStores(getStores())
-       .setStoreName(getStoreName())
-       .setSourceConfig(getSourceConfig())
-       .setSourceType(getSourceType())
-       .setFeatureSetJobStatuses(new HashSet<>())
-       .setRunner(getRunner())
-       .setStatus(JobStatus.UNKNOWN)
-       .build();
+   Job job =
+       Job.builder()
+           .setStores(getStores())
+           .setStoreName(getStoreName())
+           .setSourceConfig(getSourceConfig())
+           .setSourceType(getSourceType())
+           .setFeatureSetJobStatuses(new HashSet<>())
+           .setRunner(getRunner())
+           .setStatus(JobStatus.UNKNOWN)
+           .build();
+   job.addAllFeatureSets(getFeatureSets());
+   return job;
}
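With this change a cloned job is no longer empty: it inherits the feature sets of the original, so the replacement can take over spec delivery. A hypothetical caller (oldJob is illustrative; createJobId comes from this PR's coordinator code):

// Source/store config and feature-set links are copied; id and status are not.
Job replacement = oldJob.clone();
replacement.setId(groupingStrategy.createJobId(replacement));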

@Override
85 changes: 51 additions & 34 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Expand Up @@ -16,11 +16,13 @@
*/
package feast.core.service;

+ import static feast.common.models.Store.isSubscribedToFeatureSet;
import static feast.core.model.FeatureSet.parseReference;

import com.google.common.collect.Sets;
import feast.core.config.FeastProperties;
import feast.core.config.FeastProperties.JobProperties;
+ import feast.core.dao.FeatureSetJobStatusRepository;
import feast.core.dao.FeatureSetRepository;
import feast.core.dao.JobRepository;
import feast.core.job.*;
@@ -58,6 +60,7 @@ public class JobCoordinatorService {

private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
+ private final FeatureSetJobStatusRepository jobStatusRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;
@@ -68,13 +71,15 @@ public class JobCoordinatorService {
public JobCoordinatorService(
JobRepository jobRepository,
FeatureSetRepository featureSetRepository,
+     FeatureSetJobStatusRepository jobStatusRepository,
SpecService specService,
JobManager jobManager,
FeastProperties feastProperties,
JobGroupingStrategy groupingStrategy,
KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
this.jobRepository = jobRepository;
this.featureSetRepository = featureSetRepository;
+   this.jobStatusRepository = jobStatusRepository;
this.specService = specService;
this.jobManager = jobManager;
this.jobProperties = feastProperties.getJobs();
@@ -152,10 +157,6 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStores) {
for (Pair<Source, Set<Store>> mapping : sourceToStores) {
Source source = mapping.getKey();
Set<Store> stores = mapping.getValue();
-     Set<FeatureSet> featureSets =
-         stores.stream()
-             .flatMap(s -> getFeatureSetsForStore(s).stream())
-             .collect(Collectors.toSet());

Job job = groupingStrategy.getOrCreateJob(source, stores);

@@ -185,12 +186,15 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStores) {
}
} else {
job.setId(groupingStrategy.createJobId(job));
+     job.addAllFeatureSets(
+         stores.stream()
+             .flatMap(s -> getFeatureSetsForStore(s).stream())
+             .filter(fs -> fs.getSource().equals(source))
+             .collect(Collectors.toSet()));

jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
}

-     allocateFeatureSets(job, featureSets);

// Record the job as required to safeguard it from getting stopped
activeJobs.add(job);
}
@@ -227,38 +231,50 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
}

/**
- * Connects given {@link Job} with FeatureSets by creating {@link FeatureSetJobStatus}. This
- * connection represents responsibility of the job to handle allocated FeatureSets. We use this
+ * Connects a given {@link FeatureSet} with Jobs by creating {@link FeatureSetJobStatus}. This
+ * connection represents the job's responsibility to handle the allocated FeatureSet. We use this
 * connection {@link FeatureSetJobStatus} to monitor the Ingestion of a specific FeatureSet and its
 * Spec's delivery status.
 *
 * <p>Only after this connection is created can the FeatureSetSpec be sent to the IngestionJob.
 *
- * @param job {@link Job} responsible job
- * @param featureSets Set of {@link FeatureSet} featureSets to allocate to this job
+ * @param featureSet the {@link FeatureSet} to find jobs for and allocate to
*/
- void allocateFeatureSets(Job job, Set<FeatureSet> featureSets) {
-   Map<FeatureSet, FeatureSetJobStatus> alreadyConnected =
-       job.getFeatureSetJobStatuses().stream()
-           .collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s));
-   for (FeatureSet fs : featureSets) {
-     if (alreadyConnected.containsKey(fs)) {
+ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {
+   Set<FeatureSetJobStatus> toAdd = new HashSet<>();
+   Set<FeatureSetJobStatus> existing = featureSet.getJobStatuses();
+
+   Stream<Pair<Source, Store>> jobArgsStream =
+       getAllStores().stream()
+           .filter(
+               s ->
+                   isSubscribedToFeatureSet(
+                       s.getSubscriptions(),
+                       featureSet.getProject().getName(),
+                       featureSet.getName()))
+           .map(s -> Pair.of(featureSet.getSource(), s));
+
+   for (Pair<Source, Set<Store>> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) {
+     Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight());
+     if (!job.isRunning()) {
        continue;
      }

      FeatureSetJobStatus status = new FeatureSetJobStatus();
-     status.setFeatureSet(fs);
+     status.setFeatureSet(featureSet);
      status.setJob(job);
-     if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) {
-       // Feature Set was already delivered to a previous generation of the job
-       // (in other words, it exists in Kafka),
-       // so we expect the Job to ack the latest version based on the Kafka topic history
-       status.setVersion(fs.getVersion());
-     }
      status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
-     job.getFeatureSetJobStatuses().add(status);
+
+     toAdd.add(status);
    }

+   Set<FeatureSetJobStatus> toDelete = Sets.difference(existing, toAdd);
+   toAdd = Sets.difference(toAdd, existing);
+
+   jobStatusRepository.deleteAll(toDelete);
+   jobStatusRepository.saveAll(toAdd);
+   jobStatusRepository.flush();
+   return featureSet;
  }
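The deallocation itself happens in the two set differences above: links the feature set should no longer have are deleted, missing ones are created, and shared ones are left untouched. A small self-contained sketch of that reconciliation (hypothetical string ids stand in for FeatureSetJobStatus; Guava's Sets.difference is what the method uses):

import com.google.common.collect.Sets;
import java.util.Set;

public class ReconcileSketch {
  public static void main(String[] args) {
    // Links the feature set currently has (e.g. to a job whose source changed)
    Set<String> existing = Set.of("fs-1:job-old", "fs-1:job-A");
    // Links it should have after re-matching stores/sources to running jobs
    Set<String> desired = Set.of("fs-1:job-A", "fs-1:job-B");

    Set<String> toDelete = Sets.difference(existing, desired); // [fs-1:job-old]
    Set<String> toAdd = Sets.difference(desired, existing);    // [fs-1:job-B]

    System.out.println("delete: " + toDelete + ", add: " + toAdd);
  }
}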

/** Get running extra ingestion jobs that have ids not in keepJobs */
@@ -271,24 +287,24 @@ private Collection<Job> getExtraJobs(List<Job> keepJobs) {
return extraJobMap.values();
}

+ private List<Store> getAllStores() {
+   ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
+   return listStoresResponse.getStoreList().stream()
+       .map(Store::fromProto)
+       .collect(Collectors.toList());
+ }

/**
 * Generate a source-to-stores mapping. The resulting iterable yields pairs of Source and
 * Set-of-stores, so that one ingestion job is created per pair.
 *
 * @return an Iterable of pairs from Source to the Set of Stores it feeds.
*/
private Iterable<Pair<Source, Set<Store>>> getSourceToStoreMappings() {
-   ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
-   List<Store> stores =
-       listStoresResponse.getStoreList().stream()
-           .map(Store::fromProto)
-           .collect(Collectors.toList());

// build mapping from source to store.
// compile a set of sources via subscribed FeatureSets of stores.

Stream<Pair<Source, Store>> distinctPairs =
-       stores.stream()
+       getAllStores().stream()
.flatMap(
store ->
getFeatureSetsForStore(store).stream()
@@ -325,6 +341,7 @@ public void notifyJobsWhenFeatureSetUpdated() {
featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING);

pendingFeatureSets.stream()
+     .map(this::allocateFeatureSetToJobs)
.filter(
fs -> {
List<FeatureSetJobStatus> runningJobs =
@@ -334,7 +351,7 @@ public void notifyJobsWhenFeatureSetUpdated() {

return runningJobs.size() > 0
&& runningJobs.stream()
-         .allMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion());
+         .anyMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion());
})
.forEach(
fs -> {
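The switch from allMatch to anyMatch means an updated spec is now re-published when at least one running job is behind the feature set's version, rather than only when every job is behind. A toy illustration with hypothetical versions:

import java.util.List;

public class MatchSketch {
  public static void main(String[] args) {
    int fsVersion = 3;
    List<Integer> runningJobAckedVersions = List.of(3, 2); // one job is behind

    boolean all = runningJobAckedVersions.stream().allMatch(v -> v < fsVersion); // false
    boolean any = runningJobAckedVersions.stream().anyMatch(v -> v < fsVersion); // true

    // With allMatch the update would never be sent here; with anyMatch it is.
    System.out.println("allMatch=" + all + " anyMatch=" + any);
  }
}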
New file (SQL migration):
@@ -0,0 +1,3 @@
ALTER TABLE jobs_feature_sets ADD CONSTRAINT jobs_feature_sets_pkey PRIMARY KEY (job_id, feature_sets_id);

ALTER TABLE jobs_stores ADD CONSTRAINT jobs_stores_pkey PRIMARY KEY (job_id, store_name);
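These composite primary keys back the new repository above: jobs_feature_sets appears to be the join table behind FeatureSetJobStatus, so (job_id, feature_sets_id) both serves as its composite key and prevents the same feature set from being allocated to the same job twice; jobs_stores presumably gets the analogous guarantee for job-to-store links.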