def test_quota_get_all(self):
ids_project_1 = []
ids_project_2 = []
ids_project_all = []
resource_names = ['servers', 'servers_type', 'test_resource']
for i in range(0, 3):
quota = utils.create_test_quota(project_id='project_1',
resource_name=resource_names[i])
ids_project_1.append(quota['id'])
for i in range(3, 5):
resource_name = resource_names[i - 3]
quota = utils.create_test_quota(project_id='project_2',
resource_name=resource_name)
ids_project_2.append(quota['id'])
ids_project_all.extend(ids_project_1)
ids_project_all.extend(ids_project_2)
# Set project_only to False
# get all quotas from all projects
res = self.dbapi.quota_get_all(self.context, project_only=False)
res_ids = [r.id for r in res]
six.assertCountEqual(self, ids_project_all, res_ids)
# Set project_only to True
# get quotas from current project (project_1)
self.context.tenant = 'project_1'
res = self.dbapi.quota_get_all(self.context, project_only=True)
res_ids = [r.id for r in res]
six.assertCountEqual(self, ids_project_1, res_ids)
# Set project_only to True
# get quotas from current project (project_2)
self.context.tenant = 'project_2'
res = self.dbapi.quota_get_all(self.context, project_only=True)
res_ids = [r.id for r in res]
six.assertCountEqual(self, ids_project_2, res_ids)
python类assertCountEqual()的实例源码
def test_get_aggregate_list(self):
uuids = [self.aggregate['uuid']]
for i in range(1, 6):
aggregate = utils.create_test_aggregate(
uuid=uuidutils.generate_uuid(),
name=six.text_type(i))
uuids.append(six.text_type(aggregate['uuid']))
res = self.dbapi.aggregate_get_all(self.context)
res_uuids = [r['uuid'] for r in res]
six.assertCountEqual(self, uuids, res_uuids)
def test_add_role(self):
self.rmt.add_user_role(self.user, self.rsrc_mgr)
expected = [self.rsrc_mgr]
actual = self.rmt.get_user_roles(self.user)
six.assertCountEqual(self, expected, actual)
def test_collection(self):
actual_list = self.rmt.list_metadata(self.coll)
self.assertEqual([], actual_list)
keys_vals = {'red': 'green', 'two': 'four', 'inside': 'out'}
self.rmt.create_metadata(self.coll, keys_vals)
actual = self.rmt.get_metadata(self.coll, list(keys_vals.keys()))
six.assertCountEqual(self,keys_vals, actual)
with self.assertRaises(HTTPErrorList):
# Should fail when trying create keys that already exist.
self.rmt.create_metadata(self.coll, keys_vals)
update = {'two': 'six', 'inside': 'upside-down'}
self.rmt.update_metadata(self.coll, update)
actual_upd = self.rmt.get_metadata(self.coll, list(update.keys()))
six.assertCountEqual(self, update, actual_upd)
actual_list_upd = self.rmt.list_metadata(self.coll)
six.assertCountEqual(self, list(keys_vals.keys()), actual_list_upd)
with self.assertRaises(HTTPErrorList):
# Try updating a non-existent key.
self.rmt.update_metadata(self.coll, {'foo': 'bar'})
self.rmt.delete_metadata(self.coll, list(keys_vals.keys()))
with self.assertRaises(HTTPErrorList):
# Try getting keys that don't exist.
self.rmt.get_metadata(self.coll, ['foo', 'bar'])
actual_list_end = self.rmt.list_metadata(self.coll)
self.assertEqual([], actual_list_end)
def test_experiment(self):
actual_list = self.rmt.list_metadata(self.exp)
self.assertEqual([], actual_list)
keys_vals = {'red': 'green', 'two': 'four', 'inside': 'out'}
self.rmt.create_metadata(self.exp, keys_vals)
actual = self.rmt.get_metadata(self.exp, list(keys_vals.keys()))
six.assertCountEqual(self, keys_vals, actual)
with self.assertRaises(HTTPErrorList):
# Should fail when trying create keys that already exist.
self.rmt.create_metadata(self.exp, keys_vals)
update = { 'two': 'six', 'inside': 'upside-down' }
self.rmt.update_metadata(self.exp, update)
actual_upd = self.rmt.get_metadata(self.exp, list(update.keys()))
six.assertCountEqual(self, update, actual_upd)
actual_list_upd = self.rmt.list_metadata(self.exp)
six.assertCountEqual(self, list(keys_vals.keys()), actual_list_upd)
with self.assertRaises(HTTPErrorList):
# Try updating a non-existent key.
self.rmt.update_metadata(self.exp, {'foo': 'bar'})
self.rmt.delete_metadata(self.exp, list(keys_vals.keys()))
with self.assertRaises(HTTPErrorList):
# Try getting keys that don't exist.
self.rmt.get_metadata(self.exp, ['foo', 'bar'])
actual_list_end = self.rmt.list_metadata(self.exp)
self.assertEqual([], actual_list_end)
def test_channel(self):
actual_list = self.rmt.list_metadata(self.chan)
self.assertEqual([], actual_list)
keys_vals = { 'red': 'green', 'two': 'four', 'inside': 'out'}
self.rmt.create_metadata(self.chan, keys_vals)
actual = self.rmt.get_metadata(self.chan, list(keys_vals.keys()))
six.assertCountEqual(self, keys_vals, actual)
with self.assertRaises(HTTPErrorList):
# Should fail when trying create keys that already exist.
self.rmt.create_metadata(self.chan, keys_vals)
update = { 'two': 'six', 'inside': 'upside-down' }
self.rmt.update_metadata(self.chan, update)
actual_upd = self.rmt.get_metadata(self.chan, list(update.keys()))
six.assertCountEqual(self,update, actual_upd)
actual_list_upd = self.rmt.list_metadata(self.chan)
six.assertCountEqual(self,keys_vals, actual_list_upd)
with self.assertRaises(HTTPErrorList):
# Try updating a non-existent key.
self.rmt.update_metadata(self.chan, {'foo': 'bar'})
self.rmt.delete_metadata(self.chan, list(keys_vals.keys()))
with self.assertRaises(HTTPErrorList):
# Try getting keys that don't exist.
self.rmt.get_metadata(self.chan, ['foo', 'bar'])
actual_list_end = self.rmt.list_metadata(self.chan)
self.assertEqual([], actual_list_end)
def test_get_success(self, mock_session, mock_resp):
expected = ['default']
mock_resp.status_code = 200
mock_resp.json.return_value = expected
mock_session.prepare_request.return_value = PreparedRequest()
mock_session.send.return_value = mock_resp
url_prefix = 'https://api.theboss.io'
auth = 'mytoken'
send_opts = {}
actual = self.prj.get_user(
'johndoe', url_prefix, auth, mock_session, send_opts)
six.assertCountEqual(self, expected, actual)
def test_assertCountEqual():
class TestAssertCountEqual(unittest.TestCase):
def test(self):
with self.assertRaises(AssertionError):
six.assertCountEqual(self, (1, 2), [3, 4, 5])
six.assertCountEqual(self, (1, 2), [2, 1])
TestAssertCountEqual('test').test()
def test_iterate_fields(self):
result = list(iterate_fields(OUTPUT, PROCESS_OUTPUT_SCHEMA))
# result object is iterator - we use lists to pull all elements
expected = [
({
'type': 'basic:string:',
'name': 'id',
'label': 'ID'
}, {
'k': 123,
'id': 'abc'
}), ({
'type': 'basic:string:',
'name': 'bases',
'label': 'Number of bases'
}, {
'options': {
'k': 123,
'id': 'abc'
},
'bases': '75',
'fastq': {
'file': 'example.fastq.gz'
}
}), ({
'type': 'basic:file:',
'name': 'fastq',
'label': 'Reads file'
}, {
'options': {
'k': 123,
'id': 'abc'
},
'bases': '75',
'fastq': {
'file': 'example.fastq.gz'
}
}), ({
'type': 'basic:integer:',
'name': 'k',
'label': 'k-mer size'
}, {
'k': 123,
'id': 'abc'
})
]
six.assertCountEqual(self, result, expected)
def test_files(self):
data = Data(id=123, resolwe=MagicMock())
data._get_dir_files = MagicMock(
side_effect=[['first_dir/file1.txt'], ['fastq_dir/file2.txt']])
data.annotation = {
'output.list': {'value': [{'file': "element.gz"}], 'type': 'list:basic:file:'},
'output.dir_list': {'value': [{'dir': "first_dir"}], 'type': 'list:basic:dir:'},
'output.fastq': {'value': {'file': "file.fastq.gz"}, 'type': 'basic:file:fastq'},
'output.fastq_archive': {'value': {'file': "archive.gz"}, 'type': 'basic:file:'},
'output.fastq_dir': {'value': {'dir': "fastq_dir"}, 'type': 'basic:dir:'},
'input.fastq_url': {'value': {'file': "blah"}, 'type': 'basic:url:'},
'input.blah': {'value': "blah.gz", 'type': 'basic:file:'}
}
file_list = data.files()
six.assertCountEqual(self, file_list, [
'element.gz',
'archive.gz',
'file.fastq.gz',
'first_dir/file1.txt',
'fastq_dir/file2.txt'
])
file_list = data.files(file_name='element.gz')
self.assertEqual(file_list, ['element.gz'])
file_list = data.files(field_name='output.fastq')
self.assertEqual(file_list, ['file.fastq.gz'])
data.annotation = {
'output.list': {'value': [{'no_file_field_here': "element.gz"}],
'type': 'list:basic:file:'},
}
with six.assertRaisesRegex(self, KeyError, "does not contain 'file' key."):
data.files()
data = Data(resolwe=MagicMock(), id=None)
with six.assertRaisesRegex(self, ValueError, "must be saved before"):
data.files()
def test_block(self):
two_towers = """{
"name" : "test",
"layers" : {
"data": {
"parents": []
},
"conv1": {
"parents": ["data"]
},
"conv2": {
"type": "Block",
"parents": ["conv1"],
"endpoint": "concat",
"layers": {
"conv2a": {
"parents": []
},
"conv2b" : {
"parents": []
},
"concat": {
"parents": ["conv2a", "conv2b"]
}
}
},
"output": {
"parents" : ["conv2"]
}
}
}
"""
self.graph.load_from_string(two_towers)
nested_list = self._to_strings(self.graph.nested_list)
self.assertEqual(nested_list[0], 'data')
self.assertEqual(nested_list[1], 'conv1')
six.assertCountEqual(self, nested_list[2], (['conv2/conv2a'],
['conv2/conv2b']))
self.assertEqual(nested_list[3], 'conv2/concat')
self.assertEqual(nested_list[4], 'output')
def test_assertCountEqual():
class TestAssertCountEqual(unittest.TestCase):
def test(self):
with self.assertRaises(AssertionError):
six.assertCountEqual(self, (1, 2), [3, 4, 5])
six.assertCountEqual(self, (1, 2), [2, 1])
TestAssertCountEqual('test').test()
def test_execute_task_instances(self):
dag_id = 'SchedulerJobTest.test_execute_task_instances'
task_id_1 = 'dummy_task'
task_id_2 = 'dummy_task_nonexistent_queue'
# important that len(tasks) is less than concurrency
# because before scheduler._execute_task_instances would only
# check the num tasks once so if concurrency was 3,
# we could execute arbitrarily many tasks in the second run
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
task2 = DummyOperator(dag=dag, task_id=task_id_2)
dagbag = self._make_simple_dag_bag([dag])
scheduler = SchedulerJob(**self.default_scheduler_args)
session = settings.Session()
# create first dag run with 1 running and 1 queued
dr1 = scheduler.create_dag_run(dag)
ti1 = TI(task1, dr1.execution_date)
ti2 = TI(task2, dr1.execution_date)
ti1.refresh_from_db()
ti2.refresh_from_db()
ti1.state = State.RUNNING
ti2.state = State.RUNNING
session.merge(ti1)
session.merge(ti2)
session.commit()
self.assertEqual(State.RUNNING, dr1.state)
self.assertEqual(2, DAG.get_num_task_instances(dag_id, dag.task_ids,
states=[State.RUNNING], session=session))
# create second dag run
dr2 = scheduler.create_dag_run(dag)
ti3 = TI(task1, dr2.execution_date)
ti4 = TI(task2, dr2.execution_date)
ti3.refresh_from_db()
ti4.refresh_from_db()
# manually set to scheduled so we can pick them up
ti3.state = State.SCHEDULED
ti4.state = State.SCHEDULED
session.merge(ti3)
session.merge(ti4)
session.commit()
self.assertEqual(State.RUNNING, dr2.state)
res = scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
# check that concurrency is respected
ti1.refresh_from_db()
ti2.refresh_from_db()
ti3.refresh_from_db()
ti4.refresh_from_db()
self.assertEqual(3, DAG.get_num_task_instances(dag_id, dag.task_ids,
states=[State.RUNNING, State.QUEUED], session=session))
self.assertEqual(State.RUNNING, ti1.state)
self.assertEqual(State.RUNNING, ti2.state)
six.assertCountEqual(self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state])
self.assertEqual(1, res)
def test_listdir(self):
# Check listing directory that doesn't exist
with self.assertRaises(errors.ResourceNotFound):
self.fs.listdir('foobar')
# Check aliases for root
self.assertEqual(self.fs.listdir('/'), [])
self.assertEqual(self.fs.listdir('.'), [])
self.assertEqual(self.fs.listdir('./'), [])
# Make a few files
self.fs.setbytes('foo', b'egg')
self.fs.setbytes('bar', b'egg')
self.fs.setbytes('baz', b'egg')
# Check list works
six.assertCountEqual(self,
self.fs.listdir('/'),
['foo', 'bar', 'baz'])
six.assertCountEqual(self,
self.fs.listdir('.'),
['foo', 'bar', 'baz'])
six.assertCountEqual(self,
self.fs.listdir('./'),
['foo', 'bar', 'baz'])
# Check paths are unicode strings
for name in self.fs.listdir('/'):
self.assertIsInstance(name, text_type)
# Create a subdirectory
self.fs.makedir('dir')
# Should start empty
self.assertEqual(self.fs.listdir('/dir'), [])
# Write some files
self.fs.setbytes('dir/foofoo', b'egg')
self.fs.setbytes('dir/barbar', b'egg')
# Check listing subdirectory
six.assertCountEqual(self,
self.fs.listdir('dir'),
['foofoo', 'barbar'])
# Make sure they are unicode stringd
for name in self.fs.listdir('dir'):
self.assertIsInstance(name, text_type)
self.fs.create('notadir')
with self.assertRaises(errors.DirectoryExpected):
self.fs.listdir('notadir')