Source code for jvconnected.engine

from __future__ import annotations
import typing as tp
from loguru import logger
import asyncio
from typing import Optional

from pydispatch import Dispatcher, Property, DictProperty, ListProperty

from jvconnected.common import ConnectionState, RemovalReason, ReconnectStatus
from jvconnected.config import Config, DeviceConfig
from jvconnected.device import Device
from jvconnected.discovery import Discovery
from jvconnected.client import ClientError, ClientAuthError, ClientNetworkError

from jvconnected import interfaces
from jvconnected.interfaces import midi


[docs]class Engine(Dispatcher): """Top level component to handle config, discovery and device control """ devices: tp.Dict[str, Device] = DictProperty() """Mapping of :class:`~.device.Device` instances using their :attr:`~.device.Device.id` as keys """ discovered_devices = DictProperty() running = Property(False) auto_add_devices = Property(True) """If ``True``, devices will be added automatically when discovered on the network. Otherwise, they must be added manually using :meth:`add_device_from_conf` """ midi_io = Property() interfaces: tp.Dict[str, 'jvconnected.interfaces.base.Interface'] = DictProperty() """Container for :class:`~.interfaces.base.Interface` instances """ _events_ = [ 'on_config_device_added', 'on_device_discovered', 'on_device_added', 'on_device_connected', 'on_device_removed', ] config: Config """The :class:`~.config.Config` instance""" discovery: Discovery """The :class:`~.discovery.Discovery` instance""" connection_status: tp.Dict[str, ReconnectStatus] """Mapping of :class:`~.common.ReconnectStatus` instances using the associated :attr:`device_id <.config.DeviceConfig.id>` as keys """
[docs] def on_config_device_added(self, conf_device: DeviceConfig): """Fired when an instance of :class:`~.config.DeviceConfig` is added """
[docs] def on_device_discovered(self, conf_device: DeviceConfig): """Fired when a device is detected on the network. An instance of :class:`~.config.DeviceConfig` is found (or created) and passed as the argument """
[docs] def on_device_added(self, device: Device): """Fired when an instance of :class:`~.device.Device` is added to :attr:`devices` """
[docs] def on_device_connected(self, device: Device): """Fired when an instance of :class:`~.device.Device` has been added and successfully connected """
[docs] def on_device_removed(self, device: Device, reason: RemovalReason): """Fired when an instance of :class:`~.device.Device` is removed Arguments: device: The device that was removed reason: Reason for removal """
_device_reconnect_timeout = 5 _device_reconnect_max_attempts = 100 def __init__(self, **kwargs): self.auto_add_devices = kwargs.get('auto_add_devices', True) self.loop = asyncio.get_event_loop() self.config = Config() self.discovery = Discovery() self.device_reconnect_queue = asyncio.Queue() self._device_reconnect_main_task = None self._run_pending = False self.connection_status = {} for name, cls in interfaces.registry: obj = cls() self.interfaces[name] = obj if name == 'midi': self.midi_io = obj interfaces.registry.bind_async( self.loop, interface_added=self.on_interface_registered, ) async def on_interface_registered(self, name, cls, **kwargs): if name not in self.interfaces: obj = cls() self.interfaces[name] = obj await obj.set_engine(self)
[docs] def run_forever(self): """Convenience method to open and run until interrupted """ self.loop.run_until_complete(self.open()) try: self.loop.run_forever() except KeyboardInterrupt: self.loop.run_until_complete(self.close()) finally: self.loop.run_until_complete(self.close())
[docs] async def open(self): """Open all communication methods """ if self.running: return self._run_pending = True t = asyncio.create_task(self._reconnect_devices()) self._device_reconnect_main_task = t for obj in self.interfaces.values(): await obj.set_engine(self) self.config.bind_async( self.loop, on_device_added=self._on_config_device_added, ) self.discovery.bind_async( self.loop, on_service_added=self.on_discovery_service_added, on_service_updated=self.on_discovery_service_updated, on_service_removed=self.on_discovery_service_removed, ) self.running = True self._run_pending = False await self.add_always_connected_devices() await self.discovery.open() logger.success('Engine open')
[docs] async def add_always_connected_devices(self): """Create and open any devices with :attr:`~jvconnected.config.DeviceConfig.always_connect` set to True """ coros = [] for device_conf in self.config.devices.values(): if not device_conf.always_connect: continue assert device_conf.id not in self.discovered_devices info = device_conf.build_service_info() coros.append(self.on_discovery_service_added(info.name, info=info)) if len(coros): await asyncio.sleep(.01) await asyncio.gather(*coros) await asyncio.sleep(.01)
[docs] async def close(self): """Close the discovery engine and any running device clients """ if not self.running: return self.running = False self.discovery.unbind(self) await self.discovery.close() t = self._device_reconnect_main_task self._device_reconnect_main_task = None await self.device_reconnect_queue.put(None) await t for status in self.connection_status.values(): t = status.task if t is None or t.done(): continue t.cancel() try: await t except asyncio.CancelledError: pass for conf_device in self.discovered_devices.values(): conf_device.online = False await asyncio.sleep(0) async def close_device(device): conf_device = self.config.devices[device.id] status = self.connection_status[device.id] try: await device.close() finally: await self.set_status_state(status, ConnectionState.DISCONNECT) del self.devices[device.id] self.emit('on_device_removed', device, RemovalReason.SHUTDOWN) coros = [] for device in self.devices.values(): coros.append(close_device(device)) await asyncio.gather(*coros) self.connection_status.clear() logger.success('Engine closed')
[docs] async def add_device_from_conf(self, device_conf: 'jvconnected.config.DeviceConfig'): """Add a client :class:`~jvconnected.device.Device` instance from the given :class:`~jvconnected.config.DeviceConfig` and attempt to connect. If auth information is incorrect or does not exist, display the error and remove the newly added device. """ status = self.connection_status.get(device_conf.id) if status is None: status = ReconnectStatus(device_id=device_conf.id) self.connection_status[device_conf.id] = status if status.state == ConnectionState.ATTEMPTING: task = status.task if task is not None and not task.done(): await task if status.state == ConnectionState.CONNECTED: return logger.debug(f'add_device_from_conf: {device_conf}') device = Device( device_conf.hostaddr, device_conf.auth_user, device_conf.auth_pass, device_conf.id, device_conf.hostport, ) await self.set_status_state(status, ConnectionState.ATTEMPTING) device.device_index = device_conf.device_index self.devices[device_conf.id] = device self.emit('on_device_added', device) async with status: try: await device.open() except ClientError as exc: await asyncio.sleep(0) await self.on_device_client_error(device, exc, skip_status_lock=True) return await self.set_status_state(status, ConnectionState.CONNECTED) status.reason = RemovalReason.UNKNOWN status.num_attempts = 0 self.emit('on_device_connected', device) device.bind_async(self.loop, on_client_error=self.on_device_client_error)
@logger.catch async def on_device_client_error(self, device, exc, **kwargs): skip_status_lock = kwargs.get('skip_status_lock', False) disconnect_state = kwargs.get('state', ConnectionState.FAILED) if not self.running: return if isinstance(exc, ClientNetworkError): reason = RemovalReason.TIMEOUT elif isinstance(exc, ClientAuthError): reason = RemovalReason.AUTH logger.warning(f'Authentication failed for device_id: {device.id}') else: reason = kwargs.get('reason', RemovalReason.UNKNOWN) device_conf = self.discovered_devices[device.id] status = self.connection_status[device.id] async def handle_state(): try: await device.close() finally: await self.set_status_state(status, disconnect_state) if device.id in self.devices: del self.devices[device.id] if reason == RemovalReason.TIMEOUT and status.reason != RemovalReason.OFFLINE: await self.device_reconnect_queue.put((device.id, reason)) if skip_status_lock: await handle_state() else: async with status: await handle_state() self.emit('on_device_removed', device, reason)
[docs] async def disconnect_device(self, device_id: str): """Disconnect the device matching the given id (if connected) """ logger.debug(f'disconnect_device({device_id})') status = self.connection_status.get(device_id) if status is not None: if status.state not in [ConnectionState.CONNECTED, ConnectionState.FAILED]: logger.debug('cancelling task') task = status.task if task is not None: task.cancel() try: await task except asyncio.CancelledError: pass status.task = None status.num_attempts = 0 device = self.devices.get(device_id) if device is not None: logger.debug(f'disconnecting') await self.on_device_client_error( device, None, reason=RemovalReason.USER, state=ConnectionState.USER_DISCONNECT, ) assert device_id not in self.devices logger.info(f'Disconnected device "{device_id}"')
[docs] @logger.catch async def reconnect_device(self, device_conf: DeviceConfig, wait_for_status: bool = False): """Attempt to reestablish a device connection This method is primarily useful when a new device is discovered and authentication information is needed for it. Once the information is set on the *device_conf*, this method may be used to retry the connection. Arguments: device_conf: The :class:`~.config.DeviceConfig` to reconnect wait_for_status: If ``True``, attempt to wait for the connection state (using :meth:`ReconnectStatus.wait_for_connect_or_failure() <.common.ReconnectStatus.wait_for_connect_or_failure>`). Default is False """ logger.debug(f'reconnect_device({device_conf})') device_id = device_conf.id await self.disconnect_device(device_id) await self.add_device_from_conf(device_conf) status = self.connection_status.get(device_id) if wait_for_status: try: await status.wait_for_connect_or_failure(timeout=5) except asyncio.TimeoutError: logger.warning(f'Timeout reached when reconnecting to "{device_conf!r}"') return status.state
@logger.catch async def on_discovery_service_added(self, name, **kwargs): logger.debug(f'on_discovery_service_added: {name}, {kwargs}') info = kwargs['info'] device_id = DeviceConfig.get_id_for_service_info(info) device_conf = self.discovered_devices.get(device_id) if device_id in self.config.devices: if device_conf is not None: dev = self.config.add_device(device_conf) assert dev is device_conf device_conf.update_from_service_info(info) else: device_conf = self.config.add_discovered_device(info) self.discovered_devices[device_id] = device_conf elif device_conf is None: device_conf = self.add_discovered_device(info) if device_id not in self.config.devices: dev = self.config.add_device(device_conf) device_conf.online = True self.emit('on_device_discovered', device_conf) if self.auto_add_devices: if device_conf.id not in self.devices: await self.add_device_from_conf(device_conf) async def on_discovery_service_updated(self, name, **kwargs): logger.debug(f'on_discovery_service_updated: "{name}", {kwargs}') info = kwargs['info'] old = kwargs['old'] device_id = DeviceConfig.get_id_for_service_info(old) status = self.connection_status.get(device_id) if status.task is not None and not status.task.done(): await status.task await self.on_discovery_service_removed(name, info=old) await self.on_discovery_service_added(name, info=info) async def on_discovery_service_removed(self, name, **kwargs): logger.debug(f'on_discovery_service_removed: {name}, {kwargs}') info = kwargs['info'] device_id = DeviceConfig.get_id_for_service_info(info) device_conf = self.discovered_devices.get(device_id) if device_conf is not None: device_conf.online = False if device_conf.always_connect: return status = self.connection_status[device_id] async with status: await self.set_status_state(ConnectionState.FAILED) status.reason = RemovalReason.OFFLINE device = self.devices.get(device_id) if device is not None: try: await device.close() finally: del self.devices[device_id] self.emit('on_device_removed', device, RemovalReason.OFFLINE)
[docs] def add_discovered_device(self, info: 'zeroconf.ServiceInfo') -> DeviceConfig: """Create a :class:`~jvconnected.config.DeviceConfig` and add it to :attr:`discovered_devices` """ device_id = DeviceConfig.get_id_for_service_info(info) if device_id in self.discovered_devices: device_conf = self.discovered_devices[device_id] else: device_conf = DeviceConfig.from_service_info(info) self.discovered_devices[device_conf.id] = device_conf return device_conf
async def set_status_state(self, status: ReconnectStatus, state: ConnectionState): device_conf = self.discovered_devices.get(status.device_id) device = self.devices.get(status.device_id) if device_conf is not None: device_conf.connection_state = state if device is not None: device.connection_state = state await status.set_state(state) @logger.catch async def _reconnect_devices(self): q = self.device_reconnect_queue async def do_reconnect(status: ReconnectStatus): await self.set_status_state(status, ConnectionState.SLEEPING) await asyncio.sleep(self._device_reconnect_timeout) async with status: if status.state != ConnectionState.SLEEPING: return if not self.running: return disco_conf = self.discovered_devices.get(status.device_id) if disco_conf is None: return if not disco_conf.online: return logger.debug(f'reconnect to {disco_conf}') status.num_attempts += 1 await self.add_device_from_conf(disco_conf) while self.running or self._run_pending: item = await q.get() if item is None or not self.running: q.task_done() break device_id, reason = item status = self.connection_status[device_id] valid = True async with status: if status.state != ConnectionState.FAILED: valid = False elif status.num_attempts >= self._device_reconnect_max_attempts: logger.debug(f'max attempts reached for "{device_id}"') valid = False elif status.task is not None and not status.task.done(): logger.error(f'Active reconnect task exists for {status}') valid = False elif reason == RemovalReason.TIMEOUT and status.reason == RemovalReason.OFFLINE: valid = False if valid: status.reason = reason await self.set_status_state(status, ConnectionState.SCHEDULING) logger.debug(f'scheduling reconnect for {device_id}, num_attempts={status.num_attempts}') status.task = asyncio.create_task(do_reconnect(status)) q.task_done() async def _on_config_device_added(self, conf_device, **kwargs): conf_device.bind(device_index=self._on_config_device_index_changed) self.emit('on_config_device_added', conf_device) def _on_config_device_index_changed(self, instance, value, **kwargs): device_id = instance.id device = self.devices.get(device_id) if device is None: return device.device_index = value
if __name__ == '__main__': Engine().run_forever()