def test_implicit_transaction_failure(engine, mytable):
await engine.execute(CreateTable(mytable))
with pytest.raises(RuntimeError):
async with engine.begin() as conn:
assert isinstance(conn, AsyncioConnection)
await conn.execute(mytable.insert())
result = await conn.execute(mytable.select())
rows = await result.fetchall()
assert len(rows) == 1
raise RuntimeError
# Transaction should have been rolled back automatically
result = await engine.execute(mytable.select())
rows = await result.fetchall()
assert len(rows) == 0
评论列表
文章目录