python类NoResultFound()的实例源码

pgp.py 文件源码 项目:meg-server 作者: Argonne-National-Laboratory 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def store_revocation_cert(db, armored_key, RevocationKey):
    """Store a revocation certificate unless its key id is already recorded.

    :param db: Database handle; ``db.session`` is used to add and commit.
    :param armored_key: Armored revocation certificate, stored as given.
    :param RevocationKey: Model class used both for the lookup and to build
        the new row.
    :raises BadRevocationKeyException: if the parsed data yields no packets.
    """
    ascii_data = get_pgp_key_data(armored_key)

    if not list(ascii_data.packets()):
        raise BadRevocationKeyException("No packets found")

    sig = get_revocation_signature_packet(ascii_data)

    created_at = sig.creation_time
    expires_at = sig.expiration_time
    sig_length = sig.length
    # Only the trailing 8 characters of the key id are kept.
    short_key_id = sig.key_id.decode()[-8:]

    lookup = RevocationKey.query.filter(
        RevocationKey.pgp_keyid_for == short_key_id)
    try:
        # XXX Can two keys share the same last 8 characters? A collision
        # would make this lookup treat a different key as already stored.
        lookup.one()
    except NoResultFound:
        # Nothing stored yet for this key id: insert the certificate.
        db.session.add(RevocationKey(
            created_at, expires_at, sig_length, armored_key, short_key_id))
        db.session.commit()
migration.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_bootstrap_project(name, project_id=None, db_uri=None):
    """Return the project called ``name``, creating it when none exists.

    :param name: Name of the project to look up or create.
    :param project_id: Id given to a newly created project; a random
        UUID4 string is generated when this is falsy.
    :param db_uri: Database URI handed to ``create_engine``.
    :returns: The existing project, or the freshly committed one.
    """
    project_id = project_id or str(uuid.uuid4())
    session = sessionmaker(bind=create_engine(db_uri))()
    # Built up front; only persisted if the lookup below finds nothing.
    candidate = models.Project(name=name, id=project_id)

    try:
        return session.query(models.Project).filter_by(name=name).one()
    except sa_exc.NoResultFound:
        session.add(candidate)
        session.commit()
        return candidate
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def resource_get_by_id(
        context, resources, resource_id, session=None, for_update=False
        ):
    """Fetch the single resource row with the given id.

    :param context: Request context passed through to ``model_query``.
    :param resources: Resource collection name; selects the model.
    :param resource_id: Id of the row to fetch.
    :param session: Optional session passed through to ``model_query``.
    :param for_update: When true, ``with_for_update()`` is applied to
        the query.
    :raises exceptions.NotFound: if no row matches.
    """
    lookup = model_query(
        context, _get_resource_model(resources),
        project_only=True, session=session,
    )

    # For these two collections the query is narrowed by a ``type``
    # value derived from the collection name (dashes become underscores).
    typed_collections = ("hosts", "network-devices")
    if resources in typed_collections:
        lookup = lookup.filter_by(type=resources.replace("-", "_"))

    lookup = lookup.filter_by(id=resource_id)
    if for_update:
        lookup = lookup.with_for_update()

    try:
        return lookup.one()
    except sa_exc.NoResultFound:
        raise exceptions.NotFound()
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _device_labels_update(context, device_type, device_id, labels):
    """Merge ``labels["labels"]`` into the labels of one device.

    :param context: Request context passed to ``model_query``.
    :param device_type: Value the device's ``type`` must match.
    :param device_id: Id of the device.
    :param labels: Mapping whose ``"labels"`` entry holds the labels to add.
    :returns: The saved device.
    :raises exceptions.NotFound: if no device matches type and id.
    """
    session = get_session()
    with session.begin():
        any_device = with_polymorphic(models.Device, '*')
        lookup = model_query(
            context, any_device, session=session, project_only=True
        ).filter_by(type=device_type).filter_by(id=device_id)

        try:
            device = lookup.one()
        except sa_exc.NoResultFound:
            raise exceptions.NotFound()

        device.labels.update(labels["labels"])
        device.save(session)
        return device
