jvconnected.utils

class jvconnected.utils.IOType(value)[source]

Bases: Enum

Enum to distinguish between input and output types

NONE = 1

Not set

INPUT = 2

Input

OUTPUT = 3

Output

jvconnected.utils.async_callback(fn)[source]

Wrap a coroutine function or method (async def) where a sync function is expected.

The decorated function or method will be wrapped in an asyncio.Task and scheduled on the current event loop (within the context of the callback).

Any exceptions will be caught and forwarded to the event loop through asyncio.loop.call_exception_handler().

callback_event = asyncio.Event()

@async_callback
async def my_async_callback(*args, **kwargs):
    print(f'callback got: {args}, {kwargs}')
    callback_event.set()

# Calling `my_async_callback` as a normal function
my_async_callback('foo', bar='baz')

# Run the loop until callback_event is set from inside the callback
loop = asyncio.get_event_loop()
loop.run_until_complete(callback_event.wait())
callback got: ('foo',), {'bar': 'baz'}
class jvconnected.utils.IndexedDict(*args, **kwargs)[source]

Bases: Dispatcher

A dict like container that tracks indices for its items

Events
Event on_item_added(key=key, item=item, index=index_)

Fired when an item is added

Event on_item_removed(key=key, item=item, index=index_)

Fired when an item is removed

Event on_item_index_changed(key=key, item=item, old_index=cur_index, new_index=new_index)

Fired when an item’s index changes

add(key: Any, item: Any, index_: int = -1) int[source]

Add an item

Parameters
  • key (Any) – The dictionary key

  • item (Any) – The dictionary value

  • index – The index for the item. If -1, the item will be appended to the end, otherwise it will be inserted at the specified index

Returns

The inserted item’s index

Return type

int

remove(key: Any)[source]

Remove an item

Parameters

key (Any) – The dictionary key

Returns

The item that was removed

change_item_index(key: Any, new_index: int)[source]

Change the index for an existing item. If necessary, change indices for any conflicting items

Parameters
  • key (Any) – the dictionary key

  • new_index (int) – New index for the item

compact_indices(start_index: int = 0, max_change: int = 1)[source]

Remove gaps in indices

Parameters
  • start_index (int, optional) – The index to start from

  • max_change (int, optional) – Limit index changes to this amount

keys() Iterator[Any][source]

Return an iterator of the dictionary keys, sorted by the item indices

values() Iterator[Any][source]

Return an iterator of the dictionary values, sorted by the item indices

items() Iterator[Tuple[Any, Any]][source]

Return an iterator of the dictionary key, value pairs, sorted by the item indices

iter_indices(start_index: int = 0) Iterator[int][source]

Iterate through sorted indices starting from the one given

Parameters

start_index (int, optional) – The starting index, defaults to 0

iter_consecutive_indices(start_index: int = 0) Iterator[int][source]

Iterate through sorted indices starting from the one given, but stop at the first gap

Parameters

start_index (int, optional) – The starting index, defaults to 0

get(key: Any, default: Optional[Any] = None)[source]

Get an item by key

get_by_index(index_: int, default: Optional[Any] = None)[source]

Get an item by index

Parameters
  • index (int) – The item index to get

  • default (optional) – The default to return if no item exists with the given index, defaults to None

get_item_index(key: Any) int[source]

Get the index for the given key

class jvconnected.utils.NamedItem(key: Any, item: Any)[source]

Bases: object

Helper class for NamedQueue

key: Any

The item key

item: Any

The item itself

class jvconnected.utils.NamedQueue(maxsize=0, *, loop=None)[source]

Bases: Queue

A asyncio.Queue subclass that stores items by user-defined keys.

The items placed on the queue must be instances of NamedItem. For convenience, there is a create_item() contructor method.

classmethod create_item(key: Any, item: Any) NamedItem[source]

Create a NamedItem to be put on the queue

async put(item: NamedItem)[source]

Put a NamedItem into the queue.

If the queue is full, wait until a free slot is available before adding item.

If an item with the same key already exists in the queue, it will be replaced.

put_nowait(item: NamedItem)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

async get() NamedItem[source]

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait() NamedItem[source]

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.