OwlCyberSecurity - MANAGER
Edit File: sshutil.py
import asyncio import re import subprocess import urllib.request import os from logging import getLogger from urllib.error import URLError from pathlib import Path from defence360agent.utils import atomic_rewrite logger = getLogger(__name__) ANALYST_PUB_KEY_URL = ( "https://repo.imunify360.cloudlinux.com/defense360/assisted-cleanup.pub" ) # Define the key pattern to match (FROM ANALYST_PUB_KEY_URL) KEY_PATTERN = r"clsupport@sshbox\.cloudlinux\.com" SSH_CONFIG_PATH = Path("/etc/ssh/sshd_config") SSH_CONFIG_DIR = Path("/etc/ssh/sshd_config.d") async def get_ssh_port(): """ Detect SSH port from config and its overrides. Searches configs in reverse order to find the last override first. """ port = 22 # default port try: # Collect and sort config files config_files = [SSH_CONFIG_PATH] if SSH_CONFIG_DIR.exists(): config_files.extend(sorted(SSH_CONFIG_DIR.glob("*.conf"))) # Process files for config_file in reversed(config_files): try: for line in config_file.read_text().splitlines(): line = line.strip() if line.startswith("Port ") and not line.startswith("#"): try: # return first match # since we are searching backwards port = int(line.split()[1]) return port except (IndexError, ValueError): continue except IOError as e: logger.warning(f"Failed to read {config_file}: {e}") continue except Exception as e: logger.warning(f"Failed to get SSH port: {e}") finally: return port async def check_ssh_connection(port=22): """Test if port is actually an SSH port by checking the server banner""" try: reader, writer = await asyncio.open_connection("127.0.0.1", port) try: banner = await asyncio.wait_for(reader.readline(), timeout=5.0) banner = banner.decode("utf-8", errors="ignore").strip() if re.match(r"^SSH-[12]\.", banner): logger.info( f"Port {port} is confirmed as SSH (banner: {banner})" ) return True else: logger.warning( f"Port {port} is open but not SSH (got: {banner})" ) return False except asyncio.TimeoutError: logger.warning(f"Timeout waiting for SSH banner on port {port}") return False finally: writer.close() await writer.wait_closed() except (ConnectionRefusedError, OSError) as e: logger.warning(f"Failed to connect to port {port}: {e}") return False except Exception as e: logger.warning(f"Unexpected error checking SSH port {port}: {e}") return False async def install_pub_key(username="root"): """Install analyst public key for the user downloads the public key from ANALYST_PUB_KEY_URL and adds it to the user's authorized_keys file """ try: # Determine the path to the authorized_keys file if username == "root": auth_keys_path = Path("/root/.ssh/authorized_keys") else: auth_keys_path = Path(f"/home/{username}/.ssh/authorized_keys") # If not running as root, fail if os.geteuid() != 0: logger.error("Function must be run as root") return False # Download the public key try: pub_key = ( urllib.request.urlopen(ANALYST_PUB_KEY_URL) .read() .decode() .strip() ) except URLError as e: logger.error(f"Failed to download public key: {e}") return False # Check if the authorized_keys directory exists, create if not auth_keys_dir = auth_keys_path.parent if not auth_keys_dir.exists(): try: auth_keys_dir.mkdir(mode=0o700, parents=True, exist_ok=True) # Set proper ownership for the .ssh directory if username != "root": subprocess.run( ["chown", f"{username}:{username}", str(auth_keys_dir)] ) except Exception as e: logger.error( f"Failed to create directory {auth_keys_dir}: {e}" ) return False # Check if the authorized_keys file exists, create if not if not auth_keys_path.exists(): try: auth_keys_path.touch(mode=0o600) # Set proper ownership for the authorized_keys file if username != "root": subprocess.run( [ "chown", f"{username}:{username}", str(auth_keys_path), ] ) except Exception as e: logger.error(f"Failed to create file {auth_keys_path}: {e}") return False try: # Read existing content to avoid adding duplicate keys content = auth_keys_path.read_text() if pub_key in content: logger.info(f"Key already exists in {auth_keys_path}") return True # Append the key to the authorized_keys file with open(auth_keys_path, "a") as f: f.write(f"{pub_key}\n") logger.info(f"Successfully installed key for user {username}") return True except IOError as e: logger.error(f"Failed to write to {auth_keys_path}: {e}") return False except Exception as e: logger.error(f"Failed to install public key: {e}") return False def remove_pub_key(username="root") -> bool: """Remove analyst public key for the specified user This function removes the analyst's public key that was previously installed using the install_pub_key function. returns: True if key was successfully removed, False otherwise. """ try: # Determine the path to the authorized_keys file if username == "root": auth_keys_path = Path("/root/.ssh/authorized_keys") else: auth_keys_path = Path(f"/home/{username}/.ssh/authorized_keys") # Check if the file exists if not auth_keys_path.exists(): logger.warning( f"authorized_keys file not found at {auth_keys_path}" ) return False # Read the current content of the file try: content = auth_keys_path.read_text() except IOError as e: logger.error(f"Failed to read {auth_keys_path}: {e}") return False # Check if the key exists in the file if not re.search(KEY_PATTERN, content): logger.info(f"Analyst public key not found in {auth_keys_path}") return False # Remove the key (including the line it's on) new_content = re.sub(r".*" + KEY_PATTERN + r".*\n?", "", content) # If the file ends up empty, consider adding a note if not new_content.strip(): logger.info(f"File {auth_keys_path} will be empty after removal") # Write the updated content back to the file try: atomic_rewrite( auth_keys_path, new_content, backup=True, ) logger.info( "Successfully removed analyst public key from" f" {auth_keys_path}" ) return True except IOError as e: logger.error(f"Failed to write to {auth_keys_path}: {e}") return False except Exception as e: logger.error(f"Failed to remove public key: {e}") return False