Хранилище данных ClickHouse и сервисы Yandex Cloud
17.08.2022

Делимся опытом внедрения решения на базе СУБД ClickHouse и сервисов Yandex Cloud.

Вкратце по основным пунктам задачи:
  • требуется в online-режиме загружать данные в хранилище ClickHouse, данные постоянно поступают в виде XML-файлов в общую директорию;
  • данные в файлах могут дублироваться, дубли нужно устранять по определенным условиям;
  • на основе очищенных данных необходимо строить ряд витрин с агрегациями.
Мы здесь не коснёмся тонких настроек ClickHouse или его масштабирования, но затронем достаточно интересные на наш взгляд вещи. Итак, обо всём по порядку.

Требуется в online-режиме загружать данные в хранилище ClickHouse, данные постоянно поступают в виде XML-файлов в общую директорию.

Пройдя интересный курс обучения «Построение корпоративной аналитической платформы» (кстати, рекомендуем тем, кто начинает изучать облачные технологии и ClickHouse) и увидев всякие фишки ClickHouse (вроде интеграции с Kafka), мы предположили, что для ClickHouse или его cli-клиента нет проблем распарсить XML-файл и разложить данные по столбцам. Но нет, оказалось, что такое ClickHouse пока не умеет. Придётся самим парсить XML и делать вставку в ClickHouse.

Немного поспорили, что лучше использовать в данной ситуации. Выбирали между Airflow и сервисами Yandex Cloud, в итоге пришли ко второму, а именно — бóльшую часть логики решили сделать на Yandex Cloud Functions, причиной тому послужили факторы:

  • ClickHouse будет развернут в облаке Yandex Cloud как управляемый сервис, удобно иметь программную предобработку данных в том же облаке рядом с БД;
  • Yandex Cloud Functions имеют нетарифицируемый объем (так называемый «free tier», по-русски - халява), что удешевляет разработку, тестирование функционала и дальнейшую эксплуатацию. Главное, что не нужно содержать целую виртуальную машину с Airflow;
  • Есть много сервисов в Yandex Cloud, с которыми просто и удобно работать из Yandex Cloud Functions - Yandex Object Storage, Yandex Message Queue, Yandex DB и т.д.;
  • Легкость масштабирования Yandex Cloud Functions, мы становимся практически не ограничены в вычислительных ресурсах и параллельности;
  • Вся инфраструктура в облаке описывается в Terraform (у Yandex Cloud есть provider) – опять удобно, просто и минимальные затраты на CICD/DevOps.

В итоге эту часть задачи мы реализовали примерно по следующей схеме:


В on-premise запущен скрипт, который непрерывно поллит общую директорию и каждый поступивший в неё XML-файл загружает (1) в облако – в Бакет Yandex Object Storage. Окончание загрузки файла в Бакет служит сигналом для запуска (2) функции Диспетчер, которая кладёт (3) сообщение с информацией о файле в очередь, таким образом осуществляется троттлинг (сдерживание) нагрузки.
Дальше из очереди сообщения читает (4) функция Парсер, скачивает файл из Бакета, парсит и вставляет (5) данные в хранилище. По окончанию обработки файл из Бакета удаляется (6), чтобы не занимать место и зазря не тратить деньги.
В ClickHouse все данные сохраняются в таблицу со структурой, аналогичной схеме XML из файла, - всё благодаря богатому списку типов данных ClickHouse.
Примерная реализация функции Парсер на Python в облаке: 

# import зависимостей 
import boto3 

# подключение к облаку 
boto_session = boto3.session.Session( 
    aws_access_key_id=os.getenv('ACCESS_KEY_ID'), 
    aws_secret_access_key=os.getenv('SECRET_ACCESS_KEY') 
) 
# подключение к объектному хранилищу 
s3 = boto_session.client( 
    service_name='s3', 
    endpoint_url='https://storage.yandexcloud.net' 
) 
# подключение к ClickHouse 
clickhouse = Client(host=os.getenv('CH_HOST'), 
                    user=os.getenv('CH_USER'), 
                    password=os.getenv('CH_PASSWORD'), 
                    database=os.getenv('CH_DB'), 
                    secure=True, 
                    settings={'input_format_import_nested_json': 1}) 
