четверг, 20 июня 2024 г.

Масштабирование python

Процессы и потоки

Из-за GIL не рекомендуется использовать потоки в Python. Но можно для IO или сети, т.к. цпу/гил не нагружается, а коннекты можно переиспользовать.

Пул процессов
import multiprocessing
def compute(n):
    return n+1

pool = multiprocessing.Pool(processes=8)
print("Results: %s" % pool.map(compute, range(8)))
С Python 3.2 предпочтительней использовать concurrent.futures
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=8) as executor:
    futs = [executor.submit(compute) for _ in range(8)]

results = [f.result() for f in futs]
Пул может быть как из потоков, так и из процессов:
futures.ThreadPoolExecutor - потоки
futures.ProcessPoolExecutor - процессы

Задание размера пула потоков для requests
import requests
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=100,
    pool_maxsize=100)
session.mount('http://', adapter)
response = session.get("http://example.org")

Ожидание окончания нескольких параллельных заданий:
from futurist import waiters
results = waiters.wait_for_all(futs)
Ограничение числа задач в ожидании = 2
with futurist.ThreadPoolExecutor(
        max_workers=8,
        check_and_reject=rejection.reject_when_reached(2)) as executor:
Периодический вызов функции
from futurist import periodics
@periodics.periodic(1)
def fnc():

#запуск
w = periodics.PeriodicWorker(fnc)
w.start

#остановка
w.stop()

Демонизация процесса
import cotyledon
manager = cotyledon.ServiceManager()
manager.add(PrinterService, 2)
manager.run()
- PrinterService - должен реализовывать init, run, terminate

Event loop
import asyncio

#async - определение корунтины, которая должна выполняться в параллель
async def add_42(number):
    return 42 + number

async def hello_world():
    #await - вызов другой корунтины
    result = await add_42(23)
    return result

event_loop = asyncio.get_event_loop()
try:
    result = event_loop.run_until_complete(hello_world())
    print(result)
finally:
    event_loop.close()

Очереди

rq.Queue
from rq import Queue
from redis import Redis
q = Queue(connection=Redis())
job = q.enqueue(sum, [42, 43])
while job.result is None:
    time.sleep(1)
print(job.result)

#ограничение времени ожидания результата
job = q.enqueue(requests.get, "http://httpbin.org/delay/1",ttl=60, result_ttl=300)

Celery - обертка над любой очередью: redis, rabit, чтото облачное
+ бд для хранения параметров и результатов
+ сериализация, с реализацией на разных
import celery
app = celery.Celery('celery-task',
                    broker='redis://localhost',
                    backend='redis://localhost')
@app.task
def add(x, y):
    return x + y
result = add.delay(4, 4)
result.get()
Можно задавать ретрай
@app.task(bind=True, retry_backoff=True,
          retry_kwargs={'max_retries': 5})
#последовательное выполнение тасок
chain = celery.chain(add.s(4, 6), multiply.s(10))
#можно делать несколько очередей с разными приоритетами
result = add.apply_async(args=[4, 6], queue='low-priority')
Ретрай для обычной функции без очередей:
import tenacity
def do_something():
    if random.randint(0, 1) == 0:
        print("Failure")
        raise RuntimeError
    print("Success")
tenacity.Retrying()(do_something)

#можно задавать задержку при ретрае
@tenacity.retry(
    wait=tenacity.wait_exponential(multiplier=0.5, max=30, exp_base=2),
)

Блокировки

Межпотоковая блокировка
import threading
stdout_lock = threading.Lock()
def print_something(something):
    with stdout_lock:
        #(something)
Остановка потока через флаг:
import threading
stop = threading.Event()
def background_job():
    while not stop.is_set():
        ...
stop.set()
Межпроцессорная блокировка. Но процессы должны быть дочерними текущего:
import multiprocessing
stdout_lock = multiprocessing.Lock()
Межпроцессорная блокировка на файлах:
import fasteners
lock = fasteners.InterProcessLock("/tmp/mylock")
with lock:
    #smth

#Или через декаратор:
@fasteners.interprocess_locked('/tmp/tmp_lock_file')
def locked_print():
Распределенный лок через kv базу
import etcd3
client = etcd3.client()
lock = client.lock("foobar")
lock.acquire()
lock.release()
#или через декаратор:
with lock:

Кэширование

Кэширование на основании словарей с максимальным размером = 3:
import cachetools
cache = cachetools.LRUCache(maxsize=3)
Распределенное кэширование в мемкэше
from pymemcache.client import base
client.set('some_key', 'some_value')
result = client.get('some_key')

#атомарная операция чек и сет (для многопоточных приложений)
client.cas('visitors', result, cas)

Профилирование и трассировка

Профилирование работы конкретной функций
>> import cProfile
>> cProfile.run('2 + 2')

3 function calls in 0.000 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    1    0.000    0.000    0.000    0.000 <string>:1(<module>)
    1    0.000    0.000    0.000    0.000 {built-in method builtins.exec}
    1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
Профилирование целого файла:
>> python -m cProfile sample_script.py
# -s time - сортировка по времени работы

17 function calls in 10.011 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    1    0.000    0.000   10.011   10.011 sample_script.py:1(<module>)
    2    0.000    0.000    4.004    2.002 sample_script.py:10(z)
    1    0.000    0.000    4.004    4.004 sample_script.py:13(w)
    1    0.000    0.000   10.011   10.011 sample_script.py:17(main)
    3    0.000    0.000    3.003    1.001 sample_script.py:3(x)
    1    0.000    0.000    3.003    3.003 sample_script.py:6(y)
    1    0.000    0.000   10.011   10.011 {built-in method builtins.exec}
    6   10.011    1.668   10.011    1.668 {built-in method time.sleep}
    1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
Визуаулизация графа вызова с учетом времени работы
$ python -m cProfile -o sample_script.cprof sample_script.py
$ runsnake sample_script.cprof
Профилирование использованной памяти:
python -m memory_profiler memoryview-copy.py
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     1   17.840 MiB   17.840 MiB           1   @profile
     2                                         def read_random():
     3   17.840 MiB    0.000 MiB           1       with open("/dev/urandom", "rb") as source:
     4   27.633 MiB    9.793 MiB           1           content = source.read(1024 * 10000)
     5   37.430 MiB    9.797 MiB           1           content_to_write = content[1024:]
     6   37.430 MiB    0.000 MiB           2       print("Content length: %d, content to write length %d" %
     7   37.430 MiB    0.000 MiB           1             (len(content), len(content_to_write)))
     8   37.430 MiB    0.000 MiB           1       with open("/dev/null", "wb") as target:
     9   37.430 MiB    0.000 MiB           1           target.write(content_to_write)

Комментариев нет:

Отправить комментарий