python类abort()的实例源码

middleware.py 文件源码 项目:borgcube 作者: enkore 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def transaction_middleware(get_response):
    """
    Build a middleware that calls `transaction.abort()` around each request.

    The returned callable aborts the current transaction, obtains the
    response from `get_response`, aborts once more and returns the response.
    """
    def middleware(request):
        # Abort first so the request does not see a transaction left over
        # from earlier work.
        transaction.abort()
        outcome = get_response(request)
        # Abort again so nothing started while handling the request lingers.
        transaction.abort()
        return outcome

    return middleware
builtin_metrics.py 文件源码 项目:borgcube 作者: enkore 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def formatted_value(self):
        """Return, as a string, the number of done jobs created since midnight.

        Walks ``data_root().jobs_by_state[Job.State.done]`` through
        ``NumberTree.reversed`` and counts entries until the first job whose
        ``created`` timestamp lies before the start of the current day.
        The transaction is aborted before returning.
        """
        # Truncate to the very start of the day. Seconds must be zeroed as
        # well, otherwise jobs created in the first seconds after midnight
        # would be compared against 00:00:SS and dropped from the count.
        today_begin = timezone.now().replace(
            hour=0, minute=0, second=0, microsecond=0)
        jobs = 0
        # NOTE(review): the early ``break`` assumes the reversed iteration
        # yields the most recently created jobs first -- confirm.
        for job in NumberTree.reversed(data_root().jobs_by_state[Job.State.done]):
            if job.created < today_begin:
                break
            jobs += 1
        transaction.abort()
        return str(jobs)
api.py 文件源码 项目:peter_sslers 作者: aptise 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def queue_renewal_process(self):
        """Process the renewal queue and report the outcome.

        Calls ``lib_db.queue_renewals__process`` with the request's
        ``api_context``.

        Returns:
            For the ``admin:api:queue_renewals:process.json`` route, a dict
            with ``result`` set to ``'success'`` or, on failure, ``'error'``
            plus an ``error`` message. For any other route, an ``HTTPFound``
            redirect to the queue-renewals page carrying the results.

        Raises:
            Re-raises the original exception on failure when the matched
            route is not the JSON route. The transaction is aborted first
            in either failure case.
        """
        try:
            queue_results = lib_db.queue_renewals__process(self.request.api_context)
            if self.request.matched_route.name == 'admin:api:queue_renewals:process.json':
                return {'result': 'success',
                        }
            return HTTPFound("%s/queue-renewals?process=1&results=%s" % (self.request.registry.settings['admin_prefix'], queue_results))
        except Exception as e:
            transaction.abort()
            if self.request.matched_route.name == 'admin:api:queue_renewals:process.json':
                # Python 3 exceptions have no ``message`` attribute; keep
                # using it where it exists and fall back to ``str(e)`` so
                # building the error payload cannot itself raise.
                return {'result': 'error',
                        'error': getattr(e, 'message', str(e)),
                        }
            raise
nvd.py 文件源码 项目:vulnix 作者: flyingcircusio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, exc_tb):
        """Finish the transaction on leaving the context, then close the connection.

        If the block raised, the transaction is aborted. Otherwise it is
        committed; when ``self.has_updates`` is set, ``meta.unpacked`` is
        incremented, the database is packed and the counter reset once it
        exceeds 25, and the transaction is committed again.
        """
        if exc_type is not None:
            transaction.abort()
        else:
            transaction.commit()
            meta = self._root['meta']
            if self.has_updates:
                meta.unpacked += 1
                pack_due = meta.unpacked > 25
                if pack_due:
                    logger.debug('Packing database')
                    self._db.pack()
                    meta.unpacked = 0
                # Persist the updated bookkeeping counter.
                transaction.commit()
        self._connection.close()
