Documentation
🧪 Pet Project
Пет Проект

Pet Project

Содержание страницы

Проекты

Проект №0 - Sandbox DB

Песочница для Дата ИнженераSandbox DB (opens in a new tab)


Проект №1 - HalltapeETLPipeline

Важно! Этот проект был таким не сразу. Первые его версии содержали только Airflow, Clickhouse и Pandas под капотом. Поэтому не думайте, что я сразу был суперменом. Все добавлялось и допиливалось уже позже. Но база была та же.

Любой пет проект ты можешь собрать, как с нуля, так и взять готовый шаблон по типу моего. В проекте ниже есть минимальный набор необходимых инструментов. Твоя задача – настроить ETL процесс. Данные можно, как сгенерировать свои, так и скачать по API или с любого другого ресурса. Ограничение только твоя фантазия. Этот проект больше, как шаблон. То, как выстроить весь процесс работы с данными – твоя задача.

Вот, что там уже есть:

  • Генерация синтетических данных
  • Построение простой витрины данных на Spark
  • Мониторинг качества данных
  • dbt модель для витрины данных в Clickhouse
  • dbt модель для качества данных в Clickhouse

Для сборки проекта тебе понадобятся знания Git, Docker

Собери его у себяPet Project HalltapeETLPipeline (opens in a new tab)


Проект №2 - spacex-api-analize

По всем вопросам по данному проекту можно обращаться к Шустикову Владимиру в телеграмм канале – Инженерообязанный (opens in a new tab)

Задачами проекта является настройка ETL-процесса по загрузке данных из API в базу данных, настройка сетей и логической репликации данных, автоматизация создания аналитических запросов поверх сырых данных и визуальное представление результатов на дашборде.

Здесь вы можете получить следующие знания:

  • Выгрузка данных из API на AirFlow
  • Работа с ООП
  • Настройка сетей, а именно IP-адресации
  • Логическая репликация в PostgreSQL
  • Работа с внешними источниками в Clickhouse на основе движка PostgreSQL
  • Построение моделей, тестов и макросов в DBT

Для сборки проекта тебе понадобятся знания Git, Docker

Собери его у себяОбработка данных SpaceX AP (opens in a new tab)


Проект №3 - Create mart in PySpark

По всем вопросам по данному проекту можно обращаться к Шустикову Владимиру в телеграмм канале – Инженерообязанный (opens in a new tab)

Задачей данного проекта была сгенерировать сырые данные и на их основе построить несколько витрин данных. Более подробно с заданием проекта можно ознакомиться здесь (opens in a new tab).

В этом проекты вы можете получить следующие знания:

  • программирования на Python
  • работа с Google Disk через код
  • написание кода на PySpark

Собери его у себя

Проект состоит из двух блокнотов:

  1. генерация файлов в Google Disk (opens in a new tab)
  2. создание витрин данных (opens in a new tab)

Для сборки проекта тебе понадобятся стабильный интернет и Google аккаунт. Вся инструкция по запуску описана в блокнотах.


Проект №4 - От почтового сервера до Greenplum

По всем вопросам по данному проекту можно обращаться к Кузьмину Дмитрию (opens in a new tab) в телеграмм канале – Дмитрий Кузьмин. Инженерия данных (opens in a new tab)


Проект №5 - Сборка витрины на Spark

Это pet-проект на Spark, в котором нужно собрать итоговую витрину лейблов Discogs из двух больших XML-датасетов: releases и labels. На выходе должен получиться один CSV-файл с информацией по лейблам, включая случаи, когда contactinfo пустой.

Что нужно сделать

  1. Обработать discogs_20260301_releases.xml.gz.
  2. Выделить из релизов данные только за последние 2 года.
  3. Извлечь из релизов лейблы: название и/или id.
  4. Обработать discogs_20260301_labels.xml.gz.
  5. Выполнить join между releases и labels.
  6. Сохранить итоговую витрину в один CSV.

Исходные данные

  • discogs_20260301_releases.xml.gz
  • discogs_20260301_labels.xml.gz
  • ссылка на Discogs data dump: https://data.discogs.com/?download=data%2F2026%2Fdiscogs_20260301_releases.xml.gz
  • ссылка на Discogs data dump: https://data.discogs.com/?download=data%2F2026%2Fdiscogs_20260301_labels.xml.gz
  • альтернатива для быстрого прототипа: parquet-датасеты с Яндекс Диска https://disk.yandex.ru/d/W_4f_y9RpTT-Eg

