Source code for mts_ipc.server

import logging
import discord
import asyncio

import aiohttp.web

from typing import Union, Optional, Tuple
from aiohttp import web
from discord.ext import commands
from mts_ipc.errors import *

log = logging.getLogger(__name__)


[docs]def route(name: str = None): """Used to register a coroutine as an endpoint when you don't have access to an instance of :class:`.Server` Parameters ---------- name: str The endpoint name. If not provided the method name will be used. This can be used in cogs. For ex - .. code:: python3 import mts_ipc as ipc from discord.ext import commands class IpcCog(commands.Cog): def __init__(self, bot): self.bot = bot @ipc.server.route(name = "test") async def test(self, data: ipc.server.IpcServerResponse): return "Successfully tested the endpoint, which name is " + str(data.endpoint) """ def decorator(func): if not name: Server.ROUTES[func.__name__] = func else: Server.ROUTES[name] = func return func return decorator
[docs]class IpcServerResponse: """Server response made for handling coroutines reserved for endpoints. Parameters ---------- data: dict The data from which response is to be made. """ def __init__(self, data: dict): self._json = data self.length = len(data) self.endpoint = data["endpoint"] for key, value in data["data"].items(): setattr(self, key, value) def to_json(self): return self._json def __repr__(self): return "<IpcServerResponse length={0.length}>".format(self) def __str__(self): return self.__repr__()
[docs]class Server: """The IPC server. Usually used on the bot process for receiving requests from the client. Parameters ---------- bot: Union[:class:`~discord.Client`, :class:`~discord.ext.commands.Bot`, :class:`~discord.ext.commands.AutoShardedBot`] Your bot instance secret_key: str A secret key. Used for authentication and should be the same as your client's secret key. """ ROUTES: dict = {} def __init__(self, bot: Union[discord.Client, commands.Bot, commands.AutoShardedBot], secret_key: str = "public-ipc-key"): self.bot = bot self.loop = bot.loop self.endpoints: dict = {} self.secret_key: str = secret_key self._app: aiohttp.web.Application = None self._is_closed: dict = False self.host: str = None self.path: str = None self.port: int = None self._webserver: web.TCPSite = None
[docs] def route(self, name: str = None): """Used to register a coroutine as an endpoint when you have access to an instance of :class:`.Server`. Parameters ---------- name: str The endpoint name. If not provided the method name will be used. """ def decorator(func): if not name: self.endpoints[func.__name__] = func else: self.endpoints[name] = func return func return decorator
[docs] def update_endpoints(self): """Called internally to update the server's endpoints for cog routes.""" self.endpoints = {**self.endpoints, **self.ROUTES} self.ROUTES = {}
@property def app(self): return self._app
[docs] async def handle_accept(self, request: aiohttp.web.Request): """Handles client requests from the client process. Parameters ---------- request: :class:`~aiohttp.web.Request` The request made by the client, parsed by aiohttp. """ self.update_endpoints() auth = request.headers.get("Authorization", "") if not auth == self.secret_key: resp = { "code": 403, "error": "Forbidden, No token or invalid token provided" } return aiohttp.web.json_response(resp) _data = await request.json() #print(_data) endpoint = _data.get("endpoint", "") if not endpoint or endpoint not in self.endpoints: resp = { "error": "no endpoint provided or invalid provided", "code": 500 } return aiohttp.web.json_response(resp) server_resp = IpcServerResponse(_data) try: attempted_cls = self.bot.cogs.get( self.endpoints[endpoint].__qualname__.split(".")[0]) if attempted_cls: args = (attempted_cls, server_resp) else: args = (server_resp, ) except AttributeError: args = (server_resp, ) try: ret = await self.endpoints[endpoint](*args) resp = {"content": ret} except Exception as error: self.bot.dispatch("ipc_error", endpoint, error) resp = {"error_in_server": error, "code": 500} try: return aiohttp.web.json_response(resp) except TypeError as error: if str(error).startswith("Object of type") and str(error).endswith( "is not JSON serializable"): error_response = [ "IPC route returned values which are not able to be sent over sockets.", " If you are trying to send a discord.py object,", " please only send the data you need." ] resp = {"error": error_response, "code": 500} raise JSONEncodeError("".join(error_response))
[docs] def start(self, *args, **kwargs) -> Tuple[web.Application, web.TCPSite]: """Method to start IPC Parameters ---------- app: Optional[:class:`~aiohttp.web.Application`] An aiohttp application if already made with important things. It is optional creates a new one when None given. path: Optional[str] The path where IPC connections are to be made, takes "/ipc" when None given. host: Optional[str] The host where the app has to be host for ex '0.0.0.0' for repl, defaults to localhost. port: Optional[int] The port where app has to be run, defaults to 8080 Returns ------- Tuple[:class:`~aiohttp.web.Application`, :class:`~aiohttp.web.TCPSite`]""" self.loop.run_until_complete(self._start(*args, **kwargs)) return (self._app, self._webserver)
[docs] def setup(self, app: Optional[web.Application] = None, path: Optional[str] = "/ipc") -> web.Application: """Setups IPC app but doesn't runs it Parameters ---------- app: Optional[:class:`~aiohttp.web.Application`] An aiohttp application if already made with important things. It is optional creates a new one when None given. path: Optional[str] The path where IPC connections are to be made, takes "/ipc" when None given. Returns ------- application formatted application""" self.loop.run_until_complete(self._setup(app, path)) return self._app
async def _setup(self, app: Optional[web.Application] = None, path: Optional[str] = "/ipc"): """Setups IPC app but doesn't runs it Parameters ---------- app: Optional[:class:`~aiohttp.web.Application`] An aiohttp application if already made with important things. It is optional creates a new one when None given. path: Optional[str] The path where IPC connections are to be made, takes "/ipc" when None given.""" self._app = app or web.Application(loop=self.loop) self._app.router.add_post(path, self.handle_accept) self.bot.dispatch("ipc_setup") async def _start(self, app: Optional[web.Application] = None, path: Optional[str] = "/ipc", host: Optional[str] = "localhost", port: Optional[int] = 8080): """A coro to start IPC Parameters ---------- app: Optional[:class:`~aiohttp.web.Application`] An aiohttp application if already made with important things. It is optional creates a new one when None given. path: Optional[str] The path where IPC connections are to be made, takes "/ipc" when None given. host: Optional[str] The host where the app has to be host for ex '0.0.0.0' for repl, defaults to localhost. port: Optional[int] The port where app has to be run, defaults to 8080""" self.setup(app=app, path=path) self.host, self.port, self.path = host, port, path runner = web.AppRunner(app) await runner.setup() self._webserver = web.TCPSite(runner, self.host, self.port) await self._webserver.start() self.bot.dispatch("ipc_ready")