Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for version method in Feast SDK and Core #759

Merged
merged 2 commits into from
May 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.config.FeastProperties;
import feast.core.exception.RetrievalException;
import feast.core.grpc.interceptors.MonitoringInterceptor;
import feast.core.model.Project;
Expand Down Expand Up @@ -64,6 +65,7 @@
@GRpcService(interceptors = {MonitoringInterceptor.class})
public class CoreServiceImpl extends CoreServiceImplBase {

private final FeastProperties feastProperties;
private SpecService specService;
private AccessManagementService accessManagementService;
private JobService jobService;
Expand All @@ -72,17 +74,28 @@ public class CoreServiceImpl extends CoreServiceImplBase {
public CoreServiceImpl(
SpecService specService,
AccessManagementService accessManagementService,
JobService jobService) {
JobService jobService,
FeastProperties feastProperties) {
this.specService = specService;
this.accessManagementService = accessManagementService;
this.jobService = jobService;
this.feastProperties = feastProperties;
}

@Override
public void getFeastCoreVersion(
GetFeastCoreVersionRequest request,
StreamObserver<GetFeastCoreVersionResponse> responseObserver) {
super.getFeastCoreVersion(request, responseObserver);
try {
GetFeastCoreVersionResponse response =
GetFeastCoreVersionResponse.newBuilder().setVersion(feastProperties.getVersion()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | StatusRuntimeException e) {
log.error("Could not determine Feast Core version: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,13 @@ def version(self):
"""
Returns version information from Feast Core and Feast Serving
"""
result = {}
import pkg_resources

result = {
"sdk": {"version": pkg_resources.get_distribution("feast").version},
"serving": "not configured",
"core": "not configured",
}

if self.serving_url:
self._connect_serving()
Expand Down
28 changes: 21 additions & 7 deletions tests/e2e/basic-ingest-redis-serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
FLOAT_TOLERANCE = 0.00001
PROJECT_NAME = 'basic_' + uuid.uuid4().hex.upper()[0:6]


@pytest.fixture(scope='module')
def core_url(pytestconfig):
return pytestconfig.getoption("core_url")
Expand Down Expand Up @@ -62,6 +63,7 @@ def client(core_url, serving_url, allow_dirty):

return client


def basic_dataframe(entities, features, ingest_time, n_size):
offset = random.randint(1000, 100000) # ensure a unique key space is used
df_dict = {
Expand All @@ -79,20 +81,29 @@ def basic_dataframe(entities, features, ingest_time, n_size):
def ingest_time():
return datetime.utcnow()


@pytest.fixture(scope="module")
def cust_trans_df(ingest_time):
return basic_dataframe(entities=["customer_id"],
features=["daily_transactions", "total_transactions"],
ingest_time=ingest_time,
n_size=5)


@pytest.fixture(scope="module")
def driver_df(ingest_time):
return basic_dataframe(entities=["driver_id"],
features=["rating", "cost"],
ingest_time=ingest_time,
n_size=5)


def test_version_returns_results(client):
version_info = client.version()
assert not version_info['core'] is 'not configured'
assert not version_info['serving'] is 'not configured'


@pytest.mark.timeout(45)
@pytest.mark.run(order=10)
def test_basic_register_feature_set_success(client):
Expand All @@ -117,6 +128,7 @@ def test_basic_register_feature_set_success(client):
# reset client's project for other tests
client.set_project()


@pytest.mark.timeout(300)
@pytest.mark.run(order=11)
def test_basic_ingest_success(client, cust_trans_df, driver_df):
Expand Down Expand Up @@ -157,16 +169,16 @@ def test_basic_retrieve_online_success(client, cust_trans_df):

returned_daily_transactions = float(
response.field_values[0]
.fields["daily_transactions"]
.float_val
.fields["daily_transactions"]
.float_val
)
sent_daily_transactions = float(
cust_trans_df.iloc[0]["daily_transactions"])

if math.isclose(
sent_daily_transactions,
returned_daily_transactions,
abs_tol=FLOAT_TOLERANCE,
sent_daily_transactions,
returned_daily_transactions,
abs_tol=FLOAT_TOLERANCE,
):
break

Expand Down Expand Up @@ -206,8 +218,8 @@ def test_basic_retrieve_online_multiple_featureset(client, cust_trans_df, driver
def check_response(ingest_df, response, feature_ref):
returned_value = float(
response.field_values[0]
.fields[feature_ref]
.float_val
.fields[feature_ref]
.float_val
)
feature_ref_splits = feature_ref.split(":")
if len(feature_ref_splits) == 1:
Expand All @@ -223,9 +235,11 @@ def check_response(ingest_df, response, feature_ref):
returned_value,
abs_tol=FLOAT_TOLERANCE,
)

if all([check_response(df, response, ref) for ref, df in feature_ref_df_mapping]):
break


@pytest.mark.timeout(300)
@pytest.mark.run(order=19)
def test_basic_ingest_jobs(client):
Expand Down