HTTP Operator and sensor #103

Merged
merged 4 commits into from
Jul 9, 2015
81 changes: 81 additions & 0 deletions airflow/example_dags/example_http_operator.py
@@ -0,0 +1,81 @@
"""
### Example HTTP operator and sensor
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor
from datetime import datetime, timedelta
import json

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': seven_days_ago,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('example_http_operator', default_args=default_args)

dag.doc_md = __doc__

# t1 through t5 and the sensor are examples of tasks created by instantiating operators
t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='api/v1.0/nodes',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: len(response.json()) == 0,
    dag=dag)

t5 = SimpleHttpOperator(
    task_id='post_op_formenc',
    endpoint='nodes/url',
    data="name=Joe",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_op',
    method='GET',
    endpoint='api/v1.0/nodes',
    data={"param1": "value1", "param2": "value2"},
    headers={},
    dag=dag)

t3 = SimpleHttpOperator(
    task_id='put_op',
    method='PUT',
    endpoint='api/v1.0/nodes',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    dag=dag)

t4 = SimpleHttpOperator(
    task_id='del_op',
    method='DELETE',
    endpoint='api/v1.0/nodes',
    data="some=data",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag)

sensor = HttpSensor(
    task_id='http_sensor_check',
    http_conn_id='http_default',
    endpoint='api/v1.0/apps',
    params={},
    headers={"Content-Type": "application/json"},
    response_check=lambda response: "collation" in response.content,
    poke_interval=5,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3)
t5.set_upstream(t4)
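
The lambdas above keep the example compact, but response_check accepts any callable, and a named function is easier to reuse and test. A minimal sketch under that assumption, reusing the imports and dag object defined above (the task_id and function name are illustrative, not part of this change):

def empty_node_list(response):
    # Assumes the endpoint returns a JSON list of nodes; an empty list passes.
    return len(response.json()) == 0

t6 = SimpleHttpOperator(
    task_id='post_op_named_check',
    endpoint='api/v1.0/nodes',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=empty_node_list,
    dag=dag)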
1 change: 1 addition & 0 deletions airflow/hooks/__init__.py
@@ -18,6 +18,7 @@
    'samba_hook': ['SambaHook'],
    'sqlite_hook': ['SqliteHook'],
    'S3_hook': ['S3Hook'],
    'http_hook': ['HttpHook'],
}

_import_module_attrs(globals(), _hooks)
95 changes: 95 additions & 0 deletions airflow/hooks/http_hook.py
@@ -0,0 +1,95 @@
import logging

import requests

from airflow.hooks.base_hook import BaseHook
from airflow.utils import AirflowException


class HttpHook(BaseHook):
    """
    Interact with HTTP servers.
    """

    def __init__(self, method='POST', http_conn_id='http_default'):
        self.http_conn_id = http_conn_id
        self.method = method

    # headers is a positional argument so that callers are forced to pass it
    def get_conn(self, headers):
        """
        Returns an http session for use with requests
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()
        self.base_url = conn.host

        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)

        return session

    def run(self, endpoint, data=None, headers=None, extra_options=None):
        """
        Performs the request
        """
        session = self.get_conn(headers)

        url = self.base_url + endpoint
        req = None
        if self.method == 'GET':
            # GET uses params
            req = requests.Request(self.method,
                                   url,
                                   params=data,
                                   headers=headers)
        else:
            # Others use data
            req = requests.Request(self.method,
                                   url,
                                   data=data,
                                   headers=headers)

        prepped_request = session.prepare_request(req)
        logging.info("Sending '" + self.method + "' to url: " + url)
        return self.run_and_check(session, prepped_request, extra_options)

    def run_and_check(self, session, prepped_request, extra_options):
        """
        Grabs extra options like timeout and actually runs the request,
        checking for the result
        """
        stream = extra_options.get("stream", False)
        verify = extra_options.get("verify", False)
        proxies = extra_options.get("proxies", {})
        cert = extra_options.get("cert", None)
        timeout = extra_options.get("timeout", None)
        allow_redirects = extra_options.get("allow_redirects", True)

        response = session.send(prepped_request,
                                stream=stream,
                                verify=verify,
                                proxies=proxies,
                                cert=cert,
                                timeout=timeout,
                                allow_redirects=allow_redirects)

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            # Tried rewrapping, but not supported. This way, it's possible
            # to get reason and code for failure by checking first 3 chars
            # for the code, or do a split on ':'
            logging.error("HTTP error: " + response.reason)
            if self.method != 'GET':
                # The sensor uses GET, so this prevents filling up the log
                # with the body every time the GET 'misses'.
                # That's ok to do, because GETs should be repeatable and
                # all data should be visible in the log (no post data)
                logging.error(response.text)
            raise AirflowException(str(response.status_code) + ":" + response.reason)
        return response
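
The operators and the sensor are the intended entry points, but the hook can also be driven directly, for instance from a test or an interactive session. A rough sketch, assuming the http_default connection seeded by initdb() (see the airflow/utils.py change below) and an endpoint that actually exists on that host:

from airflow.hooks import HttpHook

hook = HttpHook(method='GET', http_conn_id='http_default')
response = hook.run(
    'api/v1.0/nodes',                               # appended to the connection host
    data={"param1": "value1"},                      # GET, so this becomes the query string
    headers={"Accept": "application/json"},
    extra_options={"timeout": 10, "verify": True})  # forwarded to requests in run_and_check
print(response.status_code)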
3 changes: 3 additions & 0 deletions airflow/operators/__init__.py
@@ -29,16 +29,19 @@
        'S3PrefixSensor',
        'HdfsSensor',
        'TimeSensor',
        'HttpSensor'
    ],
    'subdag_operator': ['SubDagOperator'],
    'hive_stats_operator': ['HiveStatsCollectionOperator'],
    's3_to_hive_operator': ['S3ToHiveTransfer'],
    'hive_to_mysql': ['HiveToMySqlTransfer'],
    's3_file_transform_operator': ['S3FileTransformOperator'],
    'http_operator': ['SimpleHttpOperator']
}

_import_module_attrs(globals(), _operators)


def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import operators as _operators
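
With the 'http_operator' entry above and the matching 'http_hook' entry in airflow/hooks/__init__.py, _import_module_attrs exposes the new classes at package level, so DAG files can import them directly:

from airflow.operators import SimpleHttpOperator, HttpSensor
from airflow.hooks import HttpHook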
64 changes: 64 additions & 0 deletions airflow/operators/http_operator.py
@@ -0,0 +1,64 @@
import logging

from airflow.hooks import HttpHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults, AirflowException


class SimpleHttpOperator(BaseOperator):
    """
    Calls an endpoint on an HTTP system to execute an action

    :param http_conn_id: The connection to run the operator against
    :type http_conn_id: string
    :param endpoint: The relative part of the full url
    :type endpoint: string
    :param method: The HTTP method to use, default = "POST"
    :type method: string
    :param data: The data to pass. POST-data in POST/PUT and params
        in the URL for a GET request.
    :type data: For POST/PUT, depends on the content-type parameter,
        for GET a dictionary of key/value string pairs
    :param headers: The HTTP headers to be added to the request
    :type headers: a dictionary of string key/value pairs
    :param response_check: A check against the 'requests' response object.
        Returns True for 'pass' and False otherwise.
    :type response_check: A lambda or defined function.
    :param extra_options: Extra options for the 'requests' library, see the
        'requests' documentation (options to modify timeout, ssl, etc.)
    :type extra_options: A dictionary of options, where key is string and value
        depends on the option that's being modified.
    """

    template_fields = ('endpoint',)
    template_ext = ()
    ui_color = '#f4a460'

    @apply_defaults
    def __init__(self,
                 endpoint,
                 method='POST',
                 data=None,
                 headers=None,
                 response_check=None,
                 extra_options=None,
                 http_conn_id='http_default', *args, **kwargs):
        super(SimpleHttpOperator, self).__init__(*args, **kwargs)
        self.http_conn_id = http_conn_id
        self.method = method
        self.endpoint = endpoint
        self.headers = headers or {}
        self.data = data or {}
        self.response_check = response_check
        self.extra_options = extra_options or {}

    def execute(self, context):
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)
        logging.info("Calling HTTP method")
        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
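
As the docstring notes, data is sent as query parameters for a GET and as the request body for other methods. A standalone sketch of the underlying requests calls, with a placeholder base URL (this mirrors, but is not, HttpHook.run):

import requests

session = requests.Session()
base_url = 'http://example.com/'   # placeholder; normally taken from the connection host

# GET: data becomes the query string
get_req = requests.Request('GET', base_url + 'api/v1.0/nodes',
                           params={"param1": "value1", "param2": "value2"})

# POST: data becomes the request body
post_req = requests.Request('POST', base_url + 'api/v1.0/nodes',
                            data='{"priority": 5}',
                            headers={"Content-Type": "application/json"})

print(session.prepare_request(get_req).url)    # .../api/v1.0/nodes?param1=value1&param2=value2
print(session.prepare_request(post_req).body)  # {"priority": 5}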
70 changes: 70 additions & 0 deletions airflow/operators/sensors.py
@@ -331,3 +331,73 @@ def poke(self, context):
        logging.info(
            'Checking if the time ({0}) has come'.format(self.target_time))
        return datetime.now().time() > self.target_time


class HttpSensor(BaseSensorOperator):
    """
    Executes an HTTP GET request and returns False on failure:
    a 404 Not Found, or the response_check function returning False

    :param http_conn_id: The connection to run the sensor against
    :type http_conn_id: string
    :param endpoint: The relative part of the full url
    :type endpoint: string
    :param params: The parameters to be added to the GET url
    :type params: a dictionary of string key/value pairs
    :param headers: The HTTP headers to be added to the GET request
    :type headers: a dictionary of string key/value pairs
    :param response_check: A check against the 'requests' response object.
        Returns True for 'pass' and False otherwise.
    :type response_check: A lambda or defined function.
    :param extra_options: Extra options for the 'requests' library, see the
        'requests' documentation (options to modify timeout, ssl, etc.)
    :type extra_options: A dictionary of options, where key is string and value
        depends on the option that's being modified.
    """

    template_fields = ('endpoint',)

    @apply_defaults
    def __init__(self,
                 endpoint,
                 http_conn_id='http_default',
                 params=None,
                 headers=None,
                 response_check=None,
                 extra_options=None, *args, **kwargs):
        super(HttpSensor, self).__init__(*args, **kwargs)

        self.endpoint = endpoint
        self.http_conn_id = http_conn_id
        self.params = params or {}
        self.headers = headers or {}
        self.extra_options = extra_options or {}
        self.response_check = response_check

        session = settings.Session()
        site = session.query(DB).filter(DB.conn_id == http_conn_id).first()
        if not site:
            raise AirflowException("http_conn_id not found in the repository")

        self.hook = hooks.HttpHook(method='GET',
                                   http_conn_id=self.http_conn_id)
        session.commit()
        session.close()

    def poke(self, context):
        logging.info('Poking: ' + self.endpoint)

        try:
            response = self.hook.run(self.endpoint,
                                     data=self.params,
                                     headers=self.headers,
                                     extra_options=self.extra_options)
            if self.response_check:
                # run content check on response
                return self.response_check(response)
        except AirflowException as ae:
            if ae.message.startswith("404"):
                return False
            raise

        return True
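
Because run_and_check raises AirflowException with the status code as the first three characters, the startswith("404") check above turns a missing resource into an ordinary unsuccessful poke rather than a task failure. A short usage sketch, assuming a DAG object named dag is in scope and using an illustrative one-hour timeout:

from airflow.operators import HttpSensor

wait_for_apps = HttpSensor(
    task_id='wait_for_apps',
    http_conn_id='http_default',
    endpoint='api/v1.0/apps',
    params={},
    headers={"Content-Type": "application/json"},
    response_check=lambda response: "collation" in response.content,
    poke_interval=60,   # seconds between pokes
    timeout=60 * 60,    # BaseSensorOperator argument: stop poking after an hour
    dag=dag)            # assumes a DAG object named `dag` is in scope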
9 changes: 9 additions & 0 deletions airflow/utils.py
@@ -148,6 +148,15 @@ def initdb():
                host='{}/sqlite_default.db'.format(home)))
        session.commit()

    conn = session.query(C).filter(C.conn_id == 'http_default').first()
    if not conn:
        home = conf.get('core', 'AIRFLOW_HOME')
        session.add(
            models.Connection(
                conn_id='http_default', conn_type='http',
                host='http://www.google.com'))
        session.commit()

    # Known event types
    KET = models.KnownEventType
    if not session.query(KET).filter(KET.know_event_type == 'Holiday').first():
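
initdb() only seeds a google.com placeholder; pointing the operator or sensor at a real service means registering another http connection. A sketch of doing that programmatically, in the same style as the block above (the conn_id and host are made up):

from airflow import models, settings

session = settings.Session()
C = models.Connection
if not session.query(C).filter(C.conn_id == 'my_http_api').first():
    session.add(models.Connection(
        conn_id='my_http_api', conn_type='http',
        host='http://api.example.com'))
    session.commit()
session.close()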
1 change: 1 addition & 0 deletions airflow/www/app.py
@@ -1524,6 +1524,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, ModelView):
        'conn_type': [
            ('ftp', 'FTP',),
            ('hdfs', 'HDFS',),
            ('http', 'HTTP',),
            ('hive_cli', 'Hive Client Wrapper',),
            ('hive_metastore', 'Hive Metastore Thrift',),
            ('hiveserver2', 'Hive Server 2 Thrift',),
4 changes: 3 additions & 1 deletion docs/code.rst
@@ -32,6 +32,8 @@ There are 3 main types of operators:
        HiveOperator,
        HivePartitionSensor,
        HiveToMySqlTransfer,
        SimpleHttpOperator,
        HttpSensor,
        MySqlOperator,
        MySqlToHiveTransfer,
        PostgresOperator,
@@ -111,7 +113,7 @@ Hooks
-----
.. automodule:: airflow.hooks
    :show-inheritance:
    :members: MySqlHook, PostgresHook, PrestoHook, HiveCliHook, HiveServer2Hook, HiveMetastoreHook, S3Hook, SqliteHook
    :members: MySqlHook, PostgresHook, PrestoHook, HiveCliHook, HiveServer2Hook, HiveMetastoreHook, HttpHook, S3Hook, SqliteHook

Executors
---------