# OwlCyberSecurity - MANAGER
# Edit File: cmdline_parser.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains argparse command line parsers for
cloudlinux-xray-manager, cloudlinux-xray-user-manager and cl-smart-advice
"""
import argparse
import logging

from xray.internal.utils import read_sys_id
from xray.internal.constants import advice_action_sources

# Commands that act on a single tracing task: (command name, help text).
# Both the admin and the end-user parsers register them in this order.
_TASK_COMMANDS = (
    ('stop', 'Stop monitoring of given TASK'),
    ('continue', 'Continue monitoring of given TASK'),
    ('complete', 'Complete given TASK'),
    ('delete', 'Delete given TASK'),
)


def get_system_id():
    """
    Read the system ID using read_sys_id()
    :return: the value returned by read_sys_id(), or an empty string
             if reading it raised any exception
    """
    try:
        return read_sys_id()
    except Exception as e:
        # Best effort: the value is only used as a default for --system_id,
        # so a failure is logged and replaced with an empty string.
        logging.error('Unable to get system_id, error: %s', str(e))
        return ''


def _add_system_id_argument(subparser, **kwargs):
    """
    Add the --system_id option to the given parser
    :param subparser: parser to extend
    :param kwargs: extra add_argument() options, i.e. required=True
                   or default=<value>
    """
    subparser.add_argument('--system_id', metavar='SYS_ID',
                           help='unique ID of CL+ system', **kwargs)


def _add_tracing_task_id_argument(subparser):
    """Add the mandatory --tracing_task_id option to the given parser"""
    subparser.add_argument('--tracing_task_id', metavar='TASK',
                           required=True,
                           help='unique ID of tracing task')


def _add_url_argument(subparser):
    """Add the mandatory --url option to the given parser"""
    subparser.add_argument('--url', metavar='URL', required=True,
                           help='monitoring URL')


def _add_request_id_argument(subparser):
    """Add the mandatory integer --request_id option to the given parser"""
    subparser.add_argument('--request_id', metavar='REQUEST',
                           required=True, type=int,
                           help='unique ID of request')


def _add_start_parser(subparsers):
    """
    Register the 'start' command
    :param subparsers: subparsers action to register the command in
    :return: parser of the 'start' command
    """
    # argument_default=SUPPRESS: options which are not passed (and have no
    # explicit default) do not appear in the resulting Namespace at all
    return subparsers.add_parser(
        'start',
        help='Start monitoring of URL by IP '
             'for COUNT of minutes or requests',
        argument_default=argparse.SUPPRESS)


def _add_start_arguments(start_parser):
    """Add --url, --client_ip and the exclusive --time/--request_qty pair"""
    _add_url_argument(start_parser)
    start_parser.add_argument('--client_ip', metavar='IP', default='*',
                              help='client IP to monitor requests from')
    # monitoring is limited either by time or by requests count, not both
    counters = start_parser.add_mutually_exclusive_group()
    counters.add_argument('--time', metavar='COUNT', type=int,
                          help='number of minutes to monitor')
    counters.add_argument('--request_qty', metavar='COUNT', type=int,
                          help='number of requests to monitor')


def _add_cdn_website_arguments(subparser):
    """Add --username, --account_id, --domain and --website options"""
    subparser.add_argument('--username', help='Name of target user')
    subparser.add_argument(
        '--account_id',
        help='Unique account identifier saved in user`s home directory')
    subparser.add_argument('--domain', help='Account`s domain',
                           required=True)
    subparser.add_argument('--website', help='Website', required=True)


def cmd_parser_manager() -> 'argparse.ArgumentParser':
    """
    Command line parser for X Ray Manager
    using built-in argparse module
    :return: parser
    """
    parser = argparse.ArgumentParser(
        prog='cloudlinux-xray-manager',
        description='Utility to manage X-Ray Client')
    subparsers = parser.add_subparsers(title='Commands', dest='command',
                                       required=True)

    start_parser = _add_start_parser(subparsers)
    _add_system_id_argument(start_parser, required=True)
    _add_start_arguments(start_parser)

    for name, help_text in _TASK_COMMANDS:
        task_parser = subparsers.add_parser(name, help=help_text)
        _add_system_id_argument(task_parser, required=True)
        _add_tracing_task_id_argument(task_parser)

    enable_parser = subparsers.add_parser(
        'enable-continuous',
        help='Schedule a URL for continuous monitoring')
    _add_system_id_argument(enable_parser, required=True)
    _add_url_argument(enable_parser)
    enable_parser.add_argument('--email', metavar='EMAIL', required=True,
                               help='e-mail address for reporting')

    for name, help_text in (
            ('disable-continuous',
             'Schedule to stop continuous monitoring for URL'),
            ('start-continuous', 'Include URL into continuous monitoring'),
            ('stop-continuous', 'Exclude URL from continuous monitoring'),
    ):
        url_parser = subparsers.add_parser(name, help=help_text)
        _add_system_id_argument(url_parser, required=True)
        _add_url_argument(url_parser)

    for name, help_text in (
            ('continuous-tracing-list',
             'Get list of scheduled continuous monitoring tasks'),
            ('tasks-list', 'Get list of tasks'),
    ):
        listing_parser = subparsers.add_parser(name, help=help_text)
        _add_system_id_argument(listing_parser, required=True)

    req_parser = subparsers.add_parser(
        'requests-list', help='Get list of requests for given task')
    _add_system_id_argument(req_parser, required=True)
    _add_tracing_task_id_argument(req_parser)

    req_data_parser = subparsers.add_parser(
        'request-data', help='Get statistics for given request')
    _add_system_id_argument(req_data_parser, required=True)
    _add_tracing_task_id_argument(req_data_parser)
    _add_request_id_argument(req_data_parser)

    # Read the system ID once and share it between all the commands which
    # use it as a default: avoids repeated reads and duplicated error logs
    default_system_id = get_system_id()

    enable_user_agent_parser = subparsers.add_parser(
        'enable-user-agent', help='Enable X-Ray User Agent')
    _add_system_id_argument(enable_user_agent_parser,
                            default=default_system_id)

    disable_user_agent_parser = subparsers.add_parser(
        'disable-user-agent', help='Disable X-Ray User Agent')
    _add_system_id_argument(disable_user_agent_parser,
                            default=default_system_id)

    status_user_agent_parser = subparsers.add_parser(
        'user-agent-status', help='Get status of X-Ray User Agent')
    _add_system_id_argument(status_user_agent_parser, required=True)

    autocomplete_parser = subparsers.add_parser(
        'autocomplete-tasks',
        help='Completes all tasks run more than expected time')
    _add_system_id_argument(autocomplete_parser, default=default_system_id)

    advanced_metrics_parser = subparsers.add_parser(
        'advanced-metrics', help='Advanced metrics tool')
    _add_system_id_argument(advanced_metrics_parser,
                            default=default_system_id)
    # at most one of --enable/--disable/--status may be given
    advanced_metrics_group = \
        advanced_metrics_parser.add_mutually_exclusive_group()
    advanced_metrics_group.add_argument('--enable', action='store_true',
                                        required=False,
                                        help='enable advanced metrics')
    advanced_metrics_group.add_argument('--disable', action='store_true',
                                        required=False,
                                        help='disable advanced metrics')
    advanced_metrics_group.add_argument(
        '--status', action='store_true', required=False,
        help='get status (none|enabled|disabled)')

    # these two commands are registered without a help text
    for name in ('enable-serverwide-mode', 'disable-serverwide-mode'):
        serverwide_parser = subparsers.add_parser(name)
        _add_system_id_argument(serverwide_parser,
                                default=default_system_id)

    return parser


def cmd_parser_user_manager() -> 'argparse.ArgumentParser':
    """
    Command line parser for X Ray Manager for users
    using built-in argparse module
    :return: parser
    """
    parser = argparse.ArgumentParser(
        prog='cloudlinux-xray-user-manager.py',
        description='Utility to manage X-Ray Client for end-users')
    subparsers = parser.add_subparsers(title='Commands', dest='command',
                                       required=True)

    # same commands as in the manager parser, but without --system_id
    _add_start_arguments(_add_start_parser(subparsers))

    for name, help_text in _TASK_COMMANDS:
        _add_tracing_task_id_argument(
            subparsers.add_parser(name, help=help_text))

    subparsers.add_parser('tasks-list', help='Get list of tasks')

    req_parser = subparsers.add_parser(
        'requests-list', help='Get list of requests for given task')
    _add_tracing_task_id_argument(req_parser)

    req_data_parser = subparsers.add_parser(
        'request-data', help='Get statistics for given request')
    _add_tracing_task_id_argument(req_data_parser)
    _add_request_id_argument(req_data_parser)

    subparsers.add_parser('user-agent-status',
                          help='Get status of X-Ray User Agent')

    return parser


def cmd_parser_adviser() -> 'argparse.ArgumentParser':
    """
    Command line parser for X Ray Smart Advice utility
    using built-in argparse module
    :return: parser
    """
    parser = argparse.ArgumentParser(
        prog='cl-smart-advice',
        description='Utility for X-Ray Smart Advice '
                    'microservice interaction')
    parser.add_argument('--api-version',
                        help='Specify version of CLI, if not passed - '
                             'latest will be used by default',
                        required=False, default=None)
    subparsers = parser.add_subparsers(title='Commands', dest='command',
                                       required=True)

    list_parser = subparsers.add_parser('list', help='Get list of advice')
    list_parser.add_argument('--extends', default=False,
                             action='store_true',
                             help='Advice will be with extended info')

    detail_parser = subparsers.add_parser(
        'details', help='Get details for a particular advice')
    detail_parser.add_argument('--advice_id', required=True,
                               help='an ID of advice to get details for')

    apply_parser = subparsers.add_parser('apply',
                                         help='Apply a particular advice')
    apply_parser.add_argument('--advice_id', required=True,
                              help='an ID of advice to apply')
    apply_parser.add_argument('--ignore-errors', action="store_true",
                              default=False, help='apply advice anyway')
    # 'async' is a reserved word, so the value is stored as 'async_mode'
    apply_parser.add_argument('--async', action="store_true",
                              dest='async_mode', default=False,
                              help='run apply in a background process')
    apply_parser.add_argument('--source', default='ACCELERATE_WP',
                              choices=advice_action_sources,
                              help='Advice apply source')
    # help=SUPPRESS: the option is accepted, but not shown in usage/help
    apply_parser.add_argument('--accept_license_terms', default=False,
                              action='store_true', help=argparse.SUPPRESS)
    apply_parser.add_argument('--analytics_data', type=str,
                              help='User analytics data')

    rollback_parser = subparsers.add_parser(
        'rollback', help='Rollback a particular advice')
    rollback_parser.add_argument('--advice_id', required=True,
                                 help='an ID of advice to rollback')
    rollback_parser.add_argument(
        '--async', action="store_true", dest='async_mode', default=False,
        help='run rollback in a background process')
    rollback_parser.add_argument('--source', default='ACCELERATE_WP',
                                 choices=advice_action_sources,
                                 help='Advice rollback source')
    rollback_parser.add_argument('--reason', type=str,
                                 help='Reason for advice rollback')
    rollback_parser.add_argument('--analytics_data', type=str,
                                 help='User analytics data')

    subparsers.add_parser('counters',
                          help='Get advice counters total/applied')

    status_parser = subparsers.add_parser(
        'status',
        help='Get current status and progress for a particular advice')
    status_parser.add_argument('--advice_id', required=True,
                               help='an ID of advice to get status for')

    sites_parser = subparsers.add_parser('sites-status',
                                         help='Get websites statuses')
    sites_parser.add_argument('--username',
                              help='an username to get statuses info')

    get_options_parser = subparsers.add_parser(
        'get-options', help='Get options for user')
    get_options_parser.add_argument('--username',
                                    help='Username to get options')

    # commands without any options of their own
    subparsers.add_parser(
        'wordpress-plugin-install',
        help='Installs/uninstalls cl-smart-advice plugin for websites '
             'according to current advice list')
    subparsers.add_parser(
        'wordpress-plugin-uninstall',
        help='Uninstalls cl-smart-advice plugin for websites')
    subparsers.add_parser('update-advices-metadata',
                          help='Sends actual metadata to microservice')

    get_pullzone = subparsers.add_parser(
        'awp-cdn-get-pullzone', help='Get pullzone for account`s website')
    _add_cdn_website_arguments(get_pullzone)

    remove_pullzone = subparsers.add_parser(
        'awp-cdn-remove-pullzone',
        help='Remove pullzone for account`s website')
    _add_cdn_website_arguments(remove_pullzone)

    purge_cdn_cache = subparsers.add_parser(
        'awp-cdn-purge', help='Purge cache for account`s website')
    _add_cdn_website_arguments(purge_cdn_cache)

    sync_parser = subparsers.add_parser(
        'awp-sync',
        help='[for internal usage] '
             'Sync account information on microservice')
    sync_parser.add_argument('--username', help='Name of target user')
    sync_parser.add_argument(
        '--account_id',
        help='Unique account identifier saved in user`s home directory. '
             'Multiple could be passed split by ","')

    get_cdn_usage = subparsers.add_parser('get-cdn-usage',
                                          help='Get cdn usage for account')
    get_cdn_usage.add_argument(
        '--account_id',
        help='Unique account identifier saved in user`s home directory')
    get_cdn_usage.add_argument('--username', help='Name of target user')

    subscription_parser = subparsers.add_parser(
        'subscription', help='Subscription configuration')
    subscription_parser.add_argument(
        '--advice_id', required=True,
        help='an ID of advice to manage subscription for')
    subscription_parser.add_argument('--listen', required=True,
                                     action='store_true')

    agreement_parser = subparsers.add_parser(
        'agreement', help='License agreement commands')
    agreement_parser.add_argument(
        '--text', required=True,
        help='name of the feature to get license text for')

    report_analytics = subparsers.add_parser(
        'report-analytics', help='Sends analytics to Smart Advice')
    report_analytics.add_argument(
        '--feature', help='Feature to be tracked by analytics',
        required=True)
    report_analytics.add_argument(
        '--event', help='Event to be tracked by analytics', required=True)
    report_analytics.add_argument('--source',
                                  help='Where event was triggered',
                                  required=True)
    report_analytics.add_argument(
        '--username',
        help='Users affected by event, pass multiple via "," ',
        required=True)
    report_analytics.add_argument('--advice_id',
                                  help='ID of advice to manage events')
    report_analytics.add_argument('--user_hash',
                                  help='Hash of user for event')
    report_analytics.add_argument('--journey_id', help='ID event journey')
    report_analytics.add_argument('--variant_id',
                                  help='ID event variant_id')

    wp_plugin_data = subparsers.add_parser(
        'wp-plugin-data', help='Get latest WordPress plugin version data')
    wp_plugin_data.add_argument('--plugin_name', required=True,
                                help='Name of the plugin')

    wp_get_plugin = subparsers.add_parser(
        'wp-plugin-copy',
        help='Copy the latest WordPress plugin archive to the site '
             'if it\'s not already installed')
    wp_get_plugin.add_argument('--plugin_name', required=True,
                               help='Name of the plugin')
    wp_get_plugin.add_argument('--plugin_version', required=True,
                               help='Current plugin version')
    wp_get_plugin.add_argument('--tmp_dir', required=True,
                               help='Temp directory path')

    return parser


def parse_cmd_arguments(
        parser: 'argparse.ArgumentParser') -> 'argparse.Namespace':
    """
    Parse arguments with given parser
    :param parser: an instance of ArgumentParser
    :return: arguments Namespace
    :raises SystemExit: re-raised as is when argparse exits
                        (invalid arguments or -h/--help)
    """
    try:
        return parser.parse_args()
    except SystemExit as e:
        # argparse also exits on -h/--help, with a zero status: this is a
        # valid invocation and must not be reported as an error
        if e.code not in (0, None):
            logging.error('Invalid utility invocation',
                          extra={'err': str(e)})
        raise