forbidden.py 文件源码 项目:websauna 作者: websauna 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def forbidden(request):
    """Render ``core/forbidden.html`` as a 403 response and abort the transaction."""

    # Rendering the template opens a new transaction that Pyramid's
    # transaction machinery does not roll back for this special view. Left
    # open, it blocks Base.drop_all() in PostgreSQL and makes the tests hang,
    # so the pending transaction is rolled back by hand further down.
    body = render('core/forbidden.html', {}, request=request)
    resp = Response(body)
    resp.status_code = 403

    # Tell pyramid_redis_session not to emit session cookies for this response.
    resp.cache_control.public = True

    transaction.abort()
    return resp
notfound.py 文件源码 项目:websauna 作者: websauna 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def notfound(request):
    """Not found view which logs the 404 and returns the rendered 404 page.

    Logs the user name, the requested URL and the referrer at INFO level,
    clears ``request.user`` where possible, renders ``core/notfound.html``
    into a response with status 404, and aborts the transaction before
    returning that response.
    """

    # Try to extract some more information from request
    user = getattr(request, "user", None)
    if user:
        username = getattr(user, "friendly_name", "<unknown>")
    else:
        username = "<anomymous>"

    # TODO: Maybe make this configurable, default to WARN, configurable as INFO for high volume sites
    # Argument order must follow the placeholders: user, URL, referrer.
    logger.info("404 Not Found. user:%s URL:%s referrer:%s", username, request.url, request.referrer)

    # Make sure the 404 page carries no user information: it is an often
    # overlooked special case for caching and user data must not be cached.
    try:
        request.user = None
    except Exception:
        # pyramid_tm 2.0 - this fails; clearing the user is best effort only.
        pass

    # Rendering the template opens a new transaction that Pyramid's
    # transaction machinery does not roll back for this special view. Left
    # open, it blocks Base.drop_all() in PostgreSQL and makes the tests hang,
    # so the pending transaction is rolled back by hand below.
    html = render('core/notfound.html', {}, request=request)
    resp = Response(html)
    resp.status_code = 404

    # Hint pyramid_redis_session not to generate any session cookies for this response
    resp.cache_control.public = True

    transaction.abort()
    return resp
tests.py 文件源码 项目:assignment_aa 作者: RaHus 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Tear down the test setup, abort the transaction and drop all tables."""
        from .models.meta import Base

        testing.tearDown()
        # Abort before dropping so no open transaction is left when the
        # tables bound to ``self.engine`` are removed.
        transaction.abort()
        metadata = Base.metadata
        metadata.drop_all(self.engine)
customer.py 文件源码 项目:assignment_aa 作者: RaHus 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def save_customer(self, data):
        """Create a ``Customer`` from ``data``, store it and return it.

        The new object is added to ``self.db``, flushed and refreshed. Any
        ``exc.SQLAlchemyError`` aborts the transaction and is re-raised as
        ``DatabaseFailure`` carrying the original error text.
        """
        try:
            new_customer = Customer(**data)
            self.db.add(new_customer)
            self.db.flush()
            self.db.refresh(new_customer)
        except exc.SQLAlchemyError as err:  # pragma: no cover
            transaction.abort()
            raise DatabaseFailure(str(err))
        return new_customer
tests.py 文件源码 项目:EDDB_JsonAPI 作者: FuelRats 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Tear down the test setup, abort the transaction and drop all tables."""
        from .models.meta import Base

        testing.tearDown()
        # Abort before dropping so no open transaction is left when the
        # tables bound to ``self.engine`` are removed.
        transaction.abort()
        metadata = Base.metadata
        metadata.drop_all(self.engine)
test_celery.py 文件源码 项目:z3c.celery 作者: ZeitOnline 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def abort(self, trans):
        """Intentionally do nothing: ``trans`` is ignored and ``None`` is returned."""
test_celery.py 文件源码 项目:z3c.celery 作者: ZeitOnline 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def except_with_handler():
    """Raise an exception which is handled after abort.

    Stores ``'bar'`` under ``'foo'`` on the current site, then raises
    ``HandleAfterAbort`` carrying a handler together with its arguments.
    """
    site = zope.component.hooks.getSite()
    site['foo'] = 'bar'

    def handler(arg1, arg2, kw1=1, kw2=2):
        # Record the received arguments plus the title of the first
        # participation's principal so the caller can inspect them later.
        participation = (
            zope.security.management.getInteraction().participations[0])
        site['data'] = (arg1, arg2, kw1, kw2, participation.principal.title)

    raise HandleAfterAbort(handler, 'a1', 'a2', kw2=4)
