def _patch_upload(self, image, digest):
    """Upload one blob with a PATCH request followed by a PUT request.

    The upload is started through ``self._start_upload``; if that reports the
    layer as mounted, nothing else is sent.

    Args:
      image: object passed to ``self._get_blob`` to obtain the blob body.
      digest: digest of the blob being uploaded.
    """
    already_mounted, upload_location = self._start_upload(digest, self._mount)
    if already_mounted:
        logging.info('Layer %s mounted.', digest)
        return

    # Send the blob bytes to the location handed back by the initial POST.
    patch_url = self._get_absolute_url(upload_location)
    patch_resp, _ = self._transport.Request(
        patch_url,
        method='PATCH',
        body=self._get_blob(image, digest),
        content_type='application/octet-stream',
        accepted_codes=[httplib.NO_CONTENT, httplib.ACCEPTED, httplib.CREATED])

    # The PATCH reply carries the next location; append the digest and PUT
    # with an empty body.
    put_url = self._get_absolute_url(
        self._add_digest(patch_resp['location'], digest))
    self._transport.Request(
        put_url,
        method='PUT',
        body=None,
        accepted_codes=[httplib.CREATED])
python类CREATED的实例源码
def _put_manifest(self, image, use_digest=False):
    """Upload the manifest for this image.

    Args:
      image: object providing ``manifest()``, ``media_type()`` and
        ``digest()``.
      use_digest: when true the manifest URL ends in ``image.digest()``;
        otherwise it ends in ``_tag_or_digest(self._name)``.
    """
    reference = image.digest() if use_digest else _tag_or_digest(self._name)
    manifest_url = '{base_url}/manifests/{tag_or_digest}'.format(
        base_url=self._base_url(),
        tag_or_digest=reference)
    self._transport.Request(
        manifest_url,
        method='PUT',
        body=image.manifest(),
        content_type=image.media_type(),
        accepted_codes=[httplib.OK, httplib.CREATED, httplib.ACCEPTED])
def _patch_upload(self, image, digest):
    """Upload one blob: PATCH the bytes, then PUT to the digest-tagged URL.

    Nothing is uploaded when ``self._start_upload`` reports the layer as
    mounted.

    Args:
      image: object whose ``blob(digest)`` supplies the request body.
      digest: digest of the blob being uploaded.
    """
    is_mounted, start_location = self._start_upload(digest, self._mount)
    if is_mounted:
        logging.info('Layer %s mounted.', digest)
    else:
        patch_target = self._get_absolute_url(start_location)
        patch_reply, _unused = self._transport.Request(
            patch_target,
            method='PATCH',
            body=image.blob(digest),
            content_type='application/octet-stream',
            accepted_codes=[
                httplib.NO_CONTENT, httplib.ACCEPTED, httplib.CREATED])

        # The location returned by the PATCH gets the digest appended and is
        # then hit with an empty-bodied PUT.
        with_digest = self._add_digest(patch_reply['location'], digest)
        put_target = self._get_absolute_url(with_digest)
        self._transport.Request(
            put_target,
            method='PUT',
            body=None,
            accepted_codes=[httplib.CREATED])
