Skip to content

Commit

Permalink
[AIRFLOW-4811] Implement GCP DLP' Hook and Operators (#5539)
Browse files Browse the repository at this point in the history
Implement GCP DLP' Hook and Operators
  • Loading branch information
ryanyuan authored and mik-laj committed Jul 30, 2019
1 parent 05c01a9 commit 6ef0e37
Show file tree
Hide file tree
Showing 10 changed files with 5,964 additions and 1 deletion.
82 changes: 82 additions & 0 deletions airflow/contrib/example_dags/example_gcp_dlp_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that execute the following tasks using
Cloud DLP service in the Google Cloud Platform:
1) Creating a content inspect template;
2) Using the created template to inspect content;
3) Deleting the template from GCP .
"""

import os
from google.cloud.dlp_v2.types import ContentItem, InspectConfig, InspectTemplate

import airflow
from airflow.models import DAG
from airflow.contrib.operators.gcp_dlp_operator import (
CloudDLPCreateInspectTemplateOperator,
CloudDLPDeleteInspectTemplateOperator,
CloudDLPInspectContentOperator,
)


default_args = {"start_date": airflow.utils.dates.days_ago(1)}


GCP_PROJECT = os.environ.get("GCP_PROJECT_ID", "example-project")
TEMPLATE_ID = "dlp-inspect-8034856"
ITEM = ContentItem(
table={
"headers": [{"name": "column1"}],
"rows": [{"values": [{"string_value": "My phone number is (206) 555-0123"}]}],
}
)
INSPECT_CONFIG = InspectConfig(
info_types=[{"name": "PHONE_NUMBER"}, {"name": "US_TOLLFREE_PHONE_NUMBER"}]
)
INSPECT_TEMPLATE = InspectTemplate(inspect_config=INSPECT_CONFIG)


with DAG("example_gcp_dlp", default_args=default_args, schedule_interval=None) as dag:
create_template = CloudDLPCreateInspectTemplateOperator(
project_id=GCP_PROJECT,
inspect_template=INSPECT_TEMPLATE,
template_id=TEMPLATE_ID,
task_id="create_template",
xcom_push=True,
dag=dag,
)

inspect_content = CloudDLPInspectContentOperator(
task_id="inpsect_content",
project_id=GCP_PROJECT,
item=ITEM,
inspect_template_name="{{ task_instance.xcom_pull('create_template', key='return_value')['name'] }}",
dag=dag,
)

delete_template = CloudDLPDeleteInspectTemplateOperator(
task_id="delete_template",
template_id=TEMPLATE_ID,
project_id=GCP_PROJECT,
dag=dag,
)

create_template > inspect_content > delete_template
Loading

0 comments on commit 6ef0e37

Please sign in to comment.