paraproxio.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:paraproxio 作者: intagger 项目源码 文件源码
def read_from_file_by_chunks(
        file_path: str,
        callback: Callable[[bytearray], None],
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        condition: Callable[[], bool] = lambda: True,
        *,
        loop):
    chunk = bytearray(chunk_size)
    with open(file_path, 'rb') as f:
        while condition():
            r = await loop.run_in_executor(None, lambda: f.readinto(chunk))
            if not r:
                break
            if r < chunk_size:
                callback(memoryview(chunk)[:r].tobytes())
            else:
                callback(chunk)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号