api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _device_labels_delete(context, device_type, device_id, labels):
    """Discard each of ``labels["labels"]`` from the labels of one device.

    :param context: Request context passed to ``model_query``.
    :param device_type: Value the device's ``type`` must match.
    :param device_id: Id of the device.
    :param labels: Mapping whose ``"labels"`` entry lists labels to drop.
    :returns: The saved device.
    :raises exceptions.NotFound: if no device matches type and id.
    """
    session = get_session()
    with session.begin():
        any_device = with_polymorphic(models.Device, '*')
        lookup = model_query(
            context, any_device, session=session, project_only=True
        ).filter_by(type=device_type).filter_by(id=device_id)

        try:
            device = lookup.one()
        except sa_exc.NoResultFound:
            raise exceptions.NotFound()

        for unwanted in labels["labels"]:
            device.labels.discard(unwanted)
        device.save(session)
        return device
api.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def destroy_board(self, board_id):
        """Delete a board and the locations filtered by that board.

        :param board_id: Identifier passed to ``add_identity_filter``.
        :raises exception.BoardNotFound: if no board matches.
        """
        session = get_session()
        with session.begin():
            board_query = add_identity_filter(
                model_query(models.Board, session=session), board_id)

            try:
                board_ref = board_query.one()
            except NoResultFound:
                raise exception.BoardNotFound(board=board_id)

            # If a UUID-like value was supplied, switch to the board's
            # ``id``, which is what the location filter below receives.
            if uuidutils.is_uuid_like(board_id):
                board_id = board_ref['id']

            self._add_location_filter_by_board(
                model_query(models.Location, session=session), board_id
            ).delete()

            board_query.delete()
api.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def register_conductor(self, values, update_existing=False):
        """Create or refresh the conductor row for ``values['hostname']``.

        :param values: Field values applied to the row; must contain
            ``'hostname'``.
        :param update_existing: Allow re-registering a conductor whose
            ``online`` flag is already ``True``.
        :returns: The saved conductor.
        :raises exception.ConductorAlreadyRegistered: if the conductor is
            online and ``update_existing`` is false.
        """
        session = get_session()
        with session.begin():
            by_hostname = model_query(
                models.Conductor, session=session
            ).filter_by(hostname=values['hostname'])

            try:
                ref = by_hostname.one()
            except NoResultFound:
                ref = models.Conductor()
            else:
                if ref.online is True and not update_existing:
                    raise exception.ConductorAlreadyRegistered(
                        conductor=values['hostname'])

            ref.update(values)
            # Registration always stamps the row as online and touched,
            # including when an existing row is being updated.
            stamp = {'updated_at': timeutils.utcnow(),
                     'online': True}
            ref.update(stamp)
            ref.save(session)
        return ref
api.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def register_wampagent(self, values, update_existing=False):
        """Create or refresh the WAMP agent row for ``values['hostname']``.

        :param values: Field values applied to the row; must contain
            ``'hostname'``.
        :param update_existing: Allow re-registering an agent whose
            ``online`` flag is already ``True``.
        :returns: The saved agent.
        :raises exception.WampAgentAlreadyRegistered: if the agent is
            online and ``update_existing`` is false.
        """
        session = get_session()
        with session.begin():
            by_hostname = model_query(
                models.WampAgent, session=session
            ).filter_by(hostname=values['hostname'])

            try:
                ref = by_hostname.one()
            except NoResultFound:
                ref = models.WampAgent()
            else:
                if ref.online is True and not update_existing:
                    raise exception.WampAgentAlreadyRegistered(
                        wampagent=values['hostname'])

            ref.update(values)
            # Registration always stamps the row as online and touched,
            # including when an existing row is being updated.
            stamp = {'updated_at': timeutils.utcnow(),
                     'online': True}
            ref.update(stamp)
            ref.save(session)
        return ref
