diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py index a14f08c1254bb..3c0b87f03cfde 100644 --- a/airflow/api/common/experimental/delete_dag.py +++ b/airflow/api/common/experimental/delete_dag.py @@ -41,9 +41,9 @@ def delete_dag(dag_id, keep_records_in_log=True, session=None): if dag is None: raise DagNotFound("Dag id {} not found".format(dag_id)) - if dag.fileloc and os.path.exists(dag.fileloc): + if dag.get_local_fileloc() and os.path.exists(dag.get_local_fileloc()): raise DagFileExists("Dag id {} is still in DagBag. " - "Remove the DAG file first: {}".format(dag_id, dag.fileloc)) + "Remove the DAG file first: {}".format(dag_id, dag.get_local_fileloc())) count = 0 diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py index 505dba0c1a92f..87f5b4fe8a067 100644 --- a/airflow/api/common/experimental/get_code.py +++ b/airflow/api/common/experimental/get_code.py @@ -31,7 +31,7 @@ def get_code(dag_id): # type (str) -> str dag = check_and_get_dag(dag_id=dag_id) try: - with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as file: + with wwwutils.open_maybe_zipped(dag.get_local_fileloc(), 'r') as file: code = file.read() return code except IOError as exception: diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index f55d965daf78a..8cefa950436db 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -117,6 +117,7 @@ def get_dag(self, dag_id): """ from airflow.models.dag import DagModel # Avoid circular import + dag = None # If asking for a known subdag, we want to refresh the parent root_dag_id = dag_id if dag_id in self.dags: @@ -135,7 +136,7 @@ def get_dag(self, dag_id): ): # Reprocess source file # TODO: remove the below hack to find relative dag location in webserver - filepath = dag.fileloc if dag else orm_dag.fileloc + filepath = dag.get_local_fileloc() if dag else orm_dag.get_local_fileloc() found_dags = self.process_file( filepath=correct_maybe_zipped(filepath), only_if_updated=False) @@ -248,7 +249,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): if isinstance(dag, DAG): if not dag.full_filepath: dag.full_filepath = filepath - if dag.fileloc != filepath and not is_zipfile: + if dag.get_local_fileloc() != filepath and not is_zipfile: dag.fileloc = filepath try: dag.is_subdag = False diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py index ca98cbf50e736..5a2b68a5466fc 100644 --- a/airflow/sensors/external_task_sensor.py +++ b/airflow/sensors/external_task_sensor.py @@ -125,12 +125,12 @@ def poke(self, context, session=None): raise AirflowException('The external DAG ' '{} does not exist.'.format(self.external_dag_id)) else: - if not os.path.exists(dag_to_wait.fileloc): + if not os.path.exists(dag_to_wait.get_local_fileloc()): raise AirflowException('The external DAG ' '{} was deleted.'.format(self.external_dag_id)) if self.external_task_id: - refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) + refreshed_dag_info = DagBag(dag_to_wait.get_local_fileloc()).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task(self.external_task_id): raise AirflowException('The external task' '{} in DAG {} does not exist.'.format(self.external_task_id, diff --git a/airflow/version.py b/airflow/version.py index 4e19b9fd40adf..2bd29cebde3e7 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,5 +18,5 @@ # under the License. # -version = '1.10.4+twtr5' +version = '1.10.4+twtr6'