OwlCyberSecurity - MANAGER
Edit File: analyst_cleanup.py
import json import urllib.request import logging from datetime import datetime, timedelta from defence360agent.api.server import API from defence360agent.internals.iaid import ( IndependentAgentIDAPI, IAIDTokenError, ) from defence360agent.rpc_tools.utils import run_in_executor_decorator from defence360agent.utils.support import parse_params logger = logging.getLogger(__name__) CACHE_TTL = timedelta(minutes=10) class AnalystCleanupAPI(API): CLEANUP_ALLOWED_URL_TEMPLATE = ( "{base}/api/analyst-assisted-cleanup/is-allowed" ) SHOW_MANY_URL_TEMPLATE = "{base}/api/analyst-assisted-cleanup/tickets" IS_REGISTERED_URL_TEMPLATE = ( "{base}/api/analyst-assisted-cleanup/is-registered" ) # Cache for the cleanup allowed check _cache = { "result": None, "timestamp": datetime.min, # Initialize with minimum datetime } @classmethod async def check_cleanup_allowed(cls): """Check if analyst cleanup is allowed for this installation""" current_time = datetime.now() if ( cls._cache["result"] is not None and current_time - cls._cache["timestamp"] < CACHE_TTL ): return cls._cache["result"] try: request = urllib.request.Request( cls.CLEANUP_ALLOWED_URL_TEMPLATE.format(base=cls._BASE_URL), headers={"X-Auth": await IndependentAgentIDAPI.get_token()}, method="GET", ) except IAIDTokenError: return False else: result = await cls._check_allowed(request) cls._cache["result"] = result cls._cache["timestamp"] = datetime.now() return result @classmethod @run_in_executor_decorator def _check_allowed(cls, request): """Execute the actual request in executor""" try: result = cls.request(request) return result.get("result", False) except Exception as e: logger.error("Failed to check cleanup permission: %s", e) # NOTE: # If the API returns an error, the request should be allowed # (to prevent extra sales trips due to API instability). # Ref: https://cloudlinux.slite.com/app/docs/Fhb2ASESxb9111 return True @classmethod async def get_tickets(cls, ids: [str]) -> [dict]: """ Retrieve tickets from Zendesk API using the show_many endpoint Args: ids (list or str): List of ticket IDs or comma-separated string of IDs Returns: list: List of dictionaries with 'id', 'status', and 'updated_at' fields """ # Convert list of ids to comma-separated string if necessary if isinstance(ids, list): ids_str = ",".join(str(_id) for _id in ids) else: ids_str = str(ids) # Construct URL for the show_many endpoint url = cls.SHOW_MANY_URL_TEMPLATE.format(base=cls._BASE_URL) params = {"ids": ids_str} try: request = urllib.request.Request( parse_params(params, url), headers={"X-Auth": await IndependentAgentIDAPI.get_token()}, method="GET", ) except IAIDTokenError as e: logger.error(f"Failed to get IAID token for tickets: {e}") raise return await cls._execute_get_tickets(request) @classmethod @run_in_executor_decorator def _execute_get_tickets(cls, request): """Execute the actual get_tickets request in executor""" simplified_tickets = [] try: result = cls.request(request) # Extract only the required fields from each ticket for ticket in result.get("tickets", []): simplified_tickets.append( { "id": ticket.get("id"), "status": ticket.get("status"), "updated_at": ticket.get("updated_at"), } ) return simplified_tickets except Exception as e: logger.error(f"Failed to get tickets: {e}") finally: return simplified_tickets @classmethod async def check_registered(cls, email): """Check if email is registered in Zendesk""" try: request = urllib.request.Request( cls.IS_REGISTERED_URL_TEMPLATE.format(base=cls._BASE_URL), headers={ "X-Auth": await IndependentAgentIDAPI.get_token(), "Content-Type": "application/json", }, data=json.dumps({"customer_email": email}).encode(), method="POST", ) except IAIDTokenError: logger.error("Got IAIDTokenError") return {} else: result = await cls._register_status(request) return result @classmethod @run_in_executor_decorator def _register_status(cls, request): """Execute the actual request in executor""" try: result = cls.request(request) return result except Exception as e: logger.error("Failed to check email registration: %s", e) return {}