python类Session()的实例源码

test_biweeklypayperiod.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setup(self):
    """Prepare a BiweeklyPayPeriod for 2017-03-17 backed by a mocked session."""
    # spec_set restricts the mock to attributes that exist on Session.
    session_mock = Mock(spec_set=Session)
    self.mock_sess = session_mock
    self.cls = BiweeklyPayPeriod(date(2017, 3, 17), session_mock)
test_biweeklypayperiod.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup(self):
    """Create the mocked session and the 2017-03-17 pay period under test."""
    fake_session = Mock(spec_set=Session)
    period_start = date(2017, 3, 17)
    self.mock_sess = fake_session
    self.cls = BiweeklyPayPeriod(period_start, fake_session)
test_biweeklypayperiod.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def setup(self):
    """Prepare a BiweeklyPayPeriod for 2017-03-07 backed by a mocked session."""
    # spec_set restricts the mock to attributes that exist on Session.
    session_mock = Mock(spec_set=Session)
    self.mock_sess = session_mock
    self.cls = BiweeklyPayPeriod(date(2017, 3, 7), session_mock)
test_biweeklypayperiod.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setup(self):
    """Build the fixtures: a Session-spec mock and the pay period using it."""
    mocked = Mock(spec_set=Session)
    self.mock_sess = mocked
    # The pay period is constructed with the date 2017-03-17 and the mock.
    self.cls = BiweeklyPayPeriod(date(2017, 3, 17), mocked)
test_biweeklypayperiod.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setup(self):
    """Set up a mocked session and a BiweeklyPayPeriod for 2017-03-17."""
    start = date(2017, 3, 17)
    db_mock = Mock(spec_set=Session)
    self.mock_sess = db_mock
    self.cls = BiweeklyPayPeriod(start, db_mock)
usermixin.py 文件源码 项目:websauna 作者: websauna 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_empty_site_init(self, dbsession: Session, user: UserMixin):
    """Run the empty-site initialisation if no groups exist yet.

    Intended to be called after a user has been created and flushed. The
    group model is looked up through the ``groups`` relationship of the
    user's class. When the group table has no rows,
    ``self.init_empty_site(dbsession, user)`` is invoked; otherwise nothing
    happens.

    :param dbsession: Session used to count existing groups
    :param user: The freshly created user; must already have an ``id``
    """
    assert user.id, "Please flush your db"

    # Resolve the Group class from the User model's "groups" relationship
    user_mapper = inspection.inspect(user.__class__)
    Group = user_mapper.relationships["groups"].mapper.entity

    # Existing groups mean the site has already been initialised
    site_has_groups = dbsession.query(Group).count() > 0
    if not site_has_groups:
        self.init_empty_site(dbsession, user)
fixtures.py 文件源码 项目:websauna 作者: websauna 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_test_dbsession(request, registry: Registry, transaction_manager=transaction.manager) -> Session:
    """Create a test database session and set up the database.

    Drops and recreates all tables when called, and registers a py.test
    finalizer that drops all tables again during teardown. On PostgreSQL the
    ``uuid-ossp`` extension is created if missing, so tests do not need to
    add it by hand.

    :param request: py.test test request; its ``addfinalizer`` is used to register teardown
    :param registry: Registry passed on to ``create_dbsession``
    :param transaction_manager: Transaction manager passed on to ``create_dbsession``
    :return: New database session
    """
    from websauna.system.model.meta import Base

    dbsession = create_dbsession(registry, manager=transaction_manager)
    engine = dbsession.get_bind()

    # Use the connection as a context manager so it is closed once the setup
    # statement has run, instead of being left open for the whole test run.
    with engine.connect() as connection:
        # Support native PSQL UUID types
        if engine.dialect.name == "postgresql":
            connection.execute('create extension if not exists "uuid-ossp";')

    # NOTE(review): this uses the global transaction.manager rather than the
    # transaction_manager argument - confirm that is intended.
    with transaction.manager:
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)

    def teardown():
        # Open transactions in the database would block the DROP and leave
        # the tests deadlocked.
        # XXX: this cleanup needs a more robust solution.

        with transaction.manager:
            Base.metadata.drop_all(engine)

        dbsession.close()

    request.addfinalizer(teardown)

    return dbsession
test_addon.py 文件源码 项目:mccurse 作者: khardix 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_mod_search(filled_database):
    """Searching for 'Tested' must yield exactly the mods with ids 42 and 45."""

    db = SQLSession(bind=filled_database.engine)

    found_ids = {int(mod.id) for mod in addon.Mod.search(db, 'Tested')}

    assert found_ids == {42, 45}
test_addon.py 文件源码 项目:mccurse 作者: khardix 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_mod_find(filled_database):
    """Mod.find returns the mod with id 42 for 'Tested' and raises for an unknown name."""

    db = SQLSession(bind=filled_database.engine)

    found = addon.Mod.find(db, 'Tested')
    assert found.id == 42

    # A name that matches nothing must raise addon.NoResultFound.
    with pytest.raises(addon.NoResultFound):
        addon.Mod.find(db, 'nonsense')
test_addon.py 文件源码 项目:mccurse 作者: khardix 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_mod_with_id(filled_database):
    """Mod.with_id returns the mod stored under an id, or raises when there is none."""

    db = SQLSession(bind=filled_database.engine)

    first = addon.Mod.with_id(db, 42)
    assert first.name == 'tested'
    second = addon.Mod.with_id(db, 45)
    assert second.name == 'tester'

    # Id 44 is expected to be absent from the fixture database.
    with pytest.raises(addon.NoResultFound):
        addon.Mod.with_id(db, 44)


