Source code for jvconnected.client

from loguru import logger
import asyncio
from typing import Optional
import httpx
import httpcore

[docs]class ClientError(Exception): def __init__(self, msg, response_obj, data=None): self.msg = msg self.response_obj = response_obj self.data = data def __str__(self): return f'{self.msg}: {self.response_obj} - {self.data}'
[docs]class ClientAuthError(ClientError): pass
[docs]class ClientNetworkError(ClientError): pass
[docs]class Client(object): """Http client wrapper Arguments: hostaddr (str): The network host address auth_user (str): Api username auth_pass (str): Api password """ AUTH_URI = '/api.php' CMD_URI = '/cgi-bin/api.cgi' def __init__(self, hostaddr: str, auth_user: str, auth_pass: str, hostport: int = 80): if hostaddr.endswith('/'): hostaddr = hostaddr.rstrip('/') if not hostaddr.startswith('http'): hostaddr = f'http://{hostaddr}' self.hostaddr = hostaddr self.hostport = hostport self._client = None if auth_user is None: auth_user = '' if auth_pass is None: auth_pass = '' self.auth = httpx.DigestAuth(auth_user, auth_pass) self._authenticated = False self._error = False @property def netloc(self): return f'{self.hostaddr}:{self.hostport}' @property def session_id(self) -> Optional[str]: """The SessionID stored in cookies, if available """ if self._client is None: return None return self._client.cookies.get('SessionID') def _build_uri(self, path: str): path = path.lstrip('/') return f'{self.netloc}/{path}' async def _authenticate(self): """Authenticate with the host using digest auth """ if self._authenticated: return if self._error: return uri = self._build_uri(self.AUTH_URI) resp = await self._client.get(uri) try: resp.raise_for_status() except httpx.HTTPError as exc: logger.error(exc) self._error = True if resp.status_code == 401: raise ClientAuthError(f'Unauthorized for "{uri}"', resp) self._authenticated = True
[docs] async def open(self): """Open the Http client session and authenticate """ if self._client is None: self._authenticated = False self._error = False try: self._client = httpx.AsyncClient(auth=self.auth) await self._authenticate() except (httpcore.NetworkError, httpcore.TimeoutException, httpx.NetworkError, httpx.TimeoutException) as exc: logger.warning(repr(exc)) self._error = True raise ClientNetworkError(str(exc), exc)
[docs] async def close(self): """Close the client session """ if self._client is not None: c = self._client self._client = None await c.aclose()
async def __aenter__(self): await self.open() return self async def __aexit__(self, *args): await self.close()
[docs] async def request(self, command: str, params=None): """Make an api request Arguments: command (str): The api command name params (dict, optional): Data parameters for the command (if needed) Returns: dict: The response data """ if self._error: return payload = {'Request':{'Command':command}} if params is not None: payload['Request']['Params'] = params uri = self._build_uri(self.CMD_URI) try: resp = await self._client.post(uri, json=payload) except (httpcore.NetworkError, httpcore.TimeoutException, httpcore.ProtocolError, httpx.NetworkError, httpx.TimeoutException, httpx.ProtocolError) as exc: logger.warning(repr(exc)) self._error = True raise ClientNetworkError(str(exc), exc) try: resp.raise_for_status() except httpx.HTTPError as exc: logger.error(exc) self._error = True raise data = resp.json() # logger.debug(f'Response: {data}') resp_data = self._check_response_data(command, resp, data) return resp_data
@logger.catch def _check_response_data(self, command, resp, data): """Validate an api response from the host """ resp_data = data.get('Response', {}) if resp_data.get('Result') != 'Success': raise ClientError('Result failure', resp, data) elif resp_data.get('Requested') != command: raise ClientError('Response does not match request', resp, data) return resp_data def __repr__(self): return f'<{self.__class__.__name__}: "{self}">' def __str__(self): return self.hostaddr