api.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def destroy_plugin(self, plugin_id):
        """Delete the plugin selected by ``plugin_id``.

        :param plugin_id: Identifier passed to ``add_identity_filter``.
        :raises exception.PluginNotFound: if no plugin matches.
        """
        session = get_session()
        with session.begin():
            plugin_query = add_identity_filter(
                model_query(models.Plugin, session=session), plugin_id)

            try:
                plugin_ref = plugin_query.one()
            except NoResultFound:
                raise exception.PluginNotFound(plugin=plugin_id)

            # If a UUID-like value was supplied, switch to the plugin's
            # ``id``. NOTE(review): the value is not read again in this
            # method - confirm whether it is still needed.
            if uuidutils.is_uuid_like(plugin_id):
                plugin_id = plugin_ref['id']

            plugin_query.delete()
tasks.py 文件源码 项目:invenio-github 作者: inveniosoftware 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def sync_hooks(user_id, repositories):
    """Sync repository hooks for a user.

    Each repository is synced inside its own nested transaction and
    committed separately. A missing repository or an access error is logged
    as a warning and the loop moves on; any other exception hands the task
    to ``sync_hooks.retry``.

    :param user_id: User identifier given to ``GitHubAPI``.
    :param repositories: Iterable of repository ids to sync.
    """
    from .api import GitHubAPI

    try:
        # Sync hooks
        gh = GitHubAPI(user_id=user_id)
        for repo_id in repositories:
            try:
                with db.session.begin_nested():
                    gh.sync_repo_hook(repo_id)
                # We commit per repository, because while the task is running
                # the user might enable/disable a hook.
                db.session.commit()
            except (NoResultFound, RepositoryAccessError) as e:
                # Exceptions have no ``message`` attribute on Python 3;
                # reading it unconditionally raised AttributeError here and
                # turned a harmless warning into a task retry.
                current_app.logger.warning(
                    getattr(e, 'message', None) or str(e), exc_info=True)
    except Exception as exc:
        sync_hooks.retry(exc=exc)
models.py 文件源码 项目:invenio-github 作者: inveniosoftware 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get(cls, user_id, github_id=None, name=None, check_owner=True):
        """Return the repository matching a GitHub id and/or a full name.

        :param integer user_id: User identifier.
        :param integer github_id: GitHub repository identifier.
        :param str name: GitHub repository full name.
        :param bool check_owner: When true, reject a repository whose
            ``user_id`` is set and differs from ``user_id``.
        :returns: The repository object.
        :raises: :py:exc:`~sqlalchemy.orm.exc.NoResultFound`: if the repository
                 doesn't exist.
        :raises: :py:exc:`~sqlalchemy.orm.exc.MultipleResultsFound`: if
                 multiple repositories with the specified GitHub id and/or name
                 exist.
        :raises: :py:exc:`RepositoryAccessError`: if the user is not the owner
                 of the repository.
        """
        same_github_id = Repository.github_id == github_id
        same_name = Repository.name == name
        repo = cls.query.filter(same_github_id | same_name).one()

        if not check_owner:
            return repo

        # A repository without a ``user_id`` is accessible to anyone.
        if repo and repo.user_id and repo.user_id != int(user_id):
            raise RepositoryAccessError(
                u'User {user} cannot access repository {repo}({repo_id}).'
                .format(user=user_id, repo=name, repo_id=github_id)
            )
        return repo
io.py 文件源码 项目:ego.powerflow 作者: openego 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def configure_timeindex(self):
        """Build ``self.timeindex`` from the stored temporal resolution.

        Looks up the ``TempResolution`` record whose ``temp_id`` equals
        ``self.temp_id``, generates a datetime index from its
        ``start_time``, ``timesteps`` and ``resolution``, and keeps the
        slice from ``self.start_snapshot`` (1-based) up to
        ``self.end_snapshot``.

        :raises KeyError: if ``'TempResolution'`` is not in ``self._mapped``.
        :raises NoResultFound: if no record matches ``self.temp_id``.
        """
        try:
            ormclass = self._mapped['TempResolution']
            tr = self.session.query(ormclass).filter(
                ormclass.temp_id == self.temp_id).one()
        except (KeyError, NoResultFound):
            print('temp_id %s does not exist.' % self.temp_id)
            # There is nothing to build the index from. Previously execution
            # fell through and crashed on the unbound ``tr``; re-raise the
            # real cause instead.
            raise

        # ``pd.DatetimeIndex(start=..., periods=..., freq=...)`` is no longer
        # accepted by pandas; ``date_range`` builds the same index.
        timeindex = pd.date_range(start=tr.start_time,
                                  periods=tr.timesteps,
                                  freq=tr.resolution)

        self.timeindex = timeindex[self.start_snapshot - 1: self.end_snapshot]
