from loguru import logger
import os
import sys
import asyncio
from pathlib import Path
import typing as tp
from contextlib import contextmanager
from pydispatch import Dispatcher, Property, DictProperty, ListProperty
import jsonfactory
from jvconnected.utils import IndexedDict
def get_config_dir(app_name: str) -> Path:
    """Get the platform's preferred configuration directory

    * For Windows, the :literal:`%LOCALAPPDATA%` environment variable is used.
      Typically ``c:\\Users\\<username>\\AppData\\Local``. If the variable is
      unset or empty, ``~/AppData/Local`` is used instead.
    * For MacOS, ``~/Library/Preferences``
    * All others will be ``~/.config``

    Arguments:
        app_name (str): Name of the sub-directory appended to the platform
            directory

    Returns:
        pathlib.Path: ``<platform config dir>/<app_name>``. The directory is
        not created by this function.
    """
    if sys.platform == 'win32':
        # Use .get() so a stripped-down environment (services, some test
        # runners) does not abort with a bare KeyError.
        local_app_data = os.environ.get('LOCALAPPDATA')
        if local_app_data:
            base_dir = Path(local_app_data)
        else:
            base_dir = Path.home() / 'AppData' / 'Local'
    elif sys.platform == 'darwin':
        base_dir = Path.home() / 'Library' / 'Preferences'
    else:
        base_dir = Path.home() / '.config'
    return base_dir / app_name
class DumbLock(object):
    """A counting, non-blocking "lock" used purely as a re-entrant flag

    :meth:`acquire` never waits; it only increments a counter.
    :meth:`locked` reports whether the counter is above zero. The object
    can be used with both ``with`` and ``async with``.
    """
    def __init__(self):
        # Number of outstanding acquire() calls
        self._locked = 0

    def acquire(self):
        """Increment the acquisition count (never blocks)"""
        self._locked += 1

    def release(self):
        """Decrement the acquisition count

        Raises:
            RuntimeError: If the lock is not currently acquired
        """
        if not self.locked():
            # RuntimeError is a subclass of Exception, so callers that
            # catch Exception keep working.
            raise RuntimeError('Lock not acquired')
        self._locked -= 1

    def locked(self):
        """Return ``True`` while at least one acquisition is outstanding"""
        return self._locked > 0

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

    async def __aenter__(self):
        self.acquire()
        return self

    async def __aexit__(self, *args):
        self.release()
class ContextLock(DumbLock):
    """A :class:`DumbLock` that remembers a value while it is held

    Attributes:
        context: The value passed to the most recent :meth:`set` call, or
            ``None`` when the lock is not held
    """
    def __init__(self):
        self.context = None
        super().__init__()

    @contextmanager
    def set(self, value):
        """Context manager that acquires the lock and stores ``value``

        The lock is released when the ``with`` block exits, including when
        it exits because of an exception.

        Arguments:
            value: Object to store in :attr:`context` for the duration
        """
        self.acquire()
        self.context = value
        try:
            yield
        finally:
            # Without the finally clause an exception raised inside the
            # ``with`` body would leave the lock held forever.
            self.release()

    def release(self):
        """Release once and clear :attr:`context` when fully released"""
        super().release()
        if not self.locked():
            self.context = None
