python类nested()的实例源码

base.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def nested(*contexts):
        """Enter every context manager in ``contexts`` and yield their results.

        All managers are registered on a single ``contextlib.ExitStack`` and
        entered in argument order; the list of values returned by
        ``enter_context`` is yielded once, and the stack is unwound when the
        ``with`` block is left.

        NOTE(review): presumably wrapped with ``contextlib.contextmanager``
        outside this excerpt -- confirm.
        """
        with contextlib.ExitStack() as stack:
            entered = []
            for manager in contexts:
                entered.append(stack.enter_context(manager))
            yield entered
upload.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def nested(*contexts):
        """
        Reimplementation of ``nested`` for Python 3.

        Enters each of ``contexts`` in order on one ``ExitStack`` and yields
        the list of values produced by entering them.
        """
        with ExitStack() as stack:
            # list() forces the lazy map so every context is entered, in
            # argument order, before the caller's block runs.
            yield list(map(stack.enter_context, contexts))
upload.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def create_upload_func(self, ns, definition, path, operation):
        """Build the upload view for ``definition`` and register it as a route.

        The view loads query-string data with the request schema, raises
        ``BadRequest`` when the request carries no files, wraps every entry
        of ``request.files`` that ``self.exclude_func`` does not reject in
        ``temporary_upload``, and passes the entered uploads to
        ``definition.func`` together with the merged path and query-string
        data. A ``None`` result produces an empty 204 response; any other
        result is passed to ``dump_response_data``.

        When the definition declares a request or response schema, the view
        is additionally wrapped with ``qs`` / ``response`` before being
        returned.
        """
        request_schema = definition.request_schema or Schema()
        response_schema = definition.response_schema or Schema()

        @self.add_route(path, operation, ns)
        @wraps(definition.func)
        def upload(**path_data):
            query_data = load_query_string_data(request_schema)

            if not request.files:
                raise BadRequest("No files were uploaded")

            pending = []
            for name, fileobj in request.files.items():
                if self.exclude_func(name, fileobj):
                    continue
                pending.append(temporary_upload(name, fileobj))

            # The uploads only need to stay open while the handler runs.
            with nested(*pending) as files:
                result = definition.func(files, **merge_data(path_data, query_data))

            if result is None:
                return "", 204
            return dump_response_data(response_schema, result, operation.value.default_code)

        if definition.request_schema:
            upload = qs(definition.request_schema)(upload)
        if definition.response_schema:
            upload = response(definition.response_schema)(upload)
        return upload
base.py 文件源码 项目:kolla-kubernetes 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def nested(*contexts):
        """Yield the results of entering all ``contexts`` on one exit stack.

        The managers are entered in argument order through a
        ``contextlib.ExitStack``; the yielded list holds the value each
        ``enter_context`` call returned.

        NOTE(review): presumably wrapped with ``contextlib.contextmanager``
        outside this excerpt -- confirm.
        """
        with contextlib.ExitStack() as stack:
            results = list(map(stack.enter_context, contexts))
            yield results
block.py 文件源码 项目:bifrost 作者: ledatelescope 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def flatten(self, *args):
        """Return one flat list holding every non-tuple/list item of ``args``.

        Tuples and lists are expanded recursively, preserving order; any
        other value is kept as a single element.
        """
        flat = []
        for item in args:
            if not isinstance(item, (tuple, list)):
                flat.append(item)
                continue
            flat += self.flatten(*item)
        return flat
