def test_list_cleartext_rendered_documents_insufficient_permissions(self):
    """Rendered-documents listing returns 403 when the list rule is
    restricted to ``rule:admin_api``.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': 'rule:admin_api',
        'deckhand:create_cleartext_documents': '@',
    })

    # Upload only the first generated document to the bucket.
    first_document = factories.DocumentFactory(1, [1]).gen_test({})[0]
    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all([first_document]))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    # Listing the rendered documents of that revision must be rejected.
    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(403, get_resp.status_code)
python类safe_load_all()的实例源码
test_rendered_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
test_revisions_rollback_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_revision_rollback_cleartext_except_forbidden(self):
    """Rollback returns 403 after the cleartext create rule is switched
    to ``rule:admin_api``.
    """
    self.policy.set_rules({'deckhand:create_cleartext_documents': '@'})

    # Create a revision so there is something to roll back to.
    certificate = factories.DocumentSecretFactory().gen_test(
        'Certificate', 'cleartext')
    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all([certificate]))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    # Tighten the create rule, then attempt the rollback.
    self.policy.set_rules(
        {'deckhand:create_cleartext_documents': 'rule:admin_api'})
    rollback_resp = self.app.simulate_post(
        '/api/v1.0/rollback/%s' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(403, rollback_resp.status_code)
test_revision_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def test_list_cleartext_revision_documents_insufficient_permissions(self):
    """Revision-documents listing returns 403 when the list rule is
    restricted to ``rule:admin_api``.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': 'rule:admin_api',
        'deckhand:create_cleartext_documents': '@',
    })

    # Upload a single cleartext certificate to the bucket.
    certificate = factories.DocumentSecretFactory().gen_test(
        'Certificate', 'cleartext')
    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all([certificate]))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    # Listing the documents of that revision must be rejected.
    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/documents' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(403, get_resp.status_code)
def test_put_bucket(self):
    """PUT to a bucket echoes back every submitted document, compared
    by (schema, metadata.name).
    """
    self.policy.set_rules({'deckhand:create_cleartext_documents': '@'})

    mapping = {
        "_GLOBAL_DATA_1_": {"data": {"a": {"x": 1, "y": 2}}},
        "_SITE_DATA_1_": {"data": {"a": {"x": 7, "z": 3}, "b": 4}},
        "_SITE_ACTIONS_1_": {
            "actions": [{"method": "merge", "path": "."}]},
    }
    payload = factories.DocumentFactory(2, [1, 1]).gen_test(mapping)

    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(payload))
    self.assertEqual(200, put_resp.status_code)

    created_documents = list(yaml.safe_load_all(put_resp.text))
    self.assertEqual(3, len(created_documents))

    def identity(document):
        # Key used to compare documents regardless of ordering.
        return (document['schema'], document['metadata']['name'])

    expected = sorted(map(identity, payload))
    actual = sorted(map(identity, created_documents))
    self.assertEqual(expected, actual)
def load_yaml(self, filename, serialization, k8s=False):
    """Parse each YAML document in ``serialization`` and hand it on.

    With ``k8s`` true every parsed object goes to ``self.prep_k8s``;
    otherwise a ``(filename, ocount, obj)`` tuple is appended to
    ``self.objects_to_process``. ``ocount`` is a 1-based document counter.

    Any exception stops processing: the filename and current counter are
    stored on ``self`` and an error is posted, but objects handled before
    the failure are kept.
    """
    # Maintained by hand so it is always bound when the handler reads it.
    ocount = 1

    try:
        # safe_load_all yields documents lazily, so parse errors surface
        # while iterating -- hence the loop lives inside the try.
        for obj in yaml.safe_load_all(serialization):
            if not k8s:
                self.objects_to_process.append((filename, ocount, obj))
            else:
                self.prep_k8s(filename, ocount, obj)

            ocount += 1
    except Exception:
        # No sense letting one attribute with bad YAML take down the whole
        # gateway: report the error and keep what was already parsed.
        self.filename = filename
        self.ocount = ocount
        self.post_error(RichStatus.fromError("%s: could not parse YAML" % filename))
