Асинхронные возможности Python
Асинхронность:
Прямая (параллелизм) — в Python нет
Есть модули, которрые позволяют воспользоваться (псевдо) параллелизмом операционной системы (например, subprocess), и в них предусмотрены средства синхронизации
Событийно-ориентированное программирование, в котором программа — это обработчик независимо порождаемых событий
Как правило, понятие «события» привязано к внешним источникам, а программа представляет собой более или менее обычный цикл обработки, то есть цикл и вызов методов-обработчиков
См., например, tkinter или игровые движки типа PyGame / The Python Arcade Library
- Сопрограммная: внутри нескольких сопрограмм есть синхронные участки; асинхронность — это алгоритм, сообразно которому происходит переключение между выполнением этих участков
- Да это ж генераторы!
Модель
Предполагается, что весь предлагаемый код вы запускаете и смотрите на результат; без этого понять намного сложнее, if even possible ☺
Как работает yield from (повторение)
На время yield from код генератора task() логически не исполняется, можно считать, что на это время его замещает subr()
Ловля return из генератора с помощью yield from (два способа)
Оператор return в генераторе откладывает свой параметр в поле .value исключения StopIteration
А в конструкции yield from … это значение приезжает прямо так!
Передача параметра в генератор с помощью .send() (повторение):
Особенность: самый первый .send() должен быть генератор.send(None) (или, что то же самое, next(генератор), потому что в синтаксисе нет способа передать какое-то значение в начало генератора, а не в yield.
initial — это параметр генератор-функции, он передаётся в момент создания генератора, а не при его проходе
Мы договорились считать этот первый next() запуском генератора.
Куда происходит .send() в случае yield from?
Ничего неожиданного: .send() попадает в тот итератор, который сейчас yield-ит
Внимательно посмотрим, куда что send-илось…
Асинхронность как произвольное исполнение частей кода между yield-ами
Понятие синхронного фрагмента — непрерывно выполняемого кода между входом в сопрограмму, yield-ами и выходом из неё
Понятие образующего цикла (main loop)
Тот же пример, но с двумя асинхронно выполняющимися задачами:
1 def subr(n): 2 x = yield f"({n}) Wait for x" 3 y = yield f"({n}) Wait for y ({x=})" 4 return x, y 5 6 def task(n): 7 yield f"Start {n}" 8 while True: 9 value = yield from subr(n) 10 _ = yield f"[{n}]: {value}" 11 12 cores = task(0), task(1) 13 print(next(cores[0]), next(cores[1]), sep="\n") 14 for i in range(20): 15 print(cores[not i % 3].send(i))
Здесь из образующего цикла поступает поток целых чисел, subr() их попарно умножает, а две задачи складывают эти произведения
Очередное число попадает в subr() выбранной задачи, а выбор задач делает образующий цикл
Синхронные фрагменты из task[0] выполняются в два раза чаще синхронных фрагментов из task[1]
Можно попробовать разобраться, какой .send() докуда доходил
- Более сложный пример: три конечных задачи с разным количеством синхронных фрагментов
1 def subr(): 2 x = yield 3 y = yield 4 return [x, y] 5 6 def task(num): 7 res = [] 8 for i in range(num): 9 res += yield from subr() 10 return res 11 12 def loop(*tasks): 13 queue, result = list(tasks), [] 14 print("Start:", *queue, sep="\n\t") 15 for task in tasks: 16 next(task) 17 step = 0 18 while queue: 19 task = queue.pop(0) 20 try: 21 task.send(step) 22 except StopIteration as ret: 23 result.append((hex(id(task)), ret.value)) 24 else: 25 queue.append(task) 26 step += 1 27 return result 28 29 print("Done:", *loop(task(7), task(2), task(5)), sep="\n\t")
- Образующий цикл вынесен в отдельную функцию и стал сложнее. В нём генерируется последовательность целых чисел и отдаётся поштучно на обработку очередному заданию. Если задание закончилось, запоминается его результат, а если нет — ставится в конец очереди.
Для реализации этой логики пришлось снова «вытащить» явную обработку StopIteration
Значения, возвращаемые yield, при этом не используются вообще: yield служит только для разметки синхронных фрагментов
Если ещё усложнить логику образующего цикла, мы сможем управлять его поведением с помощью возвращаемых yield значений:
1 from random import randint 2 from string import ascii_uppercase 3 from collections import deque 4 5 def subr(): 6 return (yield int) * (yield str) 7 8 def task(num): 9 res = "" 10 for i in range(num): 11 res += yield from subr() 12 return res 13 14 def loop(*tasks): 15 queue, result = deque((task, None) for task in tasks), [] 16 print("Start:", *queue, sep="\n\t") 17 idx = -1 18 while queue: 19 task, request = queue.popleft() 20 if request is int: 21 data = randint(1, 4) 22 elif request is str: 23 data = ascii_uppercase[idx := idx + 1] 24 else: 25 data = request 26 try: 27 request = task.send(data) 28 except StopIteration as ret: 29 result.append((task, ret.value)) 30 task.close() 31 else: 32 queue.append((task, request)) 33 return result 34 35 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
subr() возвращает тип параметра, который она хотела бы получить в следующем yield-е
Этот тип хранится в очереди вместе с заданием, чей subr() запросил данный параметр
- Образующий цикл генерирует параметр сообразно типу
А ещё мы храним очередь в очереди, а не в списке, не надо привыкать к плохому!
Можно и дальше усложнять, но и так уже непросто!
Ещё модели
Цикл событий: образующий цикл получает откуда-то «события», определяет, кто их должен обрабатывать и вызывает функции-обработчики с параметром обработчик(событие) (возможно, не функции, а генераторы обработчик.send(событие), не слишком важно).
- Цикл обратных вызовов (callback-ов): частный случай того же самого: каждый обработчик «регистрируется» — по заранее определённому протоколу указывает, в каких случаях его надо вызывать (это и есть событие), а образующий цикл при наступлении события вызывает все обработчики, которые на нём зарегистрировались (опять-таки, можно организовать в виде функций, а можно в виде генераторов)
Цикл с фьючами (future, promise): унификация управления образующим циклом
future — это генератор, в котором есть поле «готовность / результат»; изначально фьюча не готова. Фьюча состоит из двух синхронных сегментов:
Настройка и yield себя в образующий цикл
return готового результата пользователю
- Алгоритм работы:
- Неготовая фьюча заводится в данном образующем цикле
Образующий цикл вызывает next(сопрограмма)
Сопрограмма делает yield from фьюча
- Фьюча выпадает в образующий цикл, потому что она ещё не готова (первый сегмент)
- Образующий цикл продолжает работу, проверяя, что фьюча не готова
- В какой-то момент некто выставляет фьюче готовность / результат
На этом основании образующий цикл возвращает управление фьюче next(фьюча) (во второй сегмент)
- А та возвращает значение пользователю
- Если фьюча всё ещё не готова — это ошибка алгоритма в образующем цикле, так делать нельзя
Если фьюча уже готова, все вызовы yield from фьюча сразу возвращают результат
Полученный StopIteration образующий цикл обрабатывает (обычно игнорирует)
- … более сложная логика (например, приоритизация событий) …
Ещё раз: асинхронность — это не параллелизм! Все фрагменты выполняются последовательно в один поток.
Синтаксис Async
async def + return — задание сопрограммы ≈ def + yield (только без yield)
Генератор на один шаг:
Может включать в себя await ≈ yield from — вызов других сопрограмм
Кстати, async def + yield — это именно то, чем кажется: генераторы, про которые сразу известно, что они асинхронные:
Их можно проходить циклом async for (причём в конструкторах вида [… async for i in асинхронный-гененратор …] тоже)
Если до этого момента не стало понятно:
A tale of event loops — ещё одно объяснение async, немножко с обратного конца
Перепишем предыдущий пример на async
Примечание: @types.coroutine — низкоуровневая сопрограмма, которая может делать и return значение, и yield, то есть напрямую обращаться к образующему циклу. Встречается редко.
1 from random import randint
2 from string import ascii_uppercase
3 from types import coroutine
4 from collections import deque
5
6 @coroutine
7 def subr():
8 return (yield int) * (yield str)
9
10 async def task(num):
11 res = ""
12 for i in range(num):
13 res += await subr()
14 return res
15
16 def loop(*tasks):
17 queue, result = deque((task, None) for task in tasks), []
18 print("Start:", *queue, sep="\n\t")
19 idx = -1
20 while queue:
21 task, request = queue.popleft()
22 if request is int:
23 data = randint(1, 4)
24 elif request is str:
25 data = ascii_uppercase[idx := idx + 1]
26 else:
27 data = request
28 try:
29 request = task.send(data)
30 except StopIteration as ret:
31 result.append((task, ret.value))
32 task.close()
33 else:
34 queue.append((task, request))
35 return result
36
37 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
Наш subr() использует прямое управление образующим циклом с помощью yield
В готовом инструментарии это практически никогда не нужно: и образующий цикл, и инструменты управления им должны входить в такой инструментарий
Формально говоря, awaitable object — это просто объект с методом .__await__(), возвращающим итератор. Однако логика работы этого метода диктуется управляющим циклом, это вам не .__call__ ☺. Вот пример реализации логики Future для управляющего цикла asyncio.
Asyncio
Немного истории:
http://magic.io → https://www.edgedb.com
→ uvloop
Кстати, [https://github.com/MagicStack/uvloop|uvloop]]
- … пошло-поехало!
- …
Python Asyncio: The Complete Guide (NB: название сайта!)
- …
- Самое сложное — это логика образующего цикла
Самое ненужное — это логика образующего цикла (достаточно знать, как им пользоваться, а не что делает)
⇒
- Запрограммируем образующий цикл заранее, насуём туда инструментов
Упростим протокол управления до одного понятия — Future
- Обмажем протокол верхним уровнем (задания, события, очереди и т. п.)
До такой степени, что ни одна из наших сопрограмм не делает yield (если это не асинхронный генератор)
(asyncio specific) обмажем огромным количеством применений IRL
Основные понятия:
Mainloop — образующий цикл. Полностью под капотом, мы его не видим.
Task — сопрограмма, поставленная в управление образующим циклом
1 import asyncio 2 from time import strftime 3 4 async def late(delay, msg): 5 await asyncio.sleep(delay) 6 print(msg) 7 8 async def main(): 9 print(f"> {strftime('%X')}") 10 await late(1, "One") 11 print(f"> {strftime('%X')} + 1") 12 await late(2, "Two") 13 print(f"> {strftime('%X')} + 2") 14 15 task3 = asyncio.create_task(late(3, "Three")) 16 task4 = asyncio.create_task(late(4, "Four")) 17 await task3 18 print(f"> {strftime('%X')} + 3") 19 await task4 20 print(f"> {strftime('%X')} + <<1>>") 21 22 asyncio.run(main())
asyncio.run(main()) — запуск «приложения» main() в образующем цикле asyncio()
- «приложение» asyncio — корутина, который заполняет очередь mainloop-а и немножко командует им
Если просто написать await — корутина «просто запустится», в чём асинхроннотсь, непонятно (даже если она и выходила в mainloop)
В примере первая корутина спит секунду, а вторая — после этого ещё две
Если написать create_task(корутина), корутина регистрируется в mainloop-е, а возвращется нечто вроде фьючи — задание
await(здадание) запускает его
В примере две корутины планируются одновременно, первая из них спит три секунды, а вторая — четыре, так что отрабатывает через секунду после первой
asyncio.sleep(тайм-аут) — это команда mainloop-у «верни мне управление после тайм-аута»
- Чуть ли не единственная команда asyncio-шному mainloop-у на поверхности
Gather — атомарная операция create_task() / await над несколькими корутинами
- Тут всё понятно, запустились, повисели сколько сказано, завершились
(Python 3.11) группы заданий — запуск в виде контекстного менеджера
1 import asyncio 2 3 async def late(delay, msg): 4 await asyncio.sleep(delay) 5 print(msg) 6 return delay 7 8 async def main(): 9 async with asyncio.TaskGroup() as tg: 10 tg.create_task(late(3, "A")) 11 tg.create_task(late(1, "B")) 12 tg.create_task(late(2, "C")) 13 print("Done") 14 15 asyncio.run(main())
Asyncio — это:
Асинхронное выполнение фрагментов кода между yield-ами; порядок определяется образующим циклом и намеренно недетерминирован
⇒ возможны ситуации гонок, взаимоблокировки и прочие прелести; необходима синхронизация
- Один поток вычислений
⇒ нет ситуаций одновременного атомарного доступа к ресурсу (неатомарный одновременный доступ, то есть длящийся более одного фрагмента, разумеется, есть; требуются семафоры и mutex-ы)
⇒ нельзя надолго (особенно — неопределённо надолго) оставаться в одном фрагменте (висеть в горячем цикле, синхронном вводе и т. п.): пока сопрограмма не передала управление образующему циклу, никакие другие задания не выполняются
В таких случаях может помочь asyncio.sleep(0), но если он помогает — что-то с вашим алгоритмом не то…
- Простая модель асинхронности с полностью скрытым образующим циклом
Высокоуровневое API (введение)
Собственно фьюча,
она считается низкоуровневым примитивом (The rule of thumb is to never expose Future objects in user-facing APIs) (если успеем) Пример:
1 >>> F = asyncio.Future() 2 >>> F.done() 3 False 4 >>> F 5 <Future pending> 6 >>> f = F.__await__() # Итератор из двух сегментов 7 >>> f.send(None) # Это клиент сделал await f (бывш. yield from f) 8 <Future pending> # фьюча не готова, клиент спит на ней 9 >>> F.set_result(42) # кто-то сделал её готовой 10 >>> F 11 <Future finished result=42> # клюенту вернётся это значение 12 >>> F.done() 13 True 14 15 >>> f.send(None) # mainloop велел продолжать 16 Traceback (most recent call last): 17 File "<stdin>", line 1, in <module> 18 StopIteration: 42
Высокоуровневый — например, события
1 async def waiter(name, event): 2 print(f'{name} waits for {event}…') 3 await event.wait() 4 print(f'…{name} got it!') 5 6 async def eventer(wait, event): 7 print(f"Emitting {event} in {wait} seconds") 8 await asyncio.sleep(wait) 9 print(f"Emitting {event}…") 10 event.set() 11 12 async def main(): 13 event = asyncio.Event() 14 await asyncio.gather( 15 waiter("One", event), 16 waiter("Two", event), 17 eventer(1, event)) 18 19 asyncio.run(main())
…или барьеры
- …
1 async def ham(queue, size): 2 for i in range(size): 3 await asyncio.sleep(1) 4 res = await queue.get() 5 print(f"\tGot {res}") 6 7 async def spam(wait, queue): 8 for i in range(6): 9 await asyncio.sleep(wait) 10 val = f"{wait}:{i}" 11 await queue.put(val) 12 print(f"Put {val}") 13 14 async def main(): 15 queue = asyncio.Queue() 16 await asyncio.gather( 17 ham(queue, 12), 18 spam(0.4, queue), 19 spam(1.6, queue)) 20 21 asyncio.run(main())
- Есть и приоритетные очереди
И толстый-толстый слой шоколада!
Параллелизм (внешний, следите за тредобезопасностью или не используйте треды)
Изменение логики работы mainloop (aka Policies)
- Сеть (I/O, IPC и всё остальное), сигналы
Потоки (над этим всем)
В частности, TCP сервер (аналог netcat -l)
1 import asyncio 2 3 async def echo(reader, writer): 4 while data := await reader.readline(): 5 writer.write(data.swapcase()) 6 writer.close() 7 await writer.wait_closed() 8 9 async def main(): 10 server = await asyncio.start_server(echo, '0.0.0.0', 1337) 11 async with server: 12 await server.serve_forever() 13 14 asyncio.run(main())
- Может принимать произвольное количество соединений
Ещё раз: это не параллелизм! - ⇒ (да, да, помним: не висеть внутри сопрограммы в горячих циклах / синхронном вводе / whatever alike)
datamodel.html#coroutine.throw (по аналогии с generator.throw()
- …
Дикая туча модулей на основе asyncio
Python3.14+: Интроспекция запущенной asyncio-программы
Д/З
Попробовать прочитать всю документацию и прощёлкать всё, до чего дотянетесь.
- Input:
Написать класс Portal, который работает так же, как Barrier, однако дополнительно имеет property .topic. Этот дескриптор по умолчанию равен None, однако вызов .wait(топик) с не-None параметром его меняет на топик. Главное свойство Portal состоит в том, что к моменту «прохождения портала» любым его клиентом значение topic должно быть равно заданному.
Предполагается, что из клиентов только один задаёт топик, остальные не меняют его (например, передают None)
Output:FLAG0 FLAG0 FLAG0 FLAG0 FLAG0 FLAG1 FLAG1 FLAG1 FLAG1 FLAG1 FLAG2 FLAG2 FLAG2 FLAG2 FLAG2
