OwlCyberSecurity - MANAGER
Edit File: analyst_cleanup.py
from datetime import datetime, timezone, timedelta

import peewee as pw

from defence360agent.model import Model, instance


def _utc_now():
    """Return the current timezone-aware UTC time.

    Used as a *callable* field default so the timestamp is computed for each
    new row rather than once when the module is imported.
    """
    return datetime.now(timezone.utc)


class AnalystCleanupRequest(Model):
    """
    Model for storing analyst cleanup requests.

    Tracks request details and status for each cleanup request submitted.
    A request is identified externally by its ``zendesk_id`` and moves
    through the statuses ``pending`` -> ``in_progress`` -> ``completed``
    (the allowed set is enforced by a CHECK constraint on ``status``).
    """

    class Meta:
        database = instance.db
        db_table = "analyst_cleanup_requests"

    id = pw.AutoField()
    username = pw.CharField(null=False)
    zendesk_id = pw.CharField(null=False)
    ticket_link = pw.TextField(null=False)
    # The default must be a callable: passing ``datetime.now(...)`` directly
    # would freeze the value at class-definition time, stamping every row
    # with the process start time.
    # NOTE(review): these fields are declared without ``utc=True`` while the
    # values written are timezone-aware UTC datetimes -- confirm the stored
    # epoch is what consumers expect on hosts whose local zone is not UTC.
    created_at = pw.TimestampField(null=False, default=_utc_now)
    status = pw.CharField(
        null=False,
        default="pending",
        constraints=[
            pw.Check("status in ('pending','in_progress','completed')")
        ],
    )
    last_updated = pw.TimestampField(null=False, default=_utc_now)

    @classmethod
    def create_request(cls, username: str, zendesk_id: str, ticket_link: str):
        """Create and persist a new cleanup request.

        ``status``, ``created_at`` and ``last_updated`` take their field
        defaults.

        :param username: user the cleanup was requested for
        :param zendesk_id: identifier of the associated Zendesk ticket
        :param ticket_link: link to the ticket
        :return: the newly created model instance
        """
        return cls.create(
            username=username, zendesk_id=zendesk_id, ticket_link=ticket_link
        )

    @classmethod
    def get_user_requests(cls, username: str, limit: int = 50, offset: int = 0):
        """Get requests for a specific user, newest first.

        :param username: user whose requests are selected
        :param limit: maximum number of rows to return
        :param offset: number of rows to skip (for pagination)
        :return: a lazy peewee select query
        """
        return (
            cls.select()
            .where(cls.username == username)
            .order_by(cls.created_at.desc())
            .limit(limit)
            .offset(offset)
        )

    @classmethod
    def get_all_requests(cls, limit: int = 50, offset: int = 0):
        """Get requests of all users on the server, newest first.

        :param limit: maximum number of rows to return
        :param offset: number of rows to skip (for pagination)
        :return: a lazy peewee select query
        """
        return (
            cls.select()
            .order_by(cls.created_at.desc())
            .limit(limit)
            .offset(offset)
        )

    @classmethod
    def get_active_request_link(cls, username: str) -> str | None:
        """
        Return the ticket link of an active request of the user, if any.

        A request is active while its status is ``pending`` or
        ``in_progress``. No ordering is applied, so when several active
        requests exist, which one is picked is up to the database.

        :param username: user whose requests are checked
        :return: ``ticket_link`` of an active request, or ``None`` when the
            user has no active request
        """
        active_request = (
            cls.select()
            .where(
                (cls.username == username)
                & (cls.status.in_(["pending", "in_progress"]))
            )
            .limit(1)
        ).first()
        return active_request.ticket_link if active_request else None

    @classmethod
    def update_status(cls, zendesk_id: str, new_status: str, last_updated):
        """Update the status of the request(s) with the given Zendesk id.

        :param zendesk_id: identifier of the Zendesk ticket to update
        :param new_status: new status value; must satisfy the CHECK
            constraint on ``status``
        :param last_updated: value to store in ``last_updated``
        :return: result of executing the UPDATE query (peewee reports the
            number of rows modified)
        """
        return (
            cls.update(status=new_status, last_updated=last_updated)
            .where(cls.zendesk_id == zendesk_id)
            .execute()
        )

    @classmethod
    def get_all_relevant_requests(cls):
        """
        Return a query for active and recently completed cleanup requests.

        Selected rows are those that are ``pending`` or ``in_progress``,
        plus ``completed`` ones whose ``last_updated`` is within the last
        three days. Only ``username``, ``zendesk_id``, ``status`` and
        ``last_updated`` are fetched.

        :return: a lazy peewee select query
        """
        # Cutoff for "recently completed" (3 days ago)
        three_days_ago = datetime.now(timezone.utc) - timedelta(days=3)

        return cls.select(
            cls.username,
            cls.zendesk_id,
            cls.status,
            cls.last_updated,
        ).where(
            (cls.status.in_(["pending", "in_progress"]))
            | (
                (cls.status == "completed")
                & (cls.last_updated >= three_days_ago)
            )
        )