воскресенье, 16 февраля 2025 г.

Dbt

Вводная статья о основных возможностях DBT.

Установка

Dbt будет работать в связке с Postgres, для этого поставим его в отдельный контейнер:
version: '3.9'

# Defining the named volume
volumes:
    database:

services:

    database:
        image: 'postgres:latest'
        restart: 'always'
        environment:
            POSTGRES_USER: 'admin'
            POSTGRES_PASSWORD: 'admin'
            POSTGRES_DB: 'dbt'
        volumes:
            # Mounting the named volume
            - 'database:/home/pihel/Documents/dbt/pg-data'
        ports:
            - '5432:5432'
Запуск Pg:
docker compose -f docker-compose.yaml up
Doсker образ для работы с Dbt в связке с vscode:
FROM python:3.12-slim

SHELL ["/bin/bash", "-c"] 

RUN python3.12 -m venv ~/.dbt && \
    source ~/.dbt/bin/activate && \
    pip install dbt-core && \
    pip install dbt-postgres && \
    mkdir -p /opt/dbt && \
    apt-get -y update && \
    apt-get -y install curl && \ 
    apt-get -y install git

# install VS Code (code-server)
RUN curl -fsSL https://code-server.dev/install.sh | sh

# install VS Code extensions
RUN code-server --install-extension redhat.vscode-yaml \
                --install-extension ms-python.python

ENTRYPOINT ["/bin/bash"]
Запуск и подключение к контейнеру с Dbt:
docker run -it \
    -v /home/pihel/Documents/dbt/vscode-server:/root/.vscode-server \
    -v /home/pihel/Documents/dbt/code:/opt/dbt \
    --entrypoint /bin/bash \
    --rm \
    dbt
Подключаем Dbt к Postgress:
source ~/.dbt/bin/activate
cd /opt/dbt
dbt init
#host=Ip машины в локальной сети
dbt debug
dbt run

Модели

- Настройка параметров моделей
Делаются в файле dbt_project.yaml, параметр должен начинаться с +
models:
  test_project:
- Запуск моделей
dbt run
- Просмотр результата компиляции модели:
dbt compile
Результат компиляции будет в target-path

- Модель - параметризируемый sql запрос
название таблицы/вью берется из названия файл
обращение к модели {{ ref("model_name") }} из другой модели
переменные проекта можно менять прямо в sql модели
{{ config(
    materialized="table",
    partition_by={
      "field": "order_date",
      "data_type": "date",
      "granularity": "day"
    }
) }}
SELECT 1 AS id
- для моделей можно задавать алиасы, тогда таблица будет сохранться не с именем файла, а по алиасу
        config:
          alias: processed_orders
- Задание тега дл модели
{{
  config(tags=["daily", "weekly"])
}}
SELECT 1 AS id
Которые используется для фильтрации при запуске:
dbt run -s tag:daily

Макросы

Чтото похожее на функции, написанные на jinja
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}

- питон макросы работают только в dbt-snowflake, dbt-bigquery, dbt-databricks
import pandas as pd
#либо это может быть пайспарк дф
#from pyspark.sql.functions 

def model(dbt, session):
    # Use the Python-compatible materialization
    dbt.config(materialized="python")

    # Reference the upstream model or table
    df = dbt.ref("magic_number").to_pandas()

    # Add a new column with the transformed data
    df['next_id'] = df['magic_number'] + 1

    # Return the modified DataFrame
    return df

Внешние таблицы

Таблицы созданные не с помощью dbt описываются в /models/sources
#/models/sources/shop.yml:
version: 2

sources:
  - name: raw
    schema: public
    tables:
      - name: raw_orders
      - name: raw_shops

- Используется через source
SELECT date, COUNT(DISTINCT order_id) AS orders
FROM {{ source('raw', 'raw_orders') }}
GROUP BY date

Загрузка данных

создаем папку seed, в нее кладем csv,
запускаем dbt seed
создастся модель с название = названию файла

Материализация

Incremental models

