Source code for jvconnected.config

from __future__ import annotations
import typing as tp
from loguru import logger
import os
import sys
import asyncio
from pathlib import Path
import typing as tp
from contextlib import contextmanager

from pydispatch import Dispatcher, Property, DictProperty, ListProperty
import jsonfactory

from zeroconf import ServiceInfo

from jvconnected.common import ConnectionState
from jvconnected.utils import IndexedDict

[docs]def get_config_dir(app_name: str) -> 'pathlib.Path': """Get the platform's preferred configuration directory * For Windows, the :literal:`%LOCALAPPDATA%` environment variable is used. Typically ``c:\\Users\\<username>\\AppData\\Local`` * For MacOS, ``~/Library/Preferences`` * All others will be ``~/.config`` """ if sys.platform == 'win32': p = Path(os.environ['LOCALAPPDATA']) elif sys.platform == 'darwin': p = Path.home() / 'Library' / 'Preferences' else: p = Path.home() / '.config' return p / app_name
class DumbLock(object): def __init__(self): self._locked = 0 def acquire(self): self._locked += 1 def release(self): if not self.locked(): raise Exception('Lock not acquired') self._locked -= 1 def locked(self): return self._locked > 0 def __enter__(self): self.acquire() return self def __exit__(self, *args): self.release() async def __aenter__(self): self.acquire() return self async def __aexit__(self, *args): self.release() class ContextLock(DumbLock): def __init__(self): self.context = None super().__init__() @contextmanager def set(self, value): self.acquire() self.context = value yield self.release() def release(self): super().release() if not self.locked(): self.context = None
[docs]class Config(Dispatcher): """Configuration storage This object provides a dict-like interface and stores the configuration data automatically when it changes. The stored config data is read on initialization. Arguments: filename (:class:`pathlib.Path`, optional): The configuration filename. If not provided, the :attr:`DEFAULT_FILENAME` is used """ data: tp.Dict[str, tp.Any] = DictProperty() DEFAULT_FILENAME: Path = get_config_dir('jvconnected') / 'config.json' """Platform-dependent default filename (``<config_dir>/jvconnected/config.json``). Where ``<config_dir>`` is chosen in :func:`get_config_dir` """ indexed_devices: IndexedDict """An instance of :class:`jvconnected.utils.IndexedDict` to handle device indexing """ _events_ = ['on_device_added']
[docs] def on_device_added(self, device: 'DeviceConfig'): """Triggered when a device is added to the config """
def __init__(self, filename: tp.Optional['pathlib.Path'] = None): if filename is None: filename = self.DEFAULT_FILENAME logger.info(f'Config using filename: {filename}') self._read_complete = False self._device_reindexing = ContextLock() self.indexed_devices = IndexedDict() self.indexed_devices.bind( on_item_index_changed=self.on_device_dict_index_changed, ) self.filename = filename self._setitem_lock = DumbLock() self.read() self._read_complete = True self.bind(data=self.on_data_changed) @property def devices(self) -> tp.Dict[str, 'DeviceConfig']: """Mapping of :class:`DeviceConfig` using their :attr:`~DeviceConfig.id` as keys """ if 'devices' not in self: self['devices'] = {} return self['devices'] def __setitem__(self, key, item): with self._setitem_lock: self.data[key] = item self.write() def __getitem__(self, key): return self.data[key] def __contains__(self, key): return key in self.data def keys(self): return self.data.keys() def values(self): return self.data.values() def items(self): return self.data.items() def get(self, key, default=None): return self.data.get(key, default)
[docs] def update(self, other: tp.Dict): """Update from another :class:`dict` """ other = other.copy() with self._setitem_lock: oth_devices = other.get('devices', {}) if 'devices' in other: del other['devices'] self.data.update(other) for device in oth_devices.values(): self.add_device(device) self.write()
[docs] def add_device(self, device: 'DeviceConfig') -> 'DeviceConfig': """Add a :class:`DeviceConfig` instance If a device config already exists, it will be updated with the info provided using :meth:`DeviceConfig.update_from_other` If its :attr:`~DeviceConfig.device_index` is set, it will be added to :attr:`indexed_devices`. """ key = device.id if key in self.devices: with self._setitem_lock: ix = self.devices[key].device_index self.devices[key].update_from_other(device) if not self._read_complete: if ix is not None and ix != -1: assert self.devices[key].device_index == ix self.write() else: assert not self._device_reindexing.locked() ix = device.device_index if ix is not None: with self._setitem_lock: with self._device_reindexing.set(device): new_index = self.indexed_devices.add(key, device, ix) if not self._read_complete and ix != -1: assert ix == new_index device.device_index = new_index self.devices[key] = device device.stored_in_config = True device.bind( device_index=self.on_device_index, on_change=self.on_device_prop_change, ) # self.indexed_devices.compact_indices() self.write() self.emit('on_device_added', device) return self.devices[key]
[docs] def add_discovered_device(self, info: 'zeroconf.ServiceInfo') -> 'DeviceConfig': """Add a :class:`DeviceConfig` from zeroconf data """ device = DeviceConfig.from_service_info(info) return self.add_device(device)
def read(self): if not self.filename.exists(): return with self._setitem_lock: data = jsonfactory.loads(self.filename.read_text()) self.update(data) def write(self): if not self._read_complete: return p = self.filename.parent if not p.exists(): p.mkdir(mode=0o700, parents=True) self.filename.write_text(jsonfactory.dumps(self.data, indent=2)) def on_data_changed(self, instance, value, **kwargs): if self._setitem_lock.locked(): return self.write() def on_device_dict_index_changed(self, **kwargs): key = kwargs['key'] device = kwargs['item'] old_index = kwargs['old_index'] new_index = kwargs['new_index'] if self._device_reindexing.locked(): device.device_index = new_index else: with self._device_reindexing.set(device): with self._setitem_lock: device.device_index = new_index self.write() def on_device_index(self, instance, value, **kwargs): if self._device_reindexing.locked(): return if self._setitem_lock.locked(): return key = instance.id old = kwargs['old'] if value is None: assert isinstance(old, int) if key in self.indexed_devices: self.indexed_devices.remove(key) else: with self._device_reindexing.set(instance): with self._setitem_lock: key = instance.id if old is None: new_index = self.indexed_devices.add(key, instance, value) if value != -1: assert value == new_index else: assert new_index >= 0 instance.device_index = new_index else: self.indexed_devices.change_item_index(key, value) # self.indexed_devices.compact_indices() self.write() def on_device_prop_change(self, instance, prop_name, value, **kwargs): if prop_name == 'device_index': return self.write()
[docs]class DeviceConfig(Dispatcher): """Configuration data for a device """ name: str = Property('') """The device name, taken from :meth:`zeroconf.ServiceInfo.get_name`""" dns_name: str = Property('') """The fully qualified name for the service host, taken from :class:`ServiceInfo.server <zeroconf.ServiceInfo>` """ fqdn: str = Property('') """The fully qualified service name, taken from :class:`ServiceInfo.name <zeroconf.ServiceInfo>` """ hostaddr: str = Property('') """The IPv4 address (in string form)""" hostport: int = Property(80) """The service port""" display_name: str = Property('') """A user-defined name for the device, defaults to :attr:`name`""" auth_user: str|None = Property(None) """Username to use with authentication""" auth_pass: str|None = Property(None) """Password to use with authentication""" device_index: int|None = Property(None) """Index for the device for organization purposes. If ``None`` (default), no index is assigned. Otherwise, the index will be assigned according to :meth:`jvconnected.utils.IndexedDict.add` """ always_connect: bool = Property(False) """If ``True``, the :class:`~jvconnected.engine.Engine` will attempt to connect to this device without it being discovered on the network """ stored_in_config: bool = Property(False) """``True`` if the device is stored in :class:`Config`""" online: bool = Property(False) """``True`` if the device is currently active on the network""" active: bool = Property(False) """``True`` if a :class:`jvconnected.device.Device` is currently communicating with the device """ connection_state: ConnectionState = Property(ConnectionState.UNKNOWN) """The device's :class:`~.common.ConnectionState` """
[docs] def on_change(instance: 'DeviceConfig', prop_name: str, value: tp.Any): """Fired when any property value changes Arguments: instance: The instance whose property changed prop_name: The Property name value: New value for the Property """
_zeroconf_prop_names = ( 'name', 'dns_name', 'fqdn', 'hostaddr', 'hostport', ) _immutable_prop_names = ( 'model_name', 'serial_number', ) _user_def_prop_names = ( 'display_name', 'always_connect', 'device_index', 'auth_user', 'auth_pass', ) _all_prop_names = _zeroconf_prop_names + _user_def_prop_names _events_ = ['on_change'] def __init__(self, model_name: str, serial_number: str, **kwargs): self.__model_name = model_name self.__serial_number = serial_number for attr in self._all_prop_names: if attr not in kwargs: continue val = kwargs[attr] setattr(self, attr, val) if not self.display_name: self.display_name = self.name self.bind(**{attr:self.on_prop_change for attr in self._all_prop_names}) self.bind(connection_state=self.on_connection_state) def on_connection_state(self, instance, value, **kwargs): self.active = value == ConnectionState.CONNECTED @property def model_name(self) -> str: """The model name of the device, taken from :class:`ServiceInfo.properties <zeroconf.ServiceInfo>` """ return self.__model_name @property def serial_number(self) -> str: """The serial number of the device, taken from the service name ``hc500-XXXXXXXX`` where ``XXXXXXXX`` is the serial number """ return self.__serial_number @property def id(self) -> str: """A unique id for the device using the :attr:`model_name` and :attr:`serial_number` attributes """ return f'{self.model_name}-{self.serial_number}'
[docs] @classmethod def get_id_for_service_info(cls, info: 'zeroconf.ServiceInfo') -> str: """Get the :attr:`id` attribute for the given :class:`zeroconf.ServiceInfo` """ props = cls.get_props_from_service_info(info) return f'{props["model_name"]}-{props["serial_number"]}'
[docs] @classmethod def get_props_from_service_info(cls, info: 'zeroconf.ServiceInfo') -> tp.Dict: """Build a dictionary of instance attributes from a :class:`zeroconf.ServiceInfo` """ props = dict( name=info.get_name(), model_name=info.properties[b'model'].decode('UTF-8'), serial_number=info.get_name().split('-')[1], dns_name=info.server, fqdn=info.name, hostaddr=info.parsed_addresses()[0], hostport=info.port, ) return props
[docs] @classmethod def from_service_info(cls, info: 'zeroconf.ServiceInfo') -> 'DeviceConfig': """Construct an instance from a :class:`zeroconf.ServiceInfo` """ kw = cls.get_props_from_service_info(info) return cls(**kw)
[docs] def build_service_info(self) -> ServiceInfo: """Create a :class:`zeroconf.ServiceInfo` from the values in this instance """ info = ServiceInfo( name=self.fqdn, type_='.'.join(self.fqdn.split('.')[1:]), server=self.dns_name, properties={b'model':bytes(self.model_name, 'UTF-8')}, port=self.hostport, parsed_addresses=[self.hostaddr], ) return info
[docs] def update_from_service_info(self, info: 'zeroconf.ServiceInfo'): """Update instance attributes from a :class:`zeroconf.ServiceInfo` """ props = self.get_props_from_service_info(info) for key, val in props.items(): if key in self._immutable_prop_names: assert getattr(self, key) == val continue elif key in self._user_def_prop_names: continue setattr(self, key, val)
[docs] def update_from_other(self, other: 'DeviceConfig'): """Update from another instance of :class:`DeviceConfig` """ for attr in self._all_prop_names: val = getattr(other, attr) if attr == 'device_index' and val == -1: continue elif attr == 'display_name': if val == other.name: continue elif attr == 'always_connect': val = self.always_connect or other.always_connect elif val is None: continue setattr(self, attr, val)
def on_prop_change(self, instance, value, **kwargs): prop = kwargs['property'] self.emit('on_change', instance, prop.name, value) def _serialize(self): d = {k:getattr(self, k) for k in self._all_prop_names} d.update({k:getattr(self, k) for k in self._immutable_prop_names}) return d def __repr__(self): return f'<{self.__class__.__name__}: "{self}">' def __str__(self): return f'{self.model_name} - {self.serial_number}'
@jsonfactory.register class JsonHandler(object): def cls_to_str(self, cls): if type(cls) is not type: cls = cls.__class__ return '.'.join([cls.__module__, cls.__name__]) def str_to_cls(self, s): for cls in [DeviceConfig]: if self.cls_to_str(cls) == s: return cls def encode(self, o): if isinstance(o, DeviceConfig): d = o._serialize() d['__class__'] = self.cls_to_str(o) return d def decode(self, d): if '__class__' in d: cls = self.str_to_cls(d['__class__']) if cls is not None: return cls(**d) return d