python类create_engine()的实例源码

utilities.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_ursulas(how_many_ursulas: int, ursula_starting_port: int) -> list:
    """
    :param how_many_ursulas: How many Ursulas to create.
    :param ursula_starting_port: The port of the first created Ursula; subsequent Ursulas will increment the port number by 1.
    :return: A list of created Ursulas
    """
    event_loop = asyncio.get_event_loop()

    URSULAS = []
    for _u in range(how_many_ursulas):
        engine = create_engine('sqlite:///:memory:')
        Base.metadata.create_all(engine)
        ursulas_keystore = keystore.KeyStore(engine)
        _URSULA = Ursula(urulsas_keystore=ursulas_keystore)
        _URSULA.attach_server()
        _URSULA.listen(ursula_starting_port + _u, "127.0.0.1")

        URSULAS.append(_URSULA)

    for _counter, ursula in enumerate(URSULAS):
        event_loop.run_until_complete(
            ursula.server.bootstrap([("127.0.0.1", ursula_starting_port + _c) for _c in range(how_many_ursulas)]))
        ursula.publish_interface_information()

    return URSULAS
db.py 文件源码 项目:PeekabooAV 作者: scVENUS 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, db_url):
        """
        Initialize the Peekaboo database handler.

        :param db_url: An RFC 1738 URL that points to the database.
        """
        self.__engine = create_engine(db_url)
        self.__db_con = None
        session_factory = sessionmaker(bind=self.__engine)
        self.__Session = scoped_session(session_factory)
        self.__lock = threading.RLock()
        try:
            self.__db_con = self.__engine.connect()
        except SQLAlchemyError as e:
            raise PeekabooDatabaseError(
                'Unable to connect to the database: %s' % e
            )
        if not self.__db_con.dialect.has_table(self.__engine, '_meta'):
            self._init_db()
            logger.debug('Database schema created.')
        else:
            self.clear_in_progress()
__init__.py 文件源码 项目:stream2segment 作者: rizac 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_session(dbpath, scoped=False):  # , enable_fk_if_sqlite=True):
    """
    Create an sql alchemy session for IO db operations
    :param dbpath: the path to the database, e.g. sqlite:///path_to_my_dbase.sqlite
    :param scoped: boolean (False by default) if the session must be scoped session
    """
    # init the session:
    engine = create_engine(dbpath)
    Base.metadata.create_all(engine)  # @UndefinedVariable

    # enable fkeys if sqlite. This can be added also as event listener as outlined here:
    # http://stackoverflow.com/questions/13712381/how-to-turn-on-pragma-foreign-keys-on-in-sqlalchemy-migration-script-or-conf
    # NOT implemented YET. See models.py

    if not scoped:
        # create a configured "Session" class
        session = sessionmaker(bind=engine)
        # create a Session
        return session()
    # return session
    else:
        session_factory = sessionmaker(bind=engine)
        return scoped_session(session_factory)
wordnet.py 文件源码 项目:trf 作者: aistairc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, db_path: str, lang: str):

        engine = create_engine('sqlite:///{}'.format(db_path))
        SessionMaker = sessionmaker(bind=engine)
        self.session = SessionMaker()
        self.lang = lang
proxy.py 文件源码 项目:annotated-py-sqlalchemy 作者: hhstore 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connect(self, uri, opts=None, **kwargs):
        """Establish connection to a real engine.
        """
        key = "%s(%s,%s)" % (uri, repr(opts), repr(kwargs))
        try:
            map = self.storage.connection
        except AttributeError:
            self.storage.connection = {}
            self.storage.engine = None
            map = self.storage.connection
        try:
            self.engine = map[key]
        except KeyError:
            map[key] = create_engine(uri, opts, **kwargs)
            self.storage.engine = map[key]