- полная перезагрузка инкрементов:
dbt run --full-refresh
- Указание типа материализации в модели
{{ config(
    materialized="incremental",
    unique_key= "shop_id"
    )
}}
select shop_id,shop_name from {{ ref("shops") }}
{% if is_incremental() %}
WHERE shop_id > (SELECT MAX(shop_id) FROM {{this}})
{% endif %}
- Как будет выглядеть скомпилированный запрос:
dbt compile 
- delete+insert для базы без merge
delete from "dbt"."public"."process_shops"
where (
    shop_id) in (
    select (shop_id)
    from "process_shops__dbt_tmp183022708624"
);
insert into "dbt"."public"."process_shops" ("shop_id", "shop_name")
(
select "shop_id", "shop_name"
from "process_shops__dbt_tmp183022708624"
)

Insert overwrite

- insert_overwrite модель - база должна поддерживать Insert overwrite
{{ config(
    materialized="incremental",
    incremental_strategy= "insert_overwrite",
    partition_by={
        "field" :"order_id", 
        "data_type": "INT64", 
        "range": {
            "start": 1,
            "end": 100,
            "interval": 10
      }
    }
    )
}}
select * ...
Принцип:
до insert сформируется dbt_partitions_for_replacement - массив с партициями, которые есть в и новых и старых данных.
Он далее используется для удаления

Ephemeral models

{{ config(materialized="ephemeral")}}
запрос не оформляется во view, а делается WITH каждый раз на лету

Jinja интерпретатор

Переменные

- могут быть определены в project.yml
vars:
  shop_name: 'the_greatest_shop'
  shop_id: 14
- можно задать при запуске:
dbt run --vars 'shop_name: chocolate_factory'
- Использование с дефолтным значением
Hello {{ var('shop_name', 'default_shop') }}!
- доступ к переменных окружения
{{ env_var("DBT_DATASET", "models_dataset") }}
переменные с секретами должны начинаться на "DBT_ENV_SECRET*"
- установка переменных в коде:
{% set name="Peter" -%}
SELECT "{{ name |default('John Doe') }}" AS name
| - дефолтное значение
- многострочный вариант установки переменной:
{% set greeting_message -%}
Welcome to {{ country }}, {{ first_name }} !
{% endset -%}

Условия

- {% if... %} and {% endif %}
{% if weather == "sunny"  -%}
    Enjoy your holiday in the sun!
{% elif weather == "cold" -%}
    Wear your warmest coat!
{% else -%}

Циклы

