Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6370] Skip Cassandra tests if cluster is not up #6926

Merged
merged 1 commit into from
Dec 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions tests/providers/apache/cassandra/hooks/test_cassandra_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,25 @@
import unittest

import mock
from cassandra.cluster import Cluster
from cassandra.cluster import Cluster, UnresolvableContactPoints
from cassandra.policies import (
DCAwareRoundRobinPolicy, RoundRobinPolicy, TokenAwarePolicy, WhiteListRoundRobinPolicy,
)
from flaky import flaky

from airflow.models import Connection
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.utils import db


@flaky(max_runs=4, min_passes=1)
def cassandra_is_not_up():
try:
Cluster(["cassandra"])
return False
except UnresolvableContactPoints:
return True


@unittest.skipIf(cassandra_is_not_up(), "Cassandra is not up.")
class TestCassandraHook(unittest.TestCase):
def setUp(self):
db.merge_conn(
Expand Down Expand Up @@ -73,13 +80,14 @@ def test_get_conn(self):
self.assertEqual(cluster.port, 9042)
self.assertTrue(isinstance(cluster.load_balancing_policy, TokenAwarePolicy))

def test_get_lb_policy(self):
def test_get_lb_policy_with_no_args(self):
# test LB policies with no args
self._assert_get_lb_policy('RoundRobinPolicy', {}, RoundRobinPolicy)
self._assert_get_lb_policy('DCAwareRoundRobinPolicy', {}, DCAwareRoundRobinPolicy)
self._assert_get_lb_policy('TokenAwarePolicy', {}, TokenAwarePolicy,
expected_child_policy_type=RoundRobinPolicy)

def test_get_lb_policy_with_args(self):
# test DCAwareRoundRobinPolicy with args
self._assert_get_lb_policy('DCAwareRoundRobinPolicy',
{'local_dc': 'foo', 'used_hosts_per_remote_dc': '3'},
Expand All @@ -101,6 +109,7 @@ def test_get_lb_policy(self):
'child_load_balancing_policy_args': {'hosts': ['host-1', 'host-2']}
}, TokenAwarePolicy, expected_child_policy_type=WhiteListRoundRobinPolicy)

def test_get_lb_policy_invalid_policy(self):
# test invalid policy name should default to RoundRobinPolicy
self._assert_get_lb_policy('DoesNotExistPolicy', {}, RoundRobinPolicy)

Expand All @@ -112,6 +121,7 @@ def test_get_lb_policy(self):
TokenAwarePolicy,
expected_child_policy_type=RoundRobinPolicy)

def test_get_lb_policy_no_host_for_white_list(self):
# test host not specified for WhiteListRoundRobinPolicy should throw exception
self._assert_get_lb_policy('WhiteListRoundRobinPolicy',
{},
Expand Down
47 changes: 10 additions & 37 deletions tests/providers/apache/cassandra/sensors/test_cassandra_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,36 @@
# specific language governing permissions and limitations
# under the License.


import unittest
from unittest.mock import patch

from flaky import flaky

from airflow import DAG
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2017, 1, 1)


@flaky(max_runs=4, min_passes=1)
class TestCassandraRecordSensor(unittest.TestCase):

def setUp(self):
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE
}
self.dag = DAG('test_dag_id', default_args=args)
self.sensor = CassandraRecordSensor(
@patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook")
def test_poke(self, mock_hook):
sensor = CassandraRecordSensor(
task_id='test_task',
cassandra_conn_id='cassandra_default',
dag=self.dag,
table='t',
keys={'foo': 'bar'}
)
sensor.poke(None)
mock_hook.return_value.record_exists.assert_called_once_with('t', {'foo': 'bar'})

@patch("airflow.contrib.hooks.cassandra_hook.CassandraHook.record_exists")
def test_poke(self, mock_record_exists):
self.sensor.poke(None)
mock_record_exists.assert_called_once_with('t', {'foo': 'bar'})


@flaky(max_runs=4, min_passes=1)
class TestCassandraTableSensor(unittest.TestCase):

def setUp(self):
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE
}
self.dag = DAG('test_dag_id', default_args=args)
self.sensor = CassandraTableSensor(
@patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook")
def test_poke(self, mock_hook):
sensor = CassandraTableSensor(
task_id='test_task',
cassandra_conn_id='cassandra_default',
dag=self.dag,
table='t',
)

@patch("airflow.contrib.hooks.cassandra_hook.CassandraHook.table_exists")
def test_poke(self, mock_table_exists):
self.sensor.poke(None)
mock_table_exists.assert_called_once_with('t')
sensor.poke(None)
mock_hook.return_value.table_exists.assert_called_once_with('t')


if __name__ == '__main__':
Expand Down