Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-5127] Add gzip support for CassandraToGoogleCloudStorageOperator #5738

Merged
merged 1 commit into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions airflow/contrib/operators/cassandra_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This module contains an operator for copying
data from Cassandra to Google Cloud Storage in JSON format.
"""
import json
from base64 import b64encode
from cassandra.util import Date, Time, SortedSet, OrderedMapSerializedKey
from datetime import datetime
from decimal import Decimal
from tempfile import NamedTemporaryFile
from uuid import UUID

from cassandra.util import Date, Time, SortedSet, OrderedMapSerializedKey

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.cassandra_hook import CassandraHook
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -59,6 +63,8 @@ class CassandraToGoogleCloudStorageOperator(BaseOperator):
:type approx_max_file_size_bytes: long
:param cassandra_conn_id: Reference to a specific Cassandra hook.
:type cassandra_conn_id: str
:param gzip: Option to compress the file for upload
:type gzip: bool
:param google_cloud_storage_conn_id: Reference to a specific Google
cloud storage hook.
:type google_cloud_storage_conn_id: str
Expand All @@ -78,6 +84,7 @@ def __init__(self,
filename,
schema_filename=None,
approx_max_file_size_bytes=1900000000,
gzip=False,
cassandra_conn_id='cassandra_default',
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
Expand All @@ -92,6 +99,7 @@ def __init__(self,
self.cassandra_conn_id = cassandra_conn_id
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to
self.gzip = gzip

self.hook = None

Expand Down Expand Up @@ -201,7 +209,7 @@ def _upload_to_gcs(self, files_to_upload):
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
for object, tmp_file_handle in files_to_upload.items():
hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json', self.gzip)

@classmethod
def generate_data_dict(cls, names, values):
Expand Down
94 changes: 54 additions & 40 deletions tests/contrib/operators/test_cassandra_to_gcs_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,87 @@

import unittest
from unittest import mock
from airflow.contrib.operators.cassandra_to_gcs import \
CassandraToGoogleCloudStorageOperator
from airflow.contrib.operators.cassandra_to_gcs import (
CassandraToGoogleCloudStorageOperator,
)

TMP_FILE_NAME = "temp-file"


class CassandraToGCSTest(unittest.TestCase):
@mock.patch("airflow.contrib.operators.cassandra_to_gcs.NamedTemporaryFile")
@mock.patch(
"airflow.contrib.operators.cassandra_to_gcs.GoogleCloudStorageHook.upload"
)
@mock.patch("airflow.contrib.operators.cassandra_to_gcs.CassandraHook")
def test_execute(self, mock_hook, mock_upload, mock_tempfile):
test_bucket = "test-bucket"
schema = "schema.json"
filename = "data.json"
gzip = True
mock_tempfile.return_value.name = TMP_FILE_NAME

@mock.patch('airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageHook.upload')
@mock.patch('airflow.contrib.hooks.cassandra_hook.CassandraHook.get_conn')
def test_execute(self, upload, get_conn):
operator = CassandraToGoogleCloudStorageOperator(
task_id='test-cas-to-gcs',
cql='select * from keyspace1.table1',
bucket='test-bucket',
filename='data.json',
schema_filename='schema.json')

task_id="test-cas-to-gcs",
cql="select * from keyspace1.table1",
bucket=test_bucket,
filename=filename,
schema_filename=schema,
gzip=gzip,
)
operator.execute(None)

self.assertTrue(get_conn.called_once())
self.assertTrue(upload.called_once())
mock_hook.return_value.get_conn.assert_called_once_with()
mock_upload.assert_called_with(
test_bucket, schema, TMP_FILE_NAME, "application/json", gzip
)

def test_convert_value(self):
op = CassandraToGoogleCloudStorageOperator
self.assertEqual(op.convert_value('None', None), None)
self.assertEqual(op.convert_value('int', 1), 1)
self.assertEqual(op.convert_value('float', 1.0), 1.0)
self.assertEqual(op.convert_value('str', "text"), "text")
self.assertEqual(op.convert_value('bool', True), True)
self.assertEqual(op.convert_value('dict', {"a": "b"}), {"a": "b"})
self.assertEqual(op.convert_value("None", None), None)
self.assertEqual(op.convert_value("int", 1), 1)
self.assertEqual(op.convert_value("float", 1.0), 1.0)
self.assertEqual(op.convert_value("str", "text"), "text")
self.assertEqual(op.convert_value("bool", True), True)
self.assertEqual(op.convert_value("dict", {"a": "b"}), {"a": "b"})

from datetime import datetime

now = datetime.now()
self.assertEqual(op.convert_value('datetime', now), str(now))
self.assertEqual(op.convert_value("datetime", now), str(now))

from cassandra.util import Date
date_str = '2018-01-01'

date_str = "2018-01-01"
date = Date(date_str)
self.assertEqual(op.convert_value('date', date), str(date_str))
self.assertEqual(op.convert_value("date", date), str(date_str))

import uuid
from base64 import b64encode

test_uuid = uuid.uuid4()
encoded_uuid = b64encode(test_uuid.bytes).decode('ascii')
self.assertEqual(op.convert_value('uuid', test_uuid), encoded_uuid)
encoded_uuid = b64encode(test_uuid.bytes).decode("ascii")
self.assertEqual(op.convert_value("uuid", test_uuid), encoded_uuid)

b = b'abc'
encoded_b = b64encode(b).decode('ascii')
self.assertEqual(op.convert_value('binary', b), encoded_b)
b = b"abc"
encoded_b = b64encode(b).decode("ascii")
self.assertEqual(op.convert_value("binary", b), encoded_b)

from decimal import Decimal

d = Decimal(1.0)
self.assertEqual(op.convert_value('decimal', d), float(d))
self.assertEqual(op.convert_value("decimal", d), float(d))

from cassandra.util import Time

time = Time(0)
self.assertEqual(op.convert_value('time', time), '00:00:00')
self.assertEqual(op.convert_value("time", time), "00:00:00")

date_str_lst = ['2018-01-01', '2018-01-02', '2018-01-03']
date_str_lst = ["2018-01-01", "2018-01-02", "2018-01-03"]
date_lst = [Date(d) for d in date_str_lst]
self.assertEqual(op.convert_value('list', date_lst), date_str_lst)
self.assertEqual(op.convert_value("list", date_lst), date_str_lst)

date_tpl = tuple(date_lst)
self.assertEqual(op.convert_value('tuple', date_tpl),
{'field_0': '2018-01-01',
'field_1': '2018-01-02',
'field_2': '2018-01-03', })


if __name__ == '__main__':
unittest.main()
self.assertEqual(
op.convert_value("tuple", date_tpl),
{"field_0": "2018-01-01", "field_1": "2018-01-02", "field_2": "2018-01-03"},
)