test_ec2.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _create_vm_in_aws_nova(self):
        """Create an instance and a network, then create the Nova VM.

        While the VM is created, the EC2 driver's image lookup, network
        processing and security-group lookup are replaced by mocks returning
        fixed values.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        self._create_instance()
        self._create_network()
        image_patch = mock.patch.object(
            EC2Driver, '_get_image_ami_id_from_meta')
        network_patch = mock.patch.object(EC2Driver, '_process_network_info')
        secgrp_patch = mock.patch.object(EC2Driver, '_get_instance_sec_grps')
        with image_patch as mock_image, network_patch as mock_network, \
                secgrp_patch as mock_secgrp:
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (self.subnet_id, '192.168.10.5', None,
                                         None)
            mock_secgrp.return_value = []
            self._create_nova_vm()
test_ec2.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_add_ssh_keys_key_exists(self):
        """``_add_ssh_keys`` must not import a key pair that already exists.

        ``get_key_pair`` is mocked to return ``True``; the test then checks
        that it was queried once with the key name and that
        ``import_key_pair`` was never called.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        fake_key = 'fake_key'
        fake_key_data = 'abcdefgh'
        self.conn.ec2_conn.import_key_pair(fake_key, fake_key_data)
        get_patch = mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair')
        import_patch = mock.patch.object(boto.ec2.EC2Connection,
                                         'import_key_pair')
        with get_patch as fake_get, import_patch as fake_import:
            fake_get.return_value = True
            self.conn._add_ssh_keys(fake_key, fake_key_data)
            fake_get.assert_called_once_with(fake_key)
            fake_import.assert_not_called()
test_ec2.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_add_ssh_keys_key_absent(self):
        """``_add_ssh_keys`` must import a key pair that does not exist yet.

        ``get_key_pair`` is mocked to return ``False``; the test then checks
        that it was queried once with the key name and that
        ``import_key_pair`` was called once with the key name and data.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        fake_key = 'fake_key'
        fake_key_data = 'abcdefgh'
        get_patch = mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair')
        import_patch = mock.patch.object(boto.ec2.EC2Connection,
                                         'import_key_pair')
        with get_patch as fake_get, import_patch as fake_import:
            fake_get.return_value = False
            self.conn._add_ssh_keys(fake_key, fake_key_data)
            fake_get.assert_called_once_with(fake_key)
            fake_import.assert_called_once_with(fake_key, fake_key_data)
test_ec2.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_spawn_with_network_error(self):
        """Creating the VM must abort when no network information is found.

        ``_process_network_info`` is mocked to return a tuple of ``None``
        values, and ``_create_nova_vm`` is expected to raise
        ``BuildAbortException``. The fixture is reset afterwards.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        self._create_instance()
        image_patch = mock.patch.object(
            EC2Driver, '_get_image_ami_id_from_meta')
        network_patch = mock.patch.object(EC2Driver, '_process_network_info')
        secgrp_patch = mock.patch.object(EC2Driver, '_get_instance_sec_grps')
        with image_patch as mock_image, network_patch as mock_network, \
                secgrp_patch as mock_secgrp:
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (None, None, None, None)
            mock_secgrp.return_value = []
            self.assertRaises(exception.BuildAbortException,
                              self._create_nova_vm)
        self.reset()
test_ec2.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_spawn_with_network_error_from_aws(self):
        """Creating the VM must abort when the first network field is missing.

        ``_process_network_info`` is mocked to return an address but ``None``
        as the first tuple element, and ``_create_nova_vm`` is expected to
        raise ``BuildAbortException``. The fixture is reset afterwards.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        self._create_instance()
        image_patch = mock.patch.object(
            EC2Driver, '_get_image_ami_id_from_meta')
        network_patch = mock.patch.object(EC2Driver, '_process_network_info')
        secgrp_patch = mock.patch.object(EC2Driver, '_get_instance_sec_grps')
        with image_patch as mock_image, network_patch as mock_network, \
                secgrp_patch as mock_secgrp:
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (None, '192.168.10.5', None, None)
            mock_secgrp.return_value = []
            self.assertRaises(exception.BuildAbortException,
                              self._create_nova_vm)
        self.reset()
test_ec2.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_spawn_with_image_error(self):
        """Creating the VM must abort when the image lookup fails.

        ``_get_image_ami_id_from_meta`` is mocked to raise
        ``BuildAbortException``, and ``_create_nova_vm`` is expected to raise
        the same exception type. The fixture is reset afterwards.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        self._create_instance()
        self._create_network()
        image_patch = mock.patch.object(
            EC2Driver, '_get_image_ami_id_from_meta')
        network_patch = mock.patch.object(EC2Driver, '_process_network_info')
        secgrp_patch = mock.patch.object(EC2Driver, '_get_instance_sec_grps')
        with image_patch as mock_image, network_patch as mock_network, \
                secgrp_patch as mock_secgrp:
            mock_image.side_effect = exception.BuildAbortException('fake')
            mock_network.return_value = ('subnet-1234abc', '192.168.10.5',
                                         None, None)
            mock_secgrp.return_value = []
            self.assertRaises(exception.BuildAbortException,
                              self._create_nova_vm)
        self.reset()