def run_update(args):
    """Build the configured chores and run them through a scheduler.

    Reads the first YAML document of ``args.chores`` as a mapping of
    chore name -> config, looks up each chore's stored status in the
    ``chore_status`` table of the database at ``args.db``, and then
    schedules and runs every chore. When ``args.keep`` is truthy the
    schedule/run cycle repeats until interrupted; otherwise it runs once.

    :param args: object providing ``db``, ``chores`` and ``keep``.
    """
    logging.info('Getting updates...')
    db = init_db(args.db)
    cur = db.cursor()
    # Use a context manager so the config file is closed once the first
    # document has been parsed (the original left the handle open).
    with open(args.chores, 'r', encoding='utf-8') as chores_file:
        cfg = next(yaml.safe_load_all(chores_file))
    chores_avail = []
    tasks = sched.scheduler(time.time)
    for name, config in cfg.items():
        result = cur.execute(
            'SELECT updated, last_result FROM chore_status WHERE name = ?',
            (name,)).fetchone()
        if result:
            result = chores.ChoreStatus(*result)
        # 'chore' selects the handler; the remaining keys are its kwargs.
        chorename = config.pop('chore')
        chore = chores.CHORE_HANDLERS[chorename](name, status=result, **config)
        chores_avail.append((chorename, chore))
    try:
        while True:
            for chorename, chore in chores_avail:
                tasks.enterabs(
                    chore.status.updated + args.keep * 60,
                    chores.CHORE_PRIO[chorename],
                    wrap_fetch, (chore, cur)
                )
            tasks.run()
            db.commit()
            if args.keep:
                logging.info('A round of updating completed.')
            else:
                break
    except KeyboardInterrupt:
        logging.warning('Interrupted.')
    finally:
        # Commit whatever was written, even on interrupt or error.
        db.commit()
def invoke(self):
    """Validate the manifest file locally or through the API client.

    When ``self.ctx.obj['api']`` is falsy the file is parsed and checked
    with ``validate_armada_object`` / ``validate_armada_documents``;
    otherwise its contents are posted via the ``CLIENT`` object and the
    ``valid`` field of the response decides which message is logged.
    """
    if not self.ctx.obj.get('api', False):
        # Materialise the documents: safe_load_all returns a one-shot
        # generator, and the documents are consumed twice below (once by
        # Manifest, once by validate_armada_documents).
        with open(self.filename) as f:
            documents = list(yaml.safe_load_all(f.read()))
        manifest_obj = Manifest(documents).get_manifest()
        obj_check = validate_armada_object(manifest_obj)
        doc_check = validate_armada_documents(documents)
        try:
            if doc_check and obj_check:
                self.logger.info(
                    'Successfully validated: %s', self.filename)
        except Exception:
            # Exceptions do not %-format their arguments the way logging
            # does, so interpolate the filename explicitly.
            raise Exception('Failed to validate: %s' % self.filename)
    else:
        client = self.ctx.obj.get('CLIENT')
        with open(self.filename, 'r') as f:
            resp = client.post_validate(f.read())
        if resp.get('valid', False):
            self.logger.info(
                'Successfully validated: %s', self.filename)
        else:
            self.logger.error("Failed to validate: %s", self.filename)
def test_update_dictionary_valid(self):
    """Applying override-01 to the base manifest yields the expected
    ``chart/blog-1`` document.
    """
    expected = "{}/templates/override-{}-expected.yaml".format(
        self.basepath, '01')
    merge = "{}/templates/override-{}.yaml".format(self.basepath, '01')
    with open(self.base_manifest) as f, open(expected) as e, open(
            merge) as m:
        merging_values = list(yaml.safe_load_all(m.read()))
        doc_obj = list(yaml.safe_load_all(f.read()))
        doc_path = ['chart', 'blog-1']
        ovr = Override(doc_obj)
        ovr.update_document(merging_values)
        ovr_doc = ovr.find_manifest_document(doc_path)
        # Use safe_load_all like the other loads here: yaml.load_all
        # without an explicit Loader is unsafe and is rejected by
        # PyYAML >= 6.
        expect_doc = list(yaml.safe_load_all(e.read()))[0]
        self.assertEqual(ovr_doc, expect_doc)
def load_schemas(self):
    """Load every YAML schema document found in the schema directory.

    Populates ``self.v1_doc_schemas`` with the whole schema document,
    keyed by its ``metadata.name``. A duplicate name is logged as a
    warning and the later document replaces the earlier one.
    """
    self.v1_doc_schemas = dict()
    schema_dir = self._get_schema_dir()
    for schema_file in os.listdir(schema_dir):
        # The context manager closes the file even when parsing or a key
        # lookup raises part-way through.
        with open(os.path.join(schema_dir, schema_file), 'r') as f:
            for schema in yaml.safe_load_all(f):
                schema_for = schema['metadata']['name']
                if schema_for in self.v1_doc_schemas:
                    self.logger.warning(
                        "Duplicate document schemas found for document kind %s."
                        % schema_for)
                self.logger.debug(
                    "Loaded schema for document kind %s." % schema_for)
                self.v1_doc_schemas[schema_for] = schema
def load_schemas(self):
    """Load every YAML schema document found in the schema directory.

    Populates ``self.v1_doc_schemas`` with the ``data`` section of each
    schema document, keyed by its ``metadata.name``. A duplicate name is
    logged as a warning and the later document replaces the earlier one.
    """
    self.v1_doc_schemas = dict()
    schema_dir = self._get_schema_dir()
    for schema_file in os.listdir(schema_dir):
        # The context manager closes the file even when parsing or a key
        # lookup raises part-way through.
        with open(os.path.join(schema_dir, schema_file), 'r') as f:
            for schema in yaml.safe_load_all(f):
                schema_for = schema['metadata']['name']
                if schema_for in self.v1_doc_schemas:
                    self.logger.warning(
                        "Duplicate document schemas found for document kind %s."
                        % schema_for)
                self.logger.debug(
                    "Loaded schema for document kind %s." % schema_for)
                self.v1_doc_schemas[schema_for] = schema.get('data')
def post(self, **params):
    """Start a pod from the YAML passed in the ``data`` parameter.

    Raises ``InsufficientData`` when ``data`` is missing and
    ``PredefinedAppExc.UnparseableTemplate`` when it is not valid YAML.
    ``APIError`` from pod creation propagates untouched; any other
    exception is wrapped in ``InternalPredefinedAppError``. On success a
    ``pod:change`` event is sent to the user and the result is returned.
    """
    user = self.get_current_user()
    raw_yaml = params.get('data')
    if raw_yaml is None:
        raise InsufficientData('No "data" provided')

    try:
        documents = list(yaml.safe_load_all(raw_yaml))
    except yaml.YAMLError as parse_error:
        raise PredefinedAppExc.UnparseableTemplate(
            'Incorrect yaml, parsing failed: "{0}"'.format(str(parse_error)))

    try:
        pod = start_pod_from_yaml(documents, user=user)
    except APIError:
        # API errors already carry the right shape; pass them as is.
        raise
    except Exception as unexpected:
        raise PredefinedAppExc.InternalPredefinedAppError(
            details={'message': str(unexpected)})

    send_event_to_user('pod:change', pod, user.id)
    return pod
def load_file(input_file, lib_to_use):
    """Load a YAML file, exiting the process on any failure.

    :param input_file: path of the file to read.
    :param lib_to_use: ``'yaml'`` to load a single document, or
        ``'yaml_multi'`` to load every document into a list.
    :returns: the parsed document, or a list of parsed documents.
    :raises SystemExit: if the file cannot be opened or parsed, or if
        ``lib_to_use`` is not one of the supported values.
    """
    try:
        with open(input_file) as data_file:
            if lib_to_use == 'yaml':
                return yaml.safe_load(data_file)
            if lib_to_use == 'yaml_multi':
                return list(yaml.safe_load_all(data_file))
            # Fail explicitly rather than through an accidental
            # UnboundLocalError on the result variable.
            raise ValueError("Unsupported lib_to_use: " + str(lib_to_use))
    except Exception as ex:
        # Parenthesised single-argument print behaves the same on
        # Python 2 and Python 3.
        print("Unable to load file: " + input_file + "\nException information:\n" + str(ex.args))
        sys.exit("Unable to load file: " + input_file)
def read_yaml(self):
    """Return the first YAML document read from ``self.app.stdin``.

    If the stream holds more than one document, a warning is written to
    ``self.app.stderr`` and the extra documents are ignored.
    """
    documents = yaml.safe_load_all(self.app.stdin)
    first_document = next(documents)

    # A unique sentinel distinguishes "no more documents" from a document
    # that legitimately parses to None.
    exhausted = object()
    following = next(documents, exhausted)
    if following is not exhausted:
        self.app.stderr.write("Warning: will use only first "
                              "document from YAML stream")
    return first_document
def get_value_to_set(self, parsed_args):
    """Convert the command-line value according to ``parsed_args.type``.

    Supported types are ``'null'``, ``'bool'``, ``'int'``, ``'str'``,
    ``'json'`` and ``'yaml'``. When the type is ``None`` the value is read
    from ``self.app.stdin`` in the format named by ``parsed_args.format``
    (``'json'`` or ``'yaml'``); for YAML only the first document is used
    and a warning is written to ``self.app.stderr`` if more follow.

    :raises Exception: for a ``bool`` value other than 0/1/true/false.
    :raises AssertionError: for an unsupported type/format combination.
    """
    type_ = parsed_args.type
    if type_ == 'null':
        return None
    if type_ == 'bool':
        lowered = parsed_args.value.lower()
        if lowered in ('1', 'true'):
            return True
        if lowered in ('0', 'false'):
            return False
        raise Exception(
            "Bad value for 'bool' type: '{}'. Should be one of '0', "
            "'1', 'false', 'true'.".format(parsed_args.value))
    if type_ == 'int':
        return int(parsed_args.value)
    if type_ == 'str':
        return parsed_args.value
    if type_ == 'json':
        return json.loads(parsed_args.value)
    if type_ == 'yaml':
        return yaml.safe_load(parsed_args.value)
    if type_ is None:
        if parsed_args.format == 'json':
            return json.load(self.app.stdin)
        if parsed_args.format == 'yaml':
            docs_gen = yaml.safe_load_all(self.app.stdin)
            doc = next(docs_gen)
            # Unique sentinel: a following document may itself be None.
            guard = object()
            if next(docs_gen, guard) is not guard:
                self.app.stderr.write("Warning: will use only first "
                                      "document from YAML stream")
            return doc
    # Raise explicitly: an `assert False` statement is stripped under
    # `python -O`, which would make this silently return None.
    raise AssertionError("Shouldn't get here")
def data(self):
    """Return the documents of ``self.yamlf``, loading them on first use.

    The parsed result is kept in ``self._data``; the file is read again
    only while that attribute is still falsy.
    """
    if self._data:
        return self._data

    with open(self.yamlf, 'rb') as stream:
        # safe_load_all returns a generator; materialise it into a list.
        self._data = list(yaml.safe_load_all(stream))
    return self._data
def get_all_configs():
    """Return a lazy iterator over the YAML documents in ``config.yaml``."""
    # NOTE(review): the handle is never closed explicitly; it must stay
    # open while the returned generator is being consumed.
    config_stream = open('config.yaml')
    return safe_load_all(config_stream)
def _to_dict(self, body, many=False):
    """Parse a YAML-formatted response body.

    :param body: YAML-formatted response body to convert.
    :param many: If True, parse every document in ``body`` and return
        them as a list; otherwise parse a single document. False by
        default.
    :returns: The parsed list or single document, or None when ``body``
        is not valid YAML.
    """
    try:
        if many:
            return list(yaml.safe_load_all(body))
        return yaml.safe_load(body)
    except yaml.YAMLError:
        return None
def loads(string):
    """Parse ``string`` as a multi-document YAML stream into a list."""
    # NOTE: Response bodies may hold a dictionary or a list. Always
    # parsing into a list keeps the tests uniform: they index into the
    # first element with [0] when a single document is expected.
    documents = yaml.safe_load_all(string)
    return list(documents)
test_rendered_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_list_rendered_documents_exclude_abstract_documents(self):
    """Only the concrete document appears among the rendered documents."""
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': '@',
        'deckhand:list_encrypted_documents': '@',
        'deckhand:create_cleartext_documents': '@',
    })

    # Create 2 docs: one concrete, one abstract.
    payload = factories.DocumentFactory(2, [1, 1]).gen_test(
        {}, global_abstract=False, region_abstract=True)
    concrete_doc = payload[1]

    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(payload))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    # The concrete document is returned, the abstract one is not.
    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(200, get_resp.status_code)

    rendered_documents = list(yaml.safe_load_all(get_resp.text))
    self.assertEqual(1, len(rendered_documents))
    rendered = rendered_documents[0]
    self.assertFalse(
        rendered['metadata']['layeringDefinition']['abstract'])

    # Dict-valued sections only need to be a subset of what came back;
    # everything else must match exactly.
    for key, value in concrete_doc.items():
        if not isinstance(value, dict):
            self.assertEqual(value, rendered[key])
        else:
            self.assertDictContainsSubset(value, rendered[key])
test_rendered_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_list_rendered_documents_multiple_buckets(self):
    """Rendered documents can be listed after uploading to two
    differently named buckets.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': '@',
        'deckhand:list_encrypted_documents': '@',
        'deckhand:create_cleartext_documents': '@',
    })
    documents_factory = factories.DocumentFactory(1, [1])

    for iteration in range(2):
        payload = documents_factory.gen_test({})
        if iteration == 0:
            # Pop off the first entry so that a conflicting layering
            # policy isn't created during the 1st iteration.
            payload.pop(0)
        bucket_url = '/api/v1.0/buckets/%s/documents' % test_utils.rand_name(
            'bucket')
        resp = self.app.simulate_put(
            bucket_url,
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)

    # The revision is taken from the response to the last upload.
    created = list(yaml.safe_load_all(resp.text))
    revision_id = created[0]['status']['revision']

    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(200, get_resp.status_code)
test_rendered_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_list_encrypted_rendered_documents_insufficient_permissions(self):
    """With the encrypted list rule restricted, the rendered-documents
    listing succeeds but comes back empty.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': '@',
        'deckhand:list_encrypted_documents': 'rule:admin_api',
        'deckhand:create_cleartext_documents': '@',
        'deckhand:create_encrypted_documents': '@',
    })

    # Upload a layering policy together with an encrypted certificate.
    layering_policy = factories.DocumentFactory(1, [1]).gen_test({})[0]
    encrypted_document = factories.DocumentSecretFactory().gen_test(
        'Certificate', 'encrypted')
    payload = [layering_policy, encrypted_document]

    with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
                           autospec=True) as mock_secrets_mgr:
        mock_secrets_mgr.create.return_value = {
            'secret': payload[0]['data']}
        put_resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    # Verify that the created document was not returned.
    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(200, get_resp.status_code)
    self.assertEmpty(list(yaml.safe_load_all(get_resp.text)))
test_revisions_rollback_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_revision_rollback_encrypted_except_forbidden(self):
    """Rollback returns 403 after the encrypted create rule is switched
    to ``rule:admin_api``.
    """
    self.policy.set_rules({
        'deckhand:create_encrypted_documents': '@',
        'deckhand:create_cleartext_documents': '@',
    })

    # Create a revision so there is something to roll back to.
    payload = [factories.DocumentSecretFactory().gen_test(
        'Certificate', 'encrypted')]
    with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
                           autospec=True) as mock_secrets_mgr:
        mock_secrets_mgr.create.return_value = {
            'secret': payload[0]['data']}
        put_resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    # Tighten only the encrypted create rule, then attempt the rollback.
    self.policy.set_rules({
        'deckhand:create_encrypted_documents': 'rule:admin_api',
        'deckhand:create_cleartext_documents': '@',
    })
    rollback_resp = self.app.simulate_post(
        '/api/v1.0/rollback/%s' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(403, rollback_resp.status_code)
def setUp(self):
    """Create a revision and store its id in ``self.revision_id``."""
    super(TestRevisionTagsController, self).setUp()
    self.policy.set_rules({'deckhand:create_cleartext_documents': '@'})

    # Create a revision to tag.
    certificate = factories.DocumentSecretFactory().gen_test(
        'Certificate', 'cleartext')
    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all([certificate]))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    self.revision_id = created[0]['status']['revision']
