Source code for jvconnected.common

from __future__ import annotations
import typing as tp
import asyncio
import enum
from dataclasses import dataclass, field

[docs]class ConnectionState(enum.IntFlag): """State used in :class:`ReconnectStatus` """ UNKNOWN = enum.auto() """Unknown state""" SCHEDULING = enum.auto() """A task is being scheduled to reconnect, but it has not begun execution""" SLEEPING = enum.auto() """The :attr:`ReconnectStatus.task` is waiting before attempting to reconnect""" ATTEMPTING = enum.auto() """The connection is being established""" CONNECTED = enum.auto() """Connection attempt success""" FAILED = enum.auto() """Connection has been lost""" DISCONNECT = enum.auto() """User manually disconnected"""
[docs]class RemovalReason(enum.Enum): """Possible values used in :event:`.engine.Engine.on_device_removed` """ UNKNOWN = enum.auto() """Unknown reason""" OFFLINE = enum.auto() """The device is no longer on the network""" TIMEOUT = enum.auto() """Communication was lost due to a timeout. Reconnection will be attempted""" AUTH = enum.auto() """Authentication with the device failed, likely due to invalid credentials""" USER = enum.auto() """User manually removed the device""" SHUTDOWN = enum.auto() """The engine is shutting down"""
[docs]@dataclass class ReconnectStatus: """Holds state used in device reconnect methods """ device_id: str """The associated device id""" state: ConnectionState = ConnectionState.UNKNOWN """Current :class:`ConnectionState`""" reason: RemovalReason = RemovalReason.UNKNOWN """The :class:`RemovalReason` for disconnect""" task: Optional[asyncio.Task] = None """The current :class:`asyncio.Task` scheduled to reconnect""" num_attempts: int = 0 """Number of reconnect attempts""" lock: asyncio.Lock = field(default_factory=asyncio.Lock) notify: asyncio.Condition = field(default_factory=asyncio.Condition)
[docs] async def set_state(self, state: ConnectionState): """Set the :attr:`state` """ async with self.notify: self.state = state self.notify.notify_all()
[docs] async def wait_for_state(self, state: ConnectionState, timeout: float|int|None = None): """Wait for a specific :attr:`state` value(s) Arguments: state: May be a single :class:`ConnectionState` member or a combination of multiple members (using bitwise OR ``|``) timeout: If not None, wait the given number of seconds for the state. Otherwise, wait indefinitely. Raises: asyncio.TimeoutError If the state was not set within the given timeout """ def predicate(): if state == ConnectionState.UNKNOWN: return self.state == state else: return state & self.state > 0 async with self.notify: coro = self.notify.wait_for(predicate) if timeout is not None: await asyncio.wait_for(coro, timeout) else: await coro
[docs] async def wait_for_connect_or_failure(self, timeout: float|int|None = None): """Shortcut to wait for ``CONNECTED`` or ``FAILED`` """ state = ConnectionState.CONNECTED | ConnectionState.FAILED await self.wait_for_state(state, timeout)
async def __aenter__(self): await self.lock.acquire() return None async def __aexit__(self, *args): self.lock.release()