Урок 9. Аналитическая мощь ClickHouse как финальная точка DAG AirFlow
Если Postgres - это надежный банковский сейф, где каждая транзакция на вес золота, то ClickHouse - это промышленная мясорубка. Ему все равно, уникальны ли ваши записи (по умолчанию), он не поддерживает классические транзакции, но зато он умеет делать SELECT count(*) FROM hits по миллиарду строк за доли секунды. Для инженера Airflow работа с ClickHouse кардинально отличается от работы с обычными реляционными базами. Главное правило ClickHouse: Никогда не вставляйте данные по одной строке. Если вы напишете цикл в Python, который делает INSERT INTO table VALUES (...) миллион раз, вы положите кластер. ClickHouse любит, когда в него вставляют данные большими кусками (батчами) по 10–100 тысяч строк за раз. И Airflow должен уметь это организовать.
Шаг 1. Добавляем ClickHouse в инфраструктуру ETL pipeline Airflow
Расширим наш docker-compose.yaml. ClickHouse очень экономен к ресурсам, поэтому для тестов нам хватит минимальной конфигурации. Добавьте этот сервис: clickhouse: image: clickhouse/clickhouse-server:latest ports: - "8123:8123" # HTTP порт (для веб-клиентов и некоторых драйверов) - "9000:9000" # Нативный TCP порт (самый быстрый, для Python-драйвера) ulimits: nofile: soft: 262144 hard: 262144 healthcheck: test: interval: 30s timeout: 10s retries: 3 Не забудьте docker-compose up -d. Проверить работу можно, открыв localhost:8123 в браузере (должен ответить "Ok"). Также нам понадобится провайдер для Airflow. Добавьте в ваш Dockerfile: RUN pip install apache-airflow-providers-clickhouse clickhouse-driver И пересоберите образ.
Шаг 2. Настройка Connection: HTTP vs Native
В Airflow есть путаница с типами подключений к ClickHouse. HTTP (порт 8123): Проще, работает через requests. Надежно, но чуть медленнее на огромных объемах. Native (порт 9000): Работает через бинарный TCP-протокол. Это выбор чемпионов. Библиотека clickhouse-driver использует именно его. Настроим соединение clickhouse_native. Admin -> Connections -> Add Conn Id my_clickhouse Conn Type ClickHouse (если провайдер установлен корректно) Host clickhouse (имя сервиса Docker) Login/Password default / (пусто), если не меняли настройки Port 9000 (для нативного протокола)
Шаг 3. Практика: Загрузка данных из S3 в ClickHouse
У нас есть два пути загрузки данных, и выбор зависит от объема. "Ленивый" (через движок S3) ClickHouse настолько крут, что умеет сам ходить в S3 и забирать данные, вообще не нагружая Airflow. Airflow просто посылает команду: "Эй, ClickHouse, вот бакет, забери файлы". Это лучший способ для больших данных (ГБ и ТБ). "Классический ETL" (Python Driver) Airflow читает файл, преобразует его (например, меняет формат дат) и вставляет в ClickHouse. Этот способ мы разберем подробно, так как он учит работать с батчами и хуками. Напишем DAG, который берет CSV из S3 (результат прошлых статей) и вставляет его в таблицу user_stats. Подготовка таблицы (DDL) Сначала создадим таблицу. Обратите внимание на движок MergeTree - это стандарт для аналитики. CREATE TABLE IF NOT EXISTS user_stats ( date Date, name String, count UInt32 ) ENGINE = MergeTree() ORDER BY date; Код Airflow DAG: s3_to_clickhouse.py from airflow import DAG from airflow.operators.python import PythonOperator from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook # Или стандартный from airflow.providers.amazon.aws.hooks.s3 import S3Hook from datetime import datetime import io import csv def load_data_to_clickhouse(**context): # 1. Читаем данные из S3 s3_hook = S3Hook(aws_conn_id="minio_s3") bucket = "airflow-bucket" key = "users_export_2023-01-01.csv" # В реальности используйте шаблоны {{ ds }} # Скачиваем файл в память (для больших файлов лучше стримить или качать на диск!) obj = s3_hook.get_key(key, bucket) file_content = obj.get().read().decode('utf-8') # Парсим CSV в список кортежей # ClickHouse драйвер ждет список: data = reader = csv.DictReader(io.StringIO(file_content)) for row in reader: data.append(( row, row, int(row.get('count', 1)) # Защита от пустых значений )) print(f"Подготовлено {len(data)} строк для вставки.") # 2. Вставляем в ClickHouse # Используем execute с параметром params для bulk-вставки ch_hook = ClickHouseHook(clickhouse_conn_id="my_clickhouse") sql = "INSERT INTO user_stats (date, name, count) VALUES" # Магия clickhouse-driver: мы передаем список данных вторым аргументом. # Драйвер сам разобьет это на блоки и отправит бинарным потоком. # Это В РАЗЫ быстрее, чем циклы INSERT. ch_hook.execute(sql, data) print("Вставка завершена.") with DAG( dag_id="s3_to_clickhouse_loader", start_date=datetime(2023, 1, 1), schedule=None, catchup=False ) as dag: # 0. Создаем таблицу (лучше вынести в отдельный скрипт миграций, но для теста сойдет) create_table = PythonOperator( task_id="init_table", python_callable=lambda: ClickHouseHook(clickhouse_conn_id="my_clickhouse").execute( "CREATE TABLE IF NOT EXISTS user_stats (date Date, name String, count UInt32) ENGINE = MergeTree() ORDER BY date" ) ) # 1. Грузим данные load_task = PythonOperator( task_id="load_from_s3", python_callable=load_data_to_clickhouse ) create_table >> load_task
Тонкости и подводные камни
Работа с ClickHouse в Airflow полна нюансов, о которых не пишут в Quickstart-гайдах. Проблема идемпотентности (Дубликаты) ClickHouse не проверяет уникальность (Primary Key) при вставке в обычный MergeTree. Если вы запустите DAG два раза, у вас будет двойной объем данных. Решение для новичков: Перед вставкой делать ALTER TABLE ... DELETE WHERE date = '{{ ds }}'. Но в ClickHouse операции удаления (Mutation) - тяжелые и асинхронные. Решение для профи: Использовать движок ReplacingMergeTree (он схлопывает дубликаты в фоне) или вставлять данные во временную таблицу, а потом делать EXCHANGE PARTITION (атомарная замена куска данных). Типизация Postgres простит вам, если вы передадите число как строку "123". ClickHouse при вставке через нативный протокол строг. Если колонка UInt32, а вы суете str, драйвер упадет. Всегда явно приводите типы в Python (как мы сделали int(row)). Таймауты ClickHouse быстрый, но если вы попытаетесь вставить 10 ГБ одним запросом, соединение может разорваться. Совет: Разбивайте данные на чанки (chunks) по 10–50 тысяч строк внутри Python-кода и делайте ch_hook.execute в цикле. Альтернатива: ClickHouseOperator В провайдере есть готовый ClickHouseOperator. Он удобен для простых SQL-команд (оптимизация, удаление, создание таблиц). from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator optimize_table = ClickHouseOperator( task_id="optimize_user_stats", clickhouse_conn_id="my_clickhouse", sql="OPTIMIZE TABLE user_stats FINAL" ) Используйте его для сервисных задач, а загрузку данных делайте через Python/Hooks, так как вам нужен контроль над форматом данных. Исправьте финальные варианты кода Dags и конфигурационных файлов и при необходимости сравните с нашими на GitHub где лежит код к Уроку 9.
Помощь Cursor: Генерация SQL и кода вставки
ClickHouse SQL (диалект) местами специфичен. Cursor поможет не лезть в документацию за синтаксисом движков. Промпт 1 (DDL): "Напиши SQL для создания таблицы ClickHouse events, которая хранит логи веб-сайта (timestamp, user_id, url). Используй движок MergeTree, партиционирование по месяцам и TTL (время жизни), чтобы удалять данные старше года." Промпт 2 (Оптимизация вставки): "Посмотри на этот Python-код вставки в ClickHouse. Перепиши его так, чтобы использовать генератор (generator) и вставлять данные батчами по 20 000 строк, чтобы не перегружать оперативную память." Итог: Мы построили полный цикл: Данные -> Postgres -> S3 -> Обработка -> ClickHouse. Теперь в нашей базе лежат "золотые" данные, готовые к построению графиков в Grafana или Superset. Но есть одна проблема, с которой вы столкнетесь, когда дагов станет 50 штук. Как не писать один и тот же код 50 раз? Как создавать DAG-и динамически, на основе конфигурационных файлов, а не копипасты? В финальной статье мы поговорим про Best Practices, динамическую генерацию DAG-ов и организацию "чистого кода" в Airflow. Готовы к рефакторингу и высшему пилотажу?
Использованные референсы и материалы
ClickHouse Python Driver Documentation https://clickhouse-driver.readthedocs.io/en/latest/ Как работать с нативным TCP-протоколом ClickHouse из Python. Optimizing Bulk Inserts https://clickhouse.com/docs/en/optimize/bulk-inserts/ Почему в ClickHouse нельзя вставлять данные построчно, и зачем нам нужны батчи. MergeTree Table Engine https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/ Как устроен основной движок таблиц и почему важен ключ сортировки. Полный перечень статей Бесплатного курса "Apache Airflow для начинающих" Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors) Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг













