Documentation
⏳ Airflow
Что такое Airflow

Куратор раздела

Владимир Шустиков
Владимир Шустиков

Напутственные слова перед изучением материала

!!!Сюда стоит лезть, после изучения курсов и статей по Python, SQL, Docker, Командной строки Linux!!!

Данная статья охватывает основы работы с оркестратором AirFlow. Рассмотрим кратко теорию, которую спрашивают достаточно часто, а так же посмотрим, как локально развернуть AirFlow с помощью docker-compose в простом примере одного из тестовых заданий по AirFlow.

Приятного погружения.


AirFlow

Возьмём определение с официального репозитория AirFlow (opens in a new tab).

AirFlow — это платформа для программирования, планирования и мониторинга рабочих процессов.

А в общем и целом нужно запомнить, что AirFlow это оркестратор (не ELT-инструмент), в котором есть возможность прописывать ETL процессы, на языке Python. Каждый такой процесс представляет собой DAG, состоящий из определённых задач.

AirFlow

DAG

DAG(Directed Acyclic Graph, направленный ациклический граф) представлет из себя набор тАсок(задач), идущих последовательно друг за дружкой, либо параллельно, которые нельзя зациклить по кругу, т.е. своего рода строится прямолинейный конвеер обработки данных.

AirFlow

Архитектура AirFlow

AirFlow состоит из четырёх основных, взаимосвязанных компонентов:

  • Планировщик AirFlow(Scheduler)
  • Исполнитель(Executor)
  • Воркеры (Workers)
  • Веб-сервер AirFlow

AirFlow

Задачи планировщика

  • Анализ графа;
  • Проверка параметра scheduler_interval, который определяет частоту выполнения DAG;
  • Планирование очереди графов.

Задачи Исполнителя

  • Запуск задач;
  • Распеделение задач между воркерами.

Задачи воркеров

  • Получает задачи от исполнителя;
  • Отвечает за полное выполнение задач.

Задачи Веб-сервера AirFlow

  • Визуализация DAG'а, который проанализировал планировщик;
  • Предоставлять интерфейс пользователю для отслеживания работы DAG.

Инициализация DAG и его основные параметры

В Python-файле DAG инициализируется очень просто, необходимо импортировать класс DAG из библиотки AirFlow:

from airflow import DAG

И далее создать экземпляр объекта. Обычно переменную экземпляра принято называть dag.

dag = DAG(
    перечисление свойств
)

Именно этот экземпляр класса передаёются в дальнейшем каждому оператору, так оператор понимает, что принадлежит именно этому экземпляру.

PythonOperator(
  ...
  dag=dag
)

Свойства экземпляра DAG

У экземпляра DAG есть большое количество свойств, но остановлюсь я только на 3х, постоянно встречающихся в каждом DAG-экземпляре

  1. dag_id — идентификатор DAG. Именно этот идентификатор проставляется в имени DAG на главной странице Airflow.
dag = DAG(
    dag_id = "load_file_to_psql"
    ...
)
  1. start_date — задается дата и время начала планирования запусков DAG. Обычно для задания даты используется библиотека datetime.
import datetime as dt
 
dag = DAG(
    dag_id = "load_file_to_psql"
    start_date = dt.datetime(2024, 11, 13)
    ...
)
  1. schedule_interval — планируемое время запуска DAG.
import datetime as dt
 
dag = DAG(
    dag_id = "load_file_to_psql"
    start_date = dt.datetime(2024, 11, 13)
    schedule_interval = dt.timedelta(days=5) # раз в 5 дней, т.е. 13, 18, 23 и т.д.
    ...
)

В schedule_interval есть несколько способов задать время запуска. Ради примера в скобках будет указан ежедневный интервал.

  • CRON — о нём во всех подробностях описано здесь. (schedule_interval = '0 0 * * *')
  • timedelta — из библиотеки datetime (schedule_interval = dt.timedelta(days=5))
  • макросы (schedule_interval = '@daily')
МакросЗначение
@onceОдин и только один раз
@hourlyЗапуск один раз в час в начале часа
@dailyЗапуск один раз в день в полночь
@weeklyЗапуск один раз в неделю в полночь в воскресенье утром
@monthlyЗапуск один раз в месяц в полночь первого числа месяца
@yearlyЗапуск один раз в год в полночь 1 января

