Процессы и потоки
Из-за GIL не рекомендуется использовать потоки в Python. Но можно для IO или сети, т.к. цпу/гил не нагружается, а коннекты можно переиспользовать.Пул процессов
import multiprocessing def compute(n): return n+1 pool = multiprocessing.Pool(processes=8) print("Results: %s" % pool.map(compute, range(8)))С Python 3.2 предпочтительней использовать concurrent.futures
from concurrent import futures with futures.ThreadPoolExecutor(max_workers=8) as executor: futs = [executor.submit(compute) for _ in range(8)] results = [f.result() for f in futs]Пул может быть как из потоков, так и из процессов:
futures.ThreadPoolExecutor - потоки
futures.ProcessPoolExecutor - процессы
Задание размера пула потоков для requests
import requests session = requests.Session() adapter = requests.adapters.HTTPAdapter( pool_connections=100, pool_maxsize=100) session.mount('http://', adapter) response = session.get("http://example.org")
futurist
Ожидание окончания нескольких параллельных заданий:
from futurist import waiters results = waiters.wait_for_all(futs)Ограничение числа задач в ожидании = 2
with futurist.ThreadPoolExecutor( max_workers=8, check_and_reject=rejection.reject_when_reached(2)) as executor:Периодический вызов функции
from futurist import periodics @periodics.periodic(1) def fnc(): #запуск w = periodics.PeriodicWorker(fnc) w.start #остановка w.stop()
cotyledon
Демонизация процесса
import cotyledon manager = cotyledon.ServiceManager() manager.add(PrinterService, 2) manager.run()- PrinterService - должен реализовывать init, run, terminate
AsyncIO
На основе статьи 1 и 2 на хабреФреймворк для организации неблокирующей работы с IO bound кодом.
Важно использовать неблокирующие асинхронные api. Api должно поддерживать прерывание и восстановление работы.
Ключевые понятия:
* Корунтина - ключевое слово async - обертка над функцией, превращающая ее в указатель с возможностью прерывания.
разновидность генератора, передает значение class 'coroutine' внешней прогарммы до полного выполнения.
Корутина дает интерпретатору возможность возобновить функцию, которая была приостановлена в месте размещения ключевого слова await (при завершении await задачи)
* await - приостановка кода внутри корунтина и передача управлению основному циклу событий с возможностью вернуться в эту точку позже
Поток не блокируется и его ресурсы могут быть использованы другим кодом.
* asyncio.create_task - создает таску для выполнения корунтины
* asyncio.run - цикл событий, который запускает корунтины и отсоединяет ресурсы, задание выполняется независимо от основного процесса и оповещает о результате основной цикл событий. По этому ресурсы основного цикла не расходуются, пока работает параллельное задание
Т.е. фактически 1 потоком/процессом можно обрабатывать несколько IO задач, периодически получая результаты при их готовности и пока они не готовы тратить cpu на более полезные cpu bound задачи.
import asyncio import time async def fun1(x): print(x**2) await asyncio.sleep(3) # print('fun1 завершена') #async - преобразовываем функцию в корунтину, чтобы ее можно было выполнить позже/приостанавливать и т.д. async def fun2(x): print(x**0.5) #в точке await передается управление основному циклу событий (asyncio.get_running_loop()) #сама корунтина в этом месте приостанавливается до выполнения IO задачи await asyncio.sleep(3) print('fun2 завершена') async def main(): #create_task - наследник Future - старт корунтины и ожидание результата в цикле task1 = asyncio.create_task(fun1(4)) task2 = asyncio.create_task(fun2(4)) await task1 await task2 print(time.strftime('%X')) asyncio.run(main()) #создает цикл событий - завершается при выполнении всех тасок print(time.strftime('%X'))Не все библиотеки поддерживают asyncio работу, по этому можно реализовать самостоятельно через пул потоков/процессов.
Но код будет блокировать 1 поток внутри, т.е. ресурсы потока в таком сценарии не освобождаются.
import asyncio import functools import time import requests from concurrent.futures import ThreadPoolExecutor def get_weather(city): #какой то блокируемый код async def main(cities_): # извлекаем цикл событий loop = asyncio.get_running_loop() with ThreadPoolExecutor() as pool: #или ProcessPoolExecutor # раскладываем задачи по отдельным потокам tasks = [loop.run_in_executor(pool, functools.partial(get_weather, city)) for city in cities_] await asyncio.gather(*tasks) #говорим циклу событий запустить и мониторить результат print(time.strftime('%X')) asyncio.run(main([...])) print(time.strftime('%X'))К примеру данный подход используется в отложенных операторах airflow, когда запуск задания не тратит слот шедуллера.
Очереди
rq.Queuefrom rq import Queue from redis import Redis q = Queue(connection=Redis()) job = q.enqueue(sum, [42, 43]) while job.result is None: time.sleep(1) print(job.result) #ограничение времени ожидания результата job = q.enqueue(requests.get, "http://httpbin.org/delay/1",ttl=60, result_ttl=300)
Celery - обертка над любой очередью: redis, rabit, чтото облачное
+ бд для хранения параметров и результатов
+ сериализация, с реализацией на разных
import celery app = celery.Celery('celery-task', broker='redis://localhost', backend='redis://localhost') @app.task def add(x, y): return x + y result = add.delay(4, 4) result.get()Можно задавать ретрай
@app.task(bind=True, retry_backoff=True, retry_kwargs={'max_retries': 5}) #последовательное выполнение тасок chain = celery.chain(add.s(4, 6), multiply.s(10)) #можно делать несколько очередей с разными приоритетами result = add.apply_async(args=[4, 6], queue='low-priority')Ретрай для обычной функции без очередей:
import tenacity def do_something(): if random.randint(0, 1) == 0: print("Failure") raise RuntimeError print("Success") tenacity.Retrying()(do_something) #можно задавать задержку при ретрае @tenacity.retry( wait=tenacity.wait_exponential(multiplier=0.5, max=30, exp_base=2), )
Блокировки
Межпотоковая блокировкаimport threading stdout_lock = threading.Lock() def print_something(something): with stdout_lock: #(something)Остановка потока через флаг:
import threading stop = threading.Event() def background_job(): while not stop.is_set(): ... stop.set()Межпроцессорная блокировка. Но процессы должны быть дочерними текущего:
import multiprocessing stdout_lock = multiprocessing.Lock()Межпроцессорная блокировка на файлах:
import fasteners lock = fasteners.InterProcessLock("/tmp/mylock") with lock: #smth #Или через декаратор: @fasteners.interprocess_locked('/tmp/tmp_lock_file') def locked_print():Распределенный лок через kv базу
import etcd3 client = etcd3.client() lock = client.lock("foobar") lock.acquire() lock.release() #или через декаратор: with lock:
Кэширование
Кэширование на основании словарей с максимальным размером = 3:import cachetools cache = cachetools.LRUCache(maxsize=3)Распределенное кэширование в мемкэше
from pymemcache.client import base client.set('some_key', 'some_value') result = client.get('some_key') #атомарная операция чек и сет (для многопоточных приложений) client.cas('visitors', result, cas)
Профилирование и трассировка
Профилирование работы конкретной функций>> import cProfile >> cProfile.run('2 + 2') 3 function calls in 0.000 seconds Ordered by: standard name ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 0.000 0.000 <string>:1(<module>) 1 0.000 0.000 0.000 0.000 {built-in method builtins.exec} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}Профилирование целого файла:
>> python -m cProfile sample_script.py # -s time - сортировка по времени работы 17 function calls in 10.011 seconds Ordered by: standard name ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 10.011 10.011 sample_script.py:1(<module>) 2 0.000 0.000 4.004 2.002 sample_script.py:10(z) 1 0.000 0.000 4.004 4.004 sample_script.py:13(w) 1 0.000 0.000 10.011 10.011 sample_script.py:17(main) 3 0.000 0.000 3.003 1.001 sample_script.py:3(x) 1 0.000 0.000 3.003 3.003 sample_script.py:6(y) 1 0.000 0.000 10.011 10.011 {built-in method builtins.exec} 6 10.011 1.668 10.011 1.668 {built-in method time.sleep} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}Визуаулизация графа вызова с учетом времени работы
$ python -m cProfile -o sample_script.cprof sample_script.py $ runsnake sample_script.cprofПрофилирование использованной памяти:
python -m memory_profiler memoryview-copy.py Line # Mem usage Increment Occurences Line Contents ============================================================ 1 17.840 MiB 17.840 MiB 1 @profile 2 def read_random(): 3 17.840 MiB 0.000 MiB 1 with open("/dev/urandom", "rb") as source: 4 27.633 MiB 9.793 MiB 1 content = source.read(1024 * 10000) 5 37.430 MiB 9.797 MiB 1 content_to_write = content[1024:] 6 37.430 MiB 0.000 MiB 2 print("Content length: %d, content to write length %d" % 7 37.430 MiB 0.000 MiB 1 (len(content), len(content_to_write))) 8 37.430 MiB 0.000 MiB 1 with open("/dev/null", "wb") as target: 9 37.430 MiB 0.000 MiB 1 target.write(content_to_write)
Комментариев нет:
Отправить комментарий