def api_call(self, url, http_method, data):
headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key,
"authorization" : "Bearer " + self.access_token}
r = None
if http_method is PyCurlVerbs.POST:
r = requests.post(url, data=json.dumps(data), headers=headers)
elif http_method is PyCurlVerbs.DELETE:
r = requests.delete(url, headers=headers)
elif http_method is PyCurlVerbs.PUT:
r = requests.put(url, data=json.dumps(data), headers=headers)
elif http_method is PyCurlVerbs.GET:
r = requests.get(url, headers=headers)
return r
python类put()的实例源码
def send_response(event, context, responseStatus, responseData):
responseBody = {'Status': responseStatus,
'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
'PhysicalResourceId': context.log_stream_name,
'StackId': event['StackId'],
'RequestId': event['RequestId'],
'LogicalResourceId': event['LogicalResourceId'],
'Data': responseData}
req = None
try:
req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
if req.status_code != 200:
print(req.text)
raise Exception('Recieved non 200 response while sending response to CFN.')
return
except requests.exceptions.RequestException as e:
if req != None:
print(req.text)
print(e)
raise
def set_state(self, device_id, state):
"""Set device state."""
payload = {
'attributeName': 'desireddoorstate',
'myQDeviceId': device_id,
'AttributeValue': state,
}
device_action = requests.put(
'https://{host_uri}/{device_set_endpoint}'.format(
host_uri=HOST_URI,
device_set_endpoint=self.DEVICE_SET_ENDPOINT),
data=payload,
headers={
'MyQApplicationId': self.brand[APP_ID],
'SecurityToken': self.security_token,
'User-Agent': self.USERAGENT
}
)
return device_action.status_code == 200
def call_api(self, method, endpoint, payload):
url = urlparse.urljoin(self.api_base_url, endpoint)
if method == 'POST':
response = requests.post(url, data=payload)
elif method == 'delete':
response = requests.delete(url)
elif method == 'put':
response = requests.put(url, data=payload)
else:
if self.api_key:
payload.update({'api_token': self.api_key})
response = requests.get(url, params=payload)
content = json.loads(response.content)
return content
def replace_log(log_id, params):
"""
Replace the given log with the details provided
"""
headers = api_utils.generate_headers('rw')
try:
response = requests.put(_url(('logs', log_id))[1], json=params, headers=headers)
if response_utils.response_error(response):
sys.stderr.write('Update log failed.\n')
sys.exit(1)
elif response.status_code == 200:
sys.stdout.write('Log: %s updated to:\n' % log_id)
api_utils.pretty_print_string_as_json(response.text)
except requests.exceptions.RequestException as error:
sys.stderr.write(error)
sys.exit(1)
def cfn_response(response_status, response_data, reason, physical_resource_id, event, context):
# Put together the response to be sent to the S3 pre-signed URL
if reason:
reason = reason
else:
reason = 'See the details in CloudWatch Log Stream: ' + context.log_stream_name
responseBody = {'Status': response_status,
'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
'PhysicalResourceId': physical_resource_id,
'StackId': event['StackId'],
'RequestId': event['RequestId'],
'LogicalResourceId': event['LogicalResourceId'],
'Data': response_data
}
print('Response Body:', responseBody)
response = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
if response.status_code != 200:
print(response.text)
raise Exception('Response error received.')
return
def __requests_put(self, uri, data, headers):
if self.mesos_user:
# print "User " + self.mesos_user + "Pass " + self.mesos_pass
# print "URI " + self.mesos_host + uri
response = requests.put(self.mesos_host + uri, data,
headers=headers,
verify=False,
auth=(self.mesos_user,
self.mesos_pass))
else:
response = requests.put(self.mesos_host + uri,
data,
headers=headers,
verify=False)
# print "Response:" + str(response)
return response.json()
def __requests_put(self, uri, data, headers):
if self.marathon_user:
# print "User " + self.marathon_user + "Pass " + self.marathon_pass
# print "URI " + self.marathon_host + uri
response = requests.put(self.marathon_host + uri, data,
headers=headers,
verify=False,
auth=(self.marathon_user,
self.marathon_pass))
else:
response = requests.put(self.marathon_host + uri,
data,
headers=headers,
verify=False)
# print "Response:" + str(response)
return response
def sendResponse(event, context, responseStatus, responseData):
responseBody = {'Status': responseStatus,
'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
'PhysicalResourceId': context.log_stream_name,
'StackId': event['StackId'],
'RequestId': event['RequestId'],
'LogicalResourceId': event['LogicalResourceId'],
'Data': responseData}
print 'RESPONSE BODY:n' + json.dumps(responseBody)
try:
req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
if req.status_code != 200:
print req.text
raise Exception('Recieved non 200 response while sending response to CFN.')
return
except requests.exceptions.RequestException as e:
print e
raise
def create_cloud(self, cloud, data=None):
cloud_url = self.url + "/clouds"
payload = {"name": cloud}
print("Creating cloud entry for %s with data %s" % (payload, data))
resp = requests.post(cloud_url, headers=self.headers,
data=json.dumps(payload), verify=False)
if resp.status_code != 201:
raise Exception(resp.text)
self.cloud = resp.json()
if data:
reg_id = self.cloud["id"]
cloud_data_url = self.url + "/clouds/%s/variables" % reg_id
resp = requests.put(cloud_data_url, headers=self.headers,
data=json.dumps(data), verify=False)
if resp.status_code != 200:
print(resp.text)
def create_region(self, region, data=None):
region_url = self.url + "/regions"
payload = {"name": region, "cloud_id": self.cloud.get("id")}
print("Creating region entry for %s with data %s" % (payload, data))
resp = requests.post(region_url, headers=self.headers,
data=json.dumps(payload), verify=False)
if resp.status_code != 201:
raise Exception(resp.text)
self.region = resp.json()
if data:
reg_id = self.region["id"]
region_data_url = self.url + "/regions/%s/variables" % reg_id
resp = requests.put(region_data_url, headers=self.headers,
data=json.dumps(data), verify=False)
if resp.status_code != 200:
print(resp.text)
def create_cell(self, cell, data=None):
region_url = self.url + "/cells"
payload = {"region_id": self.region.get("id"),
"cloud_id": self.cloud.get("id"), "name": cell}
print("Creating cell entry %s with data %s" % (payload, data))
resp = requests.post(region_url, headers=self.headers,
data=json.dumps(payload), verify=False)
if resp.status_code != 201:
raise Exception(resp.text)
self.cell = resp.json()
if data:
c_id = resp.json()["id"]
region_data_url = self.url + "/cells/%s/variables" % c_id
resp = requests.put(region_data_url, headers=self.headers,
data=json.dumps(data), verify=False)
if resp.status_code != 200:
print(resp.text)
def upload_es_json(es_json, es_index, es_base, id, doc_type='article'):
"""Uploads a document to the ElasticSearch index."""
# Using requests
url = "{es_base}/{index}/{doc_type}/{id}".format(
es_base=es_base,
index=es_index.lower(), # ES index names must be lowercase
doc_type=doc_type,
id=id,
)
headers = {"Content-Type": "application/json"}
logger.info("Uploading to ES: PUT %s" % url)
r = requests.put(url, headers=headers, data=es_json)
if r.status_code >= 400:
recoverable_error("ES upload failed with error: '%s'" % r.text,
config.bypass_errors)
def predict(path):
global n
global model
parentNode = os.path.split(path)[0]
j = requests.get(FIREBASE_URL + parentNode + FIREBASE_ENDING_URL).json()
sample = np.asarray([j[x] for x in j.keys() if "x_" in x]).reshape(1, -1)
if sample.shape[1] == n:
j['y_predicted'] = model.predict_proba(sample)[0][1]
j['action'] = model.predict(sample)[0]
r = requests.put(FIREBASE_URL + parentNode + FIREBASE_ENDING_URL, data = json.dumps(j))
print "Features: ", sample
print "Prediction: ", model.predict_proba(sample)
else:
print "Couldn't make prediction, wrong dimension: ", sample.shape[1], " vs ", n
# 2nd: SSEClient that prints stuff
def upload_model_to_nexus(project):
"""
Use 'pyb -P model_name="<model_name>" upload_model_to_nexus' to upload archived model to Nexus repository
of the lab. If model_name == 'deeppavlov_docs', then documentation from build/docs will be archived and uploaded.
archive_model task will be executed before.
"""
import requests, datetime
os.chdir('build')
model_name = project.get_property('model_name')
file_name = model_name + '_' + datetime.date.today().strftime("%y%m%d") + '.tar.gz'
url = 'http://share.ipavlov.mipt.ru:8080/repository/'
url += 'docs/' if model_name == 'deeppavlov_docs' else 'models/'
headers = {'Content-Type': 'application/binary'}
with open(file_name, 'rb') as artifact:
requests.put(url + model_name + '/' + file_name, headers=headers,
data=artifact, auth=('jenkins', 'jenkins123'))
def renameorg(apikey, orgid, neworgname, suppressprint=False):
__hasorgaccess(apikey, orgid)
calltype = 'Organization Rename'
puturl = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
headers = {
'x-cisco-meraki-api-key': format(str(apikey)),
'Content-Type': 'application/json'
}
putdata = {
'name': format(str(neworgname))
}
dashboard = requests.put(puturl, data=json.dumps(putdata), headers=headers)
#
# Call return handler function to parse Dashboard response
#
result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
return result
# Create a new organization
# https://dashboard.meraki.com/api_docs#create-a-new-organization
def updatecontact(apikey, networkid, contactid, name, suppressprint=False):
calltype = 'Phone Contact'
puturl = '{0}/networks/{1}/phoneContacts/{2}'.format(str(base_url), str(networkid), str(contactid))
headers = {
'x-cisco-meraki-api-key': format(str(apikey)),
'Content-Type': 'application/json'
}
putdata = {'name': name}
dashboard = requests.put(puturl, data=json.dumps(putdata), headers=headers)
result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
return result
# Delete a phone contact. Google Directory contacts cannot be removed.
# https://dashboard.meraki.com/api_docs#delete-a-phone-contact
def updatevlan(apikey, networkid, vlanid, vlanname=None, mxip=None, subnetip=None, suppressprint=False):
calltype = 'VLAN'
puturl = '{0}/networks/{1}/vlans/{2}'.format(str(base_url), str(networkid), str(vlanid))
headers = {
'x-cisco-meraki-api-key': format(str(apikey)),
'Content-Type': 'application/json'
}
putdata = {}
if vlanname is not None:
putdata['name'] = format(str(vlanname))
if mxip is not None:
putdata['applianceIp'] = format(str(mxip))
if subnetip is not None:
putdata['subnet'] = format(str(subnetip))
putdata = json.dumps(putdata)
dashboard = requests.put(puturl, data=putdata, headers=headers)
#
# Call return handler function to parse Dashboard response
#
result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
return result
def renameorg(apikey, orgid, neworgname, suppressprint=False):
__hasorgaccess(apikey, orgid)
calltype = 'Organization Rename'
puturl = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
headers = {
'x-cisco-meraki-api-key': format(str(apikey)),
'Content-Type': 'application/json'
}
putdata = {
'name': format(str(neworgname))
}
dashboard = requests.put(puturl, data=json.dumps(putdata), headers=headers)
#
# Call return handler function to parse Dashboard response
#
result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
return result
def lockTask(self, task):
"""Receives a task dictionary, obtainned with getTasks() method,
and locks the task so other users can't access this task at same time.
Usage:
>>> pe.lockTask(task)
"""
locked = requests.get(self.client.baseurl
+task['stepElement'],
auth = self.client.cred)
eTag = locked.headers['ETag']
locked = requests.put(self.client.baseurl
+ task['stepElement'],
auth = self.client.cred,
params={'action':'lock',
'If-Match':eTag}
)
def endTask(self, task, comment=None):
"""Receives a task and finishes it, finishing the workflow itself or
moving to the next step in the task. Is also possible to create a
comment before ending the task.
Usage:
>>> pe.endTask(task) #or
>>> pe.endTask(task, u'Completed the task!')
"""
params = {'action':'dispatch'}
step = self.getStep(task)
if step.get('systemProperties').get('responses')\
and not step.get('systemProperties').get('selectedResponse'):
return "This task needs to be updated. Check the updateTask method."
else:
params['selectedResponse'] = 1
if comment:
task = self.saveAndUnlockTask(task, comment)
lock = self.lockTask(task)
params['If-Match'] = task['ETag']
dispatched = requests.put(self.client.baseurl + task['stepElement'],
auth = self.client.cred,
params=params)
def upload_lifecycle(self, data):
path = '/api/1.0/lifecycle_event/'
url = self.url + path
payload = data
headers = {
'Authorization': 'Basic ' + base64.b64encode(self.username + ':' + self.password),
'Content-Type': 'application/x-www-form-urlencoded'
}
r = requests.put(url, data=payload, headers=headers, verify=False)
if DEBUG:
print '\t[+] Posting data: %s' % str(payload)
print '\t[*] Status code: %d' % r.status_code
print '\t[*] Response: %s' % str(r.text)
return r.json()
def _upload_file(self, url: str, data: str, content_type: str,
rrs: bool = False):
ensure_aws4auth()
auth = AWS4Auth(self.access_key, self.secret_key, self.region, 's3')
if rrs:
storage_class = 'REDUCED_REDUNDANCY'
else:
storage_class = 'STANDARD'
headers = {
'Cache-Control': 'max-age=' + str(self.max_age),
'x-amz-acl': self.acl,
'x-amz-storage-class': storage_class
}
if content_type:
headers['Content-Type'] = content_type
res = requests.put(url, auth=auth, data=data, headers=headers)
if not 200 <= res.status_code < 300:
raise S3Error(res.text)
def setdevicedata(p_apikey, p_shardhost, p_nwid, p_device, p_field, p_value, p_movemarker):
#modifies value of device record. Returns the new value
#on failure returns one device record, with all values 'null'
#p_movemarker is boolean: True/False
movevalue = "false"
if p_movemarker:
movevalue = "true"
time.sleep(API_EXEC_DELAY)
try:
r = requests.put('https://%s/api/v0/networks/%s/devices/%s' % (p_shardhost, p_nwid, p_device), data=json.dumps({p_field: p_value, 'moveMapMarker': movevalue}), headers={'X-Cisco-Meraki-API-Key': p_apikey, 'Content-Type': 'application/json'})
except:
printusertext('ERROR 16: Unable to contact Meraki cloud')
sys.exit(2)
if r.status_code != requests.codes.ok:
return ('null')
return('ok')
def saucelabsSetup(self):
self.log("Setup for saucelabs")
auth = (self._remoteUsername, os.getenv("REMOTE_ACCESSKEY"))
baseURL = "{0}/{1}".format(self.SAUCELABS_API_BASE, self._remoteUsername)
jobsURL = "{0}/jobs".format(baseURL)
response = requests.get(jobsURL, \
auth = auth, \
headers = { 'content-type': 'application/json' }, \
params = { 'full': 'true' })
self.log("response={0}".format(json.dumps(response.json())))
remoteJobid = self.saucelabsFindJobId(response.json(), self._remoteTestID())
self.saucelabsJobURL = '{0}/jobs/{1}'.format(baseURL, remoteJobid)
self._resultReference = "{0}/{1}".format(self.SAUCELABS_RESULTS_BASE, remoteJobid)
response = requests.put(self.saucelabsJobURL,
auth = auth,
headers = { 'content-type': 'application/json' },
data = json.dumps({'name': self._buildID() }))
def execute(self):
# get signature
auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path, content_type = 'application/octet-stream')
signature = auth.get_signature()
self.http_headers['Authorization'] = signature
self.http_headers['Date'] = formatdate(usegmt=True)
statinfo = os.stat(self.local_file_name)
self.http_headers['Content-Length'] = statinfo.st_size
self.http_headers['Content-Type'] = 'application/octet-stream'
# construct request
request_url = self.dss_url + self.dss_op_path
data = open(self.local_file_name, 'rb')
# make request
resp = requests.put(request_url, headers = self.http_headers, data=data, verify = self.is_secure_request)
return resp
def execute(self):
# get signature
query_str = 'partNumber=' + self.part_number + '&uploadId=' + self.upload_id
auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path, query_str = self.dss_query_str_for_signature, content_type = 'application/octet-stream')
signature = auth.get_signature()
self.http_headers['Authorization'] = signature
self.http_headers['Date'] = formatdate(usegmt=True)
statinfo = os.stat(self.local_file_name)
self.http_headers['Content-Length'] = statinfo.st_size
self.http_headers['Content-Type'] = 'application/octet-stream'
# construct request
request_url = self.dss_url + self.dss_op_path
if(self.dss_query_str is not None):
request_url += '?' + self.dss_query_str
data = open(self.local_file_name, 'rb')
# make request
resp = requests.put(request_url, headers = self.http_headers, data=data, verify = self.is_secure_request)
return resp
def create_vh(self, sm_type, uuid):
exists = False
vh_name = '{0}-{1}'.format(sm_type,uuid)
api = '/api/vhosts/'
url_list = '{0}{1}'.format(self.host,api)
url_create = '{0}{1}{2}'.format(self.host,api,vh_name)
url_permission = '{0}/api/permissions/{1}/specific-management'.format(self.host,vh_name)
data = '{"configure":".*","write":".*","read":".*"}'
list = requests.get(url=url_list, auth= ('guest','guest')).json()
for i in range(len(list)):
if list[i]['name'] == vh_name:
exists = True
break
if not exists:
response = requests.put(url=url_create, headers=self.headers, auth=('guest', 'guest'))
permission = requests.put(url=url_permission, headers=self.headers, data=data, auth=('guest', 'guest'))
return response.status_code, permission.status_code
else:
return 0,0
def delete_all_zendesk_articles(self, category):
'''
delete all articles in the category being filled with UV articles
for our app, we had multiple uservoice knowledge bases, and each gets put in a zendesk category
'''
print "**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category)
url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category)
response = requests.get(url, headers=self.headers, auth=self.credentials)
if response.status_code != 200:
print('FAILED to delete articles with error {}'.format(response.status_code))
exit()
sections = response.json()['sections']
section_ids=list(section['id'] for section in sections)
for section_id in section_ids:
url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id)
response = requests.delete(url, headers=self.headers, auth=self.credentials)
if response.status_code != 204:
print('FAILED to delete sections for category {} with error {}'.format(category, response.status_code))
exit()
def _upload_assets_to_OSF(dlgr_id, osf_id, provider="osfstorage"):
"""Upload experimental assets to the OSF."""
root = "https://files.osf.io/v1"
snapshot_filename = "{}-code.zip".format(dlgr_id)
snapshot_path = os.path.join("snapshots", snapshot_filename)
r = requests.put(
"{}/resources/{}/providers/{}/".format(
root,
osf_id,
provider,
),
params={
"kind": "file",
"name": snapshot_filename,
},
headers={
"Authorization": "Bearer {}".format(
config.get("osf_access_token")),
"Content-Type": "text/plain",
},
data=open(snapshot_path, 'rb'),
)
r.raise_for_status()