Если мы хотим запускать DAG вручную, то необходимо прописать значение None (schedule_interval = None)

Операторы AirFlow

Как вам уже известно, конвеер обработки является направленным ацикличным графом, в свою очередь DAG состоит из определённых задач, которые определяются операторами.

В AirFlow существует множество операторов, с их полным списком можно ознакомиться здесь (opens in a new tab)

Вот примеры часто встречаемых операторов, с которыми мы познакомимся в примере ниже:

  • PythonOperator
  • BashOperator
  • PostgresOperator

Думаю, из названия и так понятно, для чего они нужны. Так что тут без комментариев

Передача данных между задачами.

Существует 2 метода передачи данных между тасками в AirFlow:

  1. Механизм XCom;
  2. Сохранение данных в хранилищах.

Механизм XCom

XCom - позволяет обмениваться сообщениями между задачами. Предназначен он исключительно для небольших данных.

Согласно документации в зависимости от используемой базы данных метаинформации:

  • SQLite - до 2х Гб.
  • PostgreSQL - до 1 Гб.
  • MySQL - до 64 Кб.

!!!Запомните!!! XCom можно использовать для передачи небольших объемов данных, например, значение агрегации, количества строк в файле, даже можно небольшой файл передать, но в остальных случаях используйте внешние решения для хранения данных, как пример, сохраняйте все в каталог tmp и потом забирайте данные оттуда.

Определяется Xcom 2мя способами:

  • с помощью команд xcom_push и xcom_pull(с этим методом мы познакомимся в примере)
  • с помощью Taskflow API (декоратор @task, вот хорошая статься (opens in a new tab) с примером)

Пример тестового задания

Необходимо написать DAG, который будет выполнять следующие задачи:

1. С помощью PythonOperator необходимо сгенерировать тестовые данные и записать их в файл в каталог /tmp/data.csv ( для простоты можно взять 2 колонки - id, value )
2. С помощью BashOperator переместить файл в каталог /tmp/processed_data
3. C помощью PythonOperator нужно загрузить данные из файла в таблицу в Postgres ( таблицу можно предварительно создать )
4. После записи данных в таблицу последним таском выведите в логах сообщение о количестве загруженных данных.
 
С помощью XCom необходимо:
 
Передать путь до файла из п.1 в оператор в п.2.
Передать количество записей из п.3 в п.4

Запуск DAG'а

Для начала развернём с помощью Docker Compose, AirFlow.

Жми и копируй docker-compose.yaml в свой каталог.

Посмотреть docker-compose.yaml
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.3}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    AIRFLOW__CORE__TEST_CONNECTION: 'Enabled'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy
 
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
 
  redis:
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always
 
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
 
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
 
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
 
  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
 
  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources
 
  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    command:
      - bash
      - -c
      - airflow
 
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
 
  server_psql:
    image: postgres:14
    container_name: server_psql
    restart: always
    ports:
      - '5431:5432'
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: user
      POSTGRES_DB: user
 
volumes:
  postgres-db-volume:

С помощью WSL или любой командной строки, где ты работаешь с Docker, переходишь в каталог со скаченным или скопированным файлом и выполняешь команду:

docker compose up -d

Пока у тебя качаются и поднимаются контейнеры, скачай следующий файл test_task.py

Посмотреть test_task.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from hashlib import md5
from sqlalchemy import create_engine
import pandas as pd
 
 
default_args = {
    "owner": "@Shust_DE",
    "depends_on_past": False,
    "start_date": datetime(2024, 11, 13),
    "email": ["https://t.me/Shust_DE"],
    "schedule_interval": "@hourly",
}
 
 
def _generate_file(**kwargs):
    ti = kwargs["ti"]
    path_filename = "/tmp/data.csv"
    table = [
        (i, md5(int(i).to_bytes(8, "big", signed=True)).hexdigest())
        for i in range(1, 100)
    ]
    table = pd.DataFrame(table, columns=["id", "md5_id"])
    table.to_csv(path_filename, index=False)
    ti.xcom_push(key="path_file", value=path_filename)
 
 
