async def test_implicit_transaction_success(engine, mytable):
    """Check that a cleanly exited ``engine.begin()`` block keeps its changes.

    Inside the ``async with`` block the table is created, one row is
    inserted, and the row is read back through the same connection.  After
    the block has exited, the same select is issued through ``engine``
    itself and must still return exactly one row.

    The function is a coroutine (``async def``) because its body uses
    ``async with`` and ``await``, which are only valid inside a coroutine
    function.
    NOTE(review): running it needs an async-aware pytest setup (for example
    an asyncio marker or auto mode) -- confirm how this suite is configured.

    Args:
        engine: Fixture providing ``begin()`` (async context manager) and an
            awaitable ``execute()``.
        mytable: Fixture providing ``insert()`` and ``select()`` constructs
            and accepted by ``CreateTable``.
    """
    async with engine.begin() as conn:
        assert isinstance(conn, AsyncioConnection)

        await conn.execute(CreateTable(mytable))
        await conn.execute(mytable.insert())

        # The inserted row must be visible inside the open transaction.
        result = await conn.execute(mytable.select())
        rows = await result.fetchall()
        assert len(rows) == 1

    # Transaction should have been committed automatically
    result = await engine.execute(mytable.select())
    rows = await result.fetchall()
    assert len(rows) == 1
# Comment list
# Article table of contents