from loguru import logger
import asyncio
import queue
from typing import Optional, Any
from numbers import Number
import mido
from pydispatch import Dispatcher, Property, DictProperty
[docs]class BasePort(Dispatcher):
"""Async wrapper for :any:`mido.ports`
Arguments:
name (str): The port name
Properties:
name (str): The port name
running (bool): Current run state
Attributes:
stopped (asyncio.Event):
"""
MAX_QUEUE = 100
name = Property()
running = Property(False)
def __init__(self, name: str):
self.name = name
self.loop = asyncio.get_event_loop()
# self.queue = asyncio.Queue(self.MAX_QUEUE)
# self.running = asyncio.Event()
self.stopped = asyncio.Event()
self.port = None
[docs] async def open(self) -> bool:
"""Open the midi port
Returns:
bool: ``True`` if the port was successfully opened
"""
if self.running:
return False
self.running = True
self.port = await self._build_port()
# if port is not None:
# self.name = self.port.name
logger.debug(f'{self}.port: {self.port}')
logger.success(f'{self} running')
return True
[docs] async def close(self):
"""Close the midi port
"""
if not self.running:
return False
self.running = False
await self._close_port()
self.stopped.set()
logger.success(f'{self} closed')
return True
async def __aenter__(self):
await self.open()
return self
async def __aexit__(self, *args):
await self.close()
async def _build_port(self):
raise NotImplementedError
async def _close_port(self):
raise NotImplementedError
def __repr__(self):
return f'<{self.__class__}: "{self}">'
def __str__(self):
return self.name
[docs]class OutputPort(BasePort):
"""Async wrapper around :class:`mido.ports.BaseOutput`
Attributes:
queue (queue.Queue): Message queue for the port. Since the output port
operates in a separate thread, this is a thread-based queue (not
async)
"""
def __init__(self, name: Optional[str] = None):
super().__init__(name)
self._send_loop_task = None
self.queue = queue.Queue()
[docs] async def open(self) -> bool:
did_open = await super().open()
if did_open:
# self._send_loop_task = asyncio.ensure_future(self._send_loop())
self._send_loop_task = self.loop.run_in_executor(None, self._blocking_send_loop)
return did_open
[docs] async def send(self, msg: mido.Message):
"""Send a message
The message will be placed on the :attr:`queue` and sent from a separate
thread
Arguments:
msg: The :class:`mido.Message` to send
"""
# await self.queue.put(msg)
self.queue.put_nowait(msg)
async def _build_port(self) -> mido.ports.BaseOutput:
port = None
port = mido.open_output(self.name)
return port
async def _close_port(self):
# try:
# self.queue.put_nowait(False)
# except asyncio.QueueFull:
# pass
self.queue.put_nowait(None)
t = self._send_loop_task
if t is not None:
await t
self._send_loop_task = None
port = self.port
if port is not None:
port.close()
self.port = None
def _blocking_send_loop(self):
self.port.reset()
while self.running:
try:
msg = self.queue.get(timeout=.5)
except queue.Empty:
continue
if msg is None:
break
self.port.send(msg)
self.queue.task_done()
# async def _send_loop(self):
# self.port.reset()
# while self.running:
# msg = await self.queue_get(timeout=.5)
# if msg is None:
# continue
# if msg is False:
# self.task_done()
# break
# self.port.send(msg)
# self.task_done()
[docs]class IOPort(BasePort):
inport = Property()
outport = Property()
async def _build_port(self):
self.inport = InputPort(self.name)
self.outport = OutputPort(self.name)
await self.inport.open()
await self.outport.open()
return None
async def _close_port(self):
if self.inport is not None:
await self.inport.close()
self.inport = None
if self.outport is not None:
await self.outport.close()
self.outport = None