def _try_deleting_ranker(self, ranker_id):
    """Delete the test ranker ``ranker_id`` if the service still reports it.

    The ranker is looked up with a GET first. Only a 200 answer triggers the
    DELETE; any other status is logged as "no cleanup required".

    :param ranker_id: identifier appended to the ``v1/rankers`` endpoint
    :raises requests.HTTPError: if the DELETE is answered with an error
        status (raised by ``raise_for_status``)
    """
    config = load_config()
    url, user_id, password = get_rnr_credentials(config)
    # NOTE(review): ``path`` is presumably ``os.path``; joining URL parts with
    # it yields backslashes on Windows -- confirm against the file's imports.
    ranker_url = path.join(url, 'v1/rankers', ranker_id)
    credentials = (user_id, password)
    headers = {'x-global-transaction-id': 'Rishavs app',
               'Content-type': 'application/json'}

    response = requests.get(ranker_url, auth=credentials, headers=headers)
    try:
        response_text = json.dumps(response.json(), indent=4, sort_keys=True)
    except ValueError:
        # The body is only used for log output. A reply that is not JSON
        # must not abort the cleanup, so fall back to the raw text.
        response_text = response.text

    if response.status_code == 200:
        self.logger.info('Found a test ranker that needs cleanup: %s', response_text)
        response = requests.delete(ranker_url, auth=credentials, headers=headers)
        response.raise_for_status()
        self.logger.info("Successfully deleted test ranker: %s", ranker_id)
    else:
        self.logger.info('No cleanup required for ranker id: %s (got response: %s)',
                         ranker_id, response_text)
# Example source code for Python's delete()
def delete_recipe(recipe_id):
    """Clear the fault injection rules tagged with ``recipe_id``.

    Sends a DELETE to the controller's ``/v1/rules`` endpoint. If the request
    cannot be sent, the error is written to stderr and ``abort(500, ...)`` is
    called. If the reply status is neither 200 nor 204, ``abort`` is called
    with that status.

    :param recipe_id: tag value identifying the rules to clear
    :return: an empty string on success
    """
    url = '{0}/v1/rules?tag={1}'.format(a8_controller_url, recipe_id)
    headers = {}
    if a8_controller_token != "":
        # Send the same token the guard above just checked.
        headers['Authorization'] = "Bearer " + a8_controller_token
    try:
        r = requests.delete(url, headers=headers)
    except Exception as e:
        sys.stderr.write("Could not DELETE {0}".format(url))
        sys.stderr.write("\n")
        sys.stderr.write(str(e))
        sys.stderr.write("\n")
        # NOTE(review): ``abort`` is assumed to raise; otherwise ``r`` below
        # would be unbound -- confirm.
        abort(500, "Could not DELETE {0}".format(url))
    if r.status_code not in (200, 204):
        abort(r.status_code)
    return ""
def clear_rules_from_all_proxies(self):
    """
    Clear fault injection rules from all known service proxies.

    Issues one DELETE per id in ``self._rule_ids`` against
    ``self.a8_controller_url``. A connection error is printed and ends the
    process with exit status 3. An HTTP error status raises from
    ``raise_for_status`` and is not handled here.
    """
    if self.debug:
        print('Clearing rules')
    try:
        headers = {"Content-Type": "application/json"}
        if self.a8_controller_token != "":
            headers['Authorization'] = "Bearer " + self.a8_controller_token
        for rule_id in self._rule_ids:
            resp = requests.delete(self.a8_controller_url + "?id=" + rule_id,
                                   headers=headers)
            resp.raise_for_status()
    except requests.exceptions.ConnectionError as e:
        print("FAILURE: Could not communicate with control plane %s" % self.a8_controller_url)
        print(e)
        sys.exit(3)