api.py 文件源码 项目:scoring_engine 作者: pwnbus 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def admin_update_password():
    """Set a new password for the user named in the submitted form.

    Only white-team users may call this; anyone else gets a 403 payload.
    The form must carry ``user_id`` and ``password``. An unknown
    ``user_id`` redirects to the login page; every other outcome redirects
    to the admin manage page with a flash message.
    """
    if not current_user.is_white_team:
        return {'status': 'Unauthorized'}, 403

    form = request.form
    if 'user_id' not in form or 'password' not in form:
        flash('Error: user_id or password not specified.', 'danger')
        return redirect(url_for('admin.manage'))

    try:
        user_obj = session.query(User).filter(User.id == form['user_id']).one()
    except NoResultFound:
        return redirect(url_for('auth.login'))

    # The password is HTML-escaped before being handed to update_password.
    user_obj.update_password(html.escape(form['password']))
    user_obj.authenticated = False
    session.add(user_obj)
    session.commit()
    flash('Password Successfully Updated.', 'success')
    return redirect(url_for('admin.manage'))
manage.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self, username, password, rolename):
        """Create an active user with the given role.

        :param username: Name of the user to create; must not exist yet.
        :param password: Password for the user; when falsy, one is
            generated with ``generate_new_pass`` and printed.
        :param rolename: Name of an existing role.
        :raises InvalidCommand: if the role does not exist or the username
            is already taken.
        """
        try:
            role = Role.filter_by(rolename=rolename).one()
        except NoResultFound:
            raise InvalidCommand('Role with name `%s` not found' % rolename)

        if User.filter_by(username=username).first():
            raise InvalidCommand('User `%s` already exists' % username)

        if not password:
            password = generate_new_pass()
            # Call form with a single argument: prints the same text on
            # Python 2 and is valid syntax on Python 3, unlike the print
            # statement used before.
            print("New password: {}".format(password))

        u = User.create(username=username, password=password, role=role,
                        active=True, package_id=0)
        db.session.add(u)
        db.session.commit()
pipeline.py 文件源码 项目:ozelot 作者: trycs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def clear(self):
        """Remove the output of one (derived) class loader.

        Marks the task incomplete, deletes the matching EuroStat indicator
        if there is one, then closes the session.
        """
        # mark this task as incomplete
        self.mark_incomplete()

        # Deleting the indicator metadata also deletes its values by
        # cascading.
        #
        # NOTE: the delete must go through the queried object. Calling
        # 'delete()' on the query itself would NOT work: sqlalchemy does no
        # in-Python cascading for query deletes, so the associated values
        # would be left behind e.g. on SQLite databases.
        lookup = self.session.query(models.EuroStatIndicator).filter(
            models.EuroStatIndicator.number == self.number)
        try:
            self.session.delete(lookup.one())
        except NoResultFound:
            # Data didn't exist yet, no problem
            pass

        self.close_session()
pipeline.py 文件源码 项目:ozelot 作者: trycs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def clear(self):
        """Remove the output of one climate variable.

        Marks the task incomplete, deletes the climate indicator for every
        season suffix (skipping those that do not exist), then closes the
        session.
        """
        # mark this task as incomplete
        self.mark_incomplete()

        # Deleting the indicator metadata also deletes its values by
        # cascading.
        suffixes = list(CLIMATE_SEASON_SUFFIXES.values())
        for suffix in suffixes:
            try:
                # noinspection PyUnresolvedReferences
                lookup = self.session.query(models.ClimateIndicator).filter(
                    models.ClimateIndicator.description == self.description + suffix)
                self.session.delete(lookup.one())
            except NoResultFound:
                # Data didn't exist yet, no problem
                pass

        self.close_session()
