[AIRFLOW-6670][depends on AIRFLOW-6669] Move contrib operators to providers package (#7286)
mik-laj authored Jan 29, 2020
1 parent 1988a97 commit 057f3ae
Showing 52 changed files with 2,367 additions and 1,881 deletions.
2 changes: 1 addition & 1 deletion airflow/contrib/example_dags/example_gcs_to_gdrive.py
@@ -22,7 +22,7 @@
 import os

 from airflow import models
-from airflow.contrib.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator
+from airflow.providers.google.suite.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator
 from airflow.utils.dates import days_ago

 GCS_TO_GDRIVE_BUCKET = os.environ.get("GCS_TO_DRIVE_BUCKET", "example-object")
51 changes: 8 additions & 43 deletions airflow/contrib/operators/file_to_wasb.py
@@ -16,49 +16,14 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-from airflow.models import BaseOperator
-from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
-from airflow.utils.decorators import apply_defaults
-
-
-class FileToWasbOperator(BaseOperator):
-    """
-    Uploads a file to Azure Blob Storage.
+"""This module is deprecated. Please use `airflow.providers.microsoft.azure.operators.file_to_wasb`."""

-    :param file_path: Path to the file to load. (templated)
-    :type file_path: str
-    :param container_name: Name of the container. (templated)
-    :type container_name: str
-    :param blob_name: Name of the blob. (templated)
-    :type blob_name: str
-    :param wasb_conn_id: Reference to the wasb connection.
-    :type wasb_conn_id: str
-    :param load_options: Optional keyword arguments that
-        `WasbHook.load_file()` takes.
-    :type load_options: dict
-    """
-    template_fields = ('file_path', 'container_name', 'blob_name')
+import warnings

-    @apply_defaults
-    def __init__(self, file_path, container_name, blob_name,
-                 wasb_conn_id='wasb_default', load_options=None, *args,
-                 **kwargs):
-        super().__init__(*args, **kwargs)
-        if load_options is None:
-            load_options = {}
-        self.file_path = file_path
-        self.container_name = container_name
-        self.blob_name = blob_name
-        self.wasb_conn_id = wasb_conn_id
-        self.load_options = load_options
+# pylint: disable=unused-import
+from airflow.providers.microsoft.azure.operators.file_to_wasb import FileToWasbOperator  # noqa

-    def execute(self, context):
-        """Upload a file to Azure Blob Storage."""
-        hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
-        self.log.info(
-            'Uploading %s to wasb://%s as %s',
-            self.file_path, self.container_name, self.blob_name,
-        )
-        hook.load_file(self.file_path, self.container_name,
-                       self.blob_name, **self.load_options)
+warnings.warn(
+    "This module is deprecated. Please use `airflow.providers.microsoft.azure.operators.file_to_wasb`.",
+    DeprecationWarning, stacklevel=2
+)
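
Each shim of this kind keeps the old contrib import path importable while warning at import time. A minimal sketch of how that surfaces to DAG authors (assuming an Airflow install that includes this change; run in a fresh interpreter so the module import is not already cached):

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # The old contrib path still resolves, but importing it triggers the module-level warning above.
    from airflow.contrib.operators.file_to_wasb import FileToWasbOperator  # noqa: F401

print(caught[0].category.__name__)  # DeprecationWarning
print(caught[0].message)            # points to airflow.providers.microsoft.azure.operators.file_to_wasb
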
135 changes: 9 additions & 126 deletions airflow/contrib/operators/gcs_to_gdrive_operator.py
@@ -16,132 +16,15 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module contains a Google Cloud Storage operator.
-"""
-import tempfile
-from typing import Optional
+"""This module is deprecated. Please use `airflow.providers.google.suite.operators.gcs_to_gdrive_operator`."""

-from airflow.exceptions import AirflowException
-from airflow.gcp.hooks.gcs import GCSHook
-from airflow.models import BaseOperator
-from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
-from airflow.utils.decorators import apply_defaults
+import warnings

-WILDCARD = "*"
+# pylint: disable=unused-import
+from airflow.providers.google.suite.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator  # noqa

-
-class GCSToGoogleDriveOperator(BaseOperator):
-    """
-    Copies objects from a Google Cloud Storage service service to Google Drive service, with renaming
-    if requested.
-
-    Using this operator requires the following OAuth 2.0 scope:
-
-    .. code-block:: none
-
-        https://www.googleapis.com/auth/drive
-
-    :param source_bucket: The source Google Cloud Storage bucket where the object is. (templated)
-    :type source_bucket: str
-    :param source_object: The source name of the object to copy in the Google cloud
-        storage bucket. (templated)
-        You can use only one wildcard for objects (filenames) within your bucket. The wildcard can appear
-        inside the object name or at the end of the object name. Appending a wildcard to the bucket name
-        is unsupported.
-    :type source_object: str
-    :param destination_object: The destination name of the object in the destination Google Drive
-        service. (templated)
-        If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended
-        to the final destination objects' paths.
-        Note that the source path's part before the wildcard will be removed;
-        if it needs to be retained it should be appended to destination_object.
-        For example, with prefix ``foo/*`` and destination_object ``blah/``, the file ``foo/baz`` will be
-        copied to ``blah/baz``; to retain the prefix write the destination_object as e.g. ``blah/foo``, in
-        which case the copied file will be named ``blah/foo/baz``.
-    :type destination_object: str
-    :param move_object: When move object is True, the object is moved instead of copied to the new location.
-        This is the equivalent of a mv command as opposed to a cp command.
-    :type move_object: bool
-    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
-    :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have domain-wide delegation enabled.
-    :type delegate_to: str
-    """
-
-    template_fields = ("source_bucket", "source_object", "destination_object")
-    ui_color = "#f0eee4"
-
-    @apply_defaults
-    def __init__(
-        self,
-        source_bucket: str,
-        source_object: str,
-        destination_object: Optional[str] = None,
-        move_object: bool = False,
-        gcp_conn_id: str = "google_cloud_default",
-        delegate_to: Optional[str] = None,
-        *args,
-        **kwargs
-    ):
-        super().__init__(*args, **kwargs)
-
-        self.source_bucket = source_bucket
-        self.source_object = source_object
-        self.destination_object = destination_object
-        self.move_object = move_object
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-        self.gcs_hook = None  # type: Optional[GCSHook]
-        self.gdrive_hook = None  # type: Optional[GoogleDriveHook]
-
-    def execute(self, context):
-
-        self.gcs_hook = GCSHook(
-            google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
-        )
-        self.gdrive_hook = GoogleDriveHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
-
-        if WILDCARD in self.source_object:
-            total_wildcards = self.source_object.count(WILDCARD)
-            if total_wildcards > 1:
-                error_msg = (
-                    "Only one wildcard '*' is allowed in source_object parameter. "
-                    "Found {} in {}.".format(total_wildcards, self.source_object)
-                )
-
-                raise AirflowException(error_msg)
-
-            prefix, delimiter = self.source_object.split(WILDCARD, 1)
-            objects = self.gcs_hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)
-
-            for source_object in objects:
-                if self.destination_object is None:
-                    destination_object = source_object
-                else:
-                    destination_object = source_object.replace(prefix, self.destination_object, 1)
-
-                self._copy_single_object(source_object=source_object, destination_object=destination_object)
-        else:
-            self._copy_single_object(
-                source_object=self.source_object, destination_object=self.destination_object
-            )
-
-    def _copy_single_object(self, source_object, destination_object):
-        self.log.info(
-            "Executing copy of gs://%s/%s to gdrive://%s",
-            self.source_bucket,
-            source_object,
-            destination_object,
-        )
-
-        with tempfile.NamedTemporaryFile() as file:
-            filename = file.name
-            self.gcs_hook.download(
-                bucket_name=self.source_bucket, object_name=source_object, filename=filename
-            )
-            self.gdrive_hook.upload_file(local_location=filename, remote_location=destination_object)
-
-        if self.move_object:
-            self.gcs_hook.delete(self.source_bucket, source_object)
+warnings.warn(
+    "This module is deprecated. "
+    "Please use `airflow.providers.google.suite.operators.gcs_to_gdrive_operator`.",
+    DeprecationWarning, stacklevel=2
+)
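
Downstream DAG code only needs the new import path; the operator itself is moved, not changed, so existing arguments keep working (as the example DAG diff above shows). A minimal usage sketch against the providers location, where the DAG id, bucket, and object names are hypothetical placeholders:

from airflow import models
from airflow.providers.google.suite.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator
from airflow.utils.dates import days_ago

with models.DAG("example_gcs_to_gdrive_migrated", schedule_interval=None, start_date=days_ago(1)) as dag:
    copy_file = GCSToGoogleDriveOperator(
        task_id="copy_gcs_object_to_gdrive",
        source_bucket="my-source-bucket",       # hypothetical bucket
        source_object="reports/report.csv",     # hypothetical object
        destination_object="report-copy.csv",   # hypothetical destination name
    )
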
115 changes: 17 additions & 98 deletions airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py
@@ -16,101 +16,20 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-import os
-from tempfile import TemporaryDirectory
-
-import unicodecsv as csv
-
-from airflow.models import BaseOperator
-from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
-from airflow.providers.oracle.hooks.oracle import OracleHook
-from airflow.utils.decorators import apply_defaults
-
-
-class OracleToAzureDataLakeTransfer(BaseOperator):
-    """
-    Moves data from Oracle to Azure Data Lake. The operator runs the query against
-    Oracle and stores the file locally before loading it into Azure Data Lake.
-
-    :param filename: file name to be used by the csv file.
-    :type filename: str
-    :param azure_data_lake_conn_id: destination azure data lake connection.
-    :type azure_data_lake_conn_id: str
-    :param azure_data_lake_path: destination path in azure data lake to put the file.
-    :type azure_data_lake_path: str
-    :param oracle_conn_id: source Oracle connection.
-    :type oracle_conn_id: str
-    :param sql: SQL query to execute against the Oracle database. (templated)
-    :type sql: str
-    :param sql_params: Parameters to use in sql query. (templated)
-    :type sql_params: str
-    :param delimiter: field delimiter in the file.
-    :type delimiter: str
-    :param encoding: encoding type for the file.
-    :type encoding: str
-    :param quotechar: Character to use in quoting.
-    :type quotechar: str
-    :param quoting: Quoting strategy. See unicodecsv quoting for more information.
-    :type quoting: str
-    """
-
-    template_fields = ('filename', 'sql', 'sql_params')
-    ui_color = '#e08c8c'
-
-    @apply_defaults
-    def __init__(
-            self,
-            filename,
-            azure_data_lake_conn_id,
-            azure_data_lake_path,
-            oracle_conn_id,
-            sql,
-            sql_params=None,
-            delimiter=",",
-            encoding="utf-8",
-            quotechar='"',
-            quoting=csv.QUOTE_MINIMAL,
-            *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        if sql_params is None:
-            sql_params = {}
-        self.filename = filename
-        self.oracle_conn_id = oracle_conn_id
-        self.sql = sql
-        self.sql_params = sql_params
-        self.azure_data_lake_conn_id = azure_data_lake_conn_id
-        self.azure_data_lake_path = azure_data_lake_path
-        self.delimiter = delimiter
-        self.encoding = encoding
-        self.quotechar = quotechar
-        self.quoting = quoting
-
-    def _write_temp_file(self, cursor, path_to_save):
-        with open(path_to_save, 'wb') as csvfile:
-            csv_writer = csv.writer(csvfile, delimiter=self.delimiter,
-                                    encoding=self.encoding, quotechar=self.quotechar,
-                                    quoting=self.quoting)
-            csv_writer.writerow(map(lambda field: field[0], cursor.description))
-            csv_writer.writerows(cursor)
-            csvfile.flush()
-
-    def execute(self, context):
-        oracle_hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        azure_data_lake_hook = AzureDataLakeHook(
-            azure_data_lake_conn_id=self.azure_data_lake_conn_id)
-
-        self.log.info("Dumping Oracle query results to local file")
-        conn = oracle_hook.get_conn()
-        cursor = conn.cursor()
-        cursor.execute(self.sql, self.sql_params)
-
-        with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
-            self._write_temp_file(cursor, os.path.join(temp, self.filename))
-            self.log.info("Uploading local file to Azure Data Lake")
-            azure_data_lake_hook.upload_file(os.path.join(temp, self.filename),
-                                             os.path.join(self.azure_data_lake_path,
-                                                          self.filename))
-        cursor.close()
-        conn.close()
+"""
+This module is deprecated.
+Please use `airflow.providers.microsoft.azure.operators.oracle_to_azure_data_lake_transfer`.
+"""
+
+import warnings
+
+# pylint: disable=unused-import
+from airflow.providers.microsoft.azure.operators.oracle_to_azure_data_lake_transfer import (  # noqa
+    OracleToAzureDataLakeTransfer,
+)
+
+warnings.warn(
+    "This module is deprecated. "
+    "Please use `airflow.providers.microsoft.azure.operators.oracle_to_azure_data_lake_transfer`.",
+    DeprecationWarning, stacklevel=2
+)
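
Because every shim warns with DeprecationWarning, one way to flush out remaining contrib-style imports while parsing DAG files locally is to escalate that warning to an error. This is a local-testing sketch only, not part of this change, and it also escalates unrelated DeprecationWarnings:

import warnings

# Make the deprecation shims fail fast so leftover airflow.contrib imports are reported immediately.
warnings.simplefilter("error", DeprecationWarning)

import my_company_dags.etl_dag  # hypothetical DAG module; raises if it still imports a contrib path
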