test_celery.py 文件源码 项目:z3c.celery 作者: ZeitOnline 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_celery__TransactionAwareTask__run_in_worker__1(
        celery_session_worker, storage_file, interaction):
    """It handles specific exceptions in a new transaction after abort."""
    async_result = except_with_handler.delay()
    transaction.commit()
    with pytest.raises(Exception):
        async_result.get()

    # Only the value written by the handler is expected in the copy.
    expected_items = [('data', ('a1', 'a2', 1, 4, u'User'))]
    with open_zodb_copy(storage_file) as app:
        assert expected_items == list(app.items())
test_celery.py 文件源码 项目:z3c.celery 作者: ZeitOnline 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_celery__TransactionAwareTask__run_in_worker__2(
        celery_session_worker, storage_file, interaction):
    """It handles a specific exception to abort the transaction but still
    counts as a successful job."""
    async_result = success_but_abort_transaction.delay()
    transaction.commit()
    job_result = async_result.get()
    assert 'done' == job_result

    # The aborted transaction must not have left its key behind.
    with open_zodb_copy(storage_file) as app:
        assert 'flub' not in app
celery.py 文件源码 项目:z3c.celery 作者: ZeitOnline 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run_in_same_process(self, args, kw):
        """Run the task via the parent ``__call__`` and return its result.

        ``Abort``: the transaction is aborted, the exception object is
        called and its ``message`` is returned instead of raising.
        ``HandleAfterAbort``: the exception object is called and the
        exception is re-raised.
        """
        try:
            result = super(TransactionAwareTask, self).__call__(*args, **kw)
        except Abort as aborted:
            transaction.abort()
            aborted()
            return aborted.message
        except HandleAfterAbort as deferred:
            # NOTE(review): no abort is issued here before calling the
            # handler -- presumably the caller aborts after the re-raise;
            # confirm.
            deferred()
            raise
        return result
test_views.py 文件源码 项目:papersummarize 作者: mrdrozdov 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Call ``testing.tearDown()`` and then abort the current transaction."""
        testing.tearDown()
        # Discard whatever the test left uncommitted.
        transaction.abort()
test_user_model.py 文件源码 项目:papersummarize 作者: mrdrozdov 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Call ``testing.tearDown()`` and then abort the current transaction."""
        testing.tearDown()
        # Discard whatever the test left uncommitted.
        transaction.abort()
tests.py 文件源码 项目:dearhrc.us 作者: JessaWitzel 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Tear down the test setup, abort the transaction and drop all tables."""
        from .models.meta import Base

        testing.tearDown()
        # Abort before dropping so no open transaction is left when the
        # tables bound to ``self.engine`` are removed.
        transaction.abort()
        metadata = Base.metadata
        metadata.drop_all(self.engine)
conftest.py 文件源码 项目:survivor-pool 作者: bitedgeco 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def new_session(sqlengine, request):
    """Return a session.

    The session is built from a session factory for ``sqlengine`` and tied
    to ``transaction.manager``. A finalizer registered on ``request`` aborts
    the transaction afterwards.
    """
    factory = get_session_factory(sqlengine)
    db_session = get_tm_session(factory, transaction.manager)

    def teardown():
        # Throw away anything left uncommitted.
        transaction.abort()

    request.addfinalizer(teardown)
    return db_session
testCopySupportHooks.py 文件源码 项目:ZServer 作者: zopefoundation 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Reset the security manager, abort the transaction and close ``self.app._p_jar``."""
        noSecurityManager()
        # Abort before closing so the close is not attempted with pending changes.
        transaction.abort()
        # NOTE(review): ``_p_jar`` is presumably the ZODB connection that loaded
        # ``self.app`` -- confirm.
        self.app._p_jar.close()


问题


面经


文章

微信
公众号

扫码关注公众号