#TODO: Create a plugin model here, to support gremlinproxy and nginx
def roll_back_instantiation(self, serv_id):
    """
    Undo a failed instantiation of the service ``serv_id``.

    Publishes a removal request for the service instance on the
    ``t.IA_REMOVE`` topic, then asks for the service's SSMs and FSMs to be
    terminated without requiring a response. Deleting the stored records
    is not implemented yet.
    """
    # Kill the stack
    removal_request = json.dumps({'instance_uuid': serv_id})
    self.manoconn.notify(
        t.IA_REMOVE,
        removal_request,
        reply_to=t.IA_REMOVE,
        correlation_id=str(uuid.uuid4()),
    )

    # Kill the SSMs and FSMs
    self.terminate_ssms(serv_id, require_resp=False)
    self.terminate_fsms(serv_id, require_resp=False)

    LOG.info("Instantiation aborted, cleanup completed")

    # TODO: Delete the records
def tearDown(self):
    """Release what the test case set up.

    Terminates ``self.plex_proc`` when it is set, stops both broker
    connections, drops the event helpers, and sends DELETE requests for
    the ``specific-management`` user and the ``fsm-1234`` vhost on
    ``self.man_host``.
    """
    # Killing the scaling Executive
    if self.plex_proc is not None:
        self.plex_proc.terminate()
    del self.plex_proc

    # Killing the connection with the broker. Each connection is stopped
    # in its own try block so that a failure on the first one does not
    # leave the second one open.
    for connection_name in ('manoconn', 'sm_connection'):
        try:
            getattr(self, connection_name).stop_connection()
        except Exception:
            LOG.exception("Stop connection exception.")

    # Clearing the threading helpers
    del self.wait_for_event1
    del self.wait_for_event2

    url_user = "{0}/api/users/specific-management".format(self.man_host)
    url_vhost = "{0}/api/vhosts/fsm-1234".format(self.man_host)
    requests.delete(url=url_user, headers=self.headers, auth=('guest', 'guest'))
    requests.delete(url=url_vhost, headers=self.headers, auth=('guest', 'guest'))
#Method that terminates the timer that waits for an event
def delete_configuration(self, EP):
    """Delete every configuration listed in ``self.file`` from endpoint ``EP``.

    The file is parsed as JSON, and one DELETE is sent per entry of its
    ``data`` list, addressed by the entry's ``id``. Any exception is
    printed together with the request URL and the process then exits.

    :param EP: endpoint suffix appended to the corp/site base URL
    """
    import sys  # local import so the block does not rely on the ``site`` helper ``quit``

    # Bound before the try block so the error handler can always report it,
    # even when building the URL itself is what failed.
    url = None
    try:
        url = self.base_url + self.CORPS_EP + self.corp + self.SITES_EP + self.site + EP
        with open(self.file) as data_file:
            data = json.load(data_file)

        # NOTE(review): response statuses are not inspected, so
        # "Delete complete!" is printed even if the server refused a delete.
        for config in data['data']:
            requests.delete(url + "/" + config['id'], cookies=self.authn.cookies, headers=self.get_headers())

        print("Delete complete!")
    except Exception as e:
        print('Error: %s ' % str(e))
        print('Query: %s ' % url)
        # Same exit status as the former ``quit()`` call.
        sys.exit()
def delete_all_zendesk_articles(self, category):
    '''
    Delete all articles in the category being filled with UV articles.

    For our app, we had multiple uservoice knowledge bases, and each gets
    put in a zendesk category.

    The category's sections are listed first, then each section is deleted
    by id. If the listing does not return 200, or a delete does not
    return 204, a failure message is printed and the process exits.

    :param category: id of the destination Zendesk category
    '''
    import sys  # local import so the block does not rely on the ``site`` helper ``exit``

    print("**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category))
    url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category)
    response = requests.get(url, headers=self.headers, auth=self.credentials)
    if response.status_code != 200:
        print('FAILED to delete articles with error {}'.format(response.status_code))
        sys.exit()

    section_ids = [section['id'] for section in response.json()['sections']]
    for section_id in section_ids:
        url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id)
        response = requests.delete(url, headers=self.headers, auth=self.credentials)
        if response.status_code != 204:
            print('FAILED to delete sections for category {} with error {}'.format(category, response.status_code))
            sys.exit()
