From c4158041c04f42c2f6c177fc6b589b49818b1d86 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Bregu=C5=82a?=
Date: Sat, 1 Jan 2000 00:00:00 +0000
Subject: [PATCH] [AIRFLOW-3980] Unify logger

---
 airflow/bin/cli.py | 2 +-
 airflow/configuration.py | 4 +-
 airflow/contrib/auth/backends/ldap_auth.py | 2 +-
 airflow/contrib/hooks/bigquery_hook.py | 40 +++++++------
 airflow/contrib/hooks/gcp_container_hook.py | 27 ++++-----
 airflow/contrib/kubernetes/pod_launcher.py | 2 +-
 .../contrib/operators/aws_athena_operator.py | 10 ++--
 .../contrib/operators/gcp_compute_operator.py | 10 ++--
 .../operators/gcp_container_operator.py | 2 +-
 .../contrib/operators/mlengine_operator.py | 11 ++--
 .../operators/oracle_to_oracle_transfer.py | 5 +-
 .../operators/s3_delete_objects_operator.py | 2 +-
 airflow/contrib/operators/s3_list_operator.py | 5 +-
 .../contrib/operators/s3_to_gcs_operator.py | 5 +-
 .../operators/sagemaker_endpoint_operator.py | 2 +-
 airflow/contrib/operators/winrm_operator.py | 2 +-
 .../aws_glue_catalog_partition_sensor.py | 4 +-
 airflow/contrib/sensors/bash_sensor.py | 8 +--
 airflow/contrib/sensors/hdfs_sensor.py | 10 +---
 airflow/contrib/sensors/python_sensor.py | 2 +-
 .../sensors/sagemaker_training_sensor.py | 2 +-
 airflow/contrib/sensors/wasb_sensor.py | 5 +-
 airflow/contrib/utils/gcp_field_sanitizer.py | 24 ++++----
 airflow/contrib/utils/gcp_field_validator.py | 56 +++++++++----------
 airflow/executors/base_executor.py | 4 +-
 airflow/executors/celery_executor.py | 24 ++++----
 airflow/hooks/dbapi_hook.py | 5 +-
 airflow/hooks/druid_hook.py | 7 +--
 airflow/hooks/hive_hooks.py | 2 +-
 airflow/hooks/http_hook.py | 3 +-
 airflow/hooks/oracle_hook.py | 4 +-
 airflow/hooks/pig_hook.py | 2 +-
 airflow/hooks/webhdfs_hook.py | 4 +-
 airflow/jobs.py | 55 +++++++-----------
 airflow/models/__init__.py | 33 +++++------
 airflow/operators/bash_operator.py | 2 +-
 airflow/operators/check_operator.py | 12 ++--
 airflow/operators/druid_check_operator.py | 4 +-
 airflow/operators/python_operator.py | 8 +--
 airflow/operators/s3_to_hive_operator.py | 20 +++----
 airflow/security/kerberos.py | 34 +++++------
 airflow/sensors/external_task_sensor.py | 7 +--
 airflow/sensors/hdfs_sensor.py | 2 +-
 airflow/sensors/hive_partition_sensor.py | 4 +-
 .../sensors/named_hive_partition_sensor.py | 10 ++--
 airflow/sensors/s3_key_sensor.py | 3 +-
 airflow/sensors/s3_prefix_sensor.py | 3 +-
 airflow/sensors/web_hdfs_sensor.py | 2 +-
 airflow/utils/cli_action_loggers.py | 12 ++--
 airflow/utils/dag_processing.py | 44 ++++++---------
 airflow/utils/log/es_task_handler.py | 4 +-
 airflow/utils/log/gcs_task_handler.py | 4 +-
 airflow/utils/timeout.py | 2 +-
 airflow/www/api/experimental/endpoints.py | 2 +-
 airflow/www/security.py | 21 ++++---
 .../contrib/hooks/test_gcp_container_hook.py | 4 +-
 .../operators/test_gcp_container_operator.py | 4 +-
 tests/executors/test_celery_executor.py | 5 +-
 tests/utils/log/test_es_task_handler.py | 3 +-
 59 files changed, 275 insertions(+), 326 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fae3cb7a274894..5b4e05ca9970b4 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -502,7 +502,7 @@ def run(args, dag=None):
         dag = get_dag(args)
     elif not dag:
         with db.create_session() as session:
-            log.info('Loading pickle id {args.pickle}'.format(args=args))
+            log.info('Loading pickle id %s', args.pickle)
             dag_pickle = session.query(DagPickle).filter(DagPickle.id == args.pickle).first()
             if not dag_pickle:
                 raise AirflowException("Who hid the pickle!? 
[missing pickle]") diff --git a/airflow/configuration.py b/airflow/configuration.py index 0f4c434ec383cb..d9b5dadf15f5b4 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -253,12 +253,12 @@ def get(self, section, key, **kwargs): else: log.warning( - "section/key [{section}/{key}] not found in config".format(**locals()) + "section/key [%s/%s] not found in config", section, key ) raise AirflowConfigException( "section/key [{section}/{key}] not found " - "in config".format(**locals())) + "in config".format(section=section, key=key)) def getboolean(self, section, key, **kwargs): val = str(self.get(section, key, **kwargs)).lower().strip() diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py index 587fc2be16d05b..654d57baa153b3 100644 --- a/airflow/contrib/auth/backends/ldap_auth.py +++ b/airflow/contrib/auth/backends/ldap_auth.py @@ -227,7 +227,7 @@ def try_login(username, password): Unable to parse LDAP structure. If you're using Active Directory and not specifying an OU, you must set search_scope=SUBTREE in airflow.cfg. %s - """ % traceback.format_exc()) + """, traceback.format_exc()) raise LdapException( "Could not parse LDAP structure. " "Try setting search_scope in airflow.cfg, or check logs" diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 4fb2d51454c0a8..f9c8a13f84d6bf 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -816,7 +816,7 @@ def run_query(self, if param_name == 'schemaUpdateOptions' and param: self.log.info("Adding experimental 'schemaUpdateOptions': " - "{0}".format(schema_update_options)) + "%s", schema_update_options) if param_name == 'destinationTable': for key in ['projectId', 'datasetId', 'tableId']: @@ -1157,8 +1157,9 @@ def run_load(self, "'WRITE_APPEND' or 'WRITE_TRUNCATE'.") else: self.log.info( - "Adding experimental " - "'schemaUpdateOptions': {0}".format(schema_update_options)) + "Adding experimental 'schemaUpdateOptions': %s", + schema_update_options + ) configuration['load'][ 'schemaUpdateOptions'] = schema_update_options @@ -1559,9 +1560,10 @@ def create_empty_dataset(self, dataset_id="", project_id="", param, param_name, param_default = param_tuple if param_name not in dataset_reference['datasetReference']: if param_default and not param: - self.log.info("{} was not specified. Will be used default " - "value {}.".format(param_name, - param_default)) + self.log.info( + "%s was not specified. Will be used default value %s.", + param_name, param_default + ) param = param_default dataset_reference['datasetReference'].update( {param_name: param}) @@ -1639,7 +1641,7 @@ def get_dataset(self, dataset_id, project_id=None): try: dataset_resource = self.service.datasets().get( datasetId=dataset_id, projectId=dataset_project_id).execute() - self.log.info("Dataset Resource: {}".format(dataset_resource)) + self.log.info("Dataset Resource: %s", dataset_resource) except HttpError as err: raise AirflowException( 'BigQuery job failed. 
Error was: {}'.format(err.content)) @@ -1686,7 +1688,7 @@ def get_datasets_list(self, project_id=None): try: datasets_list = self.service.datasets().list( projectId=dataset_project_id).execute()['datasets'] - self.log.info("Datasets List: {}".format(datasets_list)) + self.log.info("Datasets List: %s", datasets_list) except HttpError as err: raise AirflowException( @@ -1741,9 +1743,10 @@ def insert_all(self, project_id, dataset_id, table_id, } try: - self.log.info('Inserting {} row(s) into Table {}:{}.{}'.format( - len(rows), dataset_project_id, - dataset_id, table_id)) + self.log.info( + 'Inserting %s row(s) into Table %s:%s.%s', + len(rows), dataset_project_id, dataset_id, table_id + ) resp = self.service.tabledata().insertAll( projectId=dataset_project_id, datasetId=dataset_id, @@ -1751,8 +1754,10 @@ def insert_all(self, project_id, dataset_id, table_id, ).execute() if 'insertErrors' not in resp: - self.log.info('All row(s) inserted successfully: {}:{}.{}'.format( - dataset_project_id, dataset_id, table_id)) + self.log.info( + 'All row(s) inserted successfully: %s:%s.%s', + dataset_project_id, dataset_id, table_id + ) else: error_msg = '{} insert error(s) occurred: {}:{}.{}. Details: {}'.format( len(resp['insertErrors']), @@ -2034,11 +2039,10 @@ def var_print(var_name): if project_id is None: if var_name is not None: log = LoggingMixin().log - log.info('Project not included in {var}: {input}; ' - 'using project "{project}"'.format( - var=var_name, - input=table_input, - project=default_project_id)) + log.info( + 'Project not included in %s: %s; using project "%s"', + var_name, table_input, default_project_id + ) project_id = default_project_id return project_id, dataset_id, table_id diff --git a/airflow/contrib/hooks/gcp_container_hook.py b/airflow/contrib/hooks/gcp_container_hook.py index 68bf1785b6b0ec..909cc6338377b9 100644 --- a/airflow/contrib/hooks/gcp_container_hook.py +++ b/airflow/contrib/hooks/gcp_container_hook.py @@ -80,7 +80,7 @@ def wait_for_operation(self, operation, project_id=None): :type project_id: str :return: A new, updated operation fetched from Google Cloud """ - self.log.info("Waiting for OPERATION_NAME %s" % operation.name) + self.log.info("Waiting for OPERATION_NAME %s", operation.name) time.sleep(OPERATIONAL_POLL_INTERVAL) while operation.status != Operation.Status.DONE: if operation.status == Operation.Status.RUNNING or operation.status == \ @@ -151,8 +151,9 @@ def delete_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT): :return: The full url to the delete operation if successful, else None """ - self.log.info("Deleting (project_id={}, zone={}, cluster_id={})".format( - self.project_id, self.location, name)) + self.log.info( + "Deleting (project_id=%s, zone=%s, cluster_id=%s)", self.project_id, self.location, name + ) try: op = self.get_client().delete_cluster(project_id=project_id or self.project_id, @@ -164,7 +165,7 @@ def delete_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT): # Returns server-defined url for the resource return op.self_link except NotFound as error: - self.log.info('Assuming Success: ' + error.message) + self.log.info('Assuming Success: %s', error.message) def create_cluster(self, cluster, project_id=None, retry=DEFAULT, timeout=DEFAULT): """ @@ -200,10 +201,10 @@ def create_cluster(self, cluster, project_id=None, retry=DEFAULT, timeout=DEFAUL self._append_label(cluster, 'airflow-version', 'v' + version.version) - self.log.info("Creating (project_id={}, zone={}, cluster_name={})".format( - 
self.project_id, - self.location, - cluster.name)) + self.log.info( + "Creating (project_id=%s, zone=%s, cluster_name=%s)", + self.project_id, self.location, cluster.name + ) try: op = self.get_client().create_cluster(project_id=project_id or self.project_id, zone=self.location, @@ -214,7 +215,7 @@ def create_cluster(self, cluster, project_id=None, retry=DEFAULT, timeout=DEFAUL return op.target_link except AlreadyExists as error: - self.log.info('Assuming Success: ' + error.message) + self.log.info('Assuming Success: %s', error.message) return self.get_cluster(name=cluster.name).self_link def get_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT): @@ -234,10 +235,10 @@ def get_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT): :type timeout: float :return: google.cloud.container_v1.types.Cluster """ - self.log.info("Fetching cluster (project_id={}, zone={}, cluster_name={})".format( - project_id or self.project_id, - self.location, - name)) + self.log.info( + "Fetching cluster (project_id=%s, zone=%s, cluster_name=%s)", + project_id or self.project_id, self.location, name + ) return self.get_client().get_cluster(project_id=project_id or self.project_id, zone=self.location, diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index 2704fd9d327151..f3708813f6c7d5 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -164,7 +164,7 @@ def _extract_xcom(self, pod): def _exec_pod_command(self, resp, command): if resp.is_open(): - self.log.info('Running command... %s\n' % command) + self.log.info('Running command... %s\n', command) resp.write_stdin(command + '\n') while resp.is_open(): resp.update(timeout=1) diff --git a/airflow/contrib/operators/aws_athena_operator.py b/airflow/contrib/operators/aws_athena_operator.py index 432410e31100aa..28054d320168ab 100644 --- a/airflow/contrib/operators/aws_athena_operator.py +++ b/airflow/contrib/operators/aws_athena_operator.py @@ -81,8 +81,9 @@ def on_kill(self): """ if self.query_execution_id: self.log.info('⚰️⚰️⚰️ Received a kill Signal. Time to Die') - self.log.info('Stopping Query with executionId - {queryId}'.format( - queryId=self.query_execution_id)) + self.log.info( + 'Stopping Query with executionId - %s', self.query_execution_id + ) response = self.hook.stop_query(self.query_execution_id) http_status_code = None try: @@ -93,6 +94,7 @@ def on_kill(self): if http_status_code is None or http_status_code != 200: self.log.error('Unable to request query cancel on athena. Exiting') else: - self.log.info('Polling Athena for query with id {queryId} to reach final state'.format( - queryId=self.query_execution_id)) + self.log.info( + 'Polling Athena for query with id %s to reach final state', self.query_execution_id + ) self.hook.poll_query_status(self.query_execution_id) diff --git a/airflow/contrib/operators/gcp_compute_operator.py b/airflow/contrib/operators/gcp_compute_operator.py index 451692ac30f8da..23c339bf32842a 100644 --- a/airflow/contrib/operators/gcp_compute_operator.py +++ b/airflow/contrib/operators/gcp_compute_operator.py @@ -359,8 +359,11 @@ def execute(self, context): # Group Manager. We assume success if the template is simply present existing_template = self._hook.get_instance_template( resource_id=self.body_patch['name'], project_id=self.project_id) - self.log.info("The {} template already existed. It was likely " - "created by previous run of the operator. 
Assuming success.") + self.log.info( + "The %s template already existed. It was likely created by previous run of the operator. " + "Assuming success.", + existing_template + ) return existing_template except HttpError as e: # We actually expect to get 404 / Not Found here as the template should @@ -372,8 +375,7 @@ def execute(self, context): new_body = deepcopy(old_body) self._field_sanitizer.sanitize(new_body) new_body = merge(new_body, self.body_patch) - self.log.info("Calling insert instance template with updated body: {}". - format(new_body)) + self.log.info("Calling insert instance template with updated body: %s", new_body) self._hook.insert_instance_template(body=new_body, request_id=self.request_id, project_id=self.project_id) diff --git a/airflow/contrib/operators/gcp_container_operator.py b/airflow/contrib/operators/gcp_container_operator.py index 90199c7591a4e0..c5325193f6ff04 100644 --- a/airflow/contrib/operators/gcp_container_operator.py +++ b/airflow/contrib/operators/gcp_container_operator.py @@ -318,5 +318,5 @@ def _get_field(self, extras, field, default=None): if long_f in extras: return extras[long_f] else: - self.log.info('Field {} not found in extras.'.format(field)) + self.log.info('Field %s not found in extras.', field) return default diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py index 3e2f68c7420fef..46ecf03a5d7049 100644 --- a/airflow/contrib/operators/mlengine_operator.py +++ b/airflow/contrib/operators/mlengine_operator.py @@ -267,8 +267,9 @@ def check_existing_job(existing_job): raise if finished_prediction_job['state'] != 'SUCCEEDED': - self.log.error('MLEngine batch prediction job failed: {}'.format( - str(finished_prediction_job))) + self.log.error( + 'MLEngine batch prediction job failed: %s', str(finished_prediction_job) + ) raise RuntimeError(finished_prediction_job['errorMessage']) return finished_prediction_job['predictionOutput'] @@ -597,8 +598,7 @@ def execute(self, context): if self._mode == 'DRY_RUN': self.log.info('In dry_run mode.') - self.log.info('MLEngine Training job request is: {}'.format( - training_request)) + self.log.info('MLEngine Training job request is: %s', training_request) return hook = MLEngineHook( @@ -617,6 +617,5 @@ def check_existing_job(existing_job): raise if finished_training_job['state'] != 'SUCCEEDED': - self.log.error('MLEngine training job failed: {}'.format( - str(finished_training_job))) + self.log.error('MLEngine training job failed: %s', str(finished_training_job)) raise RuntimeError(finished_training_job['errorMessage']) diff --git a/airflow/contrib/operators/oracle_to_oracle_transfer.py b/airflow/contrib/operators/oracle_to_oracle_transfer.py index 1db95f7520bb1a..58613c4f334895 100644 --- a/airflow/contrib/operators/oracle_to_oracle_transfer.py +++ b/airflow/contrib/operators/oracle_to_oracle_transfer.py @@ -68,8 +68,7 @@ def __init__( def _execute(self, src_hook, dest_hook, context): with src_hook.get_conn() as src_conn: cursor = src_conn.cursor() - self.log.info("Querying data from source: {0}".format( - self.oracle_source_conn_id)) + self.log.info("Querying data from source: %s", self.oracle_source_conn_id) cursor.execute(self.source_sql, self.source_sql_params) target_fields = list(map(lambda field: field[0], cursor.description)) @@ -81,7 +80,7 @@ def _execute(self, src_hook, dest_hook, context): target_fields=target_fields, commit_every=self.rows_chunk) rows = cursor.fetchmany(self.rows_chunk) - self.log.info("Total inserted: {0} 
rows".format(rows_total)) + self.log.info("Total inserted: %s rows", rows_total) self.log.info("Finished data transfer.") cursor.close() diff --git a/airflow/contrib/operators/s3_delete_objects_operator.py b/airflow/contrib/operators/s3_delete_objects_operator.py index 635765496de8cd..76caace90f7b90 100644 --- a/airflow/contrib/operators/s3_delete_objects_operator.py +++ b/airflow/contrib/operators/s3_delete_objects_operator.py @@ -78,7 +78,7 @@ def execute(self, context): response = s3_hook.delete_objects(bucket=self.bucket, keys=self.keys) deleted_keys = [x['Key'] for x in response.get("Deleted", [])] - self.log.info("Deleted: {}".format(deleted_keys)) + self.log.info("Deleted: %s", deleted_keys) if "Errors" in response: errors_keys = [x['Key'] for x in response.get("Errors", [])] diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py index 9c67c2fa3b78eb..d5a58d7d10b2fb 100644 --- a/airflow/contrib/operators/s3_list_operator.py +++ b/airflow/contrib/operators/s3_list_operator.py @@ -87,8 +87,9 @@ def execute(self, context): hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) self.log.info( - 'Getting the list of files from bucket: {0} in prefix: {1} (Delimiter {2})'. - format(self.bucket, self.prefix, self.delimiter)) + 'Getting the list of files from bucket: %s in prefix: %s (Delimiter {%s)', + self.bucket, self.prefix, self.delimiter + ) return hook.list_keys( bucket_name=self.bucket, diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py index 9008c2da1c25b3..ad456a5c9bae04 100644 --- a/airflow/contrib/operators/s3_to_gcs_operator.py +++ b/airflow/contrib/operators/s3_to_gcs_operator.py @@ -154,8 +154,9 @@ def execute(self, context): files = list(set(files) - set(existing_files)) if len(files) > 0: - self.log.info('{0} files are going to be synced: {1}.'.format( - len(files), files)) + self.log.info( + '%s files are going to be synced: %s.', len(files), files + ) else: self.log.info( 'There are no new files to sync. Have a nice day!') diff --git a/airflow/contrib/operators/sagemaker_endpoint_operator.py b/airflow/contrib/operators/sagemaker_endpoint_operator.py index b3f17df128a35e..45af3381cb9cdd 100644 --- a/airflow/contrib/operators/sagemaker_endpoint_operator.py +++ b/airflow/contrib/operators/sagemaker_endpoint_operator.py @@ -128,7 +128,7 @@ def execute(self, context): else: raise ValueError('Invalid value! 
Argument operation has to be one of "create" and "update"') - self.log.info('{} SageMaker endpoint {}.'.format(log_str, endpoint_info['EndpointName'])) + self.log.info('%s SageMaker endpoint %s.', log_str, endpoint_info['EndpointName']) response = sagemaker_operation( endpoint_info, diff --git a/airflow/contrib/operators/winrm_operator.py b/airflow/contrib/operators/winrm_operator.py index f0b781e97dabf3..a9099b02dd6a88 100644 --- a/airflow/contrib/operators/winrm_operator.py +++ b/airflow/contrib/operators/winrm_operator.py @@ -84,7 +84,7 @@ def execute(self, context): winrm_client = self.winrm_hook.get_conn() try: - self.log.info("Running command: '{command}'...".format(command=self.command)) + self.log.info("Running command: '%s'...", self.command) command_id = self.winrm_hook.winrm_protocol.run_command( winrm_client, self.command diff --git a/airflow/contrib/sensors/aws_glue_catalog_partition_sensor.py b/airflow/contrib/sensors/aws_glue_catalog_partition_sensor.py index 74e25a29dc4ffe..d4c1d3a884e723 100644 --- a/airflow/contrib/sensors/aws_glue_catalog_partition_sensor.py +++ b/airflow/contrib/sensors/aws_glue_catalog_partition_sensor.py @@ -74,8 +74,8 @@ def poke(self, context): if '.' in self.table_name: self.database_name, self.table_name = self.table_name.split('.') self.log.info( - 'Poking for table {self.database_name}.{self.table_name}, ' - 'expression {self.expression}'.format(**locals())) + 'Poking for table %s. %s, expression %s', self.database_name, self.table_name, self.expression + ) return self.get_hook().check_for_partition( self.database_name, self.table_name, self.expression) diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py index 7372a419b6900b..fec48c5778ef12 100644 --- a/airflow/contrib/sensors/bash_sensor.py +++ b/airflow/contrib/sensors/bash_sensor.py @@ -70,10 +70,7 @@ def poke(self, context): f.flush() fname = f.name script_location = tmp_dir + "/" + fname - self.log.info( - "Temporary script location: %s", - script_location - ) + self.log.info("Temporary script location: %s", script_location) self.log.info("Running command: %s", bash_command) sp = Popen( ['bash', fname], @@ -89,7 +86,6 @@ def poke(self, context): line = line.decode(self.output_encoding).strip() self.log.info(line) sp.wait() - self.log.info("Command exited with " - "return code {0}".format(sp.returncode)) + self.log.info("Command exited with return code %s", sp.returncode) return not sp.returncode diff --git a/airflow/contrib/sensors/hdfs_sensor.py b/airflow/contrib/sensors/hdfs_sensor.py index 832b81b8e5f257..fe9e5cfc6f4df6 100644 --- a/airflow/contrib/sensors/hdfs_sensor.py +++ b/airflow/contrib/sensors/hdfs_sensor.py @@ -35,9 +35,7 @@ def poke(self, context): """ sb = self.hook(self.hdfs_conn_id).get_conn() self.log.info( - 'Poking for {self.filepath} to be a directory ' - 'with files matching {self.regex.pattern}'. 
- format(**locals()) + 'Poking for %s to be a directory with files matching %s', self.filepath, self.regex.pattern ) result = [f for f in sb.ls([self.filepath], include_toplevel=False) if f['file_type'] == 'f' and @@ -68,11 +66,9 @@ def poke(self, context): self.ignore_copying) result = self.filter_for_filesize(result, self.file_size) if self.be_empty: - self.log.info('Poking for filepath {self.filepath} to a empty directory' - .format(**locals())) + self.log.info('Poking for filepath %s to a empty directory', self.filepath) return len(result) == 1 and result[0]['path'] == self.filepath else: - self.log.info('Poking for filepath {self.filepath} to a non empty directory' - .format(**locals())) + self.log.info('Poking for filepath %s to a non empty directory', self.filepath) result.pop(0) return bool(result) and result[0]['file_type'] == 'f' diff --git a/airflow/contrib/sensors/python_sensor.py b/airflow/contrib/sensors/python_sensor.py index ecdc7e6c8ea40f..81b7af72130a2f 100644 --- a/airflow/contrib/sensors/python_sensor.py +++ b/airflow/contrib/sensors/python_sensor.py @@ -76,6 +76,6 @@ def poke(self, context): context['templates_dict'] = self.templates_dict self.op_kwargs = context - self.log.info("Poking callable: " + str(self.python_callable)) + self.log.info("Poking callable: %s", str(self.python_callable)) return_value = self.python_callable(*self.op_args, **self.op_kwargs) return bool(return_value) diff --git a/airflow/contrib/sensors/sagemaker_training_sensor.py b/airflow/contrib/sensors/sagemaker_training_sensor.py index 2d820111a08c85..d550a6c434bfb4 100644 --- a/airflow/contrib/sensors/sagemaker_training_sensor.py +++ b/airflow/contrib/sensors/sagemaker_training_sensor.py @@ -91,7 +91,7 @@ def get_sagemaker_response(self): billable_time = \ (self.last_description['TrainingEndTime'] - self.last_description['TrainingStartTime']) * \ self.last_description['ResourceConfig']['InstanceCount'] - self.log.info('Billable seconds:{}'.format(int(billable_time.total_seconds()) + 1)) + self.log.info('Billable seconds: %s', int(billable_time.total_seconds()) + 1) return self.last_description diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py index ec6a63bf3d633b..8b32d3fa221d99 100644 --- a/airflow/contrib/sensors/wasb_sensor.py +++ b/airflow/contrib/sensors/wasb_sensor.py @@ -90,10 +90,7 @@ def __init__(self, container_name, prefix, wasb_conn_id='wasb_default', self.check_options = check_options def poke(self, context): - self.log.info( - 'Poking for prefix: {self.prefix}\n' - 'in wasb://{self.container_name}'.format(**locals()) - ) + self.log.info('Poking for prefix: %s in wasb://%s', self.prefix, self.container_name) hook = WasbHook(wasb_conn_id=self.wasb_conn_id) return hook.check_for_prefix(self.container_name, self.prefix, **self.check_options) diff --git a/airflow/contrib/utils/gcp_field_sanitizer.py b/airflow/contrib/utils/gcp_field_sanitizer.py index bf9f9348f56a70..2f5ff6730a8acf 100644 --- a/airflow/contrib/utils/gcp_field_sanitizer.py +++ b/airflow/contrib/utils/gcp_field_sanitizer.py @@ -127,18 +127,20 @@ def _sanitize(self, dictionary, remaining_field_spec, current_path): if len(field_split) == 1: field_name = field_split[0] if field_name in dictionary: - self.log.info("Deleted {} [{}]".format(field_name, current_path)) + self.log.info("Deleted %s [%s]", field_name, current_path) del dictionary[field_name] else: - self.log.debug("The field {} is missing in {} at the path {}.". 
- format(field_name, dictionary, current_path)) + self.log.debug( + "The field %s is missing in %s at the path %s.", field_name, dictionary, current_path + ) else: field_name = field_split[0] remaining_path = field_split[1] child = dictionary.get(field_name) if child is None: - self.log.debug("The field {} is missing in {} at the path {}. ". - format(field_name, dictionary, current_path)) + self.log.debug( + "The field %s is missing in %s at the path %s. ", field_name, dictionary, current_path + ) elif isinstance(child, dict): self._sanitize(child, remaining_path, "{}.{}".format( current_path, field_name)) @@ -146,16 +148,16 @@ def _sanitize(self, dictionary, remaining_field_spec, current_path): for index, elem in enumerate(child): if not isinstance(elem, dict): self.log.warn( - "The field {} element at index {} is of wrong type. " - "It should be dict and is {}. Skipping it.". - format(current_path, index, elem)) + "The field %s element at index %s is of wrong type. " + "It should be dict and is %s. Skipping it.", + current_path, index, elem) self._sanitize(elem, remaining_path, "{}.{}[{}]".format( current_path, field_name, index)) else: self.log.warn( - "The field {} is of wrong type. " - "It should be dict or list and it is {}. Skipping it.". - format(current_path, child)) + "The field %s is of wrong type. It should be dict or list and it is %s. Skipping it.", + current_path, child + ) def sanitize(self, body): for elem in self._sanitize_specs: diff --git a/airflow/contrib/utils/gcp_field_validator.py b/airflow/contrib/utils/gcp_field_validator.py index 0337be14224548..0431c9ef9eadd3 100644 --- a/airflow/contrib/utils/gcp_field_validator.py +++ b/airflow/contrib/utils/gcp_field_validator.py @@ -263,14 +263,14 @@ def _validate_dict(self, children_validation_specs, full_field_path, value): for field_name in value.keys(): if field_name not in all_dict_keys: self.log.warning( - "The field '{}' is in the body, but is not specified in the " - "validation specification '{}'. " + "The field '%s' is in the body, but is not specified in the " + "validation specification '%s'. " "This might be because you are using newer API version and " "new field names defined for that version. Then the warning " "can be safely ignored, or you might want to upgrade the operator" - "to the version that supports the new API version.".format( - self._get_field_name_with_parent(field_name, full_field_path), - children_validation_specs)) + "to the version that supports the new API version.", + self._get_field_name_with_parent(field_name, full_field_path), + children_validation_specs) def _validate_union(self, children_validation_specs, full_field_path, dictionary_to_validate): @@ -296,15 +296,14 @@ def _validate_union(self, children_validation_specs, full_field_path, found_field_name = field_name if not field_found: self.log.warning( - "There is no '{}' union defined in the body {}. " - "Validation expected one of '{}' but could not find any. It's possible " + "There is no '%s' union defined in the body %s. " + "Validation expected one of '%s' but could not find any. It's possible " "that you are using newer API version and there is another union variant " "defined for that version. 
Then the warning can be safely ignored, "
+                "or you might want to upgrade the operator to the version that "
+                "supports the new API version.",
+                full_field_path, dictionary_to_validate,
+                [field['name'] for field in children_validation_specs])
 
     def _validate_field(self, validation_spec, dictionary_to_validate, parent=None,
                         force_optional=False):
@@ -335,15 +334,14 @@ def _validate_field(self, validation_spec, dictionary_to_validate, parent=None,
                                                            parent=parent)
         if required_api_version and required_api_version != self._api_version:
             self.log.debug(
-                "Skipping validation of the field '{}' for API version '{}' "
-                "as it is only valid for API version '{}'".
-                format(field_name, self._api_version, required_api_version))
+                "Skipping validation of the field '%s' for API version '%s' "
+                "as it is only valid for API version '%s'",
+                field_name, self._api_version, required_api_version)
             return False
 
         value = dictionary_to_validate.get(field_name)
         if (optional or force_optional) and value is None:
-            self.log.debug("The optional field '{}' is missing. That's perfectly OK.".
-                           format(full_field_path))
+            self.log.debug("The optional field '%s' is missing. That's perfectly OK.", full_field_path)
             return False
 
         # Certainly down from here the field is present (value is not None)
@@ -369,18 +367,17 @@ def _validate_field(self, validation_spec, dictionary_to_validate, parent=None,
                     format(full_field_path, validation_spec, value))
             if children_validation_specs is None:
                 self.log.debug(
-                    "The dict field '{}' has no nested fields defined in the "
-                    "specification '{}'. That's perfectly ok - it's content will "
-                    "not be validated."
-                    .format(full_field_path, validation_spec))
+                    "The dict field '%s' has no nested fields defined in the "
+                    "specification '%s'. That's perfectly ok - it's content will "
+                    "not be validated.", full_field_path, validation_spec)
             else:
                 self._validate_dict(children_validation_specs, full_field_path, value)
         elif field_type == 'union':
             if not children_validation_specs:
                 raise GcpValidationSpecificationException(
-                    "The union field '{}' has no nested fields "
-                    "defined in specification '{}'. Unions should have at least one "
-                    "nested field defined.".format(full_field_path, validation_spec))
+                    "The union field '%s' has no nested fields "
+                    "defined in specification '%s'. Unions should have at least one "
+                    "nested field defined." % (full_field_path, validation_spec))
             self._validate_union(children_validation_specs, full_field_path,
                                  dictionary_to_validate)
         elif field_type == 'list':
@@ -397,9 +394,8 @@ def _validate_field(self, validation_spec, dictionary_to_validate, parent=None,
                     "Error while validating custom field '{}' specified by '{}': '{}'".
                     format(full_field_path, validation_spec, e))
         elif field_type is None:
-            self.log.debug("The type of field '{}' is not specified in '{}'. "
-                           "Not validating its content.".
-                           format(full_field_path, validation_spec))
+            self.log.debug("The type of field '%s' is not specified in '%s'. "
+                           "Not validating its content.", full_field_path, validation_spec)
         else:
             raise GcpValidationSpecificationException(
                 "The field '{}' is of type '{}' in specification '{}'."
@@ -439,10 +435,10 @@ def validate(self, body_to_validate):
         for field_name in body_to_validate.keys():
             if field_name not in all_field_names:
                 self.log.warning(
-                    "The field '{}' is in the body, but is not specified in the "
-                    "validation specification '{}'. 
" + "The field '%s' is in the body, but is not specified in the " + "validation specification '%s'. " "This might be because you are using newer API version and " "new field names defined for that version. Then the warning " "can be safely ignored, or you might want to upgrade the operator" - "to the version that supports the new API version.".format( - field_name, self._validation_specs)) + "to the version that supports the new API version.", + field_name, self._validation_specs) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index b5c0e74c5c2dc9..96e226e4952eef 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -58,7 +58,7 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None): self.log.info("Adding to queue: %s", command) self.queued_tasks[key] = (command, priority, queue, simple_task_instance) else: - self.log.info("could not queue task {}".format(key)) + self.log.info("could not queue task %s", key) def queue_task_instance( self, @@ -139,7 +139,7 @@ def heartbeat(self): self.sync() def change_state(self, key, state): - self.log.debug("Changing state: {}".format(key)) + self.log.debug("Changing state: %s", key) self.running.pop(key, None) self.event_buffer[key] = state diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index e91bb7efc41136..09ed425547e12c 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -151,8 +151,9 @@ def __init__(self): def start(self): self.log.debug( - 'Starting Celery Executor using {} processes for syncing'.format( - self._sync_parallelism)) + 'Starting Celery Executor using %s processes for syncing', + self._sync_parallelism + ) def _num_tasks_per_send_process(self, to_send_count): """ @@ -181,9 +182,9 @@ def heartbeat(self): else: open_slots = self.parallelism - len(self.running) - self.log.debug("{} running task instances".format(len(self.running))) - self.log.debug("{} in queue".format(len(self.queued_tasks))) - self.log.debug("{} open slots".format(open_slots)) + self.log.debug("%s running task instances", len(self.running)) + self.log.debug("%s in queue", len(self.queued_tasks)) + self.log.debug("%s open slots", open_slots) sorted_queue = sorted( [(k, v) for k, v in self.queued_tasks.items()], @@ -224,8 +225,8 @@ def heartbeat(self): for key, command, result in key_and_async_results: if isinstance(result, ExceptionWithTraceback): self.log.error( - CELERY_SEND_ERR_MSG_HEADER + ":{}\n{}\n".format( - result.exception, result.traceback)) + CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback + ) elif result is not None: # Only pops when enqueued successfully, otherwise keep it # and expect scheduler loop to deal with it. 
@@ -236,7 +237,7 @@ def heartbeat(self): self.last_state[key] = celery_states.PENDING # Calling child class sync method - self.log.debug("Calling the {} sync method".format(self.__class__)) + self.log.debug("Calling the %s sync method", self.__class__) self.sync() def sync(self): @@ -267,8 +268,9 @@ def sync(self): for key_and_state in task_keys_to_states: if isinstance(key_and_state, ExceptionWithTraceback): self.log.error( - CELERY_FETCH_ERR_MSG_HEADER + ", ignoring it:{}\n{}\n".format( - key_and_state.exception, key_and_state.traceback)) + CELERY_FETCH_ERR_MSG_HEADER + ", ignoring it:%s\n%s\n", + repr(key_and_state.exception), key_and_state.traceback + ) continue key, state = key_and_state try: @@ -286,7 +288,7 @@ def sync(self): del self.tasks[key] del self.last_state[key] else: - self.log.info("Unexpected state: " + state) + self.log.info("Unexpected state: %s", state) self.last_state[key] = state except Exception: self.log.exception("Error syncing the Celery executor, ignoring it.") diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index 4fce22786997e1..7fa69436e6a13d 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -256,12 +256,11 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000, if commit_every and i % commit_every == 0: conn.commit() self.log.info( - "Loaded {i} into {table} rows so far".format(**locals()) + "Loaded %s into %s rows so far", i, table ) conn.commit() - self.log.info( - "Done loading. Loaded a total of {i} rows".format(**locals())) + self.log.info("Done loading. Loaded a total of %s rows", i) @staticmethod def _serialize_cell(cell, conn=None): diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index a45ebf472fe680..5e2f94f1258ddc 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -68,7 +68,7 @@ def get_conn_url(self): def submit_indexing_job(self, json_index_spec): url = self.get_conn_url() - self.log.info("Druid ingestion spec: {}".format(json_index_spec)) + self.log.info("Druid ingestion spec: %s", json_index_spec) req_index = requests.post(url, json=json_index_spec, headers=self.header) if req_index.status_code != 200: raise AirflowException('Did not get 200 when ' @@ -77,7 +77,7 @@ def submit_indexing_job(self, json_index_spec): req_json = req_index.json() # Wait until the job is completed druid_task_id = req_json['task'] - self.log.info("Druid indexing task-id: {}".format(druid_task_id)) + self.log.info("Druid indexing task-id: %s", druid_task_id) running = True @@ -136,8 +136,7 @@ def get_conn(self): path=conn.extra_dejson.get('endpoint', '/druid/v2/sql'), scheme=conn.extra_dejson.get('schema', 'http') ) - self.log.info('Get the connection to druid ' - 'broker on {host}'.format(host=conn.host)) + self.log.info('Get the connection to druid broker on %s', conn.host) return druid_broker_conn def get_uri(self): diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 3ec1682d655f4b..cd46f5e9a8a83a 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -233,7 +233,7 @@ def run_cli(self, hql, schema=None, verbose=True, hive_conf=None): hive_cmd.extend(['-f', f.name]) if verbose: - self.log.info(" ".join(hive_cmd)) + self.log.info("%s", " ".join(hive_cmd)) sp = subprocess.Popen( hive_cmd, stdout=subprocess.PIPE, diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py index 200a6741f2cae0..9c8c3854f2200e 100644 --- a/airflow/hooks/http_hook.py +++ b/airflow/hooks/http_hook.py @@ -75,8 +75,7 
@@ def get_conn(self, headers=None): try: session.headers.update(conn.extra_dejson) except TypeError: - self.log.warn('Connection to {} has invalid extra field.'.format( - conn.host)) + self.log.warn('Connection to %s has invalid extra field.', conn.host) if headers: session.headers.update(headers) diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py index 202208084f32d1..928a29c6266530 100644 --- a/airflow/hooks/oracle_hook.py +++ b/airflow/hooks/oracle_hook.py @@ -175,11 +175,11 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000): cur.execute(sql) if i % commit_every == 0: conn.commit() - self.log.info('Loaded {i} into {table} rows so far'.format(**locals())) + self.log.info('Loaded %s into %s rows so far', i, table) conn.commit() cur.close() conn.close() - self.log.info('Done loading. Loaded a total of {i} rows'.format(**locals())) + self.log.info('Done loading. Loaded a total of %s rows', i) def bulk_insert_rows(self, table, rows, target_fields=None, commit_every=5000): """ diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py index fcfcc7fef267a0..7cb7d70ca413ed 100644 --- a/airflow/hooks/pig_hook.py +++ b/airflow/hooks/pig_hook.py @@ -67,7 +67,7 @@ def run_cli(self, pig, verbose=True): pig_properties_list = self.pig_properties.split() pig_cmd.extend(pig_properties_list) if verbose: - self.log.info(" ".join(pig_cmd)) + self.log.info("%s", " ".join(pig_cmd)) sp = subprocess.Popen( pig_cmd, stdout=subprocess.PIPE, diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py index 9833be6cd6e6c9..58289ba40653a4 100644 --- a/airflow/hooks/webhdfs_hook.py +++ b/airflow/hooks/webhdfs_hook.py @@ -66,8 +66,8 @@ def get_conn(self): return client except HdfsError as e: self.log.debug( - "Read operation on namenode {nn.host} " - "failed with error: {e}".format(nn=nn, e=e) + "Read operation on namenode %s " + "failed with error: %s", nn.host, e ) nn_hosts = [c.host for c in nn_connections] no_nn_error = "Read operations failed " \ diff --git a/airflow/jobs.py b/airflow/jobs.py index fbb8c558c53401..e546a0fb296707 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -126,7 +126,7 @@ def kill(self, session=None): try: self.on_kill() except Exception as e: - self.log.error('on_kill() method failed: {}'.format(e)) + self.log.error('on_kill() method failed: %s', str(e)) session.merge(job) session.commit() raise AirflowException("Job shut down externally.") @@ -604,7 +604,7 @@ def _exit_gracefully(self, signum, frame): """ Helper method to clean up processor_agent to avoid leaving orphan processes. """ - self.log.info("Exiting gracefully upon receiving signal {}".format(signum)) + self.log.info("Exiting gracefully upon receiving signal %s", signum) if self.processor_agent: self.processor_agent.end() sys.exit(os.EX_OK) @@ -619,10 +619,7 @@ def manage_slas(self, dag, session=None): tasks that should have succeeded in the past hour. 
""" if not any([ti.sla for ti in dag.tasks]): - self.log.info( - "Skipping SLA check for %s because no tasks in DAG have SLAs", - dag - ) + self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) return TI = models.TaskInstance @@ -1098,9 +1095,10 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): # Put one task instance on each line task_instance_str = "\n\t".join( ["{}".format(x) for x in task_instances_to_examine]) - self.log.info("{} tasks up for execution:\n\t{}" - .format(len(task_instances_to_examine), - task_instance_str)) + self.log.info( + "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), + task_instance_str + ) # Get the pool settings pools = {p.pool: p for p in session.query(models.Pool).all()} @@ -1133,10 +1131,9 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): num_queued = len(task_instances) self.log.info( - "Figuring out tasks to run in Pool(name={pool}) with {open_slots} " - "open slots and {num_queued} task instances in queue".format( - **locals() - ) + "Figuring out tasks to run in Pool(name=%s) with %s open slots " + "and %s task instances in queue", + pool, open_slots, num_queued ) priority_sorted_task_instances = sorted( @@ -1283,8 +1280,8 @@ def _change_state_for_executable_task_instances(self, task_instances, ["{}".format(x) for x in tis_to_set_to_queued]) session.commit() - self.log.info("Setting the following {} tasks to queued state:\n\t{}" - .format(len(tis_to_set_to_queued), task_instance_str)) + self.log.info("Setting the following %s tasks to queued state:\n\t%s", + len(tis_to_set_to_queued), task_instance_str) return simple_task_instances def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, @@ -2105,27 +2102,13 @@ def _task_instances_for_dag_run(self, dag_run, session=None): return tasks_to_run def _log_progress(self, ti_status): - msg = ' | '.join([ - "[backfill progress]", - "finished run {0} of {1}", - "tasks waiting: {2}", - "succeeded: {3}", - "running: {4}", - "failed: {5}", - "skipped: {6}", - "deadlocked: {7}", - "not ready: {8}" - ]).format( - ti_status.finished_runs, - ti_status.total_runs, - len(ti_status.to_run), - len(ti_status.succeeded), - len(ti_status.running), - len(ti_status.failed), - len(ti_status.skipped), - len(ti_status.deadlocked), - len(ti_status.not_ready)) - self.log.info(msg) + self.log.info( + '[backfill progress] | finished run %s of %s | tasks waiting: %s | succeeded: %s | ' + 'running: %s | failed: %s | skipped: %s | deadlocked: %s | not ready: %s', + ti_status.finished_runs, ti_status.total_runs, len(ti_status.to_run), len(ti_status.succeeded), + len(ti_status.running), len(ti_status.failed), len(ti_status.skipped), len(ti_status.deadlocked), + len(ti_status.not_ready) + ) self.log.debug( "Finished dag run loop iteration. Remaining tasks %s", diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index c1355219ffef0a..6868cf1c54730c 100755 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -526,7 +526,7 @@ def bag_dag(self, dag, parent_dag, root_dag): self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag) self.dags[dag.dag_id] = dag - self.log.debug('Loaded DAG {dag}'.format(**locals())) + self.log.debug('Loaded DAG %s', dag) except AirflowDagCycleException as cycle_exception: # There was an error in bagging the dag. 
Remove it from the list of dags self.log.exception('Exception bagging dag: {dag.dag_id}'.format(**locals())) @@ -1252,15 +1252,11 @@ def _check_and_change_state_before_execution( return False # TODO: Logging needs cleanup, not clear what is being printed - hr = "\n" + ("-" * 80) + "\n" # Line break + hr = "\n" + ("-" * 80) # Line break # For reporting purposes, we report based on 1-indexed, # not 0-indexed lists (i.e. Attempt 1 instead of # Attempt 0 for the first attempt). - msg = "Starting attempt {attempt} of {total}".format( - attempt=self.try_number, - total=self.max_tries + 1) - # Set the task start date. In case it was re-scheduled use the initial # start date that is recorded in task_reschedule table self.start_date = timezone.utcnow() @@ -1284,11 +1280,12 @@ def _check_and_change_state_before_execution( # have been running prematurely. This should be handled in the # scheduling mechanism. self.state = State.NONE - msg = ("FIXME: Rescheduling due to concurrency limits reached at task " - "runtime. Attempt {attempt} of {total}. State set to NONE.").format( - attempt=self.try_number, - total=self.max_tries + 1) - self.log.warning(hr + msg + hr) + self.log.warning(hr) + self.log.warning( + "FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt %s of " + "%s. State set to NONE.", self.try_number, self.max_tries + 1 + ) + self.log.warning(hr) self.queued_dttm = timezone.utcnow() self.log.info("Queuing into pool %s", self.pool) @@ -1299,13 +1296,14 @@ def _check_and_change_state_before_execution( # Another worker might have started running this task instance while # the current worker process was blocked on refresh_from_db if self.state == State.RUNNING: - msg = "Task Instance already running {}".format(self) - self.log.warning(msg) + self.log.warning("Task Instance already running %s", self) session.commit() return False # print status message - self.log.info(hr + msg + hr) + self.log.info(hr) + self.log.info("Starting attempt %s of %s", self.try_number, self.max_tries + 1) + self.log.info(hr) self._try_number += 1 if not test_mode: @@ -1322,12 +1320,9 @@ def _check_and_change_state_before_execution( settings.engine.dispose() if verbose: if mark_success: - msg = "Marking success for {} on {}".format(self.task, - self.execution_date) - self.log.info(msg) + self.log.info("Marking success for %s on %s", self.task, self.execution_date) else: - msg = "Executing {} on {}".format(self.task, self.execution_date) - self.log.info(msg) + self.log.info("Executing %s on %s", self.task, self.execution_date) return True @provide_session diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py index c6380cc7a80854..00a4f3dc67ba0f 100644 --- a/airflow/operators/bash_operator.py +++ b/airflow/operators/bash_operator.py @@ -95,7 +95,7 @@ def execute(self, context): self.env = os.environ.copy() airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True) - self.log.info('Exporting the following env vars:\n' + + self.log.info('Exporting the following env vars:\n%s', '\n'.join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()])) diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py index 404964dad6cc87..97e18571fda849 100644 --- a/airflow/operators/check_operator.py +++ b/airflow/operators/check_operator.py @@ -233,29 +233,25 @@ def execute(self, context=None): reference = dict(zip(self.metrics_sorted, row2)) ratios = {} test_results = {} - rlog = "Ratio for {0}: {1} \n Ratio threshold : {2}" 
- fstr = "'{k}' check failed. {r} is above {tr}" - estr = "The following tests have failed:\n {0}" - countstr = "The following {j} tests out of {n} failed:" for m in self.metrics_sorted: if current[m] == 0 or reference[m] == 0: ratio = None else: ratio = float(max(current[m], reference[m])) / \ min(current[m], reference[m]) - self.log.info(rlog.format(m, ratio, self.metrics_thresholds[m])) + self.log.info("Ratio for %s: %s \n Ratio threshold : %s", m, ratio, self.metrics_thresholds[m]) ratios[m] = ratio test_results[m] = ratio < self.metrics_thresholds[m] if not all(test_results.values()): failed_tests = [it[0] for it in test_results.items() if not it[1]] j = len(failed_tests) n = len(self.metrics_sorted) - self.log.warning(countstr.format(**locals())) + self.log.warning("The following %s tests out of %s failed:", j, n) for k in failed_tests: self.log.warning( - fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k]) + "'%s' check failed. %s is above %s", k, ratios[k], self.metrics_thresholds[k] ) - raise AirflowException(estr.format(", ".join(failed_tests))) + raise AirflowException("The following tests have failed:\n {0}".format(", ".join(failed_tests))) self.log.info("All tests have passed") def get_db_hook(self): diff --git a/airflow/operators/druid_check_operator.py b/airflow/operators/druid_check_operator.py index 514f61fc889888..ac18f0567d4add 100644 --- a/airflow/operators/druid_check_operator.py +++ b/airflow/operators/druid_check_operator.py @@ -83,9 +83,9 @@ def get_first(self, sql): return cur.fetchone() def execute(self, context=None): - self.log.info('Executing SQL check: {}'.format(self.sql)) + self.log.info('Executing SQL check: %s', self.sql) record = self.get_first(self.sql) - self.log.info("Record: {}".format(str(record))) + self.log.info("Record: %s", str(record)) if not record: raise AirflowException("The query returned None") self.log.info("Success.") diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index d5362d08418257..0693744ef8d433 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -100,7 +100,7 @@ def __init__( def execute(self, context): # Export context to make it available for callables to use. 
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True) - self.log.info("Exporting the following env vars:\n" + + self.log.info("Exporting the following env vars:\n%s", '\n'.join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()])) os.environ.update(airflow_context_vars) @@ -320,14 +320,14 @@ def _pass_op_args(self): def _execute_in_subprocess(self, cmd): try: - self.log.info("Executing cmd\n{}".format(cmd)) + self.log.info("Executing cmd\n%s", cmd) output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, close_fds=True) if output: - self.log.info("Got output\n{}".format(output)) + self.log.info("Got output\n%s", output) except subprocess.CalledProcessError as e: - self.log.info("Got error output\n{}".format(e.output)) + self.log.info("Got error output\n%s", e.output) raise def _write_string_args(self, filename): diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 228470fad7ff0e..33aa5b0103627b 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -174,8 +174,9 @@ def execute(self, context): NamedTemporaryFile(mode="wb", dir=tmp_dir, suffix=file_ext) as f: - self.log.info("Dumping S3 key {0} contents to local file {1}" - .format(s3_key_object.key, f.name)) + self.log.info( + "Dumping S3 key %s contents to local file %s", s3_key_object.key, f.name + ) if self.select_expression: option = {} if self.headers: @@ -258,18 +259,17 @@ def _match_headers(self, header_list): raise AirflowException("Unable to retrieve header row from file") field_names = self.field_dict.keys() if len(field_names) != len(header_list): - self.log.warning("Headers count mismatch" - "File headers:\n {header_list}\n" - "Field names: \n {field_names}\n" - .format(**locals())) + self.log.warning( + "Headers count mismatch File headers:\n %s\nField names: \n %s\n", header_list, field_names + ) return False test_field_match = [h1.lower() == h2.lower() for h1, h2 in zip(header_list, field_names)] if not all(test_field_match): - self.log.warning("Headers do not match field names" - "File headers:\n {header_list}\n" - "Field names: \n {field_names}\n" - .format(**locals())) + self.log.warning( + "Headers do not match field names File headers:\n %s\nField names: \n %s\n", + header_list, field_names + ) return False else: return True diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py index b91241bc1de1c4..3fc6aa345d79f4 100644 --- a/airflow/security/kerberos.py +++ b/airflow/security/kerberos.py @@ -45,7 +45,7 @@ def renew_from_kt(principal, keytab): "-c", configuration.conf.get('kerberos', 'ccache'), # specify credentials cache cmd_principal ] - log.info("Reinitting kerberos from keytab: " + " ".join(cmdv)) + log.info("Reinitting kerberos from keytab: %s", " ".join(cmdv)) subp = subprocess.Popen(cmdv, stdout=subprocess.PIPE, @@ -55,10 +55,10 @@ def renew_from_kt(principal, keytab): universal_newlines=True) subp.wait() if subp.returncode != 0: - log.error("Couldn't reinit from keytab! `kinit' exited with %s.\n%s\n%s" % ( - subp.returncode, - "\n".join(subp.stdout.readlines()), - "\n".join(subp.stderr.readlines()))) + log.error( + "Couldn't reinit from keytab! 
`kinit' exited with %s.\n%s\n%s", + subp.returncode, "\n".join(subp.stdout.readlines()), "\n".join(subp.stderr.readlines()) + ) sys.exit(subp.returncode) global NEED_KRB181_WORKAROUND @@ -76,24 +76,24 @@ def perform_krb181_workaround(principal): "-c", configuration.conf.get('kerberos', 'ccache'), "-R"] # Renew ticket_cache - log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " + - " ".join(cmdv)) + log.info( + "Renewing kerberos ticket to work around kerberos 1.8.1: %s", " ".join(cmdv) + ) ret = subprocess.call(cmdv, close_fds=True) if ret != 0: principal = "%s/%s" % (principal or configuration.conf.get('kerberos', 'principal'), socket.getfqdn()) - fmt_dict = dict(princ=principal, - ccache=configuration.conf.get('kerberos', 'principal')) - log.error("Couldn't renew kerberos ticket in order to work around " - "Kerberos 1.8.1 issue. Please check that the ticket for " - "'%(princ)s' is still renewable:\n" - " $ kinit -f -c %(ccache)s\n" - "If the 'renew until' date is the same as the 'valid starting' " - "date, the ticket cannot be renewed. Please check your KDC " - "configuration, and the ticket renewal policy (maxrenewlife) " - "for the '%(princ)s' and `krbtgt' principals." % fmt_dict) + princ = principal + ccache = configuration.conf.get('kerberos', 'principal') + log.error( + "Couldn't renew kerberos ticket in order to work around Kerberos 1.8.1 issue. Please check that " + "the ticket for '%s' is still renewable:\n $ kinit -f -c %s\nIf the 'renew until' date is the " + "same as the 'valid starting' date, the ticket cannot be renewed. Please check your KDC " + "configuration, and the ticket renewal policy (maxrenewlife) for the '%s' and `krbtgt' " + "principals.", princ, ccache, princ + ) sys.exit(ret) diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py index 97261d7ff97343..81813447507cd5 100644 --- a/airflow/sensors/external_task_sensor.py +++ b/airflow/sensors/external_task_sensor.py @@ -111,10 +111,9 @@ def poke(self, context, session=None): [datetime.isoformat() for datetime in dttm_filter]) self.log.info( - 'Poking for ' - '{self.external_dag_id}.' - '{self.external_task_id} on ' - '{} ... '.format(serialized_dttm_filter, **locals())) + 'Poking for %s.%s on %s ... ', + self.external_dag_id, self.external_task_id, serialized_dttm_filter + ) DM = DagModel TI = TaskInstance diff --git a/airflow/sensors/hdfs_sensor.py b/airflow/sensors/hdfs_sensor.py index 3eb5145ec9668b..bc3a3c63fd64e2 100644 --- a/airflow/sensors/hdfs_sensor.py +++ b/airflow/sensors/hdfs_sensor.py @@ -103,7 +103,7 @@ def filter_for_ignored_ext(result, ignored_ext, ignore_copying): def poke(self, context): sb = self.hook(self.hdfs_conn_id).get_conn() - self.log.info('Poking for file {self.filepath}'.format(**locals())) + self.log.info('Poking for file %s', self.filepath) try: # IMOO it's not right here, as there no raise of any kind. # if the filepath is let's say '/data/mydirectory', diff --git a/airflow/sensors/hive_partition_sensor.py b/airflow/sensors/hive_partition_sensor.py index ca10c863b478c3..5165540bb43464 100644 --- a/airflow/sensors/hive_partition_sensor.py +++ b/airflow/sensors/hive_partition_sensor.py @@ -65,8 +65,8 @@ def poke(self, context): if '.' 
in self.table:
             self.schema, self.table = self.table.split('.')
         self.log.info(
-            'Poking for table {self.schema}.{self.table}, '
-            'partition {self.partition}'.format(**locals()))
+            'Poking for table %s.%s, partition %s', self.schema, self.table, self.partition
+        )
         if not hasattr(self, 'hook'):
             from airflow.hooks.hive_hooks import HiveMetastoreHook
             self.hook = HiveMetastoreHook(
diff --git a/airflow/sensors/named_hive_partition_sensor.py b/airflow/sensors/named_hive_partition_sensor.py
index f436614507ac8b..6f254589658540 100644
--- a/airflow/sensors/named_hive_partition_sensor.py
+++ b/airflow/sensors/named_hive_partition_sensor.py
@@ -61,9 +61,9 @@ def __init__(self,
         self.partition_names = partition_names
         self.hook = hook
         if self.hook and metastore_conn_id != 'metastore_default':
-            self.log.warning('A hook was passed but a non default'
-                             'metastore_conn_id='
-                             '{} was used'.format(metastore_conn_id))
+            self.log.warning(
+                'A hook was passed but a non default metastore_conn_id=%s was used', metastore_conn_id
+            )
 
     @staticmethod
     def parse_partition_name(partition):
@@ -89,9 +89,7 @@ def poke_partition(self, partition):
 
         schema, table, partition = self.parse_partition_name(partition)
 
-        self.log.info(
-            'Poking for {schema}.{table}/{partition}'.format(**locals())
-        )
+        self.log.info('Poking for %s.%s/%s', schema, table, partition)
 
         return self.hook.check_for_named_partition(
             schema, table, partition)
diff --git a/airflow/sensors/s3_key_sensor.py b/airflow/sensors/s3_key_sensor.py
index f1e668e73340e1..b9b9caea6d7cc5 100644
--- a/airflow/sensors/s3_key_sensor.py
+++ b/airflow/sensors/s3_key_sensor.py
@@ -90,8 +90,7 @@ def __init__(self,
     def poke(self, context):
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
-        self.log.info('Poking for key : {full_url}'.format(**locals()))
+        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
         if self.wildcard_match:
             return hook.check_for_wildcard_key(self.bucket_key,
                                                self.bucket_name)
diff --git a/airflow/sensors/s3_prefix_sensor.py b/airflow/sensors/s3_prefix_sensor.py
index e27cd2efa96678..16980e7b4ac31e 100644
--- a/airflow/sensors/s3_prefix_sensor.py
+++ b/airflow/sensors/s3_prefix_sensor.py
@@ -73,8 +73,7 @@ def __init__(self,
         self.verify = verify
 
     def poke(self, context):
-        self.log.info('Poking for prefix : {self.prefix}\n'
-                      'in bucket s3://{self.bucket_name}'.format(**locals()))
+        self.log.info('Poking for prefix : %s in bucket s3://%s', self.prefix, self.bucket_name)
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         return hook.check_for_prefix(
diff --git a/airflow/sensors/web_hdfs_sensor.py b/airflow/sensors/web_hdfs_sensor.py
index 67b1d385b40a23..f31a307a67ff9a 100644
--- a/airflow/sensors/web_hdfs_sensor.py
+++ b/airflow/sensors/web_hdfs_sensor.py
@@ -40,5 +40,5 @@ def __init__(self,
     def poke(self, context):
         from airflow.hooks.webhdfs_hook import WebHDFSHook
         c = WebHDFSHook(self.webhdfs_conn_id)
-        self.log.info('Poking for file {self.filepath}'.format(**locals()))
+        self.log.info('Poking for file %s', self.filepath)
         return c.check_for_path(hdfs_path=self.filepath)
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
index 21304936f3ff57..2a0de06d663aa8 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -37,7 +37,7 @@ def register_pre_exec_callback(action_logger):
 
     :param 
action_logger: An action logger function :return: None """ - logging.debug("Adding {} to pre execution callback".format(action_logger)) + logging.debug("Adding %s to pre execution callback", action_logger) __pre_exec_callbacks.append(action_logger) @@ -50,7 +50,7 @@ def register_post_exec_callback(action_logger): :param action_logger: An action logger function :return: None """ - logging.debug("Adding {} to post execution callback".format(action_logger)) + logging.debug("Adding %s to post execution callback", action_logger) __post_exec_callbacks.append(action_logger) @@ -61,12 +61,12 @@ def on_pre_execution(**kwargs): :param kwargs: :return: None """ - logging.debug("Calling callbacks: {}".format(__pre_exec_callbacks)) + logging.debug("Calling callbacks: %s", __pre_exec_callbacks) for cb in __pre_exec_callbacks: try: cb(**kwargs) except Exception: - logging.exception('Failed on pre-execution callback using {}'.format(cb)) + logging.exception('Failed on pre-execution callback using %s', cb) def on_post_execution(**kwargs): @@ -78,12 +78,12 @@ def on_post_execution(**kwargs): :param kwargs: :return: None """ - logging.debug("Calling callbacks: {}".format(__post_exec_callbacks)) + logging.debug("Calling callbacks: %s", __post_exec_callbacks) for cb in __post_exec_callbacks: try: cb(**kwargs) except Exception: - logging.exception('Failed on post-execution callback using {}'.format(cb)) + logging.exception('Failed on post-execution callback using %s', cb) def default_action_log(log, **_): diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 5026c3fd7fb4db..f0e0ae226861dc 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -510,8 +510,7 @@ def start(self): self._stat_queue, self._result_queue, self._async_mode) - self.log.info("Launched DagFileProcessorManager with pid: {}" - .format(self._process.pid)) + self.log.info("Launched DagFileProcessorManager with pid: %s", self._process.pid) def heartbeat(self): """ @@ -649,13 +648,11 @@ def end(self): # First try SIGTERM if manager_process.is_running() \ and manager_process.pid in [x.pid for x in this_process.children()]: - self.log.info( - "Terminating manager process: {}".format(manager_process.pid)) + self.log.info("Terminating manager process: %s", manager_process.pid) manager_process.terminate() # TODO: Remove magic number timeout = 5 - self.log.info("Waiting up to {}s for manager process to exit..." - .format(timeout)) + self.log.info("Waiting up to %ss for manager process to exit...", timeout) try: psutil.wait_procs({manager_process}, timeout) except psutil.TimeoutExpired: @@ -665,7 +662,7 @@ def end(self): # Then SIGKILL if manager_process.is_running() \ and manager_process.pid in [x.pid for x in this_process.children()]: - self.log.info("Killing manager process: {}".format(manager_process.pid)) + self.log.info("Killing manager process: %s", manager_process.pid) manager_process.kill() manager_process.wait() @@ -771,7 +768,7 @@ def _exit_gracefully(self, signum, frame): """ Helper method to clean up DAG file processors to avoid leaving orphan processes. """ - self.log.info("Exiting gracefully upon receiving signal {}".format(signum)) + self.log.info("Exiting gracefully upon receiving signal %s", signum) self.terminate() self.end() self.log.debug("Finished terminating DAG processors.") @@ -785,12 +782,11 @@ def start(self): user code. 
""" - self.log.info("Processing files using up to {} processes at a time " - .format(self._parallelism)) - self.log.info("Process each file at most once every {} seconds" - .format(self._file_process_interval)) - self.log.info("Checking for new files in {} every {} seconds" - .format(self._dag_directory, self.dag_dir_list_interval)) + self.log.info("Processing files using up to %s processes at a time ", self._parallelism) + self.log.info("Process each file at most once every %s seconds", self._file_process_interval) + self.log.info( + "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval + ) if self._async_mode: self.log.debug("Starting DagFileProcessorManager in async mode") @@ -842,8 +838,7 @@ def start_in_async(self): loop_duration = time.time() - loop_start_time if loop_duration < 1: sleep_length = 1 - loop_duration - self.log.debug("Sleeping for {0:.2f} seconds " - "to prevent excessive logging".format(sleep_length)) + self.log.debug("Sleeping for %.2f seconds to prevent excessive logging", sleep_length) time.sleep(sleep_length) def start_in_sync(self): @@ -898,13 +893,10 @@ def _refresh_dag_dir(self): self.last_dag_dir_refresh_time).total_seconds() if elapsed_time_since_refresh > self.dag_dir_list_interval: # Build up a list of Python files that could contain DAGs - self.log.info( - "Searching for files in {}".format(self._dag_directory)) + self.log.info("Searching for files in %s", self._dag_directory) self._file_paths = list_py_file_paths(self._dag_directory) self.last_dag_dir_refresh_time = timezone.utcnow() - self.log.info("There are {} files in {}" - .format(len(self._file_paths), - self._dag_directory)) + self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory) self.set_file_paths(self._file_paths) try: @@ -1238,8 +1230,7 @@ def _find_zombies(self, session): TI = airflow.models.TaskInstance limit_dttm = timezone.utcnow() - timedelta( seconds=self._zombie_threshold_secs) - self.log.info( - "Failing jobs without heartbeat after {}".format(limit_dttm)) + self.log.info("Failing jobs without heartbeat after %s", limit_dttm) tis = ( session.query(TI) @@ -1294,12 +1285,11 @@ def end(self): child_processes = [x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill] for child in child_processes: - self.log.info("Terminating child PID: {}".format(child.pid)) + self.log.info("Terminating child PID: %s", child.pid) child.terminate() # TODO: Remove magic number timeout = 5 - self.log.info( - "Waiting up to %s seconds for processes to exit...", timeout) + self.log.info("Waiting up to %s seconds for processes to exit...", timeout) try: psutil.wait_procs( child_processes, timeout=timeout, @@ -1313,6 +1303,6 @@ def end(self): if len(child_processes) > 0: self.log.info("SIGKILL processes that did not terminate gracefully") for child in child_processes: - self.log.info("Killing child PID: {}".format(child.pid)) + self.log.info("Killing child PID: %s", child.pid) child.kill() child.wait() diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py index 94a2224b187000..e81c923b7b8073 100644 --- a/airflow/utils/log/es_task_handler.py +++ b/airflow/utils/log/es_task_handler.py @@ -147,9 +147,7 @@ def es_read(self, log_id, offset): logs = s[self.MAX_LINE_PER_PAGE * self.PAGE:self.MAX_LINE_PER_PAGE] \ .execute() except Exception as e: - msg = 'Could not read log with log_id: {}, ' \ - 'error: {}'.format(log_id, str(e)) - self.log.exception(msg) + 
self.log.exception('Could not read log with log_id: %s, error: %s', log_id, str(e)) return logs diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index 96fee9730ad40b..246fbb294f3510 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -49,8 +49,8 @@ def _build_hook(self): except Exception as e: self.log.error( 'Could not create a GoogleCloudStorageHook with connection id ' - '"{}". {}\n\nPlease make sure that airflow[gcp] is installed ' - 'and the GCS connection exists.'.format(remote_conn_id, str(e)) + '"%s". %s\n\nPlease make sure that airflow[gcp] is installed ' + 'and the GCS connection exists.', remote_conn_id, str(e) ) @property diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py index f64800587792be..362baa197f33df 100644 --- a/airflow/utils/timeout.py +++ b/airflow/utils/timeout.py @@ -39,7 +39,7 @@ def __init__(self, seconds=1, error_message='Timeout'): self.error_message = error_message + ', PID: ' + str(os.getpid()) def handle_timeout(self, signum, frame): - self.log.error("Process timed out, PID: " + str(os.getpid())) + self.log.error("Process timed out, PID: %s", str(os.getpid())) raise AirflowTaskTimeout(self.error_message) def __enter__(self): diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index 13bc0ea929a3fa..9a47a3559b449c 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -86,7 +86,7 @@ def trigger_dag(dag_id): return response if getattr(g, 'user', None): - _log.info("User {} created {}".format(g.user, dr)) + _log.info("User %s created %s", g.user, dr) response = jsonify(message="Created {}".format(dr)) return response diff --git a/airflow/www/security.py b/airflow/www/security.py index 43485e0917803a..39feb7fb2dbab3 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -205,7 +205,7 @@ def delete_role(self, role_name): .filter(sqla_models.Role.name == role_name)\ .first() if role: - self.log.info("Deleting role '{}'".format(role_name)) + self.log.info("Deleting role '%s'", role_name) session.delete(role) session.commit() else: @@ -333,7 +333,7 @@ def clean_perms(self): deleted_count = pvms.delete() sesh.commit() if deleted_count: - self.log.info('Deleted {} faulty permissions'.format(deleted_count)) + self.log.info('Deleted %s faulty permissions', deleted_count) def _merge_perm(self, permission_name, view_menu_name): """ @@ -505,10 +505,10 @@ def _sync_dag_view_permissions(self, dag_id, access_control): def _get_or_create_dag_permission(perm_name): dag_perm = self.find_permission_view_menu(perm_name, dag_id) if not dag_perm: - self.log.info("Creating new permission '{}' on view '{}'".format( - perm_name, - dag_id - )) + self.log.info( + "Creating new permission '%s' on view '%s'", + perm_name, dag_id + ) dag_perm = self.add_permission_view_menu(perm_name, dag_id) return dag_perm @@ -521,11 +521,10 @@ def _revoke_stale_permissions(dag_view): for role in non_admin_roles: target_perms_for_role = access_control.get(role.name, {}) if perm.permission.name not in target_perms_for_role: - self.log.info("Revoking '{}' on DAG '{}' for role '{}'".format( - perm.permission, - dag_id, - role.name - )) + self.log.info( + "Revoking '%s' on DAG '%s' for role '%s'", + perm.permission, dag_id, role.name + ) self.del_permission_role(role, perm) dag_view = self.find_view_menu(dag_id) diff --git a/tests/contrib/hooks/test_gcp_container_hook.py 
b/tests/contrib/hooks/test_gcp_container_hook.py index be337c64baec22..5cee388599ff05 100644 --- a/tests/contrib/hooks/test_gcp_container_hook.py +++ b/tests/contrib/hooks/test_gcp_container_hook.py @@ -69,7 +69,7 @@ def test_delete_cluster_not_found(self, wait_mock, convert_mock, log_mock): self.gke_hook.delete_cluster('not-existing') wait_mock.assert_not_called() convert_mock.assert_not_called() - log_mock.info.assert_any_call("Assuming Success: " + message) + log_mock.info.assert_any_call("Assuming Success: %s", message) @mock.patch("airflow.contrib.hooks.gcp_container_hook.GKEClusterHook._dict_to_proto") @mock.patch( @@ -162,7 +162,7 @@ def test_create_cluster_already_exists(self, wait_mock, convert_mock, log_mock): self.gke_hook.create_cluster({}) wait_mock.assert_not_called() self.assertEqual(convert_mock.call_count, 1) - log_mock.info.assert_any_call("Assuming Success: " + message) + log_mock.info.assert_any_call("Assuming Success: %s", message) class GKEClusterHookGetTest(unittest.TestCase): diff --git a/tests/contrib/operators/test_gcp_container_operator.py b/tests/contrib/operators/test_gcp_container_operator.py index b337791be7b510..71df1692391013 100644 --- a/tests/contrib/operators/test_gcp_container_operator.py +++ b/tests/contrib/operators/test_gcp_container_operator.py @@ -285,7 +285,7 @@ def test_get_field(self): @mock.patch('airflow.contrib.operators.gcp_container_operator.GKEPodOperator.log') def test_get_field_fail(self, log_mock): log_mock.info = mock.Mock() - LOG_STR = 'Field {} not found in extras.' + LOG_STR = 'Field %s not found in extras.' FIELD_NAME = 'test_field' FIELD_VALUE = 'test_field_value' @@ -294,4 +294,4 @@ def test_get_field_fail(self, log_mock): ret_val = self.gke_op._get_field(extras, FIELD_NAME, default=FIELD_VALUE) # Assert default is returned upon failure self.assertEqual(FIELD_VALUE, ret_val) - log_mock.info.assert_called_with(LOG_STR.format(FIELD_NAME)) + log_mock.info.assert_called_with(LOG_STR, FIELD_NAME) diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 5ea8bf8c1589c9..bdccd7a610b2f4 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -155,12 +155,11 @@ def fake_celery_task(): mock_log.error.assert_called_once() args, kwargs = mock_log.error.call_args_list[0] - log = args[0] # Result of queuing is not a celery task but a dict, # and it should raise AttributeError and then get propagated # to the error log. - self.assertIn(celery_executor.CELERY_FETCH_ERR_MSG_HEADER, log) - self.assertIn('AttributeError', log) + self.assertIn(celery_executor.CELERY_FETCH_ERR_MSG_HEADER, args[0]) + self.assertIn('AttributeError', args[1]) if __name__ == '__main__': diff --git a/tests/utils/log/test_es_task_handler.py b/tests/utils/log/test_es_task_handler.py index c5164b1e1973ca..e40181a0a87aba 100644 --- a/tests/utils/log/test_es_task_handler.py +++ b/tests/utils/log/test_es_task_handler.py @@ -195,10 +195,9 @@ def test_read_raises(self): with mock.patch("elasticsearch_dsl.Search.execute") as mock_execute: mock_execute.side_effect = Exception('Failed to read') logs, metadatas = self.es_task_handler.read(self.ti, 1) - msg = "Could not read log with log_id: {}".format(self.LOG_ID) mock_exception.assert_called_once() args, kwargs = mock_exception.call_args - self.assertIn(msg, args[0]) + self.assertIn("Could not read log with log_id:", args[0]) self.assertEqual(1, len(logs)) self.assertEqual(len(logs), len(metadatas))
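
The pattern unified by this patch -- passing a `%s` template and its arguments to the logger instead of pre-formatting with `str.format()` or `+` concatenation -- defers string interpolation until a handler actually emits the record and keeps the raw template available to log aggregators. A minimal, self-contained sketch of the pattern follows; the `DemoSensor` class and every name in it are hypothetical illustrations rather than Airflow code, and the mock-based assertion at the end only mirrors the style of the test updates in this commit.

import logging
from unittest import mock


class DemoSensor:
    """Hypothetical stand-in for a sensor; not part of Airflow."""

    log = logging.getLogger(__name__)

    def __init__(self, bucket_name, bucket_key):
        self.bucket_name = bucket_name
        self.bucket_key = bucket_key

    def poke(self):
        # Lazy %-style logging: the logger receives the template and the
        # arguments separately and interpolates them only if the record is
        # actually emitted at the configured level.
        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
        return True


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    DemoSensor('my-bucket', 'data/part-0001').poke()

    # Tests that patch the logger now have to assert on the template and the
    # arguments as separate call parameters instead of one formatted string.
    with mock.patch.object(DemoSensor, 'log') as log_mock:
        DemoSensor('my-bucket', 'data/part-0001').poke()
        log_mock.info.assert_called_once_with(
            'Poking for key : s3://%s/%s', 'my-bucket', 'data/part-0001'
        )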