test_ec2.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_destroy_instance_not_found(self):
        """``destroy`` must not stop, terminate or wait in this scenario.

        After ``_create_instance``, ``destroy`` is called with the EC2
        ``stop_instances`` / ``terminate_instances`` calls and the driver's
        ``_wait_for_state`` mocked; none of the three may be invoked. The
        fixture is reset afterwards.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        self._create_instance()
        stop_patch = mock.patch.object(boto.ec2.EC2Connection,
                                       'stop_instances')
        terminate_patch = mock.patch.object(boto.ec2.EC2Connection,
                                            'terminate_instances')
        wait_patch = mock.patch.object(EC2Driver, '_wait_for_state')
        with stop_patch as fake_stop, terminate_patch as fake_terminate, \
                wait_patch as fake_wait:
            self.conn.destroy(self.context, self.instance, None, None)
            fake_stop.assert_not_called()
            fake_terminate.assert_not_called()
            fake_wait.assert_not_called()
        self.reset()
test_ports.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _stubs(self, network_info, subnet_info, port_info):
        """Create a network, MAC range, subnet and port, then yield them.

        The RPC notifier and the quota limit check are patched out for the
        whole setup and for as long as the generator stays suspended at the
        ``yield``. ``subnet_info`` and ``port_info`` are mutated in place so
        that they reference the id of the freshly created network.

        Yields:
            The ``(net, sub, port)`` objects returned by the create calls.

        NOTE(review): presumably wrapped with ``contextlib.contextmanager``
        outside this excerpt -- confirm.

        The patches use a multi-manager ``with`` statement instead of
        ``contextlib.nested``, which is deprecated in Python 2.7 and not
        available in Python 3.
        """
        self.ipam = quark.ipam.QuarkIpamANY()
        with mock.patch("neutron.common.rpc.get_notifier"), \
                mock.patch("neutron.quota.QUOTAS.limit_check"):
            net = network_api.create_network(self.context, network_info)
            mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
            # The MAC address range is created with admin rights, which are
            # dropped again immediately afterwards.
            self.context.is_admin = True
            macrng_api.create_mac_address_range(self.context, mac)
            self.context.is_admin = False
            subnet_info['subnet']['network_id'] = net['id']
            port_info['port']['network_id'] = net['id']
            sub = subnet_api.create_subnet(self.context, subnet_info)
            port = port_api.create_port(self.context, port_info)
            yield net, sub, port
ngamsTestLib.py 文件源码 项目:ngas 作者: ICRAR 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def unzip(infile, outfile):
    """Decompress the gzip file ``infile`` into ``outfile``.

    The data is copied in 1024-byte chunks. The output is opened in binary
    mode because the decompressed payload is raw bytes, and each file has
    its own ``with`` statement so the gzip handle is closed even when
    opening the output file fails (``contextlib.nested`` could not
    guarantee that).

    :param infile: path of the gzip-compressed input file
    :param outfile: path of the file to write; truncated if it exists
    """
    with gzip.open(infile, 'rb') as gz:
        with open(outfile, 'wb') as out:
            # b'' is the end-of-file sentinel returned by read().
            for data in iter(functools.partial(gz.read, 1024), b''):
                out.write(data)


###########################################################################
# END: Utility functions
###########################################################################
io.py 文件源码 项目:lang2program 作者: kelvinguu 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def read_files(*file_paths):
    files = []
    for i, p in enumerate(file_paths):
        if p:
            files.append(open(p, mode="r"))
            print 'Opened:', p
        else:
            files.append(EmptyFile())
            print 'WARNING: no path provided for file {} in list.'.format(i)

    with contextlib.nested(*files) as entered_files:
        for lines in izip(*entered_files):
            yield lines
io.py 文件源码 项目:lang2program 作者: kelvinguu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read_files(*file_paths):
    files = []
    for i, p in enumerate(file_paths):
        if p:
            files.append(open(p, mode="r"))
            print 'Opened:', p
        else:
            files.append(EmptyFile())
            print 'WARNING: no path provided for file {} in list.'.format(i)

    with contextlib.nested(*files) as entered_files:
        for lines in izip(*entered_files):
            yield lines
bencher.py 文件源码 项目:Benchmark 作者: SihyeongPark 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def makeSummaryFiles(iniName):
   """Write the three summary CSV files and copy them to the sweep directory.

   Changes into ``dirs['tmp']``, writes a header to each of the filtered,
   fastest and bulk measurement files, and fills them via ``walkDatFiles``.
   When ``dirs['dat_sweep']`` is set, the three files are then copied
   there. ``OSError`` / ``IOError`` raised while writing or copying are
   logged (when ``logger`` is set) rather than propagated.

   ``iniName`` is not used by this function.

   NOTE(review): ``dirs`` and ``logger`` are presumably module-level
   globals defined outside this excerpt -- confirm.
   """
   os.chdir(dirs['tmp'])

   datacsv = 'filtered_measurements.csv'
   ndatacsv = 'fastest_measurements.csv'
   bulkdatacsv = 'all_measurements.csv'

   # One ``with`` per file: if the second open() fails, the first file is
   # still closed, which ``nested(open(...), open(...))`` cannot guarantee.
   with open( join(datacsv), 'w') as df:
      with open( join(ndatacsv), 'w') as ndf:

         # Bound before the try block so the ``finally`` clause cannot hit
         # an unbound name when opening the bulk file fails.
         bdf = None
         try:
            # Alternative kept from the original: bz2.BZ2File( join(bulkdatacsv), 'w')
            bdf = open( join(bulkdatacsv), 'w')

            writeHeader(df); writeHeader(ndf); writeHeader(bdf)
            walkDatFiles(df,ndf,bdf)

         except (OSError,IOError) as err:
            if logger: logger.error(err)
         finally:
            if bdf is not None:
               bdf.close()

   try:
      if dirs['dat_sweep']:
         copy2( join(datacsv), dirs['dat_sweep'])
         copy2( join(ndatacsv), dirs['dat_sweep'])
         copy2( join(bulkdatacsv), dirs['dat_sweep'])
         if logger: logger.info('copy %s files to %s','*.csv',dirs['dat_sweep'])

   except (OSError,IOError) as err:
      if logger: logger.error(err)



# =============================
# sweep .code & .log
# =============================
test_endpoint.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_interface_initially_up(self):
        combined_id = WloadEndpointId("host_id", "orchestrator_id",
                                      "workload_id", "endpoint_id")
        ip_type = futils.IPV4
        local_ep = self.create_endpoint(combined_id, ip_type)

        ips = ["1.2.3.4"]
        iface = "tapabcdef"
        data = {
            'state': "active",
            'endpoint': "endpoint_id",
            'mac': stub_utils.get_mac(),
            'name': iface,
            'ipv4_nets': ips,
            'profile_ids': ["prof1"]
        }

        # We can only get on_interface_update calls after the first
        # on_endpoint_update, so trigger that.
        with nested(
                mock.patch('calico.felix.devices.set_routes'),
                mock.patch('calico.felix.devices.configure_interface_ipv4'),
                mock.patch('calico.felix.devices.interface_up'),
                mock.patch('calico.felix.devices.interface_exists'),
        ) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
                m_iface_up.return_value = True
                m_iface_exists.return_value = True
                local_ep.on_endpoint_update(data, async=True)
                self.step_actor(local_ep)
                self.assertEqual(local_ep._mac, data['mac'])
                self.assertTrue(m_conf.called)
                self.assertTrue(local_ep._device_in_sync)
                # Interface is up so we should get the routes striaght away.
                m_set_routes.assert_called_once_with(ip_type,
                                                     set(ips),
                                                     iface,
                                                     data['mac'],
                                                     reset_arp=True)
test_endpoint.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_interface_goes_down_removes_routes(self):
        combined_id = WloadEndpointId("host_id", "orchestrator_id",
                                      "workload_id", "endpoint_id")
        ip_type = futils.IPV4
        local_ep = self.create_endpoint(combined_id, ip_type)

        ips = ["1.2.3.4"]
        iface = "tapabcdef"
        data = {
            'state': "active",
            'endpoint': "endpoint_id",
            'mac': stub_utils.get_mac(),
            'name': iface,
            'ipv4_nets': ips,
            'profile_ids': ["prof1"]
        }

        # We can only get on_interface_update calls after the first
        # on_endpoint_update, so trigger that.
        with nested(
                mock.patch('calico.felix.devices.set_routes'),
                mock.patch('calico.felix.devices.configure_interface_ipv4'),
                mock.patch('calico.felix.devices.interface_up'),
                mock.patch('calico.felix.devices.interface_exists'),
        ) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
                m_iface_up.return_value = True
                m_iface_exists.return_value = True
                local_ep.on_endpoint_update(data, async=True)
                self.step_actor(local_ep)
                self.assertTrue(m_conf.called)  # Check we did the "UP" path.

        # Now pretend send an interface update.
        with mock.patch('calico.felix.devices.set_routes') as m_set_routes:
            local_ep.on_interface_update(False, async=True)
            self.step_actor(local_ep)
            m_set_routes.assert_called_once_with(ip_type,
                                                 set(),
                                                 iface,
                                                 None)
            self.assertTrue(local_ep._device_in_sync)
test_devices.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_configure_interface_ipv6_mainline(self):
        """
        Test that configure_interface_ipv6
            - opens and writes to the /proc system to enable proxy NDP on the
              interface.
            - calls ip -6 neigh to set up the proxy target.

        This mainline case uses a single proxy target.
        """
        m_open = mock.mock_open()
        rc = futils.CommandOutput("", "")
        if_name = "tap3e5a2b34222"
        proxy_target = "2001::3:4"

        open_patch = mock.patch('__builtin__.open', m_open, create=True)
        check_call_patch = mock.patch('calico.felix.futils.check_call',
                                      return_value=rc)

        with nested(open_patch, check_call_patch) as (_, m_check_call):
            devices.configure_interface_ipv6(if_name, proxy_target)

            # The proxy_ndp flag must be written through a cleanly closed
            # file handle.
            proc_path = '/proc/sys/net/ipv6/conf/%s/proxy_ndp' % if_name
            expected_open_calls = [
                mock.call(proc_path, 'wb'),
                M_ENTER,
                mock.call().write('1'),
                M_CLEAN_EXIT,
            ]
            m_open.assert_has_calls(expected_open_calls)

            expected_ip_calls = [
                mock.call(["ip", "-6", "neigh", "add", "proxy",
                           str(proxy_target), "dev", if_name]),
            ]
            m_check_call.assert_has_calls(expected_ip_calls)


问题


面经


文章

微信
公众号

扫码关注公众号