OwlCyberSecurity - MANAGER
Edit File: whmcs_utils.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

# helper functions for clwpos utility
import datetime
import logging
import os
import shutil
import json
import pwd
import subprocess

from clcommon.cpapi import cpusers
from clwpos.cl_wpos_exceptions import WposError
from clwpos.constants import (
    USER_WPOS_DIR,
    PUBLIC_OPTIONS,
    AWP_BACKUP_DIR,
    CLWPOS_UIDS_PATH,
    CLWPOS_ADMIN_DIR,
    USERS_CONFIGS_TO_BACKUP,
    PUBLIC_OPTIONS_FILE_NAME,
    CLWPOS_UIDS_DIR_NAME,
    ALLOWED_SUITES_JSON,
)


def collect_user_configs(user=None) -> dict:
    """
    Collects user configs inside HOME/.clwpos

    :param user: single username to inspect; when falsy, every user
        returned by cpusers() is inspected
    :return: dict mapping username -> list of config paths (one per entry
        of USERS_CONFIGS_TO_BACKUP); the paths are not checked for existence.
        Users unknown to the passwd database are skipped with a warning.
    """
    target_users = [user] if user else list(cpusers())
    user_folders = {}
    for username in target_users:
        try:
            pw = pwd.getpwnam(username)
        except KeyError:
            logging.warning('Cannot collect configs folder for user: %s', username)
            continue
        full_user_dir = os.path.join(pw.pw_dir, USER_WPOS_DIR)
        user_folders[username] = [
            os.path.join(full_user_dir, config)
            for config in USERS_CONFIGS_TO_BACKUP
        ]
    return user_folders


def backup_single_user_conf(username, configs, backups_dir):
    """
    Copies configs for single user

    Every existing path from `configs` is copied to
    <backups_dir>/<username>/<name of the config's parent dir>/<file name>.
    Missing source paths are silently skipped.
    """
    for path in configs:
        if not os.path.exists(path):
            continue
        user_backup_dir = os.path.join(
            backups_dir, username, os.path.basename(os.path.dirname(path)))
        # exist_ok makes a separate existence check unnecessary
        os.makedirs(user_backup_dir, mode=0o700, exist_ok=True)
        shutil.copy2(path, os.path.join(user_backup_dir, os.path.basename(path)))


def backup_accelerate_wp():
    """
    Backups main AccelerateWP configs to AWP_BACKUP_DIR/%Y_%m_%d_%H_%M_%S.
    Backup directories are created with mode 0o700; per the original note the
    location is only root-editable, so no user permissions drop is needed.

    Copies PUBLIC_OPTIONS, the CLWPOS_UIDS_PATH and CLWPOS_ADMIN_DIR trees and
    per-user configs (into the 'users' subfolder). A failure while copying one
    user's configs is logged and does not interrupt the other users.
    """
    files_to_backup = [
        PUBLIC_OPTIONS
    ]
    folders_to_backup = [
        CLWPOS_UIDS_PATH,
        CLWPOS_ADMIN_DIR
    ]
    user_configs = collect_user_configs()

    current_backup_folder = os.path.join(
        AWP_BACKUP_DIR,
        datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
    )
    # exist_ok avoids the check-then-create race on the backups root
    os.makedirs(AWP_BACKUP_DIR, mode=0o700, exist_ok=True)
    os.mkdir(current_backup_folder, mode=0o700)

    for path in files_to_backup:
        if os.path.exists(path):
            shutil.copy2(path, current_backup_folder)

    for path in folders_to_backup:
        if os.path.exists(path):
            shutil.copytree(
                path, os.path.join(current_backup_folder, os.path.basename(path)))

    user_backups = os.path.join(current_backup_folder, 'users')
    os.mkdir(user_backups, mode=0o700)
    for username, paths in user_configs.items():
        try:
            backup_single_user_conf(username, paths, user_backups)
        except Exception:
            # deliberate best-effort: one broken user must not stop the backup
            logging.exception('Cannot backup configs for user %s', username)


