Add general storage API and refactor existing store implementations #567
Conversation
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: zhilingc. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
I didn't look too closely at the code dependencies yet, but is it feasible to move the metrics namespacing commit out of this PR to later, or could it even apply to master directly? It seems useful orthogonally to the storage refactor and would help us keep review focus on the central subject.
It's included in this PR because BQ doesn't support success metrics, so I needed to write metrics elsewhere as well, but I can reverse that merge first until you're done looking through this one.
I still don't understand how this is specific to the scope of storage modularization effort then—if it doesn't work for BigQuery, then how does it work for current master? If there are no success metrics for BQ on current master, then this is a new feature, not refactoring.
No, it's because in this storage refactor PR I moved the original metrics from before the write to after, because I had to change the structure of the pipeline. (These post-write metrics are admittedly a new feature.) The metrics namespacing PR adds the pre-write metrics back into the pipeline to ensure that users writing to BigQuery get at least some visibility into what they're pushing into the sink.
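For reference, a minimal sketch of what a pre-write metric in a Beam pipeline can look like; the namespace and counter names here are placeholders, not the actual Feast metric names:

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn placed just before the sink write so that rows are counted
// even when the sink (e.g. BigQuery) cannot report per-row success metrics.
public class CountBeforeWriteFn<T> extends DoFn<T, T> {
  // "feast_ingestion" and "rows_before_write" are placeholder names.
  private final Counter rowsBeforeWrite =
      Metrics.counter("feast_ingestion", "rows_before_write");

  @ProcessElement
  public void processElement(ProcessContext context) {
    rowsBeforeWrite.inc();             // record visibility metric pre-write
    context.output(context.element()); // pass the element through unchanged
  }
}
```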
public abstract String getError();

@Nullable
public abstract List<String> getFileUris();
Not a full review, but before forgetting I just wanted to carry over a point I had on #569 that's more appropriate to discuss in the wider context here:
I feel a little reserved though about committing to the pattern of out-of-band file-based result data as API, i.e. getFileUris(). Not sure if large data sets will ever be streamed through RPC responses, but either of these options compromises the possibility for user jobs doing retrieval and processing of the results to leverage data locality in systems where that could be achieved—see discussion starting at #482 (comment)
@ches As you have rightly mentioned in #482, maintaining data locality requires a discussion on its own. I fear that our specific requirements at Gojek have steered us towards an approach that may be less than ideal in an HDFS + Spark world.
You've mentioned that you have been working on a retrieval layer internally that bypasses our historical retrieval and presumably continues to maintain locality.
I'd be happy to start a new issue on the requirements we (Gojek) currently have on the historical retrieval side, and hopefully we can extend it with yours as well. Between the two we would cover a large swathe of use cases I believe, and hopefully we can refine the historical retrieval layer to a point where both options are possible (or even better, we decide one is clearly superior).
Sure, that's very reasonable; it's a complex subject, but one that I think we've framed sufficiently to start that. I'm far from having a design proposal without requirements all laid out, but I can contribute to starting an RFC with problem statements.
For the sake of letting this PR move forward, maybe we can document getFileUris as a provisional API with a @see pointing to the new issue. In the worst case, an implementation for which it doesn't make sense can throw an UnsupportedOperationException or the like, and we can learn from use as this proceeds.
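For example, a minimal sketch of that provisional marker on the existing accessor; the Javadoc wording is only illustrative, and the follow-up issue it points at doesn't exist yet:

```java
import java.util.List;
import javax.annotation.Nullable;

public abstract class HistoricalRetrievalResult {

  /**
   * URIs of the files the retrieval job staged its results to.
   *
   * <p>Provisional API: out-of-band, file-based result staging may not suit every store.
   * Implementations for which it does not apply may throw {@link UnsupportedOperationException}.
   *
   * @see "Follow-up issue on historical retrieval requirements (to be filed)"
   */
  @Nullable
  public abstract List<String> getFileUris();
}
```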
In our current approach with Hive, most likely we would not implement HistoricalRetrievalResult at all—we use Feast Ingestion but not Serving for historical; there is no GetBatchFeatures RPC call. It might be interesting to consider whether a similar call could be made mostly for Feast authorization purposes, metrics, etc., without expecting Serving to orchestrate the data staging. But that's a matter for our new thread.
It might be interesting to consider whether a similar call could be made mostly for Feast authorization purposes, metrics, etc., without expecting Serving to orchestrate the data staging. But, that's a matter for our new thread.
I'd really like to see if this is possible.
By the way, how are you currently doing point-in-time correct joins? What triggers that execution?
@zhilingc: The following test failed, say /retest to rerun it:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.
Your checklist (in the PR comment) is a bit ambiguous. It would be clearer if the items were phrased as to-do actions.
    .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH)
    .setJobStagingLocation(bigQueryHistoricalRetriever.jobStagingLocation())
    .build();
} catch (Exception e) {
This exception seems too broad. If there is a problem with a retriever then we should be logging it.
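As a rough illustration of the kind of narrower handling meant here (the retriever interface and logger usage below are assumptions for the sketch, not the PR's actual code):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: catch something narrower than Exception, and log before rethrowing
// so a misbehaving retriever is visible in the serving logs.
class ServingInfoExample {
  private static final Logger log = LoggerFactory.getLogger(ServingInfoExample.class);

  // Hypothetical minimal retriever shape for the sketch.
  interface HistoricalRetriever {
    String jobStagingLocation();
  }

  String stagingLocationOrFail(HistoricalRetriever retriever) {
    try {
      return retriever.jobStagingLocation();
    } catch (IllegalStateException e) { // pick the exception type the retriever can actually throw
      log.error("Could not determine job staging location for historical retriever", e);
      throw e;
    }
  }
}
```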
  this.jobService = jobService;
}

/** {@inheritDoc} */
I think it would be useful if we had comments or JavaDocs describing these methods in order for us to know the scope/responsibility. The interface is probably a good place to start with the documentation.
FWIW, a comment containing only {@inheritDoc} is unnecessary; it is automatic if you don't add any new Javadoc comment. {@inheritDoc} allows you to extend the inherited doc with additions.
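A small illustration of the difference (the interface and class names here are made up for the example):

```java
import java.util.List;

/** Illustrative interface: document the contract once, on the interface. */
interface OnlineRetriever {
  /**
   * Retrieves the latest stored row for each of the given entity keys.
   *
   * @param entityKeys keys to look up
   * @return one result per key, in the same order
   */
  List<String> getOnlineFeatures(List<String> entityKeys);
}

class RedisOnlineRetriever implements OnlineRetriever {
  // No Javadoc here at all: the interface documentation is inherited automatically.
  @Override
  public List<String> getOnlineFeatures(List<String> entityKeys) {
    return entityKeys; // placeholder body
  }
}

class AnotherOnlineRetriever implements OnlineRetriever {
  /**
   * {@inheritDoc}
   *
   * <p>Plus implementation-specific notes layered on top of the inherited description.
   */
  @Override
  public List<String> getOnlineFeatures(List<String> entityKeys) {
    return entityKeys; // placeholder body
  }
}
```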
/** {@inheritDoc} */
@Override
public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest) {
I'm finding it hard to limit comments here. This is probably out of scope for the storage refactor, but having the serving service do job management here (creating a job Id and kicking off the job) seems to be an incorrect abstraction. I think this should be a part of what the job service does.
Completely agree on both counts, the abstraction/layer boundaries and the scope of storage refactor.
I'm not sure where/when to address it, if we're going to have all this serving refactoring in the PR too.
Well I just realized that my configuration clean up #525 is the yak shave of the century. I am trying to limit the scope of that as well, but I am touching all of the same places that @zhilingc is because configuration is heavily used by stores. I've even now rebased on top of this very PR.
I think we should try and limit the already large scope of this PR to only the most critical aspects of the storage API.
For now, for historical stores at least, the serving service only manages jobs, because historical serving is done async, and historical retrieval is analogous to starting a retrieval job. I'd argue that until there is a need for the two to be defined separately (e.g. if serving does some other job management), it's fine as is.
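For what it's worth, a sketch of the boundary being discussed; every name and signature here is hypothetical, just to show where job creation could live:

```java
import java.util.List;

// Hypothetical: job id creation and job tracking live behind a job service,
// so the serving service only translates the RPC request and delegates.
interface RetrievalJobService {
  /** Creates the job id, kicks off the historical retrieval, and tracks its status. */
  String startHistoricalRetrieval(String project, List<String> featureRefs);
}

class HistoricalServingServiceSketch {
  private final RetrievalJobService jobService;

  HistoricalServingServiceSketch(RetrievalJobService jobService) {
    this.jobService = jobService;
  }

  // Returns the id of the retrieval job started on the caller's behalf.
  String getBatchFeatures(String project, List<String> featureRefs) {
    return jobService.startHistoricalRetrieval(project, featureRefs);
  }
}
```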
/** {@inheritDoc} */
@Override
public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) {
  try (Scope scope = tracer.buildSpan("Redis-getOnlineFeatures").startActive(true)) {
Should this trace be made more generic or interpolated?
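For example, interpolating the configured store name rather than hard-coding it; the storeName field is an assumption about what context is available:

```java
import io.opentracing.Scope;
import io.opentracing.Tracer;

class TracedOnlineRetrieval {
  private final Tracer tracer;
  private final String storeName; // e.g. "Redis", injected from store configuration (assumption)

  TracedOnlineRetrieval(Tracer tracer, String storeName) {
    this.tracer = tracer;
    this.storeName = storeName;
  }

  void getOnlineFeatures() {
    // Span name derived from the store instead of the hard-coded "Redis-getOnlineFeatures".
    try (Scope scope = tracer.buildSpan(storeName + "-getOnlineFeatures").startActive(true)) {
      // ... retrieval logic unchanged ...
    }
  }
}
```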
List<List<FeatureRow>> featureRows =
    retriever.getOnlineFeatures(entityRows, featureSetRequests);

for (var fsIdx = 0; fsIdx < featureRows.size(); fsIdx++) {
I think this block of code would benefit from comments that describe what is happening.
@zhilingc You added docs that can resolve this comment, but could we now take it a step further and factor some parts out to helpers, or domain methods on classes involved?
Forgive the laundry list, I'm going to make suggestions here before I start a line-by-line review that will take me a while to finish:

- Pull the metrics out to helpers; they eat up a lot of lines, especially in deeply-nested code, and metrics always add noise distracting readers from the business logic. Style nit while we're on this: I think the static imports are less than helpful. On first pass I was looking for a local for staleKeyCount; if the usage site was Metrics.staleKeyCount I would not have needed to.
- Scan for variable extractions, there are a few things repeated, e.g. featureSetRequest.getSpec().getProject(), featureSetRequest.getSpec().getMaxAge()—nicely named locals will reduce the reading fatigue.
- Minor, but the part assigning Map<String, FeatureReference> featureNames could perhaps reasonably be an instance method on FeatureSetRequest, like refsByName or something (also a suggestion for the local identifier name here: featureNames sounds like a sequence to me rather than a map). See the sketch after this list.
- isStale (which could currently be marked static, btw) could also maybe be a method on FeatureSetRequest, somehow… I don't like that as-is, especially because we've moved it to storage-api in this PR and EntityRow is a nested message of GetOnlineFeaturesRequest—that does not belong at that layer. Maybe we're missing an abstraction here. (Or EntityRow is misplaced, or both 😅) isStale is behavior that should be unit tested. It could be, as a static method of OnlineServingService, but it feels wrong: there's nothing binding the arguments as belonging together in an aggregate (in the DDD sense of the word). In other words, it's easy for a caller to make a mistake in using it. How could we improve that? I think this belongs in a perhaps-missing domain service layer (again in the DDD sense; this OnlineServingService is pretty much an MVC controller currently, and that's not where I want to test or rely on core business logic behavior of Feast).
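To make the last two points concrete, here is a minimal sketch of the kind of helpers I have in mind; all class names, method names, and accessors below are illustrative stand-ins, not the actual Feast types:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Placeholder stand-in for the proto-generated FeatureReference type.
class FeatureReference {
  private final String name;

  FeatureReference(String name) {
    this.name = name;
  }

  String getName() {
    return name;
  }
}

class FeatureSetRequestHelpers {

  // Suggested refsByName: index the requested references by feature name,
  // replacing the ad-hoc Map<String, FeatureReference> local in the service.
  static Map<String, FeatureReference> refsByName(List<FeatureReference> refs) {
    return refs.stream()
        .collect(Collectors.toMap(FeatureReference::getName, Function.identity()));
  }

  // Staleness check pulled out of OnlineServingService so it can be unit tested
  // on its own: a row is stale when its event time plus the max age is in the past.
  static boolean isStale(Instant rowEventTime, Duration maxAge, Instant now) {
    return rowEventTime.plus(maxAge).isBefore(now);
  }
}
```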
Note that what isStale really needs is the feature set spec in question; the FeatureSetRequest is incidental. That's a hint at the boundaries of a domain service layer, I'd say.
Somewhat random aside: FeatureSetReference is currently defined in feast.serving.ServingAPIProto—I think it probably belongs in feast.types. It was added for v0.4; it's unfortunate that moving it is probably a breaking change. At least Maven module-wise it's all one dependency on datatypes-java, so it'll look out of place as an import in layers far removed from serving but is otherwise of little consequence.
isStale is definitely a bit of a conundrum here. I don't think it belongs to the FeatureSetRequest, because I don't think it's the responsibility of the request info object (which is for the most part a value object) to contain that sort of logic.
Regarding FeatureSetReference, the recent PR #548 adds this as a datatype (see here). Once that PR has been merged in, I will refactor this code to use it.
I think the two key questions are
Other things to comment on
The idea of having the storage API consist of a bunch of loosely defined interfaces is growing on me. If you have a look at the Presto connector interfaces you can see a pretty nice implementation: https://github.com/prestodb/presto/blob/master/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java. One of the reasons why I like their approach is that it will read a lot better from the consumer side. Instead of instantiating a retriever in serving, you would instantiate a store. You would then ask the store for the retriever (or for the statistics retriever?). It can then let you know whether that functionality is implemented, or even whether you have a valid configuration to initialize it. Another thing that we can consider adopting is more explicit interface names with a flat directory structure. Right now we are grouping interfaces into
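To illustrate the shape being suggested (all names here are invented for the sketch, not a proposal for the final API):

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical "store as the entry point" shape, loosely modelled on Presto's Connector SPI:
// the consumer instantiates a Store and asks it for the capabilities it needs.
interface Store {
  /** Validates the configuration and prepares any clients; throws if the config is invalid. */
  void configure(Map<String, String> config);

  /** Online retrieval, if this store supports it. */
  default Optional<OnlineRetriever> getOnlineRetriever() {
    return Optional.empty();
  }

  /** Historical (batch) retrieval, if this store supports it. */
  default Optional<HistoricalRetriever> getHistoricalRetriever() {
    return Optional.empty();
  }

  /** Statistics retrieval, if this store supports it. */
  default Optional<StatisticsRetriever> getStatisticsRetriever() {
    return Optional.empty();
  }
}

// Flat, explicitly named marker interfaces standing in for the real retriever interfaces.
interface OnlineRetriever {}
interface HistoricalRetriever {}
interface StatisticsRetriever {}
```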
One more comment. I think we should rename the title of this PR to be more descriptive. It will become a line in the changelog after all. @zhilingc I will leave that up to you.
* Add storage interfaces, basic file structure
* Apply spotless, add comments
* Move parseResponse and isEmpty to response object
* Make changes to write interface to be more beam-like
* Pass feature specs to the retriever
* Pass feature specs to online retriever
* Add FeatureSetRequest
* Add mistakenly removed TestUtil
* Add mistakenly removed TestUtil

* Add Redis storage
* Remove staleness check; can be checked at the service level
* Remove staleness related tests
* Add dependencies to top level pom
* Clean up code

* Change serving and ingestion to use storage API
* Remove extra exclusion clause

* API and docstring tweaks
* Fix javadoc linting errors
* Apply spotless
* Fix javadoc formatting
* Drop result from HistoricalRetrievalResult constructors

* Add better code documentation, make GetFeastServingInfo independent of retriever
* Make getStagingLocation method of historical retriever
* Apply spotless

* Clean up OnlineServingService code to be more readable
* Revert Metrics
Moving ahead with merging this into master after the name changes.
/lgtm
What this PR does / why we need it:
This PR adds an API for implementing various storage backends for Feast, adds the implementations for Redis and BigQuery, and refactors the ingestion and serving code to use this API. It also introduces a fix for issue #489.
Outstanding issues with this PR:
Which issue(s) this PR fixes:
Fixes #489
Fixes #110
Does this PR introduce a user-facing change?: