def test_rename_log(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """Rename a log via ``api.rename_log`` and check the new name is printed.

    The GET endpoint is mocked to return the log under its old name and the
    PUT endpoint to return it under the new one; the test passes when the
    new name shows up in captured stdout.
    """
    log_id = str(uuid.uuid4())
    renamed_to = "new_test_log_name"
    current_log = '{"log": {"name": "test.log", "logsets_info": [], "source_type": "token"}}'
    renamed_log = '{"log": {"name": "new_test_log_name", "logsets_info": [], "source_type": "token"}}'

    mocked_url.return_value = '', MOCK_API_URL
    # Both key lookups hand back the same fixed-length placeholder.
    for mocked_key in (mocked_rw_apikey, mocked_ro_apikey):
        mocked_key.return_value = ID_WITH_VALID_LENGTH

    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json', body=current_log)
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=200,
                           content_type='application/json', body=renamed_log)

    api.rename_log(log_id, renamed_to)

    captured_out, _ = capsys.readouterr()
    assert renamed_to in captured_out
# Example source code using Python's uuid4()
def test_delete_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id,
                        mocked_account_resource_id, capsys):
    """Delete an API key against a mocked DELETE endpoint.

    Passes when nothing is written to stderr and stdout contains the
    confirmation message carrying the deleted key's id.
    """
    api_key_id = str(uuid.uuid4())
    key_url = MOCK_API_URL + '/' + api_key_id
    mocked_url.return_value = '', key_url

    # Each credential lookup gets its own random placeholder value.
    for credential_mock in (mocked_owner_apikey, mocked_owner_apikey_id,
                            mocked_account_resource_id):
        credential_mock.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.DELETE, key_url, status=204,
                           content_type='application/json')

    api.delete(api_key_id)

    stdout, stderr = capsys.readouterr()
    expected_message = 'Deleted api key with id: %s' % api_key_id
    assert not stderr
    assert expected_message in stdout
def test_disable_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id,
                         mocked_account_resource_id, capsys):
    """Disable an API key and verify the PATCH payload and the printed message.

    ``api.update(api_key_id, False)`` must send ``{'apikey': {'active': False}}``
    as the request body, write nothing to stderr and print the "Disabled"
    confirmation for the key id.
    """
    api_key_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL

    # Each credential lookup gets its own random placeholder value.
    for credential_mock in (mocked_owner_apikey, mocked_owner_apikey_id,
                            mocked_account_resource_id):
        credential_mock.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL, status=200,
                           content_type='application/json', body=json.dumps({}))

    api.update(api_key_id, False)

    stdout, stderr = capsys.readouterr()
    sent_payload = json.loads(httpretty.last_request().body)
    assert sent_payload == {'apikey': {'active': False}}
    assert not stderr
    expected_message = 'Disabled api key with id: %s' % api_key_id
    assert expected_message in stdout
def test_enable_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id,
                        mocked_account_resource_id, capsys):
    """Enable an API key and verify the PATCH payload and the printed message.

    ``api.update(api_key_id, True)`` must send ``{'apikey': {'active': True}}``
    as the request body, write nothing to stderr and print the "Enabled"
    confirmation for the key id.
    """
    api_key_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL

    # Each credential lookup gets its own random placeholder value.
    for credential_mock in (mocked_owner_apikey, mocked_owner_apikey_id,
                            mocked_account_resource_id):
        credential_mock.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL, status=200,
                           content_type='application/json', body=json.dumps({}))

    api.update(api_key_id, True)

    stdout, stderr = capsys.readouterr()
    sent_payload = json.loads(httpretty.last_request().body)
    assert sent_payload == {'apikey': {'active': True}}
    assert not stderr
    expected_message = 'Enabled api key with id: %s' % api_key_id
    assert expected_message in stdout
def test_create_logset_from_file(mocked_url, mocked_rw_apikey, capsys):
    """Create a logset from a params dict and check its name is printed.

    The POST endpoint is mocked to answer 201 with ``LOGSET_RESPONSE``.
    """
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())

    created_body = json.dumps(LOGSET_RESPONSE)
    httpretty.register_uri(httpretty.POST, MOCK_API_URL, status=201,
                           content_type='application/json', body=created_body)

    logset_params = {"logset": {"name": "Test Logset"}}
    api.create_logset(params=logset_params)

    stdout, _ = capsys.readouterr()
    assert 'Test Logset' in stdout
