def get_user(self, effective=True):
"""Return `su` user, if `effective` else `user`"""
if self.user_id:
try:
if effective and self.su:
user = User.objects.get(id=self.su)
else:
user = User.objects.get(id=self.user_id)
return user
except me.DoesNotExist:
pass
return None
python类DoesNotExist()的实例源码
def user_from_session_id(session_id):
"""Returns user associated with given cookie session id"""
try:
user = SessionToken.objects.get(token=session_id).get_user()
if user is not None:
return user
except DoesNotExist:
pass
raise UserUnauthorizedError()
def create_dns_zone(request):
"""
Create a new DNS zone under a specific cloud.
---
"""
auth_context = auth_context_from_request(request)
cloud_id = request.matchdict['cloud']
auth_context.check_perm("cloud", "read", cloud_id)
auth_context.check_perm("cloud", "create_resources", cloud_id)
tags = auth_context.check_perm("zone", "add", None)
# Try to get the specific cloud for which we will create the zone.
try:
cloud = Cloud.objects.get(owner=auth_context.owner, id=cloud_id)
except me.DoesNotExist:
raise CloudNotFoundError
params = params_from_request(request)
new_zone = Zone.add(owner=cloud.owner, cloud=cloud, **params).as_dict()
if tags:
resolve_id_and_set_tags(auth_context.owner, 'zone', new_zone['id'],
tags, cloud_id=cloud_id)
# Schedule a UI update
trigger_session_update(auth_context.owner, ['zones'])
return new_zone
def create_dns_record(request):
"""
Create a new record under a specific zone
---
"""
auth_context = auth_context_from_request(request)
cloud_id = request.matchdict['cloud']
# Try to get the specific cloud for which we will create the zone.
try:
cloud = Cloud.objects.get(owner=auth_context.owner, id=cloud_id)
except me.DoesNotExist:
raise CloudNotFoundError
zone_id = request.matchdict['zone']
try:
zone = Zone.objects.get(owner=auth_context.owner, id=zone_id)
except Zone.DoesNotExist:
raise NotFoundError('Zone does not exist')
auth_context.check_perm("cloud", "read", cloud_id)
auth_context.check_perm("zone", "read", zone_id)
auth_context.check_perm("zone", "create_records", zone_id)
tags = auth_context.check_perm("record", "add", None)
# Get the params and create the new record
params = params_from_request(request)
dns_cls = RECORDS[params['type']]
rec = dns_cls.add(owner=auth_context.owner, zone=zone, **params).as_dict()
if tags:
resolve_id_and_set_tags(auth_context.owner, 'record', rec['id'], tags,
cloud_id=cloud_id, zone_id=zone_id)
# Schedule a UI update
trigger_session_update(auth_context.owner, ['zones'])
return rec
def delete_dns_record(request):
"""
Delete a specific DNS record under a zone.
---
"""
auth_context = auth_context_from_request(request)
cloud_id = request.matchdict['cloud']
zone_id = request.matchdict['zone']
record_id = request.matchdict['record']
try:
cloud = Cloud.objects.get(owner=auth_context.owner, id=cloud_id)
except me.DoesNotExist:
raise CloudNotFoundError
try:
zone = Zone.objects.get(owner=auth_context.owner, id=zone_id)
except Zone.DoesNotExist:
raise NotFoundError('Zone does not exist')
try:
record = Record.objects.get(zone=zone, id=record_id)
except Record.DoesNotExist:
raise NotFoundError('Record does not exist')
auth_context.check_perm("record", "remove", record_id)
record.ctl.delete_record()
# Schedule a UI update
trigger_session_update(auth_context.owner, ['zones'])
return OK
def enabled(self):
try:
return (super(CloudPollingSchedule, self).enabled and
self.cloud.enabled and not self.cloud.deleted)
except me.DoesNotExist:
log.error('Cannot get cloud for polling schedule.')
return False
def interval(self):
try:
if self.default_interval.every != self.cloud.polling_interval:
log.warning("Schedule has different interval from cloud, "
"fixing")
self.default_interval.every = self.cloud.polling_interval
self.save()
return super(CloudPollingSchedule, self).interval
except me.DoesNotExist:
log.error('Cannot get interval. Cloud is missing')
return PollingInterval(every=0)
def show_script(request):
"""
Show script details and job history.
READ permission required on script.
---
script_id:
type: string
required: true
in: path
"""
script_id = request.matchdict['script_id']
auth_context = auth_context_from_request(request)
if not script_id:
raise RequiredParameterMissingError('No script id provided')
try:
script = Script.objects.get(owner=auth_context.owner,
id=script_id, deleted=None)
except me.DoesNotExist:
raise NotFoundError('Script id not found')
# SEC require READ permission on SCRIPT
auth_context.check_perm('script', 'read', script_id)
ret_dict = script.as_dict()
jobs = get_stories('job', auth_context.owner.id, script_id=script_id)
ret_dict['jobs'] = [job['job_id'] for job in jobs]
return ret_dict
def download_script(request):
"""
Download script file or archive.
READ permission required on script.
---
script_id:
type: string
required: true
in: path
"""
script_id = request.matchdict['script_id']
auth_context = auth_context_from_request(request)
if not script_id:
raise RequiredParameterMissingError('No script id provided')
try:
script = Script.objects.get(owner=auth_context.owner,
id=script_id, deleted=None)
except me.DoesNotExist:
raise NotFoundError('Script id not found')
# SEC require READ permission on SCRIPT
auth_context.check_perm('script', 'read', script_id)
try:
return script.ctl.get_file()
except BadRequestError():
return Response("Unable to find: {}".format(request.path_info))
# SEC
def edit_script(request):
"""
Edit script (rename only as for now)
EDIT permission required on script.
---
script_id:
in: path
required: true
type: string
new_name:
type: string
required: true
new_description:
type: string
"""
script_id = request.matchdict['script_id']
params = params_from_request(request)
new_name = params.get('new_name')
new_description = params.get('new_description')
auth_context = auth_context_from_request(request)
# SEC require EDIT permission on script
auth_context.check_perm('script', 'edit', script_id)
try:
script = Script.objects.get(owner=auth_context.owner,
id=script_id, deleted=None)
except me.DoesNotExist:
raise NotFoundError('Script id not found')
if not new_name:
raise RequiredParameterMissingError('No new name provided')
script.ctl.edit(new_name, new_description)
ret = {'new_name': new_name}
if isinstance(new_description, basestring):
ret['new_description'] = new_description
return ret
# SEC
def url_script(request):
"""
Returns to a mist authenticated user,
a self-auth/signed url for fetching a script's file.
READ permission required on script.
---
script_id:
in: path
required: true
type: string
"""
auth_context = auth_context_from_request(request)
script_id = request.matchdict['script_id']
try:
Script.objects.get(owner=auth_context.owner,
id=script_id, deleted=None)
except Script.DoesNotExist:
raise NotFoundError('Script does not exist.')
# SEC require READ permission on script
auth_context.check_perm('script', 'read', script_id)
# build HMAC and inject into the `curl` command
hmac_params = {'action': 'fetch_script', 'object_id': script_id}
expires_in = 60 * 15
mac_sign(hmac_params, expires_in)
url = "%s/api/v1/fetch" % config.CORE_URI
encode_params = urllib.urlencode(hmac_params)
r_url = url + '?' + encode_params
return r_url
def fetch_script(script_id):
"""Used by mist.api.views.fetch"""
try:
script = Script.objects.get(id=script_id, deleted=None)
except Script.DoesNotExist:
raise NotFoundError('Script does not exist')
return script.ctl.get_file()
def request_whitelist_ip(request):
"""
User logs in successfully but it's from a non-whitelisted ip.
They click on a link 'whitelist current ip', which sends an email
to their account.
"""
try:
email = user_from_request(request).email
except UserUnauthorizedError:
email = params_from_request(request).get('email', '')
try:
user = User.objects.get(email=email)
except (UserNotFoundError, me.DoesNotExist):
# still return OK so that there's no leak on valid email
return OK
token = get_secure_rand_token()
user.whitelist_ip_token = token
user.whitelist_ip_token_created = time()
user.whitelist_ip_token_ip_addr = ip_from_request(request)
log.debug("will now save (whitelist_ip)")
user.save()
subject = config.WHITELIST_IP_EMAIL_SUBJECT
body = config.WHITELIST_IP_EMAIL_BODY
body = body % ( (user.first_name or "") + " " + (user.last_name or ""),
config.CORE_URI,
encrypt("%s:%s" % (token, email), config.SECRET),
user.whitelist_ip_token_ip_addr,
config.CORE_URI)
if not send_email(subject, body, email):
log.info("Failed to send email to user %s for whitelist IP link" %
user.email)
raise ServiceUnavailableError()
log.info("Sent email to user %s\n%s" % (email, body))
return OK
# SEC
def list_subnets(request):
"""
List subnets of a cloud
Currently supports the EC2, GCE and OpenStack clouds.
For other providers this returns an empty list.
READ permission required on cloud.
---
cloud:
in: path
required: true
type: string
network_id:
in: path
required: true
description: The DB ID of the network whose subnets will be returned
type: string
"""
cloud_id = request.matchdict['cloud']
network_id = request.matchdict['network']
auth_context = auth_context_from_request(request)
auth_context.check_perm("cloud", "read", cloud_id)
try:
cloud = Cloud.objects.get(owner=auth_context.owner, id=cloud_id)
except Cloud.DoesNotExist:
raise CloudNotFoundError
try:
network = Network.objects.get(cloud=cloud, id=network_id)
except Network.DoesNotExist:
raise NetworkNotFoundError
subnets = methods.list_subnets(cloud, network=network)
return subnets
def find_metrics(request):
"""
Get metrics of a machine
Get all metrics associated with specific machine
READ permission required on cloud.
READ permission required on machine.
---
cloud:
in: path
required: true
type: string
machine:
in: path
required: true
type: string
"""
raise NotImplementedError()
cloud_id = request.matchdict['cloud']
machine_id = request.matchdict['machine']
auth_context = auth_context_from_request(request)
auth_context.check_perm("cloud", "read", cloud_id)
try:
machine = Machine.objects.get(cloud=cloud_id, machine_id=machine_id)
machine_uuid = machine.id
except me.DoesNotExist:
machine_uuid = ""
auth_context.check_perm("machine", "read", machine_uuid)
return methods.find_metrics(auth_context.owner, cloud_id, machine_id)
def assoc_metric(request):
"""
Associate metric with machine
Associate metric with specific machine
READ permission required on cloud.
EDIT_GRAPHS permission required on machine.
---
cloud:
in: path
required: true
type: string
machine:
in: path
required: true
type: string
metric_id:
description: ' Metric_id '
type: string
"""
raise NotImplementedError()
cloud_id = request.matchdict['cloud']
machine_id = request.matchdict['machine']
params = params_from_request(request)
metric_id = params.get('metric_id')
if not metric_id:
raise RequiredParameterMissingError('metric_id')
auth_context = auth_context_from_request(request)
auth_context.check_perm("cloud", "read", cloud_id)
try:
machine = Machine.objects.get(cloud=cloud_id, machine_id=machine_id)
machine_uuid = machine.id
except me.DoesNotExist:
machine_uuid = ""
auth_context.check_perm("machine", "edit_graphs", machine_uuid)
methods.assoc_metric(auth_context.owner, cloud_id, machine_id, metric_id)
return {}
def delete_avatar(request):
"""
Deletes the requested avatar
---
avatar:
description: 'Avatar Id'
in: path
required: true
type: string
"""
avatar_id = request.matchdict['avatar']
auth_context = auth_context_from_request(request)
try:
avatar = Avatar.objects.get(id=avatar_id, owner=auth_context.user)
except me.DoesNotExist:
raise NotFoundError()
try:
org = Owner.objects.get(avatar=avatar_id)
org.avatar = ''
org.save()
except me.DoesNotExist:
pass
avatar.delete()
trigger_session_update(auth_context.owner, ["org"])
return OK
def list_user_organizations(request):
"""
List user's organizations
List all the organizations where user is a member
"""
try:
user = user_from_request(request)
except me.DoesNotExist:
raise UnauthorizedError()
return [{'id': org.id, 'name': org.name}
for org in Organization.objects(members=user)]
# SEC
def show_team(request):
"""
Show team.
Only available to organization owners.
---
org_id:
description: The team's org id
type: string
required: true
team_id:
description: The team's id
type: string
"""
auth_context = auth_context_from_request(request)
org_id = request.matchdict['org_id']
team_id = request.matchdict['team_id']
# SEC check if owner
if not (auth_context.org and auth_context.is_owner()
and auth_context.org.id == org_id):
raise OrganizationAuthorizationFailure()
# Check if team entry exists
try:
team = auth_context.org.get_team_by_id(team_id)
except me.DoesNotExist:
raise TeamNotFound()
return team.as_dict()
# SEC
def delete(self):
super(Machine, self).delete()
mist.api.tag.models.Tag.objects(resource=self).delete()
try:
self.owner.mapper.remove(self)
except (AttributeError, me.DoesNotExist) as exc:
log.error(exc)