test_revision_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_list_encrypted_revision_documents_insufficient_permissions(self):
    """With the encrypted list rule restricted, the revision-documents
    listing succeeds but comes back empty.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': '@',
        'deckhand:list_encrypted_documents': 'rule:admin_api',
        'deckhand:create_cleartext_documents': '@',
        'deckhand:create_encrypted_documents': '@',
    })

    # Upload a single encrypted certificate to the bucket.
    payload = [factories.DocumentSecretFactory().gen_test(
        'Certificate', 'encrypted')]
    with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
                           autospec=True) as mock_secrets_mgr:
        mock_secrets_mgr.create.return_value = {
            'secret': payload[0]['data']}
        put_resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    # Verify that the created document was not returned.
    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/documents' % revision_id,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(200, get_resp.status_code)
    self.assertEmpty(list(yaml.safe_load_all(get_resp.text)))
test_revision_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_list_revision_documents_sorting_metadata_name(self):
    """Documents listed with ``sort=metadata.name`` come back ordered
    by name.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': '@',
        'deckhand:list_encrypted_documents': '@',
        'deckhand:create_cleartext_documents': '@',
    })

    documents = factories.DocumentFactory(2, [1, 1]).gen_test({})
    expected_names = ['bar', 'baz', 'foo']
    # Give each generated document one of the expected names, in order.
    for position, document in enumerate(documents):
        document['metadata']['name'] = expected_names[position]

    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(documents))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/documents' % revision_id,
        params={'sort': 'metadata.name'}, params_csv=False,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(200, get_resp.status_code)

    retrieved_documents = list(yaml.safe_load_all(get_resp.text))
    self.assertEqual(3, len(retrieved_documents))
    retrieved_names = [d['metadata']['name'] for d in retrieved_documents]
    self.assertEqual(expected_names, retrieved_names)
