def test_chain_with_different_for_the_nth_time_error(server: FakeServer):
requests.patch(server.base_uri + "/users", json={"name": "new_name", "level": 5})
requests.patch(server.base_uri + "/users", json={"name": "new_name", "level": 6})
requests.patch(server.base_uri + "/users", json={"name": "new_name", "level": 7})
with pytest.raises(AssertionError) as error:
expect_that(server.was_requested("patch", "/users").
for_the_first_time().
with_content_type("application/json").
with_body('{"name": "new_name", "level": 6}').
for_the_second_time().
with_content_type("application/json").
with_body('{"name": "new_name", "level": 7}').
for_the_3_time().
with_content_type("application/json").
with_body('{"name": "new_name", "level": 8}'))
assert str(error.value) == ("Expect that server was requested with [PATCH] http://localhost:8081/users.\n"
"For the 1 time: with body '{\"name\": \"new_name\", \"level\": 6}'.\n"
"But for the 1 time: body was '{\"name\": \"new_name\", \"level\": 5}'.\n"
"For the 2 time: with body '{\"name\": \"new_name\", \"level\": 7}'.\n"
"But for the 2 time: body was '{\"name\": \"new_name\", \"level\": 6}'.\n"
"For the 3 time: with body '{\"name\": \"new_name\", \"level\": 8}'.\n"
"But for the 3 time: body was '{\"name\": \"new_name\", \"level\": 7}'.")
python类patch()的实例源码
test_fake_server_expectations.py 文件源码
项目:py_fake_server
作者: Telichkin
项目源码
文件源码
阅读 38
收藏 0
点赞 0
评论 0
def finish_proxy_log(self, data):
""" ???????, ?????? ???
:param data: ????
data = {
"proxy_log_id": 123123,
"date_finished": timestamp,
}
"""
assert isinstance(data.get('date_finished'), (int, float))
data['date_finished'] = timestamp_to_datetime_str(data['date_finished'])
data['is_failed'] = 1 if data.get('is_failed') else 0
data['is_finished'] = 1
proxy_log_id = data.get('proxy_log_id') or 0
r, content = self.patch('finish-proxy-log', pk=proxy_log_id, data=data)
if r.status_code != 200:
logging.warning('Finish proxy log failed: %s' % proxy_log_id)
return False
return True
def _send_raw_http_request(self, method, url, data=None):
self.__logger.debug('%s %s' % (method, url))
if method in ['POST', 'PUT', 'PATCH']:
self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)
headers = {'content-type': 'application/json'}
if self.__session_id:
headers['cookie'] = 'JSESSIONID=%s' % self.__session_id
if method in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
if method == 'GET': httpresp = requests.get(url, headers=headers, verify=False)
elif method == 'POST': httpresp = requests.post(url, data, headers=headers, verify=False)
elif method == 'PUT': httpresp = requests.put(url, data, headers=headers, verify=False)
elif method == 'PATCH': httpresp = requests.patch(url, data, headers=headers, verify=False)
elif method == 'DELETE': httpresp = requests.delete(url, headers=headers, verify=False)
self._httpresp = httpresp # self._httpresp is for debugging only, not thread-safe
return httpresp
else:
raise TintriError(None, message='Invalid HTTP method: ' + method) # This should never happen
def patch(self, udid):
data = tornado.escape.json_decode(self.request.body)
id = data['id']
timeout = float(data.get('timeout', 20.0))
print 'Timeout:', timeout
result = self.results.get(id)
if result is None:
self.results[id] = self
yield gen.sleep(timeout)
if self.results.get(id) == self:
del(self.results[id])
self.write('null')
self.finish()
else:
self.write(json.dumps(result))
self.results.pop(id, None)
def update_ports(self, service, ports):
"""Update ports in service or remove service if ports emtpy
:param service: service to update
:param ports: new ports for service
:return: updated service
"""
name = service['metadata']['name']
namespace = service['metadata']['namespace']
if ports:
data = {'spec': {'ports': ports}}
rv = self.patch(name, namespace, data)
raise_if_failure(rv, "Couldn't patch service ports")
return rv
else:
rv = self.delete(name, namespace)
raise_if_failure(rv, "Couldn't delete service")
return None
def clear_snooze_label_if_set(github_auth, issue, snooze_label):
issue_labels = {label["name"] for label in issue.get("labels", [])}
if snooze_label not in issue_labels:
logging.debug(
"clear_snooze_label_if_set: Label {} not set on {}".
format(snooze_label, issue["html_url"]))
return False
issue_labels.remove(snooze_label)
auth = requests.auth.HTTPBasicAuth(*github_auth)
r = requests.patch(issue["url"], auth=auth,
json={"labels": list(issue_labels)},
headers=constants.GITHUB_HEADERS)
r.raise_for_status()
logging.debug(
"clear_snooze_label_if_set: Removed snooze label from {}".
format(issue["html_url"]))
return True
def patch(self, udid):
data = tornado.escape.json_decode(self.request.body)
id = data['id']
timeout = float(data.get('timeout', 20.0))
print 'Timeout:', timeout
result = self.results.get(id)
if result is None:
self.results[id] = self
yield gen.sleep(timeout)
if self.results.get(id) == self:
del(self.results[id])
self.write('null')
self.finish()
else:
self.write(json.dumps(result))
self.results.pop(id, None)
def create_static(host, port, user, password, route, nexthop, insecure):
"""Function to create a static route on CSR1000V."""
url = "https://{h}:{p}/api/running/native/ip/route".format(h=HOST, p=PORT)
headers = {'content-type': 'application/vnd.yang.data+json',
'accept': 'application/vnd.yang.data+json'}
try:
result = requests.patch(url, auth=(USER, PASS), data=data,
headers=headers, verify=not insecure)
except Exception:
print(str(sys.exc_info()[0]))
return -1
return result.text
if result.status_code == 201:
return 0
# somethine went wrong
print(result.status_code, result.text)
return -1
def promote_deployment(config, application, version, release, stage, execute):
'''Promote deployment to new stage'''
namespace = config.get('kubernetes_namespace')
deployment_name = '{}-{}-{}'.format(application, version, release)
info('Promoting deployment {} to {} stage..'.format(deployment_name, stage))
cluster_id = config.get('kubernetes_cluster')
namespace = config.get('kubernetes_namespace')
path = '/kubernetes-clusters/{}/namespaces/{}/resources'.format(cluster_id, namespace)
resources_update = ResourcesUpdate()
resources_update.set_label(deployment_name, 'stage', stage)
response = request(config, requests.patch, path, json=resources_update.to_dict())
change_request_id = response.json()['id']
if execute:
approve_and_execute(config, change_request_id)
else:
print(change_request_id)
def scale_deployment(config, application, version, release, replicas, execute):
'''Scale a single deployment'''
namespace = config.get('kubernetes_namespace')
kubectl_login(config)
deployment_name = '{}-{}-{}'.format(application, version, release)
info('Scaling deployment {} to {} replicas..'.format(deployment_name, replicas))
resources_update = ResourcesUpdate()
resources_update.set_number_of_replicas(deployment_name, replicas)
cluster_id = config.get('kubernetes_cluster')
namespace = config.get('kubernetes_namespace')
path = '/kubernetes-clusters/{}/namespaces/{}/resources'.format(cluster_id, namespace)
response = request(config, requests.patch, path, json=resources_update.to_dict())
change_request_id = response.json()['id']
if execute:
approve_and_execute(config, change_request_id)
else:
print(change_request_id)
def save_job_source_code(base_url, token, job_id, source):
headers = {
'X-Auth-Token': str(token),
'Content-Type': 'application/json'
}
url = '{}/jobs/{}/source-code'.format(base_url, job_id)
r = requests.patch(url,
data=json.dumps({'secret': token, 'source': source}),
headers=headers)
if r.status_code == 200:
return json.loads(r.text)
else:
raise RuntimeError(
u"Error loading data from stand: HTTP {} - {} ({})".format(
r.status_code, r.text, url))
def set_settings(
self, explicit_filter, allow_dms, friend_all, friend_mutual,
friend_mutual_guild):
settings = {}
if explicit_filter is not None:
if not (0 <= explicit_filter <= 2):
raise ValueError("Explicit filter must be from 0 to 2.")
settings["explicit_content_filter"] = explicit_filter
if allow_dms is not None:
settings["default_guilds_restricted"] = not allow_dms
if not (friend_all is friend_mutual is friend_mutual_guild):
friend_all = friend_all and friend_mutual and friend_mutual_guild
settings["friend_source_flags"] = {
"all": bool(friend_all),
"mutual_friends": bool(friend_mutual),
"mutual_guilds": bool(friend_mutual_guild),
}
r = self.patch("users/@me/settings", auth=True, json=settings)
# Returns form errors if invalid; new settings otherwise
return (r.ok, r.json())
def update_case(self, case):
"""
Update a case.
:param case: The case to update. The case's `id` determines which case to update.
:return:
"""
req = self.url + "/api/case/{}".format(case.id)
# Choose which attributes to send
update_keys = [
'title', 'description', 'severity', 'startDate', 'owner', 'flag', 'tlp', 'tags', 'resolutionStatus',
'impactStatus', 'summary', 'endDate', 'metrics'
]
data = {k: v for k, v in case.__dict__.items() if k in update_keys}
try:
return requests.patch(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException:
sys.exit(1)
def edit_issue(auth_token, username, repo, issue_number, issue_title, issue_body, insecure=False):
"""
Edit an issue.
"""
# https://developer.github.com/v3/issues/#edit-an-issue
# Parameters (not required):
# title - String containing the title of the issue.
# body - String containing the body of the issue.
parameters = {"title": issue_title, "body": issue_body}
try:
response = requests.patch("https://api.github.com/repos/{}/issues/{}".format(repo, issue_number),
json=parameters,
headers=auth_header(auth_token))
except requests.SSLError:
print("A network error occurred.")
if insecure:
print("An SSL certificate error occurred.")
return response.json()
def edit_issue_comment(auth_token, username, repo, comment_id, comment_body, insecure=False):
"""
Edits an existing comment on a specified issue.
"""
# https://developer.github.com/v3/issues/comments/#edit-a-comment
# Note: Issue Comments are ordered by ascending ID.
# Parameters:
# body - Reqired string. The contents of the comment.
parameters = {"body": comment_body}
try:
response = requests.patch("https://api.github.com/repos/{}/issues/comments/{}".format(repo, comment_id),
json=parameters,
headers=auth_header(auth_token))
except:
print("A network error occurred.")
if insecure:
print("An SSL certificate error occurred.")
return response.json()
def patch(*args, **kwargs):
headers = {
'X-Access-Token': appconfig.get('provider_config', 'access-token'),
'X-Client-ID': appconfig.get('provider_config', 'client-id')
}
resp = None
for _ in range(WRequests.MAX_RETRIES):
try:
resp = requests.patch(*args, headers=headers, **kwargs).json()
except json.decoder.JSONDecodeError:
resp = None
else:
break
raise_if_error(resp)
return resp
def patch(self, the_id, user_data, user_params={}, user_headers={}):
strjsondata = ujson.dumps(user_data, ensure_ascii=False)
resp = req.patch(
self._url(self.endpoint, the_id),
data=strjsondata,
headers=self._headers(user_headers),
params=self._params(user_params)
)
if resp.status_code != 200:
raise ApiError(
"GET",
self.endpoint,
resp.status_code,
resp.text)
else:
return self._formatreturn(resp)
def test_mock_works_with_multiple_methods(self):
double = ServiceMock(
service='foo',
methods=['GET', 'POST', 'PATCH'],
url='/foo',
input_schema={'type': 'object'},
output_schema={'type': 'array'},
output=[]
)
set_service_locations(dict(foo="http://localhost:1234/"))
self.useFixture(double)
self.assertEqual(
200,
requests.post("http://localhost:1234/foo", json={}).status_code
)
self.assertEqual(
200,
requests.get("http://localhost:1234/foo", json={}).status_code
)
self.assertEqual(
200,
requests.patch("http://localhost:1234/foo", json={}).status_code
)
def update_route(route_fqdn, route_name, config, logger, watcher):
ipa_client = IPAClient(
ipa_user = config.get('ipa_user'),
ipa_password = config.get('ipa_password'),
ipa_url = config.get('ipa_url'),
ca_trust = config.get('ipa_ca_cert', False)
)
ipa_client.create_host(route_fqdn)
certificate, key = ipa_client.create_cert(route_fqdn, config.get('ipa_realm'))
logger.info("[CERT CREATED]: {0}".format(route_fqdn))
logger.debug("Cert: {0}\nKey: {1}\n".format(certificate, key.exportKey().decode('UTF-8')))
req = requests.patch('https://{0}/oapi/v1/namespaces/{1}/routes/{2}'.format(watcher.config.k8s_endpoint, watcher.config.k8s_namespace, route_name),
headers={'Authorization': 'Bearer {0}'.format(watcher.config.k8s_token), 'Content-Type':'application/strategic-merge-patch+json'},
data=json.dumps({'metadata': {'annotations': {'{0}.state'.format(config.get('need_cert_annotation')): 'created'}}, 'spec': {'tls': {'certificate': '-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----'.format(
'\n'.join(certificate[i:i+65] for i in six.moves.range(0, len(certificate), 65))),
'key': '{0}'.format(key.exportKey('PEM').decode('UTF-8'))}}}),
params="", verify=watcher.config.k8s_ca)
logger.info("[ROUTE UPDATED]: {0}".format(route_fqdn))
def set_attributes(login: Roll20Login, character_id: str,
attributes: Mapping[str, Union[str, int]],
attribute_position: AttributePosition = AttributePosition.current) -> None:
attribute_ids = get_attributes_ids(login=login, character_id=character_id)
for attribute_name, attribute_value in attributes.items():
# Cast int values to str.
attribute_value = str(attribute_value)
attribute_id = attribute_ids[attribute_name]
url = (f'{login.firebase_root}{login.campaign_path}/char-attribs/char/{character_id}/'
f'{attribute_id}/.json?auth={login.auth_token}')
response = requests.patch(url,
data=json.dumps({attribute_position.value: attribute_value}))
if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
raise Roll20PermissionError('Permission denied trying to set attribute.')
else:
updated_attribute = json.loads(response.text)[attribute_position.value]
logger.debug(f'New {attribute_name}: {updated_attribute}')
def create_ability(login: Roll20Login, character_id: str, ability_name: str, ability_action: str,
is_token_action: bool = True) -> None:
new_ability = {'action': ability_action,
'name': ability_name,
'istokenaction': is_token_action}
url = (f'{login.firebase_root}{login.campaign_path}/'
f'char-abils/char/{character_id}.json?auth={login.auth_token}')
response = requests.post(url, data=json.dumps(new_ability))
if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
raise Roll20PermissionError('Permission denied trying to create ability.')
else:
new_ability_id = json.loads(response.text)['name']
logger.debug(f'Created {ability_name} {new_ability_id}')
url = (f'{login.firebase_root}{login.campaign_path}/char-abils/char/'
f'{character_id}/{new_ability_id}/.json?auth={login.auth_token}')
requests.patch(url, data=json.dumps({'id': new_ability_id}))
def update_latest_version(tag_name):
params = {
"access_token": GITHUB_TOKEN
}
r = requests.get(GH_BASE_URL + "/releases/tags/latest",
params=params)
if r.status_code == 404:
release_id = _create_latest_version()
elif r.status_code == 200:
release_id = r.json()["id"]
data = {
"target_commitish": "master"
}
r = requests.patch(GH_BASE_URL + "/releases/{0}".format(release_id),
params=params, json=data)
assert r.status_code == 200
upload_url = r.json()['upload_url'].replace("{?name,label}", "")
_delete_all_assets(release_id)
_upload_asset(upload_url, "version", "text/plain", tag_name)
def _upload_chunk(self, data, offset, file_endpoint, headers=None, auth=None):
floyd_logger.debug("Uploading %s bytes chunk from offset: %s", len(data), offset)
h = {
'Content-Type': 'application/offset+octet-stream',
'Upload-Offset': str(offset),
'Tus-Resumable': self.TUS_VERSION,
}
if headers:
h.update(headers)
response = requests.patch(file_endpoint, headers=h, data=data, auth=auth)
self.check_response_status(response)
return int(response.headers["Upload-Offset"])
def add_or_update(self, name, config=None):
# does it exist already?
plugins_response = self.list()
plugins_list = plugins_response.json().get('data', [])
data = {
"name": name,
}
if config is not None:
data.update(config)
plugin_id = self._get_plugin_id(name, plugins_list)
if plugin_id is None:
return requests.post(self.base_url, data, auth=self.auth)
else:
url = "{}/{}" . format (self.base_url, plugin_id)
return requests.patch(url, data, auth=self.auth)
def _partial_update(cls, entity, parameters=None, ids={}): # pragma: no cover
entity_id = getattr(entity, 'id', None)
if entity_id is not None:
ids['id'] = entity_id
if isinstance(entity, BaseAbstractEntity):
response = cls.REST_CLIENT.patch(
cls.get_base_uri(cls.endpoint_single(), **ids), json=entity.get_json_data(), headers=make_headers(),
params=parameters
)
else:
response = cls.REST_CLIENT.patch(
cls.get_base_uri(cls.endpoint_single(), **ids), data=json.dumps(entity), headers=make_headers(),
params=parameters
)
response = cls.REST_CLIENT.handle_response(response)
if cls.entity_class() is None or not issubclass(cls.entity_class(), BaseAbstractEntity):
return response # pragma: no cover
else:
return cls.entity_class().from_dict(response.json())
def enable_survey(tower_config, job_template_id):
url = 'https://{0}/api/v1/job_templates/{1}/'.format(
tower_config['host'],
job_template_id)
headers = {'Content-type': 'application/json'}
response = requests.patch(
url,
verify=False,
auth=(tower_config['username'], tower_config['password']),
headers=headers,
data=json.dumps({
'survey_enabled': True}))
if response.status_code != 200:
exit_failure('error enabling job template survey')
def test_list_assigned_submissions(mount, mount_dir, teacher_jwt, teacher_id):
course = 'Programmeertalen'
for assig in ls(mount_dir, course):
for sub in ls(mount_dir, course, assig):
if 'Stupid1' in sub:
with open(join(mount_dir, course, assig, sub, '.cg-submission-id')) as f:
sub_id = f.read().strip()
r = requests.patch(
f'http://localhost:5000/api/v1/submissions/{sub_id}/grader',
json={'user_id': teacher_id},
headers={
'Authorization': 'Bearer ' + teacher_jwt,
}
)
assert r.status_code == 204
mount(assigned_to_me=True)
for assig in ls(mount_dir, course):
for sub in ls(mount_dir, course, assig):
assert 'Stupid1' in sub or sub[0] == '.'
def saveScore(analysis_id, score):
"""Save calculated score back to pit."""
a = ANALYSIS_TEMPLATE.copy()
a['id'] = analysis_id
a['score'] = score
resp = requests.patch('/'.join([pit_url, 'analysis', str(analysis_id)]),
data=json.dumps(a), headers=post_headers)
resp.raise_for_status()
# For an analysis item:
# * Fetch the item.
# * Fetch all relevant rules.
# * Evaluate each rule, keeping a running total of score.
# * Save score to analysis item.
def build_relationship(self, Left_TLO, Right_TLO):
right_type = Right_TLO.type[:1].upper() + Right_TLO.type[1:] #handles CAPS issue
submit_url = '{}/{}/{}/'.format(self.url, Left_TLO.collection, Left_TLO.get_ID())
params = {'api_key': self.api_key,'username': self.username}
data = {
'action': 'forge_relationship',
'type_': Left_TLO.type,
'id_': Left_TLO.get_ID(),
'right_type': right_type,
'right_id': Right_TLO.get_ID(),
'rel_type': 'Related To',
'rel_date': datetime.datetime.now(),
'rel_reason': '',
'rel_confidence': 'low',
'get_rels': True
}
r = requests.patch(submit_url, params=params, data=data, verify=False)
if r.status_code == 200:
return json.loads(r.text)
else:
return None
def update(self, eid, data, token):
"""
Update a given Library Entry.
:param eid str: Entry ID
:param data dict: Attributes
:param token str: OAuth token
:return: True or ServerError
:rtype: Bool or Exception
"""
final_dict = {"data": {"id": eid, "type": "libraryEntries", "attributes": data}}
final_headers = self.header
final_headers['Authorization'] = "Bearer {}".format(token)
r = requests.patch(self.apiurl + "/library-entries/{}".format(eid), json=final_dict, headers=final_headers)
if r.status_code != 200:
raise ConnectionError(r.text)
return True