Что проверяет этот проект

  • Умение читать тяжёлые XML-файлы через Spark.
  • Умение переводить сырые данные в parquet.
  • Работу с вложенными структурами и массивами.
  • Построение витрины через explode, фильтрацию и join.
  • Подготовку итоговой выгрузки в понятном табличном виде.

Acceptance Criteria

  • releases преобразован из XML в parquet.
  • labels преобразован из XML в parquet.
  • Для releases выделен release_year.
  • Данные отфильтрованы за последние 2 года.
  • Из releases извлечены label_name и/или label_id.
  • Выполнен join с labels.
  • Витрина включает строки с пустым contactinfo.
  • Результат сохранён в один CSV-файл.
  • В CSV как минимум есть label_name и contactinfo.
  • Плюсом будут поля id, parent_label_name, parent_label_id, profile, sublabel_names, sublabel_ids, urls, data_quality.

Порядок работы

  1. Сначала конвертируешь labels.xml в parquet.
  2. Потом поднимаешь конвертацию releases.xml в parquet с партиционированием по release_year.
  3. После этого собираешь витрину через Spark и сохраняешь результат в один CSV.

Пример пайплайна

Ниже лежат готовые заготовки скриптов. Они большие, поэтому убраны в раскрывающиеся блоки, и каждый можно сразу скопировать.

Скрипт: labels XML в parquet
# 400MB
 
from pyspark.sql import SparkSession
 
spark = (
    SparkSession.builder.appName("XML to Parquet")
    .master("local[8]")
    .config("spark.driver.memory", "8g")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.hadoop.io.native.lib.available", "false")
    .getOrCreate()
)
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
 
INPUT_PATH = "file:///Q:/disks/M/_python_projects/spark/discogs_20260301_labels.xml"
OUTPUT_PATH = "file:///Q:/disks/M/_python_projects/spark/labels_parquet"
 
df = spark.read.format("xml").option("rowTag", "label").load(INPUT_PATH)
df.printSchema()
df.write.mode("overwrite").parquet(OUTPUT_PATH)
Скрипт: releases XML в parquet
# fmt: off
# 60GB
 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, LongType, StringType, ArrayType, StructField, IntegerType
 
spark = (
    SparkSession.builder.appName("Releases XML to Parquet")
    .master("local[8]")
    .config("spark.driver.memory", "32g")
    .config("spark.sql.shuffle.partitions", "32")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.hadoop.io.native.lib.available", "false")
    .getOrCreate()
)
 
INPUT_PATH = "file:///Q:/disks/M/_python_projects/spark/discogs_20260301_releases.xml"
OUTPUT_PATH = "file:///Q:/disks/M/_python_projects/spark/releases_parquet"
 
 
release_schema = StructType([
    StructField("_id", LongType(), True),
    StructField("released", StringType(), True),
    StructField("title", StringType(), True),
    StructField("country", StringType(), True),
    StructField("notes", StringType(), True),
    StructField("data_quality", StringType(), True),
    StructField("master_id", StructType([
        StructField("_VALUE", LongType(), True),
        StructField("_is_main_release", StringType(), True),
    ]), True),
    StructField("artists", StructType([
        StructField("artist", ArrayType(StructType([
            StructField("id", LongType(), True),
            StructField("name", StringType(), True),
            StructField("anv", StringType(), True),
            StructField("join", StringType(), True),
        ])), True),
    ]), True),
    StructField("labels", StructType([
        StructField("label", ArrayType(StructType([
            StructField("_id", LongType(), True),
            StructField("_name", StringType(), True),
            StructField("_catno", StringType(), True),
        ])), True),
    ]), True),
    StructField("genres", StructType([
        StructField("genre", ArrayType(StringType()), True),
    ]), True),
    StructField("styles", StructType([
        StructField("style", ArrayType(StringType()), True),
    ]), True),
    StructField("formats", StructType([
        StructField("format", ArrayType(StructType([
            StructField("_name", StringType(), True),
            StructField("_qty", IntegerType(), True),
            StructField("_text", StringType(), True),
            StructField("descriptions", StructType([
                StructField("description", ArrayType(StringType()), True),
            ]), True),
        ])), True),
    ]), True),
    StructField("tracklist", StructType([
        StructField("track", ArrayType(StructType([
            StructField("position", StringType(), True),
            StructField("title", StringType(), True),
            StructField("duration", StringType(), True),
        ])), True),
    ]), True),
    StructField("extraartists", StructType([
        StructField("artist", ArrayType(StructType([
            StructField("id", LongType(), True),
            StructField("name", StringType(), True),
            StructField("anv", StringType(), True),
            StructField("role", StringType(), True),
        ])), True),
    ]), True),
])
print("release_schema created.")
 
 
df = (
    spark.read.format("xml")
    .option("rowTag", "release")
    .option("nullValue", "")
    .option("mode", "PERMISSIVE")
    .schema(release_schema)
    .load(INPUT_PATH)
)
print("df created.")
 
 
df = df.withColumn(
    "release_year",
    F.when(
        F.col("released").rlike(r"^\d{4}"),
        F.substring(F.col("released"), 1, 4).cast("int")
    ).otherwise(F.lit(None).cast("int"))
)
print("Year normalized.")
 
 
(
    df.write
    .format("parquet")
    .option("compression", "snappy")
    .partitionBy("release_year")
    .mode("overwrite")
    .save(OUTPUT_PATH)
)
Скрипт: сборка итоговой витрины
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, count, expr, concat_ws
 
 
spark = (
    SparkSession.builder.appName("Releases XML to Parquet")
    .master("local[8]")
    .config("spark.driver.memory", "32g")
    .config("spark.sql.shuffle.partitions", "32")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.hadoop.io.native.lib.available", "false")
    .getOrCreate()
)
 
 
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
 
