Source code for mts_ipc.client

import asyncio
import logging
import typing

import aiohttp

from mts_ipc.errors import IpcError

[docs]class Client: """ Handles webserver side requests to the bot process. Parameters ---------- host: str The IP or host of the IPC server, defaults to localhost. path: str The path of the IPC server's IPC connections, defaults to /ipc. secret_key: Union[str, bytes] The secret key for your IPC server. Must match the server secret_key or requests will not go ahead, defaults to None """ def __init__(self, host: "localhost", secret_key: typing.Union[str, bytes] = "public-ipc-key",path: str = "/ipc"): """Constructor""" self.loop = asyncio.get_event_loop() self.path = "/"+str(path) if not str(path).startswith("/") else path self.secret_key = secret_key self.host = host @property def url(self): return "https://{0.host}{1}".format(self, self.path)
[docs] async def request(self, endpoint, **kwargs): """Make a request to the IPC server process. Parameters ---------- endpoint: str The endpoint to request on the server **kwargs The data to send to the endpoint """ payload = {"endpoint": endpoint, "data": kwargs} headers = {"Authorization": self.secret_key} async with aiohttp.ClientSession() as ses: ret = await ses.post(self.url, json=payload, headers=headers) ret = await ret.json() if "error" in ret: raise IpcError(ret["error"]) resp = ret["content"] await ses.close() return resp