Commit 90b46bd

parent 6ef0e37
author Ash Berlin-Taylor <[email protected]> 1564493832 +0100
committer wayne.morris <[email protected]> 1564516048 -0400

[AIRFLOW-5052] Added the include_deleted param to salesforce_hook
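
A minimal usage sketch of the new parameter, assuming a Salesforce connection id of `salesforce_default`; per the hook change shown in the diff below, `include_deleted` is simply forwarded to simple-salesforce's `query_all`:

```
from airflow.contrib.hooks.salesforce_hook import SalesforceHook

# "salesforce_default" is an assumed connection id; adjust to your environment.
hook = SalesforceHook(conn_id="salesforce_default")

# Include soft-deleted records (still in the recycle bin) in the result.
results = hook.make_query(
    "SELECT Id, Name FROM Account",
    include_deleted=True,
)
print(results["totalSize"], results["done"])
```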

[AIRFLOW-1840] Support back-compat on old celery config

The new names are in line with Celery 4, but if anyone upgrades Airflow
without following the UPDATING.md instructions (which we can probably assume
most people won't do, not until something stops working), their workers would
suddenly just start failing. That's bad.

This will issue a warning but carry on working as expected. We can remove the
deprecation settings (but leave the code in config) after this release has
been made.

Closes apache#3549 from ashb/AIRFLOW-1840-back-compat

(cherry picked from commit a4592f9)
Signed-off-by: Bolke de Bruin <[email protected]>
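
A rough illustration of that back-compat pattern (helper name and structure are illustrative, not the Airflow code itself): read the new-style option, fall back to the deprecated name, and emit a warning.

```
import warnings
from configparser import ConfigParser, NoOptionError

def get_with_deprecated_fallback(parser: ConfigParser, section, new_key, old_key):
    """Prefer the new Celery-4 style option, but accept the old name with a warning."""
    try:
        return parser.get(section, new_key)
    except NoOptionError:
        value = parser.get(section, old_key)  # raises if neither name is set
        warnings.warn(
            "Option [{}] {} is deprecated, please rename it to {}".format(
                section, old_key, new_key),
            DeprecationWarning,
        )
        return value
```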

[AIRFLOW-2812] Fix error in Updating.md for upgrading to 1.10

Closes apache#3654 from nrhvyc/AIRFLOW-2812

[AIRFLOW-2816] Fix license text in docs/license.rst

(cherry picked from commit af15f11)
Signed-off-by: Bolke de Bruin <[email protected]>

[AIRFLOW-2817] Force explicit choice on GPL dependency (apache#3660)

By default one of Apache Airflow's dependencies pulls in a GPL
library. Airflow should not install (and upgrade) without an explicit choice.

This is part of the Apache requirements as we cannot depend on Category X
software.

(cherry picked from commit c37fc0b)
Signed-off-by: Bolke de Bruin <[email protected]>
(cherry picked from commit b39e453)
Signed-off-by: Bolke de Bruin <[email protected]>

[AIRFLOW-2869] Remove smart quote from default config

Closes apache#3716 from wdhorton/remove-smart-quote-from-cfg

(cherry picked from commit 67e2bb9)
Signed-off-by: Bolke de Bruin <[email protected]>
(cherry picked from commit 700f5f0)
Signed-off-by: Bolke de Bruin <[email protected]>

[AIRFLOW-2140] Don't require kubernetes for the SparkSubmit hook (apache#3700)

This extra dep is a quasi-breaking change when upgrading - previously
there were no deps outside of Airflow itself for this hook. Importing
the k8s libs breaks installs that aren't also using Kubernetes.

This makes the dep optional for anyone who doesn't explicitly use the
functionality.

(cherry picked from commit 0be002e)
Signed-off-by: Bolke de Bruin <[email protected]>
(cherry picked from commit f58246d)
Signed-off-by: Bolke de Bruin <[email protected]>

[AIRFLOW-2859] Implement own UtcDateTime (apache#3708)

The different UtcDateTime implementations all have issues.
Either they replace tzinfo directly without converting
or they do not convert to UTC at all.

We also ensure all mysql connections are in UTC
in order to keep sanity, as mysql will ignore the
timezone of a field when inserting/updating.

(cherry picked from commit 6fd4e60)
Signed-off-by: Bolke de Bruin <[email protected]>
(cherry picked from commit 8fc8c7a)
Signed-off-by: Bolke de Bruin <[email protected]>
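
A simplified sketch of such a type (not the exact Airflow implementation): it rejects naive datetimes and converts, rather than merely relabels, the timezone before storing.

```
from datetime import timezone
from sqlalchemy.types import DateTime, TypeDecorator

class UtcDateTime(TypeDecorator):
    """Store timezone-aware datetimes, always converted to UTC."""
    impl = DateTime(timezone=True)

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        if value.tzinfo is None:
            raise ValueError("naive datetime is not allowed")
        # Convert the wall-clock time; do not just swap tzinfo.
        return value.astimezone(timezone.utc)
```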

[AIRFLOW-2895] Prevent scheduler from spamming heartbeats/logs

Reverts most of AIRFLOW-2027 until the issues with it can be fixed.

Closes apache#3747 from aoen/revert_min_file_parsing_time_commit

[AIRFLOW-2979] Make celery_result_backend conf backwards compatible (apache#3832)

(apache#2806) renamed `celery_result_backend` to `result_backend`, which broke backwards compatibility.

[AIRFLOW-2524] Add Amazon SageMaker Training (apache#3658)

Add SageMaker Hook, Training Operator & Sensor
Co-authored-by: srrajeev-aws <[email protected]>

[AIRFLOW-2524] Add Amazon SageMaker Tuning (apache#3751)

Add SageMaker tuning Operator and sensor
Co-authored-by: srrajeev-aws <[email protected]>

[AIRFLOW-2524] Add SageMaker Batch Inference (apache#3767)

* Fix for comments
* Fix sensor test
* Update non_terminal_states and failed_states to static variables of SageMakerHook

Add SageMaker Transform Operator & Sensor
Co-authored-by: srrajeev-aws <[email protected]>

[AIRFLOW-2763] Add check to validate worker connectivity to metadata Database

[AIRFLOW-2786] Gracefully handle Variable import errors (apache#3648)

Variables that are added through a file are not
checked as explicitly as those created in the
web UI. This handles exceptions that could be caused
by improper keys or values.
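
A hedged sketch of that behaviour (helper name and bookkeeping are illustrative): skip and count bad entries instead of aborting the whole import.

```
import json
from airflow.models import Variable

def import_variables(path):
    """Import variables from a JSON file, tolerating malformed keys or values."""
    with open(path) as handle:
        data = json.load(handle)
    failed = 0
    for key, value in data.items():
        try:
            serialize = not isinstance(value, str)
            Variable.set(key, value, serialize_json=serialize)
        except Exception:
            # A bad key or value should not abort the remaining imports.
            failed += 1
    return failed
```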

[AIRFLOW-2860] DruidHook: time check is wrong (apache#3745)

[AIRFLOW-2773] Validates Dataflow Job Name

Closes apache#3623 from kaxil/AIRFLOW-2773

[AIRFLOW-2845] Asserts in contrib package code are replaced with raising ValueError and TypeError (apache#3690)

[AIRFLOW-1917] Trim extra newline and trailing whitespace from log (apache#3862)

[AIRFLOW-XXX] Fix SlackWebhookOperator docs (apache#3915)

The docs refer to `conn_id` while the actual argument is `http_conn_id`.

[AIRFLOW-2912] Add Deploy and Delete operators for GCF (apache#3969)

Both the Deploy and Delete operators interact with Google
Cloud Functions to manage functions. Both are idempotent
and make use of GcfHook, a hook that encapsulates
communication with GCP over the GCP API.

[AIRFLOW-3078] Basic operators for Google Compute Engine (apache#4022)

Add GceInstanceStartOperator, GceInstanceStopOperator and GceSetMachineTypeOperator.

Each operator includes:
- core logic
- input params validation
- unit tests
- presence in the example DAG
- docstrings
- How-to and Integration documentation

Additionally, error checking was added to GceHook for responses that are 200 OK:

Some types of errors are only visible in the response's "error" field
while the overall HTTP response is 200 OK.

That is why, apart from checking whether the status is "done", we also check
whether "error" is empty; if it is not, an exception is raised with the error
message extracted from the "error" field of the response.

In this commit we also separated out the Body Field Validator into a
separate module in tools - this way it can be reused between
various GCP operators; it has proven to be usable in at least
two of them now.

Co-authored-by: sprzedwojski <[email protected]>
Co-authored-by: potiuk <[email protected]>
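
A sketch of that check (the helper name is hypothetical; the real hook polls the operation until it finishes): even a 200 OK response must be inspected for an "error" field.

```
from airflow.exceptions import AirflowException

def _check_operation_result(operation):
    """Hypothetical helper: raise if a finished operation carries an error."""
    if operation.get("status") != "done":
        return False  # still running, keep polling
    error = operation.get("error")
    if error:
        # The HTTP layer said 200 OK, but the operation itself failed.
        raise AirflowException("Operation failed: {}".format(
            error.get("message", error)))
    return True
```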

[AIRFLOW-3183] Fix bug in DagFileProcessorManager.max_runs_reached() (apache#4031)

The condition is intended to ensure the function
will return False if any file's run_count is still smaller
than max_run. But the operator used here is "!=".
Instead, it should be "<".

This is because in DagFileProcessorManager
there is no statement limiting the upper
bound of run_count, so it's possible that
a file's run_count will be bigger than max_run.
In that case, the max_runs_reached() method
may fail to serve its purpose.
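
In essence the fix is the comparison operator; a simplified sketch (attribute names are illustrative):

```
def max_runs_reached(self):
    """Return True only when every file has been parsed at least max_runs times."""
    for run_count in self._run_count.values():  # illustrative per-file counter
        if run_count < self._max_runs:
            # "<" rather than "!=": a file that overshoots max_runs must not
            # make the manager look unfinished forever.
            return False
    return True
```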

[AIRFLOW-3099] Don't ever warn about missing sections of config (apache#4028)

Rather than looping through and setting each config variable
individually, and having to know which sections are optional and which
aren't, instead we can just call a single function on ConfigParser and
it will read the config from the dict, and more importantly here, never
error about missing sections - it will just create them as needed.
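
For illustration, the standard library's `ConfigParser.read_dict` already behaves this way - missing sections are created rather than raising (the values below are examples only):

```
from configparser import ConfigParser

defaults = {
    "core": {"dags_folder": "/opt/airflow/dags"},
    "webserver": {"web_server_port": "8080"},
}

parser = ConfigParser()
parser.read_dict(defaults)  # creates the sections as needed, never warns
print(parser.get("webserver", "web_server_port"))  # -> 8080
```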

[AIRFLOW-3089] Drop hard-coded url scheme in google auth redirect. (apache#3919)

The google auth provider hard-codes the `_scheme` in the callback url to
`https` so that airflow generates correct urls when run behind a proxy
that terminates tls. But this means that google auth can't be used when
running without https--for example, during local development. Also,
hard-coding `_scheme` isn't the correct solution to the problem of
running behind a proxy. Instead, the proxy should be configured to set
the `X-Forwarded-Proto` header to `https`; Flask interprets this header
and generates the appropriate callback url without hard-coding the
scheme.
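
For reference, the proxy-aware setup described above can be approximated with Werkzeug's ProxyFix middleware (Werkzeug 0.15+); this is a sketch of the recommended approach, not the change made in this commit, and it assumes the proxy sends `X-Forwarded-Proto: https`.

```
from flask import Flask
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)
# Trust one hop of X-Forwarded-Proto / X-Forwarded-Host set by the
# TLS-terminating proxy, so externally generated URLs use https without
# hard-coding _scheme anywhere.
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)
```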

[AIRFLOW-3178] Handle percents signs in configs for airflow run (apache#4029)

* [AIRFLOW-3178] Don't mask defaults() function from ConfigParser

ConfigParser (the base class for AirflowConfigParser) expects defaults()
to be a function - so when we re-assign it to be a property some of the
methods from ConfigParser no longer work.

* [AIRFLOW-3178] Correctly escape percent signs when creating temp config

Otherwise we have a problem when we come to use those values.

* [AIRFLOW-3178] Use os.chmod instead of shelling out

There's no need to run another process for a built-in Python function.

This also removes a possible race condition that could make the temporary
config file readable by more than the airflow or run-as user.
The exact behaviour would depend on the umask we run under and the
primary group of our user; likely this would mean the file was readable
by members of the airflow group (which in most cases would be just the
airflow user). To remove any such possibility we chmod the file
before we write to it.
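
A minimal sketch of both points (the helper name and paths are illustrative):

```
import os
import stat

def write_temp_config(path, sql_alchemy_conn):
    # Literal "%" must be doubled so ConfigParser interpolation does not
    # misread it when the temporary config is parsed later.
    escaped = sql_alchemy_conn.replace("%", "%%")
    # Restrict the file to the current user with os.chmod instead of shelling
    # out, and do it before anything sensitive is written.
    with open(path, "w") as handle:
        os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)  # 0o600
        handle.write("[core]\nsql_alchemy_conn = {}\n".format(escaped))
```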

[AIRFLOW-2216] Use profile for AWS hook if S3 config file provided in aws_default connection extra parameters (apache#4011)

Use profile for AWS hook if S3 config file provided in
aws_default connection extra parameters
Add test to validate profile set

[AIRFLOW-3138] Use current data type for migrations (apache#3985)

* Use timestamp instead of timestamp with timezone for migration.

[AIRFLOW-3119] Enable debugging with Celery (apache#3950)

This will enable --loglevel when launching a
celery worker and inherit that LOGGING_LEVEL
setting from airflow.cfg

[AIRFLOW-3197] EMRHook is missing new parameters of the AWS API (apache#4044)

Allow passing any params to the CreateJobFlow API, so that we don't have
to stay up to date with AWS API changes.
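
Conceptually (a sketch, not the hook's exact code), the connection's extra JSON is merged with the caller's overrides and handed straight to the EMR RunJobFlow API:

```
import boto3

def create_job_flow(connection_extra, job_flow_overrides):
    """Illustrative only: forward every key to the EMR RunJobFlow API."""
    config = dict(connection_extra)      # e.g. parsed from the connection's extra field
    config.update(job_flow_overrides)    # operator-level overrides win
    emr = boto3.client("emr")
    return emr.run_job_flow(**config)
```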

[AIRFLOW-3203] Fix DockerOperator & some operator test (apache#4049)

- For argument `image`, no need to explicitly
  add "latest" if tag is omitted.
  "latest" will be used by default if no
  tag provided. This is handled by `docker` package itself.

- Intermediate variable `cpu_shares` is not needed.

- Fix wrong usage of `cpu_shares`.
  Based on
  https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.create_host_config,
  it should be an argument of
  self.cli.create_host_config()
  rather than
  APIClient.create_container().

- Change name of the corresponding test script,
  to ensure it can be discovered.

- Fix the test itself.

- Some other test scripts are not named properly,
  which results in failures of test discovery.

[AIRFLOW-3232] More readable GCF operator documentation (apache#4067)

[AIRFLOW-3231] Basic operators for Google Cloud SQL (apache#4097)

Add CloudSqlInstanceInsertOperator, CloudSqlInstancePatchOperator and CloudSqlInstanceDeleteOperator.

Each operator includes:
- core logic
- input params validation
- unit tests
- presence in the example DAG
- docstrings
- How-to and Integration documentation

Additionally, small improvements to GcpBodyFieldValidator were made:
- added a simple list validation capability (type="list")
- introduced the parameter allow_empty, which can be set to False
  to test for non-emptiness of a string instead of specifying
  a regexp.

Co-authored-by: sprzedwojski <[email protected]>
Co-authored-by: potiuk <[email protected]>

[AIRFLOW-2524] Update SageMaker hook and operators (apache#4091)

This re-works the SageMaker functionality in Airflow to be more complete, and more useful for the kinds of operations that SageMaker supports.

We removed some files and operators here, but these were only added after the last release so we don't need to worry about any sort of back-compat.

[AIRFLOW-3276] Cloud SQL: database create / patch / delete operators (apache#4124)

[AIRFLOW-2192] Allow non-latin1 usernames with MySQL backend by adding a SQL_ENGINE_ENCODING param and default to UTF-8 (apache#4087)

Comprised of:

Since we have unicode_literals imported and the engine arguments must be strings in Python 2, explicitly make 'utf-8' a string.

Replace the bare exception with conf.AirflowConfigException for a missing value; apparently it is only used for strings.

Add utf-8 to default_airflow.cfg - question: do I still need the try/except block or can we depend on defaults (I note some have both)?

Get rid of the try/except block and depend on default_airflow.cfg.

Use __str__ since calling str just gives us back a newstr as well.

Test that a panda user can be saved.

[AIRFLOW-3295] Fix potential security issue in DaskExecutor (apache#4128)

When a user decides to use TLS/SSL encryption
for DaskExecutor communications, a
`distributed.Security` object will be created.

However, the argument `require_encryption` was never
set to `True` (its default value is `False`).

This could leave the TLS/SSL encryption setup ineffective.
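
A sketch of the corrected setup (certificate paths are placeholders): the `Security` object must also be told to require encryption.

```
from distributed.security import Security

security = Security(
    tls_ca_file="/path/to/ca.pem",            # placeholder paths
    tls_client_cert="/path/to/client.pem",
    tls_client_key="/path/to/client-key.pem",
    require_encryption=True,                  # the flag previously left at False
)
```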

[AIRFLOW-XXX] Fix flake8 errors from apache#4144

[AIRFLOW-2574] Cope with '%' in SQLA DSN when running migrations (apache#3787)

Alembic uses a ConfigParser like Airflow does, and "%" is a special
value in there, so we need to escape it. As per the Alembic docs:

> Note that this value is passed to ConfigParser.set, which supports
> variable interpolation using pyformat (e.g. `%(some_value)s`). A raw
> percent sign not part of an interpolation symbol must therefore be
> escaped, e.g. `%%`
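
A short illustration of the escaping, under the assumption that the DSN is fed to Alembic through `set_main_option` (which goes through ConfigParser):

```
from alembic.config import Config

dsn = "mysql://user:p%40ssword@localhost:3306/airflow"  # contains a literal "%"

config = Config("alembic.ini")
# Double any raw percent signs so ConfigParser interpolation leaves them intact.
config.set_main_option("sqlalchemy.url", dsn.replace("%", "%%"))
```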

[AIRFLOW-3090] Demote dag start/stop log messages to debug (apache#3920)

[AIRFLOW-3090] Specify path of key file in log message (apache#3921)

[AIRFLOW-3111] Fix instructions in UPDATING.md and remove comment artifacts in default_airflow.cfg (apache#3944)

- fixed incorrect instructions in UPDATING.md regarding core.log_filename_template and elasticsearch.elasticsearch_log_id_template
- removed comments referencing "additional curly braces" from
  default_airflow.cfg since they're irrelevant to the rendered airflow.cfg

[AIRFLOW-3127] Fix out-dated doc for Celery SSL (apache#3967)

Now in `airflow.cfg`, for Celery-SSL, the item names are
"ssl_active", "ssl_key", "ssl_cert", and "ssl_cacert".
(since PR https://github.com/apache/incubator-airflow/pull/2806/files)

But in the documentation
https://airflow.incubator.apache.org/security.html?highlight=celery
or
https://github.com/apache/incubator-airflow/blob/master/docs/security.rst,
it's "CELERY_SSL_ACTIVE", "CELERY_SSL_KEY", "CELERY_SSL_CERT", and
"CELERY_SSL_CACERT", which is out-dated and may confuse readers.

[AIRFLOW-3187] Update airflow.gif file with a slower version (apache#4033)

[AIRFLOW-3164] Verify server certificate when connecting to LDAP (apache#4006)

Misconfiguration and improper checking of exceptions disabled
server certificate checking. We now only support TLS connections
and do not support insecure connections anymore.

[AIRFLOW-2779] Add license headers to doc files (apache#4178)

This adds ASF license headers to all the .rst and .md files with the
exception of the Pull Request template (as that is included verbatim
when opening a Pull Request on Github which would be messy)

Added the include_deleted parameter to salesforce hook

ashb authored and wayne.morris committed Jul 31, 2019
1 parent f76d9da commit 90b46bd
Showing 18 changed files with 1,119 additions and 568 deletions.
27 changes: 24 additions & 3 deletions UPDATING.md
@@ -542,6 +542,27 @@ If you want to use LDAP auth backend without TLS then you will have to create a
custom-auth backend based on
https://github.com/apache/airflow/blob/1.10.0/airflow/contrib/auth/backends/ldap_auth.py

### EMRHook now passes all of connection's extra to CreateJobFlow API

EMRHook.create_job_flow has been changed to pass all keys to the create_job_flow API, rather than
just specific known keys, for greater flexibility.

However, prior to this release the "emr_default" sample connection that was created had invalid
configuration, so creating EMR clusters might fail until your connection is updated. (Ec2KeyName,
Ec2SubnetId, TerminationProtection and KeepJobFlowAliveWhenNoSteps were all top-level keys when they
should be inside the "Instances" dict.)

### LDAP Auth Backend now requires TLS

Connecting to an LDAP server over plain text is not supported anymore. The
certificate presented by the LDAP server must be signed by a trusted
certificate, or you must provide the `cacert` option under `[ldap]` in the
config file.

If you want to use the LDAP auth backend without TLS then you will have to create a
custom-auth backend based on
https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/contrib/auth/backends/ldap_auth.py

## Airflow 1.10

Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or
@@ -646,11 +667,11 @@ With Airflow 1.9 or lower, `FILENAME_TEMPLATE`, `PROCESSOR_FILENAME_TEMPLATE`, `
```
[core]
fab_logging_level = WARN
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
[elasticsearch]
elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
elasticsearch_log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
elasticsearch_end_of_log_mark = end_of_log
```

4 changes: 4 additions & 0 deletions airflow/configuration.py
@@ -286,6 +286,10 @@ def read_dict(self, *args, **kwargs):
        super().read_dict(*args, **kwargs)
        self._validate()

    def read_dict(self, *args, **kwargs):
        super(AirflowConfigParser, self).read_dict(*args, **kwargs)
        self._validate()

    def has_option(self, section, option):
        try:
            # Using self.get() to avoid reimplementing the priority order
58 changes: 58 additions & 0 deletions airflow/contrib/example_dags/example_gcp_function_delete.py
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that deletes a Google Cloud Function.
This DAG relies on the following Airflow variables
https://airflow.apache.org/concepts.html#variables
* PROJECT_ID - Google Cloud Project where the Cloud Function exists.
* LOCATION - Google Cloud Functions region where the function exists.
* ENTRYPOINT - Name of the executable function in the source code.
"""

import datetime

import airflow
from airflow import models
from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator

# [START howto_operator_gcf_delete_args]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
# A fully-qualified name of the function to delete

FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}
# [END howto_operator_gcf_delete_args]

with models.DAG(
    'example_gcp_function_delete',
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1)
) as dag:
    # [START howto_operator_gcf_delete]
    t1 = GcfFunctionDeleteOperator(
        task_id="gcf_delete_task",
        name=FUNCTION_NAME
    )
    # [END howto_operator_gcf_delete]
113 changes: 113 additions & 0 deletions airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that creates a Google Cloud Function and then deletes it.
This DAG relies on the following Airflow variables
https://airflow.apache.org/concepts.html#variables
* PROJECT_ID - Google Cloud Project to use for the Cloud Function.
* LOCATION - Google Cloud Functions region where the function should be
created.
* SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
or
* SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
or
* ZIP_PATH - Local path to the zipped source archive
or
* SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function is
defined in a supported Cloud Source Repository URL format
https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
* ENTRYPOINT - Name of the executable function in the source code.
"""

import datetime

from airflow import models
from airflow.contrib.operators.gcp_function_operator \
import GcfFunctionDeployOperator, GcfFunctionDeleteOperator
from airflow.utils import dates

# [START howto_operator_gcf_deploy_variables]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
RUNTIME = 'nodejs6'
VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)

# [END howto_operator_gcf_deploy_variables]

# [START howto_operator_gcf_deploy_body]
body = {
    "name": FUNCTION_NAME,
    "entryPoint": ENTRYPOINT,
    "runtime": RUNTIME,
    "httpsTrigger": {}
}
# [END howto_operator_gcf_deploy_body]

# [START howto_operator_gcf_deploy_args]
default_args = {
    'start_date': dates.days_ago(1),
    'project_id': PROJECT_ID,
    'location': LOCATION,
    'body': body,
    'validate_body': VALIDATE_BODY
}
# [END howto_operator_gcf_deploy_args]

# [START howto_operator_gcf_deploy_variants]
if SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
elif SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': SOURCE_REPOSITORY
    }
elif ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = ZIP_PATH
elif SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")
# [END howto_operator_gcf_deploy_variants]


with models.DAG(
    'example_gcp_function_deploy_delete',
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1)
) as dag:
    # [START howto_operator_gcf_deploy]
    deploy_task = GcfFunctionDeployOperator(
        task_id="gcf_deploy_task",
        name=FUNCTION_NAME
    )
    # [END howto_operator_gcf_deploy]
    delete_task = GcfFunctionDeleteOperator(
        task_id="gcf_delete_task",
        name=FUNCTION_NAME
    )
    deploy_task >> delete_task
6 changes: 4 additions & 2 deletions airflow/contrib/hooks/salesforce_hook.py
@@ -73,19 +73,21 @@ def get_conn(self):
        )
        return self.conn

    def make_query(self, query):
    def make_query(self, query, include_deleted=False, **kwargs):
        """
        Make a query to Salesforce.
        :param query: The query to make to Salesforce.
        :type query: str
        :param include_deleted: True if deleted records should be included
        :type include_deleted: bool
        :return: The query result.
        :rtype: dict
        """
        conn = self.get_conn()

        self.log.info("Querying for all objects")
        query_results = conn.query_all(query)
        query_results = conn.query_all(query, include_deleted=include_deleted, **kwargs)

        self.log.info("Received results: Total size: %s; Done: %s",
                      query_results['totalSize'], query_results['done'])
11 changes: 11 additions & 0 deletions airflow/kubernetes/kube_client.py
@@ -27,6 +27,17 @@
    has_kubernetes = False
    _import_err = e

try:
    from kubernetes import config, client
    from kubernetes.client.rest import ApiException
    has_kubernetes = True
except ImportError as e:
    # We need an exception class to be able to use it in ``except`` elsewhere
    # in the code base
    ApiException = BaseException
    has_kubernetes = False
    _import_err = e


def _load_kube_config(in_cluster, cluster_context, config_file):
    if not has_kubernetes:
30 changes: 30 additions & 0 deletions airflow/settings.py
@@ -182,6 +182,16 @@ def configure_orm(disable_connection_pool=False):
    # For Python2 we get back a newstr and need a str
    engine_args['encoding'] = engine_args['encoding'].__str__()

    try:
        # Allow the user to specify an encoding for their DB otherwise default
        # to utf-8 so jobs & users with non-latin1 characters can still use
        # us.
        engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING')
    except conf.AirflowConfigException:
        engine_args['encoding'] = 'utf-8'
    # For Python2 we get back a newstr and need a str
    engine_args['encoding'] = engine_args['encoding'].__str__()

    engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
    reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
    setup_event_handlers(engine, reconnect_timeout)
@@ -243,6 +253,26 @@ def validate_session():
        return conn_status


def validate_session():
    try:
        worker_precheck = conf.getboolean('core', 'worker_precheck')
    except conf.AirflowConfigException:
        worker_precheck = False
    if not worker_precheck:
        return True
    else:
        check_session = sessionmaker(bind=engine)
        session = check_session()
        try:
            session.execute("select 1")
            conn_status = True
        except exc.DBAPIError as err:
            log.error(err)
            conn_status = False
        session.close()
        return conn_status


def configure_action_logging():
    """
    Any additional configuration (register callback) for airflow.utils.action_loggers