from loguru import logger
logger.disable('tslumd.tallyobj')
import asyncio
from typing import Dict, Tuple, Set, Optional
from pydispatch import Dispatcher, Property, DictProperty, ListProperty
from tslumd import Message, Display, Tally, TallyColor, UmdReceiver
from jvconnected.interfaces import Interface
from jvconnected.interfaces.tslumd.mapper import DeviceMapping, MappedDevice
class UmdIo(Interface):
    """Main UMD interface

    Owns a :class:`tslumd.UmdReceiver`, re-emits its tally events and
    maintains the :class:`~.mapper.MappedDevice` objects built from the
    :class:`~.mapper.DeviceMapping` definitions in :attr:`device_maps`.
    """

    hostaddr: str = Property('0.0.0.0')
    """Alias for :attr:`tslumd.receiver.UmdReceiver.hostaddr`"""

    hostport: int = Property(65000)
    """Alias for :attr:`tslumd.receiver.UmdReceiver.hostport`"""

    device_maps: Dict[int, DeviceMapping] = DictProperty()
    """A ``dict`` of :class:`~.mapper.DeviceMapping` definitions stored with
    their :attr:`~.mapper.DeviceMapping.device_index` as keys
    """

    mapped_devices: Dict[int, MappedDevice] = DictProperty()
    """A ``dict`` of :class:`~.mapper.MappedDevice` stored with the
    ``device_index`` of their :attr:`~.mapper.MappedDevice.map` as keys
    """

    def on_tally_added(self, tally: Tally):
        """Fired when a :class:`tslumd.tallyobj.Tally` instance is
        added to :attr:`tallies`
        """

    def on_tally_updated(self, tally: Tally):
        """Fired when any :class:`tslumd.tallyobj.Tally` instance has
        been updated

        Listeners are also passed the set of property names that changed
        as a second positional argument.
        """

    _events_ = ['on_tally_added', 'on_tally_updated']
    interface_name = 'tslumd'

    def __init__(self):
        # State used by read_config() / update_config() / open(). Created
        # before ``super().__init__()`` (presumably because the base
        # initializer may already trigger callbacks - TODO confirm).
        self._reading_config = False
        self._config_read = asyncio.Event()
        self._connect_lock = asyncio.Lock()
        super().__init__()

        self.receiver = UmdReceiver()
        # Start out mirroring the receiver's own defaults.
        self.hostaddr = self.receiver.hostaddr
        self.hostport = self.receiver.hostport

        # Forward the receiver's tally events through this interface.
        self.receiver.bind_async(
            self.loop,
            on_tally_added=self._on_receiver_tally_added,
            on_tally_updated=self._on_receiver_tally_updated,
        )
        # Re-read settings whenever the ``config`` property changes.
        self.bind_async(
            self.loop,
            config=self.read_config,
        )
        # Persist address changes back to the config.
        self.bind(**{prop: self.update_config for prop in ['hostaddr', 'hostport']})

    @property
    def tallies(self) -> Dict[int, Tally]:
        """Alias for :attr:`tslumd.receiver.UmdReceiver.tallies`
        """
        return self.receiver.tallies

    async def set_engine(self, engine: 'jvconnected.engine.Engine'):
        """Attach to the given engine and listen for its device events

        Does nothing if *engine* is already the current engine.
        """
        if engine is self.engine:
            return
        if engine.config is not self.config:
            # A different config object will be assigned; open() must wait
            # until it has been read again.
            self._config_read.clear()
        await super().set_engine(engine)
        engine.bind_async(
            self.loop,
            on_device_added=self.on_engine_device_added,
            on_device_removed=self.on_engine_device_removed,
        )

    async def open(self):
        """Open the receiver if not already running

        When a config is attached, this waits until :meth:`read_config`
        has completed before the receiver is opened.
        """
        async with self._connect_lock:
            if self.running:
                return
            logger.debug('UmdIo.open()')
            if self.config is not None:
                await self._config_read.wait()
            self.running = True
            await self.receiver.open()
            logger.success('UmdIo running')

    async def close(self):
        """Close the receiver if currently running
        """
        async with self._connect_lock:
            if not self.running:
                return
            logger.debug('UmdIo.close()')
            self.running = False
            await self.receiver.close()
            logger.success('UmdIo closed')

    async def set_bind_address(self, hostaddr: str, hostport: int):
        """Set the :attr:`hostaddr` and :attr:`hostport` and restart the server
        """
        await self.receiver.set_bind_address(hostaddr, hostport)
        # Mirror whatever the receiver ended up with.
        self.hostaddr = self.receiver.hostaddr
        self.hostport = self.receiver.hostport

    async def set_hostaddr(self, hostaddr: str):
        """Set the :attr:`hostaddr` and restart the server
        """
        await self.set_bind_address(hostaddr, self.hostport)

    async def set_hostport(self, hostport: int):
        """Set the :attr:`hostport` and restart the server
        """
        await self.set_bind_address(self.hostaddr, hostport)

    async def _on_receiver_tally_added(self, tally, **kwargs):
        # A new tally may be the one a mapped device was still missing:
        # let every device without its tallies try again.
        for mapped_device in self.mapped_devices.values():
            if mapped_device.have_tallies:
                continue
            r = mapped_device.get_tallies()
            if r:
                await mapped_device.update_device_tally()
        self.emit('on_tally_added', tally, **kwargs)

    async def _on_receiver_tally_updated(self, tally: Tally, props_changed: Set[str], **kwargs):
        # Straight pass-through of the receiver's event.
        self.emit('on_tally_updated', tally, props_changed, **kwargs)

    def get_device_by_index(self, ix: int) -> Optional['jvconnected.device.Device']:
        """Look up the engine's device for the given device index

        Returns ``None`` if there is no engine, no device config stored for
        the index, or no matching device currently in the engine.
        """
        device = None
        if self.engine is not None:
            device_conf = self.engine.config.indexed_devices.get_by_index(ix)
            if device_conf is not None:
                device = self.engine.devices.get(device_conf.id)
        return device

    @logger.catch
    async def add_device_mapping(self, device_map: 'DeviceMapping'):
        """Add a :class:`~.mapper.DeviceMapping` definition to :attr:`device_maps`
        and update the :attr:`config`.

        An instance of :class:`~.mapper.MappedDevice` is also created and
        associated with its :class:`~jvconnected.device.Device`
        if found in the :attr:`engine`.
        """
        ix = device_map.device_index
        self.device_maps[ix] = device_map

        # Detach and drop any MappedDevice previously stored for this index.
        mapped_device = self.mapped_devices.get(ix)
        if mapped_device is not None:
            await mapped_device.set_device(None)
            del self.mapped_devices[ix]

        device = self.get_device_by_index(ix)
        mapped_device = MappedDevice(map=device_map, umd_io=self)
        self.mapped_devices[ix] = mapped_device
        await mapped_device.set_device(device)
        self.update_config()

    async def remove_device_mapping(self, device_index: int):
        """Remove a :class:`~.mapper.DeviceMapping` and its associated
        :class:`~.mapper.MappedDevice` by the given device index
        """
        if device_index not in self.device_maps:
            return
        del self.device_maps[device_index]
        mapped_device = self.mapped_devices.get(device_index)
        if mapped_device is not None:
            await mapped_device.set_device(None)
            del self.mapped_devices[device_index]
        self.update_config()

    async def on_engine_device_added(self, device, **kwargs):
        # Hand the new device to the mapping registered for its index, if any.
        mapped_device = self.mapped_devices.get(device.device_index)
        if mapped_device is not None:
            await mapped_device.set_device(device)

    async def on_engine_device_removed(self, device, reason, **kwargs):
        # Detach the removed device from its mapping, if any.
        mapped_device = self.mapped_devices.get(device.device_index)
        if mapped_device is not None:
            await mapped_device.set_device(None)

    def update_config(self, *args, **kwargs):
        """Update the :attr:`config` with current state

        Skipped while :meth:`read_config` is running, when there is no
        config, or before the config has been read for the first time.
        """
        if self._reading_config:
            return
        if self.config is None:
            return
        if not self._config_read.is_set():
            return
        d = self.get_config_section()
        if d is None:
            return
        d['hostaddr'] = self.hostaddr
        d['hostport'] = self.hostport
        m = self.device_maps
        # Stored as a list ordered by device index.
        d['device_maps'] = [m[k] for k in sorted(m)]

    @logger.catch
    async def read_config(self, *args, **kwargs):
        """Load the address and device maps from the config section

        Bound to changes of the ``config`` property. On success the
        internal "config read" event is set, which unblocks :meth:`open`.
        """
        d = self.get_config_section()
        if d is None:
            return
        # Suppress update_config() while values are being loaded.
        self._reading_config = True
        try:
            hostaddr = d.get('hostaddr', self.hostaddr)
            hostport = d.get('hostport', self.hostport)
            coros = []
            for dev_map in d.get('device_maps', []):
                coros.append(self.add_device_mapping(dev_map))
            await asyncio.gather(*coros)
            await self.set_bind_address(hostaddr, hostport)
        finally:
            # ``logger.catch`` swallows exceptions raised above; without
            # this reset the flag would stay set and every later
            # update_config() call would silently do nothing.
            self._reading_config = False
        self._config_read.set()
if __name__ == '__main__':
    # Stand-alone entry point: run the interface until interrupted.
    # Create and install the loop explicitly instead of relying on
    # ``asyncio.get_event_loop()``, whose implicit loop creation outside of
    # a running loop is deprecated in newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    umd = UmdIo()
    loop.run_until_complete(umd.open())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C: shut the receiver down cleanly before the loop is closed.
        loop.run_until_complete(umd.close())
    finally:
        loop.close()