- For цикл:
{% for i in range(5) %}
    field_{{i}},
{% endfor %}
- Foreach
{% set fruits = {"apples": 2, "bananas":4, "watermelons":1 } %}
{% for fruit, quantity in fruits.items() %}
    There are {{ quantity }} {{fruit}}.
{% endfor %}
- Комментарии
{# first_name is a variable that contains the first name of the recipient #}

Функции

- Макрос, который можно переиспользовать
{% macro hello(person_to_greet) -%}
    SELECT "Hello {{person_to_greet}}!" AS welcome_message
{% endmacro %}
{{ hello("peter") }}
- значеие макроса можно получить в переменную
{% set query=hello_world() %}
{{query}}

Пакеты функций

- набор полезных функций указывается в packages.yml
#packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.0.0
- установка:
dbt deps
Другие официальные пакеты:
dbt_external_tables - коннекторы
dbt_expectations - проверки качества данных
- порядок применения packages
#dbt_project.yaml
dispatch:
  - macro_namespace: dbt_utils
    search_order: ["shop_project", "dbt_utils", "dbt"]
- Важно указать порядок поиска макросов:
macro-paths: ["macros", "dbt_packages/dbt_utils/macros"]
- использование в модели:
{{ log_info("This is just a dummy model") }}
SELECT 1

Создание своего пакета

запаковка своих функций в пакет:
- делаем отдельную папку с нужными макросами
#macros/my_macro.sql
{% macro some_macro %}
- описываем пакет:
#dbt_project.yml
name: 'my_package'
version: '1.0.0'
config-version: 2
- вызов нашего пакета в другом проекте:
#packages.yml
packages:
  - local: path_to_my_package
- в macro-paths прописываем порядок поиска макросов

Тесты

- общии тесты:
unique (только уникальные),
not_null (нет пустых),
accepted_values (допустимые знаения),
relationships - значения из другой модели

чтобы тесты могли работать, у модели должна быть схема рядом с моделью:
#/models/.../schema.yml
version: 2

models:
  - name: good_orders
    columns:
      - name: order_id
        tests:
          - unique
- запуск
dbt run
dbt test
- можно протестировать определенные модели по тегу:
dbt test -s tag:marketing
- можно сохранять результат тестов в файлах
#dbt_project.yml
tests:
  +store_failures: true
либо можно указать у конкретной модели в schema.yml

- Установка важности ошибки: warn , вместо error при тесте:
#/models/.../schema.yml
  - name: bad_orders
    columns:
      - name: product_id
        tests:
          - not_null:
              config:
                severity: warn
- можно сделать ошибки допустимыми, но не более определенного числа:
#/models/.../schema.yml
      - name: order_status
        tests:
          - accepted_values:
             values: ["Shipped", "Canceled", "Delivered", "Accepted"]
             config:
              error_if: ">10"
              warn_if: ">5"
- тестировать можно на части данных, если таблица очень большая:
      - name: customer_id
        tests:
          - relationships:
             to: ref('customers')
             field: customer_id
             config:
              where: "order_date='Monday'"

Создание своих тестов

- написание своего теста - это написание макроса, который будет отбирать проблемные строки:
#macros/lower_than.sql
{% test lower_than(model, column_name, upper_bound) %}
SELECT 
   *
FROM {{ model }}
WHERE {{ column_name }} > {{ upper_bound }}
{% endtest %}
- Указание теста
#/models/.../schema.yml
  - name: good_orders
    columns:
      - name: order_id
        tests:
          - unique
          - lower_than:
             upper_bound: 100

Тесты для конкретной таблицы

- singular test-тест для конкретной таблицы
#tests/assert_orders_is_not_empty.sql
SELECT COUNT(*)
FROM {{ ref("good_orders")}}
HAVING COUNT(*)=0

Таргет подключения

в profiles.yml можно сделать несколько подключений, target - указывает значение поумолчанию
но можно запустить конкретное из консоли:
dbt run --target prod
для таргета есть переменная, которую можно использовать в jinja:
{%- if  target.name == "prod" -%} educative
{%- else -%} educative_dev
{%- endif -%}

Снапшоты

Сохранение изменений таблицы (scd)
- Таблица для теста scd
create table raw_shops(
shop_id int not null,
shop_name varchar(250) not null,
shop_param int,
created_at timestamp not null default current_timestamp,
updated_at timestamp not null default current_timestamp
);
insert into raw_shops values(1, 'test1', 1);
insert into raw_shops values(2, 'test2', 1);
insert into raw_shops values(3, 'test3', 1);
- Макрос с настройкой снапшота:
{% snapshot shop_snapshot %}

{{ config(
    strategy="timestamp", 
    target_schema="public",
    updated_at="updated_at",
    unique_key="shop_id"
)}}

SELECT *
FROM {{ source('raw', 'raw_shops') }}

{% endsnapshot %}
- Запуск создание снапшота
dbt snapshot

Timestamp стратегия

- Что сгенерирует Dbt при первом запуске (просто создание таблицы)
  create  table "dbt"."public"."shop_snapshot"
    as
  (
    select *,
        md5(coalesce(cast(shop_id as varchar ), '')
         || '|' || coalesce(cast(updated_at as varchar ), '')
        ) as dbt_scd_id,
        updated_at as dbt_updated_at,
        updated_at as dbt_valid_from,
  coalesce(nullif(updated_at, updated_at), null)
  as dbt_valid_to
from (
SELECT *
FROM "dbt"."public"."raw_shops"
    ) sbq
  );
- итоговая таблица будет иметь вид:
CREATE TABLE public.shop_snapshot (
	shop_id int4 NULL,
	shop_name varchar(250) NULL,
	shop_param int4 NULL,
	created_at timestamp NULL,
	updated_at timestamp NULL,
	dbt_scd_id text NULL, --уникальный ключ версии
	dbt_updated_at timestamp NULL, --дата изменения
	dbt_valid_from timestamp NULL, - дата действи версии с - по
	dbt_valid_to timestamp NULL
);
- Со 2 запуска будет запрос трекающий вставки и изменения:
create temporary table "shop_snapshot__dbt_tmp181948366985"
    as
  (
    with snapshot_query as (
SELECT *
FROM "dbt"."public"."raw_shops"
    ),
    snapshotted_data as (
        select *, 
        shop_id as dbt_unique_key
        from "dbt"."public"."shop_snapshot"
        where
                dbt_valid_to is null
    ),
    insertions_source_data as (

        select *, 
    
        shop_id as dbt_unique_key
    
,
            updated_at as dbt_updated_at,
            updated_at as dbt_valid_from,
            
  
  coalesce(nullif(updated_at, updated_at), null)
  as dbt_valid_to
,
            md5(coalesce(cast(shop_id as varchar ), '')
         || '|' || coalesce(cast(updated_at as varchar ), '')
        ) as dbt_scd_id

        from snapshot_query
    ),

    updates_source_data as (

        select *, 
    
        shop_id as dbt_unique_key
    
,
            updated_at as dbt_updated_at,
            updated_at as dbt_valid_from,
            updated_at as dbt_valid_to

        from snapshot_query
    ),

    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*

        from insertions_source_data as source_data
        left outer join snapshotted_data
            on 
    
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    

            where 
    
        snapshotted_data.dbt_unique_key is null
    

            or (
    
        snapshotted_data.dbt_unique_key is not null
    
 and ((snapshotted_data.dbt_valid_from < source_data.updated_at))

        )

    ),

    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id

        from updates_source_data as source_data
        join snapshotted_data
            on 
    
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    

        where (
            (snapshotted_data.dbt_valid_from < source_data.updated_at)
        )
    )

    select * from insertions
    union all
    select * from updates

  );
