Python partial() usage examples from open-source projects
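
functools.partial() wraps a callable and pre-binds some of its positional or keyword arguments, returning a new callable that only needs the remaining arguments. The snippets below come from real open-source projects; as a quick orientation, here is a minimal, self-contained sketch of the basic idea (the power() function is made up for illustration):

from functools import partial

def power(base, exponent):
    return base ** exponent

# Pre-bind the keyword argument so callers only supply the base.
square = partial(power, exponent=2)
cube = partial(power, exponent=3)

print(square(5))  # 25
print(cube(2))    # 8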

__init__.py (project: pycos, author: pgiri)
def add(self, categorize):
        """Add given method to categorize messages. When a message is received,
        each of the added methods (most recently added method first) is called
        with the message. The method should return a category (any hashable
        object) or None (in which case the next most recently added method is called with
        the same message). If all the methods return None for a given message,
        the message is queued with category=None, so that 'receive' method here
        works just as Task.receive.
        """
        if inspect.isfunction(categorize):
            argspec = inspect.getargspec(categorize)
            if len(argspec.args) != 1:
                categorize = None
        elif type(categorize) != partial_func:
            categorize = None

        if categorize:
            self._categorize.insert(0, categorize)
        else:
            logger.warning('invalid categorize function ignored')
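
The check above accepts either a plain function taking exactly one argument or a functools.partial object, so a categorizer that needs extra configuration can be pre-bound before registration. A minimal sketch, assuming a hypothetical channel object that exposes the add() method above and messages carrying a sender attribute:

from functools import partial

def categorize_by_sender(known_senders, msg):
    # Two arguments: pre-bind 'known_senders' so only 'msg' remains at call time.
    return 'known' if getattr(msg, 'sender', None) in known_senders else None

# A bare two-argument function would be rejected (len(argspec.args) != 1),
# but a partial that fixes the first argument passes the type check.
channel.add(partial(categorize_by_sender, {'alice', 'bob'}))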
upload_docs.py (project: python-, author: secondtonone1)
def _build_multipart(cls, data):
        """
        Build up the MIME payload for the POST data
        """
        boundary = b'--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
        sep_boundary = b'\n--' + boundary
        end_boundary = sep_boundary + b'--'
        end_items = end_boundary, b"\n",
        builder = functools.partial(
            cls._build_part,
            sep_boundary=sep_boundary,
        )
        part_groups = map(builder, data.items())
        parts = itertools.chain.from_iterable(part_groups)
        body_items = itertools.chain(parts, end_items)
        content_type = 'multipart/form-data; boundary=%s' % boundary.decode('ascii')
        return b''.join(body_items), content_type
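
Here partial() fixes the keyword-only sep_boundary so that map() only has to supply each (key, value) item from data.items(). The same map-plus-partial pattern in isolation, using a made-up build_part() helper:

import functools
import itertools

def build_part(item, sep_boundary):
    key, value = item
    return [sep_boundary, ('\nname: %s\n\n%s' % (key, value)).encode('ascii')]

builder = functools.partial(build_part, sep_boundary=b'\n--BOUNDARY')
part_groups = map(builder, {'title': 'doc', 'content': 'hello'}.items())
print(b''.join(itertools.chain.from_iterable(part_groups)))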
subunit-trace.py (project: networking-huawei, author: openstack)
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly
                      ))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
model.py (project: python-libjuju, author: juju)
async def add_local_charm_dir(self, charm_dir, series):
        """Upload a local charm to the model.

        This will automatically generate an archive from
        the charm dir.

        :param charm_dir: Path to the charm directory
        :param series: Charm series

        """
        fh = tempfile.NamedTemporaryFile()
        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
        with fh:
            func = partial(
                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
            charm_url = await self._connector.loop.run_in_executor(None, func)

        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
        return charm_url
model.py (project: python-libjuju, author: juju)
def __getattr__(self, name):
        """
        Wrap method calls in coroutines that use run_in_executor to make them
        async.
        """
        attr = getattr(self._cs, name)
        if not callable(attr):
            wrapper = partial(getattr, self._cs, name)
            setattr(self, name, wrapper)
        else:
            async def coro(*args, **kwargs):
                method = partial(attr, *args, **kwargs)
                for attempt in range(1, 4):
                    try:
                        return await self.loop.run_in_executor(None, method)
                    except theblues.errors.ServerError:
                        if attempt == 3:
                            raise
                        await asyncio.sleep(1, loop=self.loop)
            setattr(self, name, coro)
            wrapper = coro
        return wrapper
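
loop.run_in_executor() only forwards positional arguments to the callable, so partial() is the usual way to pass keyword arguments into a blocking call that should run off the event loop. A minimal standalone sketch (fetch() is a stand-in for any blocking function):

import asyncio
from functools import partial

def fetch(url, timeout=10):
    # Stand-in for a blocking call such as a synchronous HTTP request.
    return 'fetched %s with timeout=%s' % (url, timeout)

async def main():
    loop = asyncio.get_running_loop()
    # Keyword arguments are bound with partial() because run_in_executor()
    # only accepts positional arguments after the callable.
    result = await loop.run_in_executor(None, partial(fetch, 'http://example.com', timeout=5))
    print(result)

asyncio.run(main())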
linalg.py (project: npstreams, author: LaurentRDC)
def _ireduce_linalg(arrays, func, **kwargs):
    """
    Yield the cumulative reduction of a linear algebra function.
    """
    arrays = iter(arrays)
    first = next(arrays)
    second = next(arrays)

    func = partial(func, **kwargs)

    accumulator = func(first,  second)
    yield accumulator

    for array in arrays:
        # For some reason, np.dot(..., out = accumulator) did not produce results
        # that were equal to numpy.linalg.multi_dot
        func(accumulator, array, out = accumulator)
        yield accumulator
array_stream.py (project: npstreams, author: LaurentRDC)
def iload(files, load_func, **kwargs):
    """
    Create a stream of arrays from files, which are loaded lazily.

    Parameters
    ----------
    files : iterable of str or str
        Either an iterable of filenames or a glob-like pattern str.
    load_func : callable
        Function taking a filename as its first argument.
    kwargs
        Keyword arguments are passed to ``load_func``.

    Yields
    ------
    arr: `~numpy.ndarray`
        Loaded data. 
    """
    if isinstance(files, str):
        files = iglob(files)
    files = iter(files)

    yield from map(partial(load_func, **kwargs), files)

# pmap does not support local functions
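
A hedged usage example for iload() above: lazily loading every NumPy '.npy' file that matches a glob pattern, with the keyword argument forwarded to np.load() through partial() (the file paths are hypothetical):

import numpy as np

for arr in iload('data/*.npy', np.load, allow_pickle=False):
    print(arr.shape)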
decode_text.py (project: seq2seq, author: google)
def __init__(self, params):
    super(DecodeText, self).__init__(params)
    self._unk_mapping = None
    self._unk_replace_fn = None

    if self.params["unk_mapping"] is not None:
      self._unk_mapping = _get_unk_mapping(self.params["unk_mapping"])
    if self.params["unk_replace"]:
      self._unk_replace_fn = functools.partial(
          _unk_replace, mapping=self._unk_mapping)

    self._postproc_fn = None
    if self.params["postproc_fn"]:
      self._postproc_fn = locate(self.params["postproc_fn"])
      if self._postproc_fn is None:
        raise ValueError("postproc_fn not found: {}".format(
            self.params["postproc_fn"]))
serialization_utils.py (project: zipline-chinese, author: zhanghan1990)
def loads_with_persistent_ids(str, env):
    """
    Performs a pickle loads on the given string, substituting the given
    TradingEnvironment in to any tokenized representations of a
    TradingEnvironment or AssetFinder.

    Parameters
    ----------
    str : String
        The string representation of the object to be unpickled.
    env : TradingEnvironment
        The TradingEnvironment to be inserted to the unpickled object.

    Returns
    -------
    obj
       An unpickled object formed from the parameter 'str'.
    """
    file = BytesIO(str)
    unpickler = pickle.Unpickler(file)
    unpickler.persistent_load = partial(_persistent_load, env=env)
    return unpickler.load()
visualize.py (project: zipline-chinese, author: zhanghan1990)
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.
    """
    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed.  Can't display graph.")

    if format == 'svg':
        display_cls = display.SVG
    elif format in ("jpeg", "png"):
        display_cls = partial(display.Image, format=format, embed=True)
    else:
        raise ValueError("Unknown format %r." % format)

    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
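
partial() can also stand in for a class constructor: above it yields a display_cls that already knows the image format and embed flag. A tiny self-contained sketch of the same idea, using a namedtuple in place of IPython's display.Image:

from collections import namedtuple
from functools import partial

Image = namedtuple('Image', 'data format embed')

# The partial behaves like a constructor with some keywords already decided.
make_png = partial(Image, format='png', embed=True)
print(make_png(data=b'...'))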
fuzzer.py (project: pbtk, author: marin-m)
def __init__(self, min_, max_, float_=False):
        super(QwordSpinBox, self).__init__()

        self._minimum = min_
        self._maximum = max_
        self.int_ = float if float_ else int

        rx = QRegExp(r'-?\d{0,20}(?:\.\d{0,20})?' if float_ else r'-?\d{0,20}')
        validator = QRegExpValidator(rx, self)

        self._lineEdit = QLineEdit(self)
        self._lineEdit.setText(str(self.int_(0)))
        self._lineEdit.setValidator(validator)
        self._lineEdit.textEdited.connect(partial(self.setValue, change=False))
        self.editingFinished.connect(lambda: self.setValue(self.value(), update=False) or True)
        self.setLineEdit(self._lineEdit)
rtm.py (project: nameko-slack, author: iky)
def handle_event(self, event):
        if event.get('type') == EVENT_TYPE_MESSAGE:
            if self.message_pattern:
                match = self.message_pattern.match(event.get('text', ''))
                if match:
                    kwargs = match.groupdict()
                    args = () if kwargs else match.groups()
                    args = (event, event.get('text')) + args
                else:
                    return
            else:
                args = (event, event.get('text'))
                kwargs = {}
            context_data = {}
            handle_result = partial(self.handle_result, event)
            self.container.spawn_worker(
                self, args, kwargs,
                context_data=context_data,
                handle_result=handle_result)
model.py (project: j3dview, author: blank63)
def gl_init(self):
        self.gl_vertex_shader_factory = functools.lru_cache(maxsize=None)(functools.partial(gl.Shader,GL_VERTEX_SHADER))
        self.gl_fragment_shader_factory = functools.lru_cache(maxsize=None)(functools.partial(gl.Shader,GL_FRAGMENT_SHADER))
        self.gl_program_factory = functools.lru_cache(maxsize=None)(GLProgram)
        self.gl_texture_factory = functools.lru_cache(maxsize=None)(gx.texture.GLTexture)

        array_table = {gx.VA_PTNMTXIDX:GLMatrixIndexArray()}
        array_table.update((attribute,array.gl_convert()) for attribute,array in self.array_table.items())

        for shape in self.shapes:
            shape.gl_init(array_table)

        for material in self.materials:
            material.gl_init()

        for texture in self.textures:
            texture.gl_init(self.gl_texture_factory)

        self.gl_joints = [copy.copy(joint) for joint in self.joints]
        self.gl_joint_matrices = numpy.empty((len(self.joints),3,4),numpy.float32)
        self.gl_matrix_table = gl.TextureBuffer(GL_DYNAMIC_DRAW,GL_RGBA32F,(len(self.matrix_descriptors),3,4),numpy.float32)
        self.gl_update_matrix_table()

        self.gl_draw_objects = list(self.gl_generate_draw_objects(self.scene_graph))
        self.gl_draw_objects.sort(key=lambda draw_object: draw_object.material.unknown0)
__init__.py (project: segno, author: heuer)
def __getattr__(self, name):
        """\
        This is used to plug-in external serializers.

        When a "to_<name>" method is invoked, this method tries to find
        a ``segno.plugin.converter`` plugin with the provided ``<name>``.
        If such a plugin exists, a callable function is returned. The result
        of invoking the function depends on the plugin.
        """
        if name.startswith('to_'):
            from pkg_resources import iter_entry_points
            from functools import partial
            for ep in iter_entry_points(group='segno.plugin.converter',
                                        name=name[3:]):
                plugin = ep.load()
                return partial(plugin, self)
        raise AttributeError('{0} object has no attribute {1}'
                             .format(self.__class__, name))
storage.py (project: cloud-volume, author: seung-lab)
def files_exist(self, file_paths):
        """
        Threaded existence check for all file paths.

        file_paths: (list) file paths to test for existence

        Returns: { filepath: bool }
        """
        results = {}

        def exist_thunk(path, interface):
            results[path] = interface.exists(path)

        for path in file_paths:
            if len(self._threads):
                self.put(partial(exist_thunk, path))
            else:
                exist_thunk(path, self._interface)

        desc = 'Existence Testing' if self.progress else None
        self.wait(desc)

        return results
storage.py (project: cloud-volume, author: seung-lab)
def list_files(self, prefix="", flat=False):
        """
        List the files in the layer with the given prefix. 

        flat means only generate one level of a directory,
        while non-flat means generate all file paths with that 
        prefix.

        Here's how flat=True handles different scenarios:
            1. partial directory name prefix = 'bigarr'
                - lists the '' directory and filters on key 'bigarr'
            2. full directory name prefix = 'bigarray'
                - Same as (1), but using key 'bigarray'
            3. full directory name + "/" prefix = 'bigarray/'
                - Lists the 'bigarray' directory
            4. partial file name prefix = 'bigarray/chunk_'
                - Lists the 'bigarray/' directory and filters on 'chunk_'

        Return: generated sequence of file paths relative to layer_path
        """

        for f in self._interface.list_files(prefix, flat):
            yield f
WebRunner.py (project: PyWebRunner, author: IntuitiveWebSolutions)
def wait_for_opacity(self, selector, opacity, **kwargs):
        '''
        Wait for an element to reach a specific opacity.

        Parameters
        ----------
        selector: str
            A CSS selector to search for. This can be any valid CSS selector.

        opacity: float
            The opacity to wait for.

        kwargs:
            Passed on to _wait_for

        '''

        def _wait_for_opacity(self, browser):
            return str(self.get_element(selector).value_of_css_property('opacity')) == str(opacity)

        self._wait_for(partial(_wait_for_opacity, self), **kwargs)
G__l_a_t.py (project: otRebuilder, author: Pal3love)
def decompile(self, data, ttFont):
        sstruct.unpack2(Glat_format_0, data, self)
        if self.version <= 1.9:
            decoder = partial(self.decompileAttributes12, fmt=Glat_format_1_entry)
        elif self.version <= 2.9:
            decoder = partial(self.decompileAttributes12, fmt=Glat_format_23_entry)
        elif self.version >= 3.0:
            (data, self.scheme) = grUtils.decompress(data)
            sstruct.unpack2(Glat_format_3, data, self)
            self.hasOctaboxes = (self.compression & 1) == 1
            decoder = self.decompileAttributes3

        gloc = ttFont['Gloc']
        self.attributes = {}
        count = 0
        for s,e in zip(gloc,gloc[1:]):
            self.attributes[ttFont.getGlyphName(count)] = decoder(data[s:e])
            count += 1
G__l_a_t.py (project: otRebuilder, author: Pal3love)
def compile(self, ttFont):
        data = sstruct.pack(Glat_format_0, self)
        if self.version <= 1.9:
            encoder = partial(self.compileAttributes12, fmt=Glat_format_1_entry)
        elif self.version <= 2.9:
            encoder = partial(self.compileAttributes12, fmt=Glat_format_23_entry)
        elif self.version >= 3.0:
            self.compression = (self.scheme << 27) + (1 if self.hasOctaboxes else 0)
            data = sstruct.pack(Glat_format_3, self)
            encoder = self.compileAttributes3

        glocs = []
        for n in range(len(self.attributes)):
            glocs.append(len(data))
            data += encoder(self.attributes[ttFont.getGlyphName(n)])
        glocs.append(len(data))
        ttFont['Gloc'].set(glocs)

        if self.version >= 3.0:
            data = grUtils.compress(self.scheme, data)
        return data
ALTOInterface.py (project: PyPPSPP, author: justas-)
def do_alto_post(self, endpoint, data, callback):
        """ALTO post to the given endpoint with given data"""

        # Make HTTP POST to ALTO
        url = self._alto_url + endpoint
        try:
            alto_resp_future = self._loop.run_in_executor(None, functools.partial(
                requests.post, url, json=data))
            alto_resp = yield from alto_resp_future
        except OSError as exc:
            logging.info('Consumed OSError while connecting to ALTO server')
            return

        # Process peers
        ranked_peers = self._process_alto_response(alto_resp)

        # Return results to swarm
        callback(ranked_peers)
wordgen_samples.py (project: saapy, author: ashapochka)
def doctable(ctx):
    df = pd.read_csv('./docs/flight-options.csv')

    # open an existing document
    doc = docx.Document('./docs/style-reference.docx')

    as_int = partial(format_decimal, format='#')
    as_usd = partial(format_currency, currency='USD')

    s = doc.sections[0]
    width = s.page_width - s.left_margin - s.right_margin

    doc.add_picture('./docs/diagrams_002.png', width=width)

    formatters = {
        'ticket_price': as_usd,
        'total_hours': as_int,
        'trip': as_int,
        'airline': partial(shorten_long_name, width=20),
        'selected': compose({0: 'No', 1: 'Yes'}.get, int)
    }
    add_table(df, doc, table_style='Plain Table 3', formatters=formatters)

    # save the doc
    doc.save('./docs/test.docx')
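
partial() is convenient for building per-column formatter tables like the one above, where each formatter only needs the cell value at call time. A standard-library-only sketch with made-up column names:

import textwrap
from functools import partial

formatters = {
    'ticket_price': partial(round, ndigits=2),
    'airline': partial(textwrap.shorten, width=20),
}
row = {'ticket_price': 123.456, 'airline': 'A Very Long Airline Name Incorporated'}
print({col: formatters[col](value) for col, value in row.items()})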
analysis_support1.py (project: saapy, author: ashapochka)
def fix_tickets(
            self, ticket_frame: pd.DataFrame, path_fixes) -> pd.DataFrame:
        ticket_frame.rename(
            columns={'Total changed lines': 'ChangedLines'}, inplace=True)
        ticket_frame = ticket_frame[
            ticket_frame.ChangedLines < 100000]
        ticket_frame = ticket_frame.assign(
            ChangedFiles=ticket_frame['Changed files'].apply(
            partial(self.fix_path_prefixes, path_fixes)))
        fixed_frame = ticket_frame.drop(
            'Changed files', axis=1).sort_values(
            by='CommitDate').reset_index(drop=True)
        fixed_frame.fillna(value={'Found': ''}, axis=0, inplace=True)
        return fixed_frame

    # prj1 specific methods
mapped_struct.py (project: sharedbuffers, author: jampp)
def getter(self, proxy_into = None, no_idmap = False):
        schema = self.schema
        proxy_class = self.proxy_class
        index = self.index
        idmap = self.idmap if not no_idmap else None
        buf = self.buf

        if proxy_class is not None:
            proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
        else:
            proxy_class_new = None

        @cython.locals(pos=int)
        def getter(pos):
            return schema.unpack_from(buf, index[pos], idmap, proxy_class_new, proxy_into)
        return getter
mapped_struct.py (project: sharedbuffers, author: jampp)
def iter_fast(self):
        # getter inlined
        schema = self.schema
        proxy_class = self.proxy_class
        index = self.index
        idmap = self.idmap
        buf = self.buf

        if proxy_class is not None:
            proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
        else:
            proxy_class_new = None

        proxy_into = schema.Proxy()
        for i in xrange(len(self)):
            yield schema.unpack_from(buf, index[i], idmap, proxy_class_new, proxy_into)
stack.py (project: DeepSea, author: SUSE)
def ext_pillar(minion_id, pillar, *args, **kwargs):
    import salt.utils
    stack = {}
    stack_config_files = list(args)
    traverse = {
        'pillar': partial(salt.utils.traverse_dict_and_list, pillar),
        'grains': partial(salt.utils.traverse_dict_and_list, __grains__),
        'opts': partial(salt.utils.traverse_dict_and_list, __opts__),
        }
    for matcher, matchs in kwargs.iteritems():
        t, matcher = matcher.split(':', 1)
        if t not in traverse:
            raise Exception('Unknown traverse option "{0}", '
                            'should be one of {1}'.format(t, traverse.keys()))
        cfgs = matchs.get(traverse[t](matcher, None), [])
        if not isinstance(cfgs, list):
            cfgs = [cfgs]
        stack_config_files += cfgs
    for cfg in stack_config_files:
        if not os.path.isfile(cfg):
            log.warning('Ignoring pillar stack cfg "{0}": '
                     'file does not exist'.format(cfg))
            continue
        stack = _process_stack_cfg(cfg, stack, minion_id, pillar)
    return stack
test_connection.py (project: deb-python-cassandra-driver, author: openstack)
def test_single_connection(self):
        """
        Test a single connection with sequential requests.
        """
        conn = self.get_connection()
        query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
        event = Event()

        def cb(count, *args, **kwargs):
            count += 1
            if count >= 10:
                conn.close()
                event.set()
            else:
                conn.send_msg(
                    QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
                    request_id=0,
                    cb=partial(cb, count))

        conn.send_msg(
            QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
            request_id=0,
            cb=partial(cb, 0))
        event.wait()
test_connection.py (project: deb-python-cassandra-driver, author: openstack)
def test_single_connection_pipelined_requests(self):
        """
        Test a single connection with pipelined requests.
        """
        conn = self.get_connection()
        query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
        responses = [False] * 100
        event = Event()

        def cb(response_list, request_num, *args, **kwargs):
            response_list[request_num] = True
            if all(response_list):
                conn.close()
                event.set()

        for i in range(100):
            conn.send_msg(
                QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
                request_id=i,
                cb=partial(cb, responses, i))

        event.wait()
cluster.py (project: deb-python-cassandra-driver, author: openstack)
def _set_final_result(self, response):
        self._cancel_timer()
        if self._metrics is not None:
            self._metrics.request_timer.addValue(time.time() - self._start_time)

        with self._callback_lock:
            self._final_result = response
            # save off current callbacks inside lock for execution outside it
            # -- prevents case where _final_result is set, then a callback is
            # added and executed on the spot, then executed again as a
            # registered callback
            to_call = tuple(
                partial(fn, response, *args, **kwargs)
                for (fn, args, kwargs) in self._callbacks
            )

        self._event.set()

        # apply each callback
        for callback_partial in to_call:
            callback_partial()
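
Freezing each registered callback together with the response and its stored arguments, as done above, lets the lock be released before any user-supplied code runs. The pattern in isolation (the callbacks and response are made up):

from functools import partial

def on_done(result, label):
    print(label, '->', result)

callbacks = [(on_done, ('first',), {}), (on_done, (), {'label': 'second'})]
response = 42

# Bind everything now, call later, outside whatever lock collected the callbacks.
to_call = tuple(partial(fn, response, *args, **kwargs) for fn, args, kwargs in callbacks)
for callback_partial in to_call:
    callback_partial()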
test_action_plan.py (project: watcher-tempest-plugin, author: openstack)
def test_create_action_plan(self):
        _, goal = self.client.show_goal("dummy")
        _, audit_template = self.create_audit_template(goal['uuid'])
        _, audit = self.create_audit(audit_template['uuid'])

        self.assertTrue(test_utils.call_until_true(
            func=functools.partial(self.has_audit_finished, audit['uuid']),
            duration=30,
            sleep_for=.5
        ))
        _, action_plans = self.client.list_action_plans(
            audit_uuid=audit['uuid'])
        action_plan = action_plans['action_plans'][0]

        _, action_plan = self.client.show_action_plan(action_plan['uuid'])

        self.assertEqual(audit['uuid'], action_plan['audit_uuid'])
        self.assertEqual('RECOMMENDED', action_plan['state'])

