Skip to content

Commit

Permalink
Fix Feast Serving not registering its store in Feast Core (#641)
Browse files Browse the repository at this point in the history
* Fixed missing call in serving's CachedSpecService to register store in core.

* Add unit test to check that cachedSpecService will register store with Core.

Co-authored-by: Zhu Zhanyan <[email protected]>
  • Loading branch information
mrzzy and Zhu Zhanyan authored Apr 22, 2020
1 parent 0816cd4 commit ffdd1f1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

/** In-memory cache of specs. */
/** In-memory cache of specs hosted in Feast Core. */
public class CachedSpecService {

private static final int MAX_SPEC_COUNT = 1000;
Expand Down Expand Up @@ -76,7 +76,7 @@ public class CachedSpecService {

public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) {
this.coreService = coreService;
this.store = store;
this.store = coreService.registerStore(store);

Map<String, FeatureSetSpec> featureSets = getFeatureSetMap();
featureToFeatureSetMapping =
Expand Down
23 changes: 22 additions & 1 deletion serving/src/main/java/feast/serving/specs/CoreSpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.StoreProto.Store;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.slf4j.Logger;

/** Client for spec retrieval from core. */
/** Client for interfacing with specs in Feast Core. */
public class CoreSpecService {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(CoreSpecService.class);
Expand All @@ -50,4 +51,24 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest ListFeatur
public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) {
return blockingStub.updateStore(updateStoreRequest);
}

/**
* Register the given store entry in Feast Core. If store already exists in Feast Core, updates
* the store entry in feast core.
*
* @param store entry to register/update in Feast Core.
* @return The register/updated store entry
*/
public Store registerStore(Store store) {
UpdateStoreRequest request = UpdateStoreRequest.newBuilder().setStore(store).build();
try {
UpdateStoreResponse updateStoreResponse = this.updateStore(request);
if (!updateStoreResponse.getStore().equals(store)) {
throw new RuntimeException("Core store config not matching current store config");
}
return updateStoreResponse.getStore();
} catch (Exception e) {
throw new RuntimeException("Unable to update store configuration", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.common.collect.Lists;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
Expand Down Expand Up @@ -82,8 +82,7 @@ public void setUp() {
.build())
.build();

when(coreService.updateStore(UpdateStoreRequest.newBuilder().setStore(store).build()))
.thenReturn(UpdateStoreResponse.newBuilder().setStore(store).build());
when(coreService.registerStore(store)).thenReturn(store);

featureSetSpecs = new LinkedHashMap<>();
featureSetSpecs.put(
Expand Down Expand Up @@ -143,6 +142,11 @@ public void setUp() {
cachedSpecService = new CachedSpecService(coreService, store);
}

@Test
public void shouldRegisterStoreWithCore() {
verify(coreService, times(1)).registerStore(cachedSpecService.getStore());
}

@Test
public void shouldPopulateAndReturnStore() {
cachedSpecService.populateCache();
Expand Down

0 comments on commit ffdd1f1

Please sign in to comment.