test_revision_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_list_revision_documents_sorting_by_metadata_name_and_schema(self):
    """Documents listed with ``sort=[schema, metadata.name]`` come back
    ordered by schema first and name second.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': '@',
        'deckhand:list_encrypted_documents': '@',
        'deckhand:create_cleartext_documents': '@',
    })

    documents = factories.DocumentFactory(2, [1, 1]).gen_test({})
    expected_names = ['foo', 'baz', 'bar']
    expected_schemas = ['deckhand/Certificate/v1',
                        'deckhand/Certificate/v1',
                        'deckhand/LayeringPolicy/v1']
    # Assign a name and a schema to each generated document, in order.
    for position, document in enumerate(documents):
        document['metadata']['name'] = expected_names[position]
        document['schema'] = expected_schemas[position]

    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(documents))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/documents' % revision_id,
        params={'sort': ['schema', 'metadata.name']}, params_csv=False,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(200, get_resp.status_code)

    retrieved_documents = list(yaml.safe_load_all(get_resp.text))
    self.assertEqual(3, len(retrieved_documents))
    # The two Certificate documents are ordered by name within their
    # schema, so 'baz' precedes 'foo'.
    retrieved_names = [d['metadata']['name'] for d in retrieved_documents]
    self.assertEqual(['baz', 'foo', 'bar'], retrieved_names)
    retrieved_schemas = [d['schema'] for d in retrieved_documents]
    self.assertEqual(expected_schemas, retrieved_schemas)
test_revision_documents_controller.py 文件源码
项目:deckhand
作者: att-comdev
项目源码
文件源码
阅读 36
收藏 0
点赞 0
评论 0
def test_list_revision_documents_sorting_by_schema(self):
    """Documents listed with ``sort=schema`` come back ordered by
    schema.
    """
    self.policy.set_rules({
        'deckhand:list_cleartext_documents': '@',
        'deckhand:list_encrypted_documents': '@',
        'deckhand:create_cleartext_documents': '@',
    })

    documents = factories.DocumentFactory(2, [1, 1]).gen_test({})
    expected_schemas = ['deckhand/Certificate/v1',
                        'deckhand/CertificateKey/v1',
                        'deckhand/LayeringPolicy/v1']
    # Assign one of the expected schemas to each document, in order.
    for position, document in enumerate(documents):
        document['schema'] = expected_schemas[position]

    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(documents))
    self.assertEqual(200, put_resp.status_code)
    created = list(yaml.safe_load_all(put_resp.text))
    revision_id = created[0]['status']['revision']

    get_resp = self.app.simulate_get(
        '/api/v1.0/revisions/%s/documents' % revision_id,
        params={'sort': 'schema'}, params_csv=False,
        headers={'Content-Type': 'application/x-yaml'})
    self.assertEqual(200, get_resp.status_code)

    retrieved_documents = list(yaml.safe_load_all(get_resp.text))
    self.assertEqual(3, len(retrieved_documents))
    retrieved_schemas = [d['schema'] for d in retrieved_documents]
    self.assertEqual(expected_schemas, retrieved_schemas)
def test_create_delete_then_recreate_document_in_different_bucket(self):
    """Ordinarily creating a document with the same metadata.name/schema
    in a separate bucket raises an exception, but if we delete the document
    and re-create it in a different bucket this should be a success
    scenario.
    """
    self.policy.set_rules({'deckhand:create_cleartext_documents': '@'})

    payload = factories.DocumentFactory(2, [1, 1]).gen_test({})
    bucket_name = test_utils.rand_name('bucket')
    alt_bucket_name = test_utils.rand_name('bucket')

    # Create the documents in the first bucket.
    create_resp = self.app.simulate_put(
        '/api/v1.0/buckets/%s/documents' % bucket_name,
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(payload))
    self.assertEqual(200, create_resp.status_code)
    first_documents = list(yaml.safe_load_all(create_resp.text))
    self.assertEqual(3, len(first_documents))
    first_buckets = [d['status']['bucket'] for d in first_documents]
    self.assertEqual([bucket_name] * 3, first_buckets)

    # Delete the documents from the first bucket (PUT with no body).
    delete_resp = self.app.simulate_put(
        '/api/v1.0/buckets/%s/documents' % bucket_name,
        headers={'Content-Type': 'application/x-yaml'}, body=None)
    self.assertEqual(200, delete_resp.status_code)

    # Re-create the documents in the second bucket.
    recreate_resp = self.app.simulate_put(
        '/api/v1.0/buckets/%s/documents' % alt_bucket_name,
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(payload))
    self.assertEqual(200, recreate_resp.status_code)
    second_documents = list(yaml.safe_load_all(recreate_resp.text))
    self.assertEqual(3, len(second_documents))
    second_buckets = [d['status']['bucket'] for d in second_documents]
    self.assertEqual([alt_bucket_name] * 3, second_buckets)
def _create_revision(self, payload=None):
    """Upload ``payload`` to the ``mop`` bucket and return the new
    revision id.

    When ``payload`` is falsy, documents are generated with
    ``DocumentFactory(2, [1, 1])``.
    """
    if not payload:
        payload = factories.DocumentFactory(2, [1, 1]).gen_test({})

    put_resp = self.app.simulate_put(
        '/api/v1.0/buckets/mop/documents',
        headers={'Content-Type': 'application/x-yaml'},
        body=yaml.safe_dump_all(payload))
    self.assertEqual(200, put_resp.status_code)

    created = list(yaml.safe_load_all(put_resp.text))
    return created[0]['status']['revision']
def on_put(self, req, resp, bucket_name=None):
    """Handle PUT of a multi-document YAML body for ``bucket_name``.

    Parses and validates the documents, enforces the encrypted-document
    creation policy when any document asks for encrypted storage, then
    creates the revision documents and returns them with HTTP 200.

    Raises ``falcon.HTTPBadRequest`` when the body is not valid YAML or
    when the documents fail format/schema validation.
    """
    raw_body = req.stream.read(req.content_length or 0)
    try:
        documents = list(yaml.safe_load_all(raw_body))
    except yaml.YAMLError as e:
        LOG.error("Could not parse the document into YAML data. "
                  "Details: %s." % e)
        raise falcon.HTTPBadRequest(description=six.text_type(e))

    # NOTE: Must validate documents before doing policy enforcement,
    # because we expect certain formatting of the documents while doing
    # policy enforcement. If any documents fail basic schema validation
    # raise an exception immediately.
    try:
        validations = document_validation.DocumentValidation(
            documents).validate_all()
    except (deckhand_errors.InvalidDocumentFormat,
            deckhand_errors.InvalidDocumentSchema) as e:
        LOG.exception(e.format_message())
        raise falcon.HTTPBadRequest(description=e.format_message())

    # One policy check is enough as soon as any document is encrypted.
    has_encrypted_document = any(
        document['metadata'].get('storagePolicy') == 'encrypted'
        for document in documents)
    if has_encrypted_document:
        policy.conditional_authorize(
            'deckhand:create_encrypted_documents', req.context)

    self._prepare_secret_documents(documents)
    created_documents = self._create_revision_documents(
        bucket_name, documents, validations)

    resp.body = self.view_builder.list(created_documents)
    resp.status = falcon.HTTP_200