관리-도구
편집 파일: plugin_executors.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""Plugin executors: run plugins in the current process or in a child process."""
import cProfile
import logging
import multiprocessing
import os
import pstats
import signal
import sys
import threading
import time
import traceback

from sqlalchemy.exc import (
    NoReferenceError,
    NoSuchColumnError,
    NoSuchTableError,
    SQLAlchemyError,
)

from lvestats.core.plugin import LveStatsPluginTerminated
from lvestats.core.plugin_context import PluginContext
from lvestats.lib.config import ConfigError

__author__ = 'shaman'


class PluginExecutionException(Exception):
    """Raised by executors when a plugin fails for a non-recoverable reason."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class DbRecoveredException(Exception):
    """Raised after a database error when ``recover_db()`` completed."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class DbRecoverFailedException(Exception):
    """Raised when ``recover_db()`` itself raised; wraps the original error."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class PluginTimeoutException(Exception):
    """Raised when the plugin process did not report completion in time."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class PluginExecutor(object):
    """Base class of plugin executors; subclasses implement execute/terminate."""

    # Template for the lve_data dict handed to plugins; replaced in __init__
    # with a version that also carries 'LVE_VERSION'.
    initial_lve_data = {'stats': {}}

    def __init__(self, config):
        """Store *config*, set up the logger and refresh ``initial_lve_data``."""
        self.config = config
        self.log = logging.getLogger('main_loop.plugin_executor')
        from lvestats.lib.commons.func import (  # pylint: disable=import-outside-toplevel
            get_lve_version,
        )
        # Deliberately assigned on the class (not the instance): subclasses and
        # the child process read PluginExecutor.initial_lve_data.
        PluginExecutor.initial_lve_data = {'stats': {}, 'LVE_VERSION': get_lve_version()}

    def execute(self, plugin_class, now, timeout):
        """
        Execute the given plugin once.

        The executors defined in this module return None on success and raise
        PluginExecutionException, PluginTimeoutException, DbRecoveredException
        or DbRecoverFailedException on failure.

        :param plugin_class: class of the plugin to run
        :param now: timestamp assigned to the plugin instance as ``now`` and
            compared against the plugin's ``period``
        :param timeout: maximum time to wait for the plugin
        :return: None
        """
        raise NotImplementedError()

    def terminate(self):
        """Release whatever resources the executor holds."""
        raise NotImplementedError()


# Seconds to wait for the child process to exit (and, later, to be joined)
# before escalating to the next, harsher way of stopping it.
KILL_PROCESS_TIMEOUT = 20


class SameProcessPluginExecutor(PluginExecutor):
    """Executor that runs plugins directly in the calling process."""

    def __init__(self, config):
        """Create the plugin context and the lve_data shared by all plugins."""
        super().__init__(config)
        self.plugin_context = PluginContext(config)
        # plugin class -> value of `now` at its last successful execution
        self.last_execution_time = {}
        self.lve_data = PluginExecutor.initial_lve_data.copy()

    def execute(self, plugin_class, now, timeout):
        """
        Run *plugin_class* in-process if its period has elapsed.

        A plugin without a ``period`` (None) runs on every call; otherwise it
        runs only when at least ``period`` has passed since its last successful
        run. *timeout* is not used by this executor.

        :raises DbRecoveredException: schema-level DB error, recover_db() succeeded
        :raises DbRecoverFailedException: schema-level DB error, recover_db() raised
        :raises PluginExecutionException: any other error raised by the plugin
        """
        log = logging.getLogger('main_loop.plugin_executor.same_process')
        plugin_instance = self.plugin_context.get_instance(plugin_class)
        plugin_instance.now = now

        time_passed = None
        period = getattr(plugin_instance, 'period', getattr(plugin_class, 'period', None))
        # Test against None rather than truthiness: a period of 0 must still
        # compute time_passed, otherwise `period <= None` raises TypeError.
        # The float() conversion mirrors the separate-process executor.
        if period is not None:
            period = float(period)
            time_passed = now - self.last_execution_time.get(plugin_class, 0)

        if period is not None and period > time_passed:
            return

        try:
            plugin_instance.execute(self.lve_data)
            self.last_execution_time[plugin_class] = now
        except (NoSuchColumnError, NoSuchTableError, NoReferenceError) as ex:
            log.exception('Database error during executing plugin that leads to need of db recreation db')
            try:
                self.plugin_context.recover_db()
            except BaseException as exception_during_recover:
                raise DbRecoverFailedException(exception_during_recover) from exception_during_recover
            raise DbRecoveredException() from ex
        except SQLAlchemyError as ex:
            log.exception('Database error during executing plugin that may indicate the need to recreate db. '
                          'Please check it manually.')
            raise PluginExecutionException(ex) from ex
        except BaseException as ex:
            raise PluginExecutionException(ex) from ex

    def terminate(self):
        """Nothing to release: plugins live in the current process."""


class ProcessContext(object):
    """Manager-backed namespace and events shared with the plugin process."""

    def __init__(self):
        self.mgr = multiprocessing.Manager()
        # Namespace carries plugin_class / now / exception / last_execution_time.
        self.ns = self.mgr.Namespace()
        self.ns.last_execution_time = {}
        # Parent -> child: a plugin class and timestamp are ready in the namespace.
        self.plugin_ready_event = self.mgr.Event()
        # Child -> parent: the request has been processed (successfully or not).
        self.plugin_execution_finished_event = self.mgr.Event()
        # Child -> parent: the request finished without an error.
        self.plugin_execution_success_event = self.mgr.Event()
        # Child -> parent: a schema-level database error occurred.
        self.db_error_event = self.mgr.Event()
        # Parent -> child: leave the main loop.
        self.have_to_exit = self.mgr.Event()
        # Child -> parent: the main loop has been left.
        self.exited = self.mgr.Event()

    def terminate(self):
        """Shut down the underlying manager."""
        self.mgr.shutdown()


class SeparateProcessPluginExecutor(PluginExecutor):
    """Executor that runs plugins in a long-lived child process."""

    def __init__(self, config, profiling_log=None):
        """
        :param config: configuration passed to PluginContext in the child
        :param profiling_log: if set, the child runs under cProfile, stats are
            appended to this file on exit and execute() waits without timeout
        """
        super().__init__(config)
        self.profiling_log = profiling_log
        self.process_context = None
        self.process = None

    @staticmethod
    def _in_separate_process(config, process_context, profiler=None, profiling_log=None):
        """
        Main loop of the child process.

        Waits for ``plugin_ready_event``, runs the requested plugin when its
        period has elapsed and reports the outcome through the events and the
        namespace of *process_context*. Leaves the loop when ``have_to_exit``
        is set.
        """
        os.setpgrp()
        signal.signal(signal.SIGTERM, SeparateProcessPluginExecutor._child_process_sigterm_handler)
        signal.signal(signal.SIGUSR1, SeparateProcessPluginExecutor._sigusr1_handler)
        log = logging.getLogger('main_loop.plugin_executor.process')

        try:
            plugin_context = PluginContext(config)
            lve_data = PluginExecutor.initial_lve_data.copy()
            while True:
                # Poll once a second so an exit request is noticed while idle.
                while not process_context.plugin_ready_event.wait(timeout=1):
                    if process_context.have_to_exit.is_set():
                        process_context.plugin_execution_finished_event.set()
                        if profiler:
                            with open(profiling_log, 'a+', encoding='utf-8') as f:
                                f.write('Plugin process profile, internal time:\n')
                                stats = pstats.Stats(profiler, stream=f)
                                stats.sort_stats('time').print_stats(20)
                                f.write('Plugin process profile, cumulative time:\n')
                                stats.sort_stats('cumulative').print_stats(20)
                        process_context.exited.set()
                        return
                process_context.plugin_ready_event.clear()

                plugin_class = process_context.ns.plugin_class
                now = process_context.ns.now
                try:
                    plugin_instance = plugin_context.get_instance(plugin_class)
                except ConfigError as ce:
                    # NOTE(review): the finished event is not set on this path, so
                    # the parent waits until its timeout expires - confirm intended.
                    log.exception(str(ce))
                    continue
                except Exception as ex:
                    log.exception("Error during instantiating plugin")
                    process_context.ns.exception = ex
                    process_context.plugin_execution_finished_event.set()
                    continue

                plugin_instance.now = now
                period = getattr(plugin_instance, 'period', getattr(plugin_class, 'period', None))
                if period is not None:
                    period = float(period)
                time_passed = None
                last_execution_time = process_context.ns.last_execution_time
                if period is not None:
                    time_passed = now - last_execution_time.get(plugin_class, 0)

                if period is None or period <= time_passed:
                    try:
                        # While the plugin runs, SIGUSR2 raises LveStatsPluginTerminated.
                        signal.signal(signal.SIGUSR2, SeparateProcessPluginExecutor._stop_plugin)
                        log.debug("Executing plugin %s", plugin_class)
                        t0 = time.time()
                        plugin_instance.execute(lve_data)
                        log.debug("Executing plugin %s took %f sec", plugin_class, time.time() - t0)
                        last_execution_time[plugin_class] = now
                        # Reassign so the updated dict is written back to the namespace.
                        process_context.ns.last_execution_time = last_execution_time
                        process_context.plugin_execution_success_event.set()
                        process_context.plugin_execution_finished_event.set()
                        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
                    except (NoSuchColumnError, NoSuchTableError, NoReferenceError) as ex:
                        log.exception('Database error during executing plugin that leads to need of db recreation db')
                        process_context.ns.exception = ex
                        process_context.db_error_event.set()
                        process_context.plugin_execution_finished_event.set()
                        continue
                    except SQLAlchemyError as ex:
                        log.exception('Database error during executing %s plugin that may indicate the need to '
                                      'recreate db. '
                                      'Please check it manually.', plugin_class)
                        process_context.ns.exception = ex
                        process_context.plugin_execution_finished_event.set()
                        continue
                    except LveStatsPluginTerminated:
                        log.info("Plugin %s was terminated.", plugin_class)
                        sys.exit(0)
                    except IOError as ioe:
                        # NOTE(review): the finished event is not set on this path
                        # either - confirm intended.
                        log.exception("IO Error: %s", str(ioe))
                        continue
                    except Exception as ex:
                        log.exception("Other exception during execution of plugin %s", plugin_class)
                        process_context.ns.exception = ex
                        process_context.plugin_execution_finished_event.set()
                        continue
                else:
                    # Period not elapsed yet: report success without running.
                    log.debug("plugin %s will be launched in %f sec", plugin_class, period - time_passed)
                    process_context.plugin_execution_success_event.set()
                    process_context.plugin_execution_finished_event.set()
        except Exception as ex:
            try:
                process_context.ns.exception = ex
            except IOError as e:
                # Hide "IOError: [Errno 32] Broken pipe" on KeyboardInterrupt
                # Probably there is a better way to do this
                if e.errno == 32 and isinstance(ex, EOFError):
                    return
                raise  # don't hide any other error
            log.exception("Exception during execution in separate process")
            process_context.plugin_execution_finished_event.set()

    @staticmethod
    def _do_profile_in_separate_process(config, process_context, profiling_log):
        """Run the child main loop under cProfile."""
        profiler = cProfile.Profile()
        profiler.runcall(SeparateProcessPluginExecutor._in_separate_process,
                         config, process_context, profiler, profiling_log)

    def _make_sub_process(self):
        """Create a fresh ProcessContext and start the child process."""
        self.process_context = ProcessContext()
        if self.profiling_log:
            target = SeparateProcessPluginExecutor._do_profile_in_separate_process
        else:
            # profiling_log is falsy here, so it lands in the `profiler`
            # parameter of _in_separate_process and disables profiling.
            target = SeparateProcessPluginExecutor._in_separate_process
        self.process = multiprocessing.Process(target=target, name='plugin_process',
                                               args=(self.config, self.process_context, self.profiling_log))
        self.process.start()

    @staticmethod
    def _stop_plugin(signum, frame):
        """SIGUSR2 handler: abort the running plugin via LveStatsPluginTerminated."""
        log = logging.getLogger('plugin_sigusr2_handler')
        log.info('Shutting down plugin')
        raise LveStatsPluginTerminated()

    @staticmethod
    def _child_process_sigterm_handler(signum, frame):
        """SIGTERM handler of the child process: log and exit with status 0."""
        log = logging.getLogger('subprocess_sigterm_handler')
        log.info('Shutting down child process')
        sys.exit(0)

    def _kill_subprocess(self, pid):
        """Send SIGKILL to *pid*; a falsy or already vanished pid is ignored."""
        if not pid:
            return
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            # The process is already gone, which is the outcome we wanted;
            # do not let this escape from the cleanup path.
            self.log.debug("subprocess %s does not exist anymore", pid)
            return
        self.log.debug("subprocess killed")

    def _terminate_sub_process(self):
        """
        Stop the child process and drop the process/context references.

        First asks the child to exit via ``have_to_exit``; if it does not
        confirm within KILL_PROCESS_TIMEOUT, terminates it and, if it is still
        alive after another KILL_PROCESS_TIMEOUT, kills it.
        """
        if not self.process or not self.process_context:
            return
        self.log.debug('Terminating subprocess')
        self.process_context.have_to_exit.set()
        if self.process_context.exited.wait(KILL_PROCESS_TIMEOUT):
            self.process = None
            self.process_context.terminate()
            self.process_context = None
            return

        self.log.debug('Killing subprocess')
        pid = self.process.pid
        try:
            self.process.terminate()
            self.process.join(KILL_PROCESS_TIMEOUT)
            if self.process.is_alive():
                self._kill_subprocess(pid)
        except Exception as ex:
            self.log.exception("Wasn't able to kill subprocess because of the error: %s", str(ex))
            self._kill_subprocess(pid)
        finally:
            self.process = None
            self.process_context.terminate()
            self.process_context = None

    @staticmethod
    def _sigusr1_handler(signum, frame):
        """SIGUSR1 handler: log the current stack trace of every thread."""
        log = logging.getLogger('stack tracer')
        log.info("--- Threads stack traces, while plugin is considered stuck ---")
        id2name = {th.ident: th.name for th in threading.enumerate()}
        code = []
        for thread_id, stack in sys._current_frames().items():  # pylint: disable=protected-access
            code.append(f"\n# Thread: {id2name.get(thread_id, '')}({thread_id})")
            for filename, lineno, name, line in traceback.extract_stack(stack):
                code.append(f'File: "{filename}", line {lineno}, in {name}')
                if line:
                    code.append(f" {line.strip()}")
        log.info("\n".join(code))

    def terminate(self):
        """Stop the child process, if one is running."""
        self._terminate_sub_process()

    def _restart_sub_process(self):
        """Stop the current child process and start a new one."""
        self._terminate_sub_process()
        self._make_sub_process()

    def _get_process_and_context(self):
        """Return ``(process, context)``, starting the child process if needed."""
        if not self.process or not self.process_context:
            self._make_sub_process()
        return self.process, self.process_context

    def execute(self, plugin_class, now, timeout):
        """
        Ask the child process to run *plugin_class* and wait for the outcome.

        :param plugin_class: class of the plugin to run
        :param now: timestamp handed to the plugin
        :param timeout: seconds to wait for completion; ignored (no limit)
            when profiling is enabled
        :raises DbRecoveredException: child reported a schema-level DB error
            and recover_db() succeeded
        :raises DbRecoverFailedException: recover_db() raised
        :raises PluginTimeoutException: child did not finish within *timeout*
        :raises PluginExecutionException: child finished without success
        """
        if self.profiling_log:
            timeout = None
        _, context = self._get_process_and_context()

        t0 = time.time()
        context.ns.plugin_class = plugin_class
        context.ns.now = now
        context.plugin_execution_finished_event.clear()
        context.plugin_execution_success_event.clear()
        context.plugin_ready_event.set()
        self.log.debug('Executor (main process) data exchange with plugin took %f sec', time.time() - t0)
        context.plugin_execution_finished_event.wait(timeout)

        db_corrupted = context.db_error_event.is_set()
        if db_corrupted:
            self._terminate_sub_process()
            try:
                recreator = PluginContext(self.config)
                recreator.recover_db()
            except BaseException as exception_during_recover:
                raise DbRecoverFailedException(exception_during_recover) from exception_during_recover
            self._make_sub_process()
            raise DbRecoveredException()

        in_time = context.plugin_execution_finished_event.is_set()
        if not in_time:
            # Make the child log its thread stacks and give it some time to do
            # so before it is replaced.
            os.kill(self.process.pid, signal.SIGUSR1)
            time.sleep(timeout / 2.0)
            self._restart_sub_process()
            raise PluginTimeoutException()

        success = context.plugin_execution_success_event.is_set()
        if not success:
            # The child may finish unsuccessfully without having stored an
            # exception; read it defensively instead of raising AttributeError.
            ex = getattr(context.ns, 'exception', None)
            self._terminate_sub_process()
            if ex:
                raise PluginExecutionException(ex)
            raise PluginExecutionException()


__all__ = ['PluginExecutor', 'SameProcessPluginExecutor', 'SeparateProcessPluginExecutor',
           'PluginExecutionException', 'PluginTimeoutException', 'DbRecoveredException',
           'DbRecoverFailedException']