test_state_table_generators.py 文件源码 项目:triage 作者: dssg 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_empty_dense_state_table():
    """An empty dense (input) state table produces a useful error."""
    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        utils.create_dense_state_table(engine, 'states', ())  # no data
        table_generator = StateTableGenerator(
            engine,
            'exp_hash',
            dense_state_table='states'
        )

        with pytest.raises(ValueError):
            table_generator.generate_sparse_table([datetime(2016, 1, 1)])

        engine.dispose()
test_state_table_generators.py 文件源码 项目:triage 作者: dssg 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_empty_sparse_state_table():
    """An empty sparse (generated) state table eagerly produces an
    error.

    (Rather than allowing execution to proceed.)

    """
    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        utils.create_dense_state_table(engine, 'states', (
            (5, 'permitted', datetime(2016, 1, 1), datetime(2016, 6, 1)),
            (6, 'permitted', datetime(2016, 2, 5), datetime(2016, 5, 5)),
            (1, 'injail', datetime(2014, 7, 7), datetime(2014, 7, 15)),
            (1, 'injail', datetime(2016, 3, 7), datetime(2016, 4, 2)),
        ))
        table_generator = StateTableGenerator(
            engine,
            'exp_hash',
            dense_state_table='states'
        )

        with pytest.raises(ValueError):
            # Request time outside of available intervals
            table_generator.generate_sparse_table([datetime(2015, 12, 31)])

        (state_count,) = engine.execute('''\
            select count(*) from {generator.sparse_table_name}
        '''.format(generator=table_generator)
        ).first()

        assert state_count == 0

        engine.dispose()
test_sqlalchemy_athena.py 文件源码 项目:PyAthena 作者: laughingman7743 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_engine(self):
        conn_str = 'awsathena+rest://athena.{region_name}.amazonaws.com:443/' + \
                   '{schema_name}?s3_staging_dir={s3_staging_dir}'
        return create_engine(conn_str.format(region_name=ENV.region_name,
                                             schema_name=SCHEMA,
                                             s3_staging_dir=quote_plus(ENV.s3_staging_dir)))
test_sqlalchemy_athena.py 文件源码 项目:PyAthenaJDBC 作者: laughingman7743 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_engine(self):
        conn_str = 'awsathena+jdbc://athena.{region_name}.amazonaws.com:443/' + \
                   '{schema_name}?s3_staging_dir={s3_staging_dir}'
        return create_engine(conn_str.format(region_name=ENV.region_name,
                                             schema_name=SCHEMA,
                                             s3_staging_dir=quote_plus(ENV.s3_staging_dir)))
base.py 文件源码 项目:marcotti-mls 作者: soccermetrics 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, config):
        logger.info("Marcotti-MLS v{0}: Python {1} on {2}".format(__version__, sys.version, sys.platform))
        logger.info("Opened connection to {0}".format(self._public_db_uri(config.database_uri)))
        self.engine = create_engine(config.database_uri)
        self.connection = self.engine.connect()
        self.start_year = config.START_YEAR
        self.end_year = config.END_YEAR
test_codegen.py 文件源码 项目:sqlacodegen 作者: agronholm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setup(self):
        self.metadata = MetaData(create_engine('sqlite:///'))
main.py 文件源码 项目:sqlacodegen 作者: agronholm 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.')
    parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database')
    parser.add_argument('--version', action='store_true', help="print the version number and exit")
    parser.add_argument('--schema', help='load tables from an alternate schema')
    parser.add_argument('--tables', help='tables to process (comma-separated, default: all)')
    parser.add_argument('--noviews', action='store_true', help="ignore views")
    parser.add_argument('--noindexes', action='store_true', help='ignore indexes')
    parser.add_argument('--noconstraints', action='store_true', help='ignore constraints')
    parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance")
    parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form")
    parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables")
    parser.add_argument('--outfile', help='file to write output to (default: stdout)')
    args = parser.parse_args()

    if args.version:
        version = pkg_resources.get_distribution('sqlacodegen').parsed_version
        print(version.public)
        return
    if not args.url:
        print('You must supply a url\n', file=sys.stderr)
        parser.print_help()
        return

    engine = create_engine(args.url)
    metadata = MetaData(engine)
    tables = args.tables.split(',') if args.tables else None
    metadata.reflect(engine, args.schema, not args.noviews, tables)
    outfile = codecs.open(args.outfile, 'w', encoding='utf-8') if args.outfile else sys.stdout
    generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect,
                              args.noclasses)
    generator.render(outfile)