def test_create_logset_invalid_json(mocked_url, mocked_rw_apikey, capsys):
    """Creating a logset from invalid params must exit with status 1.

    The POST endpoint is mocked to answer 400. ``api.create_logset`` is
    expected to raise ``SystemExit``; afterwards the exit code and the
    captured output are checked.
    """
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.POST, MOCK_API_URL, status=400,
                           content_type='application/json',
                           body='Client Error: Bad Request for url: https://rest.logentries.com/management/logsets')
    invalid_params = {
        "logset": {
            "id": "12341234-XXXX-YYYY-XXXX-12341234",
            "unknown_field": "unknown value"
        }
    }

    # Only the call expected to exit lives inside the context manager, so the
    # assertions below are guaranteed to run once SystemExit has been caught.
    with pytest.raises(SystemExit) as exc_info:
        api.create_logset(params=invalid_params)

    out, err = capsys.readouterr()
    # pytest exposes the raised exception as ``exc_info.value``; the exit
    # status is on the exception, and ints are compared with ``==``, not ``is``.
    assert exc_info.value.code == 1
    # NOTE(review): if the CLI writes this message to stderr rather than
    # stdout, this check should target ``err`` instead -- confirm.
    assert "Creating logset failed, status code: 400" in out
def test_rename_logset(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """Rename a logset via ``api.rename_logset`` and check the new name is printed.

    GET is mocked to return the logset with its old name, PUT to return it
    with the new name.
    """
    logset_id = 'XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX'
    new_name = 'new logset name'
    before_rename = '{"logset": {"id": "XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX","logs_info": [],"name": "old logset name"}}'
    after_rename = '{"logset": {"id": "XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX","logs_info": [],"name": "new logset name"}}'

    mocked_url.return_value = '', MOCK_API_URL
    # Read-write and read-only keys each get their own random placeholder.
    for mocked_key in (mocked_rw_apikey, mocked_ro_apikey):
        mocked_key.return_value = str(uuid.uuid4())

    for http_method, payload in ((httpretty.GET, before_rename),
                                 (httpretty.PUT, after_rename)):
        httpretty.register_uri(http_method, MOCK_API_URL, status=200,
                               content_type='application/json', body=payload)

    api.rename_logset(logset_id, new_name)

    stdout, _ = capsys.readouterr()
    assert "new logset name" in stdout
def test_add_unknown_log_to_logset(mocked_url, mocked_ro_apikey, mocked_rw_apikey, capsys):
    """Adding an unknown log to a logset must exit with status 1.

    GET is mocked to return ``LOGSET_RESPONSE`` and PUT to answer 400.
    ``api.add_log`` is expected to raise ``SystemExit``; afterwards the
    captured output and the exit code are checked.
    """
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json', body=json.dumps(LOGSET_RESPONSE))
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=400,
                           content_type='application/json')

    # Only the call expected to exit lives inside the context manager, so the
    # assertions below are guaranteed to run once SystemExit has been caught.
    with pytest.raises(SystemExit) as exc_info:
        api.add_log('XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXX', 'unknown_log')

    out, err = capsys.readouterr()
    # NOTE(review): if the CLI reports the status code on stderr rather than
    # stdout, this check should target ``err`` instead -- confirm.
    assert "400" in out
    # pytest exposes the raised exception as ``exc_info.value``; the exit
    # status is on the exception, and ints are compared with ``==``, not ``is``.
    assert exc_info.value.code == 1
def test_remove_log_from_logset(mocked_url, mocked_ro_apikey, mocked_rw_apikey, capsys):
    """Remove a log from a logset and check nothing is written to stderr.

    Both GET and PUT are mocked to answer 200 with ``LOGSET_RESPONSE`` as a
    JSON body.
    """
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.GET, MOCK_API_URL,
                           status=200,
                           content_type='application/json',
                           body=json.dumps(LOGSET_RESPONSE))
    # The keyword must be spelled ``content_type`` (it was ``content_Type``)
    # so the PUT mock declares its JSON content type the same way the GET
    # mock does.
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL,
                           status=200,
                           content_type='application/json',
                           body=json.dumps(LOGSET_RESPONSE))

    api.delete_log('123', str(uuid.uuid4()))

    out, err = capsys.readouterr()
    assert not err
def test_delete_logset_with_log_in_another_logset(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """Delete one logset, fetch another, and check stderr stays empty.

    DELETE is mocked to answer 204 and GET to answer 200 with an empty JSON
    object.
    """
    mocked_url.return_value = '', MOCK_API_URL
    # Read-write and read-only keys each get their own random placeholder.
    for mocked_key in (mocked_rw_apikey, mocked_ro_apikey):
        mocked_key.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.DELETE, MOCK_API_URL, status=204,
                           content_type='application/json')
    empty_body = json.dumps({})
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json', body=empty_body)

    api.delete_logset('123')
    api.get_logset('456')

    _, stderr = capsys.readouterr()
    assert not stderr
def test_delete_user_from_team(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
    """Delete a user from a team and check the exact confirmation on stdout.

    GET is mocked to return ``{'team': TEAM_RESPONSE}`` and PUT to answer 200.
    """
    team_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())

    team_body = json.dumps({'team': TEAM_RESPONSE})
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           body=team_body, content_type='application/json')
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=200,
                           content_type='application/json')

    user_id = str(uuid.uuid4())
    api.delete_user_from_team(team_id, user_id)

    stdout, _ = capsys.readouterr()
    # stdout must match the message exactly, trailing newline included.
    expected_output = "Deleted user with key: '%s' from team: %s\n" % (user_id, team_id)
    assert expected_output == stdout
