[AIRFLOW-3980] Unify logger #4804

Merged 1 commit on Mar 10, 2019
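This PR replaces eager string building in log calls (str.format(), the % operator, and concatenation) with lazy %s-style arguments passed to the logger, so the message is only interpolated when a record is actually emitted and log aggregators can group on the unformatted template. A minimal before/after sketch of the pattern (the pickle_id value is made up for illustration):

import logging

log = logging.getLogger(__name__)
pickle_id = 42  # hypothetical value, for illustration only

# Before: the message is formatted even when INFO is disabled.
log.info('Loading pickle id {}'.format(pickle_id))

# After: the logger interpolates %s only if the record is emitted.
log.info('Loading pickle id %s', pickle_id)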
2 changes: 1 addition & 1 deletion airflow/bin/cli.py
@@ -502,7 +502,7 @@ def run(args, dag=None):
dag = get_dag(args)
elif not dag:
with db.create_session() as session:
log.info('Loading pickle id {args.pickle}'.format(args=args))
log.info('Loading pickle id %s', args.pickle)
dag_pickle = session.query(DagPickle).filter(DagPickle.id == args.pickle).first()
if not dag_pickle:
raise AirflowException("Who hid the pickle!? [missing pickle]")
4 changes: 2 additions & 2 deletions airflow/configuration.py
@@ -253,12 +253,12 @@ def get(self, section, key, **kwargs):

else:
log.warning(
"section/key [{section}/{key}] not found in config".format(**locals())
"section/key [%s/%s] not found in config", section, key
)

raise AirflowConfigException(
"section/key [{section}/{key}] not found "
"in config".format(**locals()))
"in config".format(section=section, key=key))

def getboolean(self, section, key, **kwargs):
val = str(self.get(section, key, **kwargs)).lower().strip()
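Note the split in the configuration.py hunk above: the log.warning call moves to lazy %s arguments, while the AirflowConfigException message keeps str.format(), since an exception message is always rendered once the error is raised. A rough sketch of the same split, with illustrative names:

import logging

log = logging.getLogger(__name__)

def get_option(config, section, key):
    try:
        return config[section][key]
    except KeyError:
        # Lazy: only formatted if a WARNING record is actually emitted.
        log.warning("section/key [%s/%s] not found in config", section, key)
        # Eager formatting is fine here: the message is always needed.
        raise KeyError("section/key [{}/{}] not found in config".format(section, key))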
2 changes: 1 addition & 1 deletion airflow/contrib/auth/backends/ldap_auth.py
@@ -227,7 +227,7 @@ def try_login(username, password):
Unable to parse LDAP structure. If you're using Active Directory
and not specifying an OU, you must set search_scope=SUBTREE in airflow.cfg.
%s
""" % traceback.format_exc())
""", traceback.format_exc())
raise LdapException(
"Could not parse LDAP structure. "
"Try setting search_scope in airflow.cfg, or check logs"
40 changes: 22 additions & 18 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -816,7 +816,7 @@ def run_query(self,

if param_name == 'schemaUpdateOptions' and param:
self.log.info("Adding experimental 'schemaUpdateOptions': "
"{0}".format(schema_update_options))
"%s", schema_update_options)

if param_name == 'destinationTable':
for key in ['projectId', 'datasetId', 'tableId']:
@@ -1157,8 +1157,9 @@ def run_load(self,
"'WRITE_APPEND' or 'WRITE_TRUNCATE'.")
else:
self.log.info(
"Adding experimental "
"'schemaUpdateOptions': {0}".format(schema_update_options))
"Adding experimental 'schemaUpdateOptions': %s",
schema_update_options
)
configuration['load'][
'schemaUpdateOptions'] = schema_update_options

@@ -1559,9 +1560,10 @@ def create_empty_dataset(self, dataset_id="", project_id="",
param, param_name, param_default = param_tuple
if param_name not in dataset_reference['datasetReference']:
if param_default and not param:
self.log.info("{} was not specified. Will be used default "
"value {}.".format(param_name,
param_default))
self.log.info(
"%s was not specified. Will be used default value %s.",
param_name, param_default
)
param = param_default
dataset_reference['datasetReference'].update(
{param_name: param})
@@ -1639,7 +1641,7 @@ def get_dataset(self, dataset_id, project_id=None):
try:
dataset_resource = self.service.datasets().get(
datasetId=dataset_id, projectId=dataset_project_id).execute()
self.log.info("Dataset Resource: {}".format(dataset_resource))
self.log.info("Dataset Resource: %s", dataset_resource)
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content))
@@ -1686,7 +1688,7 @@ def get_datasets_list(self, project_id=None):
try:
datasets_list = self.service.datasets().list(
projectId=dataset_project_id).execute()['datasets']
self.log.info("Datasets List: {}".format(datasets_list))
self.log.info("Datasets List: %s", datasets_list)

except HttpError as err:
raise AirflowException(
@@ -1741,18 +1743,21 @@ def insert_all(self, project_id, dataset_id, table_id,
}

try:
self.log.info('Inserting {} row(s) into Table {}:{}.{}'.format(
len(rows), dataset_project_id,
dataset_id, table_id))
self.log.info(
'Inserting %s row(s) into Table %s:%s.%s',
len(rows), dataset_project_id, dataset_id, table_id
)

resp = self.service.tabledata().insertAll(
projectId=dataset_project_id, datasetId=dataset_id,
tableId=table_id, body=body
).execute()

if 'insertErrors' not in resp:
self.log.info('All row(s) inserted successfully: {}:{}.{}'.format(
dataset_project_id, dataset_id, table_id))
self.log.info(
'All row(s) inserted successfully: %s:%s.%s',
dataset_project_id, dataset_id, table_id
)
else:
error_msg = '{} insert error(s) occurred: {}:{}.{}. Details: {}'.format(
len(resp['insertErrors']),
@@ -2034,11 +2039,10 @@ def var_print(var_name):
if project_id is None:
if var_name is not None:
log = LoggingMixin().log
log.info('Project not included in {var}: {input}; '
'using project "{project}"'.format(
var=var_name,
input=table_input,
project=default_project_id))
log.info(
'Project not included in %s: %s; using project "%s"',
var_name, table_input, default_project_id
)
project_id = default_project_id

return project_id, dataset_id, table_id
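The hooks and operators in this diff log through Airflow's LoggingMixin: instance methods use self.log, while the module-level helper var_print above falls back to LoggingMixin().log. A simplified sketch of that mixin pattern, not Airflow's actual implementation, only to illustrate where self.log comes from:

import logging

class LoggingMixin:
    @property
    def log(self):
        # One logger per concrete class, named after its module and class.
        return logging.getLogger(
            "{}.{}".format(type(self).__module__, type(self).__name__))

class ExampleHook(LoggingMixin):
    def get_dataset(self, dataset_id):
        # Same lazy style as the hooks in this diff.
        self.log.info("Dataset Resource: %s", dataset_id)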
27 changes: 14 additions & 13 deletions airflow/contrib/hooks/gcp_container_hook.py
@@ -80,7 +80,7 @@ def wait_for_operation(self, operation, project_id=None):
:type project_id: str
:return: A new, updated operation fetched from Google Cloud
"""
self.log.info("Waiting for OPERATION_NAME %s" % operation.name)
self.log.info("Waiting for OPERATION_NAME %s", operation.name)
time.sleep(OPERATIONAL_POLL_INTERVAL)
while operation.status != Operation.Status.DONE:
if operation.status == Operation.Status.RUNNING or operation.status == \
@@ -151,8 +151,9 @@ def delete_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT):
:return: The full url to the delete operation if successful, else None
"""

self.log.info("Deleting (project_id={}, zone={}, cluster_id={})".format(
self.project_id, self.location, name))
self.log.info(
"Deleting (project_id=%s, zone=%s, cluster_id=%s)", self.project_id, self.location, name
)

try:
op = self.get_client().delete_cluster(project_id=project_id or self.project_id,
@@ -164,7 +165,7 @@ def delete_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT):
# Returns server-defined url for the resource
return op.self_link
except NotFound as error:
self.log.info('Assuming Success: ' + error.message)
self.log.info('Assuming Success: %s', error.message)

def create_cluster(self, cluster, project_id=None, retry=DEFAULT, timeout=DEFAULT):
"""
@@ -200,10 +201,10 @@ def create_cluster(self, cluster, project_id=None, retry=DEFAULT, timeout=DEFAULT):

self._append_label(cluster, 'airflow-version', 'v' + version.version)

self.log.info("Creating (project_id={}, zone={}, cluster_name={})".format(
self.project_id,
self.location,
cluster.name))
self.log.info(
"Creating (project_id=%s, zone=%s, cluster_name=%s)",
self.project_id, self.location, cluster.name
)
try:
op = self.get_client().create_cluster(project_id=project_id or self.project_id,
zone=self.location,
@@ -214,7 +215,7 @@ def create_cluster(self, cluster, project_id=None, retry=DEFAULT, timeout=DEFAULT):

return op.target_link
except AlreadyExists as error:
self.log.info('Assuming Success: ' + error.message)
self.log.info('Assuming Success: %s', error.message)
return self.get_cluster(name=cluster.name).self_link

def get_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT):
@@ -234,10 +235,10 @@ def get_cluster(self, name, project_id=None, retry=DEFAULT, timeout=DEFAULT):
:type timeout: float
:return: google.cloud.container_v1.types.Cluster
"""
self.log.info("Fetching cluster (project_id={}, zone={}, cluster_name={})".format(
project_id or self.project_id,
self.location,
name))
self.log.info(
"Fetching cluster (project_id=%s, zone=%s, cluster_name=%s)",
project_id or self.project_id, self.location, name
)

return self.get_client().get_cluster(project_id=project_id or self.project_id,
zone=self.location,
2 changes: 1 addition & 1 deletion airflow/contrib/kubernetes/pod_launcher.py
@@ -164,7 +164,7 @@ def _extract_xcom(self, pod):

def _exec_pod_command(self, resp, command):
if resp.is_open():
self.log.info('Running command... %s\n' % command)
self.log.info('Running command... %s\n', command)
resp.write_stdin(command + '\n')
while resp.is_open():
resp.update(timeout=1)
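The gcp_container_hook.py and pod_launcher.py hunks above also cover the two other eager variants this PR normalises: string concatenation ('Assuming Success: ' + error.message) and %-interpolation ('Running command... %s\n' % command). Both become template-plus-arguments calls; a small sketch with made-up values:

import logging

log = logging.getLogger(__name__)
message = "cluster already exists"  # illustrative value
command = "echo hello"              # illustrative value

# Before: eager concatenation and eager %-interpolation.
log.info('Assuming Success: ' + message)
log.info('Running command... %s\n' % command)

# After: the logger receives the template and the arguments separately.
log.info('Assuming Success: %s', message)
log.info('Running command... %s\n', command)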
10 changes: 6 additions & 4 deletions airflow/contrib/operators/aws_athena_operator.py
@@ -81,8 +81,9 @@ def on_kill(self):
"""
if self.query_execution_id:
self.log.info('⚰️⚰️⚰️ Received a kill Signal. Time to Die')
self.log.info('Stopping Query with executionId - {queryId}'.format(
queryId=self.query_execution_id))
self.log.info(
'Stopping Query with executionId - %s', self.query_execution_id
)
response = self.hook.stop_query(self.query_execution_id)
http_status_code = None
try:
@@ -93,6 +94,7 @@ def on_kill(self):
if http_status_code is None or http_status_code != 200:
self.log.error('Unable to request query cancel on athena. Exiting')
else:
self.log.info('Polling Athena for query with id {queryId} to reach final state'.format(
queryId=self.query_execution_id))
self.log.info(
'Polling Athena for query with id %s to reach final state', self.query_execution_id
)
self.hook.poll_query_status(self.query_execution_id)
10 changes: 6 additions & 4 deletions airflow/contrib/operators/gcp_compute_operator.py
@@ -359,8 +359,11 @@ def execute(self, context):
# Group Manager. We assume success if the template is simply present
existing_template = self._hook.get_instance_template(
resource_id=self.body_patch['name'], project_id=self.project_id)
self.log.info("The {} template already existed. It was likely "
"created by previous run of the operator. Assuming success.")
self.log.info(
"The %s template already existed. It was likely created by previous run of the operator. "
"Assuming success.",
existing_template
)
return existing_template
except HttpError as e:
# We actually expect to get 404 / Not Found here as the template should
@@ -372,8 +375,7 @@
new_body = deepcopy(old_body)
self._field_sanitizer.sanitize(new_body)
new_body = merge(new_body, self.body_patch)
self.log.info("Calling insert instance template with updated body: {}".
format(new_body))
self.log.info("Calling insert instance template with updated body: %s", new_body)
self._hook.insert_instance_template(body=new_body,
request_id=self.request_id,
project_id=self.project_id)
2 changes: 1 addition & 1 deletion airflow/contrib/operators/gcp_container_operator.py
@@ -318,5 +318,5 @@ def _get_field(self, extras, field, default=None):
if long_f in extras:
return extras[long_f]
else:
self.log.info('Field {} not found in extras.'.format(field))
self.log.info('Field %s not found in extras.', field)
return default
11 changes: 5 additions & 6 deletions airflow/contrib/operators/mlengine_operator.py
@@ -267,8 +267,9 @@ def check_existing_job(existing_job):
raise

if finished_prediction_job['state'] != 'SUCCEEDED':
self.log.error('MLEngine batch prediction job failed: {}'.format(
str(finished_prediction_job)))
self.log.error(
'MLEngine batch prediction job failed: %s', str(finished_prediction_job)
)
raise RuntimeError(finished_prediction_job['errorMessage'])

return finished_prediction_job['predictionOutput']
@@ -597,8 +598,7 @@ def execute(self, context):

if self._mode == 'DRY_RUN':
self.log.info('In dry_run mode.')
self.log.info('MLEngine Training job request is: {}'.format(
training_request))
self.log.info('MLEngine Training job request is: %s', training_request)
return

hook = MLEngineHook(
@@ -617,6 +617,5 @@ def check_existing_job(existing_job):
raise

if finished_training_job['state'] != 'SUCCEEDED':
self.log.error('MLEngine training job failed: {}'.format(
str(finished_training_job)))
self.log.error('MLEngine training job failed: %s', str(finished_training_job))
raise RuntimeError(finished_training_job['errorMessage'])
5 changes: 2 additions & 3 deletions airflow/contrib/operators/oracle_to_oracle_transfer.py
@@ -68,8 +68,7 @@ def __init__(
def _execute(self, src_hook, dest_hook, context):
with src_hook.get_conn() as src_conn:
cursor = src_conn.cursor()
self.log.info("Querying data from source: {0}".format(
self.oracle_source_conn_id))
self.log.info("Querying data from source: %s", self.oracle_source_conn_id)
cursor.execute(self.source_sql, self.source_sql_params)
target_fields = list(map(lambda field: field[0], cursor.description))

@@ -81,7 +80,7 @@ def _execute(self, src_hook, dest_hook, context):
target_fields=target_fields,
commit_every=self.rows_chunk)
rows = cursor.fetchmany(self.rows_chunk)
self.log.info("Total inserted: {0} rows".format(rows_total))
self.log.info("Total inserted: %s rows", rows_total)

self.log.info("Finished data transfer.")
cursor.close()
2 changes: 1 addition & 1 deletion airflow/contrib/operators/s3_delete_objects_operator.py
@@ -78,7 +78,7 @@ def execute(self, context):
response = s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)

deleted_keys = [x['Key'] for x in response.get("Deleted", [])]
self.log.info("Deleted: {}".format(deleted_keys))
self.log.info("Deleted: %s", deleted_keys)

if "Errors" in response:
errors_keys = [x['Key'] for x in response.get("Errors", [])]
5 changes: 3 additions & 2 deletions airflow/contrib/operators/s3_list_operator.py
@@ -87,8 +87,9 @@ def execute(self, context):
hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

self.log.info(
'Getting the list of files from bucket: {0} in prefix: {1} (Delimiter {2})'.
format(self.bucket, self.prefix, self.delimiter))
'Getting the list of files from bucket: %s in prefix: %s (Delimiter %s)',
self.bucket, self.prefix, self.delimiter
)

return hook.list_keys(
bucket_name=self.bucket,
5 changes: 3 additions & 2 deletions airflow/contrib/operators/s3_to_gcs_operator.py
@@ -154,8 +154,9 @@ def execute(self, context):

files = list(set(files) - set(existing_files))
if len(files) > 0:
self.log.info('{0} files are going to be synced: {1}.'.format(
len(files), files))
self.log.info(
'%s files are going to be synced: %s.', len(files), files
)
else:
self.log.info(
'There are no new files to sync. Have a nice day!')
2 changes: 1 addition & 1 deletion airflow/contrib/operators/sagemaker_endpoint_operator.py
@@ -128,7 +128,7 @@ def execute(self, context):
else:
raise ValueError('Invalid value! Argument operation has to be one of "create" and "update"')

self.log.info('{} SageMaker endpoint {}.'.format(log_str, endpoint_info['EndpointName']))
self.log.info('%s SageMaker endpoint %s.', log_str, endpoint_info['EndpointName'])

response = sagemaker_operation(
endpoint_info,
2 changes: 1 addition & 1 deletion airflow/contrib/operators/winrm_operator.py
@@ -84,7 +84,7 @@ def execute(self, context):
winrm_client = self.winrm_hook.get_conn()

try:
self.log.info("Running command: '{command}'...".format(command=self.command))
self.log.info("Running command: '%s'...", self.command)
command_id = self.winrm_hook.winrm_protocol.run_command(
winrm_client,
self.command
4 changes: 2 additions & 2 deletions airflow/contrib/sensors/aws_glue_catalog_partition_sensor.py
@@ -74,8 +74,8 @@ def poke(self, context):
if '.' in self.table_name:
self.database_name, self.table_name = self.table_name.split('.')
self.log.info(
'Poking for table {self.database_name}.{self.table_name}, '
'expression {self.expression}'.format(**locals()))
'Poking for table %s.%s, expression %s', self.database_name, self.table_name, self.expression
)

return self.get_hook().check_for_partition(
self.database_name, self.table_name, self.expression)
8 changes: 2 additions & 6 deletions airflow/contrib/sensors/bash_sensor.py
@@ -70,10 +70,7 @@ def poke(self, context):
f.flush()
fname = f.name
script_location = tmp_dir + "/" + fname
self.log.info(
"Temporary script location: %s",
script_location
)
self.log.info("Temporary script location: %s", script_location)
self.log.info("Running command: %s", bash_command)
sp = Popen(
['bash', fname],
@@ -89,7 +86,6 @@ def poke(self, context):
line = line.decode(self.output_encoding).strip()
self.log.info(line)
sp.wait()
self.log.info("Command exited with "
"return code {0}".format(sp.returncode))
self.log.info("Command exited with return code %s", sp.returncode)

return not sp.returncode