python类assertCountEqual()的实例源码

test_quotas.py 文件源码 项目:mogan 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_quota_get_all(self):
        ids_project_1 = []
        ids_project_2 = []
        ids_project_all = []
        resource_names = ['servers', 'servers_type', 'test_resource']
        for i in range(0, 3):
            quota = utils.create_test_quota(project_id='project_1',
                                            resource_name=resource_names[i])
            ids_project_1.append(quota['id'])
        for i in range(3, 5):
            resource_name = resource_names[i - 3]
            quota = utils.create_test_quota(project_id='project_2',
                                            resource_name=resource_name)
            ids_project_2.append(quota['id'])
        ids_project_all.extend(ids_project_1)
        ids_project_all.extend(ids_project_2)

        # Set project_only to False
        # get all quotas from all projects
        res = self.dbapi.quota_get_all(self.context, project_only=False)
        res_ids = [r.id for r in res]
        six.assertCountEqual(self, ids_project_all, res_ids)

        # Set project_only to True
        # get quotas from current project (project_1)
        self.context.tenant = 'project_1'
        res = self.dbapi.quota_get_all(self.context, project_only=True)
        res_ids = [r.id for r in res]
        six.assertCountEqual(self, ids_project_1, res_ids)

        # Set project_only to True
        # get quotas from current project (project_2)
        self.context.tenant = 'project_2'
        res = self.dbapi.quota_get_all(self.context, project_only=True)
        res_ids = [r.id for r in res]
        six.assertCountEqual(self, ids_project_2, res_ids)
test_aggregates.py 文件源码 项目:mogan 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_get_aggregate_list(self):
        uuids = [self.aggregate['uuid']]
        for i in range(1, 6):
            aggregate = utils.create_test_aggregate(
                uuid=uuidutils.generate_uuid(),
                name=six.text_type(i))
            uuids.append(six.text_type(aggregate['uuid']))
        res = self.dbapi.aggregate_get_all(self.context)
        res_uuids = [r['uuid'] for r in res]
        six.assertCountEqual(self, uuids, res_uuids)
int_test_user_role_v1.py 文件源码 项目:intern 作者: jhuapl-boss 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_add_role(self):
        self.rmt.add_user_role(self.user, self.rsrc_mgr)

        expected = [self.rsrc_mgr]
        actual = self.rmt.get_user_roles(self.user)

        six.assertCountEqual(self, expected, actual)
int_test_metadata_v1.py 文件源码 项目:intern 作者: jhuapl-boss 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_collection(self):
        actual_list = self.rmt.list_metadata(self.coll)
        self.assertEqual([], actual_list)

        keys_vals = {'red': 'green', 'two': 'four', 'inside': 'out'}
        self.rmt.create_metadata(self.coll, keys_vals)

        actual = self.rmt.get_metadata(self.coll, list(keys_vals.keys()))
        six.assertCountEqual(self,keys_vals, actual)

        with self.assertRaises(HTTPErrorList):
            # Should fail when trying create keys that already exist.
            self.rmt.create_metadata(self.coll, keys_vals)

        update = {'two': 'six', 'inside': 'upside-down'}
        self.rmt.update_metadata(self.coll, update)

        actual_upd = self.rmt.get_metadata(self.coll, list(update.keys()))
        six.assertCountEqual(self, update, actual_upd)

        actual_list_upd = self.rmt.list_metadata(self.coll)
        six.assertCountEqual(self, list(keys_vals.keys()), actual_list_upd)

        with self.assertRaises(HTTPErrorList):
            # Try updating a non-existent key.
            self.rmt.update_metadata(self.coll, {'foo': 'bar'})

        self.rmt.delete_metadata(self.coll, list(keys_vals.keys()))

        with self.assertRaises(HTTPErrorList):
            # Try getting keys that don't exist.
            self.rmt.get_metadata(self.coll, ['foo', 'bar'])

        actual_list_end = self.rmt.list_metadata(self.coll)
        self.assertEqual([], actual_list_end)
