python类database_exists()的实例源码

conftest.py 文件源码 项目:flask-boilerplate 作者: MarcFord 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def app(request):
    test_app = ApplicationFactory.create_application('testing')
    test_app.app_context().push()
    if database_exists(db.engine.url):
        drop_database(db.engine.url)
    create_database(db.engine.url)
    db.create_all()

    def teardown():
        db.session.expunge_all()
        db.session.remove()
        drop_database(db.engine.url)
        db.engine.dispose()

    request.addfinalizer(teardown)
    return test_app
dbmgr.py 文件源码 项目:opp 作者: openpassphrase 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def init(config):
    db_connect = config.conf['db_connect']
    if not db_connect:
        sys.exit("Error: database connection string not "
                 "found in any of the configuration files")
    try:
        engine = create_engine(db_connect)
    except exc.NoSuchModuleError as e:
        sys.exit("Error: %s" % str(e))
    try:
        if not database_exists(engine.url):
            printv(config, "Creating database: 'openpassphrase'")
            create_database(engine.url)
    except exc.OperationalError as e:
        sys.exit("Error: %s" % str(e))
    printv(config, "Creating tables based on models")
    models.Base.metadata.create_all(engine)
test_dbimport.py 文件源码 项目:GenomicsSampleAPIs 作者: Intel-HLS 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setUpClass(self):
        self.DBURI = "postgresql+psycopg2://@:5432/dbimport"

        if database_exists(self.DBURI):
            drop_database(self.DBURI)

        create_database(self.DBURI)

        engine = create_engine(self.DBURI)
        models.bind_engine(engine)

        self.references = {"1": 249250621, "2": 243199373, "3": 198022430}
        self.array = "test"

        # referenceset, workspace, and individual registration previously
        # tested
        with dbimport.DBImport(self.DBURI).getSession() as session:
            self.referenceset = session.registerReferenceSet(
                str(uuid.uuid4()), "testAssembly", references=self.references)
            self.workspace = session.registerWorkspace(
                str(uuid.uuid4()), "/test/dbimport/workspace")
            self.individual = session.registerIndividual(
                str(uuid.uuid4()), name="testIndividual")
test_maf2tile.py 文件源码 项目:GenomicsSampleAPIs 作者: Intel-HLS 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUpClass(self):
        self.TESTDB_URI = "postgresql+psycopg2://@:5432/mafend2end"
        if not database_exists(self.TESTDB_URI):
            create_database(self.TESTDB_URI)

        engine = create_engine(self.TESTDB_URI)
        models.bind_engine(engine)
        self.config_path = os.path.join(os.path.realpath(
            sys.argv[-1]), "utils/example_configs/icgc_config.json")
        with open(self.config_path, 'r') as readFP:
            config_json = json.load(readFP)
            config_json["TileDBConfig"] = os.path.join(os.path.realpath(
                sys.argv[-1]), "utils/example_configs/tiledb_config.json")
            config_json["TileDBAssembly"] = os.path.join(
                os.path.realpath(sys.argv[-1]), "utils/example_configs/hg19.json")
            config_json["VariantSetMap"]["VariantConfig"] = os.path.join(
                os.path.realpath(sys.argv[-1]), "utils/example_configs/icgc_variants.json")
        with open(self.config_path, 'w') as writeFP:
            writeFP.write(json.dumps(config_json))

        config = ConfigReader(self.config_path)
        config.DB_URI = self.TESTDB_URI
        imp.helper.registerWithMetadb(config)
test_vcf2tile.py 文件源码 项目:GenomicsSampleAPIs 作者: Intel-HLS 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUpClass(self):
        self.DBURI = "postgresql+psycopg2://@:5432/vcfimport"

        if database_exists(self.DBURI):
            drop_database(self.DBURI)

        create_database(self.DBURI)

        engine = create_engine(self.DBURI)
        models.bind_engine(engine)

        self.assembly = "testAssembly"

        # test files
        with open("test/data/header.vcf", "r") as f:
            self.header = f.read()

        # create the base config
        self.config_path = os.path.abspath(
            "utils/example_configs/vcf_import.config")
        with open(self.config_path, 'r') as readFP:
            self.config = json.load(readFP)
            self.config["dburi"] = self.DBURI