labels_tools.py 文件源码 项目:SecuML 作者: ANSSI-FR 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getLabel(session, instance_id, experiment_id):
    """Return ``(label, family)`` for an instance within an experiment.

    The experiment's labels row decides where to look: partial labels are
    matched on instance id and labels id, true labels on dataset id and
    instance id.

    :returns: ``(label, family)``, or ``None`` when the labels type is
        unknown or no partial label exists for the instance.
    """
    exp_labels = session.query(ExperimentsLabelsAlchemy).filter(
        ExperimentsLabelsAlchemy.experiment_id == experiment_id).one()
    labels_type = exp_labels.labels_type
    labels_id = exp_labels.labels_id
    dataset_id = exp_labels.experiment.dataset_id

    if labels_type == 'partial_labels':
        partial = (session.query(LabelsAlchemy)
                   .filter(LabelsAlchemy.instance_id == instance_id)
                   .filter(LabelsAlchemy.labels_id == labels_id))
        try:
            row = partial.one()
            return row.label, row.family
        except NoResultFound:
            return None

    if labels_type == 'true_labels':
        # Unlike the partial-label branch, a missing row is not caught here.
        row = (session.query(TrueLabelsAlchemy)
               .filter(TrueLabelsAlchemy.dataset_id == dataset_id)
               .filter(TrueLabelsAlchemy.instance_id == instance_id)
               .one())
        return row.label, row.family

    return None
labels_tools.py 文件源码 项目:SecuML 作者: ANSSI-FR 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def getLabelDetails(experiment, instance_id):
    """Return label details for an instance of ``experiment``.

    :returns: ``(label, family, method, annotation)`` for partial labels,
        ``(label, family, 'true_labels', True)`` for true labels, or
        ``None`` when no row exists or the labels type is neither of these.
    """
    labels_type = experiment.labels_type

    if labels_type == 'partial_labels':
        partial = (experiment.session.query(LabelsAlchemy)
                   .filter(LabelsAlchemy.instance_id == int(instance_id))
                   .filter(LabelsAlchemy.labels_id == int(experiment.labels_id)))
        try:
            row = partial.one()
            return row.label, row.family, row.method, row.annotation
        except NoResultFound:
            return None

    # Re-read the attribute, as the second check does not reuse the first.
    if experiment.labels_type == 'true_labels':
        truth = (experiment.session.query(TrueLabelsAlchemy)
                 .filter(TrueLabelsAlchemy.instance_id == int(instance_id)))
        try:
            row = truth.one()
            return row.label, row.family, 'true_labels', True
        except NoResultFound:
            return None

    return None
admin_helper.py 文件源码 项目:sciz 作者: erk3 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reset_groups_conf(self, group_name=None):
        """Force-push the configuration to one group, or to every group.

        When ``group_name`` is a non-empty ``str``, the group whose
        ``flat_name`` equals the flattened name is looked up and its conf is
        pushed with ``force=True``; a missing group only logs a warning.
        Any other value (including ``None``) pushes the conf to all groups.

        NOTE(review): this is Python 2 code - it relies on the ``unicode``
        builtin and on ``filter`` applied to a string.
        """
        if group_name and isinstance(group_name, str) and group_name != '':
            # Byte strings are decoded with the project's default charset.
            if not isinstance(group_name, unicode):
                group_name = group_name.decode(sg.DEFAULT_CHARSET)
            # Flatten: lowercase, transliterate with unidecode, keep only
            # alphanumeric characters.
            flat_name = filter(str.isalnum, unidecode.unidecode(group_name.lower()))
            sg.logger.info('Reseting conf for group %s...' % flat_name)
            try:
                group = sg.db.session.query(GROUP).filter(GROUP.flat_name == flat_name).one()
                self.__push_group_conf(group, True)
            except NoResultFound as e:
                sg.logger.warning('No group %s, aborting reset confs...' % (flat_name))
        else:
            sg.logger.info('Reseting conf for all groups...')
            groups = sg.db.session.query(GROUP).all()
            for group in groups:
                self.__push_group_conf(group, True)

    # Routine for pushing conf to a group
