def _send_raw_http_request(self, method, url, data=None):
    """Issue one HTTP request through ``requests`` and return the raw response.

    A JSON content-type header is always sent, a ``JSESSIONID`` cookie is
    added when a session id is set, and certificate verification is turned
    off (``verify=False``).

    :param method: HTTP verb: 'GET', 'POST', 'PUT', 'PATCH' or 'DELETE'
    :param url: URL to call
    :param data: request body; only passed on for POST, PUT and PATCH
    :return: the response object returned by ``requests``
    :raises TintriError: when ``method`` is none of the supported verbs
    """
    verbs_with_body = ('POST', 'PUT', 'PATCH')
    verbs_without_body = ('GET', 'DELETE')

    self.__logger.debug('%s %s' % (method, url))
    sends_body = method in verbs_with_body
    if sends_body:
        self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

    headers = {'content-type': 'application/json'}
    if self.__session_id:
        headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

    if not sends_body and method not in verbs_without_body:
        # This should never happen
        raise TintriError(None, message='Invalid HTTP method: ' + method)

    # requests exposes one module-level function per verb, named in lower case.
    send = getattr(requests, method.lower())
    if sends_body:
        httpresp = send(url, data, headers=headers, verify=False)
    else:
        httpresp = send(url, headers=headers, verify=False)

    # self._httpresp is for debugging only, not thread-safe
    self._httpresp = httpresp
    return httpresp
python类put()的实例源码
def toggle_tracking(self, enabled: bool=True):
    """
    Switch automatic deck tracking on or off.

    The change is submitted as a POST whose form data carries ``_method=put``.

    :param bool enabled: truthy to enable tracking, falsy to disable it
    :return None:
    :raises: requests.exceptions.HTTPError on error
    """
    logger.info('Called toggle_tracking()')
    endpoint = '/profile/settings/decks/toggle'
    if enabled:
        flag = 'true'
    else:
        flag = 'false'
    form = {'user[deck_tracking]': flag, '_method': 'put'}
    target = self._url + endpoint
    logger.debug('POST on %s', endpoint)
    response = requests.post(target, auth=self._auth, data=form)
    response.raise_for_status()
def put(self, path='', **kwargs):
    """
    Send a PUT request to an api path and decode the JSON reply.

    :param path: str api path, resolved through ``self._path_join``
    :param kwargs: additional parameters forwarded to ``requests.put``
    :return: decoded JSON response
    :raises: HTTPError (from ``raise_for_status``)
    :raises: InvalidJSONError when the response body cannot be decoded
    """
    url = self._path_join(path)
    response = requests.put(url, headers=self.auth_custom, auth=self.auth, **kwargs)
    response.raise_for_status()
    try:
        decoded = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return decoded
def send_response(event, context, status, reason, data):
    """PUT a JSON status document to the URL given in ``event['ResponseURL']``.

    The document echoes the request, stack and logical resource ids from
    ``event`` and uses ``context.log_stream_name`` as the physical resource id.
    NOTE(review): the field names look like an AWS CloudFormation
    custom-resource response -- confirm against the caller.

    :param event: mapping with 'RequestId', 'StackId', 'LogicalResourceId'
        and 'ResponseURL' keys
    :param context: object exposing ``log_stream_name``
    :param status: value sent as 'Status'
    :param reason: value sent as 'Reason'
    :param data: JSON-serialisable value sent as 'Data'
    :return: None; the HTTP response is not inspected
    """
    import requests

    body = json.dumps(
        {
            'Status': status,
            'RequestId': event['RequestId'],
            'StackId': event['StackId'],
            'PhysicalResourceId': context.log_stream_name,
            'Reason': reason,
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': data,
        }
    )
    headers = {
        # presumably left empty on purpose (e.g. for a pre-signed URL) -- TODO confirm
        'content-type': '',
        # Header values must be str/bytes: requests raises InvalidHeader for
        # an int, so the length is sent as text.
        'content-length': str(len(body)),
    }
    requests.put(event['ResponseURL'], data=body, headers=headers)
def test_requests_mocks(self):
    """Check that every HTTP verb function on ``requests`` is a ``Mock``."""
    for verb in ('get', 'put', 'post', 'patch', 'delete'):
        verb_function = getattr(requests, verb)
        self.assertTrue(isinstance(verb_function, Mock))
def put(self, path, params=None, body=None):
    """Send a PUT to ``self.endpoint + path`` without certificate verification.

    :param path: path appended to ``self.endpoint``
    :param params: query parameters handed to ``requests.put``
    :param body: request body; only sent when truthy
    :return: the response object returned by ``requests.put``
    """
    request_options = {'params': params}
    if body:
        request_options['data'] = body
    request_options['headers'] = self.headers
    request_options['verify'] = False
    return requests.put(self.endpoint + path, **request_options)
katello-publish-cvs.py 文件源码
项目:katello-publish-cvs
作者: RedHatSatellite
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def put_json(location, json_data):
    """
    PUT ``json_data`` to the URL ``location`` and return the decoded JSON reply.

    Uses the module-level USERNAME/PASSWORD for authentication, SSL_VERIFY
    for certificate checking and POST_HEADERS as request headers.
    """
    request_options = {
        'data': json_data,
        'auth': (USERNAME, PASSWORD),
        'verify': SSL_VERIFY,
        'headers': POST_HEADERS,
    }
    response = requests.put(location, **request_options)
    return response.json()
def authenticated_request_put(self, url, data):
    """PUT ``data`` to ``url`` with the instance's custom headers plus a JSON content type.

    The 'content-type' entry is added to ``self.custom_http_headers`` only
    for the duration of the call and removed afterwards.

    :param url: URL to send the request to
    :param data: request body handed to ``requests.put``
    :return: the response object returned by ``requests.put``
    """
    self.custom_http_headers['content-type'] = 'application/json'
    try:
        return requests.put(url, data, headers=self.custom_http_headers)
    finally:
        # Undo the temporary header even when the request raises; otherwise
        # the shared header dict keeps the JSON content type for later calls.
        self.custom_http_headers.pop('content-type', None)
def put(self, endpoint, payload):
    """Forward ``payload`` for ``endpoint`` to ``call_api`` with the 'put' verb and return its result."""
    verb = 'put'
    return self.call_api(verb, endpoint, payload)
def delete(self, endpoint):
    """Call ``call_api`` with the 'delete' verb for ``endpoint`` and no payload; return its result."""
    outcome = self.call_api('delete', endpoint, payload=None)
    return outcome
# put methods
def update(self, element_id, **kwargs):
    """
    Update one element on pipedrive.

    The request goes to /endpoint/:id (passed through ``self.restify``) and
    the keyword arguments are handed to ``self.put`` as the payload.
    """
    element_path = '/'.join((str(self.endpoint), str(element_id)))
    return self.put(self.restify(element_path), kwargs)
def update(self, api_obj, data, obj_id=0, url_params=''):
    """Send an object's data as JSON in a PUT to ``<server>/<api_obj>/<id>``.

    :param api_obj: path segment naming the object type
    :param data: object data; serialised with ``json.dumps``
    :param obj_id: id used in the URL; when falsy, ``data['id']`` is used
    :param url_params: optional query string, with or without a leading '?'
    :return: whatever ``self.__check_result`` returns for the response
    :raises TypeError: when ``data`` is empty or missing
    """
    if not data:
        raise TypeError("Missing object data.")

    # Normalise the query string so it carries exactly one leading '?'.
    query = '?' + url_params.lstrip("?") if url_params else url_params
    target_id = obj_id if obj_id else data['id']
    url = '/'.join((self.server, api_obj, str(target_id))) + query

    body = json.dumps(data)
    response = requests.put(url, data=body, headers=self.tokenHeaderJson, verify=self.sslVerify)
    return self.__check_result(response)
# Delete object using HTTP DELETE request.
def add_job(self, session_id, job_id):
    """Send a PUT for a job under a session.

    :param session_id: id placed in the session part of the URL
    :param job_id: id placed in the job part of the URL
    :raises exceptions.ApiClientException: when the status code is not 204
    """
    # endpoint /v1/sessions/{sessions_id}/jobs/{job_id}
    job_url = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id)
    response = requests.put(job_url, headers=self.headers, verify=self.verify)
    if response.status_code == 204:
        return
    raise exceptions.ApiClientException(response)
def add_job(self, session_id, job_id):
    """PUT to the job URL of a session; anything but a 204 reply is an error.

    :param session_id: session part of the URL
    :param job_id: job part of the URL
    :raises exceptions.ApiClientException: when the status code is not 204
    """
    # endpoint /v1/sessions/{sessions_id}/jobs/{job_id}
    url_parts = (self.endpoint, session_id, job_id)
    reply = requests.put('{0}{1}/jobs/{2}'.format(*url_parts),
                         headers=self.headers, verify=self.verify)
    succeeded = reply.status_code == 204
    if not succeeded:
        raise exceptions.ApiClientException(reply)
def replace_logset(logset_id, params):
    """
    Replace a given logset with the details provided.

    Sends ``params`` as JSON in a PUT request and passes the response to
    ``handle_response`` together with a failure message and the expected
    status 200. A request-level failure is written to stderr and the process
    exits with status 1.

    :param logset_id: id used to build the request URL
    :param params: JSON-serialisable logset details
    """
    headers = api_utils.generate_headers('rw')
    try:
        response = requests.put(_url((logset_id,))[1], json=params, headers=headers)
        handle_response(response, 'Update logset with details %s failed.\n' % params, 200)
    except requests.exceptions.RequestException as error:
        # stderr.write() only accepts str; handing it the exception object
        # raised TypeError and hid the real failure.
        sys.stderr.write('%s\n' % error)
        sys.exit(1)