database.py 文件源码 项目:DCheat 作者: DCheat 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def init(db_url, db_log_flag = False, recycle_time = 3600):
        # ?? ?? ???
        DBManager.__engine = create_engine(db_url,
                                           pool_recycle = recycle_time,
                                           echo = db_log_flag)
        # DATABASE Not exist case
        if not database_exists(DBManager.__engine.url):
            # Creatre Database
            create_database(DBManager.__engine.url)

            DBManager.__engine = create_engine(db_url,
                                               pool_recycle = recycle_time,
                                               echo = db_log_flag)

        DBManager.__session = scoped_session(sessionmaker(autocommit = False, 
                                                          autoflush = False, 
                                                          bind = DBManager.__engine))

                # ?? ??? ??
        global dao
        dao = DBManager.__session
data.py 文件源码 项目:bayesianpy 作者: morganics 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write(self, if_exists:str=None):

        def _create_database():
            return self._engine.dialect.dbapi.create_database(
                user=self._username, password=self._password, host=self._server, database=self.uuid,
                                                       page_size=self._pagesize
                                                       )

        from sqlalchemy_utils import database_exists
        from sqlalchemy import exc as dbexceptions
        try:
            if not database_exists(self._engine.url):
                a = _create_database()
        except dbexceptions.DatabaseError:
            a = _create_database()

        super().write(if_exists=if_exists)
cli.py 文件源码 项目:athena 作者: slint 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def init():
    """Create database."""
    click.gecho(f'Creating database {_db.engine.url}')
    if not database_exists(str(_db.engine.url)):
        create_database(str(_db.engine.url))
test.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, **options):
        """Create test databases for all db connections."""
        # Create test db
        for key in settings.DATABASES:
            test_db = settings.DATABASES[key]
            test_db['database'] = 'test_' + test_db['database']
            engine = get_engine(key)
            if not database_exists(engine.url):
                create_database(engine.url)
        create_tables(settings.INSTALLED_APPS, warn=False)
        pytest.main(options['unknown'])
orm.py 文件源码 项目:flask-boilerplate 作者: MarcFord 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create():
    """
    Create database and all tables
    """
    if not database_exists(db.engine.url):
        app.logger.debug('Creating a fresh database to work with')
        create_database(db.engine.url)
        alembic_stamp()
        db.create_all()
    app.logger.debug('Database already exists, please drop the database and try again!')
orm.py 文件源码 项目:flask-boilerplate 作者: MarcFord 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def drop():
    """
    Drop the database if it exists
    :return:
    """
    app.logger.debug('Dropping the database!')
    if database_exists(db.engine.url):
        drop_database(db.engine.url)

    app.logger.error('Database does not exists!')
createdb.py 文件源码 项目:crm 作者: Incubaid 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def createdb():
    """
    Create DB    
    """
    if not database_exists(app.config['SQLALCHEMY_DATABASE_URI']):
        create_database(app.config['SQLALCHEMY_DATABASE_URI'])
    print("DB created.")
conftest.py 文件源码 项目:gold-digger 作者: business-factory 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def db_connection(db_connection_string):
    """
    Create one test database for all database tests.
    """
    engine = create_engine(db_connection_string)
    if not database_exists(engine.url):
        create_database(engine.url)
    connection = engine.connect()

    yield connection
    connection.close()
    engine.dispose()
    drop_database(engine.url)
database.py 文件源码 项目:ooni-measurements 作者: TheTorProject 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init_db(app):
    app.db_engine = create_engine(
        app.config['DATABASE_URL'], convert_unicode=True
    )
    if not database_exists(app.db_engine.url):
        create_database(app.db_engine.url)
    app.db_session = scoped_session(
        sessionmaker(autocommit=False, autoflush=False, bind=app.db_engine)
    )
    Base.query = app.db_session.query_property()
    init_query_logging(app)
