python类Callable()的实例源码

paraproxio.py 文件源码 项目:paraproxio 作者: intagger 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read(self, callback: Callable[[bytearray], Any]):
        try:
            for downloader in self._downloaders:

                # Wait until downloader is not in a downloaded/cancelled state.
                async with self._state_condition:
                    while downloader.state not in (DOWNLOADED, CANCELLED):
                        await self._state_condition.wait()
                    if downloader.state != DOWNLOADED:
                        self._debug('Downloader not in `DOWNLOADED` state, but in `{!s}`.'.format(downloader.state))
                        raise CancelledError()

                # Open file and send all its bytes it to back.
                await read_from_file_by_chunks(downloader.buffer_file_path, callback, self._chunk_size,
                                               lambda: self._state != CANCELLED, loop=self._loop)
        except Exception as exc:
            raise ReadError(exc)
paraproxio.py 文件源码 项目:paraproxio 作者: intagger 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def read_from_file_by_chunks(
        file_path: str,
        callback: Callable[[bytearray], None],
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        condition: Callable[[], bool] = lambda: True,
        *,
        loop):
    chunk = bytearray(chunk_size)
    with open(file_path, 'rb') as f:
        while condition():
            r = await loop.run_in_executor(None, lambda: f.readinto(chunk))
            if not r:
                break
            if r < chunk_size:
                callback(memoryview(chunk)[:r].tobytes())
            else:
                callback(chunk)
operators.py 文件源码 项目:asyncqlio 作者: SunDwarf 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def requires_bop(func) -> 'typing.Callable[[BaseOperator, BaseOperator], typing.Any]':
    """
    A decorator that marks a magic method as requiring another BaseOperator.

    :param func: The function to decorate.
    :return: A function that returns NotImplemented when the class required isn't specified.
    """

    @functools.wraps(func)
    def inner(self, other: 'BaseOperator'):
        if not isinstance(other, BaseOperator):
            return NotImplemented

        return func(self, other)

    return inner
table.py 文件源码 项目:asyncqlio 作者: SunDwarf 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_delete_sql(self, emitter: typing.Callable[[], str], session: 'md_session.Session') \
            -> typing.Tuple[str, typing.Any]:
        """
        Gets the DELETE sql for this row.
        """
        if self._session is None:
            self._session = session

        query = io.StringIO()
        query.write("DELETE FROM {} ".format(self.__quoted_name__))
        # generate the where clauses
        wheres = []
        params = {}

        for col, value in zip(self.table.primary_key.columns,
                              md_inspection.get_pk(self, as_tuple=True)):
            name = emitter()
            params[name] = value
            wheres.append("{} = {}".format(col.quoted_fullname, session.bind.emit_param(name)))

        query.write("WHERE ({}) ".format(" AND ".join(wheres)))
        return query.getvalue(), params

    # value loading methods
debversion.py 文件源码 项目:gopythongo 作者: gopythongo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _compare(self, other: 'DebianVersion', method: Callable[[int], bool]) -> bool:
        if not isinstance(other, DebianVersion):
            return NotImplemented

        # special case: zero Epoch is the same as no Epoch
        if self.epoch is not None and other.epoch is not None and \
           int(self.epoch) != int(other.epoch) and int(self.epoch) != 0 and int(other.epoch) != 0:
            return method(int(self.epoch) - int(other.epoch))

        res = debian_versionpart_compare(split_version_parts(self.version, self.version_char_re),
                                         split_version_parts(other.version, self.version_char_re))
        if res == 0:
            return method(debian_versionpart_compare(split_version_parts(self.revision),
                                                     split_version_parts(other.revision)))
        else:
            return method(res)
endpoint.py 文件源码 项目:indy-client 作者: hyperledger-archives 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, port: int, msgHandler: Callable,
                 name: str=None, basedirpath: str=None):
        if name and basedirpath:
            ha = getHaFromLocalEstate(name, basedirpath)
            if ha and ha[1] != port:
                port = ha[1]

        stackParams = {
            "name": name or randomString(8),
            "ha": HA("0.0.0.0", port),
            "main": True,
            "auth_mode": AuthMode.ALLOW_ANY.value,
            "mutable": "mutable",
            "messageTimeout": config.RAETMessageTimeout
        }
        if basedirpath:
            stackParams["basedirpath"] = basedirpath

            SimpleRStack.__init__(self, stackParams, self.tracedMsgHandler)

        self.msgHandler = msgHandler