def delete_user_from_team(team_id, user_key):
    """
    Remove a user from a team.

    Fetches the team, then PUTs it back under the same name with every user
    whose id equals ``user_key`` left out. Failures are reported through
    ``click.echo(..., err=True)`` and end the process with exit status 1.
    """
    headers = api_utils.generate_headers('rw')
    params = {'teamid': team_id}
    try:
        response = requests.request('GET', _url((team_id,))[1], params=params,
                                    headers=headers)
        if response.status_code != 200:
            if response_utils.response_error(response):
                click.echo('Cannot find team. Deleting user from team %s failed.' % team_id,
                           err=True)
                sys.exit(1)
            return

        team = response.json()['team']
        team_name = team['name']
        remaining_users = [member for member in team['users'] if member['id'] != user_key]
        params = {'team': {'name': team_name, 'users': remaining_users}}

        headers = api_utils.generate_headers('rw')
        response = requests.put(_url((team_id,))[1], json=params, headers=headers)
        if response_utils.response_error(response):  # Check response has no errors
            click.echo('Deleting user from team with key: %s failed.' % team_id, err=True)
            sys.exit(1)
        elif response.status_code == 200:
            click.echo("Deleted user with key: '%s' from team: %s" % (user_key, team_id))
    except requests.exceptions.RequestException as error:
        # One handler covers both the GET and the PUT.
        click.echo(error, err=True)
        sys.exit(1)
def put_json(location, json_data):
    """
    PUT ``json_data`` to the URL ``location`` and return the decoded JSON reply.

    Authenticates with the module-level USERNAME/PASSWORD, sends POST_HEADERS
    and keeps certificate verification enabled.
    """
    credentials = (USERNAME, PASSWORD)
    response = requests.put(location, data=json_data, auth=credentials,
                            verify=True, headers=POST_HEADERS)
    decoded = response.json()
    return decoded
def add_update_show(self, beid, subgroup):
    """Add a show to sonarr, or re-tag it when it is already being watched.

    Args:
        beid (int): The TVDB ID of the show we're adding or editing.
        subgroup (str): The subgroup we're using for this show; it is turned
            into a tag through ``self._subgroup_tag``.
    """
    _logger.debug("Entering add_update_show.")
    tag = self._subgroup_tag(subgroup)

    _logger.debug("Getting all shows being watched and searching for show with ID {0}.".format(beid))
    watched = None
    for candidate in self.get_watching_shows():
        if int(candidate['tvdbId']) != int(beid):
            continue
        _logger.debug("Found show {0} with ID {1} in watching shows! Updating instead of creating.".format(candidate['title'], candidate['tvdbId']))
        # No break: when several entries match, the last one is used.
        watched = candidate

    if watched:
        # Already watched: only the tags are replaced before sending it back.
        watched['tags'] = [tag]
        requests.put("{0}/api/series?apikey={1}".format(self.url, self.api_key), json=watched)
        return

    _logger.debug("Show not found in watching list, creating a new one.")
    details = self.get_show(beid)
    payload = {
        "tvdbId": int(beid),
        "title": details['title'],
        "qualityProfileId": self._quality_profile,
        "titleSlug": details['titleSlug'],
        "images": details['images'],
        "seasons": details['seasons'],
        "rootFolderPath": self._library_path,
        "seriesType": "anime",
        "seasonFolder": True,
        "addOptions": {"ignoreEpisodesWithFiles": True},
        "tags": [tag],
    }
    requests.post("{0}/api/series?apikey={1}".format(self.url, self.api_key), json=payload)
def put_container(self, container_name):
    """
    Create (or update) a container for the authorized account.

    :param container_name: name of the container, appended to ``self.url``
    :return: http response
    """
    container_url = '/'.join((self.url, container_name))
    return requests.put(container_url, headers=self.base_headers)
def put_object_to_container(self, container_name,
                            file_path=None,
                            file_name=None,
                            file_stream=None):
    """
    Upload a file to the target container.

    :param container_name:
        target container_name - string
        required=True
    :param file_path: file's path for upload - string; read from disk when
        no ``file_stream`` is given, and the source of the object name when
        no ``file_name`` is given
    :param file_name: object name used in the upload URL - string
    :param file_stream: file's data - string
    :return: submit response
    """
    if not file_name:
        # Fall back to the last '/'-separated component of the path.
        file_name = file_path.split('/')[-1]
    url = self.url + '/' + container_name + '/' + file_name

    if not file_stream:
        # Read inside a context manager so the handle is closed right away
        # instead of being left for the garbage collector.
        with open(file_path, 'rb') as source:
            file_stream = source.read()

    response = requests.put(
        url,
        data=file_stream,
        headers=self.base_headers
    )
    return response