Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add feedback to batch processor #2653

Merged
merged 4 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 112 additions & 18 deletions python/seldon_core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
CHOICES_GATEWAY_TYPE = ["ambassador", "istio", "seldon"]
CHOICES_TRANSPORT = ["rest", "grpc"]
CHOICES_PAYLOAD_TYPE = ["ndarray", "tensor", "tftensor"]
CHOICES_DATA_TYPE = ["data", "json", "str"]
CHOICES_METHOD = ["predict"]
CHOICES_DATA_TYPE = ["data", "json", "str", "raw"]
CHOICES_METHOD = ["predict", "feedback"]
CHOICES_LOG_LEVEL = {
"debug": logging.DEBUG,
"info": logging.INFO,
Expand Down Expand Up @@ -66,6 +66,11 @@ def start_multithreaded_batch_worker(
q_in = Queue(workers * 2)
q_out = Queue(workers * 2)

if method == "feedback" and data_type != "raw":
raise RuntimeError("Feedback method is supported only with raw data type.")
elif data_type == "raw" and method != "feedback":
raise RuntimeError("Raw input is currently only support for feedback method.")

sc = SeldonClient(
gateway=gateway_type,
transport=transport,
Expand All @@ -84,7 +89,7 @@ def start_multithreaded_batch_worker(
for _ in range(workers):
Thread(
target=_start_request_worker,
args=(q_in, q_out, data_type, sc, retries, batch_id),
args=(q_in, q_out, data_type, sc, method, retries, batch_id),
daemon=True,
).start()

Expand All @@ -108,7 +113,7 @@ def start_multithreaded_batch_worker(
t_out.join()

if benchmark:
logging.info(f"Elapsed time: {time.time() - start_time}")
logger.info(f"Elapsed time: {time.time() - start_time}")


def _start_input_file_worker(q_in: Queue, input_data_path: str) -> None:
Expand Down Expand Up @@ -159,15 +164,16 @@ def _start_output_file_worker(

counter += 1
if counter % 100 == 0:
logging.info(f"Processed instances: {counter}")
logging.info(f"Total processed instances: {counter}")
logger.info(f"Processed instances: {counter}")
logger.info(f"Total processed instances: {counter}")


def _start_request_worker(
q_in: Queue,
q_out: Queue,
data_type: str,
sc: SeldonClient,
method: str,
retries: int,
batch_id: str,
) -> None:
Expand All @@ -186,17 +192,36 @@ def _start_request_worker(
The json/str/data type to send the requests as
sc
An initialised Seldon Client configured to send the requests to
method:
Method to call: predict or feedback
retries
The number of attempts to try for each request
batch_id
The unique identifier for the batch which is passed to all requests
"""
while True:
batch_idx, batch_instance_id, input_raw = q_in.get()
str_output = _send_batch_predict(
batch_idx, batch_instance_id, input_raw, data_type, sc, retries, batch_id
)
# Mark task as done in the queue to add space for new tasks
if method == "predict":
str_output = _send_batch_predict(
batch_idx,
batch_instance_id,
input_raw,
data_type,
sc,
retries,
batch_id,
)
# Mark task as done in the queue to add space for new tasks
elif method == "feedback":
str_output = _send_batch_feedback(
batch_idx,
batch_instance_id,
input_raw,
data_type,
sc,
retries,
batch_id,
)
q_out.put(str_output)
q_in.task_done()

Expand All @@ -217,7 +242,6 @@ def _send_batch_predict(
traced back individually in the Seldon Request Logger context. Each request
will be attempted for the number of retries, and will return the string
serialised result.

Parameters
---
batch_idx
Expand All @@ -234,7 +258,6 @@ def _send_batch_predict(
The number of times to retry the request
batch_id
The unique identifier for the batch which is passed to all requests

Returns
---
A string serialised result of the response (or equivalent data with error info)
Expand All @@ -252,9 +275,7 @@ def _send_batch_predict(
predict_kwargs["headers"] = {"Seldon-Puid": batch_instance_id}
try:
data = json.loads(input_raw)
# TODO: Add functionality to send "raw" payload
if data_type == "data":
# TODO: Update client to avoid requiring a numpy array
data_np = np.array(data)
predict_kwargs["data"] = data_np
elif data_type == "str":
Expand All @@ -265,15 +286,88 @@ def _send_batch_predict(
str_output = None
for i in range(retries):
try:
# TODO: Add functionality for explainer
# as explainer currently doesn't support meta
# TODO: Optimize client to share session for requests
seldon_payload = sc.predict(**predict_kwargs)
assert seldon_payload.success
str_output = json.dumps(seldon_payload.response)
break
except (requests.exceptions.RequestException, AssertionError) as e:
print("Exception:", e, "retries:", retries)
logger.error("Exception:", e, "retries:", retries)
if i == (retries - 1):
raise

except Exception as e:
error_resp = {
"status": {"info": "FAILURE", "reason": str(e), "status": 1},
"meta": meta,
}
str_output = json.dumps(error_resp)

return str_output


def _send_batch_feedback(
batch_idx: int,
batch_instance_id: int,
input_raw: str,
data_type: str,
sc: SeldonClient,
retries: int,
batch_id: str,
) -> str:
"""
Send a feedback request using the Seldon Client

Parameters
---
batch_idx
The enumerated index given to the batch datapoint in order of local dataset
batch_instance_id
The unique ID of the batch datapoint created with the python uuid function
input_raw
The raw input in string format to be loaded to the respective format
data_type
The data type to send which can be str, json and data
sc
The instance of SeldonClient to use to send the requests to the seldon model
retries
The number of times to retry the request
batch_id
The unique identifier for the batch which is passed to all requests

Returns
---
A string serialised result of the response (or equivalent data with error info)
"""

feedback_kwargs = {}
meta = {
"tags": {
"batch_id": batch_id,
"batch_instance_id": batch_instance_id,
"batch_index": batch_idx,
}
}
# The Feedback proto does not support meta - defined to include in file output only.
try:
data = json.loads(input_raw)
feedback_kwargs["raw_request"] = data

str_output = None
for i in range(retries):
try:
seldon_payload = sc.feedback(**feedback_kwargs)
assert seldon_payload.success

# Update tags so we can track feedback instances in the output file
tags = seldon_payload.response.get("meta", {}).get("tags", {})
tags.update(meta["tags"])
if "meta" not in seldon_payload.response:
seldon_payload.response["meta"] = {}
seldon_payload.response["meta"]["tags"] = tags
str_output = json.dumps(seldon_payload.response)
break
except (requests.exceptions.RequestException, AssertionError) as e:
logger.error("Exception:", e, "retries:", retries)
if i == (retries - 1):
raise

Expand Down
49 changes: 40 additions & 9 deletions python/seldon_core/seldon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from seldon_core.utils import (
array_to_grpc_datadef,
seldon_message_to_json,
json_to_feedback,
json_to_seldon_message,
feedback_to_json,
seldon_messages_to_json,
Expand Down Expand Up @@ -403,6 +404,7 @@ def feedback(
self,
prediction_request: prediction_pb2.SeldonMessage = None,
prediction_response: prediction_pb2.SeldonMessage = None,
prediction_truth: prediction_pb2.SeldonMessage = None,
reward: float = 0,
gateway: str = None,
transport: str = None,
Expand All @@ -419,6 +421,7 @@ def feedback(
namespace: str = None,
gateway_prefix: str = None,
client_return_type: str = None,
raw_request: dict = None,
) -> SeldonClientFeedback:
"""

Expand Down Expand Up @@ -483,16 +486,25 @@ def feedback(
namespace=namespace,
gateway_prefix=gateway_prefix,
client_return_type=client_return_type,
raw_request=raw_request,
)
self._validate_args(**k)
if k["gateway"] == "ambassador" or k["gateway"] == "istio":
if k["transport"] == "rest":
return rest_feedback_gateway(
prediction_request, prediction_response, reward, **k
prediction_request,
prediction_response,
prediction_truth,
reward,
**k,
)
elif k["transport"] == "grpc":
return grpc_feedback_gateway(
prediction_request, prediction_response, reward, **k
prediction_request,
prediction_response,
prediction_truth,
reward,
**k,
)
else:
raise SeldonClientException("Unknown transport " + k["transport"])
Expand Down Expand Up @@ -2148,13 +2160,15 @@ def grpc_feedback_seldon_oauth(
def rest_feedback_gateway(
prediction_request: prediction_pb2.SeldonMessage = None,
prediction_response: prediction_pb2.SeldonMessage = None,
prediction_truth: prediction_pb2.SeldonMessage = None,
reward: float = 0,
deployment_name: str = "",
namespace: str = None,
gateway_endpoint: str = "localhost:8003",
headers: Dict = None,
gateway_prefix: str = None,
client_return_type: str = "proto",
raw_request: dict = None,
**kwargs,
) -> SeldonClientFeedback:
"""
Expand Down Expand Up @@ -2187,10 +2201,17 @@ def rest_feedback_gateway(
A Seldon Feedback Response

"""
request = prediction_pb2.Feedback(
request=prediction_request, response=prediction_response, reward=reward
)
payload = feedback_to_json(request)
if raw_request:
request = json_to_feedback(raw_request)
payload = raw_request
else:
request = prediction_pb2.Feedback(
request=prediction_request,
response=prediction_response,
reward=reward,
truth=prediction_truth,
)
payload = feedback_to_json(request)
if gateway_prefix is None:
if namespace is None:
response_raw = requests.post(
Expand Down Expand Up @@ -2248,6 +2269,7 @@ def rest_feedback_gateway(
def grpc_feedback_gateway(
prediction_request: prediction_pb2.SeldonMessage = None,
prediction_response: prediction_pb2.SeldonMessage = None,
prediction_truth: prediction_pb2.SeldonMessage = None,
reward: float = 0,
deployment_name: str = "",
namespace: str = None,
Expand All @@ -2256,6 +2278,7 @@ def grpc_feedback_gateway(
grpc_max_send_message_length: int = 4 * 1024 * 1024,
grpc_max_receive_message_length: int = 4 * 1024 * 1024,
client_return_type: str = "proto",
raw_request: dict = None,
**kwargs,
) -> SeldonClientFeedback:
"""
Expand Down Expand Up @@ -2288,9 +2311,17 @@ def grpc_feedback_gateway(
-------

"""
request = prediction_pb2.Feedback(
request=prediction_request, response=prediction_response, reward=reward
)
if isinstance(raw_request, prediction_pb2.Feedback):
request = raw_request
elif raw_request:
request = json_to_feedback(raw_request)
else:
request = prediction_pb2.Feedback(
request=prediction_request,
response=prediction_response,
reward=reward,
truth=prediction_truth,
)
channel = grpc.insecure_channel(
gateway_endpoint,
options=[
Expand Down