python类NoResultFound()的实例源码

bgp_db.py 文件源码 项目:neutron-dynamic-routing 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _save_bgp_speaker_peer_binding(self, context, bgp_speaker_id,
                                       bgp_peer_id):
        with db_api.context_manager.writer.using(context):
            try:
                bgp_speaker = self._get_by_id(context, BgpSpeaker,
                                              bgp_speaker_id)
            except sa_exc.NoResultFound:
                raise bgp_ext.BgpSpeakerNotFound(id=bgp_speaker_id)

            try:
                bgp_peer = self._get_by_id(context, BgpPeer,
                                           bgp_peer_id)
            except sa_exc.NoResultFound:
                raise bgp_ext.BgpPeerNotFound(id=bgp_peer_id)

            peers = self._get_bgp_peers_by_bgp_speaker_binding(context,
                                                               bgp_speaker_id)
            self._validate_peer_ips(bgp_speaker_id, peers, bgp_peer)
            binding = BgpSpeakerPeerBinding(bgp_speaker_id=bgp_speaker.id,
                                            bgp_peer_id=bgp_peer.id)
            context.session.add(binding)
bgp_db.py 文件源码 项目:neutron-dynamic-routing 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _save_bgp_speaker_network_binding(self,
                                          context,
                                          bgp_speaker_id,
                                          network_id):
        with db_api.context_manager.writer.using(context):
            try:
                bgp_speaker = self._get_by_id(context, BgpSpeaker,
                                              bgp_speaker_id)
            except sa_exc.NoResultFound:
                raise bgp_ext.BgpSpeakerNotFound(id=bgp_speaker_id)

            try:
                network = self._get_by_id(context, models_v2.Network,
                                          network_id)
            except sa_exc.NoResultFound:
                raise n_exc.NetworkNotFound(net_id=network_id)

            binding = BgpSpeakerNetworkBinding(
                                            bgp_speaker_id=bgp_speaker.id,
                                            network_id=network.id,
                                            ip_version=bgp_speaker.ip_version)
            context.session.add(binding)
models.py 文件源码 项目:annotran 作者: BirkbeckCTP 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_report(cls, translation, author, reporter):
        """
        Get a report by page, language, group, author, and reporter
        :param translation: the translation (page, group, language) to which the report corresponds
        :param author: the author to query
        :param reporter: the reporting user to query
        :return:
        """
        if translation and author and reporter:
            try:
                return cls.query.filter(
                    cls.page_id == translation.page_id,
                    cls.language_id == translation.language_id,
                    cls.group_id == translation.group_id,
                    cls.author_id == author.id,
                    cls.reporter_id == reporter.id).one()
            except exc.NoResultFound:
                return None
        return None
models.py 文件源码 项目:annotran 作者: BirkbeckCTP 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_public_translations(cls, page):
        """
        Get a list of public translations for a page
        :param page: the page to query
        :return: a list of languages or an empty list
        """
        try:
            return lang_models.Language.query.\
                join(cls, and_(cls.page_id == page.id,
                               cls.group_id == -1,
                               lang_models.Language.id == cls.language_id)).\
                with_entities(lang_models.Language.id,
                              lang_models.Language.name,
                              lang_models.Language.pubid,
                              cls.group_id).all()
        except exc.NoResultFound:
            return None
models.py 文件源码 项目:annotran 作者: BirkbeckCTP 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_page_translations(cls, page):
        """
        Get a list of all translations for a page
        :param page: the page to query
        :return: a list of languages or an empty list
        """
        try:
            return lang_models.Language.query.\
                join(cls, and_(cls.page_id == page.id,
                               lang_models.Language.id == cls.language_id)).\
                with_entities(lang_models.Language.id,
                              lang_models.Language.name,
                              lang_models.Language.pubid,
                              cls.group_id).all()
        except exc.NoResultFound:
            return None
