Skip to content

Commit

Permalink
Merge pull request #3 from airbnb/master
Browse files Browse the repository at this point in the history
Refresh from origin
  • Loading branch information
mtustin-handy committed Oct 1, 2015
2 parents 2b9c775 + c40f6e6 commit 14fdfa6
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 30 deletions.
3 changes: 3 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ TODO
* Pause flag at the task level
* Task callbacks as tasks?
* Increase unit test coverage

#### OTher
deprecate TimeSensor
2 changes: 1 addition & 1 deletion airflow/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
'jdbc_operator': ['JdbcOperator'],
'mssql_operator': ['MsSqlOperator'],
'mssql_to_hive': ['MsSqlToHiveTransfer'],
'slack_operator': ['SlackAPIPostOperator', 'SlackAPIOperator'],
'slack_operator': ['SlackAPIOperator', 'SlackAPIPostOperator'],
'generic_transfer': ['GenericTransfer'],
}

Expand Down
40 changes: 31 additions & 9 deletions airflow/operators/slack_operator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from slackclient import SlackClient
from airflow.models import BaseOperator
from airflow.utils import apply_defaults


class SlackAPIOperator(BaseOperator):
Expand All @@ -8,13 +9,15 @@ class SlackAPIOperator(BaseOperator):
The SlackAPIPostOperator is derived from this operator.
In the future additional Slack API Operators will be derived from this class as well
:param token: Slack api token
:type token: String
:param token: Slack API token (https://api.slack.com/web)
:type token: string
:param method: The Slack API Method to Call (https://api.slack.com/methods)
:type method: String
:param param: API Method call parameters (https://api.slack.com/methods)
:type param: dict
:type method: string
:param params: API Method call parameters (https://api.slack.com/methods)
:type params: dict
"""

@apply_defaults
def __init__(self,
token='unset',
method='unset',
Expand All @@ -25,7 +28,24 @@ def __init__(self,
self.method = method
self.params = params

def construct_api_call_params(self):
"""
Used by the execute function. Allows templating on the source fields of the api_call_params dict before construction
Override in child classes.
Each SlackAPIOperator child class is responsible for having a construct_api_call_params function
which sets self.api_call_params with a dict of API call parameters (https://api.slack.com/methods)
"""

pass

def execute(self, **kwargs):
"""
SlackAPIOperator calls will not fail even if the call is not unsuccessful.
It should not prevent a DAG from completing in success
"""
if not self.params:
self.construct_api_call_params()
sc = SlackClient(self.token)
sc.api_call(self.method, **self.params)

Expand All @@ -43,9 +63,11 @@ class SlackAPIPostOperator(SlackAPIOperator):
:param icon_url: url to icon used for this message
:type icon_url: string
"""

template_fields = ('username', 'text')
ui_color = '#FFBA40'

@apply_defaults
def __init__(self,
channel='#general',
username='Airflow',
Expand All @@ -59,13 +81,13 @@ def __init__(self,
self.username = username
self.text = text
self.icon_url = icon_url
super(SlackAPIPostOperator, self).__init__(method=self.method,
*args, **kwargs)

def construct_api_call_params(self):
self.params = {
'channel': self.channel,
'username': self.username,
'text': self.text,
'icon_url': self.icon_url,
}
super(SlackAPIPostOperator, self).__init__(method=self.method,
params=self.params,
*args, **kwargs)

4 changes: 4 additions & 0 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import inspect
import logging
import os
import sys
from itertools import chain
merge = chain.from_iterable

Expand Down Expand Up @@ -32,6 +33,9 @@ def validate(cls):
plugins_folder = conf.get('core', 'airflow_home') + '/plugins'
plugins_folder = os.path.expanduser(plugins_folder)

if plugins_folder not in sys.path:
sys.path.append(plugins_folder)

plugins = []

# Crawl through the plugins folder to find AirflowPlugin derivatives
Expand Down
23 changes: 16 additions & 7 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import socket
import sys
import time
import traceback

from flask._compat import PY2
from flask import (
Expand Down Expand Up @@ -715,7 +716,13 @@ def code(self):

@app.errorhandler(404)
def circles(self):
return render_template('airflow/circles.html'), 404
return render_template(
'airflow/circles.html', hostname=socket.gethostname()), 404

@app.errorhandler(500)
def show_traceback(self):
return render_template(
'airflow/traceback.html', info=traceback.format_exc()), 500

@expose('/sandbox')
@login_required
Expand Down Expand Up @@ -1062,18 +1069,20 @@ def tree(self):
base_date = datetime.now()
else:
base_date = dateutil.parser.parse(base_date)
base_date = utils.round_time(base_date, dag.schedule_interval)
form = TreeForm(data={'base_date': base_date, 'num_runs': num_runs})

start_date = dag.start_date
if not start_date and 'start_date' in dag.default_args:
start_date = dag.default_args['start_date']

if start_date:
base_date = utils.round_time(base_date, dag.schedule_interval, start_date)
else:
base_date = utils.round_time(base_date, dag.schedule_interval)
# if a specific base_date is requested, don't round it
if not request.args.get('base_date'):
if start_date:
base_date = utils.round_time(
base_date, dag.schedule_interval, start_date)
else:
base_date = utils.round_time(base_date, dag.schedule_interval)

form = TreeForm(data={'base_date': base_date, 'num_runs': num_runs})

from_date = (base_date - (num_runs * dag.schedule_interval))

Expand Down
1 change: 1 addition & 0 deletions airflow/www/templates/airflow/circles.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<div style="font-family: verdana;">
<h1>Airflow 404 = lots of circles</h1>
<div style="color: white">{{ hostname }}</div>
<div
id="div_svg"
class="content"
Expand Down
5 changes: 5 additions & 0 deletions airflow/www/templates/airflow/traceback.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<h1> Internal Server Error </h1>

<div>
<pre>{{ info }}</pre>
</div>
30 changes: 17 additions & 13 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,34 @@
# Kept manually in sync with airflow.__version__
version = '1.5.1'

celery = [
'celery>=3.1.17',
'flower>=0.7.3'
]
crypto = ['cryptography>=0.9.3']
doc = [
'sphinx>=1.2.3',
'sphinx-argparse>=0.1.13',
'sphinx-rtd-theme>=0.1.6',
'Sphinx-PyPI-upload>=0.2.1'
]
druid = ['pydruid>=0.2.1']
hdfs = ['snakebite>=2.4.13']
hive = [
'hive-thrift-py>=0.0.1',
'pyhive>=0.1.3',
'pyhs2>=0.6.0',
]
jdbc = ['jaydebeapi>=0.2.0']
mssql = ['pymssql>=2.1.1', 'unicodecsv>=0.13.0']
mysql = ['mysql-python>=1.2.5']
postgres = ['psycopg2>=2.6']
optional = ['librabbitmq>=1.6.1']
samba = ['pysmbclient>=0.1.3']
druid = ['pydruid>=0.2.1']
oracle = ['cx_Oracle>=5.1.2']
postgres = ['psycopg2>=2.6']
s3 = ['boto>=2.36.0']
jdbc = ['jaydebeapi>=0.2.0']
mssql = ['pymssql>=2.1.1', 'unicodecsv>=0.13.0']
hdfs = ['snakebite>=2.4.13']
samba = ['pysmbclient>=0.1.3']
slack = ['slackclient>=0.15']
crypto = ['cryptography>=0.9.3']
oracle = ['cx_Oracle>=5.1.2']
statsd = ['statsd>=3.0.1, <4.0']
vertica = ['vertica-python>=0.5.1']

all_dbs = postgres + mysql + hive + mssql + hdfs + vertica
Expand All @@ -43,14 +48,12 @@
scripts=['airflow/bin/airflow'],
install_requires=[
'alembic>=0.8.0, <0.9',
'celery>=3.1.17, <4.0',
'chartkick>=0.4.2, < 0.5',
'dill>=0.2.2, <0.3',
'flask>=0.10.1, <0.11',
'flask-admin==1.2.0',
'flask-cache>=0.13.1, <0.14',
'flask-login>=0.2.11, <0.3',
'flower>=0.7.3, <0.8',
'future>=0.15.0, <0.16',
'gunicorn>=19.3.0, <20.0',
'jinja2>=2.7.3, <3.0',
Expand All @@ -61,12 +64,13 @@
'requests>=2.5.1, <3',
'setproctitle>=1.1.8, <2',
'sqlalchemy>=0.9.8, <0.10',
'statsd>=3.0.1, <4.0',
'thrift>=0.9.2, <0.10',
],
extras_require={
'all': devel + optional,
'all_dbs': all_dbs,
'celery': celery,
'crypto': crypto,
'devel': devel,
'doc': doc,
'druid': druid,
Expand All @@ -75,12 +79,12 @@
'jdbc': jdbc,
'mssql': mssql,
'mysql': mysql,
'oracle': oracle,
'postgres': postgres,
's3': s3,
'samba': samba,
'slack': slack,
'crypto': crypto,
'oracle': oracle,
'statsd': statsd,
'vertica': vertica,
},
author='Maxime Beauchemin',
Expand Down

0 comments on commit 14fdfa6

Please sign in to comment.