# Release tests
curse.py 文件源码 项目:mccurse 作者: khardix 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def session(self) -> SQLSession:
    """Return a new SQLSession bound to ``self.engine``.

    A fresh session object is built on every call.
    """
    new_session = SQLSession(bind=self.engine)
    return new_session
__init__.py 文件源码 项目:papersummarize 作者: mrdrozdov 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def receive_after_insert(mapper, connection, target):
    """Handle the 'after_insert' event by scheduling a PaperRating for the new row.

    Registers a one-shot ``before_commit`` listener on ``Session``. When it
    fires, it adds a ``PaperRating`` whose ``paper_id`` is ``target.id`` to
    the committing session.
    """
    def receive_before_commit(session):
        session.add(PaperRating(paper_id=target.id))

    # once=True: the listener is removed after its first invocation.
    event.listen(Session, "before_commit", receive_before_commit, once=True)
__init__.py 文件源码 项目:papersummarize 作者: mrdrozdov 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def receive_after_insert(mapper, connection, target):
    """Handle the 'after_insert' event by scheduling a rating refresh.

    Registers a one-shot ``before_commit`` listener on ``Session``. When it
    fires, it loads the ``PaperRating`` and every ``UserPaperRating`` for
    ``target.paper_id`` and passes them to ``update_value``.
    """
    @event.listens_for(Session, "before_commit", once=True)
    def receive_before_commit(session):
        update_value(
            session.query(PaperRating).filter_by(paper_id=target.paper_id).first(),
            session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all(),
        )
__init__.py 文件源码 项目:papersummarize 作者: mrdrozdov 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def receive_after_update(mapper, connection, target):
    """Handle the 'after_update' event by scheduling a rating refresh.

    Registers a one-shot ``before_commit`` listener on ``Session``. When it
    fires, it loads the ``PaperRating`` and every ``UserPaperRating`` for
    ``target.paper_id`` and passes them to ``update_value``.
    """
    def receive_before_commit(session):
        aggregate = session.query(PaperRating).filter_by(paper_id=target.paper_id).first()
        individual = session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all()
        update_value(aggregate, individual)

    # once=True: the listener is removed after its first invocation.
    event.listen(Session, "before_commit", receive_before_commit, once=True)
__init__.py 文件源码 项目:papersummarize 作者: mrdrozdov 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def receive_after_delete(mapper, connection, target):
    """Handle the 'after_delete' event by scheduling a rating refresh.

    Registers a one-shot ``before_commit`` listener on ``Session``. When it
    fires, it loads the ``PaperRating`` and every ``UserPaperRating`` for
    ``target.paper_id`` and passes them to ``update_value``.
    """
    @event.listens_for(Session, "before_commit", once=True)
    def refresh_rating_before_commit(session):
        rating_query = session.query(PaperRating).filter_by(paper_id=target.paper_id)
        summary = rating_query.first()
        votes_query = session.query(UserPaperRating).filter_by(paper_id=target.paper_id)
        votes = votes_query.all()
        update_value(summary, votes)
test_equals.py 文件源码 项目:sqlalchemy_zdb 作者: skftn 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_db_essentials(dbsession):
    """The fixture is a Session and a ZdbQuery can be constructed on top of it."""
    assert isinstance(dbsession, Session)

    products_query = ZdbQuery(Products, session=dbsession)
    assert isinstance(products_query, ZdbQuery)
testing.py 文件源码 项目:botfriend 作者: leonardr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setup(self):
    """Open a session and a nested transaction, then reset the test counters."""
    conn = self.connection

    # Build a session on the shared connection, then start a nested
    # transaction on that same connection.
    self._db = Session(conn)
    self.transaction = conn.begin_nested()

    # The counter starts high so that it does not collide with tests that
    # look for small numbers.
    self.counter = 2000

    self.time_counter = datetime(2014, 1, 1)
utils.py 文件源码 项目:hydrus 作者: HTTP-APIs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set_session(application, DB_SESSION):
    """Set the database session for the app.

    ``DB_SESSION`` must be of type <sqlalchemy.orm.session.Session>.

    While the ``appcontext_pushed`` handler is connected for ``application``,
    each time it is called it stores ``DB_SESSION`` on ``g.dbsession``.

    This is a generator function (presumably wrapped as a context manager
    outside this block), so the type check below runs when the generator is
    first advanced, not when ``set_session`` is called.

    :raises TypeError: if ``DB_SESSION`` is not a ``Session`` instance
    """
    if not isinstance(DB_SESSION, Session):
        # The previous message wrongly referred to "The API Doc".
        raise TypeError("The database session is not of type <sqlalchemy.orm.session.Session>")

    def handler(sender, **kwargs):
        g.dbsession = DB_SESSION
    with appcontext_pushed.connected_to(handler, application):
        yield
utils.py 文件源码 项目:hydrus 作者: HTTP-APIs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_session():
    """Return the database session stored on ``g``, creating it if absent.

    If ``g.dbsession`` is missing or ``None``, a new session bound to
    ``engine`` is created, stored on ``g.dbsession`` and returned.
    """
    current = getattr(g, 'dbsession', None)
    if current is not None:
        return current

    # Nothing stored yet: build a session from a factory bound to the engine.
    factory = sessionmaker(bind=engine)
    created = factory()
    g.dbsession = created
    return created


问题


面经


文章

微信
公众号

扫码关注公众号