async def test_transaction_context_manager_failure(engine, mytable):
    """Check that an exception raised inside ``conn.begin()`` discards the insert.

    A row is inserted inside the transaction block and is visible there. After
    ``RuntimeError`` propagates out of the block, a fresh select on the same
    connection must return no rows.

    Args:
        engine: Fixture whose ``connect()`` is used as an async context manager.
        mytable: Fixture table; it is created here via ``CreateTable``.

    NOTE(review): this is a coroutine test, so it needs an asyncio-aware pytest
    setup (marker or plugin mode) that is not visible in this chunk -- confirm.
    """
    async with engine.connect() as conn:
        await conn.execute(CreateTable(mytable))

        # The RuntimeError must escape the transaction block, not be swallowed.
        with pytest.raises(RuntimeError):
            async with conn.begin():
                await conn.execute(mytable.insert())

                # Inside the still-open transaction the new row is visible.
                result = await conn.execute(mytable.select())
                rows = await result.fetchall()
                assert len(rows) == 1

                # Leave the block via an exception instead of a normal exit.
                raise RuntimeError

        # Same connection, after the failed block: the insert must be gone.
        result = await conn.execute(mytable.select())
        rows = await result.fetchall()
        assert len(rows) == 0
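# Comment list        (leftover page-navigation label, not part of the test code)
# Table of contents   (leftover page-navigation label, not part of the test code)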