# Source code for jvconnected.ui.models.engine

from loguru import logger
import asyncio
import threading
from typing import List

from PySide2 import QtCore, QtQml
from PySide2.QtCore import Property, Signal

from qasync import QEventLoop, asyncSlot, asyncClose

from jvconnected.common import ConnectionState
from jvconnected.engine import Engine
from jvconnected.ui.utils import (
    GenericQObject, connect_async_close_event, AnnotatedQtSignal as AnnoSignal,
)
from jvconnected.ui.models.device import DeviceModel, DeviceConfigModel

class EngineModel(GenericQObject):
    """Qt Bridge to :class:`jvconnected.engine.Engine`

    This object creates an instance of :class:`jvconnected.engine.Engine` and
    handles all necessary interaction with it
    """
    _n_running = Signal()
    _n_deviceViewIndices = Signal()

    deviceAdded: AnnoSignal(device=DeviceModel) = Signal(DeviceModel)
    """Fired when an active device is added to the engine
    """

    # NOTE(review): nothing in this class emits ``deviceRemoved``;
    # ``_engine_device_removed`` only clears ``model.device``. Confirm whether
    # listeners rely on this signal.
    deviceRemoved: AnnoSignal(device_id=str) = Signal(str)
    """Fired when a device is removed
    """

    configDeviceAdded: AnnoSignal(conf_device=DeviceConfigModel) = Signal(DeviceConfigModel)
    """Fired when a device is detected or loaded from config
    """

    engine: Engine
    """The engine instance"""

    def __init__(self, *args):
        self.loop = asyncio.get_event_loop()
        self.engine = Engine(auto_add_devices=True)
        self.engine.bind_async(
            self.loop,
            running=self.on_engine_running,
            on_config_device_added=self.on_config_device_added,
            on_device_discovered=self.on_device_discovered,
            on_device_added=self._engine_device_added,
            on_device_connected=self._engine_device_connected,
            on_device_removed=self._engine_device_removed,
        )
        self._running = False
        # DeviceConfigModel instances keyed by the config device id
        self._device_configs = {}
        # DeviceModel instances keyed by device id
        self._devices = {}
        self._deviceViewIndices = []
        # The attributes above are assigned before the base initializer runs.
        super().__init__(*args)
        connect_async_close_event(self.appClose)

    @asyncSlot()
    async def open(self):
        """Open the :attr:`engine`

        Every device already present in ``engine.config.devices`` is passed to
        :meth:`on_config_device_added` before the engine itself is opened.

        See :meth:`jvconnected.engine.Engine.open`
        """
        for conf_device in self.engine.config.devices.values():
            await self.on_config_device_added(conf_device)
        await self.engine.open()

    @asyncSlot()
    async def close(self):
        """Close the :attr:`engine`

        See :meth:`jvconnected.engine.Engine.close`
        """
        await self.engine.close()

    async def appClose(self):
        """Close handler registered through ``connect_async_close_event``

        Simply awaits :meth:`close`.
        """
        await self.close()

    @QtCore.Slot(str, result=DeviceConfigModel)
    def getDeviceConfig(self, device_id: str) -> DeviceConfigModel:
        """Get a :class:`jvconnected.ui.models.device.DeviceConfigModel` by its
        :attr:`~jvconnected.ui.models.device.DeviceConfig.deviceId`

        Raises:
            KeyError: If no config model exists for *device_id*
        """
        return self._device_configs[device_id]

    @QtCore.Slot(result='QVariantList')
    def getAllDeviceConfigIds(self) -> List[str]:
        """Get a list of all device ids in the :class:`jvconnected.config.Config`
        """
        return list(self._device_configs)

    @QtCore.Slot(str, result=DeviceModel)
    def getDevice(self, device_id: str) -> DeviceModel:
        """Get a :class:`jvconnected.ui.models.device.DeviceModel` by its
        :attr:`~jvconnected.ui.models.device.DeviceModel.deviceId`

        Raises:
            KeyError: If no device model exists for *device_id*
        """
        return self._devices[device_id]

    def _g_running(self) -> bool:
        return self._running

    def _s_running(self, value: bool):
        self._generic_setter('_running', value)

    running: bool = Property(bool, _g_running, _s_running, notify=_n_running)
    """Run state"""

    def _g_deviceViewIndices(self):
        return self._deviceViewIndices

    def _s_deviceViewIndices(self, value):
        self._generic_setter('_deviceViewIndices', value)

    deviceViewIndices: List[int] = Property(
        'QVariantList',
        _g_deviceViewIndices,
        _s_deviceViewIndices,
        notify=_n_deviceViewIndices,
    )
    # Built by ``_calc_device_view_indices``: device ids ordered by the
    # ``device_index`` of their config entries.

    def on_engine_running(self, instance, value, **kwargs):
        """Engine callback mirroring its ``running`` value onto :attr:`running`
        """
        self.running = value

    @logger.catch
    async def on_config_device_added(self, conf_device):
        """Engine callback creating or updating the config model for a device

        If a model already exists for ``conf_device.id`` it is re-pointed at
        *conf_device* (when it wraps a different object) and nothing is
        emitted. Otherwise a new
        :class:`~jvconnected.ui.models.device.DeviceConfigModel` is created,
        stored and announced through :attr:`configDeviceAdded`.
        """
        existing = self._device_configs.get(conf_device.id)
        if conf_device.id in self._device_configs:
            if existing.device is not conf_device:
                if existing.device is not None:
                    # Drop the callbacks this object bound on the old device
                    existing.device.unbind(self)
                existing.device = conf_device
                conf_device.bind(device_index=self._calc_device_view_indices)
            return
        logger.debug(f'adding conf_device: {conf_device}')
        conf_device.bind(device_index=self._calc_device_view_indices)
        model = DeviceConfigModel(device=conf_device)
        self._device_configs[conf_device.id] = model
        model.reconnectSignal.connect(self.on_device_conf_reconnect_sig)
        self.configDeviceAdded.emit(model)

    @logger.catch
    async def on_device_discovered(self, conf_device, **kwargs):
        """Engine callback for a discovered device

        Logs the event and defers to :meth:`on_config_device_added`.
        """
        logger.info(f'engine.on_device_discovered: {conf_device}')
        await self.on_config_device_added(conf_device)

    @logger.catch
    async def _engine_device_added(self, device, **kwargs):
        """Engine callback for ``on_device_added``

        Nothing is done if *device* already reports
        ``ConnectionState.CONNECTED``. Otherwise, when its config model has
        ``alwaysConnect`` set, the device is handed to
        :meth:`_engine_device_connected`.
        """
        conf_device_model = self._device_configs[device.id]
        if device.connection_state == ConnectionState.CONNECTED:
            return
        if conf_device_model.alwaysConnect:
            await self._engine_device_connected(device, **kwargs)

    @logger.catch
    async def _engine_device_connected(self, device, **kwargs):
        """Engine callback for ``on_device_connected``

        Attaches *device* to its existing
        :class:`~jvconnected.ui.models.device.DeviceModel`, or creates, wires
        up and announces (via :attr:`deviceAdded`) a new one.
        """
        conf_device_model = self._device_configs[device.id]
        logger.info(f'engine.on_device_connected: {device=}, {conf_device_model=}')
        engine_conf_device = self.engine.config.devices[device.id]
        assert conf_device_model.device is engine_conf_device
        if device.id in self._devices:
            model = self._devices[device.id]
            assert model.confDevice is conf_device_model
            if model.device is device:
                # Already attached; nothing to update
                return
            assert model.deviceId == device.id
            logger.info(f'setting model.device to "{device}"')
            model.device = device
        else:
            logger.info(f'creating new DeviceModel for "{device.id}"')
            model = DeviceModel(confDevice=conf_device_model)
            model.device = device
            model.reconnectSignal.connect(self.on_device_reconnect_sig)
            self._devices[model.deviceId] = model
            self._calc_device_view_indices()
            self.deviceAdded.emit(model)
            model.removeDeviceIndex.connect(self.on_device_remove_index)
        logger.debug(f'{engine_conf_device.connection_state=}, {engine_conf_device.device_index=}, {device.device_index=}')

    @logger.catch
    async def _engine_device_removed(self, device, reason, **kwargs):
        """Engine callback for ``on_device_removed``

        The matching :class:`~jvconnected.ui.models.device.DeviceModel` (if
        any) is kept in place; only its ``device`` reference is cleared.
        """
        conf_device_model = self._device_configs.get(device.id)
        logger.info(f'engine.on_device_removed: {device}, {reason}, {conf_device_model=}')
        model = self._devices.get(device.id)
        if model is not None:
            model.device = None

    @asyncSlot(DeviceConfigModel)
    async def on_device_conf_reconnect_sig(self, conf_device_model: DeviceConfigModel):
        """Reconnect the given device

        Calls the :meth:`~jvconnected.engine.Engine.reconnect_device` method
        on the :attr:`engine`.
        """
        conf_device = conf_device_model.device
        if conf_device.device_index is None:
            # NOTE(review): -1 presumably requests automatic index
            # assignment -- confirm against the config implementation.
            conf_device.device_index = -1
            logger.debug(f'set conf_device index: {conf_device.device_index=}')
        state = await self.engine.reconnect_device(conf_device, wait_for_state=True)
        logger.debug(f'reconnect state={state!r}')
        config_conf_device = self.engine.config.devices[conf_device.id]
        assert config_conf_device is conf_device

    @asyncSlot(DeviceModel)
    async def on_device_reconnect_sig(self, device_model: DeviceModel):
        """Reconnect using the config model attached to *device_model*

        Delegates to :meth:`on_device_conf_reconnect_sig` with
        ``device_model.confDevice``.
        """
        await self.on_device_conf_reconnect_sig(device_model.confDevice)

    def _calc_device_view_indices(self, *args, **kwargs):
        """Rebuild :attr:`deviceViewIndices`

        Collects the config devices that have a device model in this object
        and a ``device_index`` that is not ``None``, then stores their ids
        ordered by that index. Extra arguments are accepted (and ignored) so
        the method can be used directly as a ``device_index`` callback.
        """
        devices = self.engine.config.devices
        ids_by_index = {
            dev.device_index: dev.id
            for dev in devices.values()
            if dev.id in self._devices and dev.device_index is not None
        }
        self.deviceViewIndices = [ids_by_index[key] for key in sorted(ids_by_index)]

    def on_device_remove_index(self, device_id):
        """Clear the ``device_index`` of the config device for *device_id*

        Connected to ``DeviceModel.removeDeviceIndex``. Only the index is
        reset here; the device model and its ``device`` are left untouched.

        Raises:
            KeyError: If *device_id* has no device model or no config model
        """
        if device_id not in self._devices:
            raise KeyError(device_id)
        conf_device = self._device_configs[device_id].device
        conf_device.device_index = None
def register_qml_types():
    """Register :class:`EngineModel` as a QML type

    The class is exposed as ``EngineModel`` in the ``DeviceModels`` module,
    version 1.0.
    """
    module_uri = 'DeviceModels'
    version_major, version_minor = 1, 0
    QtQml.qmlRegisterType(
        EngineModel, module_uri, version_major, version_minor, 'EngineModel',
    )