__init__.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:Hawkeye 作者: tozhengxq 项目源码 文件源码
def construct_engine(engine, **opts):
    """.. versionadded:: 0.5.4

    Construct and return a SQLAlchemy engine.

    If ``engine`` is already an ``Engine`` instance it is returned unchanged.
    Otherwise it must be a database URI string, and the options for
    `create_engine` are assembled from the keyword arguments.

    Currently, there are 2 ways to pass create_engine options to
    :mod:`migrate.versioning.api` functions:

    :param engine: connection string or an existing engine
    :param engine_dict: python dictionary of options to pass to `create_engine`
    :param engine_arg_*: keyword parameters to pass to `create_engine`
        (evaluated with :func:`migrate.versioning.util.guess_obj_type`)
    :type engine_dict: dict
    :type engine: string or Engine instance
    :type engine_arg_*: string
    :returns: SQLAlchemy Engine
    :raises ValueError: if ``engine`` is neither an Engine nor a string

    .. note::

        keyword parameters override ``engine_dict`` values.

    .. note::

        ``engine_dict`` is shallow-copied before use, so the dictionary
        passed by the caller is never modified.

    """
    if isinstance(engine, Engine):
        return engine
    elif not isinstance(engine, six.string_types):
        raise ValueError("you need to pass either an existing engine or a database uri")

    # get options for create_engine
    engine_dict = opts.get('engine_dict')
    if engine_dict and isinstance(engine_dict, dict):
        # Copy rather than alias: the code below writes into ``kwargs`` and
        # must not leak 'echo' / engine_arg_* entries into the caller's dict.
        kwargs = dict(engine_dict)
    else:
        kwargs = dict()

    # DEPRECATED: handle echo the old way
    echo = asbool(opts.get('echo', False))
    if echo:
        warnings.warn('echo=True parameter is deprecated, pass '
            'engine_arg_echo=True or engine_dict={"echo": True}',
            exceptions.MigrateDeprecationWarning)
        kwargs['echo'] = echo

    # parse keyword arguments; these are applied last so that they override
    # any value coming from engine_dict or the deprecated echo flag
    prefix = 'engine_arg_'
    for key, value in six.iteritems(opts):
        if key.startswith(prefix):
            kwargs[key[len(prefix):]] = guess_obj_type(value)

    log.debug('Constructing engine')
    # TODO: return create_engine(engine, poolclass=StaticPool, **kwargs)
    # seems like 0.5.x branch does not work with engine.dispose and staticpool
    return create_engine(engine, **kwargs)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号