admin_helper.py 文件源码 项目:sciz 作者: erk3 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __push_group_conf(self, group, force=False):
        """Copy the format sections of the global config into a group's confs.

        For each key of each known format section present in ``sg.config``,
        a conf row is written for ``group`` when ``force`` is true or when
        no row exists yet for that (group, section, key). Existing rows are
        left untouched unless ``force`` is set.

        :param group: Group whose ``id`` the confs are attached to.
        :param force: Write a fresh conf for every key without looking up
            existing rows.
        """
        format_sections = [
            sg.CONF_GROUP_BATTLE_FORMAT,
            sg.CONF_GROUP_TROLL_FORMAT,
            sg.CONF_GROUP_MOB_FORMAT,
            sg.CONF_GROUP_CDM_FORMAT,
            sg.CONF_GROUP_PIEGE_FORMAT,
            sg.CONF_GROUP_PORTAL_FORMAT,
        ]
        for section in format_sections:
            if not sg.config.has_section(section):
                continue
            for (each_key, each_value) in sg.config.items(section):
                conf = CONF()
                missing = False
                if not force:
                    try:
                        conf = sg.db.session.query(CONF).filter(CONF.group_id == group.id, CONF.section == section, CONF.key == each_key).one()
                    except NoResultFound:
                        missing = True
                if not (missing or force):
                    # The row already exists and no overwrite was requested.
                    continue
                conf.section = section
                conf.key = each_key
                conf.value = each_value
                conf.group_id = group.id
                conf.touch()
                sg.db.add(conf)

    # Create or update users from JSON file, then if a group is set also do the binding and create the troll
admin_helper.py 文件源码 项目:sciz 作者: erk3 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add_json_users(self, json_file):
        """Add the users listed in a JSON file, binding them to ``sg.group``.

        Every entry under ``self.json_users_tag`` is built into a user and
        added. When ``sg.group`` is set, the user's association with that
        group is created or has its role updated, and a troll with the
        user's id is added for the group.

        :param json_file: Path of the JSON file, read with the default
            charset.
        """
        with codecs.open(json_file, 'r', sg.DEFAULT_CHARSET) as f:
            data = json.load(f)
            for entry in data[self.json_users_tag]:
                user = USER()
                user.build_from_json(entry)
                # Fall back to role 1 when the user carries no role.
                role = user.role if hasattr(user, 'role') and user.role else 1
                user = sg.db.add(user)
                if not sg.group:
                    continue

                membership = sg.db.session.query(AssocUsersGroups).filter(
                    AssocUsersGroups.user_id == user.id,
                    AssocUsersGroups.group_id == sg.group.id)
                try:
                    assoc = membership.one()
                except NoResultFound:
                    assoc = AssocUsersGroups(user_id=user.id, group_id=sg.group.id, role=role)
                    user.groups.append(assoc)
                else:
                    assoc.role = role
                user.pwd = None # Required for avoiding double hashing
                sg.db.add(user)

                troll = TROLL()
                troll.id = user.id
                troll.user_id = user.id
                troll.group_id = sg.group.id
                sg.db.add(troll)
mh_caller.py 文件源码 项目:sciz 作者: erk3 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def monstres_ftp_call(self):
        """Fetch the MH monsters (metamobs) list and upsert each entry.

        The list is downloaded over HTTP from ``self.ftpURL`` /
        ``self.ftpMonstres``. Every line containing a ``;`` past its first
        character is split into five ``;``-separated fields; the first four
        (id, name, determinant, blazon URL) are written to the matching
        METAMOB row, or to a new one when the lookup does not return exactly
        one row. Everything is committed once at the end.
        """
        # Fetch MH Monstres (Metamobs) from FTP
        mh_r = requests.get("http://%s/%s" % (self.ftpURL, self.ftpMonstres, ))
        for line in mh_r.text.split('\n'):
            if line.find(";") > 0:
                # ``mob_id`` rather than ``id`` so the builtin is not shadowed;
                # the fifth field is the empty tail after the last ';'.
                mob_id, nom, determinant, blason_url, _empty = line.split(';')
                try:
                    metamob = sg.db.session.query(METAMOB).filter(METAMOB.id == mob_id).one()
                except (NoResultFound, MultipleResultsFound):
                    # The exceptions must be a parenthesised tuple. The
                    # comma form caught only NoResultFound on Python 2
                    # (binding the instance to the second name) and is a
                    # syntax error on Python 3.
                    metamob = METAMOB()
                metamob.id = mob_id
                metamob.nom = nom
                metamob.determinant = determinant
                metamob.blason_url = blason_url
                sg.db.session.add(metamob)
        sg.db.session.commit()

    # Main MH call dispatcher
