관리-도구
편집 파일: provision.py
"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility; this should be removable when Alembic targets SQLAlchemy 1.0.0 """ from sqlalchemy.engine import url as sa_url from sqlalchemy import text from ..util import compat from . import config, engines from .compat import get_url_backend_name FOLLOWER_IDENT = None class register(object): def __init__(self): self.fns = {} @classmethod def init(cls, fn): return register().for_db("*")(fn) def for_db(self, dbname): def decorate(fn): self.fns[dbname] = fn return self return decorate def __call__(self, cfg, *arg): if isinstance(cfg, compat.string_types): url = sa_url.make_url(cfg) elif isinstance(cfg, sa_url.URL): url = cfg else: url = cfg.db.url backend = get_url_backend_name(url) if backend in self.fns: return self.fns[backend](cfg, *arg) else: return self.fns['*'](cfg, *arg) def create_follower_db(follower_ident): for cfg in _configs_for_db_operation(): _create_db(cfg, cfg.db, follower_ident) def configure_follower(follower_ident): for cfg in config.Config.all_configs(): _configure_follower(cfg, follower_ident) def setup_config(db_url, db_opts, options, file_config, follower_ident): if follower_ident: db_url = _follower_url_from_main(db_url, follower_ident) eng = engines.testing_engine(db_url, db_opts) eng.connect().close() cfg = config.Config.register(eng, db_opts, options, file_config) if follower_ident: _configure_follower(cfg, follower_ident) return cfg def drop_follower_db(follower_ident): for cfg in _configs_for_db_operation(): _drop_db(cfg, cfg.db, follower_ident) def _configs_for_db_operation(): hosts = set() for cfg in config.Config.all_configs(): cfg.db.dispose() for cfg in config.Config.all_configs(): url = cfg.db.url backend = get_url_backend_name(url) host_conf = ( backend, url.username, url.host, url.database) if host_conf not in hosts: yield cfg hosts.add(host_conf) for cfg in config.Config.all_configs(): cfg.db.dispose() @register.init def _create_db(cfg, eng, ident): raise NotImplementedError("no DB creation routine for cfg: %s" % eng.url) @register.init def _drop_db(cfg, eng, ident): raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url) @register.init def _configure_follower(cfg, ident): pass @register.init def _follower_url_from_main(url, ident): url = sa_url.make_url(url) url.database = ident return url @_follower_url_from_main.for_db("sqlite") def _sqlite_follower_url_from_main(url, ident): url = sa_url.make_url(url) if not url.database or url.database == ':memory:': return url else: return sa_url.make_url("sqlite:///%s.db" % ident) @_create_db.for_db("postgresql") def _pg_create_db(cfg, eng, ident): with eng.connect().execution_options( isolation_level="AUTOCOMMIT") as conn: try: _pg_drop_db(cfg, conn, ident) except: pass currentdb = conn.scalar("select current_database()") conn.execute("CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb)) @_create_db.for_db("mysql") def _mysql_create_db(cfg, eng, ident): with eng.connect() as conn: try: _mysql_drop_db(cfg, conn, ident) except: pass conn.execute("CREATE DATABASE %s" % ident) conn.execute("CREATE DATABASE %s_test_schema" % ident) conn.execute("CREATE DATABASE %s_test_schema_2" % ident) @_configure_follower.for_db("mysql") def _mysql_configure_follower(config, ident): config.test_schema = "%s_test_schema" % ident config.test_schema_2 = "%s_test_schema_2" % ident @_create_db.for_db("sqlite") def _sqlite_create_db(cfg, eng, ident): pass @_drop_db.for_db("postgresql") def _pg_drop_db(cfg, eng, ident): with eng.connect().execution_options( isolation_level="AUTOCOMMIT") as conn: conn.execute( text( "select pg_terminate_backend(pid) from pg_stat_activity " "where usename=current_user and pid != pg_backend_pid() " "and datname=:dname" ), dname=ident) conn.execute("DROP DATABASE %s" % ident) @_drop_db.for_db("sqlite") def _sqlite_drop_db(cfg, eng, ident): pass #os.remove("%s.db" % ident) @_drop_db.for_db("mysql") def _mysql_drop_db(cfg, eng, ident): with eng.connect() as conn: try: conn.execute("DROP DATABASE %s_test_schema" % ident) except: pass try: conn.execute("DROP DATABASE %s_test_schema_2" % ident) except: pass try: conn.execute("DROP DATABASE %s" % ident) except: pass