Source code for jvconnected.discovery

from __future__ import annotations
import typing as tp
from loguru import logger
import asyncio

from zeroconf import ServiceInfo, Zeroconf
from zeroconf.asyncio import AsyncZeroconf
from pydispatch import Dispatcher, Property, DictProperty

from jvconnected.utils import async_callback

PROCAM_FQDN = '_jvc_procam_web._tcp.local.'

class ProcamListener(object):
    def __init__(self, discovery: 'Discovery'):
        self.discovery = discovery

    async def get_service_info(self, zc: Zeroconf, type_: str, name: str) -> ServiceInfo:
        info = ServiceInfo(type_, name)
        r = await info.async_request(zc, 3000)
        if r:
            return info

    @async_callback
    async def remove_service(self, zc: Zeroconf, type_: str, name: str):
        logger.debug(f'Service {name} removed')
        disco = self.discovery
        if name in disco.procam_infos:
            info = disco.procam_infos[name]
            del disco.procam_infos[name]
        else:
            info = None
        disco.emit('on_service_removed', name, info=info)

    @async_callback
    async def add_service(self, zc: Zeroconf, type_: str, name: str):
        info = await self.get_service_info(zc, type_, name)
        if info is None:
            logger.warning(f'Could not resolve service "{type_}, {name}"')
            return
        logger.debug(f'Adding {info}')
        disco = self.discovery
        disco.procam_infos[info.name] = info
        disco.emit('on_service_added', info.name, info=info)

    @async_callback
    async def update_service(self, zc: Zeroconf, type_: str, name: str):
        info = await self.get_service_info(zc, type_, name)
        if info is None:
            logger.warning(f'Could not resolve service "{type_}, {name}"')
            return
        logger.debug(f'Update {info}')
        disco = self.discovery
        stored_info = disco.procam_infos[info.name]
        disco.procam_infos[info.name] = info
        disco.emit('on_service_updated', info.name, info=info, old=stored_info)

[docs]class Discovery(Dispatcher): """Listen for cameras using zeroconf """ procam_infos: tp.Dict[str, ServiceInfo] = DictProperty() """Container for discovered devices as instances of :class:`zeroconf.ServiceInfo`. The service names (fqdn) are used as keys """
[docs] def on_service_added(self, name: str, info: ServiceInfo): """Fired when a new device is discovered """
[docs] def on_service_updated(self, name: str, info: ServiceInfo, old: ServiceInfo): """Fired when an service is updated. The pre-existing info is passed for comparison """
[docs] def on_service_removed(self, name: str, info: ServiceInfo): """Fired when an existing service is no longer available """
_events_ = ['on_service_added', 'on_service_updated', 'on_service_removed'] def __init__(self): self.running = False
[docs] async def open(self): """Open the zeroconf browser and begin listening """ if self.running: return loop = self.loop = asyncio.get_event_loop() azc = self.async_zeroconf = AsyncZeroconf() self.zeroconf = azc.zeroconf fqdn = PROCAM_FQDN listener = self.listener = ProcamListener(discovery=self) await azc.async_add_service_listener(fqdn, listener) self.running = True
[docs] async def close(self): """Stop listening and close all connections """ if not self.running: return await self.async_zeroconf.async_close() self.running = False
def main(): loop = asyncio.get_event_loop() disco = Discovery() try: loop.run_until_complete(disco.open()) try: loop.run_forever() except KeyboardInterrupt: loop.run_until_complete(disco.close()) finally: loop.close() return disco if __name__ == '__main__': main()