Вводная статья о основных возможностях DBT.
- Установка
- Модели
- Макросы
- Внешние таблицы
- Материализация
- Jinja интерпретатор
- Пакеты функций
- Тесты
- Таргет подключения
- Снапшоты
- События
- ADHoc анализ
- Документация
- Трекинг изменений
- Версионирование моделей
Установка
Dbt будет работать в связке с Postgres, для этого поставим его в отдельный контейнер:
version: '3.9'
# Defining the named volume
volumes:
database:
services:
database:
image: 'postgres:latest'
restart: 'always'
environment:
POSTGRES_USER: 'admin'
POSTGRES_PASSWORD: 'admin'
POSTGRES_DB: 'dbt'
volumes:
# Mounting the named volume
- 'database:/home/pihel/Documents/dbt/pg-data'
ports:
- '5432:5432'
Запуск Pg:
docker compose -f docker-compose.yaml upDoсker образ для работы с Dbt в связке с vscode:
FROM python:3.12-slim
SHELL ["/bin/bash", "-c"]
RUN python3.12 -m venv ~/.dbt && \
source ~/.dbt/bin/activate && \
pip install dbt-core && \
pip install dbt-postgres && \
mkdir -p /opt/dbt && \
apt-get -y update && \
apt-get -y install curl && \
apt-get -y install git
# install VS Code (code-server)
RUN curl -fsSL https://code-server.dev/install.sh | sh
# install VS Code extensions
RUN code-server --install-extension redhat.vscode-yaml \
--install-extension ms-python.python
ENTRYPOINT ["/bin/bash"]
Запуск и подключение к контейнеру с Dbt:
docker run -it \
-v /home/pihel/Documents/dbt/vscode-server:/root/.vscode-server \
-v /home/pihel/Documents/dbt/code:/opt/dbt \
--entrypoint /bin/bash \
--rm \
dbt
Подключаем Dbt к Postgress:
source ~/.dbt/bin/activate cd /opt/dbt dbt init #host=Ip машины в локальной сети dbt debug dbt run
Модели
- Настройка параметров моделейДелаются в файле dbt_project.yaml, параметр должен начинаться с +
models: test_project:- Запуск моделей
dbt run- Просмотр результата компиляции модели:
dbt compileРезультат компиляции будет в target-path
- Модель - параметризируемый sql запрос
название таблицы/вью берется из названия файл
обращение к модели {{ ref("model_name") }} из другой модели
переменные проекта можно менять прямо в sql модели
{{ config(
materialized="table",
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
}
) }}
SELECT 1 AS id
- для моделей можно задавать алиасы, тогда таблица будет сохранться не с именем файла, а по алиасу
config:
alias: processed_orders
- Задание тега дл модели
{{
config(tags=["daily", "weekly"])
}}
SELECT 1 AS id
Которые используется для фильтрации при запуске:
dbt run -s tag:daily
Макросы
Чтото похожее на функции, написанные на jinja
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
- питон макросы работают только в dbt-snowflake, dbt-bigquery, dbt-databricks
import pandas as pd
#либо это может быть пайспарк дф
#from pyspark.sql.functions
def model(dbt, session):
# Use the Python-compatible materialization
dbt.config(materialized="python")
# Reference the upstream model or table
df = dbt.ref("magic_number").to_pandas()
# Add a new column with the transformed data
df['next_id'] = df['magic_number'] + 1
# Return the modified DataFrame
return df
Внешние таблицы
Таблицы созданные не с помощью dbt описываются в /models/sources
#/models/sources/shop.yml:
version: 2
sources:
- name: raw
schema: public
tables:
- name: raw_orders
- name: raw_shops
- Используется через source
SELECT date, COUNT(DISTINCT order_id) AS orders
FROM {{ source('raw', 'raw_orders') }}
GROUP BY date
Загрузка данных
создаем папку seed, в нее кладем csv,запускаем dbt seed
создастся модель с название = названию файла
Материализация
Incremental models
- полная перезагрузка инкрементов:dbt run --full-refresh- Указание типа материализации в модели
{{ config(
materialized="incremental",
unique_key= "shop_id"
)
}}
select shop_id,shop_name from {{ ref("shops") }}
{% if is_incremental() %}
WHERE shop_id > (SELECT MAX(shop_id) FROM {{this}})
{% endif %}
- Как будет выглядеть скомпилированный запрос:
dbt compile- delete+insert для базы без merge
delete from "dbt"."public"."process_shops"
where (
shop_id) in (
select (shop_id)
from "process_shops__dbt_tmp183022708624"
);
insert into "dbt"."public"."process_shops" ("shop_id", "shop_name")
(
select "shop_id", "shop_name"
from "process_shops__dbt_tmp183022708624"
)
Insert overwrite
- insert_overwrite модель - база должна поддерживать Insert overwrite
{{ config(
materialized="incremental",
incremental_strategy= "insert_overwrite",
partition_by={
"field" :"order_id",
"data_type": "INT64",
"range": {
"start": 1,
"end": 100,
"interval": 10
}
}
)
}}
select * ...
Принцип: до insert сформируется dbt_partitions_for_replacement - массив с партициями, которые есть в и новых и старых данных.
Он далее используется для удаления
Ephemeral models
{{ config(materialized="ephemeral")}}
запрос не оформляется во view, а делается WITH каждый раз на лету
Jinja интерпретатор
Переменные
- могут быть определены в project.ymlvars: shop_name: 'the_greatest_shop' shop_id: 14- можно задать при запуске:
dbt run --vars 'shop_name: chocolate_factory'- Использование с дефолтным значением
Hello {{ var('shop_name', 'default_shop') }}!
- доступ к переменных окружения
{{ env_var("DBT_DATASET", "models_dataset") }}
переменные с секретами должны начинаться на "DBT_ENV_SECRET*"- установка переменных в коде:
{% set name="Peter" -%}
SELECT "{{ name |default('John Doe') }}" AS name
| - дефолтное значение- многострочный вариант установки переменной:
{% set greeting_message -%}
Welcome to {{ country }}, {{ first_name }} !
{% endset -%}
Условия
- {% if... %} and {% endif %}
{% if weather == "sunny" -%}
Enjoy your holiday in the sun!
{% elif weather == "cold" -%}
Wear your warmest coat!
{% else -%}
Циклы
- For цикл:
{% for i in range(5) %}
field_{{i}},
{% endfor %}
- Foreach
{% set fruits = {"apples": 2, "bananas":4, "watermelons":1 } %}
{% for fruit, quantity in fruits.items() %}
There are {{ quantity }} {{fruit}}.
{% endfor %}
- Комментарии
{# first_name is a variable that contains the first name of the recipient #}
Функции
- Макрос, который можно переиспользовать
{% macro hello(person_to_greet) -%}
SELECT "Hello {{person_to_greet}}!" AS welcome_message
{% endmacro %}
{{ hello("peter") }}
- значеие макроса можно получить в переменную
{% set query=hello_world() %}
{{query}}
Пакеты функций
- набор полезных функций указывается в packages.yml
#packages.yml
packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- установка:
dbt depsДругие официальные пакеты:
dbt_external_tables - коннекторы
dbt_expectations - проверки качества данных
- порядок применения packages
#dbt_project.yaml
dispatch:
- macro_namespace: dbt_utils
search_order: ["shop_project", "dbt_utils", "dbt"]
- Важно указать порядок поиска макросов:
macro-paths: ["macros", "dbt_packages/dbt_utils/macros"]- использование в модели:
{{ log_info("This is just a dummy model") }}
SELECT 1
Создание своего пакета
запаковка своих функций в пакет:- делаем отдельную папку с нужными макросами
#macros/my_macro.sql
{% macro some_macro %}
- описываем пакет:
#dbt_project.yml name: 'my_package' version: '1.0.0' config-version: 2- вызов нашего пакета в другом проекте:
#packages.yml packages: - local: path_to_my_package- в macro-paths прописываем порядок поиска макросов
Тесты
- общии тесты:unique (только уникальные),
not_null (нет пустых),
accepted_values (допустимые знаения),
relationships - значения из другой модели
чтобы тесты могли работать, у модели должна быть схема рядом с моделью:
#/models/.../schema.yml
version: 2
models:
- name: good_orders
columns:
- name: order_id
tests:
- unique
- запуск
dbt run dbt test- можно протестировать определенные модели по тегу:
dbt test -s tag:marketing- можно сохранять результат тестов в файлах
#dbt_project.yml tests: +store_failures: trueлибо можно указать у конкретной модели в schema.yml
- Установка важности ошибки: warn , вместо error при тесте:
#/models/.../schema.yml
- name: bad_orders
columns:
- name: product_id
tests:
- not_null:
config:
severity: warn
- можно сделать ошибки допустимыми, но не более определенного числа:
#/models/.../schema.yml
- name: order_status
tests:
- accepted_values:
values: ["Shipped", "Canceled", "Delivered", "Accepted"]
config:
error_if: ">10"
warn_if: ">5"
- тестировать можно на части данных, если таблица очень большая:
- name: customer_id
tests:
- relationships:
to: ref('customers')
field: customer_id
config:
where: "order_date='Monday'"
Создание своих тестов
- написание своего теста - это написание макроса, который будет отбирать проблемные строки:
#macros/lower_than.sql
{% test lower_than(model, column_name, upper_bound) %}
SELECT
*
FROM {{ model }}
WHERE {{ column_name }} > {{ upper_bound }}
{% endtest %}
- Указание теста
#/models/.../schema.yml
- name: good_orders
columns:
- name: order_id
tests:
- unique
- lower_than:
upper_bound: 100
Тесты для конкретной таблицы
- singular test-тест для конкретной таблицы
#tests/assert_orders_is_not_empty.sql
SELECT COUNT(*)
FROM {{ ref("good_orders")}}
HAVING COUNT(*)=0
Таргет подключения
в profiles.yml можно сделать несколько подключений, target - указывает значение поумолчаниюно можно запустить конкретное из консоли:
dbt run --target prodдля таргета есть переменная, которую можно использовать в jinja:
{%- if target.name == "prod" -%} educative
{%- else -%} educative_dev
{%- endif -%}
Снапшоты
Сохранение изменений таблицы (scd)- Таблица для теста scd
create table raw_shops( shop_id int not null, shop_name varchar(250) not null, shop_param int, created_at timestamp not null default current_timestamp, updated_at timestamp not null default current_timestamp ); insert into raw_shops values(1, 'test1', 1); insert into raw_shops values(2, 'test2', 1); insert into raw_shops values(3, 'test3', 1);- Макрос с настройкой снапшота:
{% snapshot shop_snapshot %}
{{ config(
strategy="timestamp",
target_schema="public",
updated_at="updated_at",
unique_key="shop_id"
)}}
SELECT *
FROM {{ source('raw', 'raw_shops') }}
{% endsnapshot %}
- Запуск создание снапшота
dbt snapshot
Timestamp стратегия
- Что сгенерирует Dbt при первом запуске (просто создание таблицы)
create table "dbt"."public"."shop_snapshot"
as
(
select *,
md5(coalesce(cast(shop_id as varchar ), '')
|| '|' || coalesce(cast(updated_at as varchar ), '')
) as dbt_scd_id,
updated_at as dbt_updated_at,
updated_at as dbt_valid_from,
coalesce(nullif(updated_at, updated_at), null)
as dbt_valid_to
from (
SELECT *
FROM "dbt"."public"."raw_shops"
) sbq
);
- итоговая таблица будет иметь вид:
CREATE TABLE public.shop_snapshot ( shop_id int4 NULL, shop_name varchar(250) NULL, shop_param int4 NULL, created_at timestamp NULL, updated_at timestamp NULL, dbt_scd_id text NULL, --уникальный ключ версии dbt_updated_at timestamp NULL, --дата изменения dbt_valid_from timestamp NULL, - дата действи версии с - по dbt_valid_to timestamp NULL );- Со 2 запуска будет запрос трекающий вставки и изменения:
create temporary table "shop_snapshot__dbt_tmp181948366985"
as
(
with snapshot_query as (
SELECT *
FROM "dbt"."public"."raw_shops"
),
snapshotted_data as (
select *,
shop_id as dbt_unique_key
from "dbt"."public"."shop_snapshot"
where
dbt_valid_to is null
),
insertions_source_data as (
select *,
shop_id as dbt_unique_key
,
updated_at as dbt_updated_at,
updated_at as dbt_valid_from,
coalesce(nullif(updated_at, updated_at), null)
as dbt_valid_to
,
md5(coalesce(cast(shop_id as varchar ), '')
|| '|' || coalesce(cast(updated_at as varchar ), '')
) as dbt_scd_id
from snapshot_query
),
updates_source_data as (
select *,
shop_id as dbt_unique_key
,
updated_at as dbt_updated_at,
updated_at as dbt_valid_from,
updated_at as dbt_valid_to
from snapshot_query
),
insertions as (
select
'insert' as dbt_change_type,
source_data.*
from insertions_source_data as source_data
left outer join snapshotted_data
on
snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where
snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and ((snapshotted_data.dbt_valid_from < source_data.updated_at))
)
),
updates as (
select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id
from updates_source_data as source_data
join snapshotted_data
on
snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where (
(snapshotted_data.dbt_valid_from < source_data.updated_at)
)
)
select * from insertions
union all
select * from updates
);
- Изменение целевой таблицы
update "dbt"."public"."shop_snapshot"
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
from "shop_snapshot__dbt_tmp181442823182" as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_scd_id::text = "dbt"."public"."shop_snapshot".dbt_scd_id::text
and DBT_INTERNAL_SOURCE.dbt_change_type::text in ('update'::text, 'delete'::text)
and "dbt"."public"."shop_snapshot".dbt_valid_to is null;
insert into "dbt"."public"."shop_snapshot" ("shop_id", "shop_name", "shop_param", "created_at", "updated_at", "dbt_updated_at", "dbt_valid_from", "dbt_valid_to", "dbt_scd_id")
select DBT_INTERNAL_SOURCE."shop_id",DBT_INTERNAL_SOURCE."shop_name",DBT_INTERNAL_SOURCE."shop_param",DBT_INTERNAL_SOURCE."created_at",DBT_INTERNAL_SOURCE."updated_at",DBT_INTERNAL_SOURCE."dbt_updated_at",DBT_INTERNAL_SOURCE."dbt_valid_from",DBT_INTERNAL_SOURCE."dbt_valid_to",DBT_INTERNAL_SOURCE."dbt_scd_id"
from "shop_snapshot__dbt_tmp181442823182" as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_change_type::text = 'insert'::text;
Check стратегия
{{ config(
strategy="check",
check_cols=['order_status'],
target_schema="educative",
unique_key="order_id"
)}}
различие будет в запросе merge, вместо сверки изменения updated_at будет проверяться изменение колонки order_status
События
on-run-start runs at the start of a dbt command.on-run-end runs at the end of a dbt command.
pre-hook runs at the start of a model, seed, or snapshot.
post-hook runs at the end of a model, seed, or snapshot.
хук может вызывать прямо в модели:
{{
config(
post_hook=manage_view()
)
}}
- manage_view - это макрос:
#macros/manage_view.sql
{% macro manage_view() %}
ALTER VIEW {{ this }}
SET OPTIONS (labels = [('managed_by', 'dbt')]);
{% endmacro %}
- пример общего хука, который будет логировать все запуски
#dbt_project.yml
on-run-start: "CREATE TABLE IF NOT EXISTS {{target.schema}}.dbt_model_runs(model_name text, run_started_at TIMESTAMP, run_ended_at TIMESTAMP);"
models:
+pre-hook: "INSERT INTO {{target.schema}}.dbt_model_runs VALUES('{{this.name}}', CURRENT_TIMESTAMP, NULL)"
+post-hook: "UPDATE {{target.schema}}.dbt_model_runs SET run_ended_at= CURRENT_TIMESTAMP WHERE model_name='{{this.name}}' AND run_started_at=(SELECT run_started_at FROM {{target.schema}}.dbt_model_runs WHERE model_name='{{this.name}}' AND run_ended_at IS NULL LIMIT 1)"
- Что будет в таблице:
model_name |run_started_at |run_ended_at | -------------------+-----------------------+-----------------------+ daily_orders |2025-02-10 18:42:06.322|2025-02-10 18:42:06.322| magic_number |2025-02-10 18:42:06.423|2025-02-10 18:42:06.423| mat_shops |2025-02-10 18:42:06.467|2025-02-10 18:42:06.467|
ADHoc анализ
временные скрипты аналитиков, но которые нужно периодически вызывать, помещаем в analyses
#/analyses/adhoc_orders.sql
SELECT order_date, COUNT(*)
FROM {{ source('raw', 'raw_orders') }}
WHERE order_date BETWEEN "{{ var('start_date')}}"
AND "{{ var('end_date')}}"
- получаем актуальный запрос через dbt compile
dbt compile -s adhoc_orders --vars '{start_date: "2025-02-10", end_date: "2025-02-10"}'
- Берем запрос в target и выполняем в таргет базе
SELECT order_date, COUNT(*) FROM "dbt"."public"."raw_orders" WHERE order_date BETWEEN "2025-02-10" AND "2025-02-10"
Документация
- Генерация документацииdbt docs generate- Запуск сервера, чтобы посмотреть доку:
dbt docs serveзапустится веб сервер, на который можно зайти по url http://127.0.0.1:8080/#!/
- так же можно подтянуть description таблиц и колонок из бд, через настройку:
#dbt_project.yml
models:
+persist_docs:
relation: true
columns: true
Трекинг изменений
каждое изменение сохранется в target/manifest.json - там полное описание проекта, по нему можно отслеживать измененияесли сохранить предыдущую версию проекта и указать в env:
export DBT_STATE=shop_project/previous_targetто можно смотреть изменения:
- новое
dbt ls --select "state:new"- измененное
dbt ls --select "state:modified"- можно детализировать, что ищем в изменениях:
dbt ls --select "state:modified.body"
Версионирование моделей
модели можно создавать с vN на конце:processed_orders_v1
- это будет считаться 1 моделью, но разными версиями
при описании моделей в dbt_project можно указать, которая последняя
#dbt_project.yml
models:
- name: processed_orders
latest_version: 2
versions:
- v: 3
- v: 2
- v: 1
deprecation_date: 2026-01-01
- к конкретной версии можно обратиться прямо в запросе:
{{ ref("processed_orders", v="3")}}
Комментариев нет:
Отправить комментарий