def upload_file(upload_file_name, temp):
# upload_file_name?????
# ??? saveas???
# ?????????,??git???saveas
#key = md5(str(time.time())+''.join(random.sample(string.letters, 12))).hexdigest()
# key ??????
print u"??????: ",
pic_name = raw_input()
uuid_6 = uuid.uuid4().get_hex()[:8] #?????
key = pic_name+"_"+uuid_6+".png"
copyfile(upload_file_name,join(saveas,key))
mime_type = 'image/png'
token = q.upload_token(bucket, key)
ret, info = put_file(token, key, upload_file_name, mime_type=mime_type, check_crc=True)
print 'upload qiniu result:', info
assert ret['key'] == key
assert ret['hash'] == etag(upload_file_name)
os.rename(upload_file_name, upload_file_name+'.old')
return domain+'/'+key
def compare_component_output(self, input_path, expected_output_path):
    """Render ``input_path`` uncompressed and zipped, comparing both results.

    First ``process_sketch_archive`` writes an uncompressed output directory,
    which is compared against ``expected_output_path``. Then it is run again
    with ``compress_zip=True``; the produced zip is extracted to the same
    directory path and compared once more. Temporary output is removed after
    each comparison.

    :param input_path: archive passed to ``process_sketch_archive`` as ``zip_path``.
    :param expected_output_path: directory holding the expected output.
    """
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))

    # Pass 1: uncompressed output written straight to a directory.
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    # NOTE(review): presumably resets shared state between the two passes --
    # confirm what ``storage`` holds.
    storage.clear()

    # Pass 2: compressed output, extracted to the same directory path.
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    # Close the archive as soon as extraction is done so the handle is not
    # leaked and the file is no longer open when it is deleted below.
    with zipfile.ZipFile(output_zip) as archive:
        archive.extractall(output_dir)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    os.remove(output_zip)
def remote_restart(rel_name, remote_service=None):
    """Publish a fresh restart trigger on every populated ``rel_name`` relation.

    A new uuid4 string is set under ``restart-trigger``; when
    ``remote_service`` is truthy it is included as ``remote-service``.

    :param rel_name: name of the relation whose ids are looked up.
    :param remote_service: optional service name to include in the settings.
    """
    settings = {'restart-trigger': str(uuid.uuid4())}
    if remote_service:
        settings['remote-service'] = remote_service

    for relation_id in relation_ids(rel_name):
        # This subordinate can be related to two separate services through
        # different subordinate relations, so only issue the restart when
        # the principal is actually connected on this relation.
        if not related_units(relid=relation_id):
            continue
        relation_set(relation_id=relation_id, relation_settings=settings)
def get_uuid_epoch_stamp(self):
    """Build a ``[<uuid4>-<epoch seconds>]`` stamp string.

    Handy for test messages that need to be unique-ish.
    """
    unique_part = uuid.uuid4()
    epoch_part = time.time()
    return '[{0}-{1}]'.format(unique_part, epoch_part)
# amulet juju action helpers:
def setUpTestData(cls):
    """Create the shared fixtures for this test class.

    After the parent set-up and ``create_local_and_remote_user()``, this
    stores two local contents (the second with an explicit random guid),
    one public content and one public profile on the class.
    """
    super().setUpTestData()
    cls.create_local_and_remote_user()

    # Local fixtures; the second one gets an explicitly generated guid.
    cls.local_content = LocalContentFactory()
    second_guid = str(uuid4())
    cls.local_content2 = LocalContentFactory(guid=second_guid)

    # Fixtures built by the public factories.
    cls.remote_content = PublicContentFactory()
    cls.remote_profile2 = PublicProfileFactory()
def test_does_not_forward_share_if_not_local_content(self, mock_rq):
    """A share targeting ``self.remote_content`` must not call ``mock_rq``."""
    share_guid = str(uuid4())
    sender_handle = self.remote_profile.handle
    target = self.remote_content
    entity = base.Share(
        guid=share_guid,
        handle=sender_handle,
        target_guid=target.guid,
        target_handle=target.author.handle,
        public=True,
    )

    process_entity_share(entity, self.remote_profile)

    self.assertFalse(mock_rq.called)
def test_forwards_share_if_local_content(self, mock_rq):
    """A share targeting ``self.local_content`` calls ``mock_rq`` exactly once.

    The expected call arguments are ``forward_entity``, the share entity and
    the id of the local content.
    """
    share_guid = str(uuid4())
    sender_handle = self.remote_profile.handle
    target = self.local_content
    entity = base.Share(
        guid=share_guid,
        handle=sender_handle,
        target_guid=target.guid,
        target_handle=target.author.handle,
        public=True,
    )

    process_entity_share(entity, self.remote_profile)

    mock_rq.assert_called_once_with(forward_entity, entity, self.local_content.id)