🐍 Банк задач для собеседований по Python
В этом разделе собраны практические задачи на Python, которые проверяют умение работать с данными и оптимизацией.
Задача: Выгрузка данных из PostgreSQL в CSV файл
Необходимо написать код на Python, который будет выгружать данные из таблицы PostgreSQL big_table и записывать их в CSV файл.
Особенность: таблица содержит 40 миллионов строк, поэтому данные нужно выгружать чанками, чтобы избежать переполнения памяти.
Показать решение
import psycopg2
import csv
dsn = "host=... dbname=... user=... password=... port=5432"
outfile = "big_table.csv"
chunk_size = 100_000
with psycopg2.connect(dsn) as conn:
# серверный курсор — результат не буферизуется целиком
cur = conn.cursor(name="ss_cur")
cur.itersize = chunk_size
cur.execute("SELECT * FROM public.big_table ORDER BY id") # важно иметь порядок/индекс
with open(outfile, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
# заголовки
colnames = [desc[0] for desc in cur.description]
writer.writerow(colnames)
rows_written = 0
while True:
rows = cur.fetchmany(chunk_size)
if not rows:
break
writer.writerows(rows)
rows_written += len(rows)
# print(f"written: {rows_written}")Задачи с собеседований МТС
1. Слияние двух отсортированных списков за O(N + M)
Нужно написать функцию на Python, которая получает на вход два отсортированных списка целых чисел длины N и M и возвращает новый отсортированный список за O(N + M).
Пример:
N = [1, 3, 3, 9, 10]
M = [1, 4, 5, 16]Ожидаемый результат:
[1, 1, 3, 3, 4, 5, 9, 10, 16]Показать решение
def merge_sorted_lists(left: list[int], right: list[int]) -> list[int]:
result = []
i, j = 0, 0
while i < len(left) and j < len(right):
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
n = [1, 3, 3, 9, 10]
m = [1, 4, 5, 16]
assert merge_sorted_lists(n, m) == [1, 1, 3, 3, 4, 5, 9, 10, 16]2. Running sum массива
Дан массив:
[1, 2, 3, 4]Нужно сформировать новый массив, в котором каждый элемент равен сумме всех элементов от начала массива до текущей позиции.
Ожидаемый результат:
[1, 3, 6, 10]Показать решение
def running_sum(nums: list[int]) -> list[int]:
result = []
total = 0
for num in nums:
total += num
result.append(total)
return result
assert running_sum([1, 2, 3, 4]) == [1, 3, 6, 10]3. Сортировка словаря по значениям
У вас есть словарь, в котором ключи это имена продуктов, а значения это их цены.
Нужно написать функцию, которая принимает этот словарь и возвращает новый словарь, отсортированный по возрастанию цен.
Пример:
products = {
"Яблоки": 200,
"Бананы": 50,
"Апельсины": 100,
"Ананас": 250,
}Ожидаемый результат:
{
"Бананы": 50,
"Апельсины": 100,
"Яблоки": 200,
"Ананас": 250,
}Показать решение
def sort_by_price(products: dict[str, int]) -> dict[str, int]:
return dict(sorted(products.items(), key=lambda item: item[1]))
products = {
"Яблоки": 200,
"Бананы": 50,
"Апельсины": 100,
"Ананас": 250,
}
assert sort_by_price(products) == {
"Бананы": 50,
"Апельсины": 100,
"Яблоки": 200,
"Ананас": 250,
}Задача: Частоты символов
1. Найти количество наиболее часто встречающегося символа
Дан список:
new_list = ['a', 'a', 'b', 'c']Нужно найти количество наиболее часто встречающегося символа без вложенных циклов.
Показать решение
from collections import Counter
new_list = ['a', 'a', 'b', 'c']
max_count = Counter(new_list).most_common(1)[0][1]
print(max_count) # 22. Как одним действием выбрать максимальное значение из словаря
Допустим, из первой задачи получился словарь, где ключ - это символ, а значение - количество его вхождений:
freq = {'a': 2, 'b': 1, 'c': 1}Нужно одним действием получить максимальное значение из словаря.
Показать решение
max_value = max(freq.values())
print(max_value) # 2Задача: Сумма amount по каждому user_id
Дан список словарей:
data = [
{"user_id": 1, "amount": 100},
{"user_id": 2, "amount": 200},
{"user_id": 1, "amount": 50},
{"user_id": 2, "amount": 300},
{"user_id": 3, "amount": 10},
]Нужно посчитать сумму amount по каждому user_id и вернуть результат, отсортированный по убыванию суммы.
Ожидаемый результат:
[
{"user_id": 2, "amount": 500},
{"user_id": 1, "amount": 150},
{"user_id": 3, "amount": 10},
]Показать решение
totals = {}
for row in data:
user_id = row["user_id"]
amount = row["amount"]
totals[user_id] = totals.get(user_id, 0) + amount
result = sorted(
(
{"user_id": user_id, "amount": total_amount}
for user_id, total_amount in totals.items()
),
key=lambda x: x["amount"],
reverse=True
)
print(result)Задача: Средняя оценка по языкам программирования
Необходимо посчитать среднюю оценку по каждому языку программирования.
Входные данные
arr = [
{'name': 'Petr', 'subject': 'Python', 'score': 97},
{'name': 'Max', 'subject': 'C++', 'score': 70},
{'name': 'Petr', 'subject': 'C++', 'score': 45},
{'name': 'Max', 'subject': 'Python', 'score': 60},
{'name': 'Petr', 'subject': 'SQL', 'score': 65},
{'name': 'Max', 'subject': 'SQL', 'score': 100}
]Ожидаемый результат:
Python - 78.5
C++ - 57.5
SQL - 82.5Показать решение
scores = {}
for item in arr:
subject = item['subject']
score = item['score']
if subject not in scores:
scores[subject] = {'sum': 0, 'count': 0}
scores[subject]['sum'] += score
scores[subject]['count'] += 1
for subject, data in scores.items():
avg_score = data['sum'] / data['count']
print(f"{subject} - {avg_score}")Задача: Перенос данных из PostgreSQL в ClickHouse
Напишите код на Python, который переносит данные из таблицы PostgreSQL clients в таблицу ClickHouse clients.
Данные выгружаются чанками, чтобы избежать переполнения памяти.
Показать решение
import psycopg2
import clickhouse_connect
PG_DSN = "host=... dbname=... user=... password=... port=5432"
CH_HOST = "http://localhost:8123" # или http://ch-host:8123
CH_USER = "default"
CH_PASS = ""
CH_DB = "default"
CHUNK = 100_000
# 1) Читаем из PostgreSQL чанками
with psycopg2.connect(PG_DSN) as pg_conn:
pg_cur = pg_conn.cursor(name="ss_clients") # server-side курсор
pg_cur.itersize = CHUNK
pg_cur.execute("SELECT id, name, email, created_at FROM public.clients ORDER BY id")
# 2) Подключаемся к ClickHouse
ch = clickhouse_connect.get_client(host=CH_HOST, username=CH_USER, password=CH_PASS, database=CH_DB)
# Опционально: создаём таблицу при отсутствии
ch.query("""
CREATE TABLE IF NOT EXISTS clients (
id UInt64,
name String,
email String,
created_at DateTime
) ENGINE = MergeTree
ORDER BY id
""")
# 3) Гоним чанками
total = 0
columns = ["id", "name", "email", "created_at"]
while True:
rows = pg_cur.fetchmany(CHUNK)
if not rows:
break
# Вставка пачкой
ch.insert("clients", rows, column_names=columns)
total += len(rows)
# print(f"Inserted: {total}")