Documentation
📚 Вопросы с собеседований
Python задачи

🐍 Банк задач для собеседований по Python

В этом разделе собраны практические задачи на Python, которые проверяют умение работать с данными и оптимизацией.


Задача: Выгрузка данных из PostgreSQL в CSV файл

Необходимо написать код на Python, который будет выгружать данные из таблицы PostgreSQL big_table и записывать их в CSV файл.
Особенность: таблица содержит 40 миллионов строк, поэтому данные нужно выгружать чанками, чтобы избежать переполнения памяти.

Показать решение
import psycopg2
import csv
 
dsn = "host=... dbname=... user=... password=... port=5432"
outfile = "big_table.csv"
chunk_size = 100_000
 
with psycopg2.connect(dsn) as conn:
    # серверный курсор — результат не буферизуется целиком
    cur = conn.cursor(name="ss_cur")
    cur.itersize = chunk_size
    cur.execute("SELECT * FROM public.big_table ORDER BY id")  # важно иметь порядок/индекс
 
    with open(outfile, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        # заголовки
        colnames = [desc[0] for desc in cur.description]
        writer.writerow(colnames)
 
        rows_written = 0
        while True:
            rows = cur.fetchmany(chunk_size)
            if not rows:
                break
            writer.writerows(rows)
            rows_written += len(rows)
            # print(f"written: {rows_written}")

Задачи с собеседований МТС

1. Слияние двух отсортированных списков за O(N + M)

Нужно написать функцию на Python, которая получает на вход два отсортированных списка целых чисел длины N и M и возвращает новый отсортированный список за O(N + M).

Пример:

N = [1, 3, 3, 9, 10]
M = [1, 4, 5, 16]

Ожидаемый результат:

[1, 1, 3, 3, 4, 5, 9, 10, 16]
Показать решение
def merge_sorted_lists(left: list[int], right: list[int]) -> list[int]:
    result = []
    i, j = 0, 0
 
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
 
    result.extend(left[i:])
    result.extend(right[j:])
    return result
 
 
n = [1, 3, 3, 9, 10]
m = [1, 4, 5, 16]
 
assert merge_sorted_lists(n, m) == [1, 1, 3, 3, 4, 5, 9, 10, 16]

2. Running sum массива

Дан массив:

[1, 2, 3, 4]

Нужно сформировать новый массив, в котором каждый элемент равен сумме всех элементов от начала массива до текущей позиции.

Ожидаемый результат:

[1, 3, 6, 10]
Показать решение
def running_sum(nums: list[int]) -> list[int]:
    result = []
    total = 0
 
    for num in nums:
        total += num
        result.append(total)
 
    return result
 
 
assert running_sum([1, 2, 3, 4]) == [1, 3, 6, 10]

3. Сортировка словаря по значениям

У вас есть словарь, в котором ключи это имена продуктов, а значения это их цены.

Нужно написать функцию, которая принимает этот словарь и возвращает новый словарь, отсортированный по возрастанию цен.

Пример:

products = {
    "Яблоки": 200,
    "Бананы": 50,
    "Апельсины": 100,
    "Ананас": 250,
}

Ожидаемый результат:

{
    "Бананы": 50,
    "Апельсины": 100,
    "Яблоки": 200,
    "Ананас": 250,
}
Показать решение
def sort_by_price(products: dict[str, int]) -> dict[str, int]:
    return dict(sorted(products.items(), key=lambda item: item[1]))
 
 
products = {
    "Яблоки": 200,
    "Бананы": 50,
    "Апельсины": 100,
    "Ананас": 250,
}
 
assert sort_by_price(products) == {
    "Бананы": 50,
    "Апельсины": 100,
    "Яблоки": 200,
    "Ананас": 250,
}

Задача: Частоты символов

1. Найти количество наиболее часто встречающегося символа

Дан список:

new_list = ['a', 'a', 'b', 'c']

Нужно найти количество наиболее часто встречающегося символа без вложенных циклов.

Показать решение
from collections import Counter
 
new_list = ['a', 'a', 'b', 'c']
 
max_count = Counter(new_list).most_common(1)[0][1]
print(max_count)  # 2

2. Как одним действием выбрать максимальное значение из словаря

Допустим, из первой задачи получился словарь, где ключ - это символ, а значение - количество его вхождений:

freq = {'a': 2, 'b': 1, 'c': 1}

Нужно одним действием получить максимальное значение из словаря.

Показать решение
max_value = max(freq.values())
print(max_value)  # 2

Задача: Сумма amount по каждому user_id

Дан список словарей:

data = [
    {"user_id": 1, "amount": 100},
    {"user_id": 2, "amount": 200},
    {"user_id": 1, "amount": 50},
    {"user_id": 2, "amount": 300},
    {"user_id": 3, "amount": 10},
]

Нужно посчитать сумму amount по каждому user_id и вернуть результат, отсортированный по убыванию суммы.

Ожидаемый результат:

[
    {"user_id": 2, "amount": 500},
    {"user_id": 1, "amount": 150},
    {"user_id": 3, "amount": 10},
]
Показать решение
totals = {}
 
for row in data:
    user_id = row["user_id"]
    amount = row["amount"]
    totals[user_id] = totals.get(user_id, 0) + amount
 
result = sorted(
    (
        {"user_id": user_id, "amount": total_amount}
        for user_id, total_amount in totals.items()
    ),
    key=lambda x: x["amount"],
    reverse=True
)
 
print(result)

Задача: Средняя оценка по языкам программирования

Необходимо посчитать среднюю оценку по каждому языку программирования.

Входные данные

arr = [
    {'name': 'Petr', 'subject': 'Python', 'score': 97},
    {'name': 'Max', 'subject': 'C++', 'score': 70},
    {'name': 'Petr', 'subject': 'C++', 'score': 45},
    {'name': 'Max', 'subject': 'Python', 'score': 60},
    {'name': 'Petr', 'subject': 'SQL', 'score': 65},
    {'name': 'Max', 'subject': 'SQL', 'score': 100}
]

Ожидаемый результат:

Python - 78.5
C++ - 57.5
SQL - 82.5
Показать решение
scores = {}
 
for item in arr:
    subject = item['subject']
    score = item['score']
 
    if subject not in scores:
        scores[subject] = {'sum': 0, 'count': 0}
 
    scores[subject]['sum'] += score
    scores[subject]['count'] += 1
 
for subject, data in scores.items():
    avg_score = data['sum'] / data['count']
    print(f"{subject} - {avg_score}")

Задача: Перенос данных из PostgreSQL в ClickHouse

Напишите код на Python, который переносит данные из таблицы PostgreSQL clients в таблицу ClickHouse clients.
Данные выгружаются чанками, чтобы избежать переполнения памяти.

Показать решение
import psycopg2
import clickhouse_connect
 
PG_DSN = "host=... dbname=... user=... password=... port=5432"
CH_HOST = "http://localhost:8123"   # или http://ch-host:8123
CH_USER = "default"
CH_PASS = ""
CH_DB   = "default"
 
CHUNK = 100_000
 
# 1) Читаем из PostgreSQL чанками
with psycopg2.connect(PG_DSN) as pg_conn:
    pg_cur = pg_conn.cursor(name="ss_clients")  # server-side курсор
    pg_cur.itersize = CHUNK
    pg_cur.execute("SELECT id, name, email, created_at FROM public.clients ORDER BY id")
 
    # 2) Подключаемся к ClickHouse
    ch = clickhouse_connect.get_client(host=CH_HOST, username=CH_USER, password=CH_PASS, database=CH_DB)
    # Опционально: создаём таблицу при отсутствии
    ch.query("""
        CREATE TABLE IF NOT EXISTS clients (
            id UInt64,
            name String,
            email String,
            created_at DateTime
        ) ENGINE = MergeTree
        ORDER BY id
    """)
 
    # 3) Гоним чанками
    total = 0
    columns = ["id", "name", "email", "created_at"]
 
    while True:
        rows = pg_cur.fetchmany(CHUNK)
        if not rows:
            break
 
        # Вставка пачкой
        ch.insert("clients", rows, column_names=columns)
        total += len(rows)
        # print(f"Inserted: {total}")