python类update()的实例源码

session.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def import_submodules(package, recursive=True):
    """
    Import all submodules of a package and return them keyed by dotted name.

    From http://stackoverflow.com/questions/3365740/how-to-import-all-submodules

    :param package: package (name or actual module)
    :type package: str | module
    :param bool recursive: when true, subpackages are imported and walked too
    :rtype: dict[str, types.ModuleType]
    """
    import importlib
    import pkgutil
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    # iter_modules lists only the direct children of the package. Descent into
    # subpackages is done by the explicit recursion below, using fully
    # qualified names, so a subpackage is never looked up by its bare short
    # name (which walk_packages does when no prefix is supplied).
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    # Without this return the function yields None, which also breaks the
    # recursive results.update(...) call above.
    return results
cytoscape_helper.py 文件源码 项目:viewX-vscode 作者: danielkupco 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def serialize_json(obj):
    """
    Turn a Cytoscape.js element (or related object) into a JSON-friendly value.

    :param obj: Object to serialize
    :return: a dict copy of the object's attributes for concrete elements and
        ``ViewStyle`` instances, the string ``'Element'`` for the abstract
        ``Element`` class itself, and ``obj.__str__()`` for anything else
    """
    type_name = obj.__class__.__name__

    # concrete element: dump its attributes, minus the metaclass marker
    if hasattr(obj, '__metaclass__') and obj.__metaclass__.__name__ == 'Element':
        attributes = dict(vars(obj))
        attributes.pop('__metaclass__', None)
        return attributes

    # the abstract Element class itself is represented by its name
    if type_name == 'type' and obj.__name__ == 'Element':
        return obj.__name__

    if type_name == 'ViewStyle':
        return dict(vars(obj))

    return obj.__str__()
models.py 文件源码 项目:activitypub-example 作者: tOkeshu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def to_activitystream(self):
    """
    Build the ``Person`` dictionary describing this object.

    The result always carries ``type``, ``id``, ``name`` and
    ``preferredUsername``. When ``self.remote`` is falsy, the ``following``,
    ``followers``, ``outbox`` and ``inbox`` URIs from ``self.uris`` are
    included as well.
    """
    uris = self.uris
    person = {
        "type": "Person",
        "id": uris.id,
        "name": self.name,
        "preferredUsername": self.username,
    }

    if self.remote:
        return person

    # the collection URIs are only added for non-remote objects
    for collection in ("following", "followers", "outbox", "inbox"):
        person[collection] = getattr(uris, collection)
    return person
api.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _encode_send_job(self, job):
    """
    Encode a send job: the base encoding from ``_encode_base_job`` extended
    with the job's ``subject`` and ``messageId``.
    """
    encoded = self._encode_base_job(job)
    send_fields = {
        'subject': job.subject,
        'messageId': job.message_id,
    }
    encoded.update(send_fields)
    return encoded
api.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _encode_remind_job(self, job):
    """
    Encode a remind job: the base encoding from ``_encode_base_job`` extended
    with the subject, thread id, known message ids and the two reply flags.
    """
    encoded = self._encode_base_job(job)
    remind_fields = {
        'subject': job.subject,
        'threadId': job.thread_id,
        'knownMessageIds': job.known_message_ids,
        'onlyIfNoreply': job.only_if_noreply,
        'disabledReply': job.disabled_reply,
    }
    encoded.update(remind_fields)
    return encoded
session.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def switch_domain(self, domain):
    """
    Make ``domain`` the active domain for this session.

    Nothing happens when ``domain`` is already the current domain. If a
    session for the domain is already stored in ``self._sessions`` it is
    reused; otherwise a new login is performed with the current login
    parameters and the requested domain.
    ::

        session.login() # Log in to 'Shared Domain'
        ...
        session.switch_domain('MyDomain')

    :raises SMCConnectionError: Error logging in to specified domain.
        This typically means the domain either doesn't exist or the
        user does not have privileges to that domain.
    """
    if self.domain == domain:
        return

    if domain in self._sessions:
        # A session for this domain already exists: just swap it in
        logger.info('Switching to existing domain session: %s', domain)
        self._session = self._sessions.get(domain)
        self._domain = domain
        return

    logger.info('Creating session for domain: %s', domain)
    login_kwargs = self._get_login_params()
    login_kwargs.update(domain=domain)
    self.login(**login_kwargs)
session.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_login_params(self):
    """
    Assemble the keyword arguments used to log in.

    Starts from the url, api version, timeout, verify setting and domain,
    then overlays the values from ``self.credential.get_credentials()`` and
    finally ``self._extra_args`` (later sources win on key clashes).
    """
    params = {
        'url': self.url,
        'api_version': self.api_version,
        'timeout': self.timeout,
        'verify': self._session.verify,
        'domain': self.domain,
    }
    stored_credentials = self.credential.get_credentials()
    params.update(stored_credentials)
    params.update(**self._extra_args)
    return params
