-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Serve] Support Multiple DAG Entrypoints in DAGDriver #26573
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really cool. What about supporting a dictionary of route: DAG
instead of the "prefix_route" list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks like a good approach! it's not quite prefix matching though; you are just matching url path here; should update variable name according.
python/ray/serve/drivers.py
Outdated
async def handle_request( | ||
request: starlette.requests.Request, inp=Depends(http_adapter) | ||
): | ||
resp = await self.predict(inp, deployment_graph_route_path=request.url.path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this is going to break serve/air_integration.py PredictorDeployment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work on this change! This should work fine with the REST API/CLI, since the user can define the dag_routes
in the graph code.
However, they won't be able to update these routes through the YAML file. They would need to update the code and redeploy. Dynamic route updates would likely require more intrusive code changes, so this should be fine for now.
python/ray/serve/drivers.py
Outdated
http_adapter: Optional[Union[str, Callable]] = None, | ||
dags_routes=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dags_routes=None, | |
dag_routes=None, |
Can you do that via |
b57941a
to
a04aa76
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sihanwang41 I'm a bit concerned about the API here, it seems very error-prone and it isn't obvious how the *args map to the dags_routes
. I would much prefer to have the routes and dags mapped directly. My proposal would be something like as follows:
def __init__(self, dags: Union[DagHandle, Dict[str, DagHandle]]):
If a single DAG is passed, it works as currently. If a dict is passed, it's of the form route: DAG
. This makes it much more obvious how the routes map to the sub-dags.
Also, as a grammatical nit, it should be dag_handles
not dag_handles
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, a follow-up thought, this should support DeploymentNode
s as well as DAGHandle
s. That way you can pass individual classes in addition to call graphs.
My main is that the reconfigure should support adding and removing new routes instead of just updating the routes for existing list. The existing handles rarely need to be updated. However, it looks like I suggest the following:
For the last point, you can do the following:
|
python/ray/serve/drivers.py
Outdated
for route in dags: | ||
|
||
@self.app.get(f"{route}") | ||
@self.app.post(f"{route}") | ||
async def handle_request( | ||
request: starlette.requests.Request, inp=Depends(http_adapter) | ||
): | ||
return await self.multi_dag_predict(request.url.path, inp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for route in dags: | |
@self.app.get(f"{route}") | |
@self.app.post(f"{route}") | |
async def handle_request( | |
request: starlette.requests.Request, inp=Depends(http_adapter) | |
): | |
return await self.multi_dag_predict(request.url.path, inp) | |
for route, handle in dags.items(): | |
@self.app.get(f"{route}") | |
@self.app.post(f"{route}") | |
async def handle_request( | |
request: starlette.requests.Request, inp=Depends(http_adapter) | |
): | |
return await self.handle.remote(inp) |
Now we don't need the special mutli_dag predict
We can also generalize the single DAG case into a wildcard route "/{path}"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @simon-mo ,
i give a try, I think the handle variable life cycle it outlived after for loop. All the endpoint will use the last handle
For single DAG case into a wildcard route "/{path}", the change you suggest will receive all requests. Which means, we will not return 404 from FlaskAPI side (we have to do it by ourselves) when user send http://127.0.0.1/not_exist for single DAG.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the endpoint will use the last handle
You can use functools.partial to properly perform the bind in arguments and closure: https://docs.python-guide.org/writing/gotchas/#late-binding-closures
we will not return 404 from FlaskAPI side (we have to do it by ourselves) when user send http://127.0.0.1/not_exist for single DAG.
But for a single DAG we never return 404 because it's route_prefix enforced to "/{path}" already. User don't specify the route_prefix for single DAG case; if they want, they should do {"prefix": DAG} and pass the dict instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow, works like a charm for functools.partial. Updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
direction is great, minor code issue please clean up.
python/ray/serve/drivers.py
Outdated
|
||
async def predict(self, *args, **kwargs): | ||
"""Perform inference directly without HTTP.""" | ||
return await self.dag_handle.remote(*args, **kwargs) | ||
|
||
async def multi_dag_predict(self, route_path, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is used for the handle ingress prediction, we still need a way to select correct handle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah i see. I do feel this API is very unnatural to use. In that case, we should encourage user to just get a handle for their DAG entrypoint...
How about we support a special kwargs route_prefix for now? so user we using the same method call just specifying it via handle.remote(..., route="...")
Alternatively, a better name might help, how about predict_with_route
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will go for the predict_with_route. it is much clear for user to know I will select a handle by sending a route.
For user behavior, remote(xxx) are all including the inputs (args and kwargs) for their task, add route
for internal usage is not very clean interface.
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Signed-off-by: Sihan Wang <[email protected]>
Let's make sure to follow up with documentations and examples about this in user guide section by mid this week. |
Co-authored-by: Sihan Wang <[email protected]> This is an important feature to prevent regression of feature set when user migrating from 1.0 to 2.0.
) Signed-off-by: Stefan van der Kleij <[email protected]>
Why are these changes needed?
Each DAG will have a different endpoint, and DAGDriver will consolidate the routes for all the endpoints from all DAGs .
Note: not support to execute any middle node of one DAG.
Related issue number
Checks
scripts/format.sh
to lint the changes in this PR.