def _send_raw_http_request(self, method, url, data=None):
    """Send one JSON HTTP request and return the ``requests`` response.

    ``data`` is logged and transmitted only for POST, PUT and PATCH. When a
    session id is set it is sent as a ``JSESSIONID`` cookie header.

    :param method: one of 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'
    :param url: target URL
    :param data: request body for the methods that carry one
    :raises TintriError: if ``method`` is not one of the supported verbs
    """
    self.__logger.debug('%s %s' % (method, url))
    carries_body = method in ('POST', 'PUT', 'PATCH')
    if carries_body:
        self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

    request_headers = {'content-type': 'application/json'}
    if self.__session_id:
        request_headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

    if method not in ('GET', 'POST', 'PUT', 'PATCH', 'DELETE'):
        raise TintriError(None, message='Invalid HTTP method: ' + method)  # This should never happen

    sender = {
        'GET': requests.get,
        'POST': requests.post,
        'PUT': requests.put,
        'PATCH': requests.patch,
        'DELETE': requests.delete,
    }[method]

    # NOTE(review): verify=False disables TLS certificate verification.
    if carries_body:
        httpresp = sender(url, data, headers=request_headers, verify=False)
    else:
        httpresp = sender(url, headers=request_headers, verify=False)

    self._httpresp = httpresp  # self._httpresp is for debugging only, not thread-safe
    return httpresp
def sections_delete(self, section_id,):
    """Delete the section ``section_id`` through the ``sections/<id>/`` endpoint.

    The raw reply text is printed and then parsed as JSON.

    :param section_id: id interpolated into the endpoint path
    :return: ``(status_code, message)`` when the HTTP status is not 200;
        otherwise the reply's ``code`` field (also stored in ``self.error``
        when the reply's ``success`` field is falsy)
    """
    headers = {'token': self.token}
    p = requests.delete(self.base + "sections/%s/" % (section_id), headers=headers)
    print(p.text)
    reply = json.loads(p.text)

    if p.status_code != 200:
        logging.error("phpipam.sections_delete: Failure %s", p.status_code)
        logging.error(reply)
        self.error = p.status_code
        self.error_message = reply['message']
        return self.error, self.error_message

    if not reply['success']:
        logging.error("phpipam.sections_delete: FAILURE: %s", reply['code'])
        self.error = reply['code']
        return self.error

    logging.info("phpipam.sections_delete: success %s", reply['success'])
    return reply['code']
def subnet_delete(self, subnet_id, ):
    """Delete the subnet ``subnet_id`` through the ``subnets/<id>/`` endpoint.

    The raw reply text is printed and then parsed as JSON.

    :param subnet_id: id interpolated into the endpoint path
    :return: ``(status_code, message)`` when the HTTP status is not 200;
        otherwise the reply's ``code`` field (also stored in ``self.error``
        when the reply's ``success`` field is falsy)
    """
    headers = {'token': self.token}
    p = requests.delete(self.base + "subnets/%s/" % (subnet_id), headers=headers)
    print(p.text)
    reply = json.loads(p.text)

    if p.status_code != 200:
        logging.error("phpipam.subnets_delete: Failure %s", p.status_code)
        logging.error(reply)
        self.error = p.status_code
        self.error_message = reply['message']
        return self.error, self.error_message

    if not reply['success']:
        logging.error("phpipam.subnets_delete: FAILURE: %s", reply['code'])
        self.error = reply['code']
        return self.error

    logging.info("phpipam.subnets_delete: success %s", reply['success'])
    return reply['code']