int_test_metadata_v1.py 文件源码 项目:intern 作者: jhuapl-boss 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_experiment(self):
        actual_list = self.rmt.list_metadata(self.exp)
        self.assertEqual([], actual_list)

        keys_vals = {'red': 'green', 'two': 'four', 'inside': 'out'}
        self.rmt.create_metadata(self.exp, keys_vals)
        actual = self.rmt.get_metadata(self.exp, list(keys_vals.keys()))
        six.assertCountEqual(self, keys_vals, actual)

        with self.assertRaises(HTTPErrorList):
            # Should fail when trying create keys that already exist.
            self.rmt.create_metadata(self.exp, keys_vals)

        update = { 'two': 'six', 'inside': 'upside-down' }
        self.rmt.update_metadata(self.exp, update)

        actual_upd = self.rmt.get_metadata(self.exp, list(update.keys()))
        six.assertCountEqual(self, update, actual_upd)

        actual_list_upd = self.rmt.list_metadata(self.exp)
        six.assertCountEqual(self, list(keys_vals.keys()), actual_list_upd)

        with self.assertRaises(HTTPErrorList):
            # Try updating a non-existent key.
            self.rmt.update_metadata(self.exp, {'foo': 'bar'})

        self.rmt.delete_metadata(self.exp, list(keys_vals.keys()))

        with self.assertRaises(HTTPErrorList):
            # Try getting keys that don't exist.
            self.rmt.get_metadata(self.exp, ['foo', 'bar'])

        actual_list_end = self.rmt.list_metadata(self.exp)
        self.assertEqual([], actual_list_end)
int_test_metadata_v1.py 文件源码 项目:intern 作者: jhuapl-boss 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_channel(self):
        actual_list = self.rmt.list_metadata(self.chan)
        self.assertEqual([], actual_list)

        keys_vals = { 'red': 'green', 'two': 'four', 'inside': 'out'}
        self.rmt.create_metadata(self.chan, keys_vals)
        actual = self.rmt.get_metadata(self.chan, list(keys_vals.keys()))
        six.assertCountEqual(self, keys_vals, actual)

        with self.assertRaises(HTTPErrorList):
            # Should fail when trying create keys that already exist.
            self.rmt.create_metadata(self.chan, keys_vals)

        update = { 'two': 'six', 'inside': 'upside-down' }
        self.rmt.update_metadata(self.chan, update)

        actual_upd = self.rmt.get_metadata(self.chan, list(update.keys()))
        six.assertCountEqual(self,update, actual_upd)

        actual_list_upd = self.rmt.list_metadata(self.chan)
        six.assertCountEqual(self,keys_vals, actual_list_upd)

        with self.assertRaises(HTTPErrorList):
            # Try updating a non-existent key.
            self.rmt.update_metadata(self.chan, {'foo': 'bar'})

        self.rmt.delete_metadata(self.chan, list(keys_vals.keys()))

        with self.assertRaises(HTTPErrorList):
            # Try getting keys that don't exist.
            self.rmt.get_metadata(self.chan, ['foo', 'bar'])

        actual_list_end = self.rmt.list_metadata(self.chan)
        self.assertEqual([], actual_list_end)
test_user.py 文件源码 项目:intern 作者: jhuapl-boss 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_get_success(self, mock_session, mock_resp):
        expected = ['default']
        mock_resp.status_code = 200
        mock_resp.json.return_value = expected
        mock_session.prepare_request.return_value = PreparedRequest()
        mock_session.send.return_value = mock_resp

        url_prefix = 'https://api.theboss.io'
        auth = 'mytoken'
        send_opts = {}

        actual = self.prj.get_user(
            'johndoe', url_prefix, auth, mock_session, send_opts)
        six.assertCountEqual(self, expected, actual)
test_six.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_assertCountEqual():
    class TestAssertCountEqual(unittest.TestCase):
        def test(self):
            with self.assertRaises(AssertionError):
                six.assertCountEqual(self, (1, 2), [3, 4, 5])

            six.assertCountEqual(self, (1, 2), [2, 1])

    TestAssertCountEqual('test').test()
