Процессы и потоки
Из-за GIL не рекомендуется использовать потоки в Python. Но можно для IO или сети, т.к. цпу/гил не нагружается, а коннекты можно переиспользовать.Пул процессов
import multiprocessing def compute(n): return n+1 pool = multiprocessing.Pool(processes=8) print("Results: %s" % pool.map(compute, range(8)))С Python 3.2 предпочтительней использовать concurrent.futures
from concurrent import futures with futures.ThreadPoolExecutor(max_workers=8) as executor: futs = [executor.submit(compute) for _ in range(8)] results = [f.result() for f in futs]Пул может быть как из потоков, так и из процессов:
futures.ThreadPoolExecutor - потоки
futures.ProcessPoolExecutor - процессы
Задание размера пула потоков для requests
import requests session = requests.Session() adapter = requests.adapters.HTTPAdapter( pool_connections=100, pool_maxsize=100) session.mount('http://', adapter) response = session.get("http://example.org")
Ожидание окончания нескольких параллельных заданий:
from futurist import waiters results = waiters.wait_for_all(futs)Ограничение числа задач в ожидании = 2
with futurist.ThreadPoolExecutor( max_workers=8, check_and_reject=rejection.reject_when_reached(2)) as executor:Периодический вызов функции
from futurist import periodics @periodics.periodic(1) def fnc(): #запуск w = periodics.PeriodicWorker(fnc) w.start #остановка w.stop()
Демонизация процесса
import cotyledon manager = cotyledon.ServiceManager() manager.add(PrinterService, 2) manager.run()- PrinterService - должен реализовывать init, run, terminate
Event loop
import asyncio #async - определение корунтины, которая должна выполняться в параллель async def add_42(number): return 42 + number async def hello_world(): #await - вызов другой корунтины result = await add_42(23) return result event_loop = asyncio.get_event_loop() try: result = event_loop.run_until_complete(hello_world()) print(result) finally: event_loop.close()
Очереди
rq.Queuefrom rq import Queue from redis import Redis q = Queue(connection=Redis()) job = q.enqueue(sum, [42, 43]) while job.result is None: time.sleep(1) print(job.result) #ограничение времени ожидания результата job = q.enqueue(requests.get, "http://httpbin.org/delay/1",ttl=60, result_ttl=300)
Celery - обертка над любой очередью: redis, rabit, чтото облачное
+ бд для хранения параметров и результатов
+ сериализация, с реализацией на разных
import celery app = celery.Celery('celery-task', broker='redis://localhost', backend='redis://localhost') @app.task def add(x, y): return x + y result = add.delay(4, 4) result.get()Можно задавать ретрай
@app.task(bind=True, retry_backoff=True, retry_kwargs={'max_retries': 5}) #последовательное выполнение тасок chain = celery.chain(add.s(4, 6), multiply.s(10)) #можно делать несколько очередей с разными приоритетами result = add.apply_async(args=[4, 6], queue='low-priority')Ретрай для обычной функции без очередей:
import tenacity def do_something(): if random.randint(0, 1) == 0: print("Failure") raise RuntimeError print("Success") tenacity.Retrying()(do_something) #можно задавать задержку при ретрае @tenacity.retry( wait=tenacity.wait_exponential(multiplier=0.5, max=30, exp_base=2), )
Блокировки
Межпотоковая блокировкаimport threading stdout_lock = threading.Lock() def print_something(something): with stdout_lock: #(something)Остановка потока через флаг:
import threading stop = threading.Event() def background_job(): while not stop.is_set(): ... stop.set()Межпроцессорная блокировка. Но процессы должны быть дочерними текущего:
import multiprocessing stdout_lock = multiprocessing.Lock()Межпроцессорная блокировка на файлах:
import fasteners lock = fasteners.InterProcessLock("/tmp/mylock") with lock: #smth #Или через декаратор: @fasteners.interprocess_locked('/tmp/tmp_lock_file') def locked_print():Распределенный лок через kv базу
import etcd3 client = etcd3.client() lock = client.lock("foobar") lock.acquire() lock.release() #или через декаратор: with lock:
Кэширование
Кэширование на основании словарей с максимальным размером = 3:import cachetools cache = cachetools.LRUCache(maxsize=3)Распределенное кэширование в мемкэше
from pymemcache.client import base client.set('some_key', 'some_value') result = client.get('some_key') #атомарная операция чек и сет (для многопоточных приложений) client.cas('visitors', result, cas)
Профилирование и трассировка
Профилирование работы конкретной функций>> import cProfile >> cProfile.run('2 + 2') 3 function calls in 0.000 seconds Ordered by: standard name ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 0.000 0.000 <string>:1(<module>) 1 0.000 0.000 0.000 0.000 {built-in method builtins.exec} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}Профилирование целого файла:
>> python -m cProfile sample_script.py # -s time - сортировка по времени работы 17 function calls in 10.011 seconds Ordered by: standard name ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 10.011 10.011 sample_script.py:1(<module>) 2 0.000 0.000 4.004 2.002 sample_script.py:10(z) 1 0.000 0.000 4.004 4.004 sample_script.py:13(w) 1 0.000 0.000 10.011 10.011 sample_script.py:17(main) 3 0.000 0.000 3.003 1.001 sample_script.py:3(x) 1 0.000 0.000 3.003 3.003 sample_script.py:6(y) 1 0.000 0.000 10.011 10.011 {built-in method builtins.exec} 6 10.011 1.668 10.011 1.668 {built-in method time.sleep} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}Визуаулизация графа вызова с учетом времени работы
$ python -m cProfile -o sample_script.cprof sample_script.py $ runsnake sample_script.cprofПрофилирование использованной памяти:
python -m memory_profiler memoryview-copy.py Line # Mem usage Increment Occurences Line Contents ============================================================ 1 17.840 MiB 17.840 MiB 1 @profile 2 def read_random(): 3 17.840 MiB 0.000 MiB 1 with open("/dev/urandom", "rb") as source: 4 27.633 MiB 9.793 MiB 1 content = source.read(1024 * 10000) 5 37.430 MiB 9.797 MiB 1 content_to_write = content[1024:] 6 37.430 MiB 0.000 MiB 2 print("Content length: %d, content to write length %d" % 7 37.430 MiB 0.000 MiB 1 (len(content), len(content_to_write))) 8 37.430 MiB 0.000 MiB 1 with open("/dev/null", "wb") as target: 9 37.430 MiB 0.000 MiB 1 target.write(content_to_write)
Комментариев нет:
Отправить комментарий