During one of the lessons we examined popular file formats. How do columnar storage, data encoding, and compression algorithms make a difference?
File formats comparison: CSV, JSON, Parquet, ORC
Key results
Whenever you need to store your data on S3 / a Data Lake / an external table, choose the file format wisely:
– Parquet / ORC are the best options due to efficient data layout, compression, and indexing capabilities
– Columnar formats allow for column projection and partition pruning (reading only the relevant data!)
– Binary formats enable schema evolution, which is very useful in a constantly changing business environment
Inputs
– I used Clickhouse + the S3 table engine to compare different file formats (see the setup sketch below)
– A single-node Clickhouse database was used – s2.small preset: 4 vCPU, 100% vCPU rate, 16 GB RAM
– Source data: TPCH synthetic dataset for 1 year – 18.2M rows, 2 GB raw CSV size
– A single query is run at a time to ensure 100% dedicated resources
To reproduce it yourself you will need a Yandex.Cloud account, a Clickhouse database set up, and S3 keys generated. Source data is available via a public S3 link: https://storage.yandexcloud.net/otus-dwh/dbgen/lineorder.tbl.
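As a minimal sketch of that setup (the bucket path, credentials and column list below are placeholders – only a few lineorder columns are shown for brevity), an external S3 table in Clickhouse can be declared roughly like this:

-- external table over an S3 object; the format argument controls (de)serialization
CREATE TABLE lineorder_s3_parquet (
    `LO_ORDERKEY` UInt64
    , `LO_ORDERDATE` Date
    , `LO_REVENUE` UInt32
    -- ... remaining lineorder columns omitted
)
ENGINE = S3('https://storage.yandexcloud.net/<your-bucket>/lineorder.parquet',
            '<access_key_id>', '<secret_access_key>', 'Parquet')
;

One such table per format (Parquet, ORC, CSV, JSONEachRow) is enough to run all the measurements below.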
Comparison measures
1. Time to serialize / deserialize
How long does it take to write data to disk in a particular format?
– Compressed columnar formats (ORC, Parquet) take the lead here
– On average it takes about 6x longer to write JSON data to disk than columnar formats (120 sec vs 20 sec)
– The less data you write to disk, the less time it takes – no surprise
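For illustration, this kind of write can be done with the s3 table function – a sketch assuming a hypothetical source table lineorder_src and placeholder credentials; swapping 'Parquet' for 'ORC', 'CSV' or 'JSONEachRow' produces the other variants:

-- export the dataset to S3 as Parquet; the client reports the elapsed time
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/<your-bucket>/lineorder.parquet',
                        '<access_key_id>', '<secret_access_key>', 'Parquet')
SELECT * FROM lineorder_src
;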
2. Storage size
How much disk space is used to store the data?
– Unsurprisingly, the best results come from zstd-compressed ORC and Parquet
– The worst result is uncompressed JSON, which is almost 3 times larger than the source CSV data (the schema is repeated for every row!)
– zstd-compressed CSV also shows great results: 632 MB vs 2 GB of uncompressed data
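A sketch of how the zstd-compressed CSV variant can be produced (same placeholders as above; Clickhouse can typically infer the compression codec from the file extension, and the s3 function also accepts it as an explicit argument):

-- write zstd-compressed CSV; the .zst extension hints at the codec
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/<your-bucket>/lineorder.csv.zst',
                        '<access_key_id>', '<secret_access_key>', 'CSV')
SELECT * FROM lineorder_src
;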
3. Query latency (response time)
How fast can one get query results for a simple analytical query?
3.1. OLAP query including whole dataset (12/12 months)
– Columnar formats outperform text formats because they allow reading only the required columns, so there is no excessive IO
– Compression means fewer IO operations and thus lower latency
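As an example of the kind of aggregation behind this measure (a sketch only – the column names follow the lineorder schema, and the table is the hypothetical external table from the setup sketch above):

-- full-period aggregate: columnar formats read only the referenced columns
SELECT toYYYYMM(LO_ORDERDATE) AS month
     , sum(LO_REVENUE) AS revenue
     , count() AS orders
FROM lineorder_s3_parquet
GROUP BY month
ORDER BY month
;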
3.2. OLAP query including subset of rows (1/12 months)
– Results for this query are pretty much the same as for the previous one without the WHERE condition
– Although columnar formats allow for partition pruning and reading only the relevant rows (according to the WHERE condition), the filter is not pushed down
– The Clickhouse EXPLAIN command revealed that the filter is applied only after the whole dataset is returned from S3 😕
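A sketch of how this can be checked (again with the hypothetical table from above): restrict the aggregation to a single month and inspect the plan – if the WHERE condition appears only above the S3 reading step, the filter is not pushed down:

-- does the month filter reach the S3 read, or is it applied afterwards?
EXPLAIN
SELECT sum(LO_REVENUE) AS revenue
FROM lineorder_s3_parquet
WHERE toYYYYMM(LO_ORDERDATE) = 199801 -- an illustrative month value
;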
See scripts and queries →
[RU] Bad advice for building Analytics (Data Lake / DWH / BI) – goal setting and results
I am continuing the series of publications in the "bad advice" format, which aims to draw attention to (not-so-)best practices and approaches in building analytics services, with real examples and stories.
In this publication you will find:
– Completing tasks without a clear understanding of the goals – Question your customer;
– Ignoring the assessment of expected data characteristics – Assess data expectations;
– Neglecting documentation and code comments – Ensure access and transparency.
#best_practices #dwh
Read on Habr →
⚡️Easily sync Google Sheets with Clickhouse
gSheets is the easiest way to maintain and edit mappings, target KPI values, custom dimensions and events.
It lets business users edit data in real time and see the results without knowing anything about Big Data, SQL, or columnar databases.
What I love about Clickhouse is its rich data integration capabilities. Here's what you do in 3 minutes:
1. Create a gSheet with any structured data.
2. Get a direct link to the data export in one of the supported formats (CSV, JSON).
3. Create a table with the URL engine, specifying the schema, the gSheet link and the file format.
4. Query real-time in a sub-second manner!

CREATE TABLE dbt.margin_rate (
    `date_month` Date
    , `margin_rate` Float64
)
ENGINE = URL('https://docs.google.com/spreadsheets/d/{key}/gviz/tq?tqx=out:csv&sheet=margin_rate', CSVWithNames)
;

SELECT * FROM dbt.margin_rate LIMIT 200;
Data warehouses have never been this interactive.
#dwh #clickhouse #pipelines
[Attached image: test_failure.png]
Hey, everyone!
Today I faced an issue of missing rows in my incremental dbt model.
A little background: I calculate GEO-zones for a large number of events, which is a pretty CPU-intensive operation. And of course I want to do it incrementally, hence processing only deltas – new and changed rows.
If you still don't use incremental models, you probably want to take a look at the dbt Labs docs on them.
So, first of all, it was a relationships test that gave me the clue that I was missing some rows.
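For context, a relationships test in dbt checks referential integrity between two models and is declared in the model's YAML file; under the hood it compiles to a query roughly like the sketch below (the model and column names here are hypothetical, just to illustrate the idea):

-- a minimal sketch of what a relationships test roughly compiles to:
-- keys present in the tested model but missing from the related model
select orders.request_id
from analytics.stg_orders as orders -- hypothetical model under test
left join analytics.fct_orders_geo as geo -- hypothetical incremental model it must match
    on orders.request_id = geo.request_id
where geo.request_id is null
;

The test fails as soon as this query returns any rows, which is exactly the clue that deltas were being missed.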
The initial incremental clause looked like this:
where true
{% if is_incremental() %}
and orders.__metadata_timestamp >=
(select max(__metadata_timestamp) as high_watermark from {{ this }})
{% endif %}
So I basically took only rows whose metadata timestamp was newer (later) than the timestamp of the rows I already have in my table.
Then I realized that the metadata timestamp (which is the exact time when the row was fetched from the source system) does not always increase gradually: some rows get into different batches and land in the DWH at different times, so I introduced a lookback period of 3 hours to make sure I don't miss anything:
where true
{% if is_incremental() %}
and orders.__metadata_timestamp >=
(select max(__metadata_timestamp) as high_watermark from {{ this }}) - interval '3 hours'
{% endif %}
That gives me more assurance I will pick up all the missing rows, but on the other hand I will process a significant number of rows multiple times, as well as trigger more UPDATEs and DELETEs on my target table, since the incremental periods overlap.
It was all going OK until rare cases of EXTRACT – LOAD errors / pauses / outages showed that even a 3-hour lookback period is sometimes not sufficient.
One can increase the lookback interval to 12, 24 or even 48 hours, which can solve most of the cases, but I decided to rewrite my query in a smarter way using a NOT EXISTS clause:
where true
{% if is_incremental() %}
and not exists (
select 1
from {{ this }}
where orders.request_id = {{ this }}.request_id
and orders.__metadata_timestamp = {{ this }}.__metadata_timestamp
)
{% endif %}
That simply means:
– take completely new rows ('request_id' does not exist in {{ this }})
– or take 'request_id's which exist in {{ this }} but have a different __metadata_timestamp (the row has been modified)
I thought it was perfect, but Amazon Redshift didn’t think so 😅:
> This type of correlated subquery pattern is not supported due to internal error
So I took another approach – an anti-join:
{% if is_incremental() %}
left join {{ this }}
on orders.request_id = {{ this }}.request_id
and orders.__metadata_timestamp = {{ this }}.__metadata_timestamp
where {{ this }}.request_id is null
{% endif %}
But unfortunately the EXPLAIN plan revealed that Redshift performs the anti-join + filtering only after performing the 2 geo-spatial joins.
Which technically means processing the full data set and negating any incrementality benefits.
In older times I would just use a hint to make the joins run in a specific order and filter rows early; today, however, just shuffling the join order was good enough:
from {{ ref('stg_orders_tmp') }} as orders
{% if is_incremental() %}
left join {{ this }}
on orders.request_id = {{ this }}.request_id
and orders.__metadata_timestamp = {{ this }}.__metadata_timestamp
{% endif %}
left join {{ ref('stg_zones_tmp') }} as pickup
on ST_Intersects(
ST_Point(orders.pickup_position_lon, orders.pickup_position_lat), pickup.geometry)
left join {{ ref('stg_zones_tmp') }} as dropoff
on ST_Intersects(
ST_Point(orders.dropoff_position_lon, orders.dropoff_position_lat), dropoff.geometry)
{% if is_incremental() %}
where {{ this }}.request_id is null
{% endif %}