From 066930527fa2beeb499facb75c19dc50cb203075 Mon Sep 17 00:00:00 2001 From: Itay Levy Date: Fri, 30 Apr 2021 11:31:54 -0700 Subject: [PATCH 1/4] Added support for including headers in qubole results --- airflow/providers/qubole/CHANGELOG.rst | 9 +++++++++ airflow/providers/qubole/hooks/qubole.py | 4 ++-- airflow/providers/qubole/hooks/qubole_check.py | 2 +- airflow/providers/qubole/operators/qubole.py | 4 ++-- airflow/providers/qubole/provider.yaml | 1 + 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/airflow/providers/qubole/CHANGELOG.rst b/airflow/providers/qubole/CHANGELOG.rst index eac8d8e1711108..720c533703145c 100644 --- a/airflow/providers/qubole/CHANGELOG.rst +++ b/airflow/providers/qubole/CHANGELOG.rst @@ -19,6 +19,15 @@ Changelog --------- +1.0.3 +..... + +Features +~~~~~~~~ + +* ``Feature add support for include_headers in get_results`` + + 1.0.2 ..... diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py index 99e60274087097..579441eff9e367 100644 --- a/airflow/providers/qubole/hooks/qubole.py +++ b/airflow/providers/qubole/hooks/qubole.py @@ -203,7 +203,7 @@ def kill(self, ti): self.cmd.cancel() # pylint: disable=consider-using-with - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True) -> str: + def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, include_headers: bool = False) -> str: """ Get results (or just s3 locations) of a command from Qubole and save into a file @@ -225,7 +225,7 @@ def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) self.cmd = self.cls.find(cmd_id) - self.cmd.get_results(fp, inline, delim, fetch) # type: ignore[attr-defined] + self.cmd.get_results(fp, inline, delim, fetch, arguments=[include_headers]) # type: ignore[attr-defined] fp.flush() fp.close() return fp.name diff --git a/airflow/providers/qubole/hooks/qubole_check.py b/airflow/providers/qubole/hooks/qubole_check.py index 987ea59ae20211..c15f47f54ff1cd 100644 --- a/airflow/providers/qubole/hooks/qubole_check.py +++ b/airflow/providers/qubole/hooks/qubole_check.py @@ -112,7 +112,7 @@ def get_query_results(self) -> Optional[str]: cmd_id = self.cmd.id self.log.info("command id: %d", cmd_id) query_result_buffer = StringIO() - self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM) + self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM, arguments=[True]) query_result = query_result_buffer.getvalue() query_result_buffer.close() return query_result diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py index b090af6dc6ebe1..35024153d1ed81 100644 --- a/airflow/providers/qubole/operators/qubole.py +++ b/airflow/providers/qubole/operators/qubole.py @@ -250,9 +250,9 @@ def on_kill(self, ti=None) -> None: else: self.get_hook().kill(ti) - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True) -> str: + def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, include_headers: bool = False) -> str: """get_results from Qubole""" - return self.get_hook().get_results(ti, fp, inline, delim, fetch) + return self.get_hook().get_results(ti, fp, inline, delim, fetch,include_headers) def get_log(self, ti) -> None: """get_log from Qubole""" diff --git a/airflow/providers/qubole/provider.yaml b/airflow/providers/qubole/provider.yaml index bd4cd5827ee08f..dad3364cd8ded6 100644 --- a/airflow/providers/qubole/provider.yaml +++ b/airflow/providers/qubole/provider.yaml @@ -22,6 +22,7 @@ description: | `Qubole `__ versions: + - 1.0.3 - 1.0.2 - 1.0.1 - 1.0.0 From 96e5451352fc7f38973dcf94cae19a2d89fdf4cf Mon Sep 17 00:00:00 2001 From: Itay Levy Date: Fri, 30 Apr 2021 11:33:46 -0700 Subject: [PATCH 2/4] Added missing issue number --- airflow/providers/qubole/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/qubole/CHANGELOG.rst b/airflow/providers/qubole/CHANGELOG.rst index 720c533703145c..3512f3d8733807 100644 --- a/airflow/providers/qubole/CHANGELOG.rst +++ b/airflow/providers/qubole/CHANGELOG.rst @@ -25,7 +25,7 @@ Changelog Features ~~~~~~~~ -* ``Feature add support for include_headers in get_results`` +* ``Feature add support for include_headers in get_results (#15598)`` 1.0.2 From e9a21caef3a009b792eebf5b4330c3569690da66 Mon Sep 17 00:00:00 2001 From: Itay Levy Date: Sat, 1 May 2021 08:05:34 -0700 Subject: [PATCH 3/4] Fixed pylint errors and warnings --- airflow/providers/qubole/hooks/qubole.py | 6 ++++-- airflow/providers/qubole/operators/qubole.py | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py index 579441eff9e367..7a4ea765e8e70d 100644 --- a/airflow/providers/qubole/hooks/qubole.py +++ b/airflow/providers/qubole/hooks/qubole.py @@ -203,7 +203,8 @@ def kill(self, ti): self.cmd.cancel() # pylint: disable=consider-using-with - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, include_headers: bool = False) -> str: + def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, + include_headers: bool = False) -> str: """ Get results (or just s3 locations) of a command from Qubole and save into a file @@ -225,7 +226,8 @@ def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) self.cmd = self.cls.find(cmd_id) - self.cmd.get_results(fp, inline, delim, fetch, arguments=[include_headers]) # type: ignore[attr-defined] + self.cmd.get_results(fp, inline, delim, fetch, + arguments=[include_headers]) # type: ignore[attr-defined] fp.flush() fp.close() return fp.name diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py index 35024153d1ed81..6458a869ca0261 100644 --- a/airflow/providers/qubole/operators/qubole.py +++ b/airflow/providers/qubole/operators/qubole.py @@ -250,9 +250,10 @@ def on_kill(self, ti=None) -> None: else: self.get_hook().kill(ti) - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, include_headers: bool = False) -> str: + def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, + include_headers: bool = False) -> str: """get_results from Qubole""" - return self.get_hook().get_results(ti, fp, inline, delim, fetch,include_headers) + return self.get_hook().get_results(ti, fp, inline, delim, fetch, include_headers) def get_log(self, ti) -> None: """get_log from Qubole""" From a53c82159f7e6da697c3c2309cbdbd8c540a304a Mon Sep 17 00:00:00 2001 From: Itay Levy Date: Mon, 3 May 2021 09:46:40 -0700 Subject: [PATCH 4/4] fixing formatting that caused error with statics checks --- airflow/providers/qubole/hooks/qubole.py | 16 ++++++++++++---- airflow/providers/qubole/operators/qubole.py | 11 +++++++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py index 7a4ea765e8e70d..cd23bbaf8f8f32 100644 --- a/airflow/providers/qubole/hooks/qubole.py +++ b/airflow/providers/qubole/hooks/qubole.py @@ -203,8 +203,15 @@ def kill(self, ti): self.cmd.cancel() # pylint: disable=consider-using-with - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, - include_headers: bool = False) -> str: + def get_results( + self, + ti=None, + fp=None, + inline: bool = True, + delim=None, + fetch: bool = True, + include_headers: bool = False, + ) -> str: """ Get results (or just s3 locations) of a command from Qubole and save into a file @@ -226,8 +233,9 @@ def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) self.cmd = self.cls.find(cmd_id) - self.cmd.get_results(fp, inline, delim, fetch, - arguments=[include_headers]) # type: ignore[attr-defined] + self.cmd.get_results( + fp, inline, delim, fetch, arguments=[include_headers] + ) # type: ignore[attr-defined] fp.flush() fp.close() return fp.name diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py index 6458a869ca0261..ce8a6c7c1809d9 100644 --- a/airflow/providers/qubole/operators/qubole.py +++ b/airflow/providers/qubole/operators/qubole.py @@ -250,8 +250,15 @@ def on_kill(self, ti=None) -> None: else: self.get_hook().kill(ti) - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True, - include_headers: bool = False) -> str: + def get_results( + self, + ti=None, + fp=None, + inline: bool = True, + delim=None, + fetch: bool = True, + include_headers: bool = False, + ) -> str: """get_results from Qubole""" return self.get_hook().get_results(ti, fp, inline, delim, fetch, include_headers)