#! /usr/bin/env python
from loguru import logger
import argparse
import asyncio
from typing import List, Sequence, ByteString, ClassVar, Tuple, Dict, Optional
import mido
from jvconnected.interfaces.midi import aioport
from jvconnected.interfaces.midi import bcf_sysex
from jvconnected.interfaces.midi.mapper import MidiMapper, Map
[docs]class BCLBlock(bcf_sysex.BCLBlock):
@logger.catch
async def send(self, inport: aioport.InputPort, outport: aioport.OutputPort):
async def get_response():
while True:
msg = await inport.receive(1)
if msg is None:
raise asyncio.TimeoutError
if msg.type != 'sysex':
inport.task_done()
continue
resp = bcf_sysex.BCLReply.from_sysex_message(msg)
inport.task_done()
return resp
items = self.build_sysex_items()
for item in items:
logger.debug(f'tx {item.message_index}: "{item.bcl_text}"')
await outport.send(item.build_sysex_message())
resp = await get_response()
# logger.info(f'rx {resp.message_index}: {resp}')
resp.raise_on_error()
assert resp.message_index == item.message_index
async def send_to_port_name(self, name: str):
ioport = aioport.IOPort(name)
await ioport.open()
try:
await self.send(ioport.inport, ioport.outport)
finally:
await ioport.close()
class Preset(bcf_sysex.Preset):
def build_bcl_block(self) -> BCLBlock:
lines = self.build_bcl_lines()
return BCLBlock(text_lines=lines)
def build_store_block(self, preset_num: int) -> BCLBlock:
lines = [f'$store {preset_num}']
return BCLBlock(text_lines=lines)
# async def send_block(self, blk: BCLBlock, port_name: str):
async def send(self, inport: aioport.InputPort, outport: aioport.OutputPort):
blk = self.build_bcl_block()
await blk.send(inport, outport)
async def send_to_port_name(self, name: str):
blk = self.build_bcl_block()
await blk.send_to_port_name(name)
def build_preset(mapper: Optional[MidiMapper] = None):
def build_control(pst, map_obj: Map, control_ix, cam_ix, **kwargs):
enc_ix = cam_ix * 8 + control_ix
btn_ix = cam_ix * 16 + control_ix
encoder_disp_mode = kwargs.get('encoder_disp_mode', '1dot')
if map_obj.map_type == 'controller':
pst.add_encoder(
index=enc_ix, channel=cam_ix, mode=encoder_disp_mode,
number=map_obj.controller,
)
elif map_obj.map_type == 'adjust_controller':
# tx = mido.Message('control_change', control=spec['controller'], value=0)
# tx_str = ''.join([f'${b:X}' for b in tx.bytes()[:2]])
# tx_str = f'{tx_str} ifp $7f ifn $00'
pst.add_encoder(
index=enc_ix, channel=cam_ix, mode=encoder_disp_mode,
number=map_obj.controller, encoder_mode='relative-2',
)
# pst.add_button(
# index=btn_ix, channel=cam_ix,
# number=spec['increment_note'], message_type='note',
# )
# btn_ix += 8
# pst.add_button(
# index=btn_ix, channel=cam_ix,
# number=spec['decrement_note'], message_type='note',
# )
else:
print(f'no control built: control_ix={control_ix}, map_obj={map_obj}')
if mapper is None:
mapper = MidiMapper()
pst = Preset(name='foo')
iris_map = mapper['exposure.iris_pos']
tally_pgm = mapper['tally.program']
tally_pvw = mapper['tally.preview']
# iris_map = DEFAULT_MAPPING['exposure']['iris_pos']
# tally_pgm = DEFAULT_MAPPING['tally']['program']
# tally_pvw = DEFAULT_MAPPING['tally']['preview']
for cam_ix in range(4):
# Iris mapped to faders 1-8
pst.add_fader(index=cam_ix+1, channel=cam_ix, number=iris_map.controller)
# Program tally on top button row, Preview on bottom
pst.add_button(
index=cam_ix+33, channel=cam_ix, value_max=100,
message_type='note', number=tally_pgm.note,
)
pst.add_button(
index=cam_ix+41, channel=cam_ix, value_max=100,
message_type='note', number=tally_pvw.note,
)
control_ix = 1
# for grpkey, grp in DEFAULT_MAPPING.items():
for map_obj in mapper.iter_indexed():
if map_obj.group_name == 'tally':
continue
# for spkey, spec in grp.items():
if True:
kw = {}
if map_obj.name in ['red_normalized', 'blue_normalized', 'master_black_pos', 'detail_pos']:
kw['encoder_disp_mode'] = 'pan'
else:
kw['encoder_disp_mode'] = 'bar'
build_control(pst, map_obj, control_ix, cam_ix, **kw)
control_ix += 1
return pst
@logger.catch
def send_preset(port_name):
pst = build_preset()
asyncio.run(pst.send_to_port_name(port_name))
@logger.catch
def store_preset(port_name, preset_num):
async def _do_it(pst):
await pst.send_to_port_name(port_name)
blk = pst.build_store_block(preset_num)
await blk.send_to_port_name(port_name)
pst = build_preset()
asyncio.run(_do_it(pst))
def main():
p = argparse.ArgumentParser()
p.add_argument('-p', '--port-name', dest='port_name')
p.add_argument('--store', dest='store', action='store_true')
p.add_argument('-n', '--num', dest='num', type=int, default=1, help='Preset number (if --store is used)')
args = p.parse_args()
if not args.port_name:
all_io_ports = mido.get_ioport_names()
bcf_ports = [name for name in all_io_ports if 'BCF2000' in name]
non_loop = [name for name in all_io_ports if 'through' not in name.lower()]
if len(bcf_ports) == 1:
args.port_name = bcf_ports[0]
elif len(non_loop) == 1:
args.port_name = non_loop[0]
else:
raise Exception(f'Could not find suitable port from "{all_io_ports}"')
logger.info(f'Sending to {args.port_name}...')
if args.store:
store_preset(args.port_name, args.num)
logger.info(f'Saved preset {args.num}')
else:
logger.success
send_preset(args.port_name)
logger.success(f'Preset sent to {args.port_name}')
if __name__ == '__main__':
main()