testing.py 文件源码 项目:lymph-sqlalchemy 作者: deliveryhero 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def init_db(cls):
        engine = create_engine(cls.url())
        cls.create_db(engine)
        store = cls.create_store()
        store.create_all()
        engine.dispose()
testing.py 文件源码 项目:lymph-sqlalchemy 作者: deliveryhero 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def drop_db(cls):
        engine = create_engine(cls.url())
        drop_database(engine.url)
        engine.dispose()
testing.py 文件源码 项目:lymph-sqlalchemy 作者: deliveryhero 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUpClass(cls):
        cls.create = False
        cls.engine = create_engine(cls.url())
        if not database_exists(cls.engine.url):
            cls.create_db(cls.engine)
            cls.create = True
        cls.connection = cls.engine.connect()
        cls.transaction = cls.connection.begin()
        cls.store = cls.create_store(cls.connection)
        if cls.create:
            cls.store.create_all()
        super(StoreTestCase, cls).setUpClass()
conftest.py 文件源码 项目:dodotable 作者: spoqa 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def fx_session():
    engine = create_engine(TEST_DATABASE_URL)
    try:
        metadata = Base.metadata
        metadata.drop_all(bind=engine)
        metadata.create_all(bind=engine)
        session = Session(bind=engine)
        yield session
        session.rollback()
        metadata.drop_all(bind=engine)
    finally:
        engine.dispose()
presto.py 文件源码 项目:missioncontrol 作者: mozilla 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_engine():
    return create_engine(settings.PRESTO_URL)
resource.py 文件源码 项目:falcon-sqlalchemy 作者: vixcess 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, sqlurl_or_engine: Union[str, Engine], **kwargs):
        super().__init__()
        if isinstance(sqlurl_or_engine, str):
            self._sqlurl = sqlurl_or_engine
            self._engine = create_engine(sqlurl_or_engine)
        elif isinstance(sqlurl_or_engine, Engine):
            self._engine = sqlurl_or_engine
            self._sqlurl = self._engine.url
        self._session_maker = AutoCloseSessionMaker(bind=self._engine, **kwargs)
persist.py 文件源码 项目:lang2program 作者: kelvinguu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def sqlalchemy_metadata(host, port, database, username, password):
    url = URL(drivername='postgresql+psycopg2', username=username,
              password=password, host=host, port=port, database=database)
    engine = create_engine(url, server_side_cursors=True, connect_args={'connect_timeout': 4})
    # ensure that we can connect
    with engine.begin():
        pass  # this will throw OperationalError if it fails
    return MetaData(engine)
persist.py 文件源码 项目:lang2program 作者: kelvinguu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sqlalchemy_metadata(host, port, database, username, password):
    url = URL(drivername='postgresql+psycopg2', username=username,
              password=password, host=host, port=port, database=database)
    engine = create_engine(url, server_side_cursors=True, connect_args={'connect_timeout': 4})
    # ensure that we can connect
    with engine.begin():
        pass  # this will throw OperationalError if it fails
    return MetaData(engine)
test_querying.py 文件源码 项目:asyncpgsa 作者: CanopyTax 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def metadata():
    metadata = MetaData()
    metadata.bind = create_engine(URL)
    return metadata
test_jsonify.py 文件源码 项目:deb-python-pecan 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        super(TestJsonifySQLAlchemyGenericEncoder, self).setUp()
        if not create_engine:
            self.create_fake_proxies()
        else:
            self.create_sa_proxies()