def _run_awp_admin(arguments, failure_message):
    """
    Runs /usr/bin/cloudlinux-awp-admin with the given arguments.

    :param arguments: list of command line arguments (without the binary)
    :param failure_message: text logged when the command exits non-zero
    :return: True on zero exit code, False (after logging stdout/stderr)
        otherwise
    """
    try:
        # check=True is required: without it subprocess.run never raises
        # CalledProcessError and failures would pass unnoticed
        subprocess.run(
            ['/usr/bin/cloudlinux-awp-admin', *arguments],
            capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        logging.exception('%s Stdout is %s. Stderr is %s',
                          failure_message, e.stdout, e.stderr)
        return False
    return True


def restore_accelerate_wp_public_options_backup():
    """
    Restore AccelerateWP suite states from latest backup (server-wide)

    Applies, in order: visible suites, allowed suites, upgrade url. Empty
    values are skipped. If setting the visible suites fails, nothing else
    is applied.
    """
    visible_suites, allowed_suites, upgrade_url = _get_backup_of_public_options()

    if visible_suites:
        restored = _run_awp_admin(
            ['set-suite', '--visible-for-all', '--suites', ','.join(visible_suites)],
            'Unable to set suites state to visible-for-all.')
        if not restored:
            # return to avoid complicating debugging if something goes wrong
            return

    if allowed_suites:
        _run_awp_admin(
            ['set-suite', '--allowed-for-all', '--suites', ','.join(allowed_suites)],
            'Unable to set suites state to allowed-for-all.')

    if upgrade_url:
        _run_awp_admin(
            ['set-options', '--upgrade-url', upgrade_url],
            'Unable to set upgrade_url.')


def _get_backup_of_public_options():
    """
    Reads public options saved in the latest backup.

    :return: tuple (visible_suites, allowed_suites, upgrade_url); empty
        lists / empty string when there is no backup or no options file
    :raises WposError: when the backed up options file is not valid JSON
    """
    allowed_suites = []
    visible_suites = []
    upgrade_url = ''

    last_backup = _get_latest_backup_folder()
    if not last_backup:
        return visible_suites, allowed_suites, upgrade_url

    public_options_backup_file = os.path.join(last_backup, PUBLIC_OPTIONS_FILE_NAME)
    if not os.path.exists(public_options_backup_file):
        return visible_suites, allowed_suites, upgrade_url

    with open(public_options_backup_file) as f:
        try:
            public_options = json.load(f)
        except json.decoder.JSONDecodeError as err:
            # NOTE(review): `_` is not defined in this module; presumably the
            # gettext function is installed into builtins elsewhere - confirm
            raise WposError(
                message=_("Backup file is corrupted: %(config_file)s"
                          " or fix the line provided in details"),
                details=str(err),
                context={'config_file': public_options_backup_file}) from err

    allowed_suites = public_options.get('allowed_suites', [])
    visible_suites = public_options.get('visible_suites', [])
    upgrade_url = public_options.get('upgrade_url', '')
    return visible_suites, allowed_suites, upgrade_url


def _get_latest_backup_folder():
    """
    Finds the most recent backup inside AWP_BACKUP_DIR.

    Backup folders are named by timestamp (%Y_%m_%d_%H_%M_%S), so the
    greatest name is the latest backup.

    :return: os.DirEntry of the latest backup directory (usable wherever a
        path is accepted), or None when the backups root is missing or
        contains no directories
    """
    try:
        with os.scandir(AWP_BACKUP_DIR) as entries:
            # only directories can be backups; ignore stray files
            backups = sorted(
                (entry for entry in entries if entry.is_dir()),
                key=lambda entry: entry.name,
                reverse=True)
    except FileNotFoundError:
        # no backup has ever been made
        return None
    return backups[0] if backups else None


def restore_accelerate_wp_users_suites_statuses_backup():
    """
    Restores backup AccelerateWP suite states (per user)

    Runs one cloudlinux-awp-admin call per (suite, status) pair with all
    affected usernames; a failed call is logged and the rest still run.
    """
    grouped_by_suite_usernames = _get_backup_of_users_suites()
    for suite, data in grouped_by_suite_usernames.items():
        for status, usernames in data.items():
            if not usernames:
                continue
            _run_awp_admin(
                ['set-suite', '--' + status, '--suites', suite,
                 '--users', ','.join(usernames)],
                'Unable to set suites state for user.')


def _get_backup_of_users_suites():
    """
    Reads per-user suite statuses from the latest backup.

    Folders whose name is not the uid of an existing system user, folders
    without ALLOWED_SUITES_JSON and corrupted JSON files are skipped.

    :return: dict grouped as {suite: {status: [username, ...]}}; empty dict
        when there is no backup data
    """
    last_backup = _get_latest_backup_folder()
    if not last_backup:
        # not inside an exception handler, so logging.exception is wrong here
        logging.error("Can't restore WHMCS backup. There is no backup data.")
        return {}

    users_uids_dir = os.path.join(last_backup, CLWPOS_UIDS_DIR_NAME)
    if not os.path.isdir(users_uids_dir):
        # the uids folder is only backed up when it existed at backup time
        logging.warning('There is no users data in backup: %s', users_uids_dir)
        return {}

    suites = {}
    with os.scandir(users_uids_dir) as entries:
        for uid_folder in entries:
            try:
                username = pwd.getpwuid(int(uid_folder.name))[0]
            except (KeyError, TypeError, ValueError):
                # ValueError: folder name is not a number
                continue

            uid_config_path = os.path.join(uid_folder.path, ALLOWED_SUITES_JSON)
            if not os.path.exists(uid_config_path):
                continue

            with open(uid_config_path) as f:
                try:
                    uid_config = json.load(f)
                except json.decoder.JSONDecodeError:
                    # traceback logged by logging.exception carries the details
                    logging.exception('Backup file is corrupted: %s', uid_config_path)
                    continue
            suites[username] = uid_config.get('suites', {})

    return _group_users_by_suites(suites)


def _group_users_by_suites(uids_suites: dict):
    """
    Groups uids by suite and status
    to change suite status for multiple uids by one run.
    Converts per uid dict from
    uids_suites = {
        "user1": {
            "accelerate_wp": "allowed",
            "accelerate_wp_premium": "visible",
            "accelerate_wp_cdn": "default",
            "accelerate_wp_cdn_pro": "allowed"
        }
    }
    to
    {
        "accelerate_wp": {
            "allowed": [user1,user2,user3],
            "visible": [user4],
        }
    }
    Only statuses that occur in the input appear in the result.
    """
    grouped_by_suite_uids = {}
    for uid, suites in uids_suites.items():
        for suite, status in suites.items():
            # nested setdefault keeps statuses already collected for the suite
            grouped_by_suite_uids.setdefault(suite, {}).setdefault(status, []).append(uid)
    return grouped_by_suite_uids