from loguru import logger
import functools
import enum
import asyncio
import collections
from dataclasses import dataclass
from typing import Any, Iterator, Union, Tuple
from pydispatch import Dispatcher
[docs]class IOType(enum.Enum):
"""Enum to distinguish between input and output types
"""
NONE = enum.auto() #: Not set
INPUT = enum.auto() #: Input
OUTPUT = enum.auto() #: Output
[docs]def async_callback(fn):
"""Wrap a coroutine function or method (:keyword:`async def`) where a sync
function is expected.
The decorated function or method will be wrapped in an :class:`asyncio.Task`
and scheduled on the current :ref:`event loop <asyncio-event-loop>`
(within the context of the callback).
Any exceptions will be caught and forwarded to the event loop through
:meth:`asyncio.loop.call_exception_handler`.
.. testsetup:: async_callback
import asyncio
from jvconnected.utils import async_callback
.. testcode:: async_callback
callback_event = asyncio.Event()
@async_callback
async def my_async_callback(*args, **kwargs):
print(f'callback got: {args}, {kwargs}')
callback_event.set()
# Calling `my_async_callback` as a normal function
my_async_callback('foo', bar='baz')
# Run the loop until callback_event is set from inside the callback
loop = asyncio.get_event_loop()
loop.run_until_complete(callback_event.wait())
.. testoutput:: async_callback
callback got: ('foo',), {'bar': 'baz'}
"""
def _error_handler(task):
try:
task.result()
except Exception as exc:
logger.exception(exc)
loop = asyncio.get_running_loop()
loop.call_exception_handler({'message':repr(exc), 'exception':exc})
@functools.wraps(fn)
def wrapper(*args, **kwargs):
task = asyncio.ensure_future(fn(*args, **kwargs))
task.add_done_callback(_error_handler)
return wrapper
[docs]class IndexedDict(Dispatcher):
"""A ``dict`` like container that tracks indices for its items
:Events:
.. event:: on_item_added(key=key, item=item, index=index_)
Fired when an item is added
.. event:: on_item_removed(key=key, item=item, index=index_)
Fired when an item is removed
.. event:: on_item_index_changed(key=key, item=item, old_index=cur_index, new_index=new_index)
Fired when an item's index changes
"""
_events_ = ['on_item_added', 'on_item_removed', 'on_item_index_changed']
def __init__(self):
self._data = {}
self._index_key_map = {}
self._key_index_map = {}
@property
def next_index(self):
if not len(self._index_key_map):
return 0
return max(self._index_key_map.keys()) + 1
[docs] def add(self, key: Any, item: Any, index_: int = -1) -> int:
"""Add an item
Arguments:
key: The dictionary key
item: The dictionary value
index_: The index for the item. If ``-1``, the item will be appended
to the end, otherwise it will be inserted at the specified index
Returns:
int:
The inserted item's index
"""
assert key not in self._data
if index_ == -1:
index_ = self.next_index
if index_ in self._index_key_map:
self._pre_insert(index_)
assert index_ not in self._index_key_map
self._data[key] = item
self._index_key_map[index_] = key
self._key_index_map[key] = index_
self.emit('on_item_added', key=key, item=item, index=index_)
return index_
[docs] def remove(self, key: Any):
"""Remove an item
Arguments:
key: The dictionary key
Returns:
The item that was removed
"""
item = self._data[key]
index_ = self._key_index_map[key]
del self._key_index_map[key]
del self._index_key_map[index_]
del self._data[key]
self.emit('on_item_removed', key=key, item=item, index=index_)
return item
[docs] def change_item_index(self, key: Any, new_index: int):
"""Change the index for an existing item.
If necessary, change indices for any conflicting items
Arguments:
key: the dictionary key
new_index (int): New index for the item
"""
item = self._data[key]
cur_index = self._key_index_map[key]
del self._index_key_map[cur_index]
if new_index in self._index_key_map:
self._pre_insert(new_index)
assert new_index not in self._index_key_map
self._index_key_map[new_index] = key
self._key_index_map[key] = new_index
self.emit(
'on_item_index_changed',
key=key, item=item, old_index=cur_index, new_index=new_index,
)
[docs] def compact_indices(self, start_index: int = 0, max_change: int = 1):
"""Remove gaps in indices
Arguments:
start_index (int, optional): The index to start from
max_change (int, optional): Limit index changes to this amount
"""
cur_indices = [i for i in self.iter_indices(start_index)]
cur_keys = [self._index_key_map[i] for i in cur_indices]
expected_indices = []
last_i = None
for i in cur_indices:
new_index = None
if last_i is None:
last_i = i
elif last_i + 1 != i:
next_i = last_i + 1
diff = i - next_i
if diff > max_change:
diff = max_change
new_index = i - diff
assert new_index > last_i
i = new_index
expected_indices.append(new_index)
last_i = i
for key, cur_index, new_index in zip(cur_keys, cur_indices, expected_indices):
if new_index is None:
continue
if cur_index == new_index:
continue
assert new_index not in self._index_key_map.keys()
# key = self._key_index_map[cur_index]
self._set_item_index(key, new_index)
[docs] def keys(self) -> Iterator[Any]:
"""Return an iterator of the dictionary keys, sorted by the item indices
"""
for i in self.iter_indices():
yield self._index_key_map[i]
[docs] def values(self) -> Iterator[Any]:
"""Return an iterator of the dictionary values, sorted by the item indices
"""
for key in self.keys():
yield self[key]
[docs] def items(self) -> Iterator[Tuple[Any, Any]]:
"""Return an iterator of the dictionary key, value pairs, sorted by the item indices
"""
for key in self.keys():
yield key, self[key]
[docs] def iter_indices(self, start_index: int = 0) -> Iterator[int]:
"""Iterate through sorted indices starting from the one given
Arguments:
start_index (int, optional): The starting index, defaults to ``0``
"""
for i in sorted(self._index_key_map.keys()):
if i < start_index:
continue
yield i
[docs] def iter_consecutive_indices(self, start_index: int = 0) -> Iterator[int]:
"""Iterate through sorted indices starting from the one given,
but stop at the first gap
Arguments:
start_index (int, optional): The starting index, defaults to ``0``
"""
last_i = None
for i in self.iter_indices(start_index):
if last_i is not None:
if last_i + 1 != i:
break
elif i != start_index:
break
yield i
last_i = i
def __getitem__(self, key: Any):
return self._data[key]
def __len__(self): return len(self._data)
def __contains__(self, key: Any): return key in self._data
[docs] def get(self, key: Any, default: Any = None):
"""Get an item by key
"""
return self._data.get(key, default)
[docs] def get_by_index(self, index_: int, default: Any = None):
"""Get an item by index
Arguments:
index_ (int): The item index to get
default (optional): The default to return if no item exists with
the given index, defaults to ``None``
"""
if index_ not in self._index_key_map:
return default
key = self._index_key_map[index_]
return self[key]
[docs] def get_item_index(self, key: Any) -> int:
"""Get the index for the given key
"""
return self._key_index_map[key]
def _pre_insert(self, start_index: int):
"""Move existing items to the right
"""
indices = [i for i in self.iter_consecutive_indices(start_index)]
for i in reversed(indices):
key = self._index_key_map[i]
self._set_item_index(key, i+1)
def _set_item_index(self, key: Any, new_index: int):
assert new_index not in self._index_key_map
item = self._data[key]
cur_index = self._key_index_map[key]
del self._index_key_map[cur_index]
self._key_index_map[key] = new_index
self._index_key_map[new_index] = key
self.emit(
'on_item_index_changed',
key=key, item=item, old_index=cur_index, new_index=new_index,
)
[docs]@dataclass
class NamedItem:
"""Helper class for :class:`NamedQueue`
"""
key: Any
"""The item key"""
item: Any
"""The item itself"""
[docs]class NamedQueue(asyncio.Queue):
"""A :class:`asyncio.Queue` subclass that stores items by user-defined keys.
The items placed on the queue must be instances of :class:`NamedItem`.
For convenience, there is a :meth:`create_item` contructor method.
"""
[docs] @classmethod
def create_item(self, key: Any, item: Any) -> NamedItem:
"""Create a :class:`NamedItem` to be put on the queue
"""
return NamedItem(key=key, item=item)
def _init(self, maxsize):
self._queue = collections.deque()
self._queue_items = {}
def _put(self, item: NamedItem):
self._queue_items[item.key] = item
if item.key not in self._queue:
self._queue.append(item.key)
def _get(self) -> NamedItem:
key = self._queue.popleft()
item = self._queue_items[key]
del self._queue_items[key]
return item
[docs] async def put(self, item: NamedItem):
"""Put a :class:`NamedItem` into the queue.
If the queue is full, wait until a free
slot is available before adding item.
If an item with the same :attr:`~NamedItem.key` already exists in the
queue, it will be replaced.
"""
return await super().put(item)
[docs] def put_nowait(self, item: NamedItem):
return super().put_nowait(item)
[docs] async def get(self) -> NamedItem:
return await super().get()
[docs] def get_nowait(self) -> NamedItem:
return super().get_nowait()