Skip to content

Commit

Permalink
JobCoordinator use public API to communicate with Core (#943)
Browse files Browse the repository at this point in the history
* test job with no labels

* filtering by status & status update in Core API

* specs IT

* SpecServiceIT

* resolve conflicts

* comments
  • Loading branch information
Oleksii Moskalenko authored Aug 11, 2020
1 parent 51f4fb6 commit 65626d6
Show file tree
Hide file tree
Showing 14 changed files with 1,035 additions and 1,190 deletions.
16 changes: 16 additions & 0 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ public void getFeatureStatistics(
}
}

@Override
public void updateFeatureSetStatus(
UpdateFeatureSetStatusRequest request,
StreamObserver<UpdateFeatureSetStatusResponse> responseObserver) {
try {
UpdateFeatureSetStatusResponse response = specService.updateFeatureSetStatus(request);

responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in UpdateFeatureSetStatus method: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void listStores(
ListStoresRequest request, StreamObserver<ListStoresResponse> responseObserver) {
Expand Down
114 changes: 73 additions & 41 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@

import static feast.common.models.Store.isSubscribedToFeatureSet;
import static feast.core.model.FeatureSet.parseReference;
import static feast.core.util.StreamUtil.wrapException;

import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.common.models.FeatureSetReference;
import feast.core.config.FeastProperties;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.dao.FeatureSetRepository;
import feast.core.job.*;
import feast.core.job.task.*;
import feast.core.model.*;
import feast.core.model.FeatureSet;
import feast.core.model.FeatureSetDeliveryStatus;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.proto.core.CoreServiceProto;
import feast.proto.core.CoreServiceProto.ListStoresRequest.Filter;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.FeatureSetProto.FeatureSet;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetReferenceProto;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.SourceProto.Source;
import feast.proto.core.StoreProto.Store;
Expand All @@ -59,24 +61,21 @@ public class JobCoordinatorService {
private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5;

private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;
private final JobGroupingStrategy groupingStrategy;
private final KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher;
private final KafkaTemplate<String, FeatureSetSpec> specPublisher;

@Autowired
public JobCoordinatorService(
JobRepository jobRepository,
FeatureSetRepository featureSetRepository,
SpecService specService,
JobManager jobManager,
FeastProperties feastProperties,
JobGroupingStrategy groupingStrategy,
KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
KafkaTemplate<String, FeatureSetSpec> specPublisher) {
this.jobRepository = jobRepository;
this.featureSetRepository = featureSetRepository;
this.specService = specService;
this.jobManager = jobManager;
this.jobProperties = feastProperties.getJobs();
Expand Down Expand Up @@ -236,7 +235,7 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
*/
FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {
FeatureSetReference ref =
FeatureSetReference.of(featureSet.getProject().getName(), featureSet.getName());
FeatureSetReference.of(featureSet.getSpec().getProject(), featureSet.getSpec().getName());
Set<String> confirmedJobIds = new HashSet<>();

Stream<Pair<Source, Store>> jobArgsStream =
Expand All @@ -245,9 +244,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {
s ->
isSubscribedToFeatureSet(
s.getSubscriptionsList(),
featureSet.getProject().getName(),
featureSet.getName()))
.map(s -> Pair.of(featureSet.getSource().toProto(), s));
featureSet.getSpec().getProject(),
featureSet.getSpec().getName()))
.map(s -> Pair.of(featureSet.getSpec().getSource(), s));

// Add featureSet to allocated job if not allocated before
for (Pair<Source, Set<Store>> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) {
Expand Down Expand Up @@ -320,31 +319,47 @@ Iterable<Pair<Source, Set<Store>>> getSourceToStoreMappings() {
* @param store to get subscribed FeatureSets for
* @return list of FeatureSets that the store subscribes to.
*/
private List<FeatureSetProto.FeatureSet> getFeatureSetsForStore(Store store) {
private List<FeatureSet> getFeatureSetsForStore(Store store) {
return store.getSubscriptionsList().stream()
.flatMap(
subscription ->
featureSetRepository
.findAllByNameLikeAndProject_NameLikeOrderByNameAsc(
subscription.getName().replace('*', '%'),
subscription.getProject().replace('*', '%'))
.stream())
subscription -> {
try {
return specService
.listFeatureSets(
CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder()
.setProject(subscription.getProject())
.setFeatureSetName(subscription.getName())
.build())
.getFeatureSetsList().stream();
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
String.format(
"Couldn't fetch featureSets for subscription %s. Reason: %s",
subscription, e.getMessage()));
}
})
.distinct()
.map(wrapException(FeatureSet::toProto))
.collect(Collectors.toList());
}

@Scheduled(fixedDelayString = "${feast.stream.specsOptions.notifyIntervalMilliseconds}")
public void notifyJobsWhenFeatureSetUpdated() {
public void notifyJobsWhenFeatureSetUpdated() throws InvalidProtocolBufferException {
List<FeatureSet> pendingFeatureSets =
featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING);
specService
.listFeatureSets(
CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder()
.setProject("*")
.setFeatureSetName("*")
.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)
.build())
.getFeatureSetsList();

pendingFeatureSets.stream()
.map(this::allocateFeatureSetToJobs)
.map(
fs -> {
FeatureSetReference ref =
FeatureSetReference.of(fs.getProject().getName(), fs.getName());
FeatureSetReference.of(fs.getSpec().getProject(), fs.getSpec().getName());
List<FeatureSetDeliveryStatus> deliveryStatuses =
jobRepository.findByFeatureSetReference(ref).stream()
.filter(Job::isRunning)
Expand All @@ -360,13 +375,17 @@ public void notifyJobsWhenFeatureSetUpdated() {
&& pair.getRight().stream()
.anyMatch(
jobStatus ->
jobStatus.getDeliveredVersion() < pair.getLeft().getVersion()))
jobStatus.getDeliveredVersion()
< pair.getLeft().getSpec().getVersion()))
.forEach(
pair -> {
FeatureSet fs = pair.getLeft();
List<FeatureSetDeliveryStatus> deliveryStatuses = pair.getRight();

log.info("Sending new FeatureSet {} to Ingestion", fs.getReference());
FeatureSetReference ref =
FeatureSetReference.of(fs.getSpec().getProject(), fs.getSpec().getName());

log.info("Sending new FeatureSet {} to Ingestion", ref);

// Sending latest version of FeatureSet to all currently running IngestionJobs
// (there's one topic for all sets).
Expand All @@ -375,7 +394,7 @@ public void notifyJobsWhenFeatureSetUpdated() {
// again later.
try {
specPublisher
.sendDefault(fs.getReference(), fs.toProto().getSpec())
.sendDefault(ref.getReference(), fs.getSpec())
.get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(
Expand All @@ -393,7 +412,7 @@ public void notifyJobsWhenFeatureSetUpdated() {
jobStatus -> {
jobStatus.setDeliveryStatus(
FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
jobStatus.setDeliveredVersion(fs.getVersion());
jobStatus.setDeliveredVersion(fs.getSpec().getVersion());
});
});
}
Expand All @@ -412,13 +431,19 @@ public void notifyJobsWhenFeatureSetUpdated() {
@KafkaListener(
topics = {"${feast.stream.specsOptions.specsAckTopic}"},
containerFactory = "kafkaAckListenerContainerFactory")
public void listenAckFromJobs(
ConsumerRecord<String, IngestionJobProto.FeatureSetSpecAck> record) {
public void listenAckFromJobs(ConsumerRecord<String, IngestionJobProto.FeatureSetSpecAck> record)
throws InvalidProtocolBufferException {
String setReference = record.key();
Pair<String, String> projectAndSetName = parseReference(setReference);
FeatureSet featureSet =
featureSetRepository.findFeatureSetByNameAndProject_Name(
projectAndSetName.getRight(), projectAndSetName.getLeft());
specService
.getFeatureSet(
CoreServiceProto.GetFeatureSetRequest.newBuilder()
.setProject(projectAndSetName.getLeft())
.setName(projectAndSetName.getRight())
.build())
.getFeatureSet();

if (featureSet == null) {
log.warn(
String.format("ACKListener received message for unknown FeatureSet %s", setReference));
Expand All @@ -427,18 +452,18 @@ public void listenAckFromJobs(

int ackVersion = record.value().getFeatureSetVersion();

if (featureSet.getVersion() != ackVersion) {
if (featureSet.getSpec().getVersion() != ackVersion) {
log.warn(
String.format(
"ACKListener received outdated ack for %s. Current %d, Received %d",
setReference, featureSet.getVersion(), ackVersion));
setReference, featureSet.getSpec().getVersion(), ackVersion));
return;
}

log.info("Updating featureSet {} delivery statuses.", featureSet.getReference());

FeatureSetReference ref =
FeatureSetReference.of(featureSet.getProject().getName(), featureSet.getName());
FeatureSetReference.of(featureSet.getSpec().getProject(), featureSet.getSpec().getName());

log.info("Updating featureSet {} delivery statuses.", ref);

jobRepository
.findById(record.value().getJobName())
Expand All @@ -459,10 +484,17 @@ public void listenAckFromJobs(
.equals(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));

if (allDelivered) {
log.info("FeatureSet {} update is completely delivered", featureSet.getReference());

featureSet.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY);
featureSetRepository.saveAndFlush(featureSet);
log.info("FeatureSet {} update is completely delivered", ref);

specService.updateFeatureSetStatus(
CoreServiceProto.UpdateFeatureSetStatusRequest.newBuilder()
.setReference(
FeatureSetReferenceProto.FeatureSetReference.newBuilder()
.setName(ref.getFeatureSetName())
.setProject(ref.getProjectName())
.build())
.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY)
.build());
}
}
}
39 changes: 31 additions & 8 deletions core/src/main/java/feast/core/service/SpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import feast.proto.core.CoreServiceProto.ListStoresRequest;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.CoreServiceProto.ListStoresResponse.Builder;
import feast.proto.core.CoreServiceProto.UpdateFeatureSetStatusRequest;
import feast.proto.core.CoreServiceProto.UpdateFeatureSetStatusResponse;
import feast.proto.core.CoreServiceProto.UpdateStoreRequest;
import feast.proto.core.CoreServiceProto.UpdateStoreResponse;
import feast.proto.core.FeatureSetProto;
Expand Down Expand Up @@ -89,29 +91,33 @@ public SpecService(
*/
public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request)
throws InvalidProtocolBufferException {
FeatureSet featureSet = getFeatureSet(request.getProject(), request.getName());

return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build();
}

private FeatureSet getFeatureSet(String projectName, String featureSetName) {
// Validate input arguments
checkValidCharacters(request.getName(), "featureSetName");
checkValidCharacters(featureSetName, "featureSetName");

if (request.getName().isEmpty()) {
if (featureSetName.isEmpty()) {
throw new IllegalArgumentException("No feature set name provided");
}
// Autofill default project if project is not specified
if (request.getProject().isEmpty()) {
request = request.toBuilder().setProject(Project.DEFAULT_NAME).build();
if (projectName.isEmpty()) {
projectName = Project.DEFAULT_NAME;
}

FeatureSet featureSet;

featureSet =
featureSetRepository.findFeatureSetByNameAndProject_Name(
request.getName(), request.getProject());
featureSetRepository.findFeatureSetByNameAndProject_Name(featureSetName, projectName);

if (featureSet == null) {
throw new RetrievalException(
String.format("Feature set with name \"%s\" could not be found.", request.getName()));
String.format("Feature set with name \"%s\" could not be found.", featureSetName));
}
return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build();
return featureSet;
}

/**
Expand All @@ -138,6 +144,7 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil
String name = filter.getFeatureSetName();
String project = filter.getProject();
Map<String, String> labelsFilter = filter.getLabelsMap();
FeatureSetStatus statusFilter = filter.getStatus();

if (name.isEmpty()) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -195,6 +202,10 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil
if (featureSets.size() > 0) {
featureSets =
featureSets.stream()
.filter(
featureSet ->
statusFilter.equals(FeatureSetStatus.STATUS_INVALID)
|| featureSet.getStatus().equals(statusFilter))
.filter(featureSet -> featureSet.hasAllLabels(labelsFilter))
.collect(Collectors.toList());
for (FeatureSet featureSet : featureSets) {
Expand Down Expand Up @@ -264,6 +275,18 @@ public ListFeaturesResponse listFeatures(ListFeaturesRequest.Filter filter) {
}
}

/** Update FeatureSet's status by given FeatureSetReference and new status */
public UpdateFeatureSetStatusResponse updateFeatureSetStatus(
UpdateFeatureSetStatusRequest request) {
FeatureSet featureSet =
getFeatureSet(request.getReference().getProject(), request.getReference().getName());

featureSet.setStatus(request.getStatus());
featureSetRepository.saveAndFlush(featureSet);

return UpdateFeatureSetStatusResponse.newBuilder().build();
}

/**
* Get stores matching the store name provided in the filter. If the store name is not provided,
* the method will return all stores currently registered to Feast.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,7 @@ void canApplyFeatureSetIfAuthenticated() {
SimpleAPIClient secureApiClient =
getSecureApiClient("[email protected]");
FeatureSetProto.FeatureSet expectedFeatureSet =
DataGenerator.createFeatureSet(
DataGenerator.getDefaultSource(),
"project_1",
"test_1",
Collections.emptyList(),
Collections.emptyList());
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project_1", "test_1");

secureApiClient.simpleApplyFeatureSet(expectedFeatureSet);

Expand Down
Loading

0 comments on commit 65626d6

Please sign in to comment.