- Изменение целевой таблицы
      update "dbt"."public"."shop_snapshot"
    set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
    from "shop_snapshot__dbt_tmp181442823182" as DBT_INTERNAL_SOURCE
    where DBT_INTERNAL_SOURCE.dbt_scd_id::text = "dbt"."public"."shop_snapshot".dbt_scd_id::text
      and DBT_INTERNAL_SOURCE.dbt_change_type::text in ('update'::text, 'delete'::text)
        and "dbt"."public"."shop_snapshot".dbt_valid_to is null;
      
    insert into "dbt"."public"."shop_snapshot" ("shop_id", "shop_name", "shop_param", "created_at", "updated_at", "dbt_updated_at", "dbt_valid_from", "dbt_valid_to", "dbt_scd_id")
    select DBT_INTERNAL_SOURCE."shop_id",DBT_INTERNAL_SOURCE."shop_name",DBT_INTERNAL_SOURCE."shop_param",DBT_INTERNAL_SOURCE."created_at",DBT_INTERNAL_SOURCE."updated_at",DBT_INTERNAL_SOURCE."dbt_updated_at",DBT_INTERNAL_SOURCE."dbt_valid_from",DBT_INTERNAL_SOURCE."dbt_valid_to",DBT_INTERNAL_SOURCE."dbt_scd_id"
    from "shop_snapshot__dbt_tmp181442823182" as DBT_INTERNAL_SOURCE
    where DBT_INTERNAL_SOURCE.dbt_change_type::text = 'insert'::text;

Check стратегия

{{ config(
    strategy="check", 
    check_cols=['order_status'], 
    target_schema="educative", 
    unique_key="order_id"
)}}
различие будет в запросе merge, вместо сверки изменения updated_at будет проверяться изменение колонки order_status