hook.py 文件源码 项目:sciz 作者: erk3 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def trigger(self):
        """Push pending event notifications to this reverse hook's URL.

        Does nothing when the hook is revoked or has no URL. Otherwise the
        events of the hook's group that are newer than ``last_event_id`` and
        flagged ``notif_to_push`` are posted as JSON, oldest first, and
        ``last_event_id`` is advanced to the highest id sent. A request
        failure is logged as a warning and leaves ``last_event_id`` unchanged.

        NOTE(review): the HTTP status of the response is not checked, so an
        error response still advances ``last_event_id`` - confirm intent.
        """
        if self.revoked or self.url is None:
            return
        try:
            # Find the events
            events = sg.db.session.query(EVENT).filter(EVENT.id > self.last_event_id, EVENT.notif_to_push == True, EVENT.group_id == self.group_id).order_by(asc(EVENT.time)).all()
            res = []
            max_id = 0
            for event in events:
                max_id = max(event.id, max_id)
                res.append({'id': event.id, 'notif': event.notif.encode(sg.DEFAULT_CHARSET)})
            # Send the data
            if res:
                try:
                    headers = {'Authorization': self.jwt}
                    requests.post(self.url, headers=headers, json=res, timeout=1)
                    # Update the hook
                    self.last_event_id = max_id
                    sg.db.session.add(self)
                    sg.db.session.commit()
                except requests.RequestException as e:
                    sg.logger.warning('Unable to send events for reverse hook %s (%s) and url %s : %s' % (self.name, self.id, self.url, str(e), ))
        except NoResultFound:
            sg.logger.warning('No event found corresponding to the reverse hook %s (%s)' % (self.name, self.id, ))
bm_nw_provision_db.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_all_bnp_switch_port_maps(context, filter_dict):
    """Get all switch port maps.

    Joins switch port mappings with their neutron port and physical switch
    and returns the selected columns for every row passing the filters.

    :param context: Context whose ``session`` runs the query.
    :param filter_dict: Mapping of filter keys to values; each pair is
        applied as ``key == value``.
        NOTE(review): that comparison only produces a SQL criterion when
        the keys are column objects - confirm against callers.
    :returns: List of result rows, or ``None`` if ``NoResultFound`` is
        raised.
    """
    try:
        port_map = models.BNPSwitchPortMapping
        neutron_port = models.BNPNeutronPort
        phys_switch = models.BNPPhysicalSwitch
        columns = (
            port_map.neutron_port_id,
            port_map.switch_port_name,
            neutron_port.lag_id,
            neutron_port.segmentation_id,
            neutron_port.access_type,
            neutron_port.bind_status,
            phys_switch.name,
        )
        query = context.session.query(*columns)
        query = query.join(
            neutron_port,
            neutron_port.neutron_port_id == port_map.neutron_port_id)
        query = query.join(phys_switch, port_map.switch_id == phys_switch.id)
        for key, value in filter_dict.items():
            query = query.filter(key == value)
        return query.all()
    except exc.NoResultFound:
        LOG.error(_LE("no switch port mappings found"))
        return None
bm_nw_provision_db.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_bnp_phy_switch(context, sw_id, switch):
    """Update the stored fields of the physical switch with id ``sw_id``.

    :param context: Context whose ``session`` runs the update.
    :param sw_id: Id of the physical switch row.
    :param switch: Mapping that must provide every updated field.
    """
    fields = (
        'name',
        'ip_address',
        'mac_address',
        'port_provisioning',
        'management_protocol',
        'credentials',
        'validation_result',
        'vendor',
        'family',
    )
    try:
        with context.session.begin(subtransactions=True):
            values = {field: switch[field] for field in fields}
            target = context.session.query(
                models.BNPPhysicalSwitch).filter_by(id=sw_id)
            target.update(values, synchronize_session=False)
    except exc.NoResultFound:
        LOG.error(_LE("no physical switch found for id: %s"), sw_id)