LABELS = "datasets/labels_parquet/*.parquet"
RELEASES = "datasets/releases_parquet"
 
labels = spark.read.parquet(LABELS)
 
releases = spark.read.parquet(RELEASES).where(
    """ release_year between 2024 and 2027 """
)
 
 
labels_flat = releases.withColumn(
    "label_exploded", explode(col("labels.label"))
).select(
    col("_id").alias("release_id"),
    col("title"),
    col("release_year"),
    col("label_exploded._id").alias("label_id"),
    col("label_exploded._name").alias("label_name"),
    col("label_exploded._catno").alias("catalog_number"),
)
 
 
release_labels_flat = (
    labels_flat.where(""" label_name IS NOT NULL """).select("label_name").distinct()
)
 
 
result = release_labels_flat.join(
    labels, release_labels_flat.label_name == labels.name, "inner"
).drop("name")
 
save_to_csv = result.select(
    "id",
    "label_name",
    "contactinfo",
    "parentLabel",
    "profile",
    "sublabels",
    "urls",
    "data_quality",
)
 
 
labels_final = save_to_csv.select(
    col("id"),
    col("label_name"),
    col("contactinfo"),
    col("parentLabel._VALUE").alias("parent_label_name"),
    col("parentLabel._id").alias("parent_label_id"),
    col("profile"),
    expr("transform(sublabels.label, x -> x._VALUE)").alias("sublabel_names"),
    expr("transform(sublabels.label, x -> x._id)").alias("sublabel_ids"),
    col("urls.url").alias("urls"),
    col("data_quality"),
).select(
    col("id"),
    col("label_name"),
    col("contactinfo"),
    col("parent_label_name"),
    col("parent_label_id"),
    col("profile"),
    concat_ws(" | ", col("sublabel_names")).alias("sublabel_names"),
    concat_ws(" | ", col("sublabel_ids")).alias("sublabel_ids"),
    concat_ws(" | ", col("urls")).alias("urls"),
    col("data_quality"),
)
 
 
(
    labels_final.coalesce(1)
    .write.format("csv")
    .options(sep=";", header=True, encoding="UTF-8", quote='"', escape='"')
    .mode("overwrite")
    .save("lables_info_2024_2026.csv")
)
 
 
# проверка
# labels_final.groupBy("label_name").agg(count("*").alias("total_rows")).orderBy(col("total_rows").desc()).show()

На что обратить внимание

  • В текущем примере join выполняется по label_name.
  • Для более надёжной реализации лучше использовать label_id = labels.id, если id корректно извлекается из releases.
  • Для больших объёмов данных releases лучше хранить parquet с партиционированием по release_year.
  • Даже если labels.contactinfo равен NULL, запись всё равно должна попасть в итоговую витрину.

Что получится в итоге

После выполнения проекта у тебя будет полноценный mini data engineering pipeline:

  • чтение сырых XML;
  • преобразование в parquet;
  • фильтрация по времени;
  • извлечение вложенных полей;
  • join со справочником;
  • выгрузка витрины в CSV.

Это уже очень близко к реальной задаче дата-инженера, а не просто к учебному ноутбуку.

Открытые API для проектов