from __future__ import annotations
import typing as tp
from loguru import logger
import asyncio
from typing import List, Dict, Sequence, Optional
import mido
import rtmidi
from pydispatch import Dispatcher, Property, DictProperty, ListProperty
from jvconnected.utils import IOType
from jvconnected.interfaces import Interface
from jvconnected.interfaces.midi.aioport import InputPort, OutputPort
from jvconnected.interfaces.midi.mapped_device import MappedDevice
from jvconnected.interfaces.midi.mapper import MidiMapper
[docs]class ValidationError(Exception):
pass
[docs]class MidiIO(Interface):
"""Midi interface handler
"""
inport_names: tp.List[str] = ListProperty(copy_on_change=True)
"""list of input port names to use (as ``str``)"""
outport_names: tp.List[str] = ListProperty(copy_on_change=True)
"""list of output port names to use (as ``str``)"""
inports: tp.Dict[str, InputPort] = DictProperty()
"""Mapping of :class:`~.aioport.InputPort` instances stored with their
:attr:`~.aioport.Input.name` as keys
"""
outports: tp.Dict[str, OutputPort] = DictProperty()
"""Mapping of :class:`~.aioport.OutputPort` instances stored with their
:attr:`~.aioport.OutputPort.name` as keys
"""
mapped_devices: tp.Dict[str, 'jvconnected.interfaces.midi.mapped_device.MappedDevice'] = DictProperty()
"""Mapping of :class:`~.mapped_device.MappedDevice` instances stored with
the device id as keys
"""
device_channel_map: Dict[str, int] = DictProperty()
"""Mapping of :class:`~.mapped_device.MappedDevice` instances
stored with the device id as keys
"""
channel_device_map: Dict[int, str] = DictProperty()
"""Mapping of Midi channel assignments using the Midi channel
as keys and :attr:`device_id <jvconnected.config.DeviceConfig.id>`
as values
"""
mapper = Property()
[docs] def port_state(self, io_type: IOType, name: str, state: bool):
"""Fired when a port is added or removed using one of :meth:`add_input`,
:meth:`add_output`, :meth:`remove_input`, :meth:`remove_output`.
"""
interface_name = 'midi'
_events_ = ['port_state']
def __init__(self):
super().__init__()
self._consume_tasks = {}
self._reading_config = False
self._device_map_lock = asyncio.Lock()
self._port_lock = asyncio.Lock()
self._refresh_event = asyncio.Event()
self._refresh_task = None
self.mapper = MidiMapper()
self.bind_async(
self.loop,
inport_names=self.on_inport_names,
outport_names=self.on_outport_names,
)
self.bind(config=self.read_config)
[docs] @classmethod
def get_available_outputs(cls) -> List[str]:
"""Get all detected output port names
"""
return mido.get_output_names()
[docs] async def set_engine(self, engine: 'jvconnected.engine.Engine'):
if engine is self.engine:
return
await super().set_engine(engine)
await self.automap_engine_devices()
engine.bind_async(self.loop, devices=self.automap_engine_devices)
[docs] async def automap_engine_devices(self, *args, **kwargs):
"""Map the engine's devices by index
"""
config = self.engine.config
coros = set()
config_update = False
async with self._device_map_lock:
for conf_device in config.indexed_devices.values():
device_id = conf_device.id
device_index = conf_device.device_index
if device_index > 15:
break
device = self.engine.devices.get(device_id)
mapped_device = self.mapped_devices.get(device_id)
if device is None:
if mapped_device is not None:
self._unmap_device(device_id)
config_update = True
elif mapped_device is None:
mapped_device = await self.map_device(device, send_all_parameters=False)
coros.add(mapped_device.send_all_parameters())
config_update = True
else:
assert device is mapped_device.device
if config_update:
self.update_config()
if len(coros):
await asyncio.gather(*coros)
[docs] @logger.catch
async def open(self):
"""Open any configured input and output ports and begin communication
"""
if self.running:
return
logger.debug('MidiIO.open()')
self.running = True
await self.open_ports()
self._refresh_task = asyncio.ensure_future(self.periodic_refresh())
logger.success('MidiIO running')
[docs] async def close(self):
"""Stop communication and close all input and output ports
"""
if not self.running:
return
logger.debug('MidiIO.close()')
self.running = False
self._refresh_event.set()
await self._refresh_task
self._refresh_task = None
await self.close_ports()
logger.success('MidiIO stopped')
[docs] async def open_ports(self):
"""Open any configured input and output ports.
(Called by :meth:`open`)
"""
for name in self.inport_names:
await self.add_input(name)
for name in self.outport_names:
await self.add_output(name)
[docs] async def close_ports(self):
"""Close all running input and output ports
(Called by :meth:`close`)
"""
coros = set()
for port in self.inports.values():
coros.add(port.close())
for port in self.outports.values():
coros.add(port.close())
await asyncio.gather(*coros)
[docs] async def add_output(self, name: str):
"""Add an output port
The port name will be added to :attr:`outport_names` and stored in the
:attr:`config`.
If MidiIO is :attr:`running`, an instance of :class:`~.aioport.OutputPort`
will be created and added to :attr:`outports`.
Arguments:
name (str): The port name (as it appears in :meth:`get_available_outputs`)
"""
async with self._port_lock:
if name not in self.outport_names:
self.outport_names.append(name)
if self.running:
if name in self.outports:
raise ValueError(f'Output "{name}" already open')
port = OutputPort(name)
logger.debug(f'port: {port}')
self.outports[name] = port
try:
await port.open()
except rtmidi.SystemError as exc:
await port.close()
del self.outports[name]
logger.exception(exc)
return
self.emit('port_state', IOType.OUTPUT, name, True)
async def close_inport(self, name: str):
port = self.inports[name]
port.unbind(self)
await port.close()
task = self._consume_tasks.get(name)
if task is not None:
await task
del self._consume_tasks[name]
[docs] async def remove_output(self, name: str):
"""Remove an output port from :attr:`outports` and :attr:`outport_names`
If the port exists in :attr:`outports`, it will be closed and removed.
Arguments:
name (str): The port name
"""
async with self._port_lock:
if name in self.outports:
port = self.outports[name]
await port.close()
del self.outports[name]
if name in self.outport_names:
self.outport_names.remove(name)
self.emit('port_state', IOType.OUTPUT, name, False)
@logger.catch
async def periodic_refresh(self):
while self.running:
try:
r = await asyncio.wait_for(self._refresh_event.wait(), 30)
except asyncio.TimeoutError:
r = False
if r:
break
coros = set()
for mapped_device in self.mapped_devices.values():
coros.add(mapped_device.send_all_parameters())
if len(coros):
logger.debug('refreshing midi data')
await asyncio.gather(*coros)
@logger.catch
async def consume_incoming_messages(self, port: InputPort):
while self.running and port.running:
msgs = await port.receive_many(timeout=.5)
if msgs is None:
continue
logger.opt(lazy=True).debug(
'{x}', x=lambda: '\n'.join([f'MIDI rx: {msg}' for msg in msgs])
)
coros = set()
for device in self.mapped_devices.values():
coros.add(device.handle_incoming_messages(msgs))
if len(coros):
await asyncio.gather(*coros)
[docs] async def send_message(self, msg: mido.messages.messages.BaseMessage):
"""Send a message to all output ports in :attr:`outports`
Arguments:
msg: The :class:`Message <mido.Message>` to send
"""
coros = set()
for port in self.outports.values():
if port.running:
coros.add(port.send(msg))
if len(coros):
await asyncio.gather(*coros)
logger.opt(lazy=True).debug(f'MIDI tx: {msg}')
[docs] async def send_messages(self, msgs: Sequence[mido.Message]):
"""Send a message to all output ports in :attr:`outports`
Arguments:
msgs: A sequence of :class:`Messages <mido.Message>` to send
"""
coros = set()
for port in self.outports.values():
if port.running:
coros.add(port.send_many(*msgs))
if len(coros):
await asyncio.gather(*coros)
logger.opt(lazy=True).debug(
'{x}', x=lambda: '\n'.join([f'MIDI tx: {msg}' for msg in msgs])
)
[docs] @logger.catch
async def map_device(
self,
device: 'jvconnected.device.Device',
send_all_parameters: bool = True,
midi_channel: Optional[int] = None
) -> MappedDevice:
"""Connect a :class:`jvconnected.device.Device` to a :class:`.mapped_device.MappedDevice`
The Midi channel used for the device is retreived from the :attr:`config`
if available. If no channel assignment was found, the next available
channel is used and saved in the :attr:`config`.
Arguments:
device: The :class:`~jvconnected.device.Device` to map
send_all_parameters (bool, optional): If True, send all current
parameter values once the device is mapped. Default is True
midi_channel (int, optional): The Midi channel to use for
the device (from 0 to 15). If not provided, the channel is
assigned automatically using :meth:`get_midi_channel_for_device`
Raises:
ValidationError: If *midi_channel* was provided and already in use
ValueError: If there are no Midi channels available
"""
device_id = device.id
assert device_id not in self.mapped_devices
if midi_channel is None:
midi_channel = self.get_midi_channel_for_device(device_id)
self.validate_device_channel(device_id, midi_channel)
m = MappedDevice(self, midi_channel, device, self.mapper)
self.mapped_devices[device_id] = m
await self._assign_device_channel(device_id, midi_channel)
if send_all_parameters:
await m.send_all_parameters()
logger.debug(f'mapped device: {m} to midi channel {midi_channel}')
return m
[docs] @logger.catch
async def unmap_device(self, device_id: str, unassign_channel: bool = False):
"""Unmap a device
Arguments:
device_id (str): The :attr:`id <jvconnected.config.DeviceConfig.id>`
of the device to unmap
unassign_channel (bool, optional): If True, removes the Midi channel
assignment for the device and updates the saved config. If False
(the default), only removes the :class:`~.mapped_device.MappedDevice`
from :attr:`mapped_devices`.
"""
logger.debug(f'unmap_device: {device_id}')
async with self._device_map_lock:
if device_id not in self.device_channel_map:
return
self._unmap_device(device_id, unassign_channel)
self.update_config()
def _unmap_device(self, device_id: str, unassign_channel: bool = False):
if device_id in self.mapped_devices:
# logger.debug(f'unmap device: {self.mapped_devices[device_id]}')
del self.mapped_devices[device_id]
if unassign_channel:
midi_channel = self.device_channel_map[device_id]
del self.channel_device_map[midi_channel]
del self.device_channel_map[device_id]
[docs] @logger.catch
async def remap_device_channel(self, device_id: str, midi_channel: int):
"""Reassign the Midi channel for a device
If the device is online, the existing :class:`~.mapped_device.MappedDevice`
attached to it is reassigned as well.
Arguments:
device_id (str): The :attr:`id <jvconnected.config.DeviceConfig.id>`
of the device
midi_channel (int): The new Midi channel for the device
Raises:
ValidationError: If the given *midi_channel* is already in use
"""
mapped_device = self.mapped_devices.get(device_id)
logger.debug(f'remap_device: {device_id}, {midi_channel}, {mapped_device}')
if mapped_device is not None:
if mapped_device.midi_channel == midi_channel:
return
await self.unmap_device(device_id, unassign_channel=True)
self.validate_device_channel(device_id, midi_channel)
device = self.engine.devices.get(device_id)
if device is not None:
await self.map_device(device, midi_channel=midi_channel)
else:
await self._assign_device_channel(device_id, midi_channel)
[docs] def get_midi_channel_for_device(self, device_id: str) -> int:
"""Get the assigned Midi channel for a device or next one available
If the :attr:`device_id <jvconnected.config.DeviceConfig.id>` exists
in the :attr:`config`, it is used. If no assignment exists, the next
available channel is returned.
Raises:
ValueError: If there are no available channels
"""
chan = self.device_channel_map.get(device_id)
if chan is not None:
return chan
all_channels = set(range(16))
in_use = set(self.channel_device_map.keys())
available = all_channels - in_use
if not len(available):
raise ValueError('No Midi channel available')
return min(available)
def validate_device_channel(self, device_id: str, midi_channel: int) -> None:
if midi_channel < 0 or midi_channel > 15:
raise ValidationError('Midi channel out of range')
if midi_channel in self.channel_device_map:
if self.channel_device_map[midi_channel] != device_id:
raise ValidationError(f'Channel {midi_channel} already assigned')
async def _assign_device_channel(self, device_id: str, midi_channel: int):
assert len(device_id)
# self.validate_device_channel(device_id, midi_channel)
if midi_channel in self.channel_device_map:
assert self.channel_device_map[midi_channel] == device_id
else:
self.channel_device_map[midi_channel] = device_id
if device_id in self.device_channel_map:
assert self.device_channel_map[device_id] == midi_channel
else:
self.device_channel_map[device_id] = midi_channel
if not self._device_map_lock.locked():
async with self._device_map_lock:
self.update_config()
async def on_engine_running(self, instance, value, **kwargs):
if instance is not self.engine:
return
if value:
if not self.running:
await self.open()
else:
await self.close()
[docs] def update_config(self, *args, **kwargs):
"""Update the :attr:`config` with current state
"""
if self._reading_config:
return
d = self.get_config_section()
if d is None:
return
d['inport_names'] = self.inport_names.copy()
d['outport_names'] = self.outport_names.copy()
d['device_channel_map'] = self.device_channel_map.copy()
def read_config(self, *args, **kwargs):
d = self.get_config_section()
if d is None:
return
self._reading_config = True
for attr in ['inport_names', 'outport_names']:
conf_val = d.get(attr, [])
prop_val = getattr(self, attr)
if conf_val == prop_val:
continue
setattr(self, attr, conf_val.copy())
conf_val = d.get('device_channel_map', {})
if conf_val != self.device_channel_map:
self.device_channel_map = conf_val.copy()
self.channel_device_map = {v:k for k,v in conf_val.items()}
self._reading_config = False
async def on_inport_names(self, instance, value, **kwargs):
self.update_config()
# if not self.running:
# return
# if self._port_lock.locked():
# return
# old = kwargs['old']
# new_values = set(old) - set(value)
# removed_values = set(value) - set(old)
# logger.info(f'on_inport_names: {value}, new_values: {new_values}, removed_values: {removed_values}')
# for name in removed_values:
# if name in self.inports:
# await self.remove_input(name)
# for name in new_values:
# if name not in self.inports:
# await self.add_input(name)
async def on_outport_names(self, instance, value, **kwargs):
self.update_config()
# if not self.running:
# return
# old = kwargs['old']
# new_values = set(old) - set(value)
# removed_values = set(value) - set(old)
# for name in removed_values:
# if name in self.outports:
# await self.remove_output(name)
# for name in new_values:
# if name not in self.outports:
# await self.add_output(name)
# self.update_config()
async def on_inport_running(self, port, value, **kwargs):
logger.debug(f'{self}.on_inport_running({port}, {value})')
if port is not self.inports.get(port.name):
return
if value:
logger.debug(f'starting consume task for {port}')
assert port.name not in self._consume_tasks
task = asyncio.ensure_future(self.consume_incoming_messages(port))
self._consume_tasks[port.name] = task
logger.debug(f'consume task running for {port}')
else:
logger.debug(f'stopping consume task for {port}')
task = self._consume_tasks.get(port.name)
if task is not None:
await task
del self._consume_tasks[port.name]