| f | import asyncio | f | import asyncio |
| | | |
| class Portal: | | class Portal: |
| | | |
| def __init__(self, parties: int): | | def __init__(self, parties: int): |
| n | self._parties = parties | n | self._total = parties |
| self._generation = 0 | | self._phase = 0 |
| self._n_waiting = 0 | | self._count = 0 |
| self._event = asyncio.Event() | | self._phase_event = asyncio.Event() |
| self._broken = False | | self._is_broken = False |
| self._lock = asyncio.Lock() | | self._mutex = asyncio.Lock() |
| self._topic = None | | self._last_topic = None |
| self._pending_topic = {} | | self._phase_topics_pending = {} |
| self._gen_topic = {} | | self._phase_topics = {} |
| self._task_gen = {} | | self._task_phase = {} |
| self._task_index = {} | | self._task_slot = {} |
| | | |
| @property | | @property |
| n | def parties(self) -> int: | n | def parties(self): |
| return self._parties | | return self._total |
| | | |
| @property | | @property |
| n | def n_waiting(self) -> int: | n | def n_waiting(self): |
| return self._n_waiting | | return self._count |
| | | |
| @property | | @property |
| n | def broken(self) -> bool: | n | def broken(self): |
| return self._broken | | return self._is_broken |
| | | |
| @property | | @property |
| def topic(self): | | def topic(self): |
| """ | | """ |
| n | Для задач возвращаем топик их собственного прохода, | n | Для текущей задачи → её топик фазы. |
| для внешнего кода — последний глобальный топик. | | Для внешнего кода → глобальный последний топик. |
| """ | | """ |
| task = asyncio.current_task() | | task = asyncio.current_task() |
| if task is not None: | | if task is not None: |
| n | gen = self._task_gen.get(task) | n | ph = self._task_phase.get(task) |
| if gen is not None: | | if ph is not None: |
| topic = self._gen_topic.get(gen, None) | | t = self._phase_topics.get(ph) |
| if topic is not None: | | if t is not None: |
| return topic | | return t |
| return self._topic | | return self._last_topic |
| | | |
| async def wait(self, topic=None): | | async def wait(self, topic=None): |
| n | if self._broken: | n | if self._is_broken: |
| raise asyncio.BrokenBarrierError | | raise asyncio.BrokenBarrierError |
| task = asyncio.current_task() | | task = asyncio.current_task() |
| n | async with self._lock: | n | async with self._mutex: |
| gen = self._generation | | phase = self._phase |
| self._task_gen[task] = gen | | self._task_phase[task] = phase |
| if topic is not None: | | if topic is not None: |
| n | self._pending_topic[gen] = topic | n | self._phase_topics_pending[phase] = topic |
| self._n_waiting += 1 | | self._count += 1 |
| index = self._parties - self._n_waiting | | slot = self._total - self._count |
| self._task_index[task] = index | | self._task_slot[task] = slot |
| my_event = self._event | | my_event = self._phase_event |
| if self._n_waiting == self._parties: | | if self._count == self._total: |
| gen_topic = self._pending_topic.pop(gen, None) | | phase_topic = self._phase_topics_pending.pop(phase, None |
| | | ) |
| self._gen_topic[gen] = gen_topic | | self._phase_topics[phase] = phase_topic |
| self._topic = gen_topic | | self._last_topic = phase_topic |
| self._generation += 1 | | self._phase += 1 |
| self._n_waiting = 0 | | self._count = 0 |
| self._event = asyncio.Event() | | self._phase_event = asyncio.Event() |
| my_event.set() | | my_event.set() |
| await my_event.wait() | | await my_event.wait() |
| n | return self._task_index[task] | n | return self._task_slot[task] |
| | | |
| async def reset(self): | | async def reset(self): |
| t | """ | t | |
| Сбрасывает портал в исходное состояние. | | |
| Предполагается, что вызывается, когда никто не ждёт. | | |
| """ | | |
| async with self._lock: | | async with self._mutex: |
| self._generation = 0 | | self._phase = 0 |
| self._n_waiting = 0 | | self._count = 0 |
| self._event = asyncio.Event() | | self._phase_event = asyncio.Event() |
| self._broken = False | | self._is_broken = False |
| self._topic = None | | self._last_topic = None |
| self._pending_topic.clear() | | self._phase_topics_pending.clear() |
| self._gen_topic.clear() | | self._phase_topics.clear() |
| self._task_gen.clear() | | self._task_phase.clear() |
| self._task_index.clear() | | self._task_slot.clear() |
| | | |
| async def __aenter__(self): | | async def __aenter__(self): |
| return await self.wait() | | return await self.wait() |
| | | |
| async def __aexit__(self, exc_type, exc, tb): | | async def __aexit__(self, exc_type, exc, tb): |
| return False | | return False |