Skip to content

Commit

Permalink
chore: try using the entire table path for flink inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Oct 9, 2024
1 parent e1c0e72 commit 606dccb
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,18 +906,22 @@ def insert(
)
return self.raw_sql(statement.compile())

identifier = sg.table(

Check warning on line 909 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L909

Added line #L909 was not covered by tests
table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
).sql(self.dialect)

if isinstance(obj, pa.Table):
obj = obj.to_pandas()
if isinstance(obj, dict):
obj = pd.DataFrame.from_dict(obj)
if isinstance(obj, pd.DataFrame):
table = self._table_env.from_pandas(obj)
return table.execute_insert(table_name, overwrite=overwrite)
return table.execute_insert(identifier, overwrite=overwrite)

Check warning on line 919 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L919

Added line #L919 was not covered by tests

if isinstance(obj, list):
# pyflink infers datatypes, which may sometimes result in incompatible types
table = self._table_env.from_elements(obj)
return table.execute_insert(table_name, overwrite=overwrite)
return table.execute_insert(identifier, overwrite=overwrite)

Check warning on line 924 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L924

Added line #L924 was not covered by tests

raise ValueError(
"No operation is being performed. Either the obj parameter "
Expand Down

0 comments on commit 606dccb

Please sign in to comment.