관리-도구
편집 파일: 128_move_cleanup_storage_files.py
import logging
import os
import shutil
from typing import Optional

from peewee import CharField, Model

from defence360agent.model.simplification import FilenameField
from defence360agent.utils import importer

# Resolved lazily through the project's importer helper; falls back to None
# (the `default` passed below), so every use must tolerate a missing class.
CleanupStorage = importer.get(
    module="imav.malwarelib.cleanup.storage",
    name="CleanupStorage",
    default=None,
)

logger = logging.getLogger(__name__)


def get_model(db):
    """
    Model stub for migration because we can't use migrator.orm[]
    due to custom field FilenameField

    :param db: database the stub model is bound to
    :return: a ``MalwareHit`` model class mapped to the ``malware_hits`` table
    """

    class MalwareHit(Model):
        class Meta:
            db_table = "malware_hits"
            database = db

        user = CharField(null=False)
        orig_file = FilenameField(null=False)
        hash = CharField(null=True)
        size = CharField(null=True)

        @property
        def storage_name(self) -> Optional[str]:
            """
            Get file name for cleanup storage

            The name is ``user``, ``hash`` and ``size`` joined with
            ``os.path.extsep``.

            :return: file name, or None when the parts cannot be joined
                (``str.join`` raises TypeError for non-string items, e.g.
                a NULL ``hash`` or ``size``)
            """
            try:
                return os.path.extsep.join([self.user, self.hash, self.size])
            except TypeError:
                return None

    return MalwareHit


def _move(src, dst):
    """
    Move a stored file from *src* to *dst* inside the cleanup storage.

    Both names are resolved against ``CleanupStorage.path`` before moving.
    A missing source file is silently ignored; any other failure is logged
    and swallowed so that one bad file does not abort the whole migration.

    :param src: current file name, relative to the storage directory
    :param dst: new file name, relative to the storage directory
    """
    storage_dir = CleanupStorage.path
    src_path = str(storage_dir.joinpath(src))
    dst_path = str(storage_dir.joinpath(dst))
    try:
        shutil.move(src_path, dst_path)
    except FileNotFoundError:
        # Nothing stored under the old name: nothing to migrate.
        pass
    except Exception as err:
        logger.error("Failed to move stored file to the new location: %r", err)


def migrate(migrator, database, fake=False, **kwargs):
    """
    Rename files in the cleanup storage to the names produced by
    ``CleanupStorage.storage_name`` for every row of ``malware_hits``.

    :param migrator: migrator instance (unused)
    :param database: database holding the ``malware_hits`` table
    :param fake: when true, do nothing
    """
    if fake:
        return

    if CleanupStorage is None:
        # The importer fell back to its default, so there is no storage
        # location or naming scheme to migrate to. Without this guard the
        # attribute access below would raise AttributeError.
        logger.info(
            "CleanupStorage is not available, skipping stored files move"
        )
        return

    MalwareHit = get_model(database)

    for hit in MalwareHit:
        src = hit.storage_name
        if src is None:
            # Old-style name cannot be built for this row; skip it.
            continue
        dst = CleanupStorage.storage_name(hit.orig_file)
        _move(src, dst)


def rollback(migrator, database, fake=False, **kwargs):
    """No-op: the file moves performed by ``migrate`` are not reverted."""
    pass