Hai! Saya Artemiy, Analytics Engineer di Wheely.
Hari ini saya ingin membahas konversi indikator keuangan ke berbagai mata uang. Topik ini cukup relevan, karena banyak perusahaan beroperasi di banyak negara, membangun analitik berskala global, dan menyiapkan laporan sesuai dengan standar internasional.
Izinkan saya menunjukkan bagaimana masalah ini diselesaikan dengan pendekatan modern, menggunakan contoh kasus Wheely:
Teknologi yang digunakan: Open Exchange Rates, Airflow, Redshift Spectrum, dbt.
legacy- - . . , AED ( ). - BTC, ETH, - .
:
, API
,
( )
, . , :
API
()
legacy-
pivot- . , , .
pandas . (T ELT) , dbt.
, , – https://openexchangerates.org/
Developer :
10.000 ( )
,
API:
API endpoint /latest.json
- :
Airflow
Airflow. Apache Airflow – - , data engineering .
(DAG):
API
(, S3)
Slack
DAG:
(base currency),
DAG shell-:
# Fetch the exchange rates for $BUSINESS_DT from the OXR API and store the raw JSON in S3.
# Uses the shell variables OXR_TOKEN, BUSINESS_DT, BASE_CURRENCY, SYMBOLS, BUCKET and BUCKET_PATH.
# Abort on the first failing command or unset variable instead of continuing silently.
set -euo pipefail
TS=$(date +"%Y-%m-%d-%H-%M-%S-%Z")
# Download to a temporary file first so that a failed request never reaches S3.
TMP_FILE=$(mktemp)
trap 'rm -f "$TMP_FILE"' EXIT
# --fail turns HTTP 4xx/5xx answers into a non-zero exit code instead of saving the error body.
curl --fail --silent --show-error -H "Authorization: Token $OXR_TOKEN" \
"https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \
--output "$TMP_FILE"
aws s3 cp "$TMP_FILE" "s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json"
S3:
25 , :
, (, , ). .
, Developer API endpoint /time-series.json, upgrade .
/historical/*.json API :
#!/bin/bash
# Backfill: download one historical OXR file per day into ./export,
# starting at 2011-01-01 and stopping before 2021-02-19.
# Uses the shell variable TOKEN for API authorization.
set -euo pipefail

# Print the day after $1 (format YYYY-MM-DD).
# GNU date understands --version and -d; BSD/macOS date needs -j -v instead.
next_day() {
  if date --version >/dev/null 2>&1; then
    date -d "$1 + 1 day" +%Y-%m-%d
  else
    date -j -v +1d -f "%Y-%m-%d" "$1" +%Y-%m-%d
  fi
}

# The output directory must exist before curl writes into it.
mkdir -p ./export

d=2011-01-01
while [ "$d" != 2021-02-19 ]; do
  echo "$d"
  # --fail makes an HTTP error stop the script instead of saving the error body as a rates file.
  curl --fail -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" --output "./export/$d.json"
  # With `set -e`, a failing date call ends the script rather than looping forever on an empty $d.
  d=$(next_day "$d")
done
, , :
legacy- X ( -) .
, . - .
Data Lake. , :
legacy pivot-
PARQUET AWS S3
S3 PARQUET
-- Build spectrum.currencies_cbrf as Parquet files under s3://<BUCKET>/dwh/currencies_cbrf/.
-- ext.currencies keeps every currency pair in its own column (gbp_to_eur, usd_to_rub, ...);
-- the result has one row per day and base currency with one rate column per target currency.
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
    -- One row per base currency; cross-joined below to fan out every source day.
    SELECT 'EUR' AS base_currency
    UNION ALL SELECT 'GBP'
    UNION ALL SELECT 'RUB'
    UNION ALL SELECT 'USD'
)
SELECT
    c."day" AS business_dt,
    b.base_currency,
    -- A currency against itself is always 1; an unknown base yields NULL (no ELSE branch).
    CASE
        WHEN b.base_currency = 'EUR' THEN 1
        WHEN b.base_currency = 'GBP' THEN c.gbp_to_eur
        WHEN b.base_currency = 'RUB' THEN c.rub_to_eur
        WHEN b.base_currency = 'USD' THEN c.usd_to_eur
    END AS eur,
    CASE
        WHEN b.base_currency = 'EUR' THEN c.eur_to_gbp
        WHEN b.base_currency = 'GBP' THEN 1
        WHEN b.base_currency = 'RUB' THEN c.rub_to_gbp
        WHEN b.base_currency = 'USD' THEN c.usd_to_gbp
    END AS gbp,
    CASE
        WHEN b.base_currency = 'EUR' THEN c.eur_to_rub
        WHEN b.base_currency = 'GBP' THEN c.gbp_to_rub
        WHEN b.base_currency = 'RUB' THEN 1
        WHEN b.base_currency = 'USD' THEN c.usd_to_rub
    END AS rub,
    CASE
        WHEN b.base_currency = 'EUR' THEN c.eur_to_usd
        WHEN b.base_currency = 'GBP' THEN c.gbp_to_usd
        WHEN b.base_currency = 'RUB' THEN c.rub_to_usd
        WHEN b.base_currency = 'USD' THEN 1
    END AS usd
FROM ext.currencies AS c
CROSS JOIN base AS b
;
, S3 , - , . .
DWH S3 External Table
– Amazon Redshift , .
– EXTERNAL TABLE, SQL- , S3. JSON, AVRO, ORC, PARQUET . Redshift Spectrum SQL- Amazon Athena, Presto.
-- External table over the JSON files stored under s3://<BUCKET>/dwh/currencies/.
-- Each file is parsed with the OpenX JSON SerDe; nothing is loaded into the cluster.
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
    -- Quoted because the column name is also a type keyword.
    "timestamp" bigint,
    base        varchar(3),
    -- Nested JSON object holding one rate per currency code.
    rates       struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/';
rates struct.
dbt. dbt-external-tables EXTERNAL TABLES :
# dbt source describing the external (Redshift Spectrum) tables in schema "spectrum".
- name: external
  schema: spectrum
  tags: ["spectrum"]
  loader: S3
  description: "External data stored in S3 accessed with Redshift Spectrum"
  tables:
    - name: currencies_oxr
      description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
      # Source freshness check: error when the newest record is older than 15 hours.
      freshness:
        error_after: {count: 15, period: hour}
      # Expression used as the record load time: "timestamp" converted from seconds since the epoch.
      loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
      # Properties of the external table definition (S3 location and SerDe).
      external:
        location: "s3://<BUCKET>/dwh/currencies/"
        row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
      columns:
        - name: timestamp
          data_type: bigint
        - name: base
          data_type: varchar(3)
        - name: rates
          data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
– source freshness test . . , .
– 15 – Slack.
() ( API) currencies:
{{
    config(
        materialized='table',
        dist='all',
        sort=["business_dt", "base_currency"]
    )
}}

-- Daily exchange rates per base currency, stitched together from two sources:
--   * currencies_cbrf - legacy rates, used up to and including 2021-02-18
--   * currencies_oxr  - rates fetched from the OXR API, used after 2021-02-18
-- Output: one row per (business_dt, base_currency) with one rate column per currency.

with cbrf as (

    select
          business_dt
        -- The legacy source has no intraday timestamp; the NULL is typed explicitly
        -- so the union with the OXR timestamp column does not depend on implicit typing.
        , cast(null as timestamp) as business_ts
        , base_currency
        -- NOTE(review): the currencies_cbrf DDL defines no aed column, so it is
        -- emitted as a typed NULL here - confirm against the actual table.
        , cast(null as decimal(10,4)) as aed
        , eur
        , gbp
        , rub
        , usd
    from {{ source('external', 'currencies_cbrf') }}
    where business_dt <= '2021-02-18'

),

oxr_all as (

    -- Every file fetched from the API, with "timestamp" (seconds since the epoch)
    -- converted to a date and a timestamp, and the rates struct flattened into columns.
    select
          (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
        , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
        , o.base as base_currency
        , o.rates.aed::decimal(10,4) as aed
        , o.rates.eur::decimal(10,4) as eur
        , o.rates.gbp::decimal(10,4) as gbp
        , o.rates.rub::decimal(10,4) as rub
        , o.rates.usd::decimal(10,4) as usd
        -- rn = 1 marks the most recent record of each day and base currency.
        , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn
    from {{ source('external', 'currencies_oxr') }} as o
    where business_dt > '2021-02-18'

),

oxr as (

    -- Keep only the latest record per day and base currency.
    -- Reads the oxr_all CTE defined above, which was previously left unused.
    select
          business_dt
        , business_ts
        , base_currency
        , aed
        , eur
        , gbp
        , rub
        , usd
    from oxr_all
    where rn = 1

),

united as (

    -- The two date ranges do not overlap, so union all cannot produce duplicates.
    select
          business_dt
        , business_ts
        , base_currency
        , aed
        , eur
        , gbp
        , rub
        , usd
    from cbrf

    union all

    select
          business_dt
        , business_ts
        , base_currency
        , aed
        , eur
        , gbp
        , rub
        , usd
    from oxr

)

select
      business_dt
    , business_ts
    , base_currency
    , aed
    , eur
    , gbp
    , rub
    , usd
from united
Redshift .
, , . API, - JSON S3, . :
-- Expands every monetary column of a request into all target currencies via the
-- convert_currency macro, using the rates row that matches the request's
-- completion date and its own currency.
select
    -- price_details
      r.currency
    , {{ convert_currency('price', 'currency') }}
    , {{ convert_currency('discount', 'currency') }}
    , {{ convert_currency('insurance', 'currency') }}
    , {{ convert_currency('tips', 'currency') }}
    , {{ convert_currency('parking', 'currency') }}
    , {{ convert_currency('toll_road', 'currency') }}
from {{ ref('requests') }} r
    -- left join: requests without a matching rates row are kept, with NULL converted amounts.
    left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
        and r.currency = currencies.base_currency
, :
{#
    Currency conversion macro.

    Renders five comma-separated select-list expressions, one per target currency
    (aed, eur, gbp, rub, usd). Each multiplies `convert_column` by the rate column
    of that currency, casts the product to decimal(18,4) and aliases it as
    <convert_column>_<currency>.

    Arguments:
        convert_column        name of the amount column to convert
        currency_code_column  accepted for the caller's convenience; not used in the body
#}
{% macro convert_currency(convert_column, currency_code_column) -%}
{%- for target_currency in ['aed', 'eur', 'gbp', 'rub', 'usd'] %}
{% if not loop.first %}, {% endif %}( {{ convert_column }} * {{ target_currency }} )::decimal(18,4) as {{ convert_column }}_{{ target_currency }}
{%- endfor %}
{%- endmacro %}
-
– . . , .
Data Engineer OTUS, .
, . – :
Data Architecture
Data Lake
Data Warehouse
NoSQL / NewSQL
MLOps
.