import asyncio
import weakref


class Portal:
    """Reusable asyncio rendezvous point for a fixed number of parties.

    Each call to :meth:`wait` blocks until ``parties`` callers have arrived,
    then all of them are released together and the portal starts a new
    generation.  Callers may attach a *topic* to the generation they are
    joining; the last non-``None`` topic supplied before the release becomes
    the topic of that generation.
    """

    def __init__(self, parties: int):
        """Create a portal that releases once *parties* callers are waiting."""
        self._parties = parties
        # Counter of completed release cycles since construction / reset().
        self._generation = 0
        # Number of callers currently parked in the running generation.
        self._n_waiting = 0
        # Event the current generation's waiters block on; replaced on release.
        self._event = asyncio.Event()
        # Nothing in this class sets the flag; wait() only honours it.
        self._broken = False
        self._lock = asyncio.Lock()
        # Topic of the most recently released generation (may be None).
        self._topic = None
        # generation -> topic proposed so far by parties still arriving.
        self._pending_topic = {}
        # generation -> topic fixed at release.  Only non-None topics are
        # stored: a missing entry already means "use the global topic".
        self._gen_topic = {}
        # task -> generation it last joined.  Weak keys so that finished
        # tasks are not kept alive by the portal.
        self._task_gen = weakref.WeakKeyDictionary()

    @property
    def parties(self) -> int:
        """Number of callers required to release the portal."""
        return self._parties

    @property
    def n_waiting(self) -> int:
        """Number of callers currently blocked in the running generation."""
        return self._n_waiting

    @property
    def broken(self) -> bool:
        """Whether the portal is flagged as broken."""
        return self._broken

    @property
    def topic(self):
        """Topic visible to the caller.

        A task that has passed through the portal sees the topic of the
        generation it joined (when that topic is not ``None``).  Any other
        caller, including code running outside an event loop, sees the topic
        of the most recently released generation.
        """
        try:
            task = asyncio.current_task()
        except RuntimeError:
            # No running event loop: the caller is plain external code.
            task = None
        if task is not None:
            gen = self._task_gen.get(task)
            if gen is not None:
                topic = self._gen_topic.get(gen)
                if topic is not None:
                    return topic
        return self._topic

    async def wait(self, topic=None):
        """Block until ``parties`` callers have arrived, then release all.

        Args:
            topic: Optional topic proposed for this generation.  A later
                non-``None`` proposal in the same generation overrides an
                earlier one.

        Returns:
            The caller's arrival index, counted down: the first arriver gets
            ``parties - 1`` and the caller that triggers the release gets 0.

        Raises:
            asyncio.BrokenBarrierError: If the portal is flagged as broken.
        """
        if self._broken:
            raise asyncio.BrokenBarrierError
        task = asyncio.current_task()
        async with self._lock:
            gen = self._generation
            if task is not None:
                self._task_gen[task] = gen
            if topic is not None:
                self._pending_topic[gen] = topic
            self._n_waiting += 1
            index = self._parties - self._n_waiting
            my_event = self._event
            if self._n_waiting == self._parties:
                # Last arriver: freeze the topic, open the next generation
                # and wake everyone parked on the old event.
                gen_topic = self._pending_topic.pop(gen, None)
                if gen_topic is not None:
                    self._gen_topic[gen] = gen_topic
                self._topic = gen_topic
                self._generation += 1
                self._n_waiting = 0
                self._event = asyncio.Event()
                my_event.set()
        try:
            await my_event.wait()
        except asyncio.CancelledError:
            # A waiter that gives up before the release must not be counted,
            # otherwise the next cycle would open one party short.  If the
            # event has already been swapped (release or reset) the counter
            # no longer includes this caller.
            if my_event is self._event:
                self._n_waiting -= 1
            raise
        # Return the local value: reset() may already have wiped shared
        # bookkeeping by the time this waiter is resumed.
        return index

    async def reset(self):
        """Return the portal to its initial state.

        Intended to be called while nobody is waiting: callers parked on the
        previous event are not woken up.
        """
        async with self._lock:
            self._generation = 0
            self._n_waiting = 0
            self._event = asyncio.Event()
            self._broken = False
            self._topic = None
            self._pending_topic.clear()
            self._gen_topic.clear()
            self._task_gen.clear()

    async def __aenter__(self):
        """Wait at the portal on entry and return the arrival index."""
        return await self.wait()

    async def __aexit__(self, exc_type, exc, tb):
        """Do nothing on exit; exceptions from the body propagate."""
        return False