What is Airflow, and in which contexts is it best used?
To understand Airflow, we must start with the problem that led to its creation. In 2015, Airbnb, a company we all know today, was expanding massively. That is always a good thing, but the volume of data it had to deal with was growing just as fast. This meant hiring more data engineers, data scientists, and analysts: a dedicated workforce tasked with building automated processes by writing scheduled batch jobs, much like bringing in more hands to bail out a ship that is taking on water.
Then came data engineer Maxime Beauchemin, who created Airflow as an open-source project. The idea was that it would let the team quickly author, iterate on, and monitor their batch data pipelines. It proved to be far more than a lifesaver, because it solved the problems that come with long-running cron jobs executing bulky scripts.
Today, Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It is completely open source and is especially useful for architecting complex data pipelines. As an added bonus, it is written in Python, so you can interface with any third-party Python API or database to extract, transform, or load your data into its final destination.
With Airflow, workflows are expressed as DAGs, and each step of a DAG is defined as a specific task. Airflow is built on the belief that all ETL (Extract, Transform, Load) data processing is best expressed as code. It is therefore a code-first platform that lets you iterate on your workflows quickly and efficiently. This code-first design gives Airflow a degree of customizability and extensibility that many other ETL tools do not offer.
A DAG, or Directed Acyclic Graph, is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
A DAG is defined in a Python script, which represents the DAG's structure (its tasks and their dependencies) as code.
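The plain-Python sketch below makes "directed" and "acyclic" concrete. It models a DAG as a mapping from each task to its upstream tasks and requires Python 3.9+ for `graphlib`.

This is not Airflow's DAG class, and the task names are hypothetical. In an actual Airflow file you would instantiate operators and declare dependencies with the `>>` operator (for example, `extract >> transform`). The point of the sketch is that a DAG always has at least one valid run order, and adding a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it depends on (its upstream tasks).
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(dag):
    """Return one valid run order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(dag).static_order())


def is_acyclic(dag):
    """True if the dependency graph is a valid DAG (no cycles)."""
    try:
        execution_order(dag)
        return True
    except CycleError:
        return False


print(execution_order(pipeline))  # ['extract', 'transform', 'load', 'report']

# Making "extract" depend on "report" closes a loop, so this is no longer a DAG.
cyclic = dict(pipeline, extract={"report"})
print(is_acyclic(cyclic))  # False
```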
Imagine a scenario with three tasks. Task A must complete before task B, and if task B fails it can be retried up to 5 times. Task C, meanwhile, can run at any time, but only after a certain date and/or timestamp.
From the above example, you would notice that a DAG describes how you want to carry out a certain workflow; but at the same time, we did not indicate anything about what is to be done. A, B, and C could be anything. Maybe A prepares data for B to analyze while C sends a report. Or perhaps A tracks your location so B can open a certain door while C turns on the lights once you're in. The main point here is that the DAG isn’t concerned with what the individual tasks do; its job is to make sure that whatever they do happens at the right time, or in the right order, or with the right handling of any unexpected issues.
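The A/B/C scenario above can be sketched as a small, hypothetical in-process runner. The task bodies are placeholder lambdas, which mirrors the point that the runner never cares what a task does. It only enforces order, retries, and timing.

This is an assumption-laden toy, not how Airflow executes tasks. In Airflow, retries are configured with the `retries` argument on a task. Because C has no upstream dependency, a real scheduler could run it in parallel; this sequential sketch simply checks C's date gate last.

```python
from datetime import datetime


def run_with_retries(task, retries):
    """Call task(); if it raises, try again, up to `retries` extra attempts."""
    for attempt in range(retries + 1):
        try:
            return task()
        except Exception:
            if attempt == retries:
                raise  # out of retries: the failure propagates


def run_scenario(task_a, task_b, task_c, now, c_not_before):
    """Hypothetical runner for the A/B/C scenario; returns the names of tasks that ran.
    It only enforces order, retries and timing; it never inspects what a task does."""
    ran = []
    task_a()                              # A must finish before B starts
    ran.append("A")
    run_with_retries(task_b, retries=5)   # B gets up to 5 retries after its first attempt
    ran.append("B")
    if now >= c_not_before:               # C has no upstream task, only a date gate
        task_c()
        ran.append("C")
    return ran


attempts = {"B": 0}


def flaky_b():
    """Hypothetical task that fails on its first two attempts."""
    attempts["B"] += 1
    if attempts["B"] < 3:
        raise RuntimeError("transient failure")
    return "analysed"


ran = run_scenario(lambda: "prepared", flaky_b, lambda: "report sent",
                   now=datetime(2020, 1, 2), c_not_before=datetime(2020, 1, 1))
print(ran, attempts["B"])  # ['A', 'B', 'C'] 3
```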
To define a DAG, you declare it in a standard Python file placed in Airflow's DAG_FOLDER. Airflow executes the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks, but in general each one should correspond to a single logical workflow.
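Roughly speaking, Airflow discovers DAGs by executing each file and picking up the DAG objects left in its global namespace. The sketch below imitates that idea with the standard library's `runpy` module and a hypothetical `ToyDAG` class. It is not Airflow's loader, which does considerably more. The file names and DAG ids are made up for illustration.

```python
import runpy
import tempfile
from pathlib import Path


class ToyDAG:
    """Hypothetical stand-in for a DAG object; it only records an id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def collect_dags(dag_folder):
    """Execute every .py file in dag_folder and keep the ToyDAG objects it defines."""
    dags = {}
    for path in sorted(Path(dag_folder).glob("*.py")):
        # run_path executes the file and returns its top-level (global) variables.
        namespace = runpy.run_path(str(path), init_globals={"ToyDAG": ToyDAG})
        for obj in namespace.values():
            if isinstance(obj, ToyDAG):
                dags[obj.dag_id] = obj
    return dags


with tempfile.TemporaryDirectory() as folder:
    # One file, one DAG.
    Path(folder, "sales.py").write_text('daily_sales = ToyDAG("daily_sales")\n')
    # One file that builds several DAGs dynamically in a loop.
    Path(folder, "exports.py").write_text(
        'for region in ("eu", "us"):\n'
        '    globals()["export_" + region] = ToyDAG("export_" + region)\n'
    )
    found = collect_dags(folder)

print(sorted(found))  # ['daily_sales', 'export_eu', 'export_us']
```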
Simply stated, execution_date is the logical date and time for which the DAG run, and its task instances, are running.
This lets task instances process data for the desired logical date and time. While a task instance or DAG run might have a physical start date of now, its logical date might be months in the past, for example when historical data is being backfilled.
A DAG run and all task instances created within it share the same execution_date. Logically, then, you can think of a DAG run as simulating the DAG running all of its tasks at the earlier date and time given by execution_date.
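In practice, this means a task derives what to process from its execution_date rather than from the wall clock. The following is a minimal sketch under that assumption; the `partition_for` and `backfill_dates` helpers and the file naming scheme are hypothetical.

```python
from datetime import datetime, timedelta


def partition_for(execution_date):
    """Hypothetical helper: the daily input file a run should process."""
    return f"sales_{execution_date:%Y-%m-%d}.csv"


def backfill_dates(start, end):
    """Logical dates of the daily runs from start to end, inclusive."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


# These runs might all physically start today, but each one
# processes the data belonging to its own logical date.
for logical_date in backfill_dates(datetime(2020, 1, 1), datetime(2020, 1, 3)):
    print(logical_date.date(), "->", partition_for(logical_date))
```

Because each run's output depends only on its logical date, re-running a past date processes the same partition again instead of whatever data happens to be current.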
When an operator is instantiated, it is referred to as a "task". Instantiation supplies specific parameter values to the abstract operator, and the resulting parameterized task becomes a node in a DAG.
A task instance represents a specific run of a task and is defined as the combination of a DAG, a task, and a point in time. Task instances have an indicative state, represented by “running”, “success”, “failed”, “skipped”, “up for retry”, etc.
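The operator, task, and task instance relationship can be modelled in a few lines. This is a toy data model, not Airflow's classes. `ToyOperator` is hypothetical, and `TaskInstance` here only borrows the idea that a task instance is identified by a DAG, a task, and a point in time.

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional


class State(Enum):
    """A subset of the task-instance states mentioned above."""
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    UP_FOR_RETRY = "up_for_retry"


class ToyOperator:
    """Hypothetical operator: a template for a kind of work.
    Instantiating it with concrete parameters yields a task."""

    def __init__(self, task_id, **params):
        self.task_id = task_id
        self.params = params


@dataclass
class TaskInstance:
    """Toy model: one run of one task in one DAG for one logical point in time."""
    dag_id: str
    task_id: str
    execution_date: datetime
    state: Optional[State] = None  # None: not yet scheduled or run

    @property
    def key(self):
        return (self.dag_id, self.task_id, self.execution_date)


export = ToyOperator("export_csv", table="sales")  # an instantiated operator, i.e. a task
ti_jan1 = TaskInstance("sales", export.task_id, datetime(2020, 1, 1))
ti_jan2 = TaskInstance("sales", export.task_id, datetime(2020, 1, 2))
ti_jan1.state = State.SUCCESS
print(ti_jan1.key != ti_jan2.key)  # True: same task, different point in time
```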
A task goes through various stages from start to completion. In the Airflow UI (graph and tree views), each stage is shown in its own colour.
Recall that a DAG can have multiple tasks and does not concern itself with what those tasks actually do. So while DAGs describe how to run a workflow, operators determine what actually gets done by each task.
An operator, then, describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don't need to share resources with any other operators. The DAG's job is to make sure that operators run in the correct order; beyond those dependencies, operators generally run independently. In fact, two operators in the same DAG might run on completely different machines.
Another point to note is that if two operators need to share information, like a filename or small amount of data, it would be best to consider merging them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom.
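To show the shape of that cross-communication, here is a toy key-value store loosely modelled on how XCom entries are identified: by DAG, task, execution date, and a key. The class and its `push`/`pull` methods are hypothetical and are not Airflow's XCom API. The sketch mainly illustrates that only a small value, such as a filename, is passed between tasks, never the data itself.

```python
from datetime import datetime


class ToyXComStore:
    """Hypothetical cross-communication store, keyed by
    (dag_id, task_id, execution_date, key). Not Airflow's actual API."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, task_id, execution_date, key, value):
        self._rows[(dag_id, task_id, execution_date, key)] = value

    def pull(self, dag_id, task_id, execution_date, key):
        # Returns None when nothing was pushed under that identity.
        return self._rows.get((dag_id, task_id, execution_date, key))


store = ToyXComStore()
run = datetime(2020, 1, 1)
# The upstream task shares a small value (a filename), not the file's contents.
store.push("sales", "extract", run, "filename", "sales_2020-01-01.csv")
print(store.pull("sales", "extract", run, "filename"))  # sales_2020-01-01.csv
```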
Airflow provides operators for many common tasks, including:
- BashOperator - executes a bash command
- PythonOperator - calls an arbitrary Python function
- EmailOperator - sends an email
- SimpleHttpOperator - sends an HTTP request
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
- Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
In addition to these basic building blocks, there are many more specific operators: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator… and the list goes on.
But keep in mind that Operators are only loaded by Airflow if they are assigned to a DAG.
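The polling behaviour of a sensor can be sketched in plain Python. The `poke_interval` and `timeout` parameter names echo those used by Airflow's sensors, but `wait_for` itself is hypothetical and is not Airflow's sensor class. The condition below simulates a file that appears on the third check.

```python
import time


def wait_for(condition, poke_interval=1.0, timeout=10.0):
    """Poll condition() every poke_interval seconds until it returns True.
    Raises TimeoutError if it is still False once timeout seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met before timeout")
        time.sleep(poke_interval)


pokes = {"n": 0}


def file_has_landed():
    """Hypothetical check; pretends the file appears on the third poke."""
    pokes["n"] += 1
    return pokes["n"] >= 3


landed = wait_for(file_has_landed, poke_interval=0.01, timeout=1.0)
print(landed, pokes["n"])  # True 3
```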
Executor based on Celery
One of the executors Airflow can use to run its tasks is built on Celery, an asynchronous task queue/job queue based on distributed message passing. Celery is focused on real-time operation but supports scheduling as well.
The execution units, called tasks, are executed concurrently on one or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously (waiting until ready).
Celery is typically used in production systems that process millions of tasks a day.
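Celery needs a message broker and running workers, so the sketch below uses the standard library's `concurrent.futures` as a stand-in to illustrate the two calling styles described above. It is an analogy, not Celery code. In Celery itself, calling a task's `delay()` method returns an `AsyncResult` immediately, and calling `get()` on that result waits for the outcome. The `resize_image` function is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def resize_image(name):
    """Hypothetical unit of work."""
    return f"{name}: resized"


# A pool of worker threads plays the role of the worker servers.
with ThreadPoolExecutor(max_workers=4) as workers:
    # Asynchronous: submit() returns a future immediately; the work runs in the background.
    futures = [workers.submit(resize_image, n) for n in ("a.png", "b.png", "c.png")]
    # Synchronous: result() blocks until that particular task has finished.
    results = [f.result() for f in futures]

print(results)  # ['a.png: resized', 'b.png: resized', 'c.png: resized']
```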
Airflow itself has its own scheduler. The Airflow scheduler monitors all tasks and all DAGs and triggers the task instances whose dependencies have been met. Behind the scenes, it spins up a subprocess that monitors and stays in sync with the DAG folder and all the DAG objects it contains. Periodically (every minute or so), this subprocess collects DAG parsing results and inspects active tasks to see whether they can be triggered.
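One "tick" of such a scheduler loop can be sketched in a heavily simplified, hypothetical form. It finds tasks with no state yet whose upstream tasks have all succeeded, which corresponds to Airflow's default `all_success` trigger rule. It ignores pools, concurrency limits, and every other trigger rule.

```python
def triggerable(upstream, states):
    """Tasks that have no state yet and whose upstream tasks have all succeeded.
    upstream maps each task to its set of upstream tasks; states maps task -> state."""
    return sorted(
        task
        for task, deps in upstream.items()
        if states.get(task) is None
        and all(states.get(dep) == "success" for dep in deps)
    )


upstream = {
    "extract": set(),
    "transform": {"extract"},
    "audit": {"extract"},
    "load": {"transform"},
}
states = {"extract": "success"}  # "extract" has already run successfully
print(triggerable(upstream, states))  # ['audit', 'transform']
```

Calling this repeatedly as states change is, in miniature, the "inspect active tasks and trigger what is ready" loop described above.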
The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To get it started, you need to execute airflow scheduler. It will use the configuration specified in airflow.cfg.
Note that if you run a DAG on a schedule_interval of one day, the run stamped 2020-01-01 will be triggered soon after 2020-01-01T23:59. In other words, the job instance is started once the period it covers has ended.
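The timing rule above is simple arithmetic when the interval is a fixed `timedelta`. Airflow also accepts cron expressions and presets for schedule_interval, which this sketch ignores. The `trigger_time` function is hypothetical.

```python
from datetime import datetime, timedelta


def trigger_time(execution_date, schedule_interval):
    """Earliest time a run can start: once the period it covers has ended."""
    return execution_date + schedule_interval


print(trigger_time(datetime(2020, 1, 1), timedelta(days=1)))       # 2020-01-02 00:00:00
print(trigger_time(datetime(2020, 1, 1, 13), timedelta(hours=1)))  # 2020-01-01 14:00:00
```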
Some systems can get overwhelmed when too many processes hit them at the same time. Airflow pools can be used to limit the execution parallelism on arbitrary sets of tasks. The list of pools is managed in the UI (Menu -> Admin -> Pools) by giving the pools a name and assigning it a number of worker slots. Tasks can then be associated with one of the existing pools by using the pool parameter when creating tasks (i.e., instantiating operators).
The pool parameter can be used together with priority_weight to define priorities in the queue, that is, which tasks get executed first as slots open up in the pool. The default priority_weight is 1 and can be raised to any number. When sorting the queue to decide which task should run next, Airflow uses a task's priority_weight summed with the priority_weight values of all tasks downstream of it. You can use this to bump a specific important task, and the whole path leading to that task gets prioritized accordingly.
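The summing rule described above can be modelled with a depth-first walk over downstream tasks. The `effective_weights` function is a hypothetical model of the rule as described, not Airflow's internal code. Each downstream task is counted once, even if it is reachable by more than one path.

```python
def effective_weights(downstream, weights):
    """Each task's priority_weight plus the weights of all tasks downstream of it.
    downstream maps every task to its direct downstream tasks; weights default to 1."""

    def descendants(task, seen):
        # Depth-first walk; `seen` ensures a task reachable by two paths counts once.
        for child in downstream.get(task, ()):
            if child not in seen:
                seen.add(child)
                descendants(child, seen)
        return seen

    return {
        task: weights.get(task, 1) + sum(weights.get(d, 1) for d in descendants(task, set()))
        for task in downstream
    }


downstream = {
    "extract": {"transform", "audit"},
    "transform": {"load"},
    "load": set(),
    "audit": set(),
}
# Bump only "load"; every task on the path leading to it inherits the boost.
print(effective_weights(downstream, {"load": 10}))
# {'extract': 13, 'transform': 11, 'load': 10, 'audit': 1}
```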
Tasks will be scheduled as usual while the slots fill up. Once capacity is reached, runnable tasks get queued and their state will show as such in the UI. As slots free up, queued tasks start running based on the priority_weight (of the task and its descendants).
Note that tasks not given a pool are assigned to the default pool, default_pool. default_pool is initialized with 128 slots, a number that can be changed through the UI or CLI (though the pool itself cannot be removed).
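The queueing behaviour of a pool can be simulated in discrete time. Tasks run while slots are free. Once capacity is reached, the rest wait, and each freed slot goes to the queued task with the highest effective weight.

This is a toy simulation, not Airflow's scheduler. The task names, weights, and durations are hypothetical, and ties are broken alphabetically.

```python
import heapq


def run_order(tasks, slots):
    """Simulate a pool with `slots` >= 1 slots. tasks is a list of
    (task_id, effective_weight, duration). Returns task_ids in start order."""
    queue = [(-weight, task_id, duration) for task_id, weight, duration in tasks]
    heapq.heapify(queue)   # highest weight first (weights negated for a min-heap)
    running = []           # heap of (finish_time, task_id) occupying slots
    clock, started = 0, []
    while queue:
        # Fill every free slot from the front of the priority queue.
        while queue and len(running) < slots:
            _, task_id, duration = heapq.heappop(queue)
            started.append(task_id)
            heapq.heappush(running, (clock + duration, task_id))
        # Pool is full (or the queue is empty): advance to the next finishing task.
        clock, _ = heapq.heappop(running)
    return started


tasks = [("a", 1, 5), ("b", 1, 1), ("c", 3, 2), ("d", 2, 1)]
print(run_order(tasks, slots=2))  # ['c', 'd', 'a', 'b']
```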
Monitoring the correctness and performance of your Airflow jobs (DAG runs) should be a core concern of a BI development team. Airflow has good support for basic monitoring of your jobs:
- SLA misses: Airflow can send out an email bundling all SLA misses for a specific scheduling interval.
- EmailOperator: Airflow can send out emails when a specific point in a DAG is reached.
- Notifications to popular online services like Slack or HipChat.
- The Airflow dashboard itself, which you can manually refresh to verify which jobs recently succeeded or failed.
- A PythonOperator (or its branching variant, BranchPythonOperator), which can track the output of a particular task, make a more informed decision based on the context in which it runs, and then branch to a specific path of execution for notification purposes. This type of monitoring is very useful for tracking execution errors, but some cases, such as data quality monitoring, call for more sophisticated methods. This is where online services like Datadog come in.
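The SLA and branching ideas in the list above can be combined in a small sketch. It collects the tasks that missed a deadline, bundles them into one message per interval, and picks a branch by returning the id of the next task to run, in the style of a branching callable.

All names here are hypothetical. The deadline is passed in explicitly because exactly how Airflow anchors an SLA is not modelled.

```python
from datetime import datetime


def sla_misses(finished_at, deadline):
    """Tasks that finished after the deadline, or had not finished at all (None)."""
    return sorted(
        task for task, end in finished_at.items() if end is None or end > deadline
    )


def bundle_report(misses, interval_label):
    """One message covering every miss in a scheduling interval, or None if there were none."""
    if not misses:
        return None
    return f"SLA missed for {interval_label}: " + ", ".join(misses)


def choose_branch(misses):
    """Branching decision: return the id of the (hypothetical) task to run next."""
    return "send_sla_report" if misses else "all_good"


deadline = datetime(2020, 1, 2, 6, 0)
finished_at = {
    "extract": datetime(2020, 1, 2, 1, 0),
    "load": datetime(2020, 1, 2, 7, 30),
    "report": None,  # still running
}
misses = sla_misses(finished_at, deadline)
print(choose_branch(misses))                 # send_sla_report
print(bundle_report(misses, "2020-01-01"))   # SLA missed for 2020-01-01: load, report
```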
Airflow provides countless benefits to those in the pipeline business, which can broadly be categorized as code quality and visibility. It gives developers a better way to build data pipelines by serving as a sort of 'framework' for creating them, much as a web framework helps developers by abstracting common patterns.
It does this by giving data engineers tools that simplify the repetitive aspects of pipeline creation. Airflow comes with numerous powerful integrations that cover almost any destination for output data. By leveraging these tools, engineers find that their pipelines follow a well-understood format, which makes the code readable to others.
But what really makes Airflow stand out is its GUI. Data engineers who handle multiple pipelines save themselves the endless headaches that come from those pipelines being prone to failure. They gain immediate visibility across all their pipelines and can quickly spot points of failure. Even more impressive, the code they write is visually represented in Airflow's GUI. Not only can they check the heartbeat of their pipelines, but they can also view graphical representations of the very code they write.