def _start_upload(
self,
digest,
mount=None
):
"""POST to begin the upload process with optional cross-repo mount param."""
if not mount:
# Do a normal POST to initiate an upload if mount is missing.
url = '{base_url}/blobs/uploads/'.format(base_url=self._base_url())
accepted_codes = [httplib.ACCEPTED]
else:
# If we have a mount parameter, try to mount the blob from another repo.
mount_from = '&'.join(
['from=' + urllib.quote(repo.repository, '') for repo in self._mount])
url = '{base_url}/blobs/uploads/?mount={digest}&{mount_from}'.format(
base_url=self._base_url(),
digest=digest,
mount_from=mount_from)
accepted_codes = [httplib.CREATED, httplib.ACCEPTED]
resp, unused_content = self._transport.Request(
url, method='POST', body=None,
accepted_codes=accepted_codes)
return resp.status == httplib.CREATED, resp.get('location') # type: ignore
def test_upload(self):
    """Check that ``files.upload`` posts the encoded payload and parses the reply.

    ``guess_type``, ``open`` and ``MultipartEncoder`` are patched so that no
    real file is read while the upload call runs.
    """
    folder_id = 'folder-id'
    file_name = 'upload.jpg'
    file_path = path.join(
        path.dirname(__file__), '..', 'fixtures', 'files', file_name)
    self.client.post.return_value = mock_upload_response(
        '/files/content', httplib.CREATED, None, 'files', 'POST_response.json')

    # Patches are applied bottom-up, so the innermost one (MultipartEncoder)
    # arrives as the first positional argument.
    @mock.patch('orangecloud_client.files.guess_type', return_value=('image/jpeg', 'binary'))
    @mock.patch('__builtin__.open', spec=open, return_value=MockFile())
    @mock.patch('orangecloud_client.files.MultipartEncoder', return_value=mock.Mock())
    def run_patched(encoder_factory, _patched_open, _patched_guess_type):
        encoder = encoder_factory()
        encoder.content_type = 'upload content type'

        uploaded = self.files.upload(file_path, folder_id)

        self.client.post.assert_called_with(
            self.client.post.return_value.url,
            data=encoder,
            headers={'Content-Type': encoder.content_type})
        self.assertEqual('file-id', uploaded.fileId)
        self.assertEqual('file-name', uploaded.fileName)

    run_patched()
def test__raise_for_status(self, mock_resp):
    """
    Verify that an exception gets raised for unexpected responses.

    :param mock_resp: mocked httplib Response
    """
    ok_statuses = [httplib.OK, httplib.CREATED]

    # A status that is listed among the accepted ones must not raise.
    mock_resp.status = httplib.CREATED
    self.up.resp = mock_resp
    self.up.content = ''
    try:
        self.up._raise_for_status(list(ok_statuses))
    except Exception as err:
        self.fail('_raise_for_status unexpectedly threw {}'.format(err))

    # Any status outside that list must raise.
    mock_resp.status = httplib.ACCEPTED
    with self.assertRaises(upapi.exceptions.UnexpectedAPIResponse):
        self.up._raise_for_status(list(ok_statuses))
def parse_response(self, response):
    """Copy creation state and index headers from *response* onto ``self``.

    ``self.newKey`` is set to True only when the status is 201 CREATED (it is
    left untouched otherwise). The ``x-etcd-index``, ``x-raft-index`` and
    ``x-raft-term`` headers are stored as integers, defaulting to 1, 1 and 0
    when absent.
    """
    if response.status == httplib.CREATED:
        self.newKey = True

    header_map = response.headers
    index_fields = (
        ('etcd_index', 'x-etcd-index', 1),
        ('raft_index', 'x-raft-index', 1),
        ('raft_term', 'x-raft-term', 0),
    )
    for attr_name, header_name, fallback in index_fields:
        setattr(self, attr_name, int(header_map.get(header_name, fallback)))
def _handle_server_response(self, response):
if response.status in (httplib.OK, httplib.CREATED,
httplib.NO_CONTENT):
return response
logger.debug('invalid response status:{st} body:{body}'.format(
st=response.status, body=response.data))
EtcdError.handle(response)
def _monolithic_upload(self, image, digest):
    """Upload a blob with a single POST that carries the digest in the query.

    Args:
      image: object passed to ``self._get_blob`` to obtain the blob body.
      digest: digest of the blob being uploaded.
    """
    upload_url = '{base_url}/blobs/uploads/?digest={digest}'.format(
        base_url=self._base_url(),
        digest=digest)
    self._transport.Request(
        upload_url,
        method='POST',
        body=self._get_blob(image, digest),
        accepted_codes=[httplib.CREATED])
def _put_upload(self, image, digest):
    """Start an upload, then send the whole blob with one PUT.

    Nothing is uploaded when ``self._start_upload`` reports the layer as
    mounted.

    Args:
      image: object passed to ``self._get_blob`` to obtain the blob body.
      digest: digest of the blob being uploaded.
    """
    already_mounted, upload_location = self._start_upload(digest, self._mount)
    if already_mounted:
        logging.info('Layer %s mounted.', digest)
        return

    put_url = self._add_digest(upload_location, digest)
    self._transport.Request(
        put_url,
        method='PUT',
        body=self._get_blob(image, digest),
        accepted_codes=[httplib.CREATED])
def _start_upload(
self,
digest,
mount=None
):
"""POST to begin the upload process with optional cross-repo mount param."""
if not mount:
# Do a normal POST to initiate an upload if mount is missing.
url = '{base_url}/blobs/uploads/'.format(base_url=self._base_url())
accepted_codes = [httplib.ACCEPTED]
else:
# If we have a mount parameter, try to mount the blob from another repo.
mount_from = '&'.join(
['from=' + urllib.quote(repo.repository, '') for repo in self._mount])
url = '{base_url}/blobs/uploads/?mount={digest}&{mount_from}'.format(
base_url=self._base_url(),
digest=digest,
mount_from=mount_from)
accepted_codes = [httplib.CREATED, httplib.ACCEPTED]
resp, unused_content = self._transport.Request(
url, method='POST', body=None,
accepted_codes=accepted_codes)
# pytype: disable=attribute-error
return resp.status == httplib.CREATED, resp.get('location')
# pytype: enable=attribute-error
def _monolithic_upload(self, image, digest):
    """Upload a blob with a single POST that carries the digest in the query.

    Args:
      image: object whose ``blob(digest)`` supplies the request body.
      digest: digest of the blob being uploaded.
    """
    target = '{base_url}/blobs/uploads/?digest={digest}'.format(
        base_url=self._base_url(),
        digest=digest)
    self._transport.Request(
        target,
        method='POST',
        body=image.blob(digest),
        accepted_codes=[httplib.CREATED])
def _put_upload(self, image, digest):
    """Start an upload, then send the whole blob with one PUT.

    Nothing is uploaded when ``self._start_upload`` reports the layer as
    mounted.

    Args:
      image: object whose ``blob(digest)`` supplies the request body.
      digest: digest of the blob being uploaded.
    """
    is_mounted, start_location = self._start_upload(digest, self._mount)
    if is_mounted:
        logging.info('Layer %s mounted.', digest)
    else:
        target = self._add_digest(start_location, digest)
        self._transport.Request(
            target,
            method='PUT',
            body=image.blob(digest),
            accepted_codes=[httplib.CREATED])
def _put_manifest(self, image):
    """Upload the manifest for this image.

    The manifest is PUT to the URL built from ``self._base_url()`` and
    ``_tag_or_digest(self._name)``.
    """
    manifest_url = '{base_url}/manifests/{tag_or_digest}'.format(
        base_url=self._base_url(),
        tag_or_digest=_tag_or_digest(self._name))
    self._transport.Request(
        manifest_url,
        method='PUT',
        body=image.manifest(),
        accepted_codes=[httplib.OK, httplib.CREATED, httplib.ACCEPTED])
def test_send_project_valid(self, mock_cs):
    """send_project returns the decoded JSON body when the reply is 201."""
    mock_cs.side_effect = [None]
    api = API.apiCalls.ApiCalls(
        client_id="",
        client_secret="",
        base_URL="",
        username="",
        password=""
    )

    expected = {
        "resource": {
            "name": "project1",
            "projectDescription": "projectDescription",
            "identifier": "1"
        }
    }

    # Canned reply handed out by the fake session's post().
    fake_response = Foo()
    fake_response.status_code = httplib.CREATED
    fake_response.text = json.dumps(expected)

    fake_session = Foo()
    fake_session.post = MagicMock(side_effect=[fake_response])

    api.session = fake_session
    api.get_link = lambda x, y, targ_dict="": None

    project = API.apiCalls.Project("project1", "projectDescription", "1")
    actual = api.send_project(project)

    self.assertEqual(expected, actual)
def authenticate(self, auth_data, expected_status=httplib.CREATED):
    """POST *auth_data* to the keystone token endpoint and check the status.

    The response status code must equal *expected_status* (the response text
    is used as the assertion message). The response object is returned.
    """
    endpoint = self.keystone_token_endpoint
    response = requests.post(
        endpoint,
        headers=self.headers,
        json=auth_data,
        verify='/etc/ssl/certs/keystone.pem',
    )
    self.assertEqual(expected_status, response.status_code, response.text)
    return response
zendesk_localization.py 文件源码
项目:zendesk-help-center-localization
作者: mykola-mokhnach
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _post(self, endpoint, post_data=None):
    """POST *post_data* as JSON to ``<api_root>/<endpoint>``.

    Returns the decoded JSON body for a 200 or 201 reply and raises
    ``APIError(text, status_code)`` for any other status.
    """
    target = '{}/{}'.format(self._api_root, endpoint)
    json_headers = {'Content-Type': 'application/json',
                    'Accept': 'application/json'}
    credentials = (self._login + '/token', self._token)
    reply = requests.request('POST',
                             url=target,
                             headers=json_headers,
                             auth=credentials,
                             json=post_data)
    if reply.status_code not in (httplib.OK, httplib.CREATED):
        raise APIError(reply.text, reply.status_code)
    return reply.json()
zendesk_localization.py 文件源码
项目:zendesk-help-center-localization
作者: mykola-mokhnach
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def _put(self, endpoint, put_data=None):
    """PUT *put_data* as JSON to ``<api_root>/<endpoint>``.

    Returns the decoded JSON body for a 200 or 201 reply and raises
    ``APIError(text, status_code)`` for any other status.
    """
    request_kwargs = {
        'url': '{}/{}'.format(self._api_root, endpoint),
        'headers': {'Content-Type': 'application/json',
                    'Accept': 'application/json'},
        'auth': (self._login + '/token', self._token),
        'json': put_data,
    }
    reply = requests.request('PUT', **request_kwargs)
    succeeded = reply.status_code in (httplib.OK, httplib.CREATED)
    if not succeeded:
        raise APIError(reply.text, reply.status_code)
    return reply.json()
zendesk_localization.py 文件源码
项目:zendesk-help-center-localization
作者: mykola-mokhnach
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def _upload_file(self, endpoint, path, form_data=None):
    """Upload the file at *path* as a multipart POST to ``<api_root>/<endpoint>``.

    Args:
        endpoint: path appended to ``self._api_root``.
        path: filesystem path of the file to send; opened in binary mode.
        form_data: optional mapping of extra form fields sent alongside the
            file.

    Returns:
        The decoded JSON body for a 201 reply, or None for a 204 reply
        (which carries no body to decode).

    Raises:
        APIError: for any status other than 201 or 204.
    """
    multipart_data = {}
    if form_data is not None:
        for key, value in form_data.items():
            # A (None, value) tuple has no filename part, so the entry is
            # sent as a plain form field rather than a file.
            multipart_data[key] = (None, value)

    # The request must be issued while the file handle is still open.
    with open(path, 'rb') as src_file:
        multipart_data['file'] = src_file
        response = requests.request('POST',
                                    url='{}/{}'.format(self._api_root, endpoint),
                                    headers={'Accept': 'application/json'},
                                    auth=(self._login + '/token', self._token),
                                    files=multipart_data)

    if response.status_code == httplib.NO_CONTENT:
        # 204 has an empty body; calling response.json() on it would raise.
        return None
    if response.status_code == httplib.CREATED:
        return response.json()
    raise APIError(response.text, response.status_code)
def create_credentials(token, account_id):
    """
    Get client credentials, given a client token and an account_id.

    Reference:
        https://docs.brightcove.com/en/video-cloud/oauth-api/guides/get-client-credentials.html

    Arguments:
        token (str): BC token placed in the ``Authorization: BC_TOKEN`` header.
        account_id: account identifier; converted with ``int()``.

    Returns:
        tuple: ``(client_secret, client_id, error_message)``; on success the
        error message is always the empty string.

    Raises:
        BrightcoveApiClientError: when the reply is not 201 CREATED, or its
            body is empty or not valid JSON.
    """
    headers = {'Authorization': 'BC_TOKEN {}'.format(token)}
    data = {
        "type": "credential",
        "maximum_scope": [{
            "identity": {
                "type": "video-cloud-account",
                "account-id": int(account_id),
            },
            "operations": [
                "video-cloud/video/all",
                "video-cloud/ingest-profiles/profile/read",
                "video-cloud/ingest-profiles/account/read",
                "video-cloud/ingest-profiles/profile/write",
                "video-cloud/ingest-profiles/account/write",
            ],
        }],
        "name": "Open edX Video XBlock"
    }
    url = 'https://oauth.brightcove.com/v4/client_credentials'
    response = requests.post(url, json=data, headers=headers)

    try:
        response_data = response.json()
    except ValueError:
        # A body that is not JSON is treated like any other failed reply so
        # the caller sees the API client error below, not a decoding error.
        response_data = None

    # New resource must have been created.
    if response.status_code != httplib.CREATED or not response_data:
        # For dev purposes, response_data.get('error_description') may also be considered.
        error_message = "Authentication to Brightcove API failed: no client credentials have been retrieved.\n" \
                        "Please ensure you have provided an appropriate BC token, using Video API Token field."
        raise BrightcoveApiClientError(error_message)

    client_secret = response_data.get('client_secret')
    client_id = response_data.get('client_id')
    return client_secret, client_id, ''
def post(self, url, payload, headers=None, can_retry=True):
    """
    Issue REST POST request to a given URL. Can throw ApiClientError or its subclass.

    Arguments:
        url (str): API url to post to.
        payload: POST data, passed to ``requests.post`` as ``data``.
        headers (dict): Extra headers merged over the default authorization
            and content-type headers.
        can_retry (bool): True if, on a 401 reply, the access token may be
            refreshed and the call repeated once.

    Returns:
        The decoded JSON body of a 200 or 201 reply.
    """
    request_headers = {
        'Authorization': 'Bearer ' + self.access_token,
        'Content-type': 'application/json'
    }
    if headers is not None:
        request_headers.update(headers)

    reply = requests.post(url, data=payload, headers=request_headers)
    status = reply.status_code

    if status in (httplib.OK, httplib.CREATED):
        return reply.json()

    if status == httplib.UNAUTHORIZED and can_retry:
        # Refresh the token and repeat exactly once (can_retry=False).
        self.access_token = self._refresh_access_token()
        return self.post(url, payload, headers, can_retry=False)

    raise BrightcoveApiClientError
def _handle_post(gcs_stub, filename, headers):
    """Handle POST that starts object creation.

    Asks the stub to start a creation, then returns a fake 201 result whose
    ``location`` header points at the quoted filename with the returned token
    as ``upload_id``.
    """
    content_type = _ContentType(headers)
    token = gcs_stub.post_start_creation(filename, headers)

    quoted_name = urllib.quote(filename)
    query_string = urllib.urlencode({'upload_id': token})
    location = 'https://storage.googleapis.com/%s?%s' % (
        quoted_name, query_string)

    response_headers = {
        'location': location,
        'content-type': content_type.value,
        'content-length': 0
    }
    return _FakeUrlFetchResult(httplib.CREATED, response_headers, '')
def test_send_samples_valid(self, mock_cs):
    """send_samples returns one decoded JSON result per sample on a 201 reply."""
    mock_cs.side_effect = [None]
    api = API.apiCalls.ApiCalls(
        client_id="",
        client_secret="",
        base_URL="",
        username="",
        password=""
    )

    sample_fields = {
        "sequencerSampleId": "03-3333",
        "description": "The 53rd sample",
        "sampleName": "03-3333",
        "sampleProject": "1"
    }
    expected = {"resource": dict(sample_fields)}

    # Canned reply handed out by the fake session's post().
    fake_response = Foo()
    fake_response.status_code = httplib.CREATED
    fake_response.text = json.dumps(expected)

    fake_session = Foo()
    fake_session.post = MagicMock(side_effect=[fake_response])

    api.get_link = lambda x, y, targ_dict="": None
    api.session = fake_session

    sample = API.apiCalls.Sample(sample_fields)
    results = api.send_samples([sample])

    self.assertEqual(len(results), 1)
    self.assertEqual(results[0], expected)
