Skip to content

Commit

Permalink
Add support for version method in Feast SDK and Core (#759)
Browse files Browse the repository at this point in the history
* Add support for version method

* Fix typo in e2e tests for version method
  • Loading branch information
woop authored May 30, 2020
1 parent d22fb88 commit a5c6dce
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
17 changes: 15 additions & 2 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.config.FeastProperties;
import feast.core.exception.RetrievalException;
import feast.core.grpc.interceptors.MonitoringInterceptor;
import feast.core.model.Project;
Expand Down Expand Up @@ -64,6 +65,7 @@
@GRpcService(interceptors = {MonitoringInterceptor.class})
public class CoreServiceImpl extends CoreServiceImplBase {

private final FeastProperties feastProperties;
private SpecService specService;
private AccessManagementService accessManagementService;
private JobService jobService;
Expand All @@ -72,17 +74,28 @@ public class CoreServiceImpl extends CoreServiceImplBase {
public CoreServiceImpl(
SpecService specService,
AccessManagementService accessManagementService,
JobService jobService) {
JobService jobService,
FeastProperties feastProperties) {
this.specService = specService;
this.accessManagementService = accessManagementService;
this.jobService = jobService;
this.feastProperties = feastProperties;
}

@Override
public void getFeastCoreVersion(
GetFeastCoreVersionRequest request,
StreamObserver<GetFeastCoreVersionResponse> responseObserver) {
super.getFeastCoreVersion(request, responseObserver);
try {
GetFeastCoreVersionResponse response =
GetFeastCoreVersionResponse.newBuilder().setVersion(feastProperties.getVersion()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | StatusRuntimeException e) {
log.error("Could not determine Feast Core version: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,13 @@ def version(self):
"""
Returns version information from Feast Core and Feast Serving
"""
result = {}
import pkg_resources

result = {
"sdk": {"version": pkg_resources.get_distribution("feast").version},
"serving": "not configured",
"core": "not configured",
}

if self.serving_url:
self._connect_serving()
Expand Down
28 changes: 21 additions & 7 deletions tests/e2e/basic-ingest-redis-serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
FLOAT_TOLERANCE = 0.00001
PROJECT_NAME = 'basic_' + uuid.uuid4().hex.upper()[0:6]


@pytest.fixture(scope='module')
def core_url(pytestconfig):
return pytestconfig.getoption("core_url")
Expand Down Expand Up @@ -62,6 +63,7 @@ def client(core_url, serving_url, allow_dirty):

return client


def basic_dataframe(entities, features, ingest_time, n_size):
offset = random.randint(1000, 100000) # ensure a unique key space is used
df_dict = {
Expand All @@ -79,20 +81,29 @@ def basic_dataframe(entities, features, ingest_time, n_size):
def ingest_time():
return datetime.utcnow()


@pytest.fixture(scope="module")
def cust_trans_df(ingest_time):
return basic_dataframe(entities=["customer_id"],
features=["daily_transactions", "total_transactions"],
ingest_time=ingest_time,
n_size=5)


@pytest.fixture(scope="module")
def driver_df(ingest_time):
return basic_dataframe(entities=["driver_id"],
features=["rating", "cost"],
ingest_time=ingest_time,
n_size=5)


def test_version_returns_results(client):
version_info = client.version()
assert not version_info['core'] is 'not configured'
assert not version_info['serving'] is 'not configured'


@pytest.mark.timeout(45)
@pytest.mark.run(order=10)
def test_basic_register_feature_set_success(client):
Expand All @@ -117,6 +128,7 @@ def test_basic_register_feature_set_success(client):
# reset client's project for other tests
client.set_project()


@pytest.mark.timeout(300)
@pytest.mark.run(order=11)
def test_basic_ingest_success(client, cust_trans_df, driver_df):
Expand Down Expand Up @@ -157,16 +169,16 @@ def test_basic_retrieve_online_success(client, cust_trans_df):

returned_daily_transactions = float(
response.field_values[0]
.fields["daily_transactions"]
.float_val
.fields["daily_transactions"]
.float_val
)
sent_daily_transactions = float(
cust_trans_df.iloc[0]["daily_transactions"])

if math.isclose(
sent_daily_transactions,
returned_daily_transactions,
abs_tol=FLOAT_TOLERANCE,
sent_daily_transactions,
returned_daily_transactions,
abs_tol=FLOAT_TOLERANCE,
):
break

Expand Down Expand Up @@ -206,8 +218,8 @@ def test_basic_retrieve_online_multiple_featureset(client, cust_trans_df, driver
def check_response(ingest_df, response, feature_ref):
returned_value = float(
response.field_values[0]
.fields[feature_ref]
.float_val
.fields[feature_ref]
.float_val
)
feature_ref_splits = feature_ref.split(":")
if len(feature_ref_splits) == 1:
Expand All @@ -223,9 +235,11 @@ def check_response(ingest_df, response, feature_ref):
returned_value,
abs_tol=FLOAT_TOLERANCE,
)

if all([check_response(df, response, ref) for ref, df in feature_ref_df_mapping]):
break


@pytest.mark.timeout(300)
@pytest.mark.run(order=19)
def test_basic_ingest_jobs(client):
Expand Down

0 comments on commit a5c6dce

Please sign in to comment.