Documentation
⚡ ClickHouse
Шпаргалка по ClickHouse

ClickHouse — Конспект

Составлен на основе лекции в ШАД`е, лекции на BC#5, курсового ноутбука и roadmapde.roadmappers.ru


Оглавление

  1. Что такое ClickHouse и зачем он нужен
  2. Архитектура: колоночное хранение
  3. OLAP vs OLTP: когда что использовать
  4. Кластеры, шарды и реплики
  5. Типы данных
  6. Создание таблиц: DDL и атрибуты колонок
  7. Партиционирование
  8. Индексирование: разреженный индекс и вторичные индексы
  9. Семейство движков MergeTree
  10. Вспомогательные движки хранения
  11. Интеграционные движки
  12. Distributed и Replicated: масштабирование
  13. Агрегатные функции и комбинаторы
  14. JOIN в ClickHouse
  15. Практические советы и паттерны

1. Что такое ClickHouse и зачем он нужен

ClickHouse — колоночная аналитическая база данных (OLAP) с открытым исходным кодом, разработанная Яндексом изначально для хранения данных Яндекс.Метрики. Сейчас развивается как самостоятельный open-source проект.

Ключевые характеристики:

  • Предназначен для больших объёмов данных (петабайты и выше)
  • Очень быстрое считывание и агрегация по колонкам
  • Быстрая пакетная вставка данных
  • MPP-архитектура (Massively Parallel Processing) — параллельная обработка на нескольких серверах
  • Не транзакционная БД — не предназначена для частых UPDATE/DELETE

Для чего НЕ подходит:

  • Частые UPDATE и DELETE (называются «мутациями» в ClickHouse и работают медленно)
  • OLTP-нагрузки (короткие транзакции, работа с отдельными строками)
Правило: ClickHouse = много данных + быстрое чтение + пакетная запись

2. Архитектура: колоночное хранение

Строковое хранение (Postgres / OLTP)

В строковых БД данные записываются построчно — каждая строка таблицы хранится как единый непрерывный блок данных. Это выгодно для:

  • быстрых UPDATE/DELETE (все данные строки рядом)
  • поиска конкретной записи по первичному ключу

Но медленно при агрегациях по большим таблицам — чтобы просуммировать одну колонку из 100, нужно прочитать все 100 колонок каждой строки.

Колоночное хранение (ClickHouse / OLAP)

Каждая колонка хранится в отдельном файле. Агрегация по одной колонке — это чтение одного файла, а не всей таблицы. Также даёт отличную степень сжатия: однотипные данные (например, массив целых чисел) сжимаются намного лучше, чем смешанные типы в строке.

Таблица: id | name | balance
В файловой системе:
  - id.bin    (только id)
  - name.bin  (только name)
  - balance.bin (только balance)

Преимущества:

  • Быстрая агрегация (COUNT, SUM, AVG, GROUP BY)
  • Эффективное сжатие данных
  • Векторизованное выполнение запросов

Недостатки:

  • Медленные UPDATE/DELETE (нужно обновить данные в каждом файле-колонке)
  • Нет полноценной поддержки ACID-транзакций

3. OLAP vs OLTP: когда что использовать

ХарактеристикаOLTP (Postgres)OLAP (ClickHouse)
Тип храненияСтроковоеКолоночное
Объём данныхГигабайтыТерабайты и петабайты
Основная операцияINSERT/UPDATE/DELETESELECT + агрегации
UPDATE/DELETEБыстроМедленно (мутации)
АгрегацииМедленноОчень быстро
СжатиеСлабоеОтличное
ПримерыPostgres, MySQLClickHouse, BigQuery

Мутации в ClickHouse — это как называются ALTER TABLE ... DELETE/UPDATE. Они работают путём пересборки всей таблицы (создаётся новая копия без удалённых строк), что крайне медленно. Избегайте их в production.


4. Кластеры, шарды и реплики

Однонодовый vs кластерный ClickHouse

Одиночный ClickHouse прост в работе, но кластер даёт два ключевых преимущества:

  • Шардирование — горизонтальное масштабирование: данные распределяются по нескольким серверам, запросы выполняются параллельно
  • Репликация — отказоустойчивость: копии данных лежат на нескольких серверах

Архитектура кластера

Пользователь

Прокси / Load Balancer (порт 8123)
    ↓             ↓
ClickHouse1    ClickHouse2    (порт 9000 — нативный)
  [Shard 1]    [Shard 2]
  [Replica]    [Replica]

Важно: При подключении к кластеру вы попадаете на конкретный сервер (в зависимости от нагрузки). Таблица, созданная только на одном сервере, не видна с других серверов.

Системная таблица кластера

-- Посмотреть состав кластера
SELECT * FROM system.clusters;

ON CLUSTER — создание объектов на всём кластере

Ключевое слово ON CLUSTER {cluster} говорит ClickHouse отправить DDL-команду на все серверы кластера.

-- Создать базу данных на всём кластере
CREATE DATABASE IF NOT EXISTS mydb ON CLUSTER '{cluster}';
 
-- Создать таблицу на всём кластере
CREATE TABLE IF NOT EXISTS mydb.events_local ON CLUSTER '{cluster}'
(
    id     UInt64,
    dt     DateTime,
    event  String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(dt)
ORDER BY (id, dt);

Ограничения ON CLUSTER:

  • Работает только с DDL (CREATE, DROP, ALTER)
  • На INSERT, UPDATE — не работает
  • {cluster} — встроенный макрос, подставляет имя текущего кластера

Паттерн именования таблиц

Стандартная практика в production:

events_local  — локальная таблица (MergeTree), хранит данные
events        — distributed-таблица (ссылка на events_local)

Аналитики и приложения работают только с events, никогда с events_local.


5. Типы данных

Числовые типы

-- Целые числа (со знаком): Int8, Int16, Int32, Int64, Int128, Int256
i  Int8    -- -128 до 127
i  Int32   -- -2 млрд до 2 млрд
 
-- Беззнаковые: UInt8, UInt16, UInt32, UInt64, UInt128, UInt256
ui UInt8   -- 0 до 255
ui UInt64  -- 0 до 18 квинтиллионов
 
-- Вещественные (для математики, НЕ для финансов!)
fl Float32
fl Float64
 
-- Точная дробная арифметика (для финансов)
dc Decimal(9, 4)    -- 9 цифр всего, 4 после запятой
dc Decimal32(4)     -- алиас
dc Decimal64(4)

Строки и фиксированные строки

st  String          -- переменная длина, UTF-8
fst FixedString(5)  -- ровно N байт, быстрее для коротких фиксированных строк

Специальные типы

UID UUID            -- генерация: generateUUIDv4()
ip4 IPv4            -- '192.168.1.1'
ip6 IPv6            -- '2001:db8::1'

Дата и время

dt  Date            -- только дата: '2025-04-30'
dtm DateTime        -- дата + время (с секундами)
dtm64 DateTime64(3) -- дата + время с миллисекундами (параметр — точность)
 
-- Полезные функции
toDate('2025-01-01')
toDateTime('2025-01-01 12:00:00')
now()               -- текущее время
today()             -- сегодняшняя дата
toYYYYMM(dt)        -- числовое представление год+месяц: 202501

Составные типы

-- Массив
ar Array(UInt32)
-- Обращение: ar[1], ar.size0 (размер)
 
-- Кортеж (Tuple)
tu Tuple(UInt32, String)
 
-- Вложенная структура
ns Nested(col1 String, col2 UInt32, col3 Float32)
-- Обращение: ns.col1, ns.col2
 
-- Map
mp Map(String, UInt32)
-- Обращение: mp['key1']
 
-- Enum (экономит место и обеспечивает валидацию)
en Enum('оплачено' = 1, 'не оплачено' = 0)

Модификаторы типов

-- Nullable — разрешает NULL-значения (добавляет накладные расходы)
a Nullable(UInt32)
 
-- LowCardinality — словарное кодирование для низкой кардинальности (статусы, коды, категории)
b LowCardinality(String)  -- быстрее и компактнее для повторяющихся значений

Приведение типов

-- Синтаксис ::
SELECT '42'::Int64
 
-- CAST
SELECT CAST('42' AS Int8)
 
-- Функции с обработкой ошибок
toInt64OrNull('abc')      -- вернёт NULL при ошибке
toInt8OrZero('abc')       -- вернёт 0 при ошибке
toInt8OrDefault('abc', -1) -- вернёт -1 при ошибке
 
-- Беззнаковые: выход за пределы → значение по модулю диапазона
toUInt8(-1)  -- вернёт 255
toUInt8(256) -- вернёт 0

6. Создание таблиц: DDL и атрибуты колонок

Базовый синтаксис CREATE TABLE

CREATE TABLE IF NOT EXISTS mydb.my_table ON CLUSTER '{cluster}'
(
    id   UInt64,
    name String,
    dt   DateTime
)
ENGINE = MergeTree              -- ОБЯЗАТЕЛЬНО указывать движок
PRIMARY KEY (id)                -- необязательно; по умолчанию = ORDER BY
ORDER BY (id, dt)               -- ОБЯЗАТЕЛЬНО для MergeTree; определяет физический порядок данных
PARTITION BY toYYYYMM(dt)       -- необязательно; разбивает данные на секции
TTL dt + INTERVAL 1 MONTH DELETE; -- необязательно; автоудаление устаревших данных

Ключевые правила:

  • ENGINE — обязательный параметр, без него будет ошибка
  • ORDER BY — обязателен для MergeTree; определяет физический порядок строк на диске
  • PRIMARY KEY должен быть префиксом ORDER BY
  • Рекомендуется не более 4 полей в ORDER BY — иначе падает производительность

Атрибуты колонок

CREATE TABLE mydb.test_fields
(
    col_default    UInt64  DEFAULT 42,                   -- значение по умолчанию при вставке
    col_materialized UInt64 MATERIALIZED col_default*33, -- вычисляется автоматически, нельзя вставить явно
    col_alias      UInt64  ALIAS col_default + 1,        -- не хранится физически; вычисляется при чтении
    col_codec      String  CODEC(ZSTD(10)),              -- алгоритм сжатия (ZSTD, LZ4, Delta, Gorilla...)
    col_comment    Date    COMMENT 'Дата создания',      -- комментарий к колонке
    col_ttl        UInt64  DEFAULT 10 TTL col_comment + INTERVAL 5 DAY -- TTL для отдельной колонки
)
ENGINE = MergeTree
ORDER BY (col_default);

Важно про MATERIALIZED и ALIAS:

  • SELECT *не включает MATERIALIZED и ALIAS колонки
  • Обращаться к ним можно только явно по имени
  • В MATERIALIZED и ALIAS нельзя вставлять данные
-- Ошибка!
INSERT INTO product_sales (sale_id, total) VALUES (1, 100);
-- Cannot insert column total, because it is MATERIALIZED column.

Кодеки сжатия:

КодекПрименение
LZ4По умолчанию; быстрый, умеренное сжатие
ZSTD(level)Лучшее сжатие; уровень 1–22
DeltaДля монотонно растущих числовых рядов (временные метки)
GorillaДля метрик с плавающей точкой
NONEБез сжатия

TTL (Time-to-Live)

-- TTL для всей строки (на уровне таблицы)
ENGINE = MergeTree
ORDER BY id
TTL dt + INTERVAL 1 MONTH DELETE;
 
-- TTL для отдельной колонки (значение сбрасывается до DEFAULT)
col_ttl UInt64 DEFAULT 0 TTL dt + INTERVAL 10 DAY
 
-- Принудительное применение TTL (без ожидания фонового merge)
OPTIMIZE TABLE my_table FINAL;

ORDER BY tuple() — без сортировки

-- Данные вставляются в порядке insert (рандомный порядок)
ORDER BY tuple()

Описание таблицы

DESCRIBE TABLE mydb.my_table;  -- показывает колонки, типы и комментарии

7. Партиционирование

Партиция — физическое разделение данных на диске по некоторому критерию. Позволяет ClickHouse читать только нужные секции при наличии фильтра по полю партиции.

4 типа партиционирования

Диапазоном (Range)

CREATE TABLE mydb.table_range (...)
ENGINE = MergeTree
PARTITION BY
    CASE
        WHEN id < 10000 THEN 'range_1'
        WHEN id < 20000 THEN 'range_2'
        ELSE 'range_3'
    END
ORDER BY id;

Интервалом (наиболее распространённый)

CREATE TABLE mydb.table_interval (...)
ENGINE = MergeTree
PARTITION BY toYYYYMM(sale_date)  -- партиция на каждый месяц
ORDER BY id;

Хеш-функцией

CREATE TABLE mydb.table_hash (...)
ENGINE = MergeTree
PARTITION BY cityHash64(user_id) % 10  -- 10 равномерных партиций
ORDER BY user_id;

Комбинированное

CREATE TABLE mydb.table_composite (...)
ENGINE = MergeTree
PARTITION BY (toYYYYMM(order_date), customer_id % 10)
ORDER BY order_id;

Просмотр партиций

-- Через виртуальную колонку _part
SELECT _part, count(*) FROM mydb.table_interval GROUP BY _part;
 
-- Через системную таблицу
SELECT * FROM system.parts WHERE table = 'my_table';

Принудительное слияние партиций

OPTIMIZE TABLE mydb.my_table FINAL;
-- FINAL — дожидается полного слияния (без него MergeTree делает одно слияние шага)

8. Индексирование: разреженный индекс и вторичные индексы

Внутренняя структура парта (что лежит на диске)

Каждая вставка в MergeTree создаёт новый парт (part) — отдельную директорию на диске. Например: 20250101_1_1_0. В этой директории лежат файлы:

20250101_1_1_0/
├── primary.idx       — разреженный первичный индекс (подгружается в RAM)
├── id.bin            — сжатые данные колонки id
├── id.mrk2           — marks-файл колонки id
├── name.bin          — сжатые данные колонки name
├── name.mrk2         — marks-файл колонки name
├── dt.bin            — сжатые данные колонки dt
├── dt.mrk2           — marks-файл колонки dt
└── checksums.txt     — контрольные суммы

Marks-файлы (*.mrk) — связывают индекс с данными. Для каждой гранулы marks-файл хранит byte-offset в .bin-файле и номер строки. Именно через marks ClickHouse знает, с какого байта начинать чтение нужной гранулы.

Процесс чтения данных по запросу:

  1. primary.idx (в RAM) → находим диапазон гранул по условию WHERE
  2. *.mrk → узнаём byte-offset нужных гранул в сжатом .bin-файле
  3. *.bin → читаем с диска только нужные гранулы (остальные колонки пропускаются целиком)

LSM-tree (Lazy Merge): ClickHouse не перезаписывает данные при каждой вставке — новые данные создают новый парт. Со временем мелкие парты в фоне сливаются в крупные. Счётчик слияний виден в имени парта: _1_1_0 → после слияния → _1_2_1 (последняя цифра = число слияний).

-- Посмотреть все активные парты таблицы
SELECT name, rows, bytes_on_disk, active
FROM system.parts
WHERE table = 'my_table' AND active = 1
ORDER BY name;

Оптимальные размеры блоков чтения:

  • HDD: ~1 МБ за чтение
  • SSD: ~512 КБ
  • RAM: сотни байт

Колоночное хранение выгодно именно здесь: вместо чтения 100 колонок всей строки читается ровно один .bin-файл нужной колонки.


Разреженный (sparse) первичный индекс

В отличие от B-tree индексов в обычных БД (один индекс на каждую строку), ClickHouse использует разреженный индекс: индексная метка создаётся один раз на гранулу данных.

Гранула (granule) — логический блок строк, по умолчанию 8 192 строки. Это минимальная единица чтения данных.

Данные:            [строки 1-8192] [строки 8193-16384] [строки 16385-24576] ...
Индекс:               ↑ метка 0        ↑ метка 1             ↑ метка 2

Как работает поиск:

  1. ClickHouse смотрит в индекс (он всегда в RAM) и находит диапазон гранул, где могут быть нужные данные
  2. Читает только эти гранулы с диска
  3. Применяет фильтр внутри гранул

Критически важно: индексный файл всегда загружается в RAM при старте. Если RAM недостаточно — кластер не запустится.

Настройка размера гранулы

CREATE TABLE ... ENGINE = MergeTree
SETTINGS index_granularity = 8192;  -- значение по умолчанию

Меньше гранула → точнее поиск, но больше файл индекса. Стандартное значение 8192 подходит для большинства случаев.

Эффективное использование первичного индекса

ORDER BY определяет ключ сортировки = ключ первичного индекса. Для эффективной фильтрации:

  • Поля в WHERE должны совпадать с началом ORDER BY
  • Порядок полей в ORDER BY важен: первые поля дают лучшую фильтрацию
ORDER BY (region, user_id, dt)
 
-- Эффективный запрос (использует индекс):
WHERE region = 'Moscow'
 
-- Менее эффективный (частично использует индекс):
WHERE user_id = 12345
 
-- НЕ использует первичный индекс (нет первого поля):
WHERE dt > '2024-01-01'

Вторичные индексы пропуска данных (Data Skipping Indexes)

Первичный индекс покрывает только поля из ORDER BY. Для фильтрации по другим колонкам используются индексы пропуска данных (secondary / skipping indexes). Они хранят метаданные о гранулах и позволяют пропускать блоки, в которых точно нет нужных данных.

Синтаксис:

-- Добавить индекс к существующей таблице
ALTER TABLE mydb.events
    ADD INDEX idx_event_type (event_type) TYPE set(100) GRANULARITY 4;
 
-- Применить индекс к уже существующим данным (без этого индекс работает только для новых данных)
ALTER TABLE mydb.events MATERIALIZE INDEX idx_event_type;
 
-- Удалить индекс
ALTER TABLE mydb.events DROP INDEX idx_event_type;
 
-- Объявить индекс при создании таблицы
CREATE TABLE mydb.events (
    id       UInt64,
    dt       DateTime,
    event    String,
    user_id  UInt64,
    amount   Float32,
    INDEX idx_event event TYPE set(100) GRANULARITY 4,
    INDEX idx_amount amount TYPE minmax GRANULARITY 2
)
ENGINE = MergeTree
ORDER BY (id, dt);

GRANULARITY N — индексная запись создаётся каждые N гранул (а не каждую). Чем меньше — тем точнее, но больше файл индекса.


Тип minmax

Хранит минимальное и максимальное значение колонки внутри каждого блока гранул. Эффективен для числовых и временны́х полей с диапазонными условиями.

INDEX idx_amount amount TYPE minmax GRANULARITY 4
 
-- Эффективно использует minmax:
WHERE amount > 1000
WHERE amount BETWEEN 100 AND 500
WHERE dt >= '2024-01-01'

Как работает: если max значения в блоке < 1000 — весь блок пропускается без чтения.


Тип set

Хранит уникальные значения колонки внутри блока гранул. Эффективен для полей с низкой кардинальностью при условии равенства.

INDEX idx_event event TYPE set(100) GRANULARITY 4
-- 100 — максимальное количество уникальных значений; 0 = без ограничений
 
-- Эффективно использует set:
WHERE event = 'click'
WHERE event IN ('click', 'view')

Как работает: если нужного значения нет в наборе блока — блок пропускается.


Типы на основе Bloom Filter

Probabilistic-структуры. Могут давать ложноположительные срабатывания (блок прочитается лишний раз), но никогда не пропустят нужные данные.

ТипПрименение
bloom_filterТочные совпадения по числовым или строковым полям
ngrambf_v1(n, size, hashes, seed)Поиск подстрок (LIKE, hasToken) в строках
tokenbf_v1(size, hashes, seed)Поиск по токенам (словам) в строках
-- bloom_filter: для равенства и IN
INDEX idx_user_id user_id TYPE bloom_filter(0.01) GRANULARITY 4
-- 0.01 — допустимая вероятность ложного срабатывания (1%)
 
WHERE user_id = 12345          -- использует индекс
WHERE user_id IN (1, 2, 3)    -- использует индекс
 
-- ngrambf_v1: для LIKE-запросов (n=3 — триграммы)
INDEX idx_name name TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4
 
WHERE name LIKE '%москва%'     -- использует индекс
WHERE hasToken(name, 'click')  -- использует индекс
 
-- tokenbf_v1: для полнотекстового поиска по словам
INDEX idx_desc description TYPE tokenbf_v1(256, 2, 0) GRANULARITY 4
 
WHERE hasToken(description, 'payment')  -- использует индекс

Сравнение вторичных индексов

ТипУсловияПоляПамять
minmax>, <, BETWEEN, диапазоныЧисла, датыОчень мало
set(N)=, INНизкая кардинальностьМало
bloom_filter=, INЛюбыеУмеренно
ngrambf_v1LIKE '%str%', подстрокиСтрокиУмеренно
tokenbf_v1hasToken, словаСтрокиУмеренно

Правило: сначала проверьте, помогает ли правильный ORDER BY. Вторичные индексы — дополнение, а не замена первичному.


9. Семейство движков MergeTree

MergeTree — основной движок ClickHouse. Все данные сначала пишутся небольшими кусками (parts), затем в фоне сливаются (merge). Это называется Log-Structured Merge Tree (LSM Tree).

MergeTree — базовый движок

CREATE TABLE mydb.events_local
(
    id    UInt64,
    dt    DateTime,
    event String,
    value Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(dt)
ORDER BY (id, dt);

Подходит для большинства случаев хранения данных.


SummingMergeTree — автосуммирование

Группирует записи с одинаковым ключом сортировки и суммирует указанные числовые поля при слиянии.

CREATE TABLE mydb.daily_stats
(
    user_id  UInt32,
    dt       DateTime,
    sessions UInt32,      -- будет суммироваться
    clicks   UInt64,      -- будет суммироваться
    extra    String       -- не суммируется (оставляется произвольное значение)
)
ENGINE = SummingMergeTree(sessions, clicks)  -- если не указать — все числовые
ORDER BY (user_id)
PARTITION BY toYYYYMM(dt);

Важно:

  • Слияние происходит только в рамках одной партиции
  • До слияния могут существовать несуммированные блоки — используйте FINAL:
-- Без FINAL — данные могут быть несуммированы
SELECT user_id, SUM(sessions) FROM mydb.daily_stats GROUP BY user_id;
 
-- FINAL — принудительное логическое слияние перед выдачей
SELECT user_id, sessions FROM mydb.daily_stats FINAL;
 
-- Ручное физическое слияние
OPTIMIZE TABLE mydb.daily_stats FINAL;

AggregatingMergeTree — агрегирование с промежуточными состояниями

Группирует записи по ключу сортировки и хранит промежуточные состояния агрегатных функций. Нужен для сложных агрегатов (uniq, quantile, groupArray).

Используется совместно со специальными типами данных:

SimpleAggregateFunction — для простых агрегатов (хранит конечное состояние):

CREATE TABLE mydb.simple_agg
(
    id      UInt64,
    val_sum SimpleAggregateFunction(sum, UInt64),
    val_max SimpleAggregateFunction(max, UInt32)
)
ENGINE = AggregatingMergeTree
ORDER BY id;
 
-- Вставка обычная
INSERT INTO mydb.simple_agg SELECT 1, number, number FROM numbers(10);
 
-- Чтение: нужно явно применить агрегат
SELECT id, sum(val_sum), max(val_max) FROM mydb.simple_agg GROUP BY id;
-- или через FINAL
SELECT id, val_sum, val_max FROM mydb.simple_agg FINAL;

AggregateFunction — для сложных агрегатов (хранит промежуточное состояние):

CREATE TABLE mydb.complex_agg
(
    id         UInt64,
    val_uniq   AggregateFunction(uniq, UInt64),
    val_quant  AggregateFunction(quantiles(0.5, 0.9), UInt64)
)
ENGINE = AggregatingMergeTree
ORDER BY id;
 
-- Вставка через State-комбинатор
INSERT INTO mydb.complex_agg
SELECT 1, uniqState(number), quantilesState(0.5, 0.9)(number)
FROM numbers(100);
 
-- Чтение через Merge-комбинатор
SELECT id, uniqMerge(val_uniq), quantilesMerge(0.5, 0.9)(val_quant)
FROM mydb.complex_agg
GROUP BY id;

Комбинаторы агрегатных функций

КомбинаторОписание
StateПри вставке: сохраняет промежуточное состояние
MergeПри чтении: объединяет состояния и даёт финальный результат
MergeStateОбъединяет состояния, возвращает промежуточное (для цепочки)
SimpleStateДля SimpleAggregateFunction
IfУсловная агрегация: sumIf(val, condition)
ArrayАгрегация по массиву
DistinctАгрегация по уникальным значениям

ReplacingMergeTree — дедупликация по ключу

Удаляет дублирующиеся строки с одинаковым ключом сортировки при слиянии. Опционально принимает версионную колонку (оставляется максимальная версия).

-- Без версии (оставляется любая из дублей)
CREATE TABLE mydb.users
(
    id  UInt32,
    dt  Date,
    val UInt32
)
ENGINE = ReplacingMergeTree
ORDER BY (id, dt);
 
-- С версией (оставляется запись с максимальным значением dt)
CREATE TABLE mydb.users_versioned
(
    id  UInt32,
    dt  Date,   -- версионная колонка (может быть числовой)
    val UInt32
)
ENGINE = ReplacingMergeTree(dt)
ORDER BY (id)
PARTITION BY toYYYYMM(dt);

Важно: дедупликация гарантирована только внутри одной партиции и только после слияния. До слияния дубли могут присутствовать. Используйте FINAL для получения актуальных данных.


CollapsingMergeTree — схлопывание по флагу

Удаляет дубли по ключу сортировки на основе флага Sign (+1 или -1). Используется для хранения изменений состояния (CDC-паттерн).

CREATE TABLE mydb.books_progress
(
    user_id UInt64,
    book_id UInt64,
    page    UInt8,
    Sign    Int8   -- только +1 или -1
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY (user_id, book_id);

Логика:

  • Sign = 1 — текущее состояние записи
  • Sign = -1 — «отмена» предыдущей записи
-- Пользователь прочитал 10 страниц
INSERT INTO mydb.books_progress VALUES (1, 42, 10, 1);
 
-- Пользователь прочитал ещё 5 страниц (сначала отменяем, потом вставляем новое)
INSERT INTO mydb.books_progress VALUES (1, 42, 10, -1);  -- отмена
INSERT INTO mydb.books_progress VALUES (1, 42, 15, 1);   -- новое состояние

После слияния строки с +1 и -1 для одного ключа взаимоуничтожаются.


Комбинирование Replicated + другой движок семейства MergeTree

Репликация поддерживается только для движков семейства MergeTree. Можно комбинировать:

  • ReplicatedMergeTree
  • ReplicatedReplacingMergeTree
  • ReplicatedSummingMergeTree
  • ReplicatedAggregatingMergeTree
  • ReplicatedCollapsingMergeTree

Синтаксис: параметры Replicated (ZK-путь и реплика) пишутся первыми, затем — параметры внутреннего движка через запятую:

ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{cluster}/{shard}/events',  -- ZK-путь
    '{replica}',                                    -- имя реплики
    dt                                              -- версионная колонка ReplacingMergeTree
)
ORDER BY (id);

DROP TABLE SYNC — обязательно для Replicated-таблиц

Для ReplicatedMergeTree-таблиц всегда используйте SYNC при удалении:

-- ПРАВИЛЬНО: очищает данные И метаданные ZooKeeper
DROP TABLE mydb.events_local ON CLUSTER '{cluster}' SYNC;
 
-- НЕПРАВИЛЬНО: данные удалятся, но ZooKeeper-путь останется
DROP TABLE mydb.events_local ON CLUSTER '{cluster}';

Без SYNC: ZooKeeper сохраняет метаданные удалённой таблицы, и вы не сможете создать таблицу с тем же ZooKeeper-путём снова — только вручную зачищать через DBA. Берите SYNC за привычку при любом DROP TABLE в ClickHouse.

FINAL vs OPTIMIZE TABLE — когда что применять

FINALOPTIMIZE TABLE FINAL
ТипЛогическое слияние (в RAM, только для текущего запроса)Физическое слияние (переписывает файлы на диске)
СкоростьБыстрееМедленнее; может блокировать и потреблять много RAM
Данные на дискеНе меняютсяИзменяются — парты реально сливаются
Когда применятьДнём во время работы аналитиковНочью после батчевой загрузки
-- Рекомендация для аналитиков (работа днём):
SELECT * FROM mydb.events FINAL;
 
-- Рекомендация для ETL (ночная загрузка, перед началом рабочего дня):
OPTIMIZE TABLE mydb.events FINAL;

FINAL работает для ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, CollapsingMergeTree. Для базового MergeTree — может вернуть ошибку или ничего не делать.

Нетранзакционность ClickHouse — практические последствия

ClickHouse не гарантирует атомарность INSERT. Если вставка прерывается на середине — часть строк уже записана:

-- Вставляем 500 млн строк, соединение обрывается
INSERT INTO mydb.events SELECT ... FROM numbers(500000000);
-- Результат: в таблице может оказаться, например, 337 млн строк

Как восстановиться при повторной загрузке:

  • Если таблица MergeTree — самый простой способ: пересоздать таблицу и загрузить заново
  • Если таблица ReplacingMergeTree — можно безопасно запустить вставку повторно (дубли удалятся через OPTIMIZE FINAL или FINAL)
-- Паттерн идемпотентной загрузки через ReplacingMergeTree:
INSERT INTO mydb.events ...;  -- запускаем снова при сбое
OPTIMIZE TABLE mydb.events FINAL;  -- убираем дубли

Хэш-колонка для дедупликации всей строки (вопрос с собеседований)

Задача: есть таблица из 100+ колонок, нужно дедуплицировать по полной строке (не по ID).

Неправильно: добавить все 100 колонок в ORDER BY — ClickHouse упадёт (ORDER BY хранится частично в RAM).

Правильно: добавить вспомогательную колонку с хэшом всей строки:

CREATE TABLE mydb.events (
    id       UInt64,
    -- ... 100 других колонок ...
    row_hash UInt64 MATERIALIZED cityHash64(id, col1, col2, ...)  -- хэш всей строки
)
ENGINE = ReplacingMergeTree
ORDER BY (id, row_hash);  -- row_hash обеспечивает уникальность всей строки

Или добавить хэш на этапе загрузки (в Spark, Python):

-- При вставке вычисляем хэш заранее и передаём как колонку
INSERT INTO mydb.events (id, ..., row_hash) VALUES (..., cityHash64(...));

10. Вспомогательные движки хранения

Log — простое хранение (для тестов)

ENGINE = Log

Каждая колонка — отдельный файл. Нет индекса, нет поддержки партиций. Подходит для небольших вспомогательных таблиц и тестирования.

File — хранение в файле определённого формата

ENGINE = File(CSV)  -- или TSV, JSON, Parquet и др.

Данные читаются/пишутся из файла на диске. Удобно для импорта/экспорта.

Memory — только в RAM

ENGINE = Memory

Данные хранятся исключительно в оперативной памяти. При перезапуске ClickHouse — данные теряются. Используется для временных таблиц и кэшей в запросах.

Buffer — буферизация вставок

Накапливает данные в RAM и периодически сбрасывает их в целевую таблицу. Снижает нагрузку от частых мелких вставок.

CREATE TABLE mydb.events_buffer (...)
ENGINE = Buffer(
    mydb,         -- база данных целевой таблицы
    events_local, -- целевая таблица
    16,           -- параллелизм (рекомендуется 16)
    30,           -- min время сброса (секунды)
    60,           -- max время сброса (секунды)
    5,            -- min строк для сброса
    100,          -- max строк для сброса
    10000,        -- min байт для сброса
    1000000       -- max байт для сброса
);

Сброс происходит при достижении любого из условий.

Set — оптимизация IN-запросов

Хранит уникальные значения в памяти. Таблица используется только в правой части IN.

CREATE TABLE mydb.blocked_users (id UInt32) ENGINE = Set SETTINGS persistent = 1;
INSERT INTO mydb.blocked_users SELECT number FROM numbers(100);
 
-- Использование: только в WHERE ... IN
SELECT * FROM mydb.orders WHERE user_id IN mydb.blocked_users;

GenerateRandom — генерация тестовых данных

CREATE TABLE mydb.test_data
(
    id  UInt32,
    val String,
    dt  Date,
    a   Float32,
    b   UUID,
    g   Array(UInt32)
)
ENGINE = GenerateRandom;
 
-- Каждый SELECT генерирует случайные данные
SELECT * FROM mydb.test_data LIMIT 100;

11. Интеграционные движки

Интеграционные движки не хранят данные в ClickHouse — они ссылаются на внешние источники.

PostgreSQL — подключение к Postgres

CREATE TABLE mydb.pg_orders (...)
ENGINE = PostgreSQL(
    'postgres_host:5432',  -- хост:порт
    'prod_db',             -- имя БД в Postgres
    'orders',              -- таблица в Postgres
    'user',                -- логин
    'password',            -- пароль
    'public'               -- схема (необязательно)
);
 
-- SELECT автоматически пушится в Postgres (pushdown)
SELECT * FROM mydb.pg_orders WHERE status = 'paid';

Простые фильтры выполняются на стороне Postgres (pushdown predicates), что снижает трафик.

Kafka — интеграция с Apache Kafka

-- 1. Таблица-приёмник (куда реально пишутся данные)
CREATE TABLE mydb.events_local (...) ENGINE = MergeTree ...;
 
-- 2. Таблица-источник Kafka
CREATE TABLE mydb.kafka_events (id UInt32, event String)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list  = 'mydb.events.topic',
    kafka_group_name  = 'clickhouse_consumer_group',
    kafka_format      = 'JSONEachRow';
 
-- 3. Материализованное представление — связывает Kafka с реальной таблицей
CREATE MATERIALIZED VIEW mydb.kafka_mv TO mydb.events_local AS
SELECT id, event FROM mydb.kafka_events;
 
-- Отправка данных в Kafka
INSERT INTO mydb.kafka_events SELECT number, 'click' FROM numbers(30);

Паттерн получения данных из Kafka: Kafka Engine Table → Materialized View → MergeTree Table

Без Materialized View данные из Kafka никуда не запишутся.

Полный паттерн: чтение JSON из Kafka (Production)

Реальные Kafka-топики часто содержат сырой JSON. Стандартный подход:

-- 1. Принимаем сырой JSON как одну строку
CREATE TABLE mydb.kafka_stage (json_kafka String)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list  = 'source.public.order_events',
    kafka_group_name  = 'clickhouse_consumer_group',
    kafka_format      = 'JSONAsString';   -- весь JSON-объект → одна строка
 
-- 2. Таблица-хранилище (промежуточная или финальная)
CREATE TABLE mydb.orders_stage (json_kafka String) ENGINE = Log;
 
-- 3. Materialized View: данные из Kafka → хранилище
CREATE MATERIALIZED VIEW mydb.kafka_mv TO mydb.orders_stage AS
    SELECT json_kafka FROM mydb.kafka_stage;
 
-- 4. Чтение и разбор JSON при SELECT
SELECT
    JSONExtractInt(JSONExtractString(json_kafka, 'before'), 'id')           AS before_id,
    JSONExtractString(JSONExtractString(json_kafka, 'before'), 'status')    AS before_status,
    JSONExtractInt(JSONExtractString(json_kafka, 'after'),  'id')           AS after_id,
    JSONExtractString(JSONExtractString(json_kafka, 'after'),  'status')    AS after_status
FROM mydb.orders_stage;

Функции для работы с JSON

-- JSONExtract<Type>(<json_string>, <key>)
JSONExtractInt(json, 'id')           -- Int
JSONExtractString(json, 'name')      -- String
JSONExtractFloat(json, 'amount')     -- Float
JSONExtractUInt(json, 'ts')          -- UInt
JSONExtractBool(json, 'active')      -- Bool
JSONExtractRaw(json, 'nested_obj')   -- сырой JSON-фрагмент (строка)
 
-- Вложенные поля: сначала вытащить объект, потом поле
JSONExtractString(JSONExtractRaw(json, 'address'), 'city')
 
-- simpleJSON* — более быстрая, но менее строгая альтернатива
simpleJSONExtractString(json, 'name')
simpleJSONExtractRaw(json, 'nested')

Отладка Kafka-таблицы

-- По умолчанию SELECT из Kafka-таблицы запрещён (данные потребляются)
-- Разрешить временно для отладки:
SET stream_like_engine_allow_direct_select = 1;
SELECT * FROM mydb.kafka_stage LIMIT 5;

12. Distributed и Replicated: масштабирование

Три типа таблиц в ClickHouse

Важная классификация, которую нужно держать в голове:

ТипПримерыНазначение
Хранение данныхMergeTree и вся его семьяФизически хранят данные в колоночных файлах
Ссылочные (виртуальные)DistributedНе хранят данных, ссылаются на хранящие таблицы
ИнтеграционныеPostgreSQL, Kafka, S3Ссылаются на внешние источники

Distributed — горизонтальное масштабирование (шардирование)

Distributed — это таблица-ссылка (виртуальная таблица). Она не хранит данные, но ссылается на локальные таблицы на всех шардах и обеспечивает распределённое выполнение запросов.

-- Локальная таблица (реальное хранение)
CREATE TABLE mydb.events_local ON CLUSTER '{cluster}'
(
    id    UInt64,
    dt    DateTime,
    event String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(dt)
ORDER BY (id, dt);
 
-- Distributed-таблица (ссылка)
CREATE TABLE mydb.events ON CLUSTER '{cluster}'
(
    id    UInt64,
    dt    DateTime,
    event String
)
ENGINE = Distributed(
    '{cluster}',      -- имя кластера
    mydb,             -- база данных локальной таблицы
    events_local,     -- имя локальной таблицы
    rand()            -- ключ шардирования (как распределять данные)
);

Ключ шардирования:

-- rand() — случайное равномерное распределение (для тестов, плохо для JOIN)
ENGINE = Distributed('{cluster}', db, table_local, rand())
 
-- По колонке — данные с одинаковым user_id попадут на один шард (хорошо для JOIN)
ENGINE = Distributed('{cluster}', db, table_local, user_id)
 
-- halfMD5 — хеш от нескольких колонок (рекомендуется на production)
ENGINE = Distributed('{cluster}', db, table_local, halfMD5(user_id, region))

halfMD5 — специальная хеш-функция ClickHouse с постоянно возрастающим значением. Быстрая, компактная (числовой тип). Хорошая практика для распределения по нескольким полям.

Как вставлять данные:

-- Через distributed-таблицу (данные автоматически распределяются по шардам)
INSERT INTO mydb.events VALUES (...);
 
-- НЕ делать: прямая вставка в локальную таблицу попадёт только на один сервер
INSERT INTO mydb.events_local VALUES (...);  -- данные будут только на этом сервере

Как читать данные:

-- Через distributed — агрегирует данные со всех шардов
SELECT count() FROM mydb.events;
 
-- hostname() — видно, с каких серверов пришли данные
SELECT hostname(), count() FROM mydb.events GROUP BY hostname();
 
-- _part — видно партиции
SELECT _part, count() FROM mydb.events GROUP BY _part;

Важно: результат distributed-запроса недетерминирован по порядку строк — данные приходят в том порядке, в котором шарды отвечают. Каждый шард выдаёт свои строки блоком, и эти блоки стыкуются в любом порядке. Для сортировки всегда явно добавляйте ORDER BY.

-- Порядок строк непредсказуем — данные с первого шарда, потом со второго (или наоборот)
SELECT * FROM mydb.events;
 
-- Правильно: если важен порядок — добавить ORDER BY
SELECT * FROM mydb.events ORDER BY dt DESC LIMIT 100;

ReplicatedMergeTree — репликация (отказоустойчивость)

ReplicatedMergeTree создаёт копии данных на нескольких серверах. Используется совместно с ZooKeeper/ClickHouse Keeper для координации.

CREATE TABLE mydb.events_local ON CLUSTER '{cluster}'
(
    id    UInt64,
    dt    DateTime,
    event LowCardinality(String)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{cluster}/{shard}/mydb/events',  -- путь в ZooKeeper
    '{replica}'                                           -- имя реплики
)
PARTITION BY toDate(dt)
ORDER BY (id);

Макросы в пути ZooKeeper:

МакросЗначение
{cluster}Имя кластера из конфига
{shard}Номер/имя шарда
{replica}Имя реплики (обычно имя хоста)

Путь /clickhouse/tables/{cluster}/{shard}/... — стандартная конвенция. Главное, чтобы путь был уникальным для каждого шарда и одинаковым для всех реплик одного шарда.

Данные, записанные на одну реплику, автоматически реплицируются на другие.

Создание Distributed поверх Replicated (стандартная production-схема):

-- Шаг 1: локальная реплицируемая таблица
CREATE TABLE mydb.events ON CLUSTER '{cluster}'
(
    time DateTime,
    uid  Int64,
    type LowCardinality(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/events', '{replica}')
PARTITION BY toDate(time)
ORDER BY (uid);
 
-- Шаг 2: distributed-таблица с таким же именем в другой БД (или с суффиксом _distr)
-- Удобный синтаксис: AS <table> копирует схему колонок
CREATE TABLE mydb.events_distr ON CLUSTER '{cluster}'
AS mydb.events                                           -- копирует структуру колонок
ENGINE = Distributed('{cluster}', mydb, events, uid);   -- шардирование по uid

AS <table> при создании — копирует только структуру колонок (без ENGINE, ORDER BY и т.д.). Это удобно, чтобы не дублировать длинное определение схемы.

Итоговая архитектура production-кластера

Пользователь/Аналитик

  events (Distributed)        ← единая точка входа
   /              \
events_local     events_local  ← физические данные на шардах
[Shard 1]        [Shard 2]
[Replica 1]      [Replica 1]
[Replica 2]      [Replica 2]

13. Агрегатные функции и комбинаторы

Основные агрегатные функции

SELECT
    count(),                    -- количество строк
    count(id),                  -- количество непустых значений
    sum(amount),                -- сумма
    avg(price),                 -- среднее
    min(dt), max(dt),           -- минимум/максимум
    uniq(user_id),              -- приближённое число уникальных (аналог COUNT DISTINCT)
    countDistinct(user_id),     -- точное число уникальных
    groupArray(event),          -- собрать все значения в массив
    quantiles(0.5, 0.9)(price)  -- квантили
FROM mydb.events;

Условные агрегаты (суффикс If)

SELECT
    countIf(user_id, device = 'mobile')         -- количество мобильных пользователей
    sumIf(amount, status = 'paid')               -- сумма только оплаченных
    avgIf(view_time, page = 'home')              -- среднее время только на главной
    uniqIf(user_id, device = 'web')              -- уникальные пользователи с веба
    groupArrayIf(page, device = 'web')           -- список страниц только с веба
    countDistinctIf(user_id, device = 'mobile')  -- уникальные мобильные
FROM mydb.page_views;

Пример полного запроса с комбинаторами

SELECT
    device,
    uniq(user_id) AS unique_users,
    avgIf(view_time, page = 'home') AS avg_home_time,
    groupArrayIf(page, user_id = 2 AND device = 'web') AS web_pages
FROM mydb.page_views
GROUP BY device;

Агрегатные окно-функции (combinators)

КомбинаторПрименениеПример
IfУсловная агрегацияsumIf(val, cond)
ArrayАгрегация по массивуsumArray(arr_col)
DistinctПо уникальным значениямsumDistinct(val)
StateСохранить промежуточное состояниеsumState(val)
MergeОбъединить промежуточные состоянияsumMerge(state_col)
OrNullNULL при ошибкеtoInt64OrNull(str)
OrZero0 при ошибкеtoInt8OrZero(str)

14. JOIN в ClickHouse

ClickHouse поддерживает стандартные SQL-джойны, но с рядом особенностей для работы в распределённой среде.

Типы JOIN

-- INNER JOIN
SELECT a.id, a.name, b.amount
FROM mydb.users a
INNER JOIN mydb.orders b ON a.id = b.user_id;
 
-- LEFT JOIN
SELECT a.id, a.name, b.amount
FROM mydb.users a
LEFT JOIN mydb.orders b ON a.id = b.user_id;
 
-- FULL OUTER JOIN
SELECT ...
FROM table1 FULL OUTER JOIN table2 ON ...;
 
-- CROSS JOIN
SELECT ... FROM table1 CROSS JOIN table2;

Особенности JOIN в ClickHouse

Правило правой таблицы: в ClickHouse правая таблица в JOIN полностью загружается в память (hash table). Поэтому:

  • Меньшая таблица должна быть справа
  • Большая таблица — слева
-- Правильно: маленький справочник справа
SELECT events.*, users.name
FROM mydb.events
LEFT JOIN mydb.users ON events.user_id = users.id;  -- users меньше events

JOIN с Distributed-таблицами

При джойне двух distributed-таблиц ClickHouse отправляет правую таблицу на каждый шард. Это дорого при больших правых таблицах.

Оптимизация — одинаковый ключ шардирования для обеих таблиц:

-- Обе таблицы шардированы по user_id
-- JOIN по user_id будет выполнен локально на каждом шарде без передачи данных по сети
ENGINE = Distributed('{cluster}', db, users_local, user_id)
ENGINE = Distributed('{cluster}', db, orders_local, user_id)

Движок Join (специализированный)

-- Создать таблицу-джойн для многократного использования
CREATE TABLE mydb.user_dict (id UInt32, name String)
ENGINE = Join(ANY, LEFT, id);  -- тип джойна и ключ
 
-- Заполнить
INSERT INTO mydb.user_dict VALUES ...;
 
-- Использовать в запросе
SELECT id, joinGet('mydb.user_dict', 'name', user_id)
FROM mydb.events;

JOIN в распределённой среде (Distributed JOIN)

Это одна из самых важных тем при работе с кластерным ClickHouse.

Проблема: JOIN двух Distributed-таблиц запрещён по умолчанию

-- Обе таблицы Distributed с РАЗНЫМИ ключами шардирования
-- tabl_join_1 шардирована по id1
-- tabl_join_2 шардирована по id2
 
-- ЭТО ВЕРНЁТ ОШИБКУ по умолчанию:
SELECT *
FROM mydb.tabl_join_1 AS t1
JOIN mydb.tabl_join_2 AS t2 ON t1.id1 = t2.id1;
-- Double-distributed IN/JOIN subqueries is denied...

Проблема в том, что данные с одинаковым id1 могут лежать на разных шардах для разных таблиц.

Решение 1: GLOBAL JOIN — собрать всё на координаторе

SELECT *
FROM mydb.tabl_join_1 AS t1
GLOBAL JOIN mydb.tabl_join_2 AS t2 ON t1.id1 = t2.id1;

Как работает:

  1. Правая таблица (tabl_join_2) полностью собирается с всех шардов на координатор
  2. Эта копия рассылается на все шарды как broadcast
  3. Каждый шард выполняет JOIN локально

Когда применять: правая таблица небольшая (справочник, словарь). При большой правой таблице — высокая нагрузка на координатор и сеть.

Решение 2: distributed_product_mode = 'local'

SET distributed_product_mode = 'local';  -- по умолчанию 'deny'
 
SELECT *
FROM mydb.tabl_join_1 AS t1
JOIN mydb.tabl_join_2 AS t2 ON t1.id1 = t2.id1;

Как работает: каждый шард выполняет JOIN только со своими локальными данными.

Опасность: если таблицы шардированы по разным ключам, строки с одинаковым id1 могут лежать на разных шардах → джойн даст неполные/неправильные результаты.

Когда безопасно: обе таблицы шардированы по одному и тому же ключу, по которому идёт JOIN.

Решение 3 (лучшее): одинаковый ключ шардирования

-- Таблица 1: шардирование по user_id
ENGINE = Distributed('{cluster}', db, orders_local, user_id)
 
-- Таблица 2: шардирование по user_id (тому же!)
ENGINE = Distributed('{cluster}', db, users_local, user_id)
 
-- Теперь JOIN по user_id выполняется локально на каждом шарде
SET distributed_product_mode = 'local';
SELECT * FROM mydb.orders t1
JOIN mydb.users t2 ON t1.user_id = t2.user_id;

ASOF JOIN — приближённый JOIN по неравенству

Особый тип JOIN для временны́х рядов: находит для каждой строки левой таблицы ближайшее значение из правой таблицы по условию неравенства (<=, >=, <, >).

SELECT T_A.k, T_A.ts, T_B.ts, T_A.a, T_B.b
FROM
    -- Левая таблица: события с временными метками
    (
        SELECT number AS k,
               toDateTime('2020-10-10 10:30:00') + number * 100 AS ts,
               number * 10 AS a
        FROM system.numbers LIMIT 5
    ) AS T_A
ASOF JOIN
    -- Правая таблица: снапшоты/цены/состояния с временными метками
    (
        SELECT number AS k,
               toDateTime('2020-10-10 10:00:00') + number * 100 AS ts,
               number * 100 AS b
        FROM system.numbers LIMIT 5
        UNION ALL
        SELECT number AS k,
               toDateTime('2020-10-10 11:00:00') + number * 100 AS ts,
               number * 1000 AS b
        FROM system.numbers LIMIT 5
    ) AS T_B
ON T_A.k = T_B.k AND T_A.ts >= T_B.ts;
-- Для каждой строки T_A найти строку T_B с максимальным ts <= T_A.ts

Применение: курсы валют (найти актуальный курс на момент транзакции), логи событий (найти ближайшее состояние), расписания.

Пример задачи с собеседования:

Дано: таблица логинов пользователей (user_id, login_time) и таблица акций (event_name, start_time, end_time). Найти количество логинов во время каждой акции.

SELECT
    e.event_name,
    count() AS login_count
FROM user_login AS l
ASOF JOIN event AS e
    ON l.login_time >= e.start_time
WHERE l.login_time <= e.end_time
GROUP BY e.event_name;
 
-- Альтернатива без ASOF:
SELECT e.event_name, count() AS logins
FROM user_login l
JOIN event e ON l.login_time BETWEEN e.start_time AND e.end_time
GROUP BY e.event_name;

15. Практические советы и паттерны

Выбор ORDER BY

-- Хорошо: часто используемые в WHERE поля первыми
ORDER BY (region, user_id, dt)
 
-- Плохо: слишком много полей — замедляет вставки и мержи
ORDER BY (a, b, c, d, e, f)  -- не более 4 полей

Выбор PARTITION BY

-- Хорошо: по дате с помощью toYYYYMM — стандарт для временных рядов
PARTITION BY toYYYYMM(dt)
 
-- Осторожно: слишком много мелких партиций = много файлов
PARTITION BY toYYYYMMDD(dt)  -- ОК только если данных много в сутки
 
-- Плохо: партиция по высококардинальному полю
PARTITION BY user_id  -- миллионы партиций — катастрофа

Ключ шардирования для Distributed

-- Production-рекомендация: halfMD5 от поля, по которому чаще всего GROUP BY / JOIN
ENGINE = Distributed('{cluster}', db, tbl_local, halfMD5(user_id))

Паттерн Local + Distributed

Всегда создавайте пару таблиц:
  - events_local  (MergeTree / ReplicatedMergeTree)  — хранение
  - events        (Distributed)                        — доступ

Вставка → events (distributed)
Чтение  → events (distributed)
Прямой доступ к events_local — только для отладки

FINAL vs OPTIMIZE TABLE

-- FINAL — логическое слияние при чтении (медленнее, не меняет данные)
SELECT * FROM mydb.table FINAL;
 
-- OPTIMIZE TABLE — физическое слияние на диске (нужна осторожность на production)
OPTIMIZE TABLE mydb.table FINAL;

Системные таблицы для мониторинга

-- Текущие запросы
SELECT * FROM system.processes;
 
-- Части таблицы (партиции, размеры)
SELECT * FROM system.parts WHERE table = 'my_table' AND active = 1;
 
-- Состав кластера
SELECT * FROM system.clusters;
 
-- Реплики
SELECT * FROM system.replicas;
 
-- Словари
SELECT * FROM system.dictionaries;
 
-- Метрики
SELECT * FROM system.metrics;
SELECT * FROM system.asynchronous_metrics;

Полезные функции

-- Хостнейм сервера (удобно в distributed-запросах)
SELECT hostname(), count() FROM events GROUP BY hostname();
 
-- Генератор строк (для тестов)
SELECT * FROM numbers(100);         -- 0..99
SELECT * FROM numbers(1000000);     -- 0..999999
 
-- Информация о типе колонки
SELECT toTypeName(col) FROM table LIMIT 1;
 
-- Форматирование чисел
SELECT formatReadableSize(1073741824);  -- '1.00 GiB'
SELECT formatReadableQuantity(1500000); -- '1.50 million'

Материализованные представления (Materialized Views) — предупреждение

В ClickHouse Materialized Views работают принципиально иначе, чем в PostgreSQL:

  • Только append: MV в ClickHouse — это триггер на INSERT. При каждой новой вставке в source-таблицу, результат запроса MV добавляется (append) в target-таблицу.
  • Нет rebuild: В отличие от PostgreSQL, нельзя "обновить" MV командой REFRESH. Нет полного пересчёта.
  • ReplacingMergeTree логика не работает: Если написать MV с агрегацией и положить результат в ReplacingMergeTree — дедупликации всё равно не будет, потому что каждый новый батч просто добавляет новые строки, не заменяя старые.
  • Практика: В ClickHouse MV практически не используют для витрин данных. Основной сценарий — интеграционный (Kafka → MV → MergeTree).
-- Типичный (и почти единственный) нормальный сценарий MV:
CREATE MATERIALIZED VIEW kafka_mv TO events_local AS
SELECT
    JSONExtractString(message, 'user_id') AS user_id,
    JSONExtractUInt(message, 'ts')        AS ts
FROM kafka_events;
-- Всё остальное (витрины, агрегаты) — лучше делать через обычные батч-джобы

Оконные функции — предупреждение

ClickHouse поддерживает WINDOW функции, но:

  • Медленно и проблематично: Оконные функции в ClickHouse реализованы значительно хуже, чем в PostgreSQL. Они работают, но плохо оптимизированы.
  • Альтернатива: Почти любую задачу с окнами можно переписать через подзапрос с GROUP BY. Это быстрее и надёжнее.
-- Плохо (оконная функция):
SELECT user_id, event, row_number() OVER (PARTITION BY user_id ORDER BY ts) AS rn
FROM events;
 
-- Лучше (подзапрос с GROUP BY):
SELECT e.user_id, e.event
FROM events e
JOIN (
    SELECT user_id, min(ts) AS first_ts
    FROM events
    GROUP BY user_id
) g ON e.user_id = g.user_id AND e.ts = g.first_ts;

Частые ошибки

ОшибкаПричинаРешение
Таблица не видна на другом сервереСоздана без ON CLUSTERПересоздать с ON CLUSTER '{cluster}'
Данные есть только на одном шардеINSERT в _local таблицуINSERT в Distributed-таблицу
Медленный агрегатSummingMergeTree без FINALДобавить FINAL или OPTIMIZE TABLE FINAL
Ошибка: MATERIALIZED columnПопытка вставить в MATERIALIZED/ALIASУбрать эти поля из INSERT
Кластер не запускаетсяМало RAM для индексаУвеличить RAM или гранулу
ZooKeeper засорён после DROP TABLEDROP TABLE без SYNC на Replicated-таблицеВсегда использовать DROP TABLE ... SYNC
MV не обновляет данныеОжидание поведения как в PostgreSQL REFRESHMV в CH — только append-триггер, не перестраивается

Быстрая шпаргалка: какой движок выбрать

Нужно хранить обычные данные?
    → MergeTree

Нужна автоматическая агрегация при записи (счётчики, суммы)?
    → SummingMergeTree (простые суммы)
    → AggregatingMergeTree (сложные агрегаты: uniq, quantile, any)

Нужна дедупликация по ключу?
    → ReplacingMergeTree

Нужно отслеживать изменения состояния (CDC)?
    → CollapsingMergeTree

Нужна интеграция с Kafka?
    → Kafka Engine + Materialized View → MergeTree

Нужна интеграция с PostgreSQL?
    → PostgreSQL Engine

Нужны быстрые вставки из множества маленьких батчей?
    → Buffer Engine → MergeTree

Нужен оптимизированный IN-запрос?
    → Set Engine

Нужны тестовые данные?
    → GenerateRandom Engine

Нужно временно хранить данные в памяти?
    → Memory Engine

Документация ClickHouse: clickhouse.com/docs (opens in a new tab)