def test_send_sequence_files_valid(self, getsize, mock_cs):
    """send_sequence_files returns one decoded JSON result for one sample."""
    mock_cs.side_effect = [None]
    api = API.apiCalls.ApiCalls(
        client_id="",
        client_secret="",
        base_URL="",
        username="",
        password=""
    )

    fastq_files = ["03-3333_S1_L001_R1_001.fastq.gz",
                   "03-3333_S1_L001_R2_001.fastq.gz"]
    expected = {"resource": [{"file": name} for name in fastq_files]}

    # Canned reply handed out by the fake session's post().
    fake_response = Foo()
    fake_response.status_code = httplib.CREATED
    fake_response.text = json.dumps(expected)

    fake_session = Foo()
    fake_session.post = MagicMock(side_effect=[fake_response])

    api.get_link = lambda x, y, targ_dict="": None
    api.session = fake_session
    # NOTE(review): this replaces the attribute on the class itself, not only
    # on this instance.
    API.apiCalls.ApiCalls.get_file_size_list = MagicMock()

    sample = API.apiCalls.Sample({
        "sequencerSampleId": "03-3333",
        "description": "The 53rd sample",
        "sampleName": "03-3333",
        "sampleProject": "1"
    })
    sample.set_seq_file(SequenceFile({}, fastq_files))
    sample.run = SequencingRun(sample_sheet="sheet", sample_list=[sample])
    sample.run._sample_sheet_name = "sheet"

    results = api.send_sequence_files(samples_list=[sample])

    self.assertEqual(len(results), 1)
    self.assertEqual(results[0], expected)
def send_project(self, project, clear_cache=True):
    """
    Post a project to IRIDA via the API.

    The project being sent requires a name that is at least 5 characters
    long.

    arguments:
        project -- a Project object to be sent.
        clear_cache -- when true, ``self.cached_projects`` is reset to None
                       before anything else happens.

    returns the JSON body of the reply decoded into a dictionary (only
    reached when the server answers 201).

    raises ProjectError when the name is shorter than 5 characters or when
    the reply status is anything other than 201.
    """
    if clear_cache:
        self.cached_projects = None

    # Reject short names up front, before any request is made.
    if len(project.get_name()) < 5:
        raise ProjectError("Invalid project name: " +
                           project.get_name() +
                           ". A project requires a name that must be " +
                           "5 or more characters.")

    url = self.get_link(self.base_URL, "projects")
    body = json.dumps(project.get_dict())
    request_kwargs = {
        "headers": {
            "Content-Type": "application/json"
        }
    }
    response = self.session.post(url, body, **request_kwargs)

    if response.status_code != httplib.CREATED:  # anything but 201
        raise ProjectError("Error: " +
                           str(response.status_code) + " " +
                           response.text)
    return json.loads(response.text)
def send_samples(self, samples_list):
    """
    Post sample(s) to their project.

    The target project of each sample is obtained from
    ``sample.get_project_id()``.

    arguments:
        samples_list -- list containing Sample object(s) to send

    returns a list with the decoded JSON reply for each sample, in order.

    raises ProjectError when resolving the project's samples link raises
    StopIteration, and SampleError when a reply status is not 201.
    """
    # Reset the caches, we're updating stuff.
    self.cached_samples = {}
    self.cached_projects = None

    results = []
    for sample in samples_list:
        try:
            project_id = sample.get_project_id()
            projects_url = self.get_link(self.base_URL, "projects")
            url = self.get_link(projects_url, "project/samples",
                                targ_dict={
                                    "key": "identifier",
                                    "value": project_id
                                })
        except StopIteration:
            raise ProjectError("The given project ID: " +
                               project_id + " doesn't exist")

        request_kwargs = {
            "headers": {
                "Content-Type": "application/json"
            }
        }
        body = json.dumps(sample, cls=Sample.JsonEncoder)
        response = self.session.post(url, body, **request_kwargs)

        if response.status_code != httplib.CREATED:  # anything but 201
            logging.error("Didn't create sample on server, response code is [{}] and error message is [{}]".format(response.status_code, response.text))
            raise SampleError("Error {status_code}: {err_msg}.\nSample data: {sample_data}".format(
                status_code=str(response.status_code),
                err_msg=response.text,
                sample_data=str(sample)), ["IRIDA rejected the sample."])

        results.append(json.loads(response.text))
    return results