OwlCyberSecurity - MANAGER
Edit File: send_message.py
import base64 import json import os import time import urllib.error import urllib.request from abc import ABC, abstractmethod from logging import getLogger from typing import Optional import asyncio from defence360agent.api.server import ( API, APIError, APITokenError, FGWSendMessgeException, ) from defence360agent.contracts.config import Core from defence360agent.contracts.messages import Message from defence360agent.internals.global_scope import g from defence360agent.internals.iaid import ( IndependentAgentIDAPI, IAIDTokenError, ) from defence360agent.utils.async_utils import AsyncIterate from defence360agent.utils.json import ServerJSONEncoder logger = getLogger(__name__) class BaseSendMessageAPI(API, ABC): URL = "/api/v2/send-message/{method}" @abstractmethod async def _send_request(self, message_method, headers, post_data) -> dict: pass # pragma: no cover def check_response(self, result: dict) -> None: if "status" not in result: raise APIError("unexpected server response: {!r}".format(result)) if result["status"] != "ok": raise APIError("server error: {}".format(result.get("msg"))) async def send_data(self, method: str, post_data: bytes) -> None: try: token = await IndependentAgentIDAPI.get_token() except IAIDTokenError as e: raise APITokenError(f"IAID token error occurred {e}") headers = { "Content-Type": "application/json", "X-Auth": token, } result = await self._send_request(method, headers, post_data) self.check_response(result) class SendMessageAPI(BaseSendMessageAPI): _SOCKET_TIMEOUT = Core.DEFAULT_SOCKET_TIMEOUT def __init__(self, rpm_ver: str, base_url: str = None, executor=None): self._executor = executor self.rpm_ver = rpm_ver self.product_name = "" self.server_id = None # type: Optional[str] self.license = {} # type: dict if base_url: self.base_url = base_url else: self.base_url = self._BASE_URL def set_product_name(self, product_name: str) -> None: self.product_name = product_name def set_server_id(self, server_id: Optional[str]) -> None: self.server_id = server_id def set_license(self, license: dict) -> None: self.license = license async def _send_request(self, message_method, headers, post_data): request = urllib.request.Request( self.base_url + self.URL.format(method=message_method), data=post_data, headers=headers, method="POST", ) return await self.async_request(request, executor=self._executor) async def send_message(self, message: Message) -> None: # add message handling time if it does not exist, so that # the server does not depend on the time it was received if "timestamp" not in message: message["timestamp"] = time.time() data2send = { "payload": message.payload, "rpm_ver": self.rpm_ver, "message_id": message.message_id, "server_id": self.server_id, "name": self.product_name, } post_data = json.dumps(data2send, cls=ServerJSONEncoder).encode() await self.send_data(message.method, post_data) class FileBasedGatewayAPI(SendMessageAPI): async def _prepare_message(self, message, semaphore) -> dict: async with semaphore: loaded = await asyncio.to_thread(json.loads, message) return { "method": loaded["method"], "data": {k: v for k, v in loaded.items() if k != "method"}, } async def send_messages(self, messages: list[tuple[float, bytes]]) -> None: max_threads = 5 semaphore = asyncio.Semaphore(max_threads) tasks = [ self._prepare_message(msg, semaphore) async for _, msg in AsyncIterate(messages) ] prepared_messages = await asyncio.gather(*tasks) dumped_messages = await asyncio.to_thread( json.dumps, prepared_messages ) bin_file_path = os.getenv( "I360_MESSAGE_GATEWAY_BIN_PATH", "/usr/libexec/" ) bin_file = os.path.join(bin_file_path, "imunify-message-gateway") command = [ bin_file, "send-many", "--producer=i360-agent-non-resident", ] process = await asyncio.create_subprocess_exec( *command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) b64data = base64.b64encode(dumped_messages.encode()) stdout, stderr = await process.communicate(input=b64data) if g.get("DEBUG"): logger.info( "Message sent to fgw: %s %s %s", len(messages), stdout, stderr ) if process.returncode != 0: logger.error(f"Error sending message: {stderr.decode()}") raise FGWSendMessgeException( str(f"Error sending message: {stderr.decode()}") )