# основной обработчик событий 
# определяет, что вызов произошёл по сообщению из очереди, 
# из тела сообщения берётся идентификатор файла, 
# файл скачивается, парсится, данные вставляются в ClickHouse 
def handler(event, context): 
    if 'messages' in event: 
        for message in event['messages']: 
            # получение текста сообщения: 
            task_json = json.loads(message['details']['message']['body']) 
            
            # получение объекта из S3: 
            get_object_response = s3.get_object(Bucket=task_json['bucket_id'],  
                                        Key=task_json['object_id']) 
                                         
            # парсинг файла: 
            parsed_data = parse(data=xmltodict.parse(get_object_response['Body'].read()),  
                                        filename=task_json['object_id']) 
                                      
            # сохранение данных: 
            store(data=parsed_data, file=task_json['object_id'])        
            # удаление обработанного файла: 
            s3.delete_object(Bucket=task_json['bucket_id'], Key=task_json['object_id'])             
          
def parse(data, filename): 
    # ... реализация парсинга      
def store(data, filename): 
    # ... реализация сохранения в ClickHouse 
Данные в файлах могут дублироваться, дубли нужно устранять.

Какие варианты мы рассматривали при подборе решения этого пункта задачи:

  • Первый вариант, что пришёл в голову, - при парсинге XML-файла и перед вставкой данных в ClickHouse "на лету" проверять, не будет ли дублей, а дальше делать insert/upsert в зависимости от ситуации. Не подходит, т.к. ClickHouse не OLTP, а OLAP и об этом явно сказано в документации. Обновление данных в таблице реализовано как мутация - достаточно тяжеловесная операция, которую не удастся выполнять часто/много/быстро. Часто/много/быстро в ClickHouse можно вставлять (рекомендуем к прочтению интересную статью-benchmark Faster ClickHouse Imports);
  • Второй вариант - специальный движок таблиц ReplacingMergeTree, который предназначен как раз для дедубликации данных по заданному ключу. Но тут дело в том, что на основании очищенных (дедублицированных) данных нужно строить и обновлять витрины в том же режиме, близком к online, а движок ReplacingMergeTree осуществляет манипуляции с данными в фоновом режиме и в неопределенный момент, поэтому имеются риски собрать итоговую витрину на задублированных данных, а требования к отчётам высокие - данные должны быть максимально точными.

Плюс к этому ReplacingMergeTree предлагает дедубликацию по ключу сортировки (секция ORDER BY в DLL таблицы), такого рода дедубликация по ключу совсем не подходит, т.к. в рамках задачи нужно не просто удалять дубли по фиксированному ключу, а выбирать из них, исходя из значений других полей, не входящих в ключ сортировки.

Например, поступают некоторые данные в таблицу с полями field1 (UInt32), field2 (UInt32), field3 (DateTime), field4 (String), необходимо иметь в таблице только уникальные сочетания (field1, field2). Значение field4 должно остаться той строки, у которой минимальное значение в field3 среди всех поступивших с этим сочетанием (field1, field2) строк.

  • Третий вариант, на котором и остановились, - загружать в буферную таблицу всё, что поступило в Бакет и дальше осуществлять merge буферной в «чистовую» таблицу.
Буферная таблица, в которую вставляет «сырые» данные функция Парсер, имеет поле со временем вставки, партиционирование по этому полю и срок жизни записей:

DROP TABLE IF EXISTS raw; 
CREATE TABLE raw 
( 
... 
    insertTime DateTime('Europe/Moscow') DEFAULT now()  
) ENGINE = MergeTree() 
PARTITION BY toYYYYMMDD(insertTime) 
ORDER BY (...) 
TTL toStartOfDay(insertTime) + INTERVAL 1 DAY; 
Вообще TTL в ClickHouse отличная фича, которая одной строкой в DDL таблицы избавляет нас от самостоятельной организации очистки таблиц (от написания каких-то кастомных джобов, их контроля запуска и мониторинга).

Используя Yandex Cloud Functions Trigger типа Timer, по расписанию запускается функция Очиститель, которая сравнивает «сырые» и «чистовые» данные, определяет, что в сырых данные есть что-то новенькое, и пора обновить чистовые данные.

Примерно таким запросом выбираются партиции (2), которые требуют merge:

WITH rawParts AS ( 
SELECT partKey, MAX(insertTime) maxInsertTime 
FROM raw 
WHERE insertTime > toStartOfMinute(now()) - INTERVAL 1 HOUR 
GROUP BY partKey), 
cleanParts AS ( 
SELECT partKey, MAX(insertTime) maxInsertTime 
FROM clean 
WHERE partKey IN (SELECT origOperDay FROM rawParts) 
GROUP BY partKey) 
SELECT DISTINCT toYYYYMMDD(rawParts.partKey) AS partKey FROM rawParts LEFT JOIN parts USING partKey 
WHERE rawParts.maxInsertTime > parts.maxInsertTime 
ORDER BY partKey; 
Фактически этот запрос получает из буферной таблицы список пополненных данными партиций за последний час и по времени самой свежей вставки сравнивает с партициями «чистовой» таблицы, если обнаружено расхождение (есть подозрение на свежие данные, которые отсутствуют в «чистовой» таблице), то такие партиции попадают в результат запроса. В реальном же случае условия отбора могут быть гораздо сложнее.
Функция Очиститель, выполнив запрос (2), результат построчно складывает в очередь для отдельного «попартиционного» merge (3).
Сам «попартиционный» merge (5) в простейшем случае можно сделать таким рядом запросов:

