def api_call(self, url, http_method, data):
    """Send an authenticated JSON request and return the ``requests`` response.

    Args:
        url: Target URL.
        http_method: A ``PyCurlVerbs`` member (POST, DELETE, PUT or GET),
            matched by identity.
        data: Payload serialised with ``json.dumps`` for POST and PUT;
            ignored for DELETE and GET.

    Returns:
        The response object, or ``None`` when ``http_method`` matches none
        of the four verbs.
    """
    request_headers = {
        "Content-Type": "application/json",
        "x-api-key": self.api_key,
        "authorization": "Bearer " + self.access_token,
    }
    if http_method is PyCurlVerbs.POST:
        return requests.post(url, data=json.dumps(data), headers=request_headers)
    if http_method is PyCurlVerbs.DELETE:
        return requests.delete(url, headers=request_headers)
    if http_method is PyCurlVerbs.PUT:
        return requests.put(url, data=json.dumps(data), headers=request_headers)
    if http_method is PyCurlVerbs.GET:
        return requests.get(url, headers=request_headers)
    # Unrecognised verb: no request is sent.
    return None
python类delete()的实例源码
def delete(api_key_id):
    """
    Delete an api key with the provided ID

    Writes a confirmation to stdout when the server answers 204. Exits the
    process with status 1 when ``response_utils.response_error`` flags the
    response or when the request itself raises a ``RequestException``.
    """
    action, url = _url((api_key_id,))
    headers = api_utils.generate_headers('owner', method='DELETE', body='', action=action)
    try:
        response = requests.delete(url, headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Deleting api key failed.')
            sys.exit(1)
        elif response.status_code == 204:
            sys.stdout.write('Deleted api key with id: %s \n' % api_key_id)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; handing it the exception object raised
        # TypeError and masked the real network error.
        sys.stderr.write(str(error))
        sys.exit(1)
def call_api(self, method, endpoint, payload):
    """Call ``endpoint`` (joined onto ``self.api_base_url``) and return the decoded JSON body.

    Args:
        method: ``'POST'``, ``'delete'`` or ``'put'`` (exact, case-sensitive
            match); any other value results in a GET.
        endpoint: Path joined to the base URL with ``urlparse.urljoin``.
        payload: Form data for POST/PUT, query parameters for GET. Not sent
            for DELETE.

    Returns:
        The response body parsed with ``json.loads``.
    """
    url = urlparse.urljoin(self.api_base_url, endpoint)
    if method == 'POST':
        response = requests.post(url, data=payload)
    elif method == 'delete':
        response = requests.delete(url)
    elif method == 'put':
        response = requests.put(url, data=payload)
    else:
        params = payload
        if self.api_key:
            # Copy before adding the token so the caller's dict is not
            # mutated, and so a None payload no longer raises AttributeError.
            params = dict(payload or {})
            params['api_token'] = self.api_key
        response = requests.get(url, params=params)
    return json.loads(response.content)
def delete_pipeline(app='', pipeline_name=''):
    """Delete _pipeline_name_ from _app_.

    The pipeline name is passed through ``normalize_pipeline_name`` before it
    is placed in the request path.

    Returns:
        The text body of the DELETE response.

    Raises:
        SpinnakerPipelineDeletionFailed: when the response status equals
            ``requests.status_codes.codes['method_not_allowed']``.
    """
    pipeline = normalize_pipeline_name(name=pipeline_name)
    target = murl.Url(API_URL)
    LOG.warning('Deleting Pipeline: %s', pipeline)
    target.path = 'pipelines/{app}/{pipeline}'.format(app=app, pipeline=pipeline)

    response = requests.delete(target.url, verify=GATE_CA_BUNDLE, cert=GATE_CLIENT_CERT)

    if not response.ok:
        LOG.debug('Delete response code: %d', response.status_code)
        if response.status_code != requests.status_codes.codes['method_not_allowed']:
            # Any other failure is treated as "nothing to delete".
            LOG.debug('Pipeline missing, no delete required.')
        else:
            raise SpinnakerPipelineDeletionFailed('Failed to delete "{0}" from "{1}", '
                                                  'possibly invalid Pipeline name.'.format(pipeline, app))

    LOG.debug('Deleted "%s" Pipeline response:\n%s', pipeline, response.text)
    return response.text
def delete_log(log_id):
    """
    Delete a log with the provided log ID

    Writes a confirmation to stdout when the server answers 204. Exits the
    process with status 1 when ``response_utils.response_error`` flags the
    response or when the request itself raises a ``RequestException``.
    """
    headers = api_utils.generate_headers('rw')
    try:
        response = requests.delete(_url(('logs', log_id))[1], headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Delete log failed.')
            sys.exit(1)
        elif response.status_code == 204:
            sys.stdout.write('Deleted log with id: %s \n' % log_id)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; handing it the exception object raised
        # TypeError and masked the real network error.
        sys.stderr.write(str(error))
        sys.exit(1)
def delete_team(team_id):
    """
    Delete a team with the provided team ID.

    Echoes a confirmation on a 204 response. Exits with status 1 when
    ``response_utils.response_error`` flags the response or when the request
    raises a ``RequestException``.
    """
    request_headers = api_utils.generate_headers('rw')
    try:
        team_url = _url((team_id,))[1]
        reply = requests.delete(team_url, headers=request_headers)
        if response_utils.response_error(reply):
            # The API reported an error for this response.
            click.echo('Delete team failed.', err=True)
            sys.exit(1)
        if reply.status_code == 204:
            click.echo('Deleted team with id: %s.' % team_id)
    except requests.exceptions.RequestException as error:
        click.echo(error, err=True)
        sys.exit(1)
def remove_show(self, beid):
    """Remove a given show from sonarr.

    It will not delete files. The backend ID we're given is not the ID we need, so the show
    is looked up first. It will only delete shows if the DELETE_SHOWS config value is set.

    Args:
        beid (int): The TVDB ID of the show.
    """
    _logger.debug("Entering remove_show. Getting all shows being watched from sonarr.")
    if not self._delete_shows:
        # Previously this message was also logged after a successful delete.
        _logger.debug("Config line set to leave shows in sonarr, exiting with no changes.")
        return

    _logger.debug("Config line set to delete shows from sonarr. Continuing.")
    shows = self.get_watching_shows()
    _logger.debug("Got all shows. Attempting to find show with ID {0}".format(beid))
    show = next((x for x in shows if x['beid'] == beid), None)
    if show is None:
        # An unknown show used to surface as an unexplained IndexError.
        _logger.warning("No show with ID {0} found in sonarr, nothing to delete.".format(beid))
        return

    _logger.debug("Found show {0}. Deleting.".format(show['title']))
    requests.delete("{0}/api/series/{1}?apikey={2}".format(self.url, show['id'], self.api_key))
def post_account_metadata(self, params, action):
    """
    Post account metadata headers to add or delete entries.
    :param params: mapping of metadata key -> value
    :param action: 'add' sets the keys; any other value (e.g. 'delete') removes them
    :return: result status code
    """
    url = self.url
    # Copy the base headers: mutating self.base_headers in place made every
    # metadata header stick around for all subsequent requests.
    headers = dict(self.base_headers)
    # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    for key, value in params.items():
        if action == 'add':
            headers['X-Account-Meta-' + key] = value
        else:
            headers['X-Remove-Account-Meta-' + key] = value
    response = requests.post(url, headers=headers)
    return response.status_code
def post_container_metadata(self, container_name, params, action):
    """
    Post container metadata headers to add or delete entries.
    :param container_name: target container name
    :param params: mapping of metadata key -> value
    :param action: 'add' sets the keys; any other value (e.g. 'delete') removes them
    :return: result status code
    """
    url = self.url + '/' + container_name
    # Copy the base headers: mutating self.base_headers in place made every
    # metadata header stick around for all subsequent requests.
    headers = dict(self.base_headers)
    # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    for key, value in params.items():
        if action == 'add':
            headers['X-Container-Meta-' + key] = value
        else:
            headers['X-Remove-Container-Meta-' + key] = value
    response = requests.post(url, headers=headers)
    return response.status_code
def post_object_metadata(self, container_name, file_name, params, action):
    """
    Post object metadata headers to add or delete entries.
    :param container_name: container holding the object
    :param file_name: name of the target object
    :param params: mapping of metadata key -> value
    :param action: 'add' sets the keys; any other value (e.g. 'delete') removes them
    :return: result status code (after check_response_status has inspected the response)
    """
    url = self.url + '/' + container_name + '/' + file_name
    # Copy the base headers: mutating self.base_headers in place made every
    # metadata header stick around for all subsequent requests.
    headers = dict(self.base_headers)
    # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    for key, value in params.items():
        # The target is an object, so use the object metadata prefix; the
        # container prefix used before was a copy-paste from the container method.
        if action == 'add':
            headers['X-Object-Meta-' + key] = value
        else:
            headers['X-Remove-Object-Meta-' + key] = value
    response = check_response_status(
        requests.post(url, headers=headers)
    )
    return response.status_code
def post_container_metadata(self, container_name, params, action):
    """
    Post container metadata headers to add or delete entries.
    :param container_name: target container name
    :param params: mapping of metadata key -> value
    :param action: 'add' sets the keys; any other value (e.g. 'delete') removes them
    :return: result status code
    """
    url = self.url + '/' + container_name
    # Work on a copy so self.base_headers is not polluted with metadata
    # headers that would otherwise be re-sent on every later request.
    headers = dict(self.base_headers)
    # items() is available on Python 2 and 3; iteritems() is not.
    for key, value in params.items():
        if action == 'add':
            headers['X-Container-Meta-' + key] = value
        else:
            headers['X-Remove-Container-Meta-' + key] = value
    response = requests.post(url, headers=headers)
    return response.status_code
def post_object_metadata(self, container_name, file_name, params, action):
    """
    Post object metadata headers to add or delete entries.
    :param container_name: container holding the object
    :param file_name: name of the target object
    :param params: mapping of metadata key -> value
    :param action: 'add' sets the keys; any other value (e.g. 'delete') removes them
    :return: result status code (after check_response_status has inspected the response)
    """
    url = self.url + '/' + container_name + '/' + file_name
    # Work on a copy so self.base_headers is not polluted with metadata
    # headers that would otherwise be re-sent on every later request.
    headers = dict(self.base_headers)
    # items() is available on Python 2 and 3; iteritems() is not.
    for key, value in params.items():
        # Object URLs take the object metadata prefix, not the container one.
        if action == 'add':
            headers['X-Object-Meta-' + key] = value
        else:
            headers['X-Remove-Object-Meta-' + key] = value
    response = check_response_status(
        requests.post(url, headers=headers)
    )
    return response.status_code
def delete_post(self, pk, force=False):
    """
    Delete a Post.

    Arguments
    ---------

    pk : int
        The post id you want to delete.
    force : bool
        Whether to bypass trash and force deletion.

    Returns True on a 200 response; otherwise raises ``Exception`` carrying
    the decoded JSON body of the response.
    """
    # NOTE(review): locals() here is {'self', 'pk', 'force'}, so all three are
    # handed to _delete as params -- confirm _delete filters what it sends.
    resp = self._delete('posts/{0}'.format(pk), params=locals())
    if resp.status_code != 200:
        raise Exception(resp.json())
    return True
# Post Revision Methods
def deladmin(apikey, orgid, adminid, suppressprint=False):
    """Delete an administrator from an organization.

    Calls ``__hasorgaccess(apikey, orgid)`` first (per the original comment it
    is meant to raise when the key lacks access), then sends a DELETE to
    ``<base_url>/organizations/<orgid>/admins/<adminid>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    __hasorgaccess(apikey, orgid)

    endpoint = '/'.join([str(base_url), 'organizations', str(orgid), 'admins', str(adminid)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)

    # Let the shared handler interpret the status code and body.
    return __returnhandler(response.status_code, response.text, 'Administrator', suppressprint)
### CLIENTS ###
# List the clients of a device, up to a maximum of a month ago. The usage of each client is returned in kilobytes. If the device is a switch, the switchport is returned; otherwise the switchport field is null.
# https://dashboard.meraki.com/api_docs#list-the-clients-of-a-device-up-to-a-maximum-of-a-month-ago
def deltemplate(apikey, orgid, templateid, suppressprint=False):
    """Delete a configuration template from an organization.

    Calls ``__hasorgaccess(apikey, orgid)`` first (per the original comment it
    is meant to raise when the key lacks access), then sends a DELETE to
    ``<base_url>/organizations/<orgid>/configTemplates/<templateid>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    __hasorgaccess(apikey, orgid)

    endpoint = '/'.join([str(base_url), 'organizations', str(orgid), 'configTemplates', str(templateid)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)

    # Let the shared handler interpret the status code and body.
    return __returnhandler(response.status_code, response.text, 'Template', suppressprint)
### DEVICES ###
# List the devices in a network
# https://dashboard.meraki.com/api_docs#list-the-devices-in-a-network
def delnetwork(apikey, networkid, suppressprint=False):
    """Delete a network.

    Sends a DELETE to ``<base_url>/networks/<networkid>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    endpoint = '/'.join([str(base_url), 'networks', str(networkid)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)

    # Let the shared handler interpret the status code and body.
    return __returnhandler(response.status_code, response.text, 'Network', suppressprint)
# Bind a network to a template.
# https://dashboard.meraki.com/api_docs#bind-a-network-to-a-template
def delphone(apikey, networkid, serial, suppressprint=False):
    """Delete a phone assignment from a network.

    Sends a DELETE to ``<base_url>/networks/<networkid>/phoneAssignments/<serial>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    endpoint = '/'.join([str(base_url), 'networks', str(networkid), 'phoneAssignments', str(serial)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)
    return __returnhandler(response.status_code, response.text, 'Phone Assignment', suppressprint)
### PHONE CONTACTS ###
# List the phone contacts in a network
# https://dashboard.meraki.com/api_docs#list-the-phone-contacts-in-a-network
def delcontact(apikey, networkid, contactid, suppressprint=False):
    """Delete a phone contact from a network.

    Sends a DELETE to ``<base_url>/networks/<networkid>/phoneContacts/<contactid>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    endpoint = '/'.join([str(base_url), 'networks', str(networkid), 'phoneContacts', str(contactid)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)
    return __returnhandler(response.status_code, response.text, 'Phone Contact', suppressprint)
### PHONE NUMBERS ###
# List all the phone numbers in a network
# https://dashboard.meraki.com/api_docs#list-all-the-phone-numbers-in-a-network
def delsamlrole(apikey, orgid, roleid, suppressprint=False):
    """Delete a SAML role from an organization.

    Calls ``__hasorgaccess(apikey, orgid)`` first (per the original comment it
    is meant to raise when the key lacks access), then sends a DELETE to
    ``<base_url>/organizations/<orgid>/samlRoles/<roleid>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    __hasorgaccess(apikey, orgid)

    endpoint = '/'.join([str(base_url), 'organizations', str(orgid), 'samlRoles', str(roleid)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)

    # Let the shared handler interpret the status code and body.
    return __returnhandler(response.status_code, response.text, 'SAML Role', suppressprint)
### SM (Systems Manager) ###
# List the devices enrolled in an SM network with various specified fields and filters
# https://dashboard.meraki.com/api_docs#list-the-devices-enrolled-in-an-sm-network-with-various-specified-fields-and-filters
def addstaticroute(apikey, networkid, name, subnet, ip, suppressprint=False):
    """Add a static route to a network.

    POSTs a JSON body with ``name``, ``subnet`` and ``gatewayIp`` (each
    converted with ``str``) to ``<base_url>/networks/<networkid>/staticRoutes``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    endpoint = '/'.join([str(base_url), 'networks', str(networkid), 'staticRoutes'])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    route = {
        'name': str(name),
        'subnet': str(subnet),
        'gatewayIp': str(ip)
    }
    response = requests.post(endpoint, data=json.dumps(route), headers=request_headers)
    return __returnhandler(response.status_code, response.text, 'Static Route', suppressprint)
# Delete a static route from a network
# https://dashboard.meraki.com/api_docs#delete-a-static-route-from-a-network
def delstaticroute(apikey, networkid, routeid, suppressprint=False):
    """Delete a static route from a network.

    Sends a DELETE to ``<base_url>/networks/<networkid>/staticRoutes/<routeid>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    endpoint = '/'.join([str(base_url), 'networks', str(networkid), 'staticRoutes', str(routeid)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)
    return __returnhandler(response.status_code, response.text, 'Static Route', suppressprint)
### SWITCH PORTS ###
# List the switch ports for a switch
# https://dashboard.meraki.com/api_docs#list-the-switch-ports-for-a-switch
def delvlan(apikey, networkid, vlanid, suppressprint=False):
    """Delete a VLAN from a network.

    Sends a DELETE to ``<base_url>/networks/<networkid>/vlans/<vlanid>``.

    Returns:
        Whatever ``__returnhandler`` produces for the response.
    """
    endpoint = '/'.join([str(base_url), 'networks', str(networkid), 'vlans', str(vlanid)])
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    response = requests.delete(endpoint, headers=request_headers)

    # Let the shared handler interpret the status code and body.
    return __returnhandler(response.status_code, response.text, 'VLAN', suppressprint)
# MX performance score
def deltemplate(apikey, orgid, templateid, suppressprint=False):
    """Remove a configuration template from an organization.

    ``__hasorgaccess(apikey, orgid)`` runs before anything else (the original
    comment says it raises when the key lacks admin access). A DELETE is then
    sent to ``<base_url>/organizations/<orgid>/configTemplates/<templateid>``.

    Returns:
        The result of ``__returnhandler`` for the response.
    """
    __hasorgaccess(apikey, orgid)

    target = '%s/organizations/%s/configTemplates/%s' % (str(base_url), str(orgid), str(templateid))
    reply = requests.delete(target, headers={
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    })

    # Response parsing is delegated to the shared return handler.
    return __returnhandler(reply.status_code, reply.text, 'Template', suppressprint)
def delsamlrole(apikey, orgid, roleid, suppressprint=False):
    """Remove a SAML role from an organization.

    ``__hasorgaccess(apikey, orgid)`` runs before anything else (the original
    comment says it raises when the key lacks admin access). A DELETE is then
    sent to ``<base_url>/organizations/<orgid>/samlRoles/<roleid>``.

    Returns:
        The result of ``__returnhandler`` for the response.
    """
    __hasorgaccess(apikey, orgid)

    target = '%s/organizations/%s/samlRoles/%s' % (str(base_url), str(orgid), str(roleid))
    reply = requests.delete(target, headers={
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    })

    # Response parsing is delegated to the shared return handler.
    return __returnhandler(reply.status_code, reply.text, 'SAML Role', suppressprint)
def deladmin(apikey, orgid, adminid, suppressprint=False):
    """Remove an administrator from an organization.

    ``__hasorgaccess(apikey, orgid)`` runs before anything else (the original
    comment says it raises when the key lacks admin access). A DELETE is then
    sent to ``<base_url>/organizations/<orgid>/admins/<adminid>``.

    Returns:
        The result of ``__returnhandler`` for the response.
    """
    __hasorgaccess(apikey, orgid)

    target = '%s/organizations/%s/admins/%s' % (str(base_url), str(orgid), str(adminid))
    reply = requests.delete(target, headers={
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    })

    # Response parsing is delegated to the shared return handler.
    return __returnhandler(reply.status_code, reply.text, 'Administrator', suppressprint)
def del_admin(self, admin_id):
    """ Delete a specified admin account.

    Args:
        admin_id: ID string or email of the admin to be deleted.

    Returns:
        deleted: The request object of the deleted admin, or None if the
        passed admin ID doesn't exist.
    """
    found = self.__admin_exists(admin_id)
    if not found:
        return None

    # A non-numeric identifier (e.g. an email) is swapped for the stored id.
    if not admin_id.isdigit():
        admin_id = found["id"]

    return requests.delete(self.url + "/" + admin_id, headers=self.headers)
def cleanup_os_logical_ports(self):
    """
    Delete all logical ports created by OpenStack

    The ports returned by ``get_logical_ports`` are filtered through
    ``get_os_resources``; the remaining ports are passed to
    ``update_logical_port_attachment`` and then deleted one by one, logging
    success or failure for each.
    """
    all_ports = self.get_logical_ports()
    openstack_ports = self.get_os_resources(all_ports)
    LOG.info("Number of OS Logical Ports to be deleted: %s",
             len(openstack_ports))

    # VIF detachment happens before any port is deleted.
    self.update_logical_port_attachment(openstack_ports)

    for port in openstack_ports:
        port_id = port['id']
        response = self.delete(endpoint='/logical-ports/%s' % port_id)
        if response.status_code != requests.codes.ok:
            LOG.error("Failed to delete lport %(port_id)s, response "
                      "code %(code)s",
                      {'port_id': port_id, 'code': response.status_code})
            continue
        LOG.info("Successfully deleted logical port %s", port_id)
ghost2loggerloopback.py 文件源码
项目:stackstorm-ghost2logger
作者: StackStorm-Exchange
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def _ghost2loggergpurge(self):
    """Send a DELETE to ``<ghost_url>/v1/all`` and log the outcome.

    The request uses basic auth and disables TLS certificate verification.
    Any exception (request failure or JSON decoding) is caught and logged at
    info level; nothing is returned.
    """
    self._logger.info('[Ghost2logger]: Purge Ghost2logger')
    json_headers = {"Content-Type": 'application/json'}
    purge_url = self._ghost_url + "/v1/all"
    try:
        credentials = (self._username, self._password)
        reply = requests.delete(purge_url, headers=json_headers, verify=False,
                                auth=credentials)
        body = reply.json()
        self._logger.info('[Ghost2logger]: Purged ghost2logger' + str(body))
    except Exception as e:
        # Best effort: a failed purge is only logged.
        self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' + str(e))
def del_admin(self, admin_id):
    """ Delete a specified admin account.

    Args:
        admin_id: ID string or email of the admin to be deleted.

    Returns:
        deleted: The request object of the deleted admin, or None if the
        passed admin ID doesn't exist.
    """
    record = self.__admin_exists(admin_id)
    if not record:
        # Nothing to delete.
        return None

    # Resolve anything that is not purely digits to the stored id.
    target_id = admin_id if admin_id.isdigit() else record["id"]
    return requests.delete(self.url + "/" + target_id, headers=self.headers)
test_generate_rnr_features.py 文件源码
项目:retrieve-and-rank-tuning
作者: rchaks
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _delete_test_cluster(self, cluster_id):
    """Delete the Solr cluster ``cluster_id`` if a GET for it returns 200.

    Credentials come from ``get_rnr_credentials(load_config())``. When the GET
    returns any other status, only an informational message is logged.

    Raises:
        Whatever ``response.raise_for_status()`` raises when the DELETE fails.
    """
    self.logger.info("Attempting to clean up the test cluster that was spun up for the unit test: %s" % cluster_id)
    config = load_config()
    url, user_id, password = get_rnr_credentials(config)

    lookup = requests.get('%s/v1/solr_clusters/%s' % (url, cluster_id),
                          auth=(user_id, password),
                          headers={'x-global-transaction-id': 'Rishavs app',
                                   'Content-type': 'application/json'})
    # The body is decoded before the status check, as both log messages use it.
    response_text = json.dumps(lookup.json(), indent=4, sort_keys=True)

    if lookup.status_code != 200:
        self.logger.info('No cleanup required for cluster id: %s (got response: %s)' % (cluster_id, response_text))
        return

    self.logger.info('Found a test cluster that needs cleanup: %s' % response_text)
    deletion = requests.delete('%s/v1/solr_clusters/%s' % (url, cluster_id),
                               auth=(user_id, password),
                               headers={'x-global-transaction-id': 'Rishavs app',
                                        'Content-type': 'application/json'})
    deletion.raise_for_status()
    self.logger.info("Successfully deleted test cluster: %s" % cluster_id)