core.py 文件源码 项目:coralillo 作者: getfleety 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def save(self):
    ''' Writes this object to the database through a single pipeline: the
    id, every non-relation field (each field stores itself) and the
    membership entry. When ``self.notify`` is set, a create/update event is
    then published on both the class key and the object key. Returns self. '''
    model = type(self)
    redis = model.get_redis()
    pipe = to_pipeline(redis)

    pipe.hset(self.key(), 'id', self.id)

    for fieldname, field in self.proxy:
        # relations are skipped here
        if isinstance(field, Relation):
            continue
        field.save(getattr(self, fieldname), pipe, commit=False)

    pipe.sadd(model.members_key(), self.id)
    pipe.execute()

    if self.notify:
        # an object saved before is an update, otherwise a create
        event = 'update' if self._persisted else 'create'
        message = json.dumps({
            'event': event,
            'data': self.to_json(),
        })
        redis.publish(model.cls_key(), message)
        redis.publish(self.key(), message)

    self._persisted = True
    return self
core.py 文件源码 项目:coralillo 作者: getfleety 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def update(self, **kwargs):
    ''' Validates the given values against this object's fields, assigns
    the ones that pass and saves. Fields that are not fillable, or whose
    given value is missing or None, are left untouched. If any field fails
    validation the collected errors are raised and nothing is saved. '''
    redis = type(self).get_redis()
    errors = ValidationErrors()

    for name, field in self.proxy:
        if not field.fillable:
            continue

        given = kwargs.get(name)
        if given is None:
            continue

        try:
            value = field.validate(given, redis)
        except BadField as err:
            # keep going so every bad field is reported at once
            errors.append(err)
        else:
            setattr(self, name, value)

    if errors.has_errors():
        raise errors

    return self.save()
cytoscape_helper.py 文件源码 项目:viewX-vscode 作者: danielkupco 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add_data(self, key, value):
    """Store ``value`` under ``key`` in ``self.data``, replacing any existing entry."""
    self.data[key] = value
objects.py 文件源码 项目:activitypub-example 作者: tOkeshu 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def to_json(self, **kwargs):
    """
    Serialize this object via ``Object.to_json`` and add an ``items`` list.

    Each entry of ``self.items`` that is an ``Object`` is serialized with
    its own ``to_json()``; any other entry is included as-is.
    """
    payload = Object.to_json(self, **kwargs)

    serialized = []
    for entry in self.items:
        if isinstance(entry, Object):
            entry = entry.to_json()
        serialized.append(entry)

    payload.update({"items": serialized})
    return payload
models.py 文件源码 项目:activitypub-example 作者: tOkeshu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def to_activitystream(self):
    """
    Decode the stored UTF-8 JSON payload and return it with its ``id``
    set to ``self.uris.id`` (overriding any id present in the payload).
    """
    activity = json.loads(self.payload.decode("utf-8"))
    identity = {"id": self.uris.id}
    activity.update(identity)
    return activity
core.py 文件源码 项目:coralillo 作者: getfleety 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def to_json(self, *, fields=None, embed=None):
    ''' Serializes this model to a plain dict suitable for a JSON response.

    ``fields`` optionally restricts which keys are emitted (``id`` and
    ``_type`` included); ``None`` means no restriction. ``embed`` optionally
    names relations to embed: it is parsed with ``parse_embed`` into
    (relation name, subfields) pairs, and unknown relation names are
    ignored. '''
    def wanted(name):
        return fields is None or name in fields

    payload = {}

    if wanted('id'):
        payload['id'] = self.id

    if wanted('_type'):
        payload['_type'] = type(self).cls_key()

    # plain fields: skip private ones and relations
    for fieldname, field in self.proxy:
        if field.private or isinstance(field, Relation):
            continue
        if wanted(fieldname):
            payload[fieldname] = field.to_json(getattr(self, fieldname))

    if not embed:
        return payload

    for relation_name, subfields in parse_embed(embed):
        if not hasattr(self.proxy, relation_name):
            continue

        relation = getattr(self.proxy, relation_name)

        if isinstance(relation, ForeignIdRelation):
            # single related object, or None when there is none
            target = relation.get()
            payload[relation_name] = (
                None if target is None else target.to_json(fields=subfields)
            )
        elif isinstance(relation, MultipleRelation):
            payload[relation_name] = [
                member.to_json(fields=subfields) for member in relation.get()
            ]

    return payload


问题


面经


文章

微信
公众号

扫码关注公众号