diff --git a/azure-batch/azure/batch/batch_service_client.py b/azure-batch/azure/batch/batch_service_client.py index b581445ac430..bf0d624c9c47 100644 --- a/azure-batch/azure/batch/batch_service_client.py +++ b/azure-batch/azure/batch/batch_service_client.py @@ -113,5 +113,6 @@ def __init__( self._client, self.config, self._serialize, self._deserialize) self.compute_node = ComputeNodeOperations( self._client, self.config, self._serialize, self._deserialize) - - patch_client(self) + + +patch_client() diff --git a/azure-batch/azure/batch/custom/patch.py b/azure-batch/azure/batch/custom/patch.py index dc1bf8fb2525..7fc04b15a898 100644 --- a/azure-batch/azure/batch/custom/patch.py +++ b/azure-batch/azure/batch/custom/patch.py @@ -33,6 +33,7 @@ class _TaskWorkflowManager(object): def __init__( self, client, + original_add_collection, job_id, tasks_to_add, task_add_collection_options=None, @@ -55,8 +56,8 @@ def __init__( self._pending_queue_lock = threading.Lock() # Variables to be used for task add_collection requests - self._client = TaskOperations( - client._client, client.config, client._serialize, client._deserialize) + self._client = client + self._original_add_collection = original_add_collection self._job_id = job_id self._task_add_collection_options = task_add_collection_options self._custom_headers = custom_headers @@ -76,7 +77,8 @@ def _bulk_add_tasks(self, results_queue, chunk_tasks_to_add): """ try: - add_collection_response = self._client.add_collection( + add_collection_response = self._original_add_collection( + self._client, self._job_id, chunk_tasks_to_add, self._task_add_collection_options, @@ -193,104 +195,107 @@ def _handle_output(results_queue): results.append(queue_item) return results -def patch_client(client): + +def build_new_add_collection(original_add_collection): + def bulk_add_collection( + self, + job_id, + value, + task_add_collection_options=None, + custom_headers=None, + raw=False, + threads=0, + **operation_config): + """Adds a collection of tasks to the specified job. + + Note that each task must have a unique ID. The Batch service may not + return the results for each task in the same order the tasks were + submitted in this request. If the server times out or the connection is + closed during the request, the request may have been partially or fully + processed, or not at all. In such cases, the user should re-issue the + request. Note that it is up to the user to correctly handle failures + when re-issuing a request. For example, you should use the same task + IDs during a retry so that if the prior operation succeeded, the retry + will not create extra tasks unexpectedly. If the response contains any + tasks which failed to add, a client can retry the request. In a retry, + it is most efficient to resubmit only tasks that failed to add, and to + omit tasks that were successfully added on the first attempt. + + :param job_id: The ID of the job to which the task collection is to be + added. + :type job_id: str + :param value: The collection of tasks to add. The total serialized + size of this collection must be less than 4MB. If it is greater than + 4MB (for example if each task has 100's of resource files or + environment variables), the request will fail with code + 'RequestBodyTooLarge' and should be retried again with fewer tasks. + :type value: list of :class:`TaskAddParameter + ` + :param task_add_collection_options: Additional parameters for the + operation + :type task_add_collection_options: :class:`TaskAddCollectionOptions + ` + :param dict custom_headers: headers that will be added to the request + :param bool raw: returns the direct response alongside the + deserialized response + :param int threads: number of threads to use in parallel when adding tasks. If specified + and greater than 0, will start additional threads to submit requests and wait for them to finish. + Otherwise will submit add_collection requests sequentially on main thread + :return: :class:`TaskAddCollectionResult + ` or + :class:`ClientRawResponse` if + raw=true + :rtype: :class:`TaskAddCollectionResult + ` or + :class:`ClientRawResponse` + :raises: + :class:`CreateTasksErrorException` + """ + + results_queue = collections.deque() # deque operations(append/pop) are thread-safe + task_workflow_manager = _TaskWorkflowManager( + self, + original_add_collection, + job_id, + value, + task_add_collection_options, + custom_headers, + raw, + **operation_config) + + # multi-threaded behavior + if threads: + if threads < 0: + raise ValueError("Threads must be positive or 0") + + active_threads = [] + for i in range(threads): + active_threads.append(threading.Thread( + target=task_workflow_manager.task_collection_thread_handler, + args=(results_queue,))) + active_threads[-1].start() + for thread in active_threads: + thread.join() + # single-threaded behavior + else: + task_workflow_manager.task_collection_thread_handler(results_queue) + + if task_workflow_manager.error: + raise task_workflow_manager.error # pylint: disable=raising-bad-type + else: + submitted_tasks = _handle_output(results_queue) + return TaskAddCollectionResult(value=submitted_tasks) + bulk_add_collection.metadata = {'url': '/jobs/{jobId}/addtaskcollection'} + return bulk_add_collection + + +def patch_client(): try: models = sys.modules['azure.batch.models'] except KeyError: models = importlib.import_module('azure.batch.models') setattr(models, 'CreateTasksErrorException', CreateTasksErrorException) sys.modules['azure.batch.models'] = models - client.task.add_collection = types.MethodType(bulk_add_collection, client.task) - -def bulk_add_collection( - client, - job_id, - value, - task_add_collection_options=None, - custom_headers=None, - raw=False, - threads=0, - **operation_config): - """Adds a collection of tasks to the specified job. - - Note that each task must have a unique ID. The Batch service may not - return the results for each task in the same order the tasks were - submitted in this request. If the server times out or the connection is - closed during the request, the request may have been partially or fully - processed, or not at all. In such cases, the user should re-issue the - request. Note that it is up to the user to correctly handle failures - when re-issuing a request. For example, you should use the same task - IDs during a retry so that if the prior operation succeeded, the retry - will not create extra tasks unexpectedly. If the response contains any - tasks which failed to add, a client can retry the request. In a retry, - it is most efficient to resubmit only tasks that failed to add, and to - omit tasks that were successfully added on the first attempt. The - maximum lifetime of a task from addition to completion is 7 days. If a - task has not completed within 7 days of being added it will be - terminated by the Batch service and left in whatever state it was in at - that time. - - :param job_id: The ID of the job to which the task collection is to be - added. - :type job_id: str - :param value: The collection of tasks to add. The total serialized - size of this collection must be less than 4MB. If it is greater than - 4MB (for example if each task has 100's of resource files or - environment variables), the request will fail with code - 'RequestBodyTooLarge' and should be retried again with fewer tasks. - :type value: list of :class:`TaskAddParameter - ` - :param task_add_collection_options: Additional parameters for the - operation - :type task_add_collection_options: :class:`TaskAddCollectionOptions - ` - :param dict custom_headers: headers that will be added to the request - :param bool raw: returns the direct response alongside the - deserialized response - :param int threads: number of threads to use in parallel when adding tasks. If specified - and greater than 0, will start additional threads to submit requests and wait for them to finish. - Otherwise will submit add_collection requests sequentially on main thread - :return: :class:`TaskAddCollectionResult - ` or - :class:`ClientRawResponse` if - raw=true - :rtype: :class:`TaskAddCollectionResult - ` or - :class:`ClientRawResponse` - :raises: - :class:`BatchErrorException` - """ - results_queue = collections.deque() # deque operations(append/pop) are thread-safe - task_workflow_manager = _TaskWorkflowManager( - client, - job_id, - value, - task_add_collection_options, - custom_headers, - raw, - **operation_config) - - # multi-threaded behavior - if threads: - if threads < 0: - raise ValueError("Threads must be positive or 0") - - active_threads = [] - for i in range(threads): - active_threads.append(threading.Thread( - target=task_workflow_manager.task_collection_thread_handler, - args=(results_queue,))) - active_threads[-1].start() - for thread in active_threads: - thread.join() - # single-threaded behavior - else: - task_workflow_manager.task_collection_thread_handler(results_queue) - - if task_workflow_manager.error: - raise task_workflow_manager.error # pylint: disable=raising-bad-type - else: - submitted_tasks = _handle_output(results_queue) - return TaskAddCollectionResult(value=submitted_tasks) - bulk_add_collection.metadata = {'url': '/jobs/{jobId}/addtaskcollection'} + operations_modules = importlib.import_module('azure.batch.operations') + operations_modules.TaskOperations.add_collection = build_new_add_collection(operations_modules.TaskOperations.add_collection)