-- Предварительная очистка промежуточной таблицы: 
ALTER TABLE tmp DROP PARTITION {partKey}; 
-- Вставка в промежуточную таблицу накопленных, свежих и сразу очищенных данных: 
INSERT INTO tmp 
SELECT * 
FROM ( 
SELECT * FROM raw r WHERE partkey = {partKey} 
UNION ALL 
SELECT * FROM clean WHERE partkey = {partKey} 
) 
ORDER BY field1, field2, field3, ... 
LIMIT 1 BY field1, field2, field3, ...;  
-- Удаление партиции в чистовой таблице: 
ALTER TABLE clean DROP PARTITION {partKey}; 
-- Перемещение партиции из промежуточной в чистовую таблицу: 
ALTER TABLE clean MOVE PARTITION {partKey} TO TABLE clean; 
Данные сначала вставляются в промежуточную таблицу, а затем происходит подмена партиции в чистовой таблице, что делает весьма незаметным и быстрым обновление «чистовой» таблицы.

Из важного тут стоит отметить, что при вставке в промежуточную таблицу используется подзапрос, а с этим делом в ClickHouse не всё так уж просто - результат подзапроса должен помещаться в оперативную память одного сервера, ограниченную параметром max_memory_usage (или max_memory_usage_for_user). Если результат подзапроса не поместится в память, тогда весь запрос завершится с ошибкой:
DB::Exception: Memory limit (for query) exceeded
Поэтому для работоспособности такого решения потребовалось подобрать оптимальное сочетание объема одной партиции (читай – ключ партиционирования) и max_memory_usage (читай – объем оперативной памяти у хоста ClickHouse с учётом параллельности подобных запросов).
«Очистку» данных выполняет функция примерно с таким кодом:

# импорт используемых модулей 
# import ...  
# подключение к облаку 
boto_session = boto3.session.Session( 
    aws_access_key_id=os.getenv('ACCESS_KEY_ID'), 
    aws_secret_access_key=os.getenv('SECRET_ACCESS_KEY') 
) 
# подключение к ClickHouse 
clickhouse = Client(host=os.getenv('CH_HOST'), 
                    user=os.getenv('CH_USER'), 
                    password=os.getenv('CH_PASSWORD'), 
                    database=os.getenv('CH_DB'), 
                    secure=True) 
# подключение к очереди Yandex Queue для отправки сообщений 
ymq_queue = boto_session.resource( 
    service_name='sqs', 
    endpoint_url='https://message-queue.api.cloud.yandex.net', 
    region_name='ru-central1' 
).Queue(os.getenv('YMQ_QUEUE_URL'))   
# основной обработчик событий 
def handler(event, context): 
    try: 
        if event is not None and type(event) == dict and 'messages' in event: 
            for message in event['messages']: 
                if 'event_metadata' in message and \ 
                        'event_type' in message['event_metadata'] and \ 
                        message['event_metadata'][ 
                            'event_type'] == 'yandex.cloud.events.serverless.triggers.TimerMessage': 
                    # запуск по событию, инициированному триггером с типом Timer 
                    # поиск партиций для merge и отправка заданий в очередь 
                    queue_partitions() 
                else: 
                    # запуск по сообщению из очереди 
                    task_json = json.loads(message['details']['message']['body']) 
                    partKey = str(task_json['partKey']) 
                    # merge конкретной партиции 
                    do_partition(partKey=partKey) 
    except Exception as ex: 
        logging.error(ex) 
# поиск партиций 
def queue_partitions(): 
    # запрос поиска партиций для merge (см. выше пример) 
    query = '...'  
    rows_gen = clickhouse.execute_iter(query) 
    # отправка каждой партиции в очередь отдельным сообщением 
    for partKey in rows_gen: 
        ymq_queue.send_message(MessageBody=json.dumps({ 
            'partKey': partKey[0] 
        })) 
# merge конкретной партиции (см. последовательность запросов выше) 
def do_partition(partKey): 
    # ... 
