четверг, 20 июня 2024 г.

Масштабирование python

Процессы и потоки

Из-за GIL не рекомендуется использовать потоки в Python. Но можно для IO или сети, т.к. цпу/гил не нагружается, а коннекты можно переиспользовать.

Пул процессов
import multiprocessing
def compute(n):
    return n+1

pool = multiprocessing.Pool(processes=8)
print("Results: %s" % pool.map(compute, range(8)))
С Python 3.2 предпочтительней использовать concurrent.futures
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=8) as executor:
    futs = [executor.submit(compute) for _ in range(8)]

results = [f.result() for f in futs]
Пул может быть как из потоков, так и из процессов:
futures.ThreadPoolExecutor - потоки
futures.ProcessPoolExecutor - процессы

Задание размера пула потоков для requests
import requests
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=100,
    pool_maxsize=100)
session.mount('http://', adapter)
response = session.get("http://example.org")

futurist
Ожидание окончания нескольких параллельных заданий:
from futurist import waiters
results = waiters.wait_for_all(futs)
Ограничение числа задач в ожидании = 2
with futurist.ThreadPoolExecutor(
        max_workers=8,
        check_and_reject=rejection.reject_when_reached(2)) as executor:
Периодический вызов функции
from futurist import periodics
@periodics.periodic(1)
def fnc():

#запуск
w = periodics.PeriodicWorker(fnc)
w.start

#остановка
w.stop()

cotyledon
Демонизация процесса
import cotyledon
manager = cotyledon.ServiceManager()
manager.add(PrinterService, 2)
manager.run()
- PrinterService - должен реализовывать init, run, terminate

AsyncIO

На основе статьи 1 и 2 на хабре
Фреймворк для организации неблокирующей работы с IO bound кодом.
Важно использовать неблокирующие асинхронные api. Api должно поддерживать прерывание и восстановление работы.

Ключевые понятия:
* Корунтина - ключевое слово async - обертка над функцией, превращающая ее в указатель с возможностью прерывания.
разновидность генератора, передает значение class 'coroutine' внешней прогарммы до полного выполнения.
Корутина дает интерпретатору возможность возобновить функцию, которая была приостановлена в месте размещения ключевого слова await (при завершении await задачи)
* await - приостановка кода внутри корунтина и передача управлению основному циклу событий с возможностью вернуться в эту точку позже
Поток не блокируется и его ресурсы могут быть использованы другим кодом.
* asyncio.create_task - создает таску для выполнения корунтины
* asyncio.run - цикл событий, который запускает корунтины и отсоединяет ресурсы, задание выполняется независимо от основного процесса и оповещает о результате основной цикл событий. По этому ресурсы основного цикла не расходуются, пока работает параллельное задание

Т.е. фактически 1 потоком/процессом можно обрабатывать несколько IO задач, периодически получая результаты при их готовности и пока они не готовы тратить cpu на более полезные cpu bound задачи.

import asyncio
import time

async def fun1(x):
    print(x**2)
    await asyncio.sleep(3) #
    print('fun1 завершена')

#async - преобразовываем функцию в корунтину, чтобы ее можно было выполнить позже/приостанавливать и т.д.
async def fun2(x):
    print(x**0.5)
    #в точке await передается управление основному циклу событий (asyncio.get_running_loop()) 
    #сама корунтина в этом месте приостанавливается до выполнения IO задачи
    await asyncio.sleep(3) 
    print('fun2 завершена')


async def main():
    #create_task - наследник Future - старт корунтины и ожидание результата в цикле
    task1 = asyncio.create_task(fun1(4))
    task2 = asyncio.create_task(fun2(4))

    await task1 
    await task2


print(time.strftime('%X'))
asyncio.run(main()) #создает цикл событий - завершается при выполнении всех тасок
print(time.strftime('%X'))
Не все библиотеки поддерживают asyncio работу, по этому можно реализовать самостоятельно через пул потоков/процессов.
Но код будет блокировать 1 поток внутри, т.е. ресурсы потока в таком сценарии не освобождаются.
import asyncio
import functools
import time
import requests
from concurrent.futures import ThreadPoolExecutor


def get_weather(city):
    #какой то блокируемый код


async def main(cities_):
    # извлекаем цикл событий
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor() as pool: #или ProcessPoolExecutor
        # раскладываем задачи по отдельным потокам
        tasks = [loop.run_in_executor(pool, functools.partial(get_weather, city)) for city in cities_]
        await asyncio.gather(*tasks) #говорим циклу событий запустить и мониторить результат

print(time.strftime('%X'))
asyncio.run(main([...]))
print(time.strftime('%X'))
К примеру данный подход используется в отложенных операторах airflow, когда запуск задания не тратит слот шедуллера.

Очереди