События

on-run-start runs at the start of a dbt command.
on-run-end runs at the end of a dbt command.
pre-hook runs at the start of a model, seed, or snapshot.
post-hook runs at the end of a model, seed, or snapshot.

хук может вызывать прямо в модели:
{{
    config(
        post_hook=manage_view()
    )
}}
- manage_view - это макрос:
#macros/manage_view.sql
{% macro manage_view() %}
ALTER VIEW {{ this }} 
SET OPTIONS (labels = [('managed_by', 'dbt')]);
{% endmacro %}

- пример общего хука, который будет логировать все запуски
#dbt_project.yml
on-run-start: "CREATE TABLE IF NOT EXISTS {{target.schema}}.dbt_model_runs(model_name text, run_started_at TIMESTAMP, run_ended_at TIMESTAMP);"
models:
  +pre-hook: "INSERT INTO {{target.schema}}.dbt_model_runs VALUES('{{this.name}}', CURRENT_TIMESTAMP, NULL)"
  +post-hook: "UPDATE {{target.schema}}.dbt_model_runs SET run_ended_at= CURRENT_TIMESTAMP WHERE model_name='{{this.name}}' AND run_started_at=(SELECT run_started_at FROM {{target.schema}}.dbt_model_runs WHERE model_name='{{this.name}}' AND run_ended_at IS NULL LIMIT 1)"
- Что будет в таблице:
model_name         |run_started_at         |run_ended_at           |
-------------------+-----------------------+-----------------------+
daily_orders       |2025-02-10 18:42:06.322|2025-02-10 18:42:06.322|
magic_number       |2025-02-10 18:42:06.423|2025-02-10 18:42:06.423|
mat_shops          |2025-02-10 18:42:06.467|2025-02-10 18:42:06.467|

ADHoc анализ

временные скрипты аналитиков, но которые нужно периодически вызывать, помещаем в analyses
#/analyses/adhoc_orders.sql
SELECT order_date, COUNT(*)
FROM {{ source('raw', 'raw_orders') }}
WHERE order_date BETWEEN "{{ var('start_date')}}"
AND "{{ var('end_date')}}"
- получаем актуальный запрос через dbt compile
dbt compile -s adhoc_orders --vars '{start_date: "2025-02-10", end_date: "2025-02-10"}'
- Берем запрос в target и выполняем в таргет базе
SELECT order_date, COUNT(*)
FROM "dbt"."public"."raw_orders"
WHERE order_date BETWEEN "2025-02-10"
AND "2025-02-10"

Документация

- Генерация документации
dbt docs generate
- Запуск сервера, чтобы посмотреть доку:
dbt docs serve
запустится веб сервер, на который можно зайти по url http://127.0.0.1:8080/#!/
- так же можно подтянуть description таблиц и колонок из бд, через настройку:
#dbt_project.yml
models:
  +persist_docs:
    relation: true
    columns: true

Трекинг изменений

каждое изменение сохранется в target/manifest.json - там полное описание проекта, по нему можно отслеживать изменения
если сохранить предыдущую версию проекта и указать в env:
export DBT_STATE=shop_project/previous_target
то можно смотреть изменения:
- новое
dbt ls --select "state:new"
- измененное
dbt ls --select "state:modified"
- можно детализировать, что ищем в изменениях:
dbt ls --select "state:modified.body"

Версионирование моделей

модели можно создавать с vN на конце:
processed_orders_v1
- это будет считаться 1 моделью, но разными версиями
при описании моделей в dbt_project можно указать, которая последняя
#dbt_project.yml
models:
  - name: processed_orders
    latest_version: 2
    versions:
      - v: 3
      - v: 2
      - v: 1
        deprecation_date: 2026-01-01
- к конкретной версии можно обратиться прямо в запросе:
{{ ref("processed_orders", v="3")}}

Комментариев нет:

Отправить комментарий