На основе очищенных данных необходимо строить ряд витрин с агрегациями.
Тут схема, процесс и реализация абсолютно идентичны предыдущей части – есть исходная таблица и нужно на её данных строить/обновлять витрину. Стоить только отметить только один нюанс, с которым столкнулись, - это использование массивов и вложенных таблиц в ClickHouse.
Например, у нас есть исходная таблица со следующей структурой:

CREATE TABLE clean 
( 
id Int64, 
positions Nested( 
        id UInt16, 
        value1 String, 
value2 String 
    ), 
insertTime DateTime('Europe/Moscow') DEFAULT now() 
) ENGINE = MergeTree() 
PARTITION BY toYYYYMMDD(insertTime) 
ORDER BY (id); 
Мы хотим просто выбрать все данные в плоском виде и делаем такой запрос:

SELECT id, p.id, p.value1, p.value2  
FROM clean ARRAY JOIN positions AS p 
Это работает.
Дальше, например, у нас есть другая таблица с маппингами:

CREATE TABLE mappings 
( 
from String, 
to String 
) ENGINE = MergeTree() 
ORDER BY (from); 
И мы хотим применить маппинг к одному полю нашей таблицы с чистовыми данными:

SELECT p.value1, m.to value1_to 
FROM clean ARRAY JOIN positions AS p 
LEFT JOIN mappings AS m ON p.value1 = m.from; 
Это тоже отлично работает.
Но если мы захотим сделать еще один маппинг по другому полю:

SELECT p.value1, m1.to value1_to, p.value2, m2.to value2_to 
FROM clean ARRAY JOIN positions AS p 
LEFT JOIN mappings AS m1 ON p.value1 = m1.from 
LEFT JOIN mappings AS m2 ON p.value2 = m2.from 
То получим ошибку:

DB::Exception: Multiple JOIN does not support mix with ARRAY JOINs 
Да, есть такое ограничение. И обойти его можно, используя подзапрос:

SELECT p.value1, value1_to, p.value2, m2.to value2_to FROM 
( 
SELECT p.value1, m1.to value1_to, p.value2 
FROM clean ARRAY JOIN positions AS p 
LEFT JOIN mappings AS m1 ON p.value1 = m1.from 
) t 
LEFT JOIN mappings AS m2 ON p.value2 = m2.from 
Но как отмечено ранее, подзапросы накладывают существенные ограничения, прежде всего на объем обрабатываемых запросом данных. И если в исходной таблице будут триллионы строк, то такой запрос вряд ли сможет быть выполнен. Придётся либо упрощать исходную таблицу (чтобы не делать два раза JOIN), либо упрощать целевую таблицу (отказаться от доп. маппинга), либо обрабатывать исходную таблицу частями (как, например, сделано при «очистке» сырых данных) и искать оптимальное сочетание вышеуказанных параметров решения.

Несколько заключительных слов
В целом в ClickHouse достаточно много разных ограничений, в том числе синтаксических, до поддержки полноценного SQL он пока не дотягивает, к сожалению. Тот же ARRAY JOIN в одном запросе/подзапросе можно использовать только один раз.

При проектировании решений с использованием ClickHouse мы рекомендуем сразу детально продумывать все структуры таблиц, необходимых сквозному процессу, – от таблиц с сырыми данных до конечных витрин, с учётом различных ограничений ClickHouse.

Чтобы получать финишные данные за наименьшее количество итераций с наименьшим количеством промежуточных таблиц и подзапросов иногда стоит отказаться от оригинальной структуры исходных данных в виде массивов или вложенных таблиц, или иногда будет быстрее применить маппинг или сделать доп. вычисление еще на уровне обработки сырых данных перед вставкой в ClickHouse.
В общем, чем проще и полнее исходные данные (таблицы с данными), тем проще и дешевле в дальнейшем с ними работать и тем больше открываются прелести Full Scan у ClickHouse’а.

Что касается плюсов и лучшей применимости ClickHouse, как нам видится, эта СУБД лучше всего пригодна в кейсах, когда в режиме online нужно сохранять колоссальные объёмы структурированных данных и иметь возможность из этих данных быстро получать несложные выборки (хотя бы без подзапросов и JOIN с крупными таблицами).

Вставка огромных объемов данных в ClickHouse абсолютно не проблема, как и простая выборка (с агрегацией) из большой таблицы, ведь изначально под такой тип задач ClickHouse и создавался в Yandex.Metrica.
А поддержка своеобразного SQL, хоть и весьма ограниченного, и классических способов подключения JDBC/ODBC является огромным плюсом.

Читайте так же