diff --git a/python/seldon_core/batch_processor.py b/python/seldon_core/batch_processor.py
index ff3c267085..6396103038 100644
--- a/python/seldon_core/batch_processor.py
+++ b/python/seldon_core/batch_processor.py
@@ -12,8 +12,8 @@
 CHOICES_GATEWAY_TYPE = ["ambassador", "istio", "seldon"]
 CHOICES_TRANSPORT = ["rest", "grpc"]
 CHOICES_PAYLOAD_TYPE = ["ndarray", "tensor", "tftensor"]
-CHOICES_DATA_TYPE = ["data", "json", "str"]
-CHOICES_METHOD = ["predict"]
+CHOICES_DATA_TYPE = ["data", "json", "str", "raw"]
+CHOICES_METHOD = ["predict", "feedback"]
 CHOICES_LOG_LEVEL = {
     "debug": logging.DEBUG,
     "info": logging.INFO,
@@ -66,6 +66,11 @@ def start_multithreaded_batch_worker(
     q_in = Queue(workers * 2)
     q_out = Queue(workers * 2)
 
+    if method == "feedback" and data_type != "raw":
+        raise RuntimeError("Feedback method is supported only with raw data type.")
+    elif data_type == "raw" and method != "feedback":
+        raise RuntimeError("Raw input is currently only supported for the feedback method.")
+
     sc = SeldonClient(
         gateway=gateway_type,
         transport=transport,
@@ -84,7 +89,7 @@ def start_multithreaded_batch_worker(
     for _ in range(workers):
         Thread(
             target=_start_request_worker,
-            args=(q_in, q_out, data_type, sc, retries, batch_id),
+            args=(q_in, q_out, data_type, sc, method, retries, batch_id),
             daemon=True,
         ).start()
 
@@ -108,7 +113,7 @@ def start_multithreaded_batch_worker(
     t_out.join()
 
     if benchmark:
-        logging.info(f"Elapsed time: {time.time() - start_time}")
+        logger.info(f"Elapsed time: {time.time() - start_time}")
 
 
 def _start_input_file_worker(q_in: Queue, input_data_path: str) -> None:
@@ -159,8 +164,8 @@ def _start_output_file_worker(
             counter += 1
 
             if counter % 100 == 0:
-                logging.info(f"Processed instances: {counter}")
-        logging.info(f"Total processed instances: {counter}")
+                logger.info(f"Processed instances: {counter}")
+        logger.info(f"Total processed instances: {counter}")
 
 
 def _start_request_worker(
@@ -168,6 +173,7 @@
     q_out: Queue,
     data_type: str,
     sc: SeldonClient,
+    method: str,
     retries: int,
     batch_id: str,
 ) -> None:
@@ -186,6 +192,8 @@ def _start_request_worker(
         The json/str/data type to send the requests as
     sc
         An initialised Seldon Client configured to send the requests to
+    method
+        The method to call, which can be predict or feedback
     retries
         The number of attempts to try for each request
     batch_id
@@ -193,10 +201,27 @@
     """
     while True:
         batch_idx, batch_instance_id, input_raw = q_in.get()
-        str_output = _send_batch_predict(
-            batch_idx, batch_instance_id, input_raw, data_type, sc, retries, batch_id
-        )
-        # Mark task as done in the queue to add space for new tasks
+        if method == "predict":
+            str_output = _send_batch_predict(
+                batch_idx,
+                batch_instance_id,
+                input_raw,
+                data_type,
+                sc,
+                retries,
+                batch_id,
+            )
+        elif method == "feedback":
+            str_output = _send_batch_feedback(
+                batch_idx,
+                batch_instance_id,
+                input_raw,
+                data_type,
+                sc,
+                retries,
+                batch_id,
+            )
         q_out.put(str_output)
+        # Mark task as done in the queue to add space for new tasks
         q_in.task_done()
 
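Note on the dispatch above: method="feedback" is only valid with data_type="raw", and each line of the input file is parsed with json.loads and forwarded unchanged as raw_request. A minimal sketch of what one input line could look like, assuming the standard JSON mapping of the Feedback proto (the payload shapes and reward value are illustrative only):

    import json

    # Hypothetical single line of a "raw" feedback input file: a JSON-serialised
    # Feedback message holding the original request/response pair and a reward.
    line = json.dumps(
        {
            "request": {"data": {"ndarray": [[1.0, 2.0, 3.0, 4.0]]}},
            "response": {"data": {"ndarray": [[0.9, 0.1]]}},
            "reward": 1.0,
        }
    )
    with open("input-data.txt", "w") as f:
        f.write(line + "\n")
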
@@ -217,7 +242,6 @@ def _send_batch_predict(
     traced back individually in the Seldon Request Logger context. Each request
     will be attempted for the number of retries, and will return the string
     serialised result.
-
     Paramters
     ---
     batch_idx
@@ -234,7 +258,6 @@ def _send_batch_predict(
         The number of times to retry the request
     batch_id
         The unique identifier for the batch which is passed to all requests
-
     Returns
     ---
     A string serialised result of the response (or equivallent data with error info)
@@ -252,9 +275,7 @@ def _send_batch_predict(
         predict_kwargs["headers"] = {"Seldon-Puid": batch_instance_id}
     try:
         data = json.loads(input_raw)
-        # TODO: Add functionality to send "raw" payload
         if data_type == "data":
-            # TODO: Update client to avoid requiring a numpy array
             data_np = np.array(data)
             predict_kwargs["data"] = data_np
         elif data_type == "str":
@@ -265,15 +286,88 @@ def _send_batch_predict(
         str_output = None
         for i in range(retries):
             try:
-                # TODO: Add functionality for explainer
-                # as explainer currently doesn't support meta
-                # TODO: Optimize client to share session for requests
                 seldon_payload = sc.predict(**predict_kwargs)
                 assert seldon_payload.success
                 str_output = json.dumps(seldon_payload.response)
                 break
             except (requests.exceptions.RequestException, AssertionError) as e:
-                print("Exception:", e, "retries:", retries)
+                logger.error(f"Exception: {e}, retries: {retries}")
+                if i == (retries - 1):
+                    raise
+
+    except Exception as e:
+        error_resp = {
+            "status": {"info": "FAILURE", "reason": str(e), "status": 1},
+            "meta": meta,
+        }
+        str_output = json.dumps(error_resp)
+
+    return str_output
+
+
+def _send_batch_feedback(
+    batch_idx: int,
+    batch_instance_id: str,
+    input_raw: str,
+    data_type: str,
+    sc: SeldonClient,
+    retries: int,
+    batch_id: str,
+) -> str:
+    """
+    Send a feedback request using the Seldon Client
+
+    Parameters
+    ---
+    batch_idx
+        The enumerated index given to the batch datapoint in order of local dataset
+    batch_instance_id
+        The unique ID of the batch datapoint created with the python uuid function
+    input_raw
+        The raw input in string format to be loaded to the respective format
+    data_type
+        The data type to send, which for feedback must be raw
+    sc
+        The instance of SeldonClient to use to send the requests to the seldon model
+    retries
+        The number of times to retry the request
+    batch_id
+        The unique identifier for the batch which is passed to all requests
+
+    Returns
+    ---
+    A string serialised result of the response (or equivalent data with error info)
+    """
+
+    feedback_kwargs = {}
+    meta = {
+        "tags": {
+            "batch_id": batch_id,
+            "batch_instance_id": batch_instance_id,
+            "batch_index": batch_idx,
+        }
+    }
+    # The Feedback proto does not support meta, so tags only go to the file output.
+    try:
+        data = json.loads(input_raw)
+        feedback_kwargs["raw_request"] = data
+
+        str_output = None
+        for i in range(retries):
+            try:
+                seldon_payload = sc.feedback(**feedback_kwargs)
+                assert seldon_payload.success
+
+                # Update tags so we can track feedback instances in the output file
+                tags = seldon_payload.response.get("meta", {}).get("tags", {})
+                tags.update(meta["tags"])
+                if "meta" not in seldon_payload.response:
+                    seldon_payload.response["meta"] = {}
+                seldon_payload.response["meta"]["tags"] = tags
+                str_output = json.dumps(seldon_payload.response)
+                break
+            except (requests.exceptions.RequestException, AssertionError) as e:
+                logger.error(f"Exception: {e}, retries: {retries}")
                 if i == (retries - 1):
                     raise
 
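The client-side support for raw_request lives in seldon_core/seldon_client.py below. For orientation, a rough sketch of what the worker above ends up doing for each input line (the gateway, deployment name, namespace and payload are placeholders, not part of this change):

    import json

    from seldon_core.seldon_client import SeldonClient

    sc = SeldonClient(
        gateway="istio",
        transport="rest",
        deployment_name="mymodel",
        namespace="seldon",
    )
    raw_request = json.loads(
        '{"request": {"data": {"ndarray": [[1.0, 2.0]]}}, "reward": 1.0}'
    )
    # The raw dict is converted to a Feedback proto and sent as the payload as-is.
    feedback_response = sc.feedback(raw_request=raw_request)
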
diff --git a/python/seldon_core/seldon_client.py b/python/seldon_core/seldon_client.py
index 22a98c3948..eb32d8c466 100644
--- a/python/seldon_core/seldon_client.py
+++ b/python/seldon_core/seldon_client.py
@@ -3,6 +3,7 @@
 from seldon_core.utils import (
     array_to_grpc_datadef,
     seldon_message_to_json,
+    json_to_feedback,
     json_to_seldon_message,
     feedback_to_json,
     seldon_messages_to_json,
@@ -403,6 +404,7 @@ def feedback(
         self,
         prediction_request: prediction_pb2.SeldonMessage = None,
         prediction_response: prediction_pb2.SeldonMessage = None,
+        prediction_truth: prediction_pb2.SeldonMessage = None,
         reward: float = 0,
         gateway: str = None,
         transport: str = None,
@@ -419,6 +421,7 @@
         namespace: str = None,
         gateway_prefix: str = None,
         client_return_type: str = None,
+        raw_request: dict = None,
     ) -> SeldonClientFeedback:
         """
 
@@ -483,16 +486,25 @@
             namespace=namespace,
             gateway_prefix=gateway_prefix,
             client_return_type=client_return_type,
+            raw_request=raw_request,
         )
         self._validate_args(**k)
         if k["gateway"] == "ambassador" or k["gateway"] == "istio":
             if k["transport"] == "rest":
                 return rest_feedback_gateway(
-                    prediction_request, prediction_response, reward, **k
+                    prediction_request,
+                    prediction_response,
+                    prediction_truth,
+                    reward,
+                    **k,
                 )
             elif k["transport"] == "grpc":
                 return grpc_feedback_gateway(
-                    prediction_request, prediction_response, reward, **k
+                    prediction_request,
+                    prediction_response,
+                    prediction_truth,
+                    reward,
+                    **k,
                 )
             else:
                 raise SeldonClientException("Unknown transport " + k["transport"])
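The gateway functions changed below turn a raw dict into a Feedback proto via seldon_core.utils.json_to_feedback, which this diff adds to the imports above; feedback_to_json is its inverse. A small round-trip sketch of that assumption:

    from seldon_core.utils import feedback_to_json, json_to_feedback

    raw = {
        "request": {"data": {"ndarray": [[1.0]]}},
        "response": {"data": {"ndarray": [[0.0]]}},
        "reward": 1.0,
    }
    # Parse the dict into a prediction_pb2.Feedback proto and serialise it back.
    fb = json_to_feedback(raw)
    assert fb.reward == 1.0
    assert feedback_to_json(fb)["reward"] == 1.0
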
@@ -2148,6 +2160,7 @@ def grpc_feedback_seldon_oauth(
 def rest_feedback_gateway(
     prediction_request: prediction_pb2.SeldonMessage = None,
     prediction_response: prediction_pb2.SeldonMessage = None,
+    prediction_truth: prediction_pb2.SeldonMessage = None,
     reward: float = 0,
     deployment_name: str = "",
     namespace: str = None,
@@ -2155,6 +2168,7 @@
     headers: Dict = None,
     gateway_prefix: str = None,
     client_return_type: str = "proto",
+    raw_request: dict = None,
     **kwargs,
 ) -> SeldonClientFeedback:
     """
@@ -2187,10 +2201,17 @@
        A Seldon Feedback Response
 
     """
-    request = prediction_pb2.Feedback(
-        request=prediction_request, response=prediction_response, reward=reward
-    )
-    payload = feedback_to_json(request)
+    if raw_request:
+        request = json_to_feedback(raw_request)
+        payload = raw_request
+    else:
+        request = prediction_pb2.Feedback(
+            request=prediction_request,
+            response=prediction_response,
+            reward=reward,
+            truth=prediction_truth,
+        )
+        payload = feedback_to_json(request)
     if gateway_prefix is None:
         if namespace is None:
             response_raw = requests.post(
@@ -2248,6 +2269,7 @@
 def grpc_feedback_gateway(
     prediction_request: prediction_pb2.SeldonMessage = None,
     prediction_response: prediction_pb2.SeldonMessage = None,
+    prediction_truth: prediction_pb2.SeldonMessage = None,
     reward: float = 0,
     deployment_name: str = "",
     namespace: str = None,
@@ -2256,6 +2278,7 @@
     grpc_max_send_message_length: int = 4 * 1024 * 1024,
     grpc_max_receive_message_length: int = 4 * 1024 * 1024,
     client_return_type: str = "proto",
+    raw_request: dict = None,
     **kwargs,
 ) -> SeldonClientFeedback:
     """
@@ -2288,9 +2311,17 @@
     -------
 
     """
-    request = prediction_pb2.Feedback(
-        request=prediction_request, response=prediction_response, reward=reward
-    )
+    if isinstance(raw_request, prediction_pb2.Feedback):
+        request = raw_request
+    elif raw_request:
+        request = json_to_feedback(raw_request)
+    else:
+        request = prediction_pb2.Feedback(
+            request=prediction_request,
+            response=prediction_response,
+            reward=reward,
+            truth=prediction_truth,
+        )
     channel = grpc.insecure_channel(
         gateway_endpoint,
         options=[
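One nuance of the gRPC path above: unlike rest_feedback_gateway, it also accepts an already-built prediction_pb2.Feedback as raw_request (the isinstance branch), even though the parameter is annotated as dict. A sketch under that assumption, with placeholder deployment details:

    from seldon_core.proto import prediction_pb2
    from seldon_core.seldon_client import SeldonClient

    sc = SeldonClient(
        gateway="istio",
        transport="grpc",
        deployment_name="mymodel",
        namespace="seldon",
    )
    # A pre-built Feedback proto is passed through unchanged on the gRPC path.
    fb = prediction_pb2.Feedback(reward=1.0)
    feedback_response = sc.feedback(raw_request=fb)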