bm_nw_provision_db.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update_bnp_phys_switch_access_params(context, switch_id, params):
    """Update the access parameters of the physical switch ``switch_id``.

    :param context: Context whose ``session`` runs the update.
    :param switch_id: Id of the physical switch row.
    :param params: Mapping that must provide every access parameter.
    """
    fields = (
        'access_protocol',
        'write_community',
        'security_name',
        'auth_protocol',
        'auth_key',
        'priv_protocol',
        'priv_key',
        'security_level',
    )
    try:
        with context.session.begin(subtransactions=True):
            values = {field: params[field] for field in fields}
            target = context.session.query(
                models.BNPPhysicalSwitch).filter_by(id=switch_id)
            target.update(values, synchronize_session=False)
    except exc.NoResultFound:
        LOG.error(_LE("no physical switch found for id: %s"), switch_id)
bm_nw_provision_db.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_bnp_snmp_cred_by_id(context, cred_id, creds):
    """Update the SNMP switch credentials stored under ``cred_id``.

    :param context: Context whose ``session`` runs the update.
    :param cred_id: Id of the SNMP credential row.
    :param creds: Mapping that must provide every credential field.
    """
    fields = (
        'name',
        'protocol_type',
        'write_community',
        'security_name',
        'auth_protocol',
        'auth_key',
        'priv_protocol',
        'priv_key',
        'security_level',
    )
    try:
        with context.session.begin(subtransactions=True):
            values = {field: creds[field] for field in fields}
            target = context.session.query(
                models.BNPSNMPCredential).filter_by(id=cred_id)
            target.update(values, synchronize_session=False)
    except exc.NoResultFound:
        LOG.error(_LE("no snmp switch credentials found for id: %s"), cred_id)
base.py 文件源码 项目:marcotti-mls 作者: soccermetrics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_id(self, model, **conditions):
        """
        Look up the ID of the one record in ``model`` matching ``conditions``.

        When zero or several records match, the problem is written to the
        log and None is returned instead.

        :param model: Marcotti-MLS data model.
        :param conditions: Dictionary of fields/values that describe a record in model.
        :return: Unique ID of database record, or None.
        """
        try:
            matches = self.session.query(model).filter_by(**conditions)
            return matches.one().id
        except NoResultFound:
            logger.error("{} has no records in Marcotti database for: {}".format(model.__name__, conditions))
        except MultipleResultsFound:
            logger.error("{} has multiple records in Marcotti database for: {}".format(model.__name__, conditions))
        return None
application.py 文件源码 项目:Sport-Item-Catalog 作者: msurmenok 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getUserID(email):
    """
    Look up a user's id from their email address.
    Args:
        email (str): User's email.
    Returns:
        The user's id, or None when no user has that email.
    """
    by_email = db_session.query(User).filter_by(email=email)
    try:
        return by_email.one().id
    except NoResultFound:
        return None


# At the end start Flask app.
repositories.py 文件源码 项目:experiment-manager 作者: softfire-eu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save(entity, _clazz=None):
    """Persist ``entity``, first removing stored copies when a class is given.

    With ``_clazz`` set, whatever ``find`` returns for the entity's
    identifier (its ``id`` attribute when it has one, otherwise its
    ``name``) is deleted before the entity is added and committed. Without
    ``_clazz`` this is a plain add-and-commit.

    :param entity: Object to store.
    :param _clazz: Optional class used to look up existing copies.
    """
    if _clazz:
        # The id is usually None, in which case this acts as a normal save.
        _id = entity.id if hasattr(entity, 'id') else entity.name
        try:
            if _id:
                found = find(_clazz, _id)
                if isinstance(found, list):
                    stale = found
                else:
                    stale = [found] if found else []
                for old in stale:
                    delete(old)
        except NoResultFound:
            pass

    with get_db_session() as se:
        se.add(entity)
        se.commit()


问题


面经


文章

微信
公众号

扫码关注公众号