async def test_transaction_context_manager_success(engine, mytable):
    """Verify a row inserted inside ``conn.begin()`` is seen during and after it.

    The table is created, one row is inserted inside a transaction opened
    with ``async with conn.begin()``, and the row count is asserted to be 1
    both inside the transaction block and again after the block has exited
    without an exception.

    ``engine`` and ``mytable`` are supplied by the caller (presumably pytest
    fixtures defined elsewhere -- confirm).
    """
    # NOTE(review): the original paste had no indentation; the nesting below
    # is reconstructed from the ``async with`` statements -- confirm that the
    # second select is meant to run after the transaction block.
    async with engine.connect() as conn:
        await conn.execute(CreateTable(mytable))

        async with conn.begin():
            await conn.execute(mytable.insert())
            # The new row must be readable from inside the open transaction.
            result = await conn.execute(mytable.select())
            rows = await result.fetchall()
            assert len(rows) == 1

        # Leaving the block without an exception is expected to commit, so
        # the row must still be present afterwards.
        result = await conn.execute(mytable.select())
        rows = await result.fetchall()
        assert len(rows) == 1
# Comment list
# Table of contents