models.py 文件源码 项目:annotran 作者: BirkbeckCTP 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_vote(cls, page, language, group, author, voter):
        """
        Retrieve a vote by page, language, group, author and voter
        :param page: the page ID
        :param language: the language ID
        :param group: the group ID
        :param author: the author ID
        :param voter: the voter ID
        :return: a vote or None
        """
        if page is None or language is None or group is None\
                or author is None or voter is None:
            return None
        try:
            return cls.query.filter(
                cls.page_id == page.id,
                cls.language_id == language.id,
                cls.group_id == group.id,
                cls.author_id == author.id,
                cls.voter_id == voter.id).one()
        except exc.NoResultFound:
            return None

    # returns avg of author scores per page, language, and group
sqlalchemy.py 文件源码 项目:opwen-webapp 作者: ascoderu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_or_create(db, model, create_method: str='',
                  create_method_kwargs=None, **kwargs):
    try:
        return db.query(model).filter_by(**kwargs).one()
    except NoResultFound:
        pass

    kwargs.update(create_method_kwargs or {})
    created = getattr(model, create_method, model)(**kwargs)
    try:
        db.add(created)
        db.flush()
        return created
    except IntegrityError:
        pass

    db.rollback()
    return db.query(model).filter_by(**kwargs).one()
utils.py 文件源码 项目:fabric8-analytics-server 作者: fabric8-analytics 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def retrieve_worker_results(rdb, external_request_id):
    start = datetime.datetime.now()
    try:
        query = rdb.session.query(WorkerResult) \
                           .filter(WorkerResult.external_request_id == external_request_id)
        results = query.all()
    except (NoResultFound, MultipleResultsFound):
        return None
    except SQLAlchemyError:
        rdb.session.rollback()
        raise

    elapsed_seconds = (datetime.datetime.now() - start).total_seconds()
    msg = "It took {t} seconds to retrieve " \
          "all worker results for {r}.".format(t=elapsed_seconds, r=external_request_id)
    current_app.logger.debug(msg)

    return results
utils.py 文件源码 项目:fabric8-analytics-server 作者: fabric8-analytics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def retrieve_worker_result(rdb, external_request_id, worker):
    start = datetime.datetime.now()
    try:
        query = rdb.session.query(WorkerResult) \
                           .filter(WorkerResult.external_request_id == external_request_id,
                                   WorkerResult.worker == worker)
        result = query.one()
    except (NoResultFound, MultipleResultsFound):
        return None
    except SQLAlchemyError:
        rdb.session.rollback()
        raise
    result_dict = result.to_dict()
    elapsed_seconds = (datetime.datetime.now() - start).total_seconds()
    msg = "It took {t} seconds to retrieve {w} " \
          "worker results for {r}.".format(t=elapsed_seconds, w=worker, r=external_request_id)
    current_app.logger.debug(msg)

    return result_dict
utils.py 文件源码 项目:backfeed-protocol 作者: Backfeed 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_contract(name):
    """return the contract identified by name

    returns a Contract instance

    TODO: for now, this function returns a DMagContract()
    """
    if name == 'example':
        contract_class = ExampleContract
    elif name == 'dmag':
        contract_class = DMagContract
    else:
        raise ValueError('Unknown contract type: {name}'.format(name=name))

    try:
        contract = DBSession.query(contract_class).filter(contract_class.name == name).one()
    except NoResultFound:
        contract = contract_class(name=name)
        DBSession.add(contract)
    return contract
views.py 文件源码 项目:flask-learning 作者: MrUPGrade 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def login2():
    form = LoginForm()
    if form.validate_on_submit():
        try:
            user = models.User.query.filter(
                models.User.username == form.username.data
            ).one()
        except NoResultFound:
            form.errors[''] = ['wrong user or password']
        else:
            if sha256_crypt.verify(form.password.data, user.password):
                login_user(user, form.remember_me.data)
                return redirect('/')
            else:
                form.errors[''] = ['wrong user or password']

    return render_template('login/login.jinja2', form=form)
new_form.py 文件源码 项目:web 作者: pyjobs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _validate_python(self, value, state=None):
        if len(value) > self.max_length:
            raise ValidationError('toolong', self)

        name_slug = slugify(value)

        # Check for duplicates in the database
        try:
            CompanyAlchemy.get_company(name_slug)
        except NoResultFound:
            # There are no duplicates, the validation is therefore successful
            pass
        else:
            # This company slug name is already present in the database, notify
            # the user that the company he's trying to register already exists.
            raise ValidationError('already_exists', self)
