This repository contains the Airflow DAGs for the Stellar ETL project. These DAGs provide a workflow for exporting data from the Stellar network and uploading the data into BigQuery.
- Installation and Setup
- Execution Procedures
- Understanding the Setup
- Further Development
Below are instructions to initialize the Google Cloud SDK and create the GCP project, dataset, and GCS bucket if needed.
- Download the Google Cloud SDK.
- Initialize the Cloud SDK and login to your Google account
- Login to the Google Cloud Console
- Create a new Google Project or use an existing project
NOTE: The project name you choose corresponds to the Airflow variable "bq_project".
- Log in to Google BigQuery
- Create a new dataset with the desired name or use an existing dataset
NOTE: The dataset name you choose corresponds to the Airflow variable "bq_dataset".
- Open the Cloud Storage browser
- Create a new Google Storage bucket that will store exported files
NOTE: Creating a new Cloud Composer environment will automatically create a new GCS bucket.
NOTE: The bucket name you choose corresponds to the Airflow variable "gcs_exported_data_bucket_name".
WARNING: Make sure that you adhere to the location requirements for Cloud Storage buckets and BigQuery datasets. Otherwise, it will not be possible to upload data to BigQuery.
Cloud Composer is the preferred method of deployment. Cloud Composer is a managed service used for Airflow deployment that provides much of the infrastructure required to host an Airflow instance. The steps for setting up a Cloud Composer environment are detailed below.
Create a new Cloud Composer environment using the UI or by following the setup instructions in Create Cloud Composer environments
For AIRFLOW 2.x: Be wary of choosing "autopilot" for environment resource management. The ephemeral storage provided by autopilot-ed containers is capped at 10GB, which may not be enough for hefty tasks (such as `state_table_dag`'s `export_task`), or any task that runs captive core. You can add a second node pool to your Composer 1 environment, and configure it to be managed by autopilot if desired. Composer 2 environments use autopilot exclusively for resource management.
Note: If no service account is provided, GCP will use the default GKE service account. For quick setup this is an easy option. Remember to adjust the disk size, machine type, and node count to fit your needs. The Python version must be 3, and the image must be `composer-1.16.11-airflow-1.10.14` or later. GCP deprecates support for older versions of Composer and Airflow. It is recommended that you select the latest stable version to avoid an environment upgrade. See the command reference page for a detailed list of parameters.
TROUBLESHOOTING: If the environment creation fails with the error "Composer Backend timed out", try disabling and re-enabling the Cloud Composer API. If the creation fails again, try creating a service account with Owner permissions and use it to create the Composer environment.
Cloud Composer may take a while to set up the environment. Once the process is finished, you can view the environment by going to the Composer section of the Cloud Console.
NOTE: Creating an environment will also create a new Google Cloud Storage bucket. You can check this bucket's name by clicking on the DAGs folder link in the Composer section of the Cloud Console.
After the environment is created, select the environment and navigate to the environment configuration tab. Look for the value under DAGs folder. It will be of the form `gs://airflow_bucket/dags`. The `airflow_bucket` value will be used in this step and the next. Run the command below in order to upload the DAGs and schemas to your Airflow bucket.
> bash upload_static_to_gcs.sh <airflow_bucket>
Afterwards, you can navigate to the Airflow UI for your Cloud Composer environment. To do so, navigate to the Composer section of the Cloud Console, and click the link under `Airflow webserver`. Then, pause the DAGs by clicking the on/off toggle to the left of their names. DAGs should remain paused until you have finished setting up the environment. Some DAGs may not show up due to errors that will be fixed as the following steps are completed.
The Airflow DAGs require service account keys to perform their operations. Generate a service account key for a service account that has access to BigQuery and Google Cloud Storage. Then, add this key file to the data folder in your `airflow_bucket`.
NOTE: The name of the key file corresponds to the Airflow variable "api_key_path". The data folder in Cloud Storage corresponds to the path "/home/airflow/gcs/data/", but ensure that the variable has the correct filename.
If the Kubernetes pods contain long-running or resource-intensive operations, it is best to create a separate node pool for task execution. Executing the tasks on the same node pool as the `airflow-scheduler` will contribute to resource starvation and transient failures in the DAG.
Find the Kubernetes cluster name that is used by your Cloud Composer environment. To do so, select the environment, navigate to the `ENVIRONMENT CONFIGURATION` tab, and look for the value of `GKE cluster`. The cluster name is the final part of this path.
Then, run the command:
gcloud container node-pools create <pool_name> --cluster <cluster_name> \
--zone <composer_zone> --project <project_id>
Alternatively, node pools can be created through the UI with the `ADD NODE POOL` button. Select the environment, navigate to the `ENVIRONMENT CONFIGURATION` tab, and look for the value under `GKE cluster`.
Security can only be applied upon pool creation, so ensure that your service account and scopes are correct. If they need to be updated, you will need to delete the node pool and recreate it.
NOTE: The name of the pool will be used in the Airflow variable "affinity".
A sample affinity configuration is below, and is also defined in `airflow_variables.txt`. The user must supply the node pool name in `values`.
"affinity": {
  "nodeAffinity": {
    "requiredDuringSchedulingIgnoredDuringExecution": {
      "nodeSelectorTerms": [{
        "matchExpressions": [{
          "key": "cloud.google.com/gke-nodepool",
          "operator": "In",
          "values": ["<node-pool-1>",
                     "<node-pool-2>"]
        }]
      }]
    }
  }
},
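As a minimal sketch, the same affinity object can be generated from a list of node pool names, which avoids hand-editing nested JSON. The function name `build_affinity` and the pool name `etl-pool` are illustrative and not part of this repository.

```python
import json


def build_affinity(node_pools):
    """Return the affinity object for the given GKE node pool names."""
    if not node_pools:
        raise ValueError("at least one node pool name is required")
    return {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                "values": list(node_pools),
                            }
                        ]
                    }
                ]
            }
        }
    }


# "etl-pool" is a placeholder; use the node pool name you created above.
affinity = build_affinity(["etl-pool"])
print(json.dumps({"affinity": affinity}, indent=2))
```

The printed JSON can then be pasted into the value of the "affinity" Airflow variable.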
Open the Google Cloud Shell. Run these commands:
gcloud container clusters get-credentials <cluster_name> --region=<composer_region>
kubectl create ns <namespace_name>
kubectl create clusterrolebinding default-admin --clusterrole cluster-admin \
--serviceaccount=<service_account> --namespace <namespace_name>
The first command acquires credentials, allowing you to execute the next commands. The second command creates the new namespace, and the third allows the service account that executes tasks to act in the new namespace.
To find the value of `<airflow_worker_namespace>`, select your Cloud Composer environment, navigate to the `ENVIRONMENT CONFIGURATION` tab, and look for the value of `GKE cluster`. Click on the link that says `view cluster workloads`.
A new page will open with a list of Kubernetes workloads. Click on `airflow-worker` in order to go to the details page for that Deployment. Look for the value of `Namespace`.
NOTE: The name of the newly created namespace corresponds to the Airflow variable "namespace".
There are a few extra hoops to jump through to configure Workload Identity, so that export tasks have permissions to upload files to GCS.
You will create a Kubernetes service account and bind it to the Google service account that your task authenticates as.
Steps taken from this doc.
- Create a namespace in the k8s cluster where the Composer env is running:
  kubectl create namespace <namespace_name>
- Create a k8s service account:
  kubectl create serviceaccount <service_account_name> \
      --namespace <namespace_name>
- Create a Google service account, if one doesn't already exist:
  gcloud iam service-accounts create <service_account_name> \
      --project=<project_id>
- Grant the Google service account that you're using `storage.objectAdmin` permissions, if it doesn't already have them:
  gcloud projects add-iam-policy-binding <project_id> \
      --member "<Google service account>" \
      --role "roles/storage.objectAdmin"
- Associate the Google and k8s service accounts:
  gcloud iam service-accounts add-iam-policy-binding <Google service account email> \
      --role roles/iam.workloadIdentityUser \
      --member "<k8s service account>"
- Annotate the k8s service account with the Google service account:
  kubectl annotate serviceaccount <k8s service account> \
      --namespace <namespace_name> \
      iam.gke.io/gcp-service-account=<Google service account>
- Set the corresponding Airflow variables (`k8s_namespace` and `k8s_service_account`) for tasks running on `KubernetesPodOperator`.
Find the Kubernetes cluster workloads that are used by your Cloud Composer environment. To do so, select the environment, navigate to the `ENVIRONMENT CONFIGURATION` tab, and look for the `GKE cluster` section. Click on the link that says `view cluster workloads`.
A new page will open with a list of Kubernetes workloads. Click on `airflow-worker` in order to go to the details page for that Deployment. Click the `edit` button. This will take you to a tab with a Kubernetes configuration. In subsequent steps, you will edit this file. For an example of a finalized config file, see this example file.
WARNING: You shouldn't copy the example file directly because it has environment variables and config values that are set up for a different project.
NOTE: This deployment file contains two separate containers: airflow-worker and gcs-syncd. Only the airflow-worker container should be edited.
Mount Docker on Airflow Workers
In this step, mount the Docker.sock and Docker. In addition, edit the security config so that the container runs as privileged, allowing it to access Docker. See [this commit](https://github.com/marc-chan/cloud_composer_examples/commit/f3e6a202ef0bfd2214385def7e36be33db191df6#diff-fc2e428a07c8d60059e54e5154f0c540) for an example of how to make these changes.
Add Volume for Local Files to Airflow Workers
In this step, add another volumeMount to airflow-workers. This local path will be used for temporary storage of exported files. In addition, make sure that you add the corresponding volume with the type DirectoryOrCreate.
Here is an example of what your volumeMounts and volumes should look like at the end of this step:
...
volumeMounts:
- mountPath: /etc/airflow/airflow_cfg
  name: airflow-config
- mountPath: /home/airflow/gcs
  name: gcsdir
- mountPath: /var/run/docker.sock
  name: docker-host
- mountPath: /bin/docker
  name: docker-app
- mountPath: /home/airflow/etlData
  name: etl-data
...
volumes:
- configMap:
    defaultMode: 420
    name: airflow-configmap
  name: airflow-config
- emptyDir: {}
  name: gcsdir
- hostPath:
    path: /var/run/docker.sock
    type: ""
  name: docker-host
- hostPath:
    path: /usr/bin/docker
    type: ""
  name: docker-app
- hostPath:
    path: /home/airflow/etlData
    type: DirectoryOrCreate
  name: etl-data
NOTE: The mount path chosen corresponds to the Airflow variable `local_output_path`.
Add Poststart Script to Airflow Workers
Find the namespace name in the airflow-worker config file. It should be near the top of the file, and may look like `composer-1-12-0-airflow-1-10-10-2fca78f7`. This value will be used in later commands.
Next, open the cloud shell. Keep your airflow-worker configuration file open, or save it. In the cloud shell, create a text file called `poststart.sh` by running the command `nano poststart.sh`. Then, copy the text from the `poststart.sh` file in this repository into the newly opened file.
- If you changed the path for the local folder in the previous step, make sure that you edit line 13:
  for file in /home/airflow/etlData/*
- It should reflect the path changes you made. Once the file is finalized, run these commands:
  gcloud container clusters get-credentials <cluster_name> --region=<composer_region>
  kubectl create configmap start-config --from-file poststart.sh -n <namespace_name>
- Return to the airflow-worker config file. Add a new volumeMount to /etc/scripts.
  ...
  volumeMounts:
  ...
  - mountPath: /etc/scripts
    name: config-volume
  ...
- Then, add a new Volume that links to the configMap you created.
  ...
  volumes:
  ...
  - configMap:
      defaultMode: 511
      name: start-config
    name: config-volume
  ...
- This will make the script available to the Airflow workers. In order for them to call it automatically, add a postStart hook to airflow-worker above the existing preStop hook.
  ...
  lifecycle:
    postStart:
      exec:
        command:
        - /bin/bash
        - /etc/scripts/poststart.sh
    preStop:
      exec:
        command:
        - bash
        - -c
        - pkill -f "MainProcess"
  ...
The following paragraphs explain what the script does, for those interested.
The export tasks in the etl use Docker images with their own filesystems. Mounting a folder to the Docker image allows us to connect the airflow-worker filesystem to the Docker image filesystem. However, there are multiple airflow-worker instances, and tasks are distributed between them. This means that an export task may occur on one worker, and the subsequent task that needs that file could occur on a different worker instance. There needs to be some way to pool all the data from all the worker instances.
Fortunately, Cloud Composer provides a folder at /home/airflow/gcs/data. This folder is described in detail here. Essentially, the folder is synchronized between all the workers, and it also is linked to the data folder in the environment's Cloud Storage bucket. This means that data stored here will be available to all workers, solving the problem. Unfortunately, since this folder is already connected to a Cloud Storage bucket, it cannot also connect to a Docker image.
Instead, we connect a local folder defined in the previous step. The `poststart.sh` script runs constantly in the background. It moves files from the local folder to the gcs/data folder. The script is more complicated than a simple move command because it needs to ensure that no programs are writing to the files before they are moved.
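To illustrate the idea of moving only files that are no longer being written, here is a minimal sketch in Python. It is not a port of `poststart.sh`: it uses a stand-in heuristic (a file counts as finished once it has not been modified for a quiet period), and the function name and `quiet_seconds` parameter are hypothetical.

```python
import os
import shutil
import time


def move_finished_files(src_dir, dst_dir, quiet_seconds=10.0, now=None):
    """Move files from src_dir to dst_dir once they look finished.

    Assumption: a file counts as finished when it has not been modified
    for quiet_seconds. The real poststart.sh may use a different check.
    """
    now = time.time() if now is None else now
    os.makedirs(dst_dir, exist_ok=True)
    moved = []
    for name in sorted(os.listdir(src_dir)):
        path = os.path.join(src_dir, name)
        if not os.path.isfile(path):
            continue
        if now - os.path.getmtime(path) < quiet_seconds:
            continue  # possibly still being written; retry on the next pass
        shutil.move(path, os.path.join(dst_dir, name))
        moved.append(name)
    return moved
```

A background loop would call this function repeatedly, for example with `src_dir` set to the local folder and `dst_dir` set to `/home/airflow/gcs/data`.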
In order to add the Airflow variables and connections, navigate to the Airflow web server. To do so, navigate to the Composer section of the Cloud Console, and click the link under `Airflow Webserver`.
Click the Admin tab, then Connections. Click create, then:
- Set the `Conn Id` field to `google_cloud_platform_connection`.
- Set the `Conn Type` to `Google Cloud Platform`.
- Set the `Project Id` to your project id.
- Set the `Keyfile Path` to `<api_key_path>`.
- The `<api_key_path>` should be the same as the Airflow variable `api_key_path`.
Next, add the Airflow variables. Click the Admin tab, then Variables. Click the `Choose file` button, select your variables file, and click import variables.
The `airflow_variables.txt` file provides a set of default values for variables.
Variable name | Description | Should be changed? |
---|---|---|
affinity | JSON object that represents the pod's affinity | Yes, if you followed the optional step and made a new node pool. |
api_key_path | path to the Google Cloud Platform API key | No, unless your filename is different. |
bq_dataset | name of the BigQuery dataset | Yes. Change to your dataset name. |
bq_project | name of the BigQuery project | Yes. Change to your project name. |
gcs_exported_data_bucket_name | name of the Google Cloud Storage bucket that will store exported data | Yes. Change to the name of the bucket you made. |
image_name | name of the ETL's Docker image | No, unless you need a specific image version. |
image_output_path | local output path within the ETL image | No. |
image_pull_policy | Specifies image pull behavior. Valid values are: Always, IfNotPresent, or Never | No, unless you handle image updates manually. |
local_output_path | local output path within the airflow-worker that is used for temporary storage | No, unless you changed the path when modifying the Kubernetes config. |
namespace | namespace name for ETL tasks that generate Kubernetes pods | Yes, if you followed the optional step and made a new namespace |
output_file_names | JSON object. Each key should be a data structure, and the value should be the name of the output file for that data structure | Yes, if desired. Make sure each type has a different filename. |
output_path | shared output path for exported data | No, unless you have a different shared storage solution. |
owner | the name of the owner of the Airflow DAGs | Yes. |
schema_filepath | file path to schema folder | No, unless schemas are in a different location |
table_ids | JSON object. Each key should be a data structure, and the value should be the name of the BigQuery table | Yes, if desired. Make sure each type has a different table name. |
cluster_fields | JSON object. Each key should be a BigQuery table, and the value is a list of columns that the table is clustered by | Yes, if desired for tables that want clustering |
partition_fields | JSON object. Each key should be a BigQuery table, and the value is a JSON object of type and field to partition by | Yes, if desired for tables that want partitioning |
gcs_exported_object_prefix | String to prefix run_id export task output path with | Yes, if desired to prefix run_id |
sentry_dsn | Sentry Data Source Name to tell where Sentry SDK should send events | Yes |
sentry_environment | Environment that sentry alerts will fire | Yes |
use_testnet | Flag to use testnet data instead of mainnet | Yes, if desired to use testnet data |
task_timeout | JSON object. Each key should be the airflow util task name, and the value is the timeout in seconds | Yes, if desired to give tasks timeout |
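Because `output_file_names` and `table_ids` must map each data structure to a distinct value, a quick check before importing the variables can catch collisions. The sketch below is illustrative only; the helper name and the sample keys and file names are made up.

```python
import json
from collections import Counter


def find_duplicate_values(mapping_json):
    """Return the values that are assigned to more than one key."""
    mapping = json.loads(mapping_json)
    counts = Counter(mapping.values())
    return sorted(value for value, count in counts.items() if count > 1)


# Hypothetical output_file_names value; the keys and file names are examples only.
output_file_names = '{"ledgers": "ledgers.txt", "assets": "assets.txt", "trades": "ledgers.txt"}'
print(find_duplicate_values(output_file_names))  # ['ledgers.txt']
```

An empty list means every data structure has its own file or table name.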
Variable name | Description | Should be changed? |
---|---|---|
resources | Resources to request and allocate to Kubernetes Pods. | No, unless pods need more resources |
kube_config_location | Location of the kubernetes config file. See here for a guide on finding the Kube config file. If you are running the pods in the same cluster as Airflow, you can leave this value blank. | No, unless the pods are in a different cluster than Airflow. |
kubernetes_sidecar_image | Image used for xcom sidecar | No, unless you want to pull a different alpine-based image. |
k8s_namespace | Namespace to run the task in | No, unless the pods are moved into a new namespace |
k8s_service_account | K8s service account the task runs as | No, unless k8s authentication is modified, and is likely linked to the associated GCP service account. |
volume_config | JSON objects representing the configuration for your Kubernetes volume. | Yes. Change configs to match your volume (see below for example configs) |
volume_name | Name of the persistent ReadWriteMany volume associated with the claim. | Yes. Change to your volume name. |
Here are some example `volume_config` values. Note that a ReadWriteMany volume is required when tasks run in parallel.
- For an NFS volume, set `volume_config={"nfs": {"path": "/", "server": "my-server.provider.cloud"}}`.
- In order to set up a persistent volume claim, set `volume_config={"persistentVolumeClaim": {"claimName": <claim>}}`.
- In order to set up a host path volume, set `volume_config={"hostPath": {"path": <path>, "type": "DirectoryOrCreate"}}`.
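The three shapes above can also be produced programmatically, which guarantees balanced braces and valid JSON. This is a sketch only; the helper names and the placeholder claim name `my-claim` are assumptions, not part of this repository.

```python
import json


def nfs_volume(server, path="/"):
    """volume_config for an NFS volume."""
    return {"nfs": {"path": path, "server": server}}


def pvc_volume(claim_name):
    """volume_config for a persistent volume claim."""
    return {"persistentVolumeClaim": {"claimName": claim_name}}


def host_path_volume(path):
    """volume_config for a host path volume."""
    return {"hostPath": {"path": path, "type": "DirectoryOrCreate"}}


# Placeholder names; substitute your own server, claim, and path.
for config in (
    nfs_volume("my-server.provider.cloud"),
    pvc_volume("my-claim"),
    host_path_volume("/home/airflow/etlData"),
):
    print(json.dumps(config))
```

Each printed line is a JSON object that can be used as the value of the `volume_config` variable.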
NOTE: The Google Cloud Composer instance of Airflow has limited CLI support. See the list of supported Airflow CLI commands.
First, this image shows the Airflow web UI components for pausing and triggering DAGs:
- Ensure that the Airflow scheduler is running: `airflow scheduler`
- Ensure that the Airflow web server is running: `airflow webserver -p <port>`
- Enable the DAGs
  - Use the command `airflow unpause <DAG name>` or use the Airflow UI
You can clear failed tasks in the task-instance context menu in the Airflow UI. Clearing failed tasks gives them a chance to run again without requiring you to run the entire DAG again.
This section contains information about the Airflow setup. It includes our DAG diagrams and explanations of tasks. For general Airflow knowledge, check out the Airflow concepts overview or the Airflow tutorial.
- exports transactions, operations, trades, and effects from Stellar using CaptiveCore
- inserts into BigQuery
NOTE: SDF writes to both a private dataset and public dataset. Non-SDF instances will probably only need to write to a single private dataset.
- exports assets and ledgers from Stellar's history archives
- inserts into BigQuery
NOTE: SDF writes to both a private dataset and public dataset. Non-SDF instances will probably only need to write to a single private dataset.
- exports accounts, account_signers, offers, claimable_balances, liquidity pools, and trustlines
- inserts into BigQuery
NOTE: Bucket List DAG is unsupported.
- exports from Stellar's bucket list, which contains data on accounts, offers, trustlines, account signers, liquidity pools, and claimable balances
- inserts into BigQuery
This file contains methods for creating time tasks. Time tasks call the get_ledger_range_from_times function in the stellar-etl Docker image. The tasks receive the execution time of the current DAG run and the expected execution time of the next run. They convert this time range into a ledger range that can be passed to the export tasks.
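The conversion from a time range to a ledger range can be pictured with the sketch below. It is not the stellar-etl implementation: the function name, the in-memory list of `(sequence, close_time)` pairs, and the inclusive boundary handling are all assumptions made for illustration.

```python
from bisect import bisect_left, bisect_right
from datetime import datetime, timezone


def ledger_range_from_times(ledgers, start, end):
    """Return (first, last) ledger sequence closed within [start, end].

    ledgers is a list of (sequence, close_time) tuples sorted by close_time.
    Returns None when no ledger closed in the window.
    """
    close_times = [close_time for _, close_time in ledgers]
    lo = bisect_left(close_times, start)  # first ledger closed at or after start
    hi = bisect_right(close_times, end)   # one past the last ledger closed at or before end
    if lo >= hi:
        return None
    return ledgers[lo][0], ledgers[hi - 1][0]


def at(second):
    """Helper for building example timestamps."""
    return datetime(2023, 1, 1, 0, 0, second, tzinfo=timezone.utc)


# Made-up ledgers that close every five seconds.
ledgers = [(100, at(0)), (101, at(5)), (102, at(10)), (103, at(15))]
print(ledger_range_from_times(ledgers, at(3), at(12)))  # (101, 102)
```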
This file contains methods for creating export tasks. Export tasks call export functions in the stellar-etl Docker image with a ledger range determined by the upstream time task. The data is exported in a newline-delimited JSON text file with a file name in the format `[start ledger]-[end ledger]-[data type].txt`.
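As a small illustration of the naming scheme, the sketch below builds and parses file names of the form `[start ledger]-[end ledger]-[data type].txt`. The helper names are hypothetical and are not functions from this repository.

```python
def export_file_name(start_ledger, end_ledger, data_type):
    """Build the export file name: [start ledger]-[end ledger]-[data type].txt"""
    if start_ledger > end_ledger:
        raise ValueError("start_ledger must not exceed end_ledger")
    return f"{start_ledger}-{end_ledger}-{data_type}.txt"


def parse_export_file_name(file_name):
    """Split an export file name back into (start_ledger, end_ledger, data_type)."""
    if not file_name.endswith(".txt"):
        raise ValueError(f"unexpected file name: {file_name}")
    # maxsplit=2 keeps any hyphens inside the data type intact.
    start, end, data_type = file_name[: -len(".txt")].split("-", 2)
    return int(start), int(end), data_type


print(export_file_name(1000, 1063, "transactions"))  # 1000-1063-transactions.txt
```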
This file contains methods for creating tasks that append information from a Google Cloud Storage file to a BigQuery table. These tasks will create a new table if one does not exist. These tasks are used for history archive data structures, as Stellar wants to keep a complete record of the ledger's entire history.
This file contains methods for creating apply tasks. Apply tasks are used to merge a file from Google Cloud Storage into a BigQuery table. Apply tasks differ from the other task that appends in that they apply changes. This means that they update, delete, and insert rows. These tasks are used for accounts, offers, and trustlines, as the BigQuery table represents the point in time state of these data structures. This means that, for example, a merge task could alter the account balance field in the table if a user performed a transaction, delete a row in the table if a user deleted their account, or add a new row if a new account was created.
Apply tasks can also be used to insert unique values only. This behavior is used for orderbook and history archive data structures. Instead of performing a merge operation, which would update or delete existing rows, the task will simply insert new rows if they don't already exist. This helps prevent duplicated data in a scenario where rows shouldn't change or be deleted. Essentially, this task replicates the behavior of a primary key in a database when used for orderbooks.
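The difference between the two apply behaviors can be shown on in-memory data. The sketch below is only an analogy for what happens in BigQuery: the table is a plain dict keyed by `id`, and the `"deleted"` flag is a made-up way of representing a removed row.

```python
def apply_merge(table, changes, key="id"):
    """Apply updates, inserts, and deletions to a table keyed by `key`.

    A change with "deleted": True removes the row; any other change
    replaces the existing row or adds a new one.
    """
    result = dict(table)
    for change in changes:
        if change.get("deleted"):
            result.pop(change[key], None)
        else:
            result[change[key]] = {k: v for k, v in change.items() if k != "deleted"}
    return result


def insert_unique(table, rows, key="id"):
    """Insert only rows whose key is not already present; never update or delete."""
    result = dict(table)
    for row in rows:
        result.setdefault(row[key], row)
    return result


accounts = {"A": {"id": "A", "balance": 10}, "B": {"id": "B", "balance": 5}}
changes = [
    {"id": "A", "balance": 7},     # balance changed
    {"id": "B", "deleted": True},  # account removed
    {"id": "C", "balance": 1},     # new account
]
print(apply_merge(accounts, changes))
# {'A': {'id': 'A', 'balance': 7}, 'C': {'id': 'C', 'balance': 1}}
```

With `insert_unique`, the same change for account `A` would be ignored because a row with that key already exists.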
This file pulls and inserts batch stats into BigQuery.
Data is inserted into `history_archives_dag_runs`.
This file contains methods for creating BigQuery insert job tasks. The task will read the query from the specified sql file and will return a BigQuery job operator configured to the GCP project and datasets defined.
This file creates an ExternalTaskSensor that triggers when a specified DAG task succeeds.
This file deletes data from a specified BigQuery `project.dataset.table` according to the batch interval.
NOTE: If the batch interval is changed, the deleted data might not align with the prior batch intervals.
This section details further areas of development. It covers a basic guide on how to add new features and test changes to existing features. It also contains a list of project TODOs (check the GitHub issues page for more!)
This section covers some possible extensions or further work that can be done.
Git can run special scripts at various places in the Git workflow (which the system calls “hooks”). These scripts can do whatever you want and, in theory, can help a team with their development flow.
`pre-commit` makes hook scripts extremely accessible to teams.
- Install `pre-commit`:
  # using pip
  $ pip install pre-commit==3.2.1
- Set up the Git hook scripts:
  $ pre-commit install
  pre-commit installed at .git/hooks/pre-commit

That's it. Now `pre-commit` will run automatically on `git commit`!
Adding new DAGs is a fairly straightforward process. Create a new Python file in the `dags` folder. Create your DAG object using the code below:
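from airflow import DAG
# Import path assumes the default.py module in the stellar_etl_airflow folder.
from stellar_etl_airflow.default import get_default_dag_args

dag = DAG(
    'dag_id',
    default_args=get_default_dag_args(),
    description='DAG description.',
    schedule_interval=None,  # no schedule; the DAG runs only when triggered
)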
The `get_default_dag_args()` function is defined in the dags/stellar_etl_airflow/default.py file.
Feel free to add more arguments or customize the existing ones. The documentation for a DAG is available here.
If you have created a new DAG, or wish to extend an existing DAG, you can add tasks to it by calling the various `create_X_task` functions that are in the repository. See here for details on how to create dependencies between tasks.
Adding new tasks is a more involved process. You likely need to add a new Python file in the `dags/stellar_etl_airflow` folder. This file should include a function that creates and returns the new task, as well as any auxiliary functions related to the task.
Airflow has a variety of operators. The ones that are most likely to be used are:
- DockerOperator, which can be used to execute commands within a docker container
- KubernetesPodOperator, which can start new Kubernetes Pods
- PythonOperator, which can run Python functions
You may also find this list of Google-related operators useful for interacting with Google Cloud Storage or BigQuery.
An example of a simple task is the time task. This task converts a time into a ledger range using a stellar-etl command. Since it needs to use the stellar-etl, we need a KubernetesPodOperator. We provide the operator with the command, the task_id, the parent DAG, and some parameters specific to KubernetesPodOperator.
More complex tasks might require a good amount of extra code to set up variables, authenticate, or check for errors. However, keep in mind that tasks should be idempotent. This means that tasks should produce the same output even if they are run multiple times. The same input should always produce the same output.
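One common way to keep a file-producing task idempotent is to derive the output path only from the task's inputs and to replace the file on every run rather than append to it. The sketch below shows that pattern under stated assumptions: the function name and the `example` data type in the file name are hypothetical, and this is not how the repository's tasks are implemented.

```python
import json
import os
import tempfile


def write_export(output_dir, start_ledger, end_ledger, rows):
    """Write rows as newline-delimited JSON to a path derived only from the inputs.

    The file is written to a temporary name and then renamed, so a rerun
    replaces the previous output instead of appending to it.
    """
    os.makedirs(output_dir, exist_ok=True)
    final_path = os.path.join(output_dir, f"{start_ledger}-{end_ledger}-example.txt")
    fd, tmp_path = tempfile.mkstemp(dir=output_dir)
    with os.fdopen(fd, "w") as handle:
        for row in rows:
            handle.write(json.dumps(row, sort_keys=True) + "\n")
    os.replace(tmp_path, final_path)
    return final_path
```

Running `write_export` twice with the same arguments leaves a single file with identical contents, which is the property a retried task needs.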
You may find that you need to pass small amounts of information, like filenames or numbers, from one task to another. You can do so with Airflow's XCOM system.
Once you make a change, you can test it using the Airflow command line interface. Here's a quick outline of how to test changes:
- Run `kubectl get pods --all-namespaces`. Look for a pod that starts with `airflow-worker`.
- Run `kubectl -n <pod_namespace> exec -it airflow-worker-<rest_of_pod_name> -c airflow-worker -- /bin/bash` to get inside the worker.
- Run `airflow tasks test history_archive_export <task_id> <test_date>` (on Airflow 1.10, the command is `airflow test`). Note that if the task you changed has dependencies, you need to run `airflow tasks test` on those upstream tasks for the exact same date.
- Run `airflow tasks test` on the tasks that depend on the task you just changed. Ensure that they still perform as expected.
This guide can also be useful for testing deployment in a new environment. Follow this testing process for all the tasks in your DAGs to ensure that they work end-to-end.
An alternative to the testing flow above is to trigger the task in the Airflow UI. From there you are able to view the task status, log, and task details.