def import_submodules(package, recursive=True):
    """
    Import all submodules of a package and return them keyed by dotted name.

    Adapted from
    http://stackoverflow.com/questions/3365740/how-to-import-all-submodules

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: when true, also descend into subpackages
    :type recursive: bool
    :return: mapping of fully qualified module name to the imported module
    :rtype: dict[str, types.ModuleType]
    """
    import importlib
    import pkgutil

    if isinstance(package, str):
        package = importlib.import_module(package)

    results = {}
    # iter_modules lists only the direct children.  walk_packages, called
    # without a prefix, would additionally try to import each subpackage by
    # its *bare* name, which can pull in an unrelated top-level package and
    # yield names that do not exist under this package.
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        module = importlib.import_module(full_name)
        results[full_name] = module
        if recursive and is_pkg:
            results.update(import_submodules(module, recursive=True))
    return results
# Example source code using Python's update() method
def serialize_json(obj):
    """
    Convert a Cytoscape.js element into a JSON-friendly value.

    :param obj: Object to serialize
    :return: a dict of the object's attributes, the class name, or the
        object's string form, depending on what kind of object was given
    """
    # Concrete element: instance carrying a ``__metaclass__`` named 'Element'.
    if hasattr(obj, '__metaclass__') and obj.__metaclass__.__name__ == 'Element':
        attributes = dict(vars(obj))
        attributes.pop('__metaclass__', None)  # never expose the marker itself
        return attributes

    type_name = obj.__class__.__name__

    # Abstract element: the class object called 'Element' itself.
    if type_name == 'type' and obj.__name__ == 'Element':
        return obj.__name__

    if type_name == 'ViewStyle':
        return dict(vars(obj))

    # Anything else is represented by its string form.
    return obj.__str__()
def to_activitystream(self):
    """
    Build the dict describing this object as a ``"Person"``.

    The collection URIs (following, followers, outbox, inbox) are only
    included when ``self.remote`` is falsy.

    :return: dict with the "Person" fields
    """
    uris = self.uris
    person = dict(
        type="Person",
        id=uris.id,
        name=self.name,
        preferredUsername=self.username,
    )
    if self.remote:
        return person

    for field in ("following", "followers", "outbox", "inbox"):
        person[field] = getattr(uris, field)
    return person
def _encode_send_job(self, job):
json = self._encode_base_job(job)
json.update({
'subject': job.subject,
'messageId': job.message_id
})
return json
def _encode_remind_job(self, job):
json = self._encode_base_job(job)
json.update({
'subject': job.subject,
'threadId': job.thread_id,
'knownMessageIds': job.known_message_ids,
'onlyIfNoreply': job.only_if_noreply,
'disabledReply': job.disabled_reply
})
return json
def switch_domain(self, domain):
    """
    Switch from one domain to another. You can call session.login() with a domain
    key value to log directly into the domain of choice or alternatively switch
    from domain to domain. The user must have permissions to the domain or
    unauthorized will be returned.
    ::

        session.login() # Log in to 'Shared Domain'
        ...
        session.switch_domain('MyDomain')

    Nothing happens when ``domain`` equals the current ``self.domain``.

    :param domain: name of the domain to switch to
    :raises SMCConnectionError: Error logging in to specified domain.
        This typically means the domain either doesn't exist or the
        user does not have privileges to that domain.
    """
    if self.domain != domain:
        # Do we already have a session
        if domain not in self._sessions:
            logger.info('Creating session for domain: %s', domain)
            # Reuse the current login parameters, overriding only the domain.
            credentials = self._get_login_params()
            credentials.update(domain=domain)
            self.login(**credentials)
        else:
            logger.info('Switching to existing domain session: %s', domain)
            self._session = self._sessions.get(domain)
            # NOTE(review): nesting reconstructed from a flattened source;
            # assumes ``_domain`` is only set here because ``login()``
            # presumably records the domain itself -- confirm.
            self._domain = domain