class Config(Dispatcher):
    """Configuration storage

    This object provides a dict-like interface and stores the configuration data
    automatically when it changes. The stored config data is read on initialization.

    Arguments:
        filename (:class:`pathlib.Path`, optional): The configuration filename. If not provided,
            the :attr:`DEFAULT_FILENAME` is used

    Attributes:
        DEFAULT_FILENAME (:class:`pathlib.Path`): Platform-dependent default filename
            (``<config_dir>/jvconnected/config.json``). Where ``<config_dir>``
            is chosen in :func:`get_config_dir`
        indexed_devices: An instance of :class:`jvconnected.utils.IndexedDict`
            to handle device indexing

    :Events:
        .. event:: on_device_added(device)

            Fired by :meth:`add_device` when a device that was not already
            stored is added
    """
    data = DictProperty()
    DEFAULT_FILENAME: Path = get_config_dir('jvconnected') / 'config.json'
    _events_ = ['on_device_added']

    def __init__(self, filename: tp.Optional[Path] = None):
        if filename is None:
            filename = self.DEFAULT_FILENAME
        logger.info(f'Config using filename: {filename}')
        # write() is a no-op until the initial read() has finished
        self._read_complete = False
        # Held while this object itself is changing a device's index
        self._device_reindexing = ContextLock()
        self.indexed_devices = IndexedDict()
        self.indexed_devices.bind(
            on_item_index_changed=self.on_device_dict_index_changed,
        )
        self.filename = filename
        # Held while ``data`` is modified internally so that
        # on_data_changed() does not write for every intermediate change
        self._setitem_lock = DumbLock()
        self.read()
        self._read_complete = True
        self.bind(data=self.on_data_changed)

    @property
    def devices(self):
        """The ``'devices'`` entry of :attr:`data`, created (empty) on first
        access
        """
        if 'devices' not in self:
            self['devices'] = {}
        return self['devices']

    def __setitem__(self, key, item):
        with self._setitem_lock:
            self.data[key] = item
        self.write()

    def __getitem__(self, key):
        return self.data[key]

    def __contains__(self, key):
        return key in self.data

    def keys(self):
        return self.data.keys()

    def values(self):
        return self.data.values()

    def items(self):
        return self.data.items()

    def get(self, key, default=None):
        return self.data.get(key, default)

    def update(self, other: tp.Dict):
        """Update from another :class:`dict`

        The ``'devices'`` entry (if any) is not merged directly; each of its
        values is passed to :meth:`add_device`. All other keys are merged
        into :attr:`data`. The argument itself is not modified.
        """
        other = other.copy()
        with self._setitem_lock:
            oth_devices = other.pop('devices', {})
            self.data.update(other)
            for device in oth_devices.values():
                self.add_device(device)
        self.write()

    def add_device(self, device: 'DeviceConfig') -> 'DeviceConfig':
        """Add a :class:`DeviceConfig` instance

        If a device config already exists, it will be updated with the info
        provided using :meth:`DeviceConfig.update_from_other`

        If its :attr:`~DeviceConfig.device_index` is set, it will be added
        to :attr:`indexed_devices`.

        Returns:
            DeviceConfig: The instance stored under the device's id. This is
            the previously stored instance when one already existed.
        """
        key = device.id
        if key in self.devices:
            with self._setitem_lock:
                ix = self.devices[key].device_index
                self.devices[key].update_from_other(device)
                if not self._read_complete:
                    # While loading the file, merging must not move an
                    # already assigned index
                    if ix is not None and ix != -1:
                        assert self.devices[key].device_index == ix
            self.write()
        else:
            assert not self._device_reindexing.locked()
            ix = device.device_index
            if ix is not None:
                with self._setitem_lock:
                    with self._device_reindexing.set(device):
                        # The index returned by IndexedDict.add() is stored
                        # back on the device. Presumably -1 requests
                        # automatic assignment -- TODO confirm against
                        # IndexedDict.add
                        new_index = self.indexed_devices.add(key, device, ix)
                        if not self._read_complete and ix != -1:
                            assert ix == new_index
                        device.device_index = new_index
            self.devices[key] = device
            device.stored_in_config = True
            device.bind(
                device_index=self.on_device_index,
                on_change=self.on_device_prop_change,
            )
            # self.indexed_devices.compact_indices()
            self.write()
            self.emit('on_device_added', device)
        return self.devices[key]

    def add_discovered_device(self, info: 'zeroconf.ServiceInfo') -> 'DeviceConfig':
        """Add a :class:`DeviceConfig` from zeroconf data
        """
        device = DeviceConfig.from_service_info(info)
        return self.add_device(device)

    def read(self):
        """Load :attr:`filename` (if it exists) and merge it with :meth:`update`"""
        if not self.filename.exists():
            return
        with self._setitem_lock:
            data = jsonfactory.loads(self.filename.read_text())
            self.update(data)

    def write(self):
        """Serialize :attr:`data` to :attr:`filename`

        Does nothing until the initial :meth:`read` has completed. The
        parent directory is created with mode ``0o700`` when missing.
        """
        if not self._read_complete:
            return
        # exist_ok avoids failing if the directory appears between an
        # existence check and the mkdir call
        self.filename.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
        self.filename.write_text(jsonfactory.dumps(self.data, indent=2))

    def on_data_changed(self, instance, value, **kwargs):
        # Internal modifications hold the lock and write once when done
        if self._setitem_lock.locked():
            return
        self.write()

    def on_device_dict_index_changed(self, **kwargs):
        """Callback for ``on_item_index_changed`` of :attr:`indexed_devices`

        Copies the new index (``kwargs['new_index']``) onto the device
        (``kwargs['item']``).
        """
        device = kwargs['item']
        new_index = kwargs['new_index']
        if self._device_reindexing.locked():
            # Whoever holds the reindexing lock writes when it is finished
            device.device_index = new_index
        else:
            with self._device_reindexing.set(device):
                with self._setitem_lock:
                    device.device_index = new_index
            # NOTE(review): nesting reconstructed from de-indented source;
            # the write is assumed to belong to this branch only -- confirm
            self.write()

    def on_device_index(self, instance, value, **kwargs):
        """Callback for changes of :attr:`DeviceConfig.device_index`

        Ignored while this object is itself changing indices or data.
        Otherwise :attr:`indexed_devices` is updated to match and the
        config is written.
        """
        if self._device_reindexing.locked():
            return
        if self._setitem_lock.locked():
            return
        key = instance.id
        old = kwargs['old']
        if value is None:
            # Index cleared: drop the device from the indexed collection
            assert isinstance(old, int)
            if key in self.indexed_devices:
                self.indexed_devices.remove(key)
        else:
            with self._device_reindexing.set(instance):
                with self._setitem_lock:
                    if old is None:
                        new_index = self.indexed_devices.add(key, instance, value)
                        if value != -1:
                            assert value == new_index
                        else:
                            assert new_index >= 0
                        # NOTE(review): nesting reconstructed; assigning
                        # here changes nothing when value == new_index
                        instance.device_index = new_index
                    else:
                        self.indexed_devices.change_item_index(key, value)
        # self.indexed_devices.compact_indices()
        self.write()

    def on_device_prop_change(self, instance, prop_name, value, **kwargs):
        # device_index changes are handled (and written) by on_device_index
        if prop_name == 'device_index':
            return
        self.write()