root.py 文件源码 项目:web 作者: pyjobs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def origine_des_annonces_diffusees(self, *args, **kwargs):
        sources_last_crawl = {}
        sorted_sources = collections.OrderedDict(
            sorted(SOURCES.items(), key=lambda x: x[1].label))
        for source_name in sorted_sources:
            try:
                sources_last_crawl[source_name] = DBSession.query(Log.datetime) \
                    .filter(Log.source == source_name) \
                    .order_by(Log.datetime.desc()) \
                    .limit(1) \
                    .one()[0]
            except NoResultFound:
                sources_last_crawl[source_name] = None

        return dict(
            sources=sorted_sources,
            existing_fields=existing_fields,
            sources_last_crawl=sources_last_crawl
        )
logic.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def resource_validation_show(context, data_dict):
    u'''
    Display the validation job result for a particular resource.
    Returns a validation object, including the validation report or errors
    and metadata about the validation like the timestamp and current status.

    Validation status can be one of:

    * `created`: The validation job is in the processing queue
    * `running`: Validation is under way
    * `error`: There was an error while performing the validation, eg the file
        could not be downloaded or there was an error reading it
    * `success`: Validation was performed, and no issues were found
    * `failure`: Validation was performed, and there were issues found

    :param resource_id: id of the resource to validate
    :type resource_id: string

    :rtype: dict

    '''

    t.check_access(u'resource_validation_show', context, data_dict)

    if not data_dict.get(u'resource_id'):
        raise t.ValidationError({u'resource_id': u'Missing value'})

    Session = context['model'].Session

    try:
        validation = Session.query(Validation).filter(
            Validation.resource_id == data_dict['resource_id']).one()
    except NoResultFound:
        validation = None

    if not validation:
        raise t.ObjectNotFound(
            'No validation report exists for this resource')

    return _validation_dictize(validation)
logic.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def resource_validation_delete(context, data_dict):
    u'''
    Remove the validation job result for a particular resource.
    It also deletes the underlying Validation object.

    :param resource_id: id of the resource to remove validation from
    :type resource_id: string

    :rtype: None

    '''

    t.check_access(u'resource_validation_delete', context, data_dict)

    if not data_dict.get(u'resource_id'):
        raise t.ValidationError({u'resource_id': u'Missing value'})

    Session = context['model'].Session

    try:
        validation = Session.query(Validation).filter(
            Validation.resource_id == data_dict['resource_id']).one()
    except NoResultFound:
        validation = None

    if not validation:
        raise t.ObjectNotFound(
            'No validation report exists for this resource')

    Session.delete(validation)
    Session.commit()
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_user_info(context, username):
    """Get user info."""
    query = model_query(context, models.User, project_only=True)
    query = query.filter_by(username=username)
    try:
        return query.one()
    except sa_exc.NoResultFound:
        raise exceptions.NotFound()
    except Exception as err:
        raise exceptions.UnknownException(message=err)
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def regions_get_by_name(context, name):
    """Get cell detail for the region with given name."""
    query = model_query(context, models.Region, project_only=True)
    query = query.filter_by(name=name)
    try:
        return query.one()
    except sa_exc.NoResultFound:
        raise exceptions.NotFound()
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def regions_get_by_id(context, region_id):
    """Get cell detail for the region with given id."""
    query = model_query(context, models.Region, project_only=True)
    query = query.filter_by(id=region_id)
    try:
        return query.one()
    except sa_exc.NoResultFound:
        raise exceptions.NotFound()
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def clouds_get_by_name(context, name):
    """Get cell detail for the cloud with given name."""
    query = model_query(context, models.Cloud, project_only=True)
    query = query.filter_by(name=name)
    try:
        return query.one()
    except sa_exc.NoResultFound:
        raise exceptions.NotFound()
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def clouds_get_by_id(context, cloud_id):
    """Get cell detail for the cloud with given id."""
    query = model_query(context, models.Cloud, project_only=True)
    query = query.filter_by(id=cloud_id)
    try:
        return query.one()
    except sa_exc.NoResultFound:
        raise exceptions.NotFound()


问题


面经


文章

微信
公众号

扫码关注公众号