endpoint.py 文件源码 项目:indy-client 作者: hyperledger-archives 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, port: int, msgHandler: Callable,
                 name: str=None, basedirpath: str=None, seed=None,
                 onlyListener=False):
        stackParams = {
            "name": name or randomString(8),
            "ha": HA("0.0.0.0", port),
            "auth_mode": AuthMode.ALLOW_ANY.value
        }
        if basedirpath:
            stackParams["basedirpath"] = basedirpath

        seed = seed or randomSeed()
        SimpleZStack.__init__(self, stackParams, self.tracedMsgHandler,
                              seed=seed, onlyListener=onlyListener)

        self.msgHandler = msgHandler
page.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def exposeFunction(self, name: str, puppeteerFunction: Callable
                             ) -> None:
        """Execute function on this page."""
        if self._pageBindings[name]:
            raise PageError(f'Failed to add page binding with name {name}: '
                            'window["{name}"] already exists!')
        self._pageBindings[name] = puppeteerFunction

        addPageBinding = '''
function addPageBinding(bindingName) {
  window[bindingName] = async(...args) => {
    const me = window[bindingName];
    let callbacks = me['callbacks'];
    if (!callbacks) {
      callbacks = new Map();
      me['callbacks'] = callbacks;
    }
    const seq = (me['lastSeq'] || 0) + 1;
    me['lastSeq'] = seq;
    const promise = new Promise(fulfill => callbacks.set(seq, fulfill));
    // eslint-disable-next-line no-console
    console.debug('driver:page-binding', JSON.stringify({name: bindingName, seq, args}));
    return promise;
  };
}
        '''  # noqa: E501
        expression = helper.evaluationString(addPageBinding, name)
        await self._client.send('Page.addScriptToEvaluateOnNewDocument',
                                {'source': expression})
        await self._client.send('Runtime.evaluate', {
            'expression': expression,
            'returnByValue': True
        })
browser.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, connection: Connection, ignoreHTTPSErrors: bool,
                 closeCallback: Callable[[], None]) -> None:
        """Make new browser object."""
        self._connection = connection
        self._ignoreHTTPSErrors = ignoreHTTPSErrors
        self._closeCallback = closeCallback
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def tracebackwrapper(func, args, kwargs):
    # type: (Callable[..., Any], List[Any], Dict[Any, Any]) -> Any
    try:
        return func(*args, **kwargs)
    except Exception as e:
        e.traceback = traceback.format_exc()  # type: ignore
        raise
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def iter_parallel_report(func,  # type: Callable[..., Any]
                         args_lists,  # type: Sequence[CallArgs]
                         ccmode=CC_PROCESSES):
    # type: (...) -> Iterator[Union[ExeResult, ExcInfo]]
    if ccmode == CC_OFF or len(args_lists) <= 1 or not multiprocessing:
        for args, kwargs in args_lists:
            yield func(*args, **kwargs)
        return

    processes = min(len(args_lists), multiprocessing.cpu_count())
    if ccmode == CC_THREADS:
        pool = multiprocessing.pool.ThreadPool(processes=processes)
    else:
        pool = multiprocessing.Pool(processes=processes, initializer=per_process_init)
    try:
        async_results = [pool.apply_async(func, args=args, kwds=kwargs)
                         for args, kwargs in args_lists]
        pool.close()
        while async_results:
            try:
                asyncres = async_results.pop(0)
                yield asyncres.get()
            except (KeyboardInterrupt, GeneratorExit):
                raise
            except Exception as e:
                t, v, tb = sys.exc_info()
                try:
                    # Report the textual traceback of the subprocess rather
                    # than this local exception which was triggered
                    # by the other side.
                    tb = e.traceback  # type: ignore
                except AttributeError:
                    pass
                yield ExcInfo((t, v, tb))
    except GeneratorExit:
        pool.terminate()
    except KeyboardInterrupt:
        pool.terminate()
        raise
    finally:
        pool.join()
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def iter_parallel(func,        # type: Callable
                  args_lists,  # type: Sequence[CallArgs]
                  ccmode=CC_PROCESSES):
    # type: (...) -> Iterator[Any]
    if not args_lists:
        return
    if ccmode != CC_OFF:
        args_lists = [((func, args, kwargs), {}) for args, kwargs in args_lists]
        wrappedfunc = tracebackwrapper
    else:
        wrappedfunc = func

    for result in iter_parallel_report(wrappedfunc, args_lists, ccmode=ccmode):
        if ccmode == CC_OFF:
            yield result
        else:
            tbtext = None
            try:
                if isinstance(result, ExcInfo):
                    t, v, tb = result.exc_info
                    if not isinstance(tb, types.TracebackType):
                        tbtext = tb
                        tb = None
                    reraise(t, v, tb)
                else:
                    yield result
            except Exception:
                if tbtext is not None:
                    raise Exception(tbtext)
                else:
                    traceback.print_exc()
                    raise