dbmgr.py 文件源码 项目:opp 作者: openpassphrase 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def restore(config):  # pragma: no cover
    db_connect = config.conf['db_connect']
    if not db_connect:
        sys.exit("Error: database connection string not "
                 "found in any of the configuration files")
    try:
        engine = create_engine(db_connect)
    except exc.NoSuchModuleError as e:
        sys.exit("Error: %s" % str(e))

    if database_exists(engine.url):
        sys.exit("Error: database already exists, will not overwrite!")

    try:
        create_database(engine.url)
    except exc.OperationalError as e:
        sys.exit("Error: %s" % str(e))
    models.Base.metadata.create_all(engine)

    conn = engine.connect()

    print("Restoring database from backup file: opp.db.tz")
    code, out, err = utils.execute("tar zxf opp.db.tz", propagate=False)
    if code != 0:
        sys.exit("Error extracting database archive file: %s" % err)

    for table in models.Base.metadata.sorted_tables:
        with open("%s.pickle" % table, "rb") as f:
            while True:
                try:
                    ins = table.insert().values(load(f))
                    conn.execute(ins)
                except EOFError:
                    break

    files = ["%s.pickle" % x.name for x in models.Base.metadata.sorted_tables]
    code, out, err = utils.execute("rm %s" % " ".join(files))
    if code != 0:
        print("Unable to remove .pickle files: %s" % err)
test_dbimport.py 文件源码 项目:GenomicsSampleAPIs 作者: Intel-HLS 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUpClass(self):
        self.DBURI = "postgresql+psycopg2://@:5432/dbimport"

        if database_exists(self.DBURI):
            drop_database(self.DBURI)

        create_database(self.DBURI)

        engine = create_engine(self.DBURI)
        models.bind_engine(engine)

        self.assembly = "testAssembly"
        self.workspace = "/test/dbimport/workspace"
test_dbimport.py 文件源码 项目:GenomicsSampleAPIs 作者: Intel-HLS 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUpClass(self):
        self.DBURI = "postgresql+psycopg2://@:5432/dbimport"

        if database_exists(self.DBURI):
            drop_database(self.DBURI)

        create_database(self.DBURI)

        engine = create_engine(self.DBURI)
        models.bind_engine(engine)

        # all these function have been previously tested
        with dbimport.DBImport(self.DBURI).getSession() as session:

            self.referenceset = session.registerReferenceSet(
                str(uuid.uuid4()), "testAssembly")
            self.workspace = session.registerWorkspace(
                str(uuid.uuid4()), "/test/dbimport/workspace")
            self.array = session.registerDBArray(
                str(uuid.uuid4()), self.referenceset.id, self.workspace.id, "test")
            self.array2 = session.registerDBArray(
                str(uuid.uuid4()), self.referenceset.id, self.workspace.id, "test2")

            self.variantset = session.registerVariantSet(
                str(uuid.uuid4()), self.referenceset.id, "Dataset")
            self.variantset2 = session.registerVariantSet(
                str(uuid.uuid4()), self.referenceset.id, "Dataset2")
            self.variantset3 = session.registerVariantSet(
                str(uuid.uuid4()), self.referenceset.id, "Dataset3")
            self.variantset4 = session.registerVariantSet(
                str(uuid.uuid4()), self.referenceset.id, "Dataset4")

            self.individual = session.registerIndividual(
                str(uuid.uuid4()), name="testIndividual")
            self.source = session.registerSample(
                str(uuid.uuid4()), self.individual.guid, name="source")
            self.target = session.registerSample(
                str(uuid.uuid4()), self.individual.guid, name="target")
dbhelpers.py 文件源码 项目:zou 作者: cgwire 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_all():
    from zou.app import db
    engine = create_engine(get_db_uri())
    if not database_exists(engine.url):
        create_database(engine.url)
    db.create_all()
admin.py 文件源码 项目:FeatureHub 作者: HDI-Project 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, problem=None, database="featurehub"):
        """Create the ORMManager and connect to DB.

        If problem name is given, load it.

        Parameters
        ----------
        problem : str, optional (default=None)
            Name of problem
        database : str, optional (default="featurehub")
            Name of database within DBMS.
        """

        self.__orm = ORMManager(database)

        if not database_exists(self.__orm.engine.url):
            print("database {} does not seem to exist.".format(database))
            print("You might want to create it by calling set_up method")
        elif  problem:
            try:
                with self.__orm.session_scope() as session:
                    problem = session.query(Problem)\
                                     .filter(Problem.name == problem).one()
                    self.__problemid = problem.id
            except NoResultFound:
                print("WARNING: Problem {} does not exist!".format(problem))
                print("You might want to create it by calling create_problem"
                      " method")
