
Add support for Delta Sharing protocol #22692

Closed
wants to merge 21 commits

Conversation

alexott
Contributor

@alexott alexott commented Apr 1, 2022

This pull request adds support for Delta Sharing, an open-source protocol and implementation for secure sharing of data between organizations.

This introduces a new provider, apache-airflow-providers-delta-sharing, that currently consists of two pieces:

  1. A sensor for Delta Sharing tables - it waits until the table's data changes and then triggers downstream data processing
  2. An operator for downloading a Delta Sharing table to local disk

closes: #19473
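
For readers unfamiliar with the proposed provider, here is a rough sketch of how the two pieces might be wired into a DAG. The class names, import paths, and parameters below are illustrative assumptions, not the exact API introduced by this PR.

# Hypothetical usage sketch; class names, import paths, and parameters are assumptions.
from datetime import datetime

from airflow import DAG
from airflow.providers.delta.sharing.operators.delta_sharing import (  # assumed module path
    DeltaSharingDownloadToLocalOperator,
)
from airflow.providers.delta.sharing.sensors.delta_sharing import (  # assumed module path
    DeltaSharingSensor,
)

with DAG(
    dag_id="delta_sharing_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Wait until the shared table's version advances, i.e. new data has been published.
    wait_for_new_data = DeltaSharingSensor(
        task_id="wait_for_new_data",
        delta_sharing_conn_id="delta_sharing_default",
        share="my_share",
        schema="my_schema",
        table="my_table",
    )

    # Download the table's data files to a local directory for downstream processing.
    download_table = DeltaSharingDownloadToLocalOperator(
        task_id="download_table",
        delta_sharing_conn_id="delta_sharing_default",
        share="my_share",
        schema="my_schema",
        table="my_table",
        location="/tmp/delta_sharing/my_table",
    )

    wait_for_new_data >> download_table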

@mik-laj
Member

mik-laj commented Apr 2, 2022

Hello @alexott - I think you might want to read what @potiuk wrote in https://lists.apache.org/thread/t1k3d0518v4kxz1pqsprdc78h0wxobg0 and possibly chime in or start a new discussion on the dev list about contributing this provider. @potiuk and I think adding a new provider should now always be discussed on the dev list first.

@alexott
Contributor Author

alexott commented Apr 2, 2022

OK, let me start a discussion - although we already had an issue for it… Another possibility is to merge it into the Databricks provider.

@alexott
Contributor Author

alexott commented Apr 2, 2022

@mik-laj @potiuk I've added a system test for this provider that uses a publicly available demo account.

@alexott
Contributor Author

alexott commented Apr 3, 2022

The test failed somewhere in the core...

@dbarrundiag
Contributor

Just out of curiosity, shouldn't we try to leverage the official Python connector that delta-sharing has already implemented? I feel like I see a bunch of duplicate or unnecessary code here, and it might be a better design to follow the "official" implementation so we don't have to keep up with API changes here.

For example, instead of re-writing all this code and implementing an _extract_delta_sharing_version method as we do here: https://github.com/apache/airflow/blob/9547f8c718719f24d5b25b856c4b6839bf5d2524/airflow/providers/delta/sharing/hooks/delta_sharing.py#L223

couldn't we just call query_table_version, which has already been implemented in the "official" REST client: https://github.com/delta-io/delta-sharing/blob/main/python/delta_sharing/rest_client.py#L229

Or why introduce a DeltaSharingQueryResult in the Airflow provider if we already have the "official" QueryTableMetadataResponse or QueryTableVersionResponse here:
https://github.com/delta-io/delta-sharing/blob/main/python/delta_sharing/rest_client.py#L66

What do you all think? The only "con" I see is that it introduces a requirement to install the Python module, but I feel like that's totally fair, no?

CC: @alexott @mik-laj @potiuk
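
For reference, this is roughly what leaning on the official connector looks like at its highest level; the lower-level rest_client linked above sits underneath these helpers. The profile path and table coordinates below are placeholders.

# Rough illustration of the official delta-sharing Python connector's high-level API.
import delta_sharing

# A "profile" file holds the sharing server endpoint and the recipient's bearer token.
profile_file = "/path/to/open-datasets.share"  # placeholder path

# List all tables the recipient has access to.
client = delta_sharing.SharingClient(profile_file)
print(client.list_all_tables())

# Load a shared table directly into a pandas DataFrame - this convenience is also
# what pulls pandas/pyarrow in as dependencies.
df = delta_sharing.load_as_pandas(f"{profile_file}#my_share.my_schema.my_table")
print(df.head())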

@alexott
Contributor Author

alexott commented Apr 4, 2022

@dbarrundiag the primary reason is that the dependencies of the delta-sharing package are heavyweight - it depends on pyarrow, pandas, etc., which leads to a ~50MB download compared to ~0.5MB for requests. Another reason is that the package doesn't provide a way to just download the data files without converting them to a Pandas or Spark dataframe. Plus, from the technical side, the QueryTableMetadataResponse class, for example, doesn't include the table version, so you need to query it separately, which may lead to a race condition if the version changes between the version query and the data query.
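
To make the race-condition point concrete, here is a minimal sketch of the two approaches using only requests. The endpoint paths, request body, and the delta-table-version header reflect my reading of the Delta Sharing protocol (which has evolved over time) and are assumptions, not code from this PR.

# Sketch of the race described above, using only `requests` (the lightweight dependency
# the provider favors). Endpoint paths and the `delta-table-version` header are assumptions.
import requests

ENDPOINT = "https://sharing.example.com/delta-sharing"  # placeholder server
HEADERS = {"Authorization": "Bearer <token>"}            # placeholder bearer token
TABLE = f"{ENDPOINT}/shares/my_share/schemas/my_schema/tables/my_table"

# Racy: fetch the version first, then the data - the table may change in between.
version_before = requests.get(f"{TABLE}/version", headers=HEADERS).headers["delta-table-version"]
data = requests.post(f"{TABLE}/query", headers=HEADERS, json={})

# Consistent: the query response itself carries the version it was evaluated at,
# so the version and the returned file list cannot drift apart.
version_at_query = data.headers["delta-table-version"]
files_ndjson = data.text  # newline-delimited JSON: protocol, metadata, and file actions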

@dbarrundiag
Contributor

IMHO, if we are creating an Airflow provider for delta-sharing, then having the actual delta-sharing package as a dependency is totally fair - especially if it means the logic in the Airflow provider does not have to re-implement features that the delta-sharing package already has.

Additionally, at the moment there are only two use cases for this provider, but in the future there might be more that also leverage the features delta-sharing provides.

@alexott
Contributor Author

alexott commented Apr 4, 2022

I think it's still overkill for the current implementation. Later, when we need that functionality, we can swap the dependencies.

@potiuk
Member

potiuk commented Apr 4, 2022

@alexott @dbarrundiag I think separation makes sense, but there should be a deliberate effort to re-use common code, possibly in the way we learned when we implemented Dataflow.

It's also good to review the current code and see whether the duplication can be removed (I do not know enough about either, so I leave it up to you to decide how to approach it).

You can take a look at the Apache Beam and Dataflow relationship (in the google provider), which is similar:

  • we have the apache-airflow-providers-google provider with Dataflow (the Apache Beam service)
  • we have a separate apache-airflow-providers-apache-beam provider

Those two packages have extras which make them depend on each other - i.e. if someone wants to use the Google provider to just manage Dataflow instances, they can install apache-airflow-providers-google alone, but if they also want to use it to run Apache Beam jobs within such an instance, they can install apache-airflow-providers-google[apache.beam] (and in reverse, if they use Apache Beam and want to use Dataflow, they can install apache-airflow-providers-apache-beam[google]). Or they can simply install both providers manually.

Google has a host of dependencies on its own, but Apache Beam brings awfully many more (in fact, Apache Beam was the single reason why we released Python 3.10 and ARM support that late). So the situation was rather similar.

Initially we only had the Dataflow provider, and Apache Beam was extracted into its own provider; there were then some follow-up changes to reuse more common code.

We already handle "optional" features in providers nicely - i.e. if some cross-provider dependencies are missing, the import errors are handled gracefully (only visible in debug logs; see the sketch at the end of this comment) - so the Databricks provider would not have to carry all the dependencies of delta-sharing. The "extra" indicates that you can use one with the other, and some code in delta-sharing might be reused from the Databricks one, so there is no unnecessary duplication involved.

Actually, we even detect such cross-provider dependencies automatically, so there is no need to remember about it when we prepare the changelog (but we can also optionally add minimum-version dependencies between providers in case there are expectations about cross-provider versions, as we already did for the sftp <> ssh cross-provider dependency).

Google has more such cross-provider dependencies (but take a look at apache.beam):

https://airflow.apache.org/docs/apache-airflow-providers-google/stable/index.html#cross-provider-package-dependencies
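
A minimal sketch of the optional-feature pattern described above - not the literal code used in the google or apache-beam providers:

# Minimal sketch of the "optional cross-provider feature" pattern; not the literal
# code from the google/apache-beam providers.
import logging

log = logging.getLogger(__name__)

try:
    # Only importable when the other provider (installed via the extra) is present.
    from airflow.providers.apache.beam.hooks.beam import BeamHook
except ImportError:
    BeamHook = None
    # The failure is only surfaced at debug level; the rest of the provider keeps working.
    log.debug("apache-airflow-providers-apache-beam is not installed; Beam features are disabled")


def start_beam_pipeline(*args, **kwargs):
    if BeamHook is None:
        raise RuntimeError(
            "This feature requires the apache.beam extra: "
            "pip install 'apache-airflow-providers-google[apache.beam]'"
        )
    # ... create a BeamHook and run the pipeline here ...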

@alexott
Contributor Author

alexott commented Apr 4, 2022

Thank you for the detailed clarification, Jarek! Maybe I'll talk with the Delta Sharing dev team about implementing only the base functionality as a separate package, or something like that.

@potiuk
Member

potiuk commented Apr 4, 2022

Thank you for the detailed clarification, Jarek! Maybe I'll talk with the Delta Sharing dev team about implementing only the base functionality as a separate package, or something like that.

Yep. Let us know. I am not against any solution here; it's just a question of maintenance, and we learned the hard way that when we initially split Beam/Dataflow, there was enough common code that it made sense to refactor it (more than once, I think). So it's better to think about it upfront :)

@alexott
Contributor Author

alexott commented Apr 4, 2022

This situation is different from Beam/Dataflow - it's not about sharing code between providers, but about using a ready-to-use library that brings too many dependencies (IMHO).

@alexott alexott force-pushed the delta-sharing-support branch 2 times, most recently from b0ee3f6 to 64cd9b1 on April 10, 2022 at 16:46
@potiuk
Member

potiuk commented Apr 12, 2022

Hey @alexott - let's see what comes out of the discussion in https://lists.apache.org/thread/nvfc75kj2w1tywvvkw8ho5wkx1dcvgrn

I am not against merging Delta Sharing, but I think that - with the influx of new providers - we should first discuss and settle on the approach we are going to take.

Your voice on why you think the Delta Sharing provider will be better off in the community is most welcome.

file_size = file['size']
if os.path.exists(dest_file_path):
    stat = os.stat(dest_file_path)
    if file_size == stat.st_size and not self.overwrite_existing:
Contributor

Might it be safer to do some sort of MD5 checksum check here rather than only comparing the size of the file?
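
For illustration, a sketch of that suggestion. Whether the Delta Sharing file metadata actually exposes an MD5 to compare against is an assumption, so the expected_md5 parameter below is hypothetical:

# Sketch of the suggested check: verify a checksum, not only the file size.
# `expected_md5` is hypothetical - the Delta Sharing file action is not guaranteed
# to carry an MD5, so a real implementation would need another source for it.
import hashlib
import os
from typing import Optional


def local_file_is_current(dest_file_path: str, file_size: int, expected_md5: Optional[str]) -> bool:
    if not os.path.exists(dest_file_path):
        return False
    if os.stat(dest_file_path).st_size != file_size:
        return False
    if expected_md5 is None:
        return True  # fall back to the size-only check used in the PR
    md5 = hashlib.md5()
    with open(dest_file_path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            md5.update(chunk)
    return md5.hexdigest() == expected_md5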

)
return self._extract_delta_sharing_version(response, f"{share}.{schema}.{table}")

def query_table(
Contributor

@dbarrundiag dbarrundiag Apr 25, 2022

This is a perfect example where we could use this instead: https://github.com/delta-io/delta-sharing/blob/main/python/delta_sharing/rest_client.py#L242 - and I can totally see how it would be valuable to add the version to the ListFilesInTableResponse class to avoid race conditions: https://github.com/delta-io/delta-sharing/blob/main/python/delta_sharing/rest_client.py#L77

Contributor Author

Listing files could have more overhead on the server side - that's why it's documented separately in the Delta Sharing protocol.

@potiuk
Member

potiuk commented Jun 27, 2022

Hey @alexott, sorry it took a bit long, but I think we got to a consensus regarding new providers, and we also proposed a kind of mixed-governance approach, where (among other things) the stakeholders for future providers (which we are going to technically split into separate repositories soon) should take a bit more responsibility for maintenance:

#24680

If that does not scare you away and you still want to add the provider to the Airflow community providers, feel free to rebase the PR. I will also ask you to rebase it after we merge #24672 (hopefully soon) - we are going to change the way we keep dependencies for providers in order to prepare them for separation into a different repository.

Let us know what you want to do, either close the PR or rebase it and lead it to completion after #24672 is merged.

@alexott
Contributor Author

alexott commented Jun 27, 2022

Great news, @potiuk! I'm happy to rebase the implementation after #24672 is merged.

@potiuk
Member

potiuk commented Jul 4, 2022

Feel free. It's merged.

@alexott
Contributor Author

alexott commented Jul 10, 2022

Thank you, Jarek. I'm starting work on updating to match the new provider governance model, but I will need to work a bit more on supporting the new functionality introduced in Delta Sharing.

@alexott alexott marked this pull request as draft July 24, 2022 12:53
@alexott
Contributor Author

alexott commented Jul 24, 2022

Status update: I've started updating the PR to match the consensus, but I also need to incorporate recent changes in the Delta Sharing protocol.

from cached_property import cached_property


USER_AGENT_HEADER = {'user-agent': f'airflow-{__version__}'}


nit: could we set this with a delta-sharing prefix:
USER_AGENT_HEADER = {'user-agent': f'delta-sharing-airflow-{version}'}

Contributor Author

This will be changed to match recent changes in user agent format


what changes specifically?


Thanks! I'll make a note on our side that this header will indicate traffic from the Airflow connector.

@github-actions

github-actions bot commented Dec 1, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale label Dec 1, 2022
@github-actions github-actions bot closed this Dec 7, 2022
Labels
area:dev-tools area:providers kind:documentation stale

Successfully merging this pull request may close these issues.

Add DeltaSharing provider and sensor.