test_utils.py 文件源码 项目:resolwe-bio-py 作者: genialis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_iterate_fields(self):
        result = list(iterate_fields(OUTPUT, PROCESS_OUTPUT_SCHEMA))
        # result object is iterator - we use lists to pull all elements

        expected = [
            ({
                'type': 'basic:string:',
                'name': 'id',
                'label': 'ID'
            }, {
                'k': 123,
                'id': 'abc'
            }), ({
                'type': 'basic:string:',
                'name': 'bases',
                'label': 'Number of bases'
            }, {
                'options': {
                    'k': 123,
                    'id': 'abc'
                },
                'bases': '75',
                'fastq': {
                    'file': 'example.fastq.gz'
                }
            }), ({
                'type': 'basic:file:',
                'name': 'fastq',
                'label': 'Reads file'
            }, {
                'options': {
                    'k': 123,
                    'id': 'abc'
                },
                'bases': '75',
                'fastq': {
                    'file': 'example.fastq.gz'
                }
            }), ({
                'type': 'basic:integer:',
                'name': 'k',
                'label': 'k-mer size'
            }, {
                'k': 123,
                'id': 'abc'
            })
        ]

        six.assertCountEqual(self, result, expected)
test_data.py 文件源码 项目:resolwe-bio-py 作者: genialis 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_files(self):
        data = Data(id=123, resolwe=MagicMock())
        data._get_dir_files = MagicMock(
            side_effect=[['first_dir/file1.txt'], ['fastq_dir/file2.txt']])

        data.annotation = {
            'output.list': {'value': [{'file': "element.gz"}], 'type': 'list:basic:file:'},
            'output.dir_list': {'value': [{'dir': "first_dir"}], 'type': 'list:basic:dir:'},
            'output.fastq': {'value': {'file': "file.fastq.gz"}, 'type': 'basic:file:fastq'},
            'output.fastq_archive': {'value': {'file': "archive.gz"}, 'type': 'basic:file:'},
            'output.fastq_dir': {'value': {'dir': "fastq_dir"}, 'type': 'basic:dir:'},
            'input.fastq_url': {'value': {'file': "blah"}, 'type': 'basic:url:'},
            'input.blah': {'value': "blah.gz", 'type': 'basic:file:'}
        }

        file_list = data.files()
        six.assertCountEqual(self, file_list, [
            'element.gz',
            'archive.gz',
            'file.fastq.gz',
            'first_dir/file1.txt',
            'fastq_dir/file2.txt'
        ])
        file_list = data.files(file_name='element.gz')
        self.assertEqual(file_list, ['element.gz'])
        file_list = data.files(field_name='output.fastq')
        self.assertEqual(file_list, ['file.fastq.gz'])

        data.annotation = {
            'output.list': {'value': [{'no_file_field_here': "element.gz"}],
                            'type': 'list:basic:file:'},
        }
        with six.assertRaisesRegex(self, KeyError, "does not contain 'file' key."):
            data.files()

        data = Data(resolwe=MagicMock(), id=None)
        with six.assertRaisesRegex(self, ValueError, "must be saved before"):
            data.files()
graph_test.py 文件源码 项目:paleo 作者: TalwalkarLab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_block(self):
        two_towers = """{
            "name" : "test",
            "layers" : {
                "data": {
                    "parents": []
                },
                "conv1": {
                    "parents": ["data"]
                },
                "conv2": {
                    "type": "Block",
                    "parents": ["conv1"],
                    "endpoint": "concat",
                    "layers": {
                        "conv2a": {
                            "parents": []
                        },
                        "conv2b" : {
                            "parents": []
                        },
                        "concat": {
                            "parents": ["conv2a", "conv2b"]
                        }
                    }
                },
                "output": {
                    "parents" : ["conv2"]
                }
            }
        }
        """
        self.graph.load_from_string(two_towers)
        nested_list = self._to_strings(self.graph.nested_list)
        self.assertEqual(nested_list[0], 'data')
        self.assertEqual(nested_list[1], 'conv1')
        six.assertCountEqual(self, nested_list[2], (['conv2/conv2a'],
                                                    ['conv2/conv2b']))
        self.assertEqual(nested_list[3], 'conv2/concat')
        self.assertEqual(nested_list[4], 'output')
test_six.py 文件源码 项目:six 作者: benjaminp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_assertCountEqual():
    class TestAssertCountEqual(unittest.TestCase):
        def test(self):
            with self.assertRaises(AssertionError):
                six.assertCountEqual(self, (1, 2), [3, 4, 5])

            six.assertCountEqual(self, (1, 2), [2, 1])

    TestAssertCountEqual('test').test()
