Source code for jvconnected.utils

import asyncio
import collections
from dataclasses import dataclass
from typing import Any, Iterator, Union, Tuple

from pydispatch import Dispatcher

class IndexedDict(Dispatcher):
    """A ``dict`` like container that tracks indices for its items

    Every item is stored under a key and is additionally assigned a unique
    integer index. Keys, values and items are iterated in ascending index
    order. Indices are not required to be contiguous.

    :Events:
        .. event:: on_item_added(key=key, item=item, index=index_)

            Fired when an item is added

        .. event:: on_item_removed(key=key, item=item, index=index_)

            Fired when an item is removed

        .. event:: on_item_index_changed(key=key, item=item, old_index=cur_index, new_index=new_index)

            Fired when an item's index changes

    """
    _events_ = ['on_item_added', 'on_item_removed', 'on_item_index_changed']

    def __init__(self):
        # key -> item
        self._data = {}
        # index -> key and key -> index; the two maps mirror each other
        self._index_key_map = {}
        self._key_index_map = {}

    @property
    def next_index(self) -> int:
        """The index an appended item would receive (one past the highest
        index in use, or ``0`` if the container is empty)
        """
        if not self._index_key_map:
            return 0
        return max(self._index_key_map) + 1

    def add(self, key: Any, item: Any, index_: int = -1) -> int:
        """Add an item

        Arguments:
            key: The dictionary key
            item: The dictionary value
            index_: The index for the item. If ``-1``, the item will be
                appended to the end, otherwise it will be inserted at the
                specified index

        Returns:
            int: The inserted item's index

        Raises:
            AssertionError: If ``key`` already exists
        """
        assert key not in self._data
        if index_ == -1:
            index_ = self.next_index
        if index_ in self._index_key_map:
            # Shift the run of items occupying this slot one step to the right
            self._pre_insert(index_)
        assert index_ not in self._index_key_map
        self._data[key] = item
        self._index_key_map[index_] = key
        self._key_index_map[key] = index_
        self.emit('on_item_added', key=key, item=item, index=index_)
        return index_

    def remove(self, key: Any) -> Any:
        """Remove an item

        Arguments:
            key: The dictionary key

        Returns:
            The item that was removed

        Raises:
            KeyError: If ``key`` does not exist
        """
        item = self._data[key]
        index_ = self._key_index_map[key]
        del self._key_index_map[key]
        del self._index_key_map[index_]
        del self._data[key]
        self.emit('on_item_removed', key=key, item=item, index=index_)
        return item

    def change_item_index(self, key: Any, new_index: int):
        """Change the index for an existing item. If necessary, change indices
        for any conflicting items

        Arguments:
            key: the dictionary key
            new_index (int): New index for the item

        Raises:
            KeyError: If ``key`` does not exist
        """
        item = self._data[key]
        cur_index = self._key_index_map[key]
        # Vacate the current slot first so that shifting the conflicting
        # items below never moves the item being re-indexed
        del self._index_key_map[cur_index]
        if new_index in self._index_key_map:
            self._pre_insert(new_index)
        assert new_index not in self._index_key_map
        self._index_key_map[new_index] = key
        self._key_index_map[key] = new_index
        self.emit(
            'on_item_index_changed',
            key=key, item=item, old_index=cur_index, new_index=new_index,
        )

    def compact_indices(self, start_index: int = 0, max_change: int = 1):
        """Remove gaps in indices

        The first index at or after ``start_index`` is left where it is; every
        following index that is not adjacent to its predecessor is moved
        toward it by at most ``max_change``.

        Arguments:
            start_index (int, optional): The index to start from
            max_change (int, optional): Limit index changes to this amount
        """
        cur_indices = list(self.iter_indices(start_index))
        cur_keys = [self._index_key_map[i] for i in cur_indices]

        # First pass: work out the target index for every item
        # (``None`` meaning "leave unchanged") without touching the maps
        expected_indices = []
        last_i = None
        for i in cur_indices:
            new_index = None
            if last_i is None:
                last_i = i
            elif last_i + 1 != i:
                next_i = last_i + 1
                diff = i - next_i
                if diff > max_change:
                    diff = max_change
                new_index = i - diff
                assert new_index > last_i
                i = new_index
            expected_indices.append(new_index)
            last_i = i

        # Second pass: apply the moves in ascending order so each target
        # slot has already been vacated by the preceding item
        for key, cur_index, new_index in zip(cur_keys, cur_indices, expected_indices):
            if new_index is None or cur_index == new_index:
                continue
            assert new_index not in self._index_key_map
            self._set_item_index(key, new_index)

    def keys(self) -> Iterator[Any]:
        """Return an iterator of the dictionary keys, sorted by the item indices
        """
        for i in self.iter_indices():
            yield self._index_key_map[i]

    def values(self) -> Iterator[Any]:
        """Return an iterator of the dictionary values, sorted by the item indices
        """
        for key in self.keys():
            yield self[key]

    def items(self) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of the dictionary key, value pairs, sorted by
        the item indices
        """
        for key in self.keys():
            yield key, self[key]

    def iter_indices(self, start_index: int = 0) -> Iterator[int]:
        """Iterate through sorted indices starting from the one given

        Arguments:
            start_index (int, optional): The starting index, defaults to ``0``
        """
        for i in sorted(self._index_key_map):
            if i < start_index:
                continue
            yield i

    def iter_consecutive_indices(self, start_index: int = 0) -> Iterator[int]:
        """Iterate through sorted indices starting from the one given, but
        stop at the first gap

        Nothing is yielded unless ``start_index`` itself is in use.

        Arguments:
            start_index (int, optional): The starting index, defaults to ``0``
        """
        last_i = None
        for i in self.iter_indices(start_index):
            if last_i is not None:
                if last_i + 1 != i:
                    break
            elif i != start_index:
                break
            yield i
            last_i = i

    def __getitem__(self, key: Any):
        return self._data[key]

    def __len__(self) -> int:
        return len(self._data)

    def __contains__(self, key: Any) -> bool:
        return key in self._data

    def __iter__(self) -> Iterator[Any]:
        # Without this, iteration falls back to the legacy sequence protocol
        # (``self[0]``, ``self[1]``, ...) because ``__getitem__`` is defined,
        # which looks items up by *key* and fails for non-integer keys.
        return self.keys()

    def get(self, key: Any, default: Any = None):
        """Get an item by key, returning ``default`` if the key does not exist
        """
        return self._data.get(key, default)

    def get_by_index(self, index_: int, default: Any = None):
        """Get an item by index

        Arguments:
            index_ (int): The item index to get
            default (optional): The default to return if no item exists
                with the given index, defaults to ``None``
        """
        if index_ not in self._index_key_map:
            return default
        key = self._index_key_map[index_]
        return self[key]

    def get_item_index(self, key: Any) -> int:
        """Get the index for the given key

        Raises:
            KeyError: If ``key`` does not exist
        """
        return self._key_index_map[key]

    def _pre_insert(self, start_index: int):
        """Move existing items to the right

        Only the unbroken run of indices beginning at ``start_index`` is
        shifted. The run is processed from its highest index down so every
        target slot is free when it is written.
        """
        indices = list(self.iter_consecutive_indices(start_index))
        for i in reversed(indices):
            key = self._index_key_map[i]
            self._set_item_index(key, i + 1)

    def _set_item_index(self, key: Any, new_index: int):
        """Move ``key`` to the (unoccupied) ``new_index`` and emit
        ``on_item_index_changed``
        """
        assert new_index not in self._index_key_map
        item = self._data[key]
        cur_index = self._key_index_map[key]
        del self._index_key_map[cur_index]
        self._key_index_map[key] = new_index
        self._index_key_map[new_index] = key
        self.emit(
            'on_item_index_changed',
            key=key, item=item, old_index=cur_index, new_index=new_index,
        )
@dataclass
class NamedItem:
    """Container pairing a payload with the key it is queued under.

    Helper class for :class:`NamedQueue`
    """

    key: Any
    """Key identifying the item"""

    item: Any
    """The payload carried alongside :attr:`key`"""
class NamedQueue(asyncio.Queue):
    """A :class:`asyncio.Queue` subclass that stores items by user-defined keys.

    The items placed on the queue must be instances of :class:`NamedItem`.
    For convenience, there is a :meth:`create_item` constructor method.

    Each key occupies at most one slot. Putting an item whose key is already
    queued replaces the pending item in place: the key keeps its position,
    the queue size is unchanged and no additional unfinished task is
    recorded, so one :meth:`~asyncio.Queue.task_done` call per retrieved item
    is enough for :meth:`~asyncio.Queue.join` to complete.
    """

    @classmethod
    def create_item(cls, key: Any, item: Any) -> "NamedItem":
        """Create a :class:`NamedItem` to be put on the queue
        """
        return NamedItem(key=key, item=item)

    def _init(self, maxsize):
        # ``_queue`` holds the keys in FIFO order (the base class derives
        # ``qsize()`` from it); the payload for each key is in ``_queue_items``
        self._queue = collections.deque()
        self._queue_items = {}

    def _put(self, item: "NamedItem"):
        # A key that is already pending keeps its place in line; only the
        # stored item is swapped. Dict membership avoids scanning the deque.
        if item.key not in self._queue_items:
            self._queue.append(item.key)
        self._queue_items[item.key] = item

    def _get(self) -> "NamedItem":
        key = self._queue.popleft()
        return self._queue_items.pop(key)

    def _replaces_queued_item(self, item: "NamedItem") -> bool:
        """Whether putting ``item`` would overwrite a pending item

        Always ``False`` once the queue has been shut down (Python 3.13+) so
        that the base class raises its usual shutdown error.
        """
        if getattr(self, '_is_shutdown', False):
            return False
        return item.key in self._queue_items

    async def put(self, item: "NamedItem"):
        """Put a :class:`NamedItem` into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.

        If an item with the same :attr:`~NamedItem.key` already exists in the
        queue, it will be replaced. A replacement needs no free slot and
        therefore never waits.
        """
        if self._replaces_queued_item(item):
            return self.put_nowait(item)
        return await super().put(item)

    def put_nowait(self, item: "NamedItem"):
        """Put a :class:`NamedItem` into the queue without blocking.

        If an item with the same :attr:`~NamedItem.key` already exists in the
        queue, it is replaced (even when the queue is full).

        Raises:
            asyncio.QueueFull: If the key is not queued yet and no free slot
                is available
        """
        if not self._replaces_queued_item(item):
            return super().put_nowait(item)
        # Swap the pending item without going through the base class: the
        # queue does not grow, so neither the "full" check nor the
        # unfinished-task counter may be touched.
        self._put(item)
        if not self.full():
            # A producer woken for a free slot may end up here without
            # consuming it; pass the wake-up on to the next waiting producer.
            self._wakeup_next(self._putters)

    async def get(self) -> "NamedItem":
        """Remove and return the oldest queued :class:`NamedItem`, waiting
        until one is available
        """
        return await super().get()

    def get_nowait(self) -> "NamedItem":
        """Remove and return the oldest queued :class:`NamedItem`

        Raises:
            asyncio.QueueEmpty: If the queue is empty
        """
        return super().get_nowait()