def _get_login_params(self):
"""
Spec for login parameters
"""
credentials = dict(
url=self.url,
api_version=self.api_version,
timeout=self.timeout,
verify=self._session.verify,
domain=self.domain)
credentials.update(self.credential.get_credentials())
credentials.update(**self._extra_args)
return credentials
def save(self):
    ''' Write this object to the database through a single pipeline, then
    optionally publish a create/update event. Every non-relation field
    stores itself via its own ``save`` method. Returns ``self``. '''
    model = type(self)
    conn = model.get_redis()
    batch = to_pipeline(conn)

    batch.hset(self.key(), 'id', self.id)
    for name, field in self.proxy:
        if isinstance(field, Relation):
            continue  # relations are not written here
        field.save(getattr(self, name), batch, commit=False)
    batch.sadd(model.members_key(), self.id)
    batch.execute()

    if self.notify:
        # First save is announced as 'create', later ones as 'update'.
        event = 'update' if self._persisted else 'create'
        message = json.dumps({
            'event': event,
            'data': self.to_json(),
        })
        conn.publish(model.cls_key(), message)
        conn.publish(self.key(), message)

    self._persisted = True
    return self
def update(self, **kwargs):
    ''' Validate the supplied values against each fillable field, assign the
    ones that pass, and save. Fields that are absent or given as ``None``
    are left untouched. All ``BadField`` failures are collected and raised
    together before anything is saved. '''
    conn = type(self).get_redis()
    problems = ValidationErrors()

    for name, field in self.proxy:
        if not field.fillable:
            continue
        supplied = kwargs.get(name)
        if supplied is None:
            continue
        try:
            validated = field.validate(supplied, conn)
        except BadField as err:
            problems.append(err)
            continue
        setattr(self, name, validated)

    if problems.has_errors():
        raise problems
    return self.save()
def add_data(self, key, value):
    """Store ``value`` under ``key`` in ``self.data``, replacing any existing entry."""
    store = self.data
    store[key] = value
def to_json(self, **kwargs):
    """
    Serialize this object, adding an ``"items"`` list to the base encoding.

    Items that are ``Object`` instances are serialized with their own
    ``to_json()``; anything else is included unchanged.

    :param kwargs: forwarded to ``Object.to_json``
    :return: the mapping produced by ``Object.to_json``, extended in place
    """
    serialized = Object.to_json(self, **kwargs)
    entries = []
    for entry in self.items:
        if isinstance(entry, Object):
            entries.append(entry.to_json())
        else:
            entries.append(entry)
    serialized.update(items=entries)
    return serialized
def to_activitystream(self):
    """
    Decode the stored UTF-8 JSON payload and set its ``"id"`` to this
    object's id URI.

    :return: the decoded payload with ``"id"`` overwritten by ``self.uris.id``
    """
    activity = json.loads(self.payload.decode("utf-8"))
    activity.update(id=self.uris.id)
    return activity
def to_json(self, *, fields=None, embed=None):
    ''' Serialize this model to a dict suitable for a JSON HTTP response.

    :param fields: optional collection of names to include; ``None`` means
        every non-private, non-relation field plus ``id`` and ``_type``
    :param embed: optional embed specification, interpreted by
        ``parse_embed``, naming relations to serialize inline
    :return: dict representation of the model
    '''
    def wanted(name):
        # No explicit selection means every name is included.
        return fields is None or name in fields

    result = {}
    if wanted('id'):
        result['id'] = self.id
    if wanted('_type'):
        result['_type'] = type(self).cls_key()

    # Plain fields: skip private ones, relations, and unselected names.
    for name, field in self.proxy:
        if field.private or isinstance(field, Relation):
            continue
        if not wanted(name):
            continue
        result[name] = field.to_json(getattr(self, name))

    if not embed:
        return result

    for relation_name, subfields in parse_embed(embed):
        if not hasattr(self.proxy, relation_name):
            continue
        relation = getattr(self.proxy, relation_name)
        if isinstance(relation, ForeignIdRelation):
            target = relation.get()
            if target is None:
                result[relation_name] = None
            else:
                result[relation_name] = target.to_json(fields=subfields)
        elif isinstance(relation, MultipleRelation):
            result[relation_name] = [
                related.to_json(fields=subfields) for related in relation.get()
            ]
    return result