Пост-знакомство
Привет, меня зовут Стас.
Последние 7 лет я занимаюсь платформами данных - от хадупов в телекоме и проме до гибких платформ в среднем бизнесе.
3 года развиваю канал про dbt & modern data stack, люблю устраивать, проводить и участвовать в митапах: dbt meetups, x5 openmetadata meetup, rostelecom nifi meetup, croc bigdata meetup...
Канал про DBT сильно вырос и стало не всегда удобно делиться мыслями, достижениями и интересностями в нем - поэтому появился этот канал.
А еще наша команда сейчас занимается одним из первых внедрений интересной аналитической базы данных starrocks в русскоязычной сфере технологий - так что про это будет много :)
Всем спасибо за участие.
Привет, меня зовут Стас.
Последние 7 лет я занимаюсь платформами данных - от хадупов в телекоме и проме до гибких платформ в среднем бизнесе.
3 года развиваю канал про dbt & modern data stack, люблю устраивать, проводить и участвовать в митапах: dbt meetups, x5 openmetadata meetup, rostelecom nifi meetup, croc bigdata meetup...
Канал про DBT сильно вырос и стало не всегда удобно делиться мыслями, достижениями и интересностями в нем - поэтому появился этот канал.
А еще наша команда сейчас занимается одним из первых внедрений интересной аналитической базы данных starrocks в русскоязычной сфере технологий - так что про это будет много :)
Всем спасибо за участие.
🔥3❤1👍1
Про мощь тестов и кафку в старроксе.
Абсолютно случайно обнаружил интересную проблему в недавно заведенном потоке данных в starrocks - время в строчке отличалось от целевого на 1 секунду примерно в 50% случаев. Путем долгого копания и проверки исходных данных конечно я вышел на самого себя - картинка в заголовке ровно про это :)
А теперь откуда растут корни такого странного запроса.
В starrocks есть встроенный способ загрузки потоковых данных из kafka в таблички без всяких промежуточных таблиц и прочего непотребства - routine load. Работает быстро, стабильно, с небольшой нагрузкой, и даже умеет делать обновление по условию.
Но есть и ряд недостатков:
- нельзя фильтровать данные по полю, которого нет в итоговой табличке (на примере снизу event_name), но при этом можно отбрасывать использованные в других функциях колонки (payload)
- несмотря на большие количество функций по работе с временем нельзя получить datetime с миллисекундами из unixtime с миллисекундами
И про тесты: несмотря на двойной заслон тестов - мы поднимаем каждое новое задание через дев стенд и полные интеграционные тесты с прогоном fuzz данных, а далее потоковые данные по возможности сверяем с осевшими на проде - баг с картинки был пропущен :) Известная поговорка: тесты позволяют найти старые проблемы...
Абсолютно случайно обнаружил интересную проблему в недавно заведенном потоке данных в starrocks - время в строчке отличалось от целевого на 1 секунду примерно в 50% случаев. Путем долгого копания и проверки исходных данных конечно я вышел на самого себя - картинка в заголовке ровно про это :)
А теперь откуда растут корни такого странного запроса.
В starrocks есть встроенный способ загрузки потоковых данных из kafka в таблички без всяких промежуточных таблиц и прочего непотребства - routine load. Работает быстро, стабильно, с небольшой нагрузкой, и даже умеет делать обновление по условию.
Но есть и ряд недостатков:
- нельзя фильтровать данные по полю, которого нет в итоговой табличке (на примере снизу event_name), но при этом можно отбрасывать использованные в других функциях колонки (payload)
- несмотря на большие количество функций по работе с временем нельзя получить datetime с миллисекундами из unixtime с миллисекундами
И про тесты: несмотря на двойной заслон тестов - мы поднимаем каждое новое задание через дев стенд и полные интеграционные тесты с прогоном fuzz данных, а далее потоковые данные по возможности сверяем с осевшими на проде - баг с картинки был пропущен :) Известная поговорка: тесты позволяют найти старые проблемы...
CREATE ROUTINE LOAD dds.quotes ON quotes
COLUMNS(
event_name
, payload
, project=JSON_QUERY(payload, '$.project')
, pair=JSON_QUERY(payload, '$.code')
, ts=TIMESTAMPADD(MILLISECOND, cast(substring(cast(JSON_QUERY(payload, '$.timestamp_ms') as varchar(13)), -3) as int), FROM_UNIXTIME(CAST(JSON_QUERY(payload, '$.timestamp_ms') AS BIGINT) / 1000))
, ask=JSON_QUERY(payload, '$.ask')
, bid=JSON_QUERY(payload, '$.bid')
, mid=JSON_QUERY(payload, '$.mid')
),
WHERE event_name = 'ticks.platform'
PROPERTIES (
"max_batch_interval"="30",
"format"="json",
"jsonpaths"="[\"$.event_name\",\"$.payload\"]",
"timezone"="Europe/Moscow"
)
FROM KAFKA (
"kafka_broker_list"="kafka-main:9092",
"kafka_topic"="ticks.platform",
"property.kafka_default_offsets"="OFFSET_BEGINNING",
"property.group.id"="dp_sr_quotes"
);
👍6❤1🔥1