[AIRFLOW-5803] Update S3Hook import paths [AIP-21] #6465

Merged · 5 commits · Nov 9, 2019
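Summary: every file below swaps the legacy `airflow.hooks.S3_hook` import path for the new provider package layout introduced by AIP-21. For DAG authors, the migration looks roughly like this (the usage snippet is a hypothetical illustration, not part of this diff):

```python
# Old import path (pre-AIP-21), as removed throughout this PR:
# from airflow.hooks.S3_hook import S3Hook

# New import path, as introduced throughout this PR:
from airflow.providers.aws.hooks.s3 import S3Hook

# Hypothetical usage; 'aws_default' is the stock Airflow AWS connection id.
hook = S3Hook(aws_conn_id='aws_default')
if hook.check_for_key('data/report.csv', bucket_name='my-bucket'):
    print(hook.read_key('data/report.csv', bucket_name='my-bucket'))
```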
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/sagemaker_hook.py
@@ -28,7 +28,7 @@
 from airflow.contrib.hooks.aws_hook import AwsHook
 from airflow.contrib.hooks.aws_logs_hook import AwsLogsHook
 from airflow.exceptions import AirflowException
-from airflow.hooks.S3_hook import S3Hook
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils import timezone
4 changes: 2 additions & 2 deletions airflow/contrib/hooks/wasb_hook.py
@@ -102,7 +102,7 @@ def load_file(self, file_path, container_name, blob_name, **kwargs):
             `BlockBlobService.create_blob_from_path()` takes.
         :type kwargs: object
         """
-        # Reorder the argument order from airflow.hooks.S3_hook.load_file.
+        # Reorder the argument order from airflow.providers.aws.hooks.s3.load_file.
         self.connection.create_blob_from_path(container_name, blob_name,
                                               file_path, **kwargs)

@@ -120,7 +120,7 @@ def load_string(self, string_data, container_name, blob_name, **kwargs):
             `BlockBlobService.create_blob_from_text()` takes.
         :type kwargs: object
         """
-        # Reorder the argument order from airflow.hooks.S3_hook.load_string.
+        # Reorder the argument order from airflow.providers.aws.hooks.s3.load_string.
         self.connection.create_blob_from_text(container_name, blob_name,
                                               string_data, **kwargs)
2 changes: 1 addition & 1 deletion airflow/contrib/operators/dynamodb_to_s3.py
@@ -32,8 +32,8 @@
 from boto.compat import json  # type: ignore

 from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models.baseoperator import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook


 def _convert_item_to_json_bytes(item):
2 changes: 1 addition & 1 deletion airflow/contrib/operators/imap_attachment_to_s3_operator.py
@@ -20,8 +20,8 @@
 This module allows you to transfer mail attachments from a mail server into s3 bucket.
 """
 from airflow.contrib.hooks.imap_hook import ImapHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/contrib/operators/mongo_to_s3.py
@@ -21,8 +21,8 @@
 from bson import json_util

 from airflow.contrib.hooks.mongo_hook import MongoHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/contrib/operators/s3_copy_object_operator.py
@@ -17,8 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.

-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/contrib/operators/s3_delete_objects_operator.py
@@ -18,8 +18,8 @@
 # under the License.

 from airflow.exceptions import AirflowException
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/contrib/operators/s3_list_operator.py
@@ -19,8 +19,8 @@

 from typing import Iterable

-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/contrib/operators/s3_to_gcs_operator.py
@@ -22,7 +22,7 @@
 from airflow.contrib.operators.s3_list_operator import S3ListOperator
 from airflow.exceptions import AirflowException
 from airflow.gcp.hooks.gcs import GoogleCloudStorageHook, _parse_gcs_url
-from airflow.hooks.S3_hook import S3Hook
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/contrib/operators/s3_to_sftp_operator.py
@@ -21,8 +21,8 @@
 from urllib.parse import urlparse

 from airflow.contrib.hooks.ssh_hook import SSHHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/contrib/operators/sftp_to_s3_operator.py
@@ -21,8 +21,8 @@
 from urllib.parse import urlparse

 from airflow.contrib.hooks.ssh_hook import SSHHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/operators/gcs_to_s3.py
@@ -23,7 +23,7 @@

 from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
 from airflow.gcp.operators.gcs import GoogleCloudStorageListOperator
-from airflow.hooks.S3_hook import S3Hook
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/operators/google_api_to_s3_transfer.py
@@ -24,9 +24,9 @@
 import sys

 from airflow.gcp.hooks.discovery_api import GoogleDiscoveryApiHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
 from airflow.models.xcom import MAX_XCOM_SIZE
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/operators/redshift_to_s3_operator.py
@@ -22,8 +22,8 @@
 from typing import List, Optional, Union

 from airflow.hooks.postgres_hook import PostgresHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
8 changes: 3 additions & 5 deletions airflow/operators/s3_file_transform_operator.py
@@ -23,8 +23,8 @@
 from typing import Optional, Union

 from airflow.exceptions import AirflowException
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults

@@ -111,10 +111,8 @@ def execute(self, context):
             raise AirflowException(
                 "Either transform_script or select_expression must be specified")

-        source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id,
-                           verify=self.source_verify)
-        dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id,
-                         verify=self.dest_verify)
+        source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id, verify=self.source_verify)
+        dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify)

         self.log.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):
2 changes: 1 addition & 1 deletion airflow/operators/s3_to_hive_operator.py
@@ -26,8 +26,8 @@

 from airflow.exceptions import AirflowException
 from airflow.hooks.hive_hooks import HiveCliHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.compression import uncompress_file
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.file import TemporaryDirectory
2 changes: 1 addition & 1 deletion airflow/operators/s3_to_redshift_operator.py
@@ -19,8 +19,8 @@
 from typing import List, Optional, Union

 from airflow.hooks.postgres_hook import PostgresHook
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.decorators import apply_defaults
2 changes: 1 addition & 1 deletion airflow/sensors/s3_key_sensor.py
@@ -88,7 +88,7 @@ def __init__(self,
         self.verify = verify

     def poke(self, context):
-        from airflow.hooks.S3_hook import S3Hook
+        from airflow.providers.aws.hooks.s3 import S3Hook
         hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
         if self.wildcard_match:
2 changes: 1 addition & 1 deletion airflow/sensors/s3_prefix_sensor.py
@@ -73,7 +73,7 @@ def __init__(self,

     def poke(self, context):
         self.log.info('Poking for prefix : %s in bucket s3://%s', self.prefix, self.bucket_name)
-        from airflow.hooks.S3_hook import S3Hook
+        from airflow.providers.aws.hooks.s3 import S3Hook
         hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         return hook.check_for_prefix(
             prefix=self.prefix,
2 changes: 1 addition & 1 deletion airflow/utils/log/s3_task_handler.py
@@ -43,7 +43,7 @@ def __init__(self, base_log_folder, s3_log_folder, filename_template):
     def hook(self):
         remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID')
         try:
-            from airflow.hooks.S3_hook import S3Hook
+            from airflow.providers.aws.hooks.s3 import S3Hook
             return S3Hook(remote_conn_id)
         except Exception:
             self.log.error(
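The hunk above keeps the handler's lazy, fault-tolerant hook construction intact and only moves the import. A rough standalone sketch of that pattern (the helper name and exact error wording are simplified here, not taken from the file):

```python
import logging

log = logging.getLogger(__name__)


def build_s3_hook(remote_conn_id):
    """Hypothetical helper mirroring the structure of S3TaskHandler.hook."""
    try:
        # Lazy import: boto3 stays optional until remote logging is used.
        from airflow.providers.aws.hooks.s3 import S3Hook
        return S3Hook(remote_conn_id)
    except Exception:
        # Swallow the error so local task logging keeps working.
        log.error('Could not create an S3Hook with connection id "%s".',
                  remote_conn_id)
        return None
```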
2 changes: 1 addition & 1 deletion tests/contrib/hooks/test_sagemaker_hook.py
@@ -29,7 +29,7 @@
     LogState, SageMakerHook, secondary_training_status_changed, secondary_training_status_message,
 )
 from airflow.exceptions import AirflowException
-from airflow.hooks.S3_hook import S3Hook
+from airflow.providers.aws.hooks.s3 import S3Hook
 from tests.compat import mock

 role = 'arn:aws:iam:role/test-role'
2 changes: 1 addition & 1 deletion tests/contrib/operators/test_s3_to_sftp_operator.py
@@ -61,7 +61,7 @@ class TestS3ToSFTPOperator(unittest.TestCase):
     @mock_s3
     def setUp(self):
         from airflow.contrib.hooks.ssh_hook import SSHHook
-        from airflow.hooks.S3_hook import S3Hook
+        from airflow.providers.aws.hooks.s3 import S3Hook

         hook = SSHHook(ssh_conn_id='ssh_default')
         s3_hook = S3Hook('aws_default')
2 changes: 1 addition & 1 deletion tests/contrib/operators/test_sftp_to_s3_operator.py
@@ -26,8 +26,8 @@
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator
 from airflow.contrib.operators.ssh_operator import SSHOperator
-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import DAG, TaskInstance
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.settings import Session
 from airflow.utils import timezone
 from airflow.utils.timezone import datetime
4 changes: 2 additions & 2 deletions tests/hooks/test_s3_hook.py
@@ -23,11 +23,11 @@

 from botocore.exceptions import NoCredentialsError

-from airflow.hooks.S3_hook import provide_bucket_name
 from airflow.models import Connection
+from airflow.providers.aws.hooks.s3 import provide_bucket_name

 try:
-    from airflow.hooks.S3_hook import S3Hook
+    from airflow.providers.aws.hooks.s3 import S3Hook
 except ImportError:
     S3Hook = None  # type: ignore
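The try/except guard above lets the test module import even when the hook's dependencies (boto3/moto) are missing, so the suite can skip instead of erroring at collection time. A minimal sketch of how such a guard is typically paired with skipIf (the test body is illustrative, not copied from the file):

```python
import unittest

try:
    from airflow.providers.aws.hooks.s3 import S3Hook
except ImportError:
    # boto3 (and therefore the hook) may be absent in minimal test envs.
    S3Hook = None  # type: ignore


@unittest.skipIf(S3Hook is None, 'S3Hook dependencies are not installed')
class TestS3HookGuard(unittest.TestCase):
    def test_parse_s3_url(self):
        # parse_s3_url splits an s3:// URL into a (bucket, key) tuple.
        self.assertEqual(S3Hook.parse_s3_url('s3://bucket/path/to/key'),
                         ('bucket', 'path/to/key'))
```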
2 changes: 1 addition & 1 deletion tests/operators/test_gcs_to_s3.py
@@ -19,8 +19,8 @@

 import unittest

-from airflow.hooks.S3_hook import S3Hook
 from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from tests.compat import mock

 try:
2 changes: 1 addition & 1 deletion tests/operators/test_s3_file_transform_operator.py
@@ -112,7 +112,7 @@ def test_execute_with_failing_transform_script(self, mock_Popen):

         self.assertEqual('Transform script failed: 42', str(e.exception))

-    @mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input")
+    @mock.patch('airflow.providers.aws.hooks.s3.S3Hook.select_key', return_value="input")
     @mock_s3
     def test_execute_with_select_expression(self, mock_select_key):
         bucket = "bucket"
2 changes: 1 addition & 1 deletion tests/operators/test_s3_to_hive_operator.py
@@ -274,7 +274,7 @@ def test_execute_with_select_expression(self, mock_hiveclihook):
         input_serialization['CSV']['FileHeaderInfo'] = 'USE'

         # Confirm that select_key was called with the right params
-        with mock.patch('airflow.hooks.S3_hook.S3Hook.select_key',
+        with mock.patch('airflow.providers.aws.hooks.s3.S3Hook.select_key',
                         return_value="") as mock_select_key:
             # Execute S3ToHiveTransfer
             s32hive = S3ToHiveTransfer(**self.kwargs)
4 changes: 2 additions & 2 deletions tests/sensors/test_s3_key_sensor.py
@@ -64,7 +64,7 @@ def test_parse_bucket_key(self, key, bucket, parsed_key, parsed_bucket):
         self.assertEqual(s.bucket_key, parsed_key)
         self.assertEqual(s.bucket_name, parsed_bucket)

-    @mock.patch('airflow.hooks.S3_hook.S3Hook')
+    @mock.patch('airflow.providers.aws.hooks.s3.S3Hook')
     def test_poke(self, mock_hook):
         s = S3KeySensor(
             task_id='s3_key_sensor',

@@ -78,7 +78,7 @@ def test_poke(self, mock_hook):
         mock_hook.return_value.check_for_key.return_value = True
         self.assertTrue(s.poke(None))

-    @mock.patch('airflow.hooks.S3_hook.S3Hook')
+    @mock.patch('airflow.providers.aws.hooks.s3.S3Hook')
     def test_poke_wildcard(self, mock_hook):
         s = S3KeySensor(
             task_id='s3_key_sensor',
2 changes: 1 addition & 1 deletion tests/sensors/test_s3_prefix_sensor.py
@@ -25,7 +25,7 @@

 class TestS3PrefixSensor(unittest.TestCase):

-    @mock.patch('airflow.hooks.S3_hook.S3Hook')
+    @mock.patch('airflow.providers.aws.hooks.s3.S3Hook')
     def test_poke(self, mock_hook):
         s = S3PrefixSensor(
             task_id='s3_prefix',
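One detail worth calling out in these sensor tests: mock.patch must target S3Hook at its defining module, and because poke() does a lazy `from airflow.providers.aws.hooks.s3 import S3Hook`, patching the new provider path is exactly what the sensor resolves at runtime. A hedged sketch of the pattern (the sensor arguments are illustrative):

```python
from unittest import mock

from airflow.sensors.s3_prefix_sensor import S3PrefixSensor


@mock.patch('airflow.providers.aws.hooks.s3.S3Hook')
def test_poke_prefix_absent(mock_hook):
    sensor = S3PrefixSensor(task_id='s3_prefix',
                            bucket_name='bucket',
                            prefix='prefix')
    # The lazy import inside poke() resolves to the patched class,
    # so no real AWS connection or credentials are needed.
    mock_hook.return_value.check_for_prefix.return_value = False
    assert not sensor.poke(None)
```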
6 changes: 3 additions & 3 deletions tests/utils/log/test_s3_task_handler.py
@@ -21,9 +21,9 @@
 import unittest
 from unittest import mock

-from airflow.hooks.S3_hook import S3Hook
 from airflow.models import DAG, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.providers.aws.hooks.s3 import S3Hook
 from airflow.utils.log.s3_task_handler import S3TaskHandler
 from airflow.utils.state import State
 from airflow.utils.timezone import datetime

@@ -81,7 +81,7 @@ def test_hook(self):
     def test_hook_raises(self):
         handler = self.s3_task_handler
         with mock.patch.object(handler.log, 'error') as mock_error:
-            with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+            with mock.patch("airflow.providers.aws.hooks.s3.S3Hook") as mock_hook:
                 mock_hook.side_effect = Exception('Failed to connect')
                 # Initialize the hook
                 handler.hook

@@ -103,7 +103,7 @@ def test_log_exists_raises(self):
         self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo'))

     def test_log_exists_no_hook(self):
-        with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+        with mock.patch("airflow.providers.aws.hooks.s3.S3Hook") as mock_hook:
             mock_hook.side_effect = Exception('Failed to connect')
             self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))