class DeviceConfig(Dispatcher):
    """Configuration data for a device

    Properties:
        name (str): The device name, taken from :meth:`zeroconf.ServiceInfo.get_name`
        dns_name (str): The fully qualified name for the service host, taken from
            :class:`ServiceInfo.server <zeroconf.ServiceInfo>`
        fqdn (str): The fully qualified service name, taken from
            :class:`ServiceInfo.name <zeroconf.ServiceInfo>`
        hostaddr (str): The IPv4 address (in string form)
        hostport (int): The service port
        display_name (str): A user-defined name for the device, defaults to
            :attr:`name`
        auth_user (str): Username to use with authentication
        auth_pass (str): Password to use with authentication
        device_index (int): Index for the device for organization purposes.
            If ``None`` (default), no index is assigned. Otherwise, the index
            will be assigned according to :meth:`jvconnected.utils.IndexedDict.add`
        stored_in_config (bool): ``False`` by default; not included in the
            serialized data
        online (bool): ``True`` if the device is currently active on the network
        active (bool): ``True`` if a :class:`jvconnected.device.Device` is
            currently communicating with the device

    :Events:
        .. event:: on_change(instance, prop_name, value)

            Fired when any property value changes
    """
    name = Property('')
    dns_name = Property('')
    fqdn = Property('')
    hostaddr = Property('')
    hostport = Property(80)
    display_name = Property('')
    auth_user = Property(None)
    auth_pass = Property(None)
    device_index = Property(None)
    stored_in_config = Property(False)
    online = Property(False)
    active = Property(False)
    # Properties that are serialized and that fire ``on_change``
    _all_prop_names = (
        'name', 'dns_name', 'fqdn', 'display_name',
        'hostaddr', 'hostport', 'auth_user', 'auth_pass', 'device_index',
    )
    # Read-only identity attributes, serialized alongside the above
    _immutable_prop_names = (
        'model_name', 'serial_number',
    )
    _events_ = ['on_change']

    def __init__(self, model_name: str, serial_number: str, **kwargs):
        self.__model_name = model_name
        self.__serial_number = serial_number
        # Only known property names are taken from kwargs; anything else
        # (e.g. the '__class__' key added for serialization) is ignored
        for attr in self._all_prop_names:
            if attr in kwargs:
                setattr(self, attr, kwargs[attr])
        # Truthiness also covers display_name=None, where len() would raise
        if not self.display_name:
            self.display_name = self.name
        self.bind(**{attr: self.on_prop_change for attr in self._all_prop_names})

    @property
    def model_name(self) -> str:
        """The model name of the device, taken from
        :class:`ServiceInfo.properties <zeroconf.ServiceInfo>`
        """
        return self.__model_name

    @property
    def serial_number(self) -> str:
        """The serial number of the device, taken from the
        service name ``hc500-XXXXXXXX`` where ``XXXXXXXX`` is the serial number
        """
        return self.__serial_number

    @property
    def id(self) -> str:
        """A unique id for the device using the :attr:`model_name`
        and :attr:`serial_number` attributes
        """
        return f'{self.model_name}-{self.serial_number}'

    @classmethod
    def get_id_for_service_info(cls, info: 'zeroconf.ServiceInfo') -> str:
        """Get the :attr:`id` attribute for the given :class:`zeroconf.ServiceInfo`
        """
        props = cls.get_props_from_service_info(info)
        return f'{props["model_name"]}-{props["serial_number"]}'

    @classmethod
    def get_props_from_service_info(cls, info: 'zeroconf.ServiceInfo') -> tp.Dict:
        """Build a dictionary of instance attributes from a :class:`zeroconf.ServiceInfo`
        """
        service_name = info.get_name()
        props = dict(
            name=service_name,
            model_name=info.properties[b'model'].decode('UTF-8'),
            # Second '-'-separated field of the service name. Raises
            # IndexError when the name contains no '-'
            serial_number=service_name.split('-')[1],
            dns_name=info.server,
            fqdn=info.name,
            hostaddr=info.parsed_addresses()[0],
            hostport=info.port,
        )
        return props

    @classmethod
    def from_service_info(cls, info: 'zeroconf.ServiceInfo') -> 'DeviceConfig':
        """Construct an instance from a :class:`zeroconf.ServiceInfo`
        """
        kw = cls.get_props_from_service_info(info)
        return cls(**kw)

    def update_from_service_info(self, info: 'zeroconf.ServiceInfo'):
        """Update instance attributes from a :class:`zeroconf.ServiceInfo`

        The immutable attributes are only checked for equality and
        ``display_name`` is never overwritten.
        """
        props = self.get_props_from_service_info(info)
        for key, val in props.items():
            if key in self._immutable_prop_names:
                assert getattr(self, key) == val
                continue
            elif key == 'display_name':
                continue
            setattr(self, key, val)

    def update_from_other(self, other: 'DeviceConfig'):
        """Update from another instance of :class:`DeviceConfig`

        Values that carry no information are skipped so they do not
        overwrite existing data: a ``device_index`` of ``-1``, a
        ``display_name`` equal to the other instance's ``name`` and, for
        all remaining attributes, ``None``.
        """
        for attr in self._all_prop_names:
            val = getattr(other, attr)
            if attr == 'device_index' and val == -1:
                continue
            elif attr == 'display_name':
                if val == other.name:
                    continue
            elif val is None:
                continue
            setattr(self, attr, val)

    def on_prop_change(self, instance, value, **kwargs):
        # Re-emit any bound property change as a single 'on_change' event
        prop = kwargs['property']
        self.emit('on_change', instance, prop.name, value)

    def _serialize(self):
        """Return a dict of all mutable and immutable attribute values"""
        attr_names = self._all_prop_names + self._immutable_prop_names
        return {k: getattr(self, k) for k in attr_names}

    def __repr__(self):
        return f'<{self.__class__.__name__}: "{self}">'

    def __str__(self):
        return f'{self.model_name} - {self.serial_number}'
