Пройдя интересный курс обучения «Построение корпоративной аналитической платформы» (кстати, рекомендуем тем, кто начинает изучать облачные технологии и ClickHouse) и увидев всякие фишки ClickHouse (вроде интеграции с Kafka), мы предположили, что для ClickHouse или его cli-клиента нет проблем распарсить XML-файл и разложить данные по столбцам. Но нет, оказалось, что такое ClickHouse пока не умеет. Придётся самим парсить XML и делать вставку в ClickHouse.
Немного поспорили, что лучше использовать в данной ситуации. Выбирали между Airflow и сервисами Yandex Cloud, в итоге пришли ко второму, а именно — бóльшую часть логики решили сделать на Yandex Cloud Functions, причиной тому послужили факторы:
В итоге эту часть задачи мы реализовали примерно по следующей схеме:
# import зависимостей
import boto3
# подключение к облаку
boto_session = boto3.session.Session(
aws_access_key_id=os.getenv('ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('SECRET_ACCESS_KEY')
)
# подключение к объектному хранилищу
s3 = boto_session.client(
service_name='s3',
endpoint_url='https://storage.yandexcloud.net'
)
# подключение к ClickHouse
clickhouse = Client(host=os.getenv('CH_HOST'),
user=os.getenv('CH_USER'),
password=os.getenv('CH_PASSWORD'),
database=os.getenv('CH_DB'),
secure=True,
settings={'input_format_import_nested_json': 1})
# основной обработчик событий
# определяет, что вызов произошёл по сообщению из очереди,
# из тела сообщения берётся идентификатор файла,
# файл скачивается, парсится, данные вставляются в ClickHouse
def handler(event, context):
if 'messages' in event:
for message in event['messages']:
# получение текста сообщения:
task_json = json.loads(message['details']['message']['body'])
# получение объекта из S3:
get_object_response = s3.get_object(Bucket=task_json['bucket_id'],
Key=task_json['object_id'])
# парсинг файла:
parsed_data = parse(data=xmltodict.parse(get_object_response['Body'].read()),
filename=task_json['object_id'])
# сохранение данных:
store(data=parsed_data, file=task_json['object_id'])
# удаление обработанного файла:
s3.delete_object(Bucket=task_json['bucket_id'], Key=task_json['object_id'])
def parse(data, filename):
# ... реализация парсинга
def store(data, filename):
# ... реализация сохранения в ClickHouse
Какие варианты мы рассматривали при подборе решения этого пункта задачи:
Плюс к этому ReplacingMergeTree предлагает дедубликацию по ключу сортировки (секция ORDER BY в DLL таблицы), такого рода дедубликация по ключу совсем не подходит, т.к. в рамках задачи нужно не просто удалять дубли по фиксированному ключу, а выбирать из них, исходя из значений других полей, не входящих в ключ сортировки.
Например, поступают некоторые данные в таблицу с полями field1 (UInt32), field2 (UInt32), field3 (DateTime), field4 (String), необходимо иметь в таблице только уникальные сочетания (field1, field2). Значение field4 должно остаться той строки, у которой минимальное значение в field3 среди всех поступивших с этим сочетанием (field1, field2) строк.
DROP TABLE IF EXISTS raw;
CREATE TABLE raw
(
...
insertTime DateTime('Europe/Moscow') DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(insertTime)
ORDER BY (...)
TTL toStartOfDay(insertTime) + INTERVAL 1 DAY;
WITH rawParts AS (
SELECT partKey, MAX(insertTime) maxInsertTime
FROM raw
WHERE insertTime > toStartOfMinute(now()) - INTERVAL 1 HOUR
GROUP BY partKey),
cleanParts AS (
SELECT partKey, MAX(insertTime) maxInsertTime
FROM clean
WHERE partKey IN (SELECT origOperDay FROM rawParts)
GROUP BY partKey)
SELECT DISTINCT toYYYYMMDD(rawParts.partKey) AS partKey FROM rawParts LEFT JOIN parts USING partKey
WHERE rawParts.maxInsertTime > parts.maxInsertTime
ORDER BY partKey;
-- Предварительная очистка промежуточной таблицы:
ALTER TABLE tmp DROP PARTITION {partKey};
-- Вставка в промежуточную таблицу накопленных, свежих и сразу очищенных данных:
INSERT INTO tmp
SELECT *
FROM (
SELECT * FROM raw r WHERE partkey = {partKey}
UNION ALL
SELECT * FROM clean WHERE partkey = {partKey}
)
ORDER BY field1, field2, field3, ...
LIMIT 1 BY field1, field2, field3, ...;
-- Удаление партиции в чистовой таблице:
ALTER TABLE clean DROP PARTITION {partKey};
-- Перемещение партиции из промежуточной в чистовую таблицу:
ALTER TABLE clean MOVE PARTITION {partKey} TO TABLE clean;
# импорт используемых модулей
# import ...
# подключение к облаку
boto_session = boto3.session.Session(
aws_access_key_id=os.getenv('ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('SECRET_ACCESS_KEY')
)
# подключение к ClickHouse
clickhouse = Client(host=os.getenv('CH_HOST'),
user=os.getenv('CH_USER'),
password=os.getenv('CH_PASSWORD'),
database=os.getenv('CH_DB'),
secure=True)
# подключение к очереди Yandex Queue для отправки сообщений
ymq_queue = boto_session.resource(
service_name='sqs',
endpoint_url='https://message-queue.api.cloud.yandex.net',
region_name='ru-central1'
).Queue(os.getenv('YMQ_QUEUE_URL'))
# основной обработчик событий
def handler(event, context):
try:
if event is not None and type(event) == dict and 'messages' in event:
for message in event['messages']:
if 'event_metadata' in message and \
'event_type' in message['event_metadata'] and \
message['event_metadata'][
'event_type'] == 'yandex.cloud.events.serverless.triggers.TimerMessage':
# запуск по событию, инициированному триггером с типом Timer
# поиск партиций для merge и отправка заданий в очередь
queue_partitions()
else:
# запуск по сообщению из очереди
task_json = json.loads(message['details']['message']['body'])
partKey = str(task_json['partKey'])
# merge конкретной партиции
do_partition(partKey=partKey)
except Exception as ex:
logging.error(ex)
# поиск партиций
def queue_partitions():
# запрос поиска партиций для merge (см. выше пример)
query = '...'
rows_gen = clickhouse.execute_iter(query)
# отправка каждой партиции в очередь отдельным сообщением
for partKey in rows_gen:
ymq_queue.send_message(MessageBody=json.dumps({
'partKey': partKey[0]
}))
# merge конкретной партиции (см. последовательность запросов выше)
def do_partition(partKey):
# ...
CREATE TABLE clean
(
id Int64,
positions Nested(
id UInt16,
value1 String,
value2 String
),
insertTime DateTime('Europe/Moscow') DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(insertTime)
ORDER BY (id);
SELECT id, p.id, p.value1, p.value2
FROM clean ARRAY JOIN positions AS p
CREATE TABLE mappings
(
from String,
to String
) ENGINE = MergeTree()
ORDER BY (from);
SELECT p.value1, m.to value1_to
FROM clean ARRAY JOIN positions AS p
LEFT JOIN mappings AS m ON p.value1 = m.from;
SELECT p.value1, m1.to value1_to, p.value2, m2.to value2_to
FROM clean ARRAY JOIN positions AS p
LEFT JOIN mappings AS m1 ON p.value1 = m1.from
LEFT JOIN mappings AS m2 ON p.value2 = m2.from
DB::Exception: Multiple JOIN does not support mix with ARRAY JOINs
SELECT p.value1, value1_to, p.value2, m2.to value2_to FROM
(
SELECT p.value1, m1.to value1_to, p.value2
FROM clean ARRAY JOIN positions AS p
LEFT JOIN mappings AS m1 ON p.value1 = m1.from
) t
LEFT JOIN mappings AS m2 ON p.value2 = m2.from