Source code for jvconnected.interfaces.midi.midi_io

from loguru import logger
import asyncio
from typing import List

import mido
import rtmidi
from pydispatch import Dispatcher, Property, DictProperty, ListProperty

from jvconnected.interfaces import Interface
from jvconnected.interfaces.midi.aioport import InputPort, OutputPort
from jvconnected.interfaces.midi.mapped_device import MappedDevice
from jvconnected.interfaces.midi.mapper import MidiMapper

[docs]class MidiIO(Interface): """Midi interface handler Properties: inport_names (List[str]): list of input port names to use (as ``str``) outport_names (List[str]): list of output port names to use (as ``str``) inports (Dict[str, InputPort]): Mapping of :class:`~.aioport.InputPort` instances stored with the names as keys outports (Dict[str, OutputPort]): Mapping of :class:`~.aioport.OutputPort` instances stored with the names as keys mapped_devices (Dict[str, MappedDevice]): Mapping of :class:`~.mapped_device.MappedDevice` instances stored with the device id as keys config: Instance of :class:`jvconnected.config.Config`. This is gathered from the :attr:`engine` after :meth:`set_engine` has been called. running (bool): Run state """ inport_names = ListProperty(copy_on_change=True) outport_names = ListProperty(copy_on_change=True) inports = DictProperty() outports = DictProperty() mapped_devices = DictProperty() mapper = Property() config = Property() def __init__(self): super().__init__() self._consume_tasks = {} self._reading_config = False self._port_lock = asyncio.Lock() self._refresh_event = asyncio.Event() self._refresh_task = None self.mapper = MidiMapper() self.bind_async( self.loop, inport_names=self.on_inport_names, outport_names=self.on_outport_names, ) self.bind(config=self.read_config)
[docs] @classmethod def get_available_inputs(cls) -> List[str]: """Get all detected input port names """ return mido.get_input_names()
[docs] @classmethod def get_available_outputs(cls) -> List[str]: """Get all detected output port names """ return mido.get_output_names()
[docs] async def set_engine(self, engine: 'jvconnected.engine.Engine'): if engine is self.engine: return self.config = engine.config await super().set_engine(engine) self.automap_engine_devices() engine.bind(devices=self.automap_engine_devices)
[docs] def automap_engine_devices(self, *args, **kwargs): """Map the engine's devices by index """ config = self.engine.config for conf_device in config.indexed_devices.values(): device_id = conf_device.id device_index = conf_device.device_index if device_index > 15: break device = self.engine.devices.get(device_id) mapped_device = self.mapped_devices.get(device_index) if device is None: if mapped_device is not None: self.unmap_device(device_index) elif mapped_device is None: self.map_device(device_index, device) elif device is not mapped_device.device: self.map_device(device_index, device)
[docs] @logger.catch async def open(self): """Open any configured input and output ports and begin communication """ if self.running: return logger.debug('MidiIO.open()') self.running = True await self.open_ports() self._refresh_task = asyncio.ensure_future(self.periodic_refresh()) logger.success('MidiIO running')
[docs] async def close(self): """Stop communication and close all input and output ports """ if not self.running: return logger.debug('MidiIO.close()') self.running = False self._refresh_event.set() await self._refresh_task self._refresh_task = None await self.close_ports() logger.success('MidiIO stopped')
[docs] async def open_ports(self): """Open any configured input and output ports. (Called by :meth:`open`) """ for name in self.inport_names: await self.add_input(name) for name in self.outport_names: await self.add_output(name)
[docs] async def close_ports(self): """Close all running input and output ports (Called by :meth:`close`) """ coros = set() for port in self.inports.values(): coros.add(port.close()) for port in self.outports.values(): coros.add(port.close()) await asyncio.gather(*coros)
[docs] async def add_input(self, name: str): """Add an input port The port name will be added to :attr:`inport_names` and stored in the :attr:`config`. If MidiIO is :attr:`running`, an instance of :class:`~.aioport.InputPort` will be created and added to :attr:`inports`. Arguments: name (str): The port name (as it appears in :meth:`get_available_inputs`) """ async with self._port_lock: logger.info(f'add_input: {name}') if self.running: if name not in self.inport_names: self.inport_names.append(name) if name in self.inports: raise ValueError(f'Input "{name}" already open') port = InputPort(name) logger.debug(f'port: {port}') self.inports[name] = port port.bind_async(self.loop, running=self.on_inport_running) if self.running: try: await port.open() except rtmidi.SystemError as exc: await port.close() del self.inports[name] logger.exception(exc) return
[docs] async def add_output(self, name: str): """Add an output port The port name will be added to :attr:`outport_names` and stored in the :attr:`config`. If MidiIO is :attr:`running`, an instance of :class:`~.aioport.OutputPort` will be created and added to :attr:`outports`. Arguments: name (str): The port name (as it appears in :meth:`get_available_outputs`) """ async with self._port_lock: if name not in self.outport_names: self.outport_names.append(name) if self.running: if name in self.outports: raise ValueError(f'Output "{name}" already open') port = OutputPort(name) logger.debug(f'port: {port}') self.outports[name] = port try: await port.open() except rtmidi.SystemError as exc: await port.close() del self.outports[name] logger.exception(exc) return
async def close_inport(self, name: str): port = self.inports[name] port.unbind(self) await port.close() task = self._consume_tasks.get(name) if task is not None: await task del self._consume_tasks[name]
[docs] async def remove_input(self, name: str): """Remove an input port from :attr:`inports` and :attr:`inport_names` If the port exists in :attr:`inports`, it will be closed and removed. Arguments: name (str): The port name """ async with self._port_lock: if name in self.inports: await self.close_inport(name) del self.inports[name] if name in self.inport_names: self.inport_names.remove(name)
[docs] async def remove_output(self, name: str): """Remove an output port from :attr:`outports` and :attr:`outport_names` If the port exists in :attr:`outports`, it will be closed and removed. Arguments: name (str): The port name """ async with self._port_lock: if name in self.outports: port = self.outports[name] await port.close() del self.outports[name] if name in self.outport_names: self.outport_names.remove(name)
@logger.catch async def periodic_refresh(self): while self.running: try: r = await asyncio.wait_for(self._refresh_event.wait(), 30) except asyncio.TimeoutError: r = False if r: break coros = set() for mapped_device in self.mapped_devices.values(): coros.add(mapped_device.send_all_parameters()) if len(coros): logger.debug('refreshing midi data') await asyncio.gather(*coros) @logger.catch async def consume_incoming_messages(self, port: InputPort): while self.running and port.running: msg = await port.receive(timeout=.5) if msg is None: continue if msg is False: port.task_done() break logger.debug(f'MIDI rx: {msg}') coros = [] for device in self.mapped_devices.values(): coros.append(device.handle_incoming_message(msg)) await asyncio.gather(*coros) port.task_done()
[docs] async def send_message(self, msg: mido.messages.messages.BaseMessage): """Send a message to all output ports in :attr:`outports` Arguments: msg: The :class:`Message <mido.Message>` to send """ coros = set() for port in self.outports.values(): if port.running: coros.add(port.send(msg)) if len(coros): logger.debug(f'MIDI tx: {msg}') await asyncio.gather(*coros)
[docs] def map_device(self, midi_channel: int, device: 'jvconnected.device.Device'): """Connect a :class:`jvconnected.device.Device` to a :class:`.mapped_device.MappedDevice` """ if not 0 <= midi_channel <= 15: raise ValueError('midi_channel must be between 0 and 15') if midi_channel in self.mapped_devices: self.unmap_device(midi_channel) m = MappedDevice(self, midi_channel, device, self.mapper) self.mapped_devices[midi_channel] = m logger.debug(f'mapped device: {m}')
[docs] def unmap_device(self, midi_channel: int): """Unmap a device """ if midi_channel not in self.mapped_devices: return logger.debug(f'unmap device: {self.mapped_devices[midi_channel]}') del self.mapped_devices[midi_channel]
async def on_engine_running(self, instance, value, **kwargs): if instance is not self.engine: return if value: if not self.running: await self.open() else: await self.close()
[docs] def update_config(self, *args, **kwargs): """Update the :attr:`config` with current state """ if self._reading_config: return config = self.config if config is None: return if 'interfaces' not in config: config['interfaces'] = {} if 'midi' not in config['interfaces']: config['interfaces']['midi'] = {'inport_names':[], 'outport_names':[]} d = config['interfaces']['midi'] d['inport_names'] = self.inport_names.copy() d['outport_names'] = self.outport_names.copy()
def read_config(self, *args, **kwargs): config = self.config if config is None: return d = config.get('interfaces', {}).get('midi', {}) self._reading_config = True for attr in ['inport_names', 'outport_names']: conf_val = d.get(attr, []) prop_val = getattr(self, attr) if conf_val == prop_val: continue setattr(self, attr, conf_val.copy()) self._reading_config = False async def on_inport_names(self, instance, value, **kwargs): self.update_config() # if not self.running: # return # if self._port_lock.locked(): # return # old = kwargs['old'] # new_values = set(old) - set(value) # removed_values = set(value) - set(old) # logger.info(f'on_inport_names: {value}, new_values: {new_values}, removed_values: {removed_values}') # for name in removed_values: # if name in self.inports: # await self.remove_input(name) # for name in new_values: # if name not in self.inports: # await self.add_input(name) async def on_outport_names(self, instance, value, **kwargs): self.update_config() # if not self.running: # return # old = kwargs['old'] # new_values = set(old) - set(value) # removed_values = set(value) - set(old) # for name in removed_values: # if name in self.outports: # await self.remove_output(name) # for name in new_values: # if name not in self.outports: # await self.add_output(name) # self.update_config() async def on_inport_running(self, port, value, **kwargs): logger.debug(f'{self}.on_inport_running({port}, {value})') if port is not self.inports.get(port.name): return if value: logger.debug(f'starting consume task for {port}') assert port.name not in self._consume_tasks task = asyncio.ensure_future(self.consume_incoming_messages(port)) self._consume_tasks[port.name] = task logger.debug(f'consume task running for {port}') else: logger.debug(f'stopping consume task for {port}') task = self._consume_tasks.get(port.name) if task is not None: await task del self._consume_tasks[port.name]