# ----------------------------------------------------------------------
# The data types option and style.
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_cache_value(key, func, cachevar):
    # type: (str, Callable[[str], Any], Dict[str, Any]) -> Any
    data = cachevar.get(key)
    if data is None:
        data = func(key)
        cachevar[key] = data
    return data
reflection.py 文件源码 项目:cxflow-tensorflow 作者: Cognexa 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def create_activation(activation_name: str) -> Callable[[tf.Tensor], tf.Tensor]:
    """
    Create TensorFlow activation function with the given name.

    List of available activation functions is available in
    `TensorFlow docs <https://www.tensorflow.org/versions/r0.12/api_docs/python/nn/activation_functions_>`_.

    :param activation_name: activation function name
    :return: callable activation function
    """
    if activation_name == 'identity':
        return tf.identity
    return get_attribute(TF_ACTIVATIONS_MODULE, activation_name)
operations.py 文件源码 项目:PicoSim 作者: Vadman97 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, op: Callable[[Memory.MEMORY_IMPL], List[bool]], args: List[Union[str, int]]):
        self.operator = op
        self.register = args[0]
        self.proc = None  # type: Processor
        self.reg_row = None  # type: Memory.MEMORY_IMPL
operations.py 文件源码 项目:PicoSim 作者: Vadman97 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, op: Callable[[bool, bool], bool], args: List[Union[str, int]]):
        self.operator = op
        self.register = args[0]
        if isinstance(args[1], str):
            self.argument = args[1]
            self.literal = False
        else:
            self.argument = Memory.MEMORY_IMPL(Memory.REGISTER_WIDTH, False)
            self.argument.set_value(args[1])
            self.literal = True
operations.py 文件源码 项目:PicoSim 作者: Vadman97 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, op: Callable[[List[bool], List[bool]], List[bool]], args: List[Union[str, int]]):
        self.operator = op
        self.register = args[0]
        if isinstance(args[1], str):
            self.argument = args[1]
            self.literal = False
        else:
            self.argument = Memory.MEMORY_IMPL(Memory.REGISTER_WIDTH, False)
            self.argument.set_value(args[1])
            self.literal = True
        self.proc = None  # type: Processor
        self.zero_bits = [False for _ in range(Memory.REGISTER_WIDTH)]  # type: List[bool]
operations.py 文件源码 项目:PicoSim 作者: Vadman97 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, op: Callable[[int, int], int], args: List[Union[str, int]]):
        self.operator = op
        self.register = args[0]
        self.o_args = args
        self.args = []  # type: List[int]
        self.proc = None  # type: Processor
operations.py 文件源码 项目:PicoSim 作者: Vadman97 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, op: Callable[[List[Union[str, int]]], None], args: List[Union[str, int]]):
        self.operator = op
        self.register = args[0]
        self.o_args = args
        self.proc = None  # type: Processor


问题


面经


文章

微信
公众号

扫码关注公众号