def _data_in_postgres(**kwargs):
    ti = kwargs["ti"]
    df = pd.read_csv("/tmp/processed_data/data.csv")
    engine = create_engine("postgresql://user:user@host.docker.internal:5431/user")
    df.to_sql("table_name", engine, if_exists="append", schema="public", index=False)
    ti.xcom_push(key="count_string", value=len(df))
 
 
dag = DAG(dag_id="load_file_to_psql", default_args=default_args)
 
generate_file = PythonOperator(
    task_id="generate_file",
    python_callable=_generate_file,
    dag=dag,
)
 
 
move_data_file = BashOperator(
    task_id="move_data_file",
    bash_command=(
        "mkdir -p /tmp/processed_data/ && "
        "mv {{ ti.xcom_pull(task_ids='generate_file', key='path_file') }} /tmp/processed_data/"
    ),
    dag=dag,
)
 
 
create_table_psql = PostgresOperator(
    task_id="create_table",
    postgres_conn_id="psql_connection",
    sql=""" DROP TABLE IF EXISTS table_name;
                CREATE TABLE table_name ( id int, 
                             md5_id text); """,
)
 
 
data_in_postgres = PythonOperator(
    task_id="data_in_postgres",
    python_callable=_data_in_postgres,
    dag=dag,
)
 
 
print_count_string_in_df = BashOperator(
    task_id="print_count_string_in_df",
    bash_command=(
        """echo "В таблице {{ ti.xcom_pull(task_ids='data_in_postgres', key='count_string') }} строк" """
    ),
    dag=dag,
)
 
 
(
    generate_file
    >> move_data_file
    >> create_table_psql
    >> data_in_postgres
    >> print_count_string_in_df
)
 

После того, как контейнеры поднимутся, в каталоге с docker-compose.yaml появятся 4 каталога

config, dags, logs, plugins.

Необходимо файл test_task.py переместить в каталог dags.

Теперь заходим в браузер и в URL-строке прописываем:

localhost:8080

Тем самым переходим на стартовую страницу.

AirFlow

Логин: airflow
Пароль: airflow

После входа нас перебрасывает на главную страницу, где мы увидим DAG, который мы скинули в каталог(если его там нет, подождите минут 5 и обновите страницу, в крайнем случае рестартаните Docker Compose)

Перед тем, как запустить DAG, необходимо прописать коннект к БД PostgreSQL, которая присутствует в Docker Compose файле.

Для это в верхней строке меню наводим курсор на Admin и выбираетм пункт Connections. Нажимаем синий плюсик в открывшимся окне подключения, прописываем следующие параметры:

ПараметрЗначение
Connection Idpsql_connection
Connection TypePostgres
Hosthost.docker.internal
Databaseuser
Loginuser
Passworduser
Port5431
Extraмогут быть кавычки, необходимо оставить пустое поле, иначе будет ошибка подключения.

AirFlow

Нажимаем Save

Возвращаемся на главную страницу и заходим в наш DAG, жамкув по его имени.

Далее запускаем DAG, нажав на Trigger DAG(треугольник) в правом верхнем углу страницы.

Всё поздавляю, вы запустили свой первый DAG, теперь разберёмся с кодом, написанным в DAG'е.

Поясняю за код

Для начала необходимо инициализировать DAG с помощью класса DAG, для этого импортируем класс из библиотеки airflow.

from airflow import DAG

Инициализация DAG происходит следующим образом:

dag = DAG(
    dag_id="load_file_to_psql",
    default_args=default_args
)

Чтобы не захламлять сам класс DAG, аргументы можно передать через параметр default_args и объявить словарь с параметрами отдельно.

default_args = {
  'owner': '@Shust_DE',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['https://t.me/Shust_DE'],
  'schedule_interval': "@hourly",
}

Далее берём первый пунт задания. Для генерации файла я использую библиотеку pandas. Следующий код генерирует данные и сохраняет их в файл.

import pandas as pd
 
path_filename = '/tmp/data.csv'
table = [(i, md5(int(i).to_bytes(8, 'big', signed=True)).hexdigest()) for i in range(1, 100)]
table = pd.DataFrame(table, columns=['id', 'md5_id'])
table.to_csv(path_filename, index=False)

