OwlCyberSecurity - MANAGER
Edit File: watcher.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # https://cloudlinux.com/docs/LICENCE.TXT # import logging import os import re import subprocess import time from clcommon.lib.cledition import ( CLEditions, CLEditionDetectionError ) from lve_utils import PKG_VERSION PKG_VERSION_TINY = re.sub(r'\.el\w(h?)\.', '.elX.', PKG_VERSION) # https://cl.sentry.cloudlinux.com/settings/cloudlinux_os/projects/userland SENTRY_DSN = 'https://9713d1296f804031b058b8f2d789d7ac:' \ '8ddacae32d8246cf8b25cf826bf3fc0a@cl.sentry.cloudlinux.com/12' LAST_KNOWN_EDITION_CACHE_PATH = '/var/lve/.edition.previous' SYSTEMID_FILE_PATH = '/etc/sysconfig/rhn/systemid' # how much time should past after registration # before we assume that server is "old" _SECONDS_IN_DAY = 24 * 60 * 60 SYSTEMID_NEW_TIME = 3 * _SECONDS_IN_DAY def _reconfigure_server(current_edition, is_pre_check): """ Run reconfiguration script """ subprocess.run([ '/usr/sbin/cloudlinux-customizer', 'preconfigure' if is_pre_check else 'reconfigure', '-t', current_edition], check=True, text=True, stderr=subprocess.STDOUT ) # _on_* handlers below are different for the future additional actions # that will be added with edition switch support def _on_edition_changed(previous_edition, current_edition, is_pre_check): """ Called when saved edition does not match current one. """ _reconfigure_server(current_edition, is_pre_check) def _on_server_newly_registered(current_edition, is_pre_check): """ Called when saved server id detected as one registered recently """ _reconfigure_server(current_edition, is_pre_check) def _get_registration_time(): """ Assume that last registration time is the date of systemid modification. """ try: return os.path.getmtime(SYSTEMID_FILE_PATH) except FileNotFoundError: return None def _get_last_check_time(): """ Assume that last check time is equal to the date of last cache modification. """ try: return os.path.getmtime(LAST_KNOWN_EDITION_CACHE_PATH) except FileNotFoundError: return None def check(new_edition: str, is_pre_check=False): """ Compares current edition with latest saved in file and @param new_edition: provides information about new edition that server is using Depending on is_pre_check this edition can either be already applied or just planning to be installed @param is_pre_check: True means that current check is executed BEFORE new jwt is actially saved in jwt.token file False means that registration is complete and jwt.token file was updated with new edition token """ try: with open(LAST_KNOWN_EDITION_CACHE_PATH) as f: last_edition = f.read() except FileNotFoundError: last_edition = None registration_time = _get_registration_time() is_registration_fresh = \ registration_time is not None \ and registration_time >= (time.time() - SYSTEMID_NEW_TIME) last_check_time = _get_last_check_time() is_last_check_before_registration = \ (last_check_time is None) or ( registration_time is not None and last_check_time < registration_time) if is_last_check_before_registration and is_registration_fresh: _on_server_newly_registered(new_edition, is_pre_check) elif last_edition and last_edition != new_edition: _on_edition_changed(last_edition, new_edition, is_pre_check) # keep old edition in file until post hook executes if not is_pre_check: save_edition(edition=new_edition) def save_edition(edition): """ Stores current edition value in cache file """ try: with open(LAST_KNOWN_EDITION_CACHE_PATH, 'w') as f: f.write(edition) except CLEditionDetectionError as e: logging.warning('Unable to detect current edition, error=%s', e)