from loguru import logger
import asyncio
from typing import Optional
[docs]class JpegSource:
"""Acquire preview images sequentially from the device
This acts an Asynchronous Context Manager and an Asynchronous Iterator
and should be used within an :keyword:`async with` block. The individual
image frames can then be retreived using an :keyword:`async for` loop::
async with JpegSource(device) as src:
async for image_bytes in src:
process_image(image_bytes)
"""
device: 'jvconnected.device.Device' #: The device instance
fps: int = 30 #: Desired frame rate in frames per second
def __init__(self, device: 'jvconnected.device.Device', fps: int = 30):
self.device = device
self.fps = fps
self.loop = asyncio.get_event_loop()
self.last_frame_time = None
self.__encoding = False
@property
def client(self) -> 'jvconnected.client.Client':
return self.device.client
@property
def image_uri(self) -> str:
uri = f'/cgi-bin/get_jpg.cgi?SessionID={self.client.session_id}'
return self.client._build_uri(uri)
@property
def encoding(self) -> bool:
"""``True`` if the camera's Jpeg encode function is enabled
"""
return self.__encoding
[docs] async def acquire(self):
"""Acquire the context manager and tell the device to begin Jpeg encoding
"""
if self.encoding:
return
await self.client.request('JpegEncode', {'Operate':'Start'})
logger.success('JpegEncode acquired')
self.__encoding = True
[docs] async def release(self):
"""Stop encoding on the device and release the context manager
"""
logger.debug('releasing...')
if self.encoding:
await self.client.request('JpegEncode', {'Operate':'Stop'})
logger.success('JpegEncode released')
self.__encoding = False
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
await self.release()
[docs] async def wait_for_next_frame(self):
"""Wait for enough time to elapse to respect the required :attr:`fps`
"""
now = self.loop.time()
if self.last_frame_time is not None:
elapsed = now - self.last_frame_time
next_frame_time = self.last_frame_time + (1./self.fps)
if now < next_frame_time:
delta = next_frame_time - now
await asyncio.sleep(delta)
[docs] async def get_single_image(self) -> Optional[bytes]:
"""Get a single frame as a :class:`bytes` of Jpeg encoded data
If necessary, wait before requesting to maintain the desired :attr:`fps`
Note:
The object must be acquired before calling this method, either by
:meth:`acquire` or :keyword:`async with`
"""
await self.wait_for_next_frame()
if not self.encoding:
self.last_frame_time = self.loop.time()
return None
img_uri = self.image_uri
data = []
async with self.client._client.stream('GET', img_uri) as resp:
if resp.status_code != 200:
return None
async for chunk in resp.aiter_bytes():
data.append(chunk)
self.last_frame_time = self.loop.time()
return b''.join(data)
def __aiter__(self):
return self
async def __anext__(self):
if not self.encoding:
raise StopAsyncIteration
return await self.get_single_image()