Create a Comprehensive Tutorial for Apache Airflow: Constructing Data Pipelines Using Python

Apache Airflow has become the de facto tool for pipeline orchestration in the Python ecosystem. Unlike similar solutions, it has gained popularity thanks to its simplicity and extensibility. In this article, I will outline its main concepts and give you a clear understanding of when and how to use it.

Why and when should I consider Airflow?

Imagine that you want to build a machine learning pipeline that consists of several steps such as:

Read an image dataset from a cloud-based storage

Process the images

Train a deep learning model with the downloaded images

Upload the trained model to the cloud

Deploy the model

How would you schedule and automate this workflow? Cron jobs are a simple solution but they come with many problems. Most importantly, they won’t allow you to scale effectively. On the other hand, Airflow offers the ability to schedule and scale complex pipelines easily. It also enables you to automatically re-run them after failure, manage their dependencies and monitor them using logs and dashboards.

Before we build the aforementioned pipeline, let’s understand the basic concepts of Apache Airflow.

What is Airflow?

Apache Airflow is a tool for authoring, scheduling, and monitoring pipelines. As a result, it is an ideal solution for ETL and MLOps use cases. Example use cases include:

Extract data from many sources, aggregate and transform it, and store it in a data warehouse

Extract insights from data and display them in an analytics dashboard

Train, validate, and deploy machine learning models

Key components

When installing Airflow in its default edition, you will see four different components.

Webserver: The webserver is Airflow’s user interface (UI), which lets you interact with Airflow without a CLI or an API. From there you can trigger and monitor pipelines, create connections to external systems, inspect their datasets, and more.

Executor: Executors are the mechanism by which pipelines run. There are many different types, which run pipelines locally on a single machine or in a distributed fashion. A few examples are LocalExecutor, SequentialExecutor, CeleryExecutor and KubernetesExecutor.

Scheduler: The scheduler is responsible for executing tasks at the correct time, re-running pipelines, backfilling data, ensuring task completion, etc.

PostgreSQL: A database where all pipeline metadata is stored. This is typically a Postgres database, but other SQL databases are supported too.

airflow-ui

The easiest way to install Airflow is using docker compose. You can download the official docker compose file from here:

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

Note that Airflow is also available on PyPI and can be installed using pip.

Basic concepts of Airflow

In order to get started with Airflow, one has to be familiar with its main concepts, which can be a little tricky. So let’s try to demystify them.

DAGs

All pipelines are defined as directed acyclic graphs (DAGs). Any time we execute a DAG, an individual run is created. Each DAG run is separate from another and contains a status regarding the execution stage of the DAG. This means that the same DAGs can be executed many times in parallel.

dag-example
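To make the idea of a directed acyclic graph more concrete, here is a minimal sketch that uses only Python's standard library (`graphlib`) rather than Airflow itself. The step names are hypothetical labels for the machine learning pipeline described earlier. The point is only that an acyclic dependency graph always has a valid execution order, while a graph with a cycle does not.

```python
from graphlib import CycleError, TopologicalSorter

# Map each step to the set of steps it depends on (its upstream steps).
# The step names are illustrative labels, not Airflow identifiers.
pipeline = {
    "read_images": set(),
    "process_images": {"read_images"},
    "train_model": {"process_images"},
    "upload_model": {"train_model"},
    "deploy_model": {"upload_model"},
}


def execution_order(graph):
    """Return one valid order in which the steps can run."""
    return list(TopologicalSorter(graph).static_order())


def has_cycle(graph):
    """Return True if the graph contains a cycle and so is not a DAG."""
    try:
        execution_order(graph)
    except CycleError:
        return True
    return False


order = execution_order(pipeline)
print(order)
```

Calling `has_cycle({"a": {"b"}, "b": {"a"}})` returns True, which is the kind of definition a DAG-based orchestrator has to reject.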

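To instantiate a DAG, you can call the DAG constructor directly or use it as a context manager, as follows:

from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    "mlops",
    default_args={
        "retries": 1,  # applied to every task in this DAG
    },
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
) as dag:
    ...  # tasks are defined here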

The DAG constructor accepts DAG-level parameters, such as the schedule and start date, along with a dictionary of default arguments. The default arguments are passed to every task and can be overridden on a per-task basis. The complete list of parameters can be found in the official docs.

In this example, the DAG starts on January 1, 2023 and runs once a day. Because retries is passed through default_args, each task is retried once if it fails.
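As a rough illustration of what a retry count of 1 means for a single task, the sketch below re-runs a callable after a failure. This is plain Python and not Airflow's scheduler logic; `run_with_retries` and `flaky_task` are hypothetical names used only for this example.

```python
def run_with_retries(task_callable, retries=1):
    """Call task_callable, retrying up to `retries` times after a failure.

    Returns a (result, attempts) tuple and re-raises the last error if
    every attempt fails.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return task_callable(), attempts
        except Exception:
            if attempts > retries:
                raise


calls = {"count": 0}


def flaky_task():
    """Fail on the first call and succeed on the second."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("transient failure")
    return "done"


result, attempts = run_with_retries(flaky_task, retries=1)
print(result, attempts)
```

With `retries=0`, the same flaky task would fail outright, since no second attempt is made.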

Tasks

Each node of the DAG represents a Task, meaning an individual piece of code. Each task may have some upstream and downstream dependencies. These dependencies express how tasks are related to each other and in which order they should be executed. Whenever a new DAG run is initialized, all tasks are initialized as Task instances. This means that each Task instance is a specific run for the given task.

complex-dag

Operators

Operators can be viewed as templates for predefined tasks because they encapsulate boilerplate code and abstract much of their logic. Some common operators are BashOperator, PythonOperator, MySqlOperator, and S3FileTransformOperator. As you can tell, operators help you define tasks that follow a specific pattern. For example, the MySqlOperator creates a task that executes a SQL query, and the BashOperator creates a task that executes a bash command or script.

Operators are defined inside the DAG context manager as below. The following code creates two tasks, one to execute a bash command and one to execute a MySQL query.

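from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

with DAG(
    "tutorial",
    start_date=datetime(2023, 1, 1),
) as dag:
    # Runs the shell command `date`
    task1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    # Executes the SQL script (requires the MySQL provider package)
    task2 = MySqlOperator(
        task_id="load_table",
        sql="/scripts/load_table.sql",
    )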

Task dependencies

To form the DAG’s structure, we need to define dependencies between each task. One way is to use the >> symbol as shown below:

task1 >> task2 >> task3

Note that one task may have multiple dependencies:

task1 >> [task2, task3]

The other way is through the set_downstream and set_upstream methods:

task1.set_downstream([task2, task3])
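If you are curious how a `>>` syntax like this can work in Python, the toy sketch below overloads the shift operators on a small class. It is not Airflow's implementation (the real classes do considerably more); the `Task` class here is a hypothetical stand-in that only records neighbours.

```python
class Task:
    """A toy task that only tracks its upstream and downstream neighbours."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        """Make every task in `others` run after this one."""
        if isinstance(others, Task):
            others = [others]
        for other in others:
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def __rshift__(self, others):
        """Support `task >> other` and `task >> [a, b]`."""
        self.set_downstream(others)
        return others  # returning the right-hand side allows chaining

    def __rrshift__(self, others):
        """Support `[a, b] >> task`, where the left side is a plain list."""
        for other in others:
            other.set_downstream(self)
        return self


task1, task2, task3, task4 = (Task(f"task{i}") for i in range(1, 5))
task1 >> [task2, task3] >> task4
print(sorted(task4.upstream))
```

Returning the right-hand operand from `__rshift__` is what makes chains such as `task1 >> task2 >> task3` read left to right.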

XComs

XComs, or cross-communications, are responsible for communication between tasks. A task can push data to an XCom, and another task can pull it. More specifically, the data is written to the metadata database, where other tasks can read it. That’s why there is a limit to the amount of data that can be passed through them. If you need to transfer large amounts of data, use a suitable external data store such as object storage or a NoSQL database, and pass only a reference to it through XComs.

xcoms

Take a look at the following code. The two tasks communicate via XComs using the ti argument (short for task instance). The train_model task pushes the model_path into the metadata database, and the deploy_model task pulls it.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    "mlops_dag",
    start_date=datetime(2023, 1, 1),
)

def train_model(ti):
    # train_and_save_model is assumed to be defined elsewhere
    model_path = train_and_save_model()
    # Push the path so that downstream tasks can read it
    ti.xcom_push(key="model_path", value=model_path)

def deploy_model(ti):
    # Pull the path pushed by the train_model task
    model_path = ti.xcom_pull(key="model_path", task_ids="train_model")
    # deploy_trained_model is assumed to be defined elsewhere
    deploy_trained_model(model_path)

train_model_task = PythonOperator(
    task_id="train_model",
    python_callable=train_model,
    dag=dag,
)

deploy_model_task = PythonOperator(
    task_id="deploy_model",
    python_callable=deploy_model,
    dag=dag,
)

train_model_task >> deploy_model_task
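To see the push and pull mechanics without a running Airflow instance, here is a small in-memory sketch. `ToyXComStore` and `ToyTaskInstance` are hypothetical stand-ins for the metadata database and the ti object, and the model path is made up; the real XCom backend stores more fields than this.

```python
class ToyXComStore:
    """An in-memory stand-in for the metadata database's XCom table."""

    def __init__(self):
        self._rows = {}

    def push(self, run_id, task_id, key, value):
        self._rows[(run_id, task_id, key)] = value

    def pull(self, run_id, task_id, key):
        return self._rows.get((run_id, task_id, key))


class ToyTaskInstance:
    """Mimics the small part of `ti` used above: xcom_push and xcom_pull."""

    def __init__(self, store, run_id, task_id):
        self.store, self.run_id, self.task_id = store, run_id, task_id

    def xcom_push(self, key, value):
        self.store.push(self.run_id, self.task_id, key, value)

    def xcom_pull(self, key, task_ids):
        return self.store.pull(self.run_id, task_ids, key)


store = ToyXComStore()
train_ti = ToyTaskInstance(store, run_id="run_1", task_id="train_model")
deploy_ti = ToyTaskInstance(store, run_id="run_1", task_id="deploy_model")

train_ti.xcom_push(key="model_path", value="/tmp/model.bin")  # hypothetical path
pulled = deploy_ti.xcom_pull(key="model_path", task_ids="train_model")

# A task instance from a different DAG run does not see the value.
other_run = ToyTaskInstance(store, "run_2", "deploy_model").xcom_pull(
    key="model_path", task_ids="train_model"
)
print(pulled, other_run)
```

Keying the values by run as well as by task is what keeps separate DAG runs from reading each other's data in this sketch.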

Taskflow

The Taskflow API is an easy way to define a task using the Python decorator @task. If all of the task’s logic can be written in Python, a simple decorator is enough to define a new task. Taskflow automatically manages dependencies and communication between tasks.

Using the Taskflow API, we can initialize a DAG with the @dag decorator. Here is an example:

from datetime import datetime

from airflow.decorators import dag, task

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
)
def mlops():
    @task
    def load_data():
        df = ...  # data loading elided
        return df

    @task
    def preprocessing(data):
        ...  # preprocessing steps elided
        return data

    @task
    def fit(data):
        return None

    # Passing one task's output to the next defines the dependencies
    df = load_data()
    data = preprocessing(df)
    model = fit(data)

dag = mlops()

Note that dependencies between tasks are implied by each function’s arguments. Here we have a simple chain, but things can get much more complex. The Taskflow API also handles communication between tasks, so there is little need to use XComs explicitly.
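How can a decorator infer dependencies from function arguments? The toy sketch below shows one possible mechanism: calling a decorated function does not run it but returns a placeholder, and any placeholder passed as an argument is recorded as an upstream dependency. This is a simplified model under my own assumptions, not Airflow's code; `Ref`, `dependencies` and this `task` decorator are hypothetical.

```python
class Ref:
    """A placeholder for the future output of a task."""

    def __init__(self, task_id):
        self.task_id = task_id


dependencies = {}  # task_id -> list of upstream task_ids


def task(func):
    """Toy decorator: calling the function registers it instead of running it."""

    def wrapper(*args):
        upstream = [arg.task_id for arg in args if isinstance(arg, Ref)]
        dependencies[func.__name__] = upstream
        return Ref(func.__name__)

    return wrapper


@task
def load_data():
    return "raw"


@task
def preprocessing(data):
    return data


@task
def fit(data):
    return None


df = load_data()
data = preprocessing(df)
model = fit(data)
print(dependencies)
```

The function bodies never execute here, which mirrors the idea that defining a DAG only describes the graph, while the actual work happens later when the tasks are run.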

Scheduling

Scheduling jobs is one of the core features of Airflow. This can be done using the `schedule_interval` argument (since Airflow 2.4 the preferred name is `schedule`, and the old name is deprecated), which receives a cron expression, a `datetime.timedelta` object, or a predefined preset such as `@hourly`, `@daily` etc. A more flexible approach is to use the recently added timetables, which let you define custom schedules using Python.

Here is an example of how to use the `schedule_interval` argument. The below DAG will be executed daily.

from datetime import datetime

from airflow.decorators import dag

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)
def my_dag():
    pass

Two very important concepts you need to understand regarding scheduling are backfilling and catchup.

Once we define a DAG, we set up a start date and a schedule interval. If `catchup=True`, Airflow will create DAG runs for all schedule intervals from the start date until the current date. If `catchup=False`, Airflow will schedule only runs from the current date.
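The difference between the two settings is easier to see with dates. The sketch below is a simplified model in plain Python, not Airflow's scheduler: it assumes a run is created once its interval has fully elapsed, and that with `catchup=False` only the most recent completed interval is run. The function name `scheduled_intervals` is hypothetical.

```python
from datetime import datetime, timedelta


def scheduled_intervals(start_date, interval, now, catchup):
    """Return the start of each completed schedule interval to be run.

    A run is created only once its interval has fully elapsed. With
    catchup=False, only the most recent completed interval is kept.
    """
    starts = []
    current = start_date
    while current + interval <= now:
        starts.append(current)
        current += interval
    if not catchup:
        starts = starts[-1:]
    return starts


start = datetime(2023, 1, 1)
now = datetime(2023, 1, 5, 12, 0)
daily = timedelta(days=1)

with_catchup = scheduled_intervals(start, daily, now, catchup=True)
without_catchup = scheduled_intervals(start, daily, now, catchup=False)
print(len(with_catchup), without_catchup)
```

In this example, catchup produces four runs (for January 1 through January 4), while disabling it leaves only the run for the January 4 interval.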

Backfilling extends this idea by enabling us to create past runs from the CLI irrespective of the value of the `catchup` parameter:

`$ airflow dags backfill -s START_DATE -e END_DATE DAG_ID`

dag-runs

Connections and Hooks

Airflow provides an easy way to configure connections with external systems or services. Connections can be created using the UI, as environment variables, or through a config file. They usually require a URL, authentication info and a unique id. Hooks are an API that abstracts communication with these external systems. For example, we can define a PostgreSQL connection through the UI as follows:

connections

And then use the PostgresHook to establish the connection and execute our queries:

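from airflow.providers.postgres.hooks.postgres import PostgresHook

# Look up the connection configured under the id "postgres_custom"
pg_hook = PostgresHook(postgres_conn_id="postgres_custom")
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute("create table _mytable (ModelID int, ModelName varchar(255))")
conn.commit()  # persist the change; psycopg2 does not autocommit by default
cursor.close()
conn.close()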
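For connections defined as environment variables, Airflow looks for a variable named `AIRFLOW_CONN_` followed by the upper-cased connection id, with the value written as a URI. The sketch below shows, with the standard library only, how such a URI breaks down into the usual connection fields. The helper names, credentials and host are made up for illustration, and the parsing is a simplification of what Airflow does.

```python
from urllib.parse import unquote, urlsplit


def conn_env_var(conn_id):
    """Return the environment variable name Airflow checks for a connection id."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def parse_conn_uri(uri):
    """Split a connection URI into the fields a connection typically holds."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "schema": parts.path.lstrip("/") or None,
    }


# Hypothetical credentials and host, for illustration only.
example_uri = "postgres://ml_user:s3cret@db.example.com:5432/models"
fields = parse_conn_uri(example_uri)
print(conn_env_var("postgres_custom"), fields["host"], fields["port"])
```

Keeping credentials in a connection rather than in DAG code means the same DAG can run against different environments just by changing the connection definition.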

Advanced concepts

To keep this tutorial as self-contained as possible, I need to mention a few more advanced concepts. I won’t go into much detail on each of them, but I strongly encourage you to check them out if you want to master Airflow.

Branching: Branching lets a workflow follow one of several downstream paths, either to make execution conditional or to split the processing. The most common way to do this is the BranchPythonOperator.

Task Groups: Task Groups help you organize your tasks in a single unit. It’s a great tool to simplify your graph view and for repeating patterns.

Dynamic Dags
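As a rough sketch of the branching idea mentioned above: a branching callable returns the id of the task that should run next, and the other candidate tasks are skipped. The code below models only that decision in plain Python; the task ids, the accuracy threshold and both function names are hypothetical and do not come from Airflow.

```python
def choose_branch(accuracy, threshold=0.9):
    """Return the id of the task to follow, as a branching callable would."""
    return "deploy_model" if accuracy >= threshold else "retrain_model"


def resolve_branches(candidates, chosen):
    """Mark the chosen task to run and every other candidate as skipped."""
    return {
        task_id: ("run" if task_id == chosen else "skipped")
        for task_id in candidates
    }


candidates = ["deploy_model", "retrain_model"]  # hypothetical task ids
states = resolve_branches(candidates, choose_branch(accuracy=0.93))
print(states)
```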