@jsonfactory.register
class JsonHandler(object):
    """Handler registered with :mod:`jsonfactory` to (de)serialize
    :class:`DeviceConfig` objects

    Encoded objects are plain dicts carrying an extra ``'__class__'`` key
    holding ``<module>.<class name>``.
    """
    def cls_to_str(self, cls):
        """Return ``'<module>.<name>'`` for a class or for an instance's class"""
        # isinstance() also recognizes classes created by a metaclass,
        # where ``type(cls) is type`` would be False
        if not isinstance(cls, type):
            cls = cls.__class__
        return '.'.join([cls.__module__, cls.__name__])

    def str_to_cls(self, s):
        """Return the supported class named by ``s``, or ``None``"""
        for cls in [DeviceConfig]:
            if self.cls_to_str(cls) == s:
                return cls
        return None

    def encode(self, o):
        """Return a serializable dict for a :class:`DeviceConfig`, or ``None``
        for any other object
        """
        if isinstance(o, DeviceConfig):
            d = o._serialize()
            d['__class__'] = self.cls_to_str(o)
            return d
        return None

    def decode(self, d):
        """Rebuild a :class:`DeviceConfig` from a tagged dict

        Dicts without a recognized ``'__class__'`` entry are returned
        unchanged.
        """
        if '__class__' in d:
            cls = self.str_to_cls(d['__class__'])
            if cls is not None:
                # The whole dict (including '__class__') is passed as
                # keyword arguments
                return cls(**d)
        return d