jobs.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_execute_task_instances(self):
        dag_id = 'SchedulerJobTest.test_execute_task_instances'
        task_id_1 = 'dummy_task'
        task_id_2 = 'dummy_task_nonexistent_queue'
        # important that len(tasks) is less than concurrency
        # because before scheduler._execute_task_instances would only
        # check the num tasks once so if concurrency was 3,
        # we could execute arbitrarily many tasks in the second run
        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
        task1 = DummyOperator(dag=dag, task_id=task_id_1)
        task2 = DummyOperator(dag=dag, task_id=task_id_2)
        dagbag = self._make_simple_dag_bag([dag])

        scheduler = SchedulerJob(**self.default_scheduler_args)
        session = settings.Session()

        # create first dag run with 1 running and 1 queued
        dr1 = scheduler.create_dag_run(dag)
        ti1 = TI(task1, dr1.execution_date)
        ti2 = TI(task2, dr1.execution_date)
        ti1.refresh_from_db()
        ti2.refresh_from_db()
        ti1.state = State.RUNNING
        ti2.state = State.RUNNING
        session.merge(ti1)
        session.merge(ti2)
        session.commit()

        self.assertEqual(State.RUNNING, dr1.state)
        self.assertEqual(2, DAG.get_num_task_instances(dag_id, dag.task_ids,
            states=[State.RUNNING], session=session))

        # create second dag run
        dr2 = scheduler.create_dag_run(dag)
        ti3 = TI(task1, dr2.execution_date)
        ti4 = TI(task2, dr2.execution_date)
        ti3.refresh_from_db()
        ti4.refresh_from_db()
        # manually set to scheduled so we can pick them up
        ti3.state = State.SCHEDULED
        ti4.state = State.SCHEDULED
        session.merge(ti3)
        session.merge(ti4)
        session.commit()

        self.assertEqual(State.RUNNING, dr2.state)

        res = scheduler._execute_task_instances(dagbag, [State.SCHEDULED])

        # check that concurrency is respected
        ti1.refresh_from_db()
        ti2.refresh_from_db()
        ti3.refresh_from_db()
        ti4.refresh_from_db()
        self.assertEqual(3, DAG.get_num_task_instances(dag_id, dag.task_ids,
            states=[State.RUNNING, State.QUEUED], session=session))
        self.assertEqual(State.RUNNING, ti1.state)
        self.assertEqual(State.RUNNING, ti2.state)
        six.assertCountEqual(self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state])
        self.assertEqual(1, res)
test.py 文件源码 项目:pyfilesystem2 作者: PyFilesystem 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_listdir(self):
        # Check listing directory that doesn't exist
        with self.assertRaises(errors.ResourceNotFound):
            self.fs.listdir('foobar')

        # Check aliases for root
        self.assertEqual(self.fs.listdir('/'), [])
        self.assertEqual(self.fs.listdir('.'), [])
        self.assertEqual(self.fs.listdir('./'), [])

        # Make a few files
        self.fs.setbytes('foo', b'egg')
        self.fs.setbytes('bar', b'egg')
        self.fs.setbytes('baz', b'egg')

        # Check list works
        six.assertCountEqual(self,
                             self.fs.listdir('/'),
                             ['foo', 'bar', 'baz'])
        six.assertCountEqual(self,
                             self.fs.listdir('.'),
                             ['foo', 'bar', 'baz'])
        six.assertCountEqual(self,
                             self.fs.listdir('./'),
                             ['foo', 'bar', 'baz'])

        # Check paths are unicode strings
        for name in self.fs.listdir('/'):
            self.assertIsInstance(name, text_type)

        # Create a subdirectory
        self.fs.makedir('dir')

        # Should start empty
        self.assertEqual(self.fs.listdir('/dir'), [])

        # Write some files
        self.fs.setbytes('dir/foofoo', b'egg')
        self.fs.setbytes('dir/barbar', b'egg')

        # Check listing subdirectory
        six.assertCountEqual(self,
                             self.fs.listdir('dir'),
                             ['foofoo', 'barbar'])
        # Make sure they are unicode stringd
        for name in self.fs.listdir('dir'):
            self.assertIsInstance(name, text_type)

        self.fs.create('notadir')
        with self.assertRaises(errors.DirectoryExpected):
            self.fs.listdir('notadir')


问题


面经


文章

微信
公众号

扫码关注公众号