def vlan_delete(self, vlan_id, ):
    """Delete the VLAN ``vlan_id`` through the ``vlans/<id>/`` endpoint.

    The raw reply text is printed and then parsed as JSON.

    :param vlan_id: id interpolated into the endpoint path
    :return: ``(status_code, message)`` when the HTTP status is not 200;
        otherwise the reply's ``code`` field (also stored in ``self.error``
        when the reply's ``success`` field is falsy)
    """
    headers = {'token': self.token}
    p = requests.delete(self.base + "vlans/%s/" % (vlan_id), headers=headers)
    print(p.text)
    reply = json.loads(p.text)

    if p.status_code != 200:
        logging.error("phpipam.vlans_delete: Failure %s", p.status_code)
        logging.error(reply)
        self.error = p.status_code
        self.error_message = reply['message']
        return self.error, self.error_message

    if not reply['success']:
        logging.error("phpipam.vlans_delete: FAILURE: %s", reply['code'])
        self.error = reply['code']
        return self.error

    logging.info("phpipam.vlans_delete: success %s", reply['success'])
    return reply['code']
def delete_domain_whitelist():
    """Remove the domain whitelist that was set previously.

    Sends a DELETE for the ``whitelisted_domains`` field to the messenger
    profile URL built from ``PAGE_ACCESS_TOKEN``.

    :usage:

        >>> response = fbbotw.delete_domain_whitelist()

    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/domain-whitelisting#delete <https://developers.facebook.com/docs/messenger-platform/messenger-profile/domain-whitelisting#delete>`_
    """
    endpoint = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'fields': ["whitelisted_domains"]})
    return requests.delete(endpoint, headers=HEADER, data=body)
def _pdns_delete(url):
    """Delete the resource at ``url`` from nslord and then from nsmaster.

    :param url: path appended to each server's API base URL
    :return: tuple ``(nslord_response, nsmaster_response)``
    :raises PdnsException: if either server answers outside the 2xx range,
        unless it is a 422 whose body contains 'Could not find domain'
    """
    def _delete_zone(api_base, api_headers):
        # Send one DELETE and raise unless it succeeded or the zone was already gone.
        reply = requests.delete(api_base + url, headers=api_headers)
        succeeded = 200 <= reply.status_code < 300
        # Deletion technically does not fail if the zone didn't exist in the first place
        if not succeeded and not (reply.status_code == 422 and 'Could not find domain' in reply.text):
            raise PdnsException(reply)
        return reply

    # We first delete the zone from nslord, the main authoritative source of our DNS data.
    # However, we do not want to wait for the zone to expire on the slave ("nsmaster").
    # We thus issue a second delete request on nsmaster to delete the zone there immediately.
    lord_reply = _delete_zone(settings.NSLORD_PDNS_API, headers_nslord)
    master_reply = _delete_zone(settings.NSMASTER_PDNS_API, headers_nsmaster)
    return (lord_reply, master_reply)
def logs_download():
    """Download (and then delete) the logs from DocuSign.

    Authentication is refreshed into the module-level ``auth`` first. If it
    reports an error, only ``err`` and ``err_code`` are returned. Otherwise
    the result of ``logging_do_download()`` is returned unchanged.

    Per the original notes, logs are stored in
    files/log/<account_id>/log_<timestamp>_<orig_name>, where <orig_name>
    is the original name of the file from the platform. Eg:
        00_OK_GetAccountSharedAccess.txt
        01_OK_GetFolderList.txt
        02_OK_ExecuteLoggedApiBusinessLogic.txt
        03_Created_RequestRecipientToken.txt
        04_OK_GetEnvelope.txt
        05_OK_GetEnvelope.txt
        06_OK_SendEnvelope.txt

    Returns {err: False or a problem string, entries: log_entries, err_code },
    where the entries are just the new log entries.
    """
    global auth
    auth = ds_authentication.get_auth()
    if not auth["err"]:
        return logging_do_download()  # returns {err, new_entries}
    return {field: auth[field] for field in ("err", "err_code")}
