def setup(self):
self.mock_sess = Mock(spec_set=Session)
self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
python类Session()的实例源码
def setup(self):
self.mock_sess = Mock(spec_set=Session)
self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
def setup(self):
self.mock_sess = Mock(spec_set=Session)
self.cls = BiweeklyPayPeriod(date(2017, 3, 7), self.mock_sess)
def setup(self):
self.mock_sess = Mock(spec_set=Session)
self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
def setup(self):
self.mock_sess = Mock(spec_set=Session)
self.cls = BiweeklyPayPeriod(date(2017, 3, 17), self.mock_sess)
def check_empty_site_init(self, dbsession:Session, user:UserMixin):
"""Call after user creation to see if this user is the first user and should get initial admin rights."""
assert user.id, "Please flush your db"
# Try to reflect related group class based on User model
i = inspection.inspect(user.__class__)
Group = i.relationships["groups"].mapper.entity
# If we already have groups admin group must be there
if dbsession.query(Group).count() > 0:
return
self.init_empty_site(dbsession, user)
def create_test_dbsession(request, registry: Registry, transaction_manager=transaction.manager) -> Session:
"""Create a test database session and setup database.
Create and drop all tables when called. Add teardown function py.test to drop all tables during teardown.
Also add implicit UUID extension on the database, so we don't need to add by hand every time.
:param request: py.test test request
:param settings: test.ini app settings
:param transaction_manager:
:return: New database session
"""
from websauna.system.model.meta import Base
dbsession = create_dbsession(registry, manager=transaction_manager)
engine = dbsession.get_bind()
connection = engine.connect()
# Support native PSQL UUID types
if engine.dialect.name == "postgresql":
connection.execute('create extension if not exists "uuid-ossp";')
with transaction.manager:
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
def teardown():
# There might be open transactions in the database. They will block DROP ALL and thus the tests would end up in a deadlock. Thus, we clean up all connections we know about.
# XXX: Fix this shit
with transaction.manager:
Base.metadata.drop_all(engine)
dbsession.close()
request.addfinalizer(teardown)
return dbsession
def test_mod_search(filled_database):
"""Does the search return expected results?"""
EXPECT_IDS = {42, 45}
session = SQLSession(bind=filled_database.engine)
selected = addon.Mod.search(session, 'Tested')
assert {int(m.id) for m in selected} == EXPECT_IDS
def test_mod_find(filled_database):
"""Does the search find the correct mod or report correct error?"""
session = SQLSession(bind=filled_database.engine)
assert addon.Mod.find(session, 'Tested').id == 42
with pytest.raises(addon.NoResultFound):
addon.Mod.find(session, 'nonsense')
def test_mod_with_id(filled_database):
"""Does the with_id find the correct mod?"""
session = SQLSession(bind=filled_database.engine)
assert addon.Mod.with_id(session, 42).name == 'tested'
assert addon.Mod.with_id(session, 45).name == 'tester'
with pytest.raises(addon.NoResultFound):
addon.Mod.with_id(session, 44)
# Release tests
def session(self) -> SQLSession:
"""Create new session for batch database communication."""
return SQLSession(bind=self.engine)
def receive_after_insert(mapper, connection, target):
""" listen for the 'after_insert' event """
@event.listens_for(Session, "before_commit", once=True)
def receive_before_commit(session):
paper_rating = PaperRating(paper_id=target.id)
session.add(paper_rating)
def receive_after_insert(mapper, connection, target):
""" listen for the 'after_insert' event """
@event.listens_for(Session, "before_commit", once=True)
def receive_before_commit(session):
paper_rating = session.query(PaperRating).filter_by(paper_id=target.paper_id).first()
user_paper_ratings = session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all()
update_value(paper_rating, user_paper_ratings)
def receive_after_update(mapper, connection, target):
""" listen for the 'after_update' event """
@event.listens_for(Session, "before_commit", once=True)
def receive_before_commit(session):
paper_rating = session.query(PaperRating).filter_by(paper_id=target.paper_id).first()
user_paper_ratings = session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all()
update_value(paper_rating, user_paper_ratings)
def receive_after_delete(mapper, connection, target):
""" listen for the 'after_delete' event """
@event.listens_for(Session, "before_commit", once=True)
def receive_before_commit(session):
paper_rating = session.query(PaperRating).filter_by(paper_id=target.paper_id).first()
user_paper_ratings = session.query(UserPaperRating).filter_by(paper_id=target.paper_id).all()
update_value(paper_rating, user_paper_ratings)
def test_db_essentials(dbsession):
assert isinstance(dbsession, Session)
q = ZdbQuery(Products, session=dbsession)
assert isinstance(q, ZdbQuery)
def setup(self):
# Create a new connection to the database.
self._db = Session(self.connection)
self.transaction = self.connection.begin_nested()
# Start with a high number so it won't interfere with tests that
# search for a small number.
self.counter = 2000
self.time_counter = datetime(2014, 1, 1)
def set_session(application, DB_SESSION):
"""Set the database session for the app. Must be of type <hydrus.hydraspec.doc_writer.HydraDoc>."""
if not isinstance(DB_SESSION, Session):
raise TypeError("The API Doc is not of type <sqlalchemy.orm.session.Session>")
def handler(sender, **kwargs):
g.dbsession = DB_SESSION
with appcontext_pushed.connected_to(handler, application):
yield
def get_session():
"""Get the Database Session for the server."""
session = getattr(g, 'dbsession', None)
if session is None:
session = sessionmaker(bind=engine)()
g.dbsession = session
return session