Помещаем данный фрагмент в функцию _generate_file. Но не забываем, что путь к файлу нам необходимо передать через XCom. Поэтому необходимо в функцию передать все аргументы DAG'а, т.е. переменную kwargs. Из данного словаря нам нужен ключ ti (или task_instans, это одно и тоже, ссылка на один и тот же объект). Поэтому приравниваем его к переменной ti, чтобы не запутаться.

ti = kwargs['ti']

Далее необходимо у объекта вызвать метод xcom_push и передать ключ и значение XCom переменной.

ti.xcom_push(key='path_file', value=path_filename)

Объединяем и получаем полноценную функцию _generate_file:

def _generate_file(**kwargs):
  ti = kwargs['ti']
  path_filename = '/tmp/data.csv'
  table = [(i, md5(int(i).to_bytes(8, 'big', signed=True)).hexdigest()) for i in range(1, 100)]
  table = pd.DataFrame(table, columns=['id', 'md5_id'])
  table.to_csv(path_filename, index=False)
  ti.xcom_push(key='path_file', value=path_filename)

В PythonOperator нам просто нужно передать данную функцию следующим образом:

generate_file = PythonOperator(
    task_id='generate_file',
    python_callable=_generate_file,
    dag=dag,
)

За второй пункт отвечает следующая конструкция:

move_data_file = BashOperator(
  task_id="move_data_file",
  bash_command=("mkdir -p /tmp/processed_data/ && "
          "mv {{ ti.xcom_pull(task_ids='generate_file', key='path_file') }} /tmp/processed_data/"),
  dag=dag,
)

Здесь применяется команда создания каталога и проверки его на существование

mkdir -p /tmp/processed_data/

И, соотвественно, перемещение самого файла

mv {{ ti.xcom_pull(task_ids='generate_file', key='path_file') }} /tmp/processed_data/

Как можно заметить, так как нам нужно достать путь к файлу из XCom в Bash-конструкцию, мы воспользовались Jinja-шаблонизатором для языка Python.

Для пункта 3 изначально создается таблица с помощью оператора PostgresOperator в конструкции:

create_table_psql = PostgresOperator(
	task_id='create_table',
    postgres_conn_id='psql_connection',
    sql=""" DROP TABLE IF EXISTS table_name;
            CREATE TABLE table_name ( id int,
                                      md5_id text);
        """
)

Думаю, тут и так все понятно и комментарии излишни. А если ты не знаешь, что делает SQL код, то какого лешего ты тут забыл, иди учи SQL!

Для загрузки данных используется следующий код:

df = pd.read_csv("/tmp/processed_data/data.csv")
engine = create_engine('postgresql://user:user@host.docker.internal:5431/user')
df.to_sql('table_name', engine, if_exists='append', schema='public', index=False)

Но так как у нас есть дополнительное условие, а именно в XCom переменную положить количество строк в файл, то опять обращаемся к ti.

ti = kwargs['ti']
ti.xcom_push(key='count_string', value=len(df))

Объединяем, и получается следующая функция _data_in_postgres:

def _data_in_postgres(**kwargs):
  ti = kwargs['ti']
  df = pd.read_csv("/tmp/processed_data/data.csv")
  engine = create_engine('postgresql://user:user@host.docker.internal:5431/user')
  df.to_sql('table_name', engine, if_exists='append', schema='public', index=False)
  ti.xcom_push(key='count_string', value=len(df))

Теперь необходимо вывести просто количество строк, делаем все тоже самое, как и при переносе файла. За это отвечается конструкция:

print_count_string_in_df = BashOperator(
  task_id="print_count_string_in_df",
  bash_command=('''echo "В таблице {{ ti.xcom_pull(task_ids='data_in_postgres', key='count_string') }} строк" '''),
  dag=dag,
)

Ну и наконец, нужно построить граф, для этого последовательно выстраиваем операторы:

(
    generate_file >> 
    move_data_file >> 
    create_table_psql >> 
    data_in_postgres >> 
    print_count_string_in_df
)

Чтобы посмотреть результат последнего пункта, во вкладке graph выдели последнюю зелёную таску (print_count_string_in_df), и появятся дополнительные вкладки. Перейдите во вкладку Logs, и вы увидите запись, представленную на скриншоте.

AirFlow