def delete(self, path='', **kwargs):
    """
    Send a DELETE to the API path and return the decoded JSON reply.

    :param path: str api path, resolved through ``self._path_join``
    :param kwargs: additional parameters passed on to ``requests.delete``
    :return: dict response
    :raises: HTTPError when the reply carries an error status
    :raises: InvalidJSONError when the reply body is not valid JSON
    """
    reply = requests.delete(
        self._path_join(path),
        headers=self.auth_custom,
        auth=self.auth,
        **kwargs
    )
    reply.raise_for_status()
    try:
        decoded = reply.json()
    except ValueError:
        raise InvalidJSONError(reply.content)
    return decoded
def httpdo(method, url, data=None):
    """
    Do HTTP Request

    A dict ``data`` is serialised to JSON before sending. When ``DEBUG`` is
    set, an equivalent curl command and the decoded reply are printed.

    :param method: 'GET', 'POST' or 'DELETE' (anything else raises KeyError)
    :param url: target URL
    :param data: request body; dicts are JSON-encoded
    :return: the result of ``convert`` applied to the JSON reply
    :raises WebDriverError: if the converted reply's ``status`` is not 0
    """
    if isinstance(data, dict):
        data = json.dumps(data)
    if DEBUG:
        print("Shell: curl -X {method} -d '{data}' '{url}'".format(method=method, data=data or '', url=url))

    fn = dict(GET=requests.get, POST=requests.post, DELETE=requests.delete)[method]
    response = fn(url, data=data)
    retjson = response.json()
    if DEBUG:
        # Single-argument form prints the same text on Python 2 and 3.
        print('Return: ' + json.dumps(retjson, indent=4))
    r = convert(retjson)
    if r.status != 0:
        raise WebDriverError(r.status, r.value)
    return r
def delete_logstore(self, project_name, logstore_name):
    """ Delete a log store.
    An unsuccessful operation will cause a LogException.

    :type project_name: string
    :param project_name: the Project name

    :type logstore_name: string
    :param logstore_name: the logstore name

    :return: DeleteLogStoreResponse

    :raise: LogException
    """
    resource = "/logstores/" + logstore_name
    # No extra query parameters or request headers are sent.
    body, response_header = self._send("DELETE", project_name, None, resource, {}, {})
    return DeleteLogStoreResponse(response_header, body)
def delete_shard(self, project_name, logstore_name, shardId):
    """ Delete a read-only shard.
    An unsuccessful operation will cause a LogException.

    :type project_name: string
    :param project_name: the Project name

    :type logstore_name: string
    :param logstore_name: the logstore name

    :type shardId: int
    :param shardId: the read only shard id

    :return: DeleteShardResponse

    :raise: LogException
    """
    resource = "/logstores/" + logstore_name + "/shards/" + str(shardId)
    # No extra query parameters or request headers are sent.
    body, response_header = self._send("DELETE", project_name, None, resource, {}, {})
    return DeleteShardResponse(response_header, body)
def delete_index(self, project_name, logstore_name):
    """ Delete the index of a logstore.
    An unsuccessful operation will cause a LogException.

    :type project_name: string
    :param project_name: the Project name

    :type logstore_name: string
    :param logstore_name: the logstore name

    :return: DeleteIndexResponse

    :raise: LogException
    """
    resource = "/logstores/" + logstore_name + "/index"
    # No extra query parameters or request headers are sent.
    body, response_header = self._send("DELETE", project_name, None, resource, {}, {})
    return DeleteIndexResponse(response_header, body)
def delete_logtail_config(self, project_name, config_name):
    """ Delete a logtail config in a project.
    An unsuccessful operation will cause a LogException.

    :type project_name: string
    :param project_name: the Project name

    :type config_name: string
    :param config_name: the logtail config name

    :return: DeleteLogtailConfigResponse

    :raise: LogException
    """
    resource = "/configs/" + config_name
    # No extra query parameters or request headers are sent.
    body, response_header = self._send("DELETE", project_name, None, resource, {}, {})
    return DeleteLogtailConfigResponse(response_header, body)