rq.Queue
from rq import Queue
from redis import Redis
q = Queue(connection=Redis())
job = q.enqueue(sum, [42, 43])
while job.result is None:
    time.sleep(1)
print(job.result)

#ограничение времени ожидания результата
job = q.enqueue(requests.get, "http://httpbin.org/delay/1",ttl=60, result_ttl=300)

Celery - обертка над любой очередью: redis, rabit, чтото облачное
+ бд для хранения параметров и результатов
+ сериализация, с реализацией на разных
import celery
app = celery.Celery('celery-task',
                    broker='redis://localhost',
                    backend='redis://localhost')
@app.task
def add(x, y):
    return x + y
result = add.delay(4, 4)
result.get()
Можно задавать ретрай
@app.task(bind=True, retry_backoff=True,
          retry_kwargs={'max_retries': 5})
#последовательное выполнение тасок
chain = celery.chain(add.s(4, 6), multiply.s(10))
#можно делать несколько очередей с разными приоритетами
result = add.apply_async(args=[4, 6], queue='low-priority')
Ретрай для обычной функции без очередей:
import tenacity
def do_something():
    if random.randint(0, 1) == 0:
        print("Failure")
        raise RuntimeError
    print("Success")
tenacity.Retrying()(do_something)

#можно задавать задержку при ретрае
@tenacity.retry(
    wait=tenacity.wait_exponential(multiplier=0.5, max=30, exp_base=2),
)

Блокировки

Межпотоковая блокировка
import threading
stdout_lock = threading.Lock()
def print_something(something):
    with stdout_lock:
        #(something)
Остановка потока через флаг:
import threading
stop = threading.Event()
def background_job():
    while not stop.is_set():
        ...
stop.set()
Межпроцессорная блокировка. Но процессы должны быть дочерними текущего:
import multiprocessing
stdout_lock = multiprocessing.Lock()
Межпроцессорная блокировка на файлах:
import fasteners
lock = fasteners.InterProcessLock("/tmp/mylock")
with lock:
    #smth

#Или через декаратор:
@fasteners.interprocess_locked('/tmp/tmp_lock_file')
def locked_print():
Распределенный лок через kv базу
import etcd3
client = etcd3.client()
lock = client.lock("foobar")
lock.acquire()
lock.release()
#или через декаратор:
with lock:

Кэширование

Кэширование на основании словарей с максимальным размером = 3:
import cachetools
cache = cachetools.LRUCache(maxsize=3)
Распределенное кэширование в мемкэше
from pymemcache.client import base
client.set('some_key', 'some_value')
result = client.get('some_key')

#атомарная операция чек и сет (для многопоточных приложений)
client.cas('visitors', result, cas)

Профилирование и трассировка

Профилирование работы конкретной функций
>> import cProfile
>> cProfile.run('2 + 2')

3 function calls in 0.000 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    1    0.000    0.000    0.000    0.000 <string>:1(<module>)
    1    0.000    0.000    0.000    0.000 {built-in method builtins.exec}
    1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
Профилирование целого файла:
>> python -m cProfile sample_script.py
# -s time - сортировка по времени работы

17 function calls in 10.011 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    1    0.000    0.000   10.011   10.011 sample_script.py:1(<module>)
    2    0.000    0.000    4.004    2.002 sample_script.py:10(z)
    1    0.000    0.000    4.004    4.004 sample_script.py:13(w)
    1    0.000    0.000   10.011   10.011 sample_script.py:17(main)
    3    0.000    0.000    3.003    1.001 sample_script.py:3(x)
    1    0.000    0.000    3.003    3.003 sample_script.py:6(y)
    1    0.000    0.000   10.011   10.011 {built-in method builtins.exec}
    6   10.011    1.668   10.011    1.668 {built-in method time.sleep}
    1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
Визуаулизация графа вызова с учетом времени работы
$ python -m cProfile -o sample_script.cprof sample_script.py
$ runsnake sample_script.cprof
Профилирование использованной памяти:
python -m memory_profiler memoryview-copy.py
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     1   17.840 MiB   17.840 MiB           1   @profile
     2                                         def read_random():
     3   17.840 MiB    0.000 MiB           1       with open("/dev/urandom", "rb") as source:
     4   27.633 MiB    9.793 MiB           1           content = source.read(1024 * 10000)
     5   37.430 MiB    9.797 MiB           1           content_to_write = content[1024:]
     6   37.430 MiB    0.000 MiB           2       print("Content length: %d, content to write length %d" %
     7   37.430 MiB    0.000 MiB           1             (len(content), len(content_to_write)))
     8   37.430 MiB    0.000 MiB           1       with open("/dev/null", "wb") as target:
     9   37.430 MiB    0.000 MiB           1           target.write(content_to_write)

Комментариев нет:

Отправить комментарий