From d6c2a65211cd7cb8b1772ab0774d147d3021c71c Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Tue, 30 Jun 2020 10:42:01 +0300 Subject: [PATCH] Deallocate featureSet from job when source changed (#844) * allocate featureSet to Jobs * lint --- .../dao/FeatureSetJobStatusRepository.java | 25 ++++++ .../feast/core/model/FeatureSetJobStatus.java | 2 +- core/src/main/java/feast/core/model/Job.java | 41 ++++++--- .../core/service/JobCoordinatorService.java | 85 +++++++++++-------- .../db/migration/V2.3__Fix_Primary_Keys.sql | 3 + .../service/JobCoordinatorServiceTest.java | 60 +++++++------ 6 files changed, 145 insertions(+), 71 deletions(-) create mode 100644 core/src/main/java/feast/core/dao/FeatureSetJobStatusRepository.java create mode 100644 core/src/main/resources/db/migration/V2.3__Fix_Primary_Keys.sql diff --git a/core/src/main/java/feast/core/dao/FeatureSetJobStatusRepository.java b/core/src/main/java/feast/core/dao/FeatureSetJobStatusRepository.java new file mode 100644 index 0000000000..33e9e5905c --- /dev/null +++ b/core/src/main/java/feast/core/dao/FeatureSetJobStatusRepository.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.core.dao; + +import feast.core.model.FeatureSetJobStatus; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface FeatureSetJobStatusRepository + extends JpaRepository { + long count(); +} diff --git a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java index f6320defdd..79a3b51da4 100644 --- a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java +++ b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java @@ -79,6 +79,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hashCode(job, featureSet, deliveryStatus, version); + return Objects.hashCode(job, featureSet); } } diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 1b6b28fe4b..90794292d1 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -115,10 +115,10 @@ public boolean isDeployed() { return getExtId() != null && !getExtId().isEmpty(); } - public List getFeatureSets() { + public Set getFeatureSets() { return this.featureSetJobStatuses.stream() .map(FeatureSetJobStatus::getFeatureSet) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } public Source getSource() { @@ -128,6 +128,22 @@ public Source getSource() { return source; } + public void addAllFeatureSets(Set featureSets) { + for (FeatureSet fs : featureSets) { + FeatureSetJobStatus status = new FeatureSetJobStatus(); + status.setFeatureSet(fs); + status.setJob(this); + if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) { + // Feature Set was already delivered to previous generation of the job + // (in other words, it exists in kafka) + // so we expect Job will ack latest version based on history from kafka topic + status.setVersion(fs.getVersion()); + } + 
status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + this.getFeatureSetJobStatuses().add(status); + } + } + /** * Convert a job model to ingestion job proto * @@ -159,15 +175,18 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce } public Job clone() { - return Job.builder() - .setStores(getStores()) - .setStoreName(getStoreName()) - .setSourceConfig(getSourceConfig()) - .setSourceType(getSourceType()) - .setFeatureSetJobStatuses(new HashSet<>()) - .setRunner(getRunner()) - .setStatus(JobStatus.UNKNOWN) - .build(); + Job job = + Job.builder() + .setStores(getStores()) + .setStoreName(getStoreName()) + .setSourceConfig(getSourceConfig()) + .setSourceType(getSourceType()) + .setFeatureSetJobStatuses(new HashSet<>()) + .setRunner(getRunner()) + .setStatus(JobStatus.UNKNOWN) + .build(); + job.addAllFeatureSets(getFeatureSets()); + return job; } @Override diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 08b8b90411..c7862a386e 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -16,11 +16,13 @@ */ package feast.core.service; +import static feast.common.models.Store.isSubscribedToFeatureSet; import static feast.core.model.FeatureSet.parseReference; import com.google.common.collect.Sets; import feast.core.config.FeastProperties; import feast.core.config.FeastProperties.JobProperties; +import feast.core.dao.FeatureSetJobStatusRepository; import feast.core.dao.FeatureSetRepository; import feast.core.dao.JobRepository; import feast.core.job.*; @@ -58,6 +60,7 @@ public class JobCoordinatorService { private final JobRepository jobRepository; private final FeatureSetRepository featureSetRepository; + private final FeatureSetJobStatusRepository jobStatusRepository; private final SpecService specService; private 
final JobManager jobManager; private final JobProperties jobProperties; @@ -68,6 +71,7 @@ public class JobCoordinatorService { public JobCoordinatorService( JobRepository jobRepository, FeatureSetRepository featureSetRepository, + FeatureSetJobStatusRepository jobStatusRepository, SpecService specService, JobManager jobManager, FeastProperties feastProperties, @@ -75,6 +79,7 @@ public JobCoordinatorService( KafkaTemplate specPublisher) { this.jobRepository = jobRepository; this.featureSetRepository = featureSetRepository; + this.jobStatusRepository = jobStatusRepository; this.specService = specService; this.jobManager = jobManager; this.jobProperties = feastProperties.getJobs(); @@ -152,10 +157,6 @@ List makeJobUpdateTasks(Iterable>> sourceToStor for (Pair> mapping : sourceToStores) { Source source = mapping.getKey(); Set stores = mapping.getValue(); - Set featureSets = - stores.stream() - .flatMap(s -> getFeatureSetsForStore(s).stream()) - .collect(Collectors.toSet()); Job job = groupingStrategy.getOrCreateJob(source, stores); @@ -185,12 +186,15 @@ List makeJobUpdateTasks(Iterable>> sourceToStor } } else { job.setId(groupingStrategy.createJobId(job)); + job.addAllFeatureSets( + stores.stream() + .flatMap(s -> getFeatureSetsForStore(s).stream()) + .filter(fs -> fs.getSource().equals(source)) + .collect(Collectors.toSet())); jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build()); } - allocateFeatureSets(job, featureSets); - // Record the job as required to safeguard it from getting stopped activeJobs.add(job); } @@ -227,38 +231,50 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { } /** - * Connects given {@link Job} with FeatureSets by creating {@link FeatureSetJobStatus}. This - * connection represents responsibility of the job to handle allocated FeatureSets. We use this + * Connects given {@link FeatureSet} with Jobs by creating {@link FeatureSetJobStatus}. 
This + * connection represents responsibility of the job to handle allocated FeatureSet. We use this * connection {@link FeatureSetJobStatus} to monitor Ingestion of specific FeatureSet and Specs * delivery status. * *

Only after this connection is created FeatureSetSpec could be sent to IngestionJob. * - * @param job {@link Job} responsible job - * @param featureSets Set of {@link FeatureSet} featureSets to allocate to this job + * @param featureSet featureSet {@link FeatureSet} to find jobs and allocate */ - void allocateFeatureSets(Job job, Set featureSets) { - Map alreadyConnected = - job.getFeatureSetJobStatuses().stream() - .collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s)); - - for (FeatureSet fs : featureSets) { - if (alreadyConnected.containsKey(fs)) { + FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { + Set toAdd = new HashSet<>(); + Set existing = featureSet.getJobStatuses(); + + Stream> jobArgsStream = + getAllStores().stream() + .filter( + s -> + isSubscribedToFeatureSet( + s.getSubscriptions(), + featureSet.getProject().getName(), + featureSet.getName())) + .map(s -> Pair.of(featureSet.getSource(), s)); + + for (Pair> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) { + Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight()); + if (!job.isRunning()) { continue; } FeatureSetJobStatus status = new FeatureSetJobStatus(); - status.setFeatureSet(fs); + status.setFeatureSet(featureSet); status.setJob(job); - if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) { - // Feature Set was already delivered to previous generation of the job - // (another words, it exists in kafka) - // so we expect Job will ack latest version based on history from kafka topic - status.setVersion(fs.getVersion()); - } status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - job.getFeatureSetJobStatuses().add(status); + + toAdd.add(status); } + + Set toDelete = Sets.difference(existing, toAdd); + toAdd = Sets.difference(toAdd, existing); + + jobStatusRepository.deleteAll(toDelete); + jobStatusRepository.saveAll(toAdd); + jobStatusRepository.flush(); + return featureSet; } /** 
Get running extra ingestion jobs that have ids not in keepJobs */ @@ -271,6 +287,13 @@ private Collection getExtraJobs(List keepJobs) { return extraJobMap.values(); } + private List getAllStores() { + ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build()); + return listStoresResponse.getStoreList().stream() + .map(Store::fromProto) + .collect(Collectors.toList()); + } + /** * Generate a source to stores mapping. The resulting iterable yields pairs of Source and * Set-of-stores to create one ingestion job per each pair. @@ -278,17 +301,10 @@ private Collection getExtraJobs(List keepJobs) { * @return a Map from source to stores. */ private Iterable>> getSourceToStoreMappings() { - ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build()); - List stores = - listStoresResponse.getStoreList().stream() - .map(Store::fromProto) - .collect(Collectors.toList()); - // build mapping from source to store. // compile a set of sources via subscribed FeatureSets of stores. 
- Stream> distinctPairs = - stores.stream() + getAllStores().stream() .flatMap( store -> getFeatureSetsForStore(store).stream() @@ -325,6 +341,7 @@ public void notifyJobsWhenFeatureSetUpdated() { featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING); pendingFeatureSets.stream() + .map(this::allocateFeatureSetToJobs) .filter( fs -> { List runningJobs = @@ -334,7 +351,7 @@ public void notifyJobsWhenFeatureSetUpdated() { return runningJobs.size() > 0 && runningJobs.stream() - .allMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion()); + .anyMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion()); }) .forEach( fs -> { diff --git a/core/src/main/resources/db/migration/V2.3__Fix_Primary_Keys.sql b/core/src/main/resources/db/migration/V2.3__Fix_Primary_Keys.sql new file mode 100644 index 0000000000..98cb8c2aaa --- /dev/null +++ b/core/src/main/resources/db/migration/V2.3__Fix_Primary_Keys.sql @@ -0,0 +1,3 @@ +ALTER TABLE jobs_feature_sets ADD CONSTRAINT jobs_feature_sets_pkey PRIMARY KEY (job_id, feature_sets_id); + +ALTER TABLE jobs_stores ADD CONSTRAINT jobs_stores_pkey PRIMARY KEY (job_id, store_name); \ No newline at end of file diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 2a4fc24693..86e18fcd0a 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -16,6 +16,7 @@ */ package feast.core.service; +import static feast.common.models.Store.convertStringToSubscription; import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED; import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS; import static org.hamcrest.CoreMatchers.*; @@ -35,6 +36,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import feast.core.config.FeastProperties; 
import feast.core.config.FeastProperties.JobProperties; +import feast.core.dao.FeatureSetJobStatusRepository; import feast.core.dao.FeatureSetRepository; import feast.core.dao.JobRepository; import feast.core.dao.SourceRepository; @@ -74,6 +76,7 @@ public class JobCoordinatorServiceTest { @Mock JobManager jobManager; @Mock SpecService specService; @Mock FeatureSetRepository featureSetRepository; + @Mock FeatureSetJobStatusRepository jobStatusRepository; @Mock private KafkaTemplate kafkaTemplate; @Mock SourceRepository sourceRepository; @@ -93,6 +96,7 @@ public void setUp() { new JobCoordinatorService( jobRepository, featureSetRepository, + jobStatusRepository, specService, jobManager, feastProperties, @@ -103,6 +107,7 @@ public void setUp() { new JobCoordinatorService( jobRepository, featureSetRepository, + jobStatusRepository, specService, jobManager, feastProperties, @@ -512,7 +517,7 @@ public void shouldUseStoreSubscriptionToMapStore() throws InvalidProtocolBufferE } @Test - public void shouldSendPendingFeatureSetToJobs() { + public void shouldSendPendingFeatureSetToJobs() throws InvalidProtocolBufferException { FeatureSet fs1 = TestUtil.CreateFeatureSet( "fs_1", "project", Collections.emptyList(), Collections.emptyList()); @@ -553,6 +558,8 @@ public void shouldSendPendingFeatureSetToJobs() { when(featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)) .thenReturn(ImmutableList.of(fs1, fs2, fs3)); + when(specService.listStores(any())).thenReturn(ListStoresResponse.newBuilder().build()); + jcsWithConsolidation.notifyJobsWhenFeatureSetUpdated(); verify(kafkaTemplate).sendDefault(eq(fs1.getReference()), any(FeatureSetSpec.class)); @@ -585,6 +592,7 @@ public void shouldNotUpdateJobStatusVersionWhenKafkaUnavailable() { when(kafkaTemplate.sendDefault(eq(fsInTest.getReference()), any()).get()).thenThrow(exc); when(featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)) 
.thenReturn(ImmutableList.of(fsInTest)); + when(specService.listStores(any())).thenReturn(ListStoresResponse.newBuilder().build()); jcsWithConsolidation.notifyJobsWhenFeatureSetUpdated(); assertThat(featureSetJobStatus.getVersion(), is(1)); @@ -651,43 +659,44 @@ public void specAckListenerShouldUpdateFeatureSetStatus() { } @Test - public void featureSetsShouldBeAllocated() { + public void featureSetShouldBeAllocated() throws InvalidProtocolBufferException { FeatureSetProto.FeatureSet.Builder fsBuilder = FeatureSetProto.FeatureSet.newBuilder().setMeta(FeatureSetMeta.newBuilder()); FeatureSetSpec.Builder specBuilder = FeatureSetSpec.newBuilder(); + Source source = TestUtil.createKafkaSource("kafka", "topic", false); + Store store = + TestUtil.createStore("store", ImmutableList.of(convertStringToSubscription("*:*"))); + + when(specService.listStores(any())) + .thenReturn(ListStoresResponse.newBuilder().addStore(store.toProto()).build()); + FeatureSet featureSet1 = FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet1")).build()); - FeatureSet featureSet2 = - FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet2")).build()); - - featureSet2.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY); - featureSet2.setVersion(5); + featureSet1.setSource(source); - Job job = new Job(); - jcsWithConsolidation.allocateFeatureSets(job, ImmutableSet.of(featureSet1)); + FeatureSetJobStatus status = + TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_IN_PROGRESS, 1); - FeatureSetJobStatus expectedStatus1 = new FeatureSetJobStatus(); - expectedStatus1.setJob(job); - expectedStatus1.setFeatureSet(featureSet1); - expectedStatus1.setVersion(0); - expectedStatus1.setDeliveryStatus(STATUS_IN_PROGRESS); + featureSet1.getJobStatuses().add(status); - assertThat(job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1)); + Job job = new Job(); + job.setStatus(JobStatus.RUNNING); - 
expectedStatus1.setDeliveryStatus(STATUS_DELIVERED); - job.getFeatureSetJobStatuses().forEach(j -> j.setDeliveryStatus(STATUS_DELIVERED)); + when(jobRepository + .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( + source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())) + .thenReturn(Optional.of(job)); - jcsWithConsolidation.allocateFeatureSets(job, ImmutableSet.of(featureSet1, featureSet2)); + jcsWithConsolidation.allocateFeatureSetToJobs(featureSet1); - FeatureSetJobStatus expectedStatus2 = new FeatureSetJobStatus(); - expectedStatus2.setJob(job); - expectedStatus2.setFeatureSet(featureSet2); - expectedStatus2.setVersion(featureSet2.getVersion()); - expectedStatus2.setDeliveryStatus(STATUS_IN_PROGRESS); + FeatureSetJobStatus expectedStatus = new FeatureSetJobStatus(); + expectedStatus.setJob(job); + expectedStatus.setFeatureSet(featureSet1); + expectedStatus.setDeliveryStatus(STATUS_IN_PROGRESS); - assertThat( - job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1, expectedStatus2)); + verify(jobStatusRepository).saveAll(ImmutableSet.of(expectedStatus)); + verify(jobStatusRepository).deleteAll(ImmutableSet.of(status)); } @Test @@ -855,6 +864,7 @@ public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferExcepti .setExtId("extId") .setId("some-id") .setStatus(JobStatus.RUNNING) + .setFeatureSetJobStatuses(new HashSet<>()) .build(); when(jobRepository