diff --git a/airflow/providers/qubole/CHANGELOG.rst b/airflow/providers/qubole/CHANGELOG.rst index eac8d8e1711108..3512f3d8733807 100644 --- a/airflow/providers/qubole/CHANGELOG.rst +++ b/airflow/providers/qubole/CHANGELOG.rst @@ -19,6 +19,15 @@ Changelog --------- +1.0.3 +..... + +Features +~~~~~~~~ + +* ``Feature add support for include_headers in get_results (#15598)`` + + 1.0.2 ..... diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py index 99e60274087097..cd23bbaf8f8f32 100644 --- a/airflow/providers/qubole/hooks/qubole.py +++ b/airflow/providers/qubole/hooks/qubole.py @@ -203,7 +203,15 @@ def kill(self, ti): self.cmd.cancel() # pylint: disable=consider-using-with - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True) -> str: + def get_results( + self, + ti=None, + fp=None, + inline: bool = True, + delim=None, + fetch: bool = True, + include_headers: bool = False, + ) -> str: """ Get results (or just s3 locations) of a command from Qubole and save into a file @@ -225,7 +233,9 @@ def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) self.cmd = self.cls.find(cmd_id) - self.cmd.get_results(fp, inline, delim, fetch) # type: ignore[attr-defined] + self.cmd.get_results( + fp, inline, delim, fetch, arguments=[include_headers] + ) # type: ignore[attr-defined] fp.flush() fp.close() return fp.name diff --git a/airflow/providers/qubole/hooks/qubole_check.py b/airflow/providers/qubole/hooks/qubole_check.py index 987ea59ae20211..c15f47f54ff1cd 100644 --- a/airflow/providers/qubole/hooks/qubole_check.py +++ b/airflow/providers/qubole/hooks/qubole_check.py @@ -112,7 +112,7 @@ def get_query_results(self) -> Optional[str]: cmd_id = self.cmd.id self.log.info("command id: %d", cmd_id) query_result_buffer = StringIO() - self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM) + self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM, arguments=[True]) query_result = query_result_buffer.getvalue() query_result_buffer.close() return query_result diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py index b090af6dc6ebe1..ce8a6c7c1809d9 100644 --- a/airflow/providers/qubole/operators/qubole.py +++ b/airflow/providers/qubole/operators/qubole.py @@ -250,9 +250,17 @@ def on_kill(self, ti=None) -> None: else: self.get_hook().kill(ti) - def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True) -> str: + def get_results( + self, + ti=None, + fp=None, + inline: bool = True, + delim=None, + fetch: bool = True, + include_headers: bool = False, + ) -> str: """get_results from Qubole""" - return self.get_hook().get_results(ti, fp, inline, delim, fetch) + return self.get_hook().get_results(ti, fp, inline, delim, fetch, include_headers) def get_log(self, ti) -> None: """get_log from Qubole""" diff --git a/airflow/providers/qubole/provider.yaml b/airflow/providers/qubole/provider.yaml index bd4cd5827ee08f..dad3364cd8ded6 100644 --- a/airflow/providers/qubole/provider.yaml +++ b/airflow/providers/qubole/provider.yaml @@ -22,6 +22,7 @@ description: | `Qubole `__ versions: + - 1.0.3 - 1.0.2 - 1.0.1 - 1.0.0