admin.py 文件源码 项目:FeatureHub 作者: HDI-Project 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def set_up(self, drop=False):
        """Create a new DB and create the initial scheme.

        If the database exists and drop=True, the existing database is dropped
        and recreated. Regardless, any tables defined by the schema that do not
        exist are created.

        Parameters
        ----------
        drop : bool, optional (default=False)
            Drop database if it already exists.
        """

        # todo extract database name from engine url and report for brevity
        engine = self.__orm.engine
        if database_exists(engine.url):
            print("Database {} already exists.".format(engine.url))
            if drop:
                print("Dropping old database {}".format(engine.url))
                drop_database(engine.url)
                with possibly_talking_action("Re-creating database..."):
                    create_database(engine.url)
        else:
            with possibly_talking_action("Creating database..."):
                create_database(engine.url)

        with possibly_talking_action("Creating tables..."):
            Base.metadata.create_all(engine)

        print("Database {} created successfully".format(engine.url))
ghost.py 文件源码 项目:ghost 作者: nir0s 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init(self):
        if self._local_path:
            dirname = os.path.dirname(self._local_path)
            if dirname and not os.path.isdir(dirname):
                os.makedirs(dirname)

        if not database_exists(self.db.url):
            create_database(self.db.url)
        # More on connection strings for sqlalchemy:
        # http://docs.sqlalchemy.org/en/latest/core/engines.html
        self.metadata.bind = self.db
        self.metadata.create_all()
ghost.py 文件源码 项目:ghost 作者: nir0s 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def is_initialized(self):
        return database_exists(self.db.url)
database.py 文件源码 项目:flusk 作者: dimmg 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def init_db():
    """
    Create database if doesn't exist and
    create all tables.
    """
    if not database_exists(engine.url):
        create_database(engine.url)
    BaseModel.metadata.create_all(bind=engine)
test_database.py 文件源码 项目:deb-python-sqlalchemy-utils 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_create_and_drop(self, dsn):
        assert not database_exists(dsn)
        create_database(dsn)
        assert database_exists(dsn)
        drop_database(dsn)
        assert not database_exists(dsn)
test_database.py 文件源码 项目:deb-python-sqlalchemy-utils 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_exists_memory(self, dsn):
        assert database_exists(dsn)
test_database.py 文件源码 项目:deb-python-sqlalchemy-utils 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_exists_memory_none_database(self, sqlite_none_database_dsn):
        assert database_exists(sqlite_none_database_dsn)
data.py 文件源码 项目:bayesianpy 作者: morganics 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write(self, if_exists:str=None):
        jp.java.lang.Class.forName("com.mysql.jdbc.Driver",
                                   True, jp.java.lang.ClassLoader.getSystemClassLoader())
        from sqlalchemy_utils import database_exists, create_database
        if not database_exists(self._engine.url):
             create_database(self._engine.url)

        super().write(if_exists=if_exists)
table_creator.py 文件源码 项目:2017-DB-Team-Project1 作者: bethesirius 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def init_db():
    if sqlalchemy_utils.database_exists(engine.url):
        sqlalchemy_utils.drop_database(engine.url)
    sqlalchemy_utils.create_database(engine.url)
    print("DB ??? ??")
table_creator.py 文件源码 项目:2017-DB-Team-Project1 作者: bethesirius 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def init_db():
    if sqlalchemy_utils.database_exists(engine.url):
        sqlalchemy_utils.drop_database(engine.url)
    sqlalchemy_utils.create_database(engine.url)
    print("DB ??? ??")
postgresql.py 文件源码 项目:selinon 作者: selinon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):  # noqa
        if not database_exists(self.engine.url):
            create_database(self.engine.url)

        self.session = sessionmaker(bind=self.engine)()
        _Base.metadata.create_all(self.engine)


问题


面经


文章

微信
公众号

扫码关注公众号