test_jsonify.py 文件源码 项目:deb-python-pecan 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_sa_proxies(self):

        # create the table and mapper
        metadata = schema.MetaData()
        user_table = schema.Table(
            'user',
            metadata,
            schema.Column('id', types.Integer, primary_key=True),
            schema.Column('first_name', types.Unicode(25)),
            schema.Column('last_name', types.Unicode(25))
        )

        class User(object):
            pass
        orm.mapper(User, user_table)

        # create the session
        engine = create_engine('sqlite:///:memory:')
        metadata.bind = engine
        metadata.create_all()
        session = orm.sessionmaker(bind=engine)()

        # add some dummy data
        user_table.insert().execute([
            {'first_name': 'Jonathan', 'last_name': 'LaCour'},
            {'first_name': 'Yoann', 'last_name': 'Roman'}
        ])

        # get the SA objects
        self.sa_object = session.query(User).first()
        select = user_table.select()
        self.result_proxy = select.execute()
        self.row_proxy = select.execute().fetchone()
test_sqlalchemy_bigquery.py 文件源码 项目:pybigquery 作者: mxmzdlv 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def engine():
    engine = create_engine('bigquery://', echo=True)
    return engine
sql.py 文件源码 项目:kotori 作者: daq-tools 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def startDatabase(self):
        self.engine = create_engine(

            # sqlite in-memory
            #"sqlite://", reactor=reactor, strategy=TWISTED_STRATEGY

            # sqlite on filesystem
            "sqlite:////tmp/kotori.sqlite", reactor=reactor, strategy=TWISTED_STRATEGY

            # mysql... todo
        )

        # Create the table
        yield self.engine.execute(CreateTable(self.telemetry))
        #yield self.engine
test_state_table_generators.py 文件源码 项目:triage 作者: dssg 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_sparse_state_table_generator():
    input_data = [
        (5, 'permitted', datetime(2016, 1, 1), datetime(2016, 6, 1)),
        (6, 'permitted', datetime(2016, 2, 5), datetime(2016, 5, 5)),
        (1, 'injail', datetime(2014, 7, 7), datetime(2014, 7, 15)),
        (1, 'injail', datetime(2016, 3, 7), datetime(2016, 4, 2)),
    ]

    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        utils.create_dense_state_table(engine, 'states', input_data)

        table_generator = StateTableGenerator(
            engine,
            'exp_hash',
            dense_state_table='states'
        )
        as_of_dates = [
            datetime(2016, 1, 1),
            datetime(2016, 2, 1),
            datetime(2016, 3, 1),
            datetime(2016, 4, 1),
            datetime(2016, 5, 1),
            datetime(2016, 6, 1),
        ]
        table_generator.generate_sparse_table(as_of_dates)
        results = [row for row in engine.execute(
            'select entity_id, as_of_date, injail, permitted from {} order by entity_id, as_of_date'.format(
                table_generator.sparse_table_name
            ))]
        expected_output = [
            # entity_id, as_of_date, injail, permitted
            (1, datetime(2016, 4, 1), True, False),
            (5, datetime(2016, 1, 1), False, True),
            (5, datetime(2016, 2, 1), False, True),
            (5, datetime(2016, 3, 1), False, True),
            (5, datetime(2016, 4, 1), False, True),
            (5, datetime(2016, 5, 1), False, True),
            (6, datetime(2016, 3, 1), False, True),
            (6, datetime(2016, 4, 1), False, True),
            (6, datetime(2016, 5, 1), False, True),
        ]
        assert results == expected_output
        utils.assert_index(engine, table_generator.sparse_table_name, 'entity_id')
        utils.assert_index(engine, table_generator.sparse_table_name, 'as_of_date')
assertions.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def assert_compile(self, clause, result, params=None,
                        checkparams=None, dialect=None,
                        checkpositional=None,
                        use_default_dialect=False,
                        allow_dialect_select=False):
        if use_default_dialect:
            dialect = default.DefaultDialect()
        elif dialect == None and not allow_dialect_select:
            dialect = getattr(self, '__dialect__', None)
            if dialect == 'default':
                dialect = default.DefaultDialect()
            elif dialect is None:
                dialect = config.db.dialect
            elif isinstance(dialect, basestring):
                dialect = create_engine("%s://" % dialect).dialect

        kw = {}
        if params is not None:
            kw['column_keys'] = params.keys()

        if isinstance(clause, orm.Query):
            context = clause._compile_context()
            context.statement.use_labels = True
            clause = context.statement

        c = clause.compile(dialect=dialect, **kw)

        param_str = repr(getattr(c, 'params', {}))

        if util.py3k:
            param_str = param_str.encode('utf-8').decode('ascii', 'ignore')
            print(("\nSQL String:\n" + util.text_type(c) + param_str).encode('utf-8'))
        else:
            print(("\nSQL String:\n" + util.text_type(c).encode('utf-8') + param_str))

        cc = re.sub(r'[\n\t]', '', util.text_type(c))

        eq_(cc, result, "%r != %r on dialect %r" % (cc, result, dialect))

        if checkparams is not None:
            eq_(c.construct_params(params), checkparams)
        if checkpositional is not None:
            p = c.construct_params(params)
            eq_(tuple([p[x] for x in c.positiontup]), checkpositional)
test_database.py 文件源码 项目:antares 作者: CONABIO 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_session(self):
        if self.session is None:
            klass = sessionmaker(bind=create_engine(self.database))
            self.session = klass()
        return self.session
test_database.py 文件源码 项目:antares 作者: CONABIO 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def persist_bundle(self):
        from madmex.persistence.driver import persist_bundle
        from sqlalchemy import create_engine
        from sqlalchemy.orm.session import sessionmaker
        from madmex.util import remove_file
        dummy = DummyBundle()
        persist_bundle(dummy)


        my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
        klass = sessionmaker(bind=create_engine(my_database))
        session = klass()
        query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
        print query
        try:
            result_set = session.execute(query)
            for row in result_set:
                self.assertGreater(row['count'], 0)
            # Delete object from database.
            session.delete(dummy.get_database_object())
            session.commit()
            for file_name in dummy.get_files():
                full_path = os.path.join(dummy.get_output_directory(), os.path.basename(file_name))
                self.assertTrue(os.path.isfile(full_path))
                # Remove file from filesystem.
                remove_file(full_path)
        except:
            session.rollback()
            raise
        finally:
            session.close()
test_database.py 文件源码 项目:antares 作者: CONABIO 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def persist_bundle_sensor(self):
        from madmex.persistence.driver import persist_bundle
        folder = '/LUSTRE/MADMEX/staging/madmex_antares/test_ingest/556_297_041114_dim_img_spot'
        from sqlalchemy import create_engine
        from sqlalchemy.orm.session import sessionmaker
        from madmex.mapper.bundle.spot5 import Bundle
        #from madmex.configuration import SETTINGS

        dummy = Bundle(folder)
        #dummy.target = '/LUSTRE/MADMEX/staging/'
        target_url = getattr(SETTINGS, 'TEST_FOLDER')
        print target_url
        #TODO please fix me, horrible hack
        dummy.target = target_url
        persist_bundle(dummy)
        my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
        klass = sessionmaker(bind=create_engine(my_database))
        session = klass()
        query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id

        try:
            result_set = session.execute(query)
            for row in result_set:
                self.assertGreater(row['count'], 0)
            session.delete(dummy.get_database_object())
            session.commit()
            for file_name in dummy.get_files():
                full_path = os.path.join(target_url, os.path.basename(file_name))
                self.assertTrue(os.path.isfile(full_path))
                os.remove(full_path)
        except:
            session.rollback()
            raise
        finally:
            session.close()


问题


面经


文章

微信
公众号

扫码关注公众号