Процессы и потоки
Из-за GIL не рекомендуется использовать потоки в Python. Но можно для IO или сети, т.к. цпу/гил не нагружается, а коннекты можно переиспользовать.Пул процессов
import multiprocessing
def compute(n):
return n+1
pool = multiprocessing.Pool(processes=8)
print("Results: %s" % pool.map(compute, range(8)))
С Python 3.2 предпочтительней использовать concurrent.futures
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=8) as executor:
futs = [executor.submit(compute) for _ in range(8)]
results = [f.result() for f in futs]
Пул может быть как из потоков, так и из процессов:futures.ThreadPoolExecutor - потоки
futures.ProcessPoolExecutor - процессы
Задание размера пула потоков для requests
import requests
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=100,
pool_maxsize=100)
session.mount('http://', adapter)
response = session.get("http://example.org")
futurist
Ожидание окончания нескольких параллельных заданий:
from futurist import waiters results = waiters.wait_for_all(futs)Ограничение числа задач в ожидании = 2
with futurist.ThreadPoolExecutor(
max_workers=8,
check_and_reject=rejection.reject_when_reached(2)) as executor:
Периодический вызов функции
from futurist import periodics @periodics.periodic(1) def fnc(): #запуск w = periodics.PeriodicWorker(fnc) w.start #остановка w.stop()
cotyledon
Демонизация процесса
import cotyledon manager = cotyledon.ServiceManager() manager.add(PrinterService, 2) manager.run()- PrinterService - должен реализовывать init, run, terminate
AsyncIO
На основе статьи 1 и 2 на хабреФреймворк для организации неблокирующей работы с IO bound кодом.
Важно использовать неблокирующие асинхронные api. Api должно поддерживать прерывание и восстановление работы.
Ключевые понятия:
* Корунтина - ключевое слово async - обертка над функцией, превращающая ее в указатель с возможностью прерывания.
разновидность генератора, передает значение class 'coroutine' внешней прогарммы до полного выполнения.
Корутина дает интерпретатору возможность возобновить функцию, которая была приостановлена в месте размещения ключевого слова await (при завершении await задачи)
* await - приостановка кода внутри корунтина и передача управлению основному циклу событий с возможностью вернуться в эту точку позже
Поток не блокируется и его ресурсы могут быть использованы другим кодом.
* asyncio.create_task - создает таску для выполнения корунтины
* asyncio.run - цикл событий, который запускает корунтины и отсоединяет ресурсы, задание выполняется независимо от основного процесса и оповещает о результате основной цикл событий. По этому ресурсы основного цикла не расходуются, пока работает параллельное задание
Т.е. фактически 1 потоком/процессом можно обрабатывать несколько IO задач, периодически получая результаты при их готовности и пока они не готовы тратить cpu на более полезные cpu bound задачи.
Важное замечание: как получается отцеплять IO bound задачу от основного потока без дополнительного потока или процесса:
- сокеты (базы, http) - после отправки сообщения, удаленный сервер отправляет результат в сетевую карту и уже она оповещает ОС о готовности данных
- файлы - реализиция цикла обработки IO появилась в Linux Kernel 5.1 (io_uring). Так же можно реализовать на железном уровне, дисковая система должна иметь внутри процессор для накопления очереди на обработку и возможность информаировать ОС о готовности данных.
В текущий момент в python не поддерживается и эмулируется потоками.
import asyncio
import time
async def fun1(x):
print(x**2)
await asyncio.sleep(3) #
print('fun1 завершена')
#async - преобразовываем функцию в корунтину, чтобы ее можно было выполнить позже/приостанавливать и т.д.
async def fun2(x):
print(x**0.5)
#в точке await передается управление основному циклу событий (asyncio.get_running_loop())
#сама корунтина в этом месте приостанавливается до выполнения IO задачи
await asyncio.sleep(3)
print('fun2 завершена')
async def main():
#create_task - наследник Future - старт корунтины и ожидание результата в цикле
task1 = asyncio.create_task(fun1(4))
task2 = asyncio.create_task(fun2(4))
await task1
await task2
print(time.strftime('%X'))
asyncio.run(main()) #создает цикл событий - завершается при выполнении всех тасок
print(time.strftime('%X'))
Не все библиотеки поддерживают asyncio работу, по этому можно реализовать самостоятельно через пул потоков/процессов.Но код будет блокировать 1 поток внутри, т.е. ресурсы потока в таком сценарии не освобождаются.
import asyncio
import functools
import time
import requests
from concurrent.futures import ThreadPoolExecutor
def get_weather(city):
#какой то блокируемый код
async def main(cities_):
# извлекаем цикл событий
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool: #или ProcessPoolExecutor
# раскладываем задачи по отдельным потокам
tasks = [loop.run_in_executor(pool, functools.partial(get_weather, city)) for city in cities_]
await asyncio.gather(*tasks) #говорим циклу событий запустить и мониторить результат
print(time.strftime('%X'))
asyncio.run(main([...]))
print(time.strftime('%X'))
К примеру данный подход используется в отложенных операторах airflow, когда запуск задания не тратит слот шедуллера.
Очереди
rq.Queue
from rq import Queue
from redis import Redis
q = Queue(connection=Redis())
job = q.enqueue(sum, [42, 43])
while job.result is None:
time.sleep(1)
print(job.result)
#ограничение времени ожидания результата
job = q.enqueue(requests.get, "http://httpbin.org/delay/1",ttl=60, result_ttl=300)
Celery - обертка над любой очередью: redis, rabit, чтото облачное
+ бд для хранения параметров и результатов
+ сериализация, с реализацией на разных
import celery
app = celery.Celery('celery-task',
broker='redis://localhost',
backend='redis://localhost')
@app.task
def add(x, y):
return x + y
result = add.delay(4, 4)
result.get()
Можно задавать ретрай
@app.task(bind=True, retry_backoff=True,
retry_kwargs={'max_retries': 5})
#последовательное выполнение тасок
chain = celery.chain(add.s(4, 6), multiply.s(10))
#можно делать несколько очередей с разными приоритетами
result = add.apply_async(args=[4, 6], queue='low-priority')
Ретрай для обычной функции без очередей:
import tenacity
def do_something():
if random.randint(0, 1) == 0:
print("Failure")
raise RuntimeError
print("Success")
tenacity.Retrying()(do_something)
#можно задавать задержку при ретрае
@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=0.5, max=30, exp_base=2),
)
Блокировки
Межпотоковая блокировка
import threading
stdout_lock = threading.Lock()
def print_something(something):
with stdout_lock:
#(something)
Остановка потока через флаг:
import threading
stop = threading.Event()
def background_job():
while not stop.is_set():
...
stop.set()
Межпроцессорная блокировка. Но процессы должны быть дочерними текущего:
import multiprocessing stdout_lock = multiprocessing.Lock()Межпроцессорная блокировка на файлах:
import fasteners
lock = fasteners.InterProcessLock("/tmp/mylock")
with lock:
#smth
#Или через декаратор:
@fasteners.interprocess_locked('/tmp/tmp_lock_file')
def locked_print():
Распределенный лок через kv базу
import etcd3
client = etcd3.client()
lock = client.lock("foobar")
lock.acquire()
lock.release()
#или через декаратор:
with lock:
Кэширование
Кэширование на основании словарей с максимальным размером = 3:import cachetools cache = cachetools.LRUCache(maxsize=3)Распределенное кэширование в мемкэше
from pymemcache.client import base
client.set('some_key', 'some_value')
result = client.get('some_key')
#атомарная операция чек и сет (для многопоточных приложений)
client.cas('visitors', result, cas)
Профилирование и трассировка
Профилирование работы конкретной функций
>> import cProfile
>> cProfile.run('2 + 2')
3 function calls in 0.000 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.000 0.000 <string>:1(<module>)
1 0.000 0.000 0.000 0.000 {built-in method builtins.exec}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
Профилирование целого файла:
>> python -m cProfile sample_script.py
# -s time - сортировка по времени работы
17 function calls in 10.011 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 10.011 10.011 sample_script.py:1(<module>)
2 0.000 0.000 4.004 2.002 sample_script.py:10(z)
1 0.000 0.000 4.004 4.004 sample_script.py:13(w)
1 0.000 0.000 10.011 10.011 sample_script.py:17(main)
3 0.000 0.000 3.003 1.001 sample_script.py:3(x)
1 0.000 0.000 3.003 3.003 sample_script.py:6(y)
1 0.000 0.000 10.011 10.011 {built-in method builtins.exec}
6 10.011 1.668 10.011 1.668 {built-in method time.sleep}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
Визуаулизация графа вызова с учетом времени работы
$ python -m cProfile -o sample_script.cprof sample_script.py $ runsnake sample_script.cprofПрофилирование использованной памяти:
python -m memory_profiler memoryview-copy.py
Line # Mem usage Increment Occurences Line Contents
============================================================
1 17.840 MiB 17.840 MiB 1 @profile
2 def read_random():
3 17.840 MiB 0.000 MiB 1 with open("/dev/urandom", "rb") as source:
4 27.633 MiB 9.793 MiB 1 content = source.read(1024 * 10000)
5 37.430 MiB 9.797 MiB 1 content_to_write = content[1024:]
6 37.430 MiB 0.000 MiB 2 print("Content length: %d, content to write length %d" %
7 37.430 MiB 0.000 MiB 1 (len(content), len(content_to_write)))
8 37.430 MiB 0.000 MiB 1 with open("/dev/null", "wb") as target:
9 37.430 MiB 0.000 MiB 1 target.write(content_to_write)
Комментариев нет:
Отправить комментарий