OwlCyberSecurity - MANAGER
Edit File: analyst_cleanup.py
import warnings from logging import getLogger from datetime import datetime, timedelta from defence360agent.rpc_tools import ValidationError from defence360agent.rpc_tools.lookup import RootEndpoints, bind import defence360agent.subsys.panels.hosting_panel as hp from defence360agent.utils.support import send_request from defence360agent.utils.sshutil import ( get_ssh_port, check_ssh_connection, install_pub_key, ) from defence360agent.model.analyst_cleanup import AnalystCleanupRequest from defence360agent.api.server.analyst_cleanup import AnalystCleanupAPI logger = getLogger(__name__) PREPARE_SERVER_GUIDE = "https://cloudlinux.zendesk.com/hc/en-us/articles/6245743410460-How-to-authenticate-your-server-for-Support-Team-and-use-the-SSH-access-form" ZENDESK_REGISTRATION_URL = ( "https://cloudlinux.zendesk.com/auth/v2/login/registration" ) class AnalystCleanupEndpoints(RootEndpoints): async def _create_zendesk_ticket( self, email, subject, full_description, ) -> (str, str): """ Creates a Zendesk ticket and return link and id of the ticket On any error raises ValidationError, which would be added to RPC answer """ # Create Zendesk ticket try: ticket_url = await send_request( email, subject, full_description, ) logger.info(f"Created ticket on url {ticket_url}") if ticket_url: # Extract Zendesk ID from URL return ticket_url, ticket_url.split("/")[-1] else: raise ValidationError("Failed to create support ticket") except Exception as e: logger.error(f"Failed to process cleanup request: {e}") raise ValidationError( f"Failed to process cleanup request: {str(e)}" ) @bind("analyst-cleanup", "request") async def request_cleanup(self, email, username, message): """Handle analyst cleanup request""" # Check active tickets if active_ticket := AnalystCleanupRequest.get_active_request_link( username ): raise ValidationError( "You already have an active request for cleaning this user." " If you have additional information, you may follow" f" the link and provide new data here: {active_ticket}" ) # Check if cleanup is allowed if not (await AnalystCleanupAPI.check_cleanup_allowed()): raise ValidationError( "You are not authorized to submit Analyst Cleanup requests. " "Contact sales@cloudlinux.com to get access" ) email_status = await AnalystCleanupAPI.check_registered(email) # Check if email is registered if not email_status.get("result", False): raise ValidationError( f"{email_status.get('message', '')} Couldn't register" " your email in our Zendesk system. You can make it manually" f" by following the link {ZENDESK_REGISTRATION_URL} and then" " try sending the request again." ) if email_status.get("is_new", False): warnings.warn( "We’ve set up a Zendesk account for you! To complete your" " registration, check your email and click the “Reset" " Password” button." ) # Install public key key_installed = await install_pub_key(username) # Get SSH port and check connection ssh_port = await get_ssh_port() connection_ok = await check_ssh_connection(ssh_port) # Prepare ticket subject and description subject = "Analyst Cleanup Request" server_access = ( f"{hp.HostingPanel().get_server_ip()}:{ssh_port}/{username}" ) full_description = ( f"Username: {username}\n" f"Server Access: {server_access}\n\n" f"Customer Message:\n{message}\n\n" ) if not key_installed: warnings.warn("Support SSH public key is not installed", Warning) full_description += ( "\n\nWARNING: Not able to install analyst's public key\n" " Please make it manually by reffering to" f" {PREPARE_SERVER_GUIDE}\n and provide credentials " "to zendesk ticket" ) elif not connection_ok: warnings.warn("SSH connection test failed", Warning) full_description += ( "\n\nWARNING: SSH connection test failed. Please verify SSH" " access and refer to the access request form." ) # Create Zendesk ticket # In a case of no url|id # ValidationError is raised from _create_zendesk_ticket ticket_url, ticket_id = await self._create_zendesk_ticket( email, subject, full_description, ) # Store request in database AnalystCleanupRequest.create_request( username=username, zendesk_id=ticket_id, ticket_link=ticket_url, ) return {"items": {"ticket_url": ticket_url}} @bind("analyst-cleanup", "get-requests") async def request_status(self, username=None, limit=50, offset=0): """ Get status of analyst cleanup requests for all or a specific user Completed tickets will only be visible for 2 weeks after their last update """ # Get user's requests using the get_user_requests method from the model # This will return the most recent requests first (ordered by created_at desc) if username is None: requests = AnalystCleanupRequest.get_all_requests(limit, offset) else: requests = AnalystCleanupRequest.get_user_requests( username, limit, offset ) # If no requests found, return appropriate response if not requests or len(requests) == 0: return [] # Calculate the cutoff date (2 weeks ago) two_weeks_ago = datetime.utcnow() - timedelta(weeks=2) logger.info(f"Showing requests since {two_weeks_ago}") # Filter requests: show all except completed tickets older than 2 weeks filtered_requests = [ { "username": req.username, "ticket_url": req.ticket_link, "status": req.status, "created_at": str(datetime.timestamp(req.created_at)), "last_update": str(datetime.timestamp(req.last_updated)), "zendesk_id": req.zendesk_id, } for req in requests if req.status != "completed" or req.last_updated > two_weeks_ago ] logger.info(f"Got requests: {filtered_requests}") # Return the request details return filtered_requests @bind("analyst-cleanup", "is-allowed") async def is_allowed(self): is_allowed = await AnalystCleanupAPI.check_cleanup_allowed() return {"items": {"is_allowed": is_allowed}}