def chunkify(self, gen):
    """Repack the byte sources yielded by *gen* into fixed-size chunks.

    Each chunk comes from ``self.chunks()`` and its ``payload`` buffer is
    filled to exactly ``self.bs`` bytes before the chunk is yielded.  A
    trailing partial chunk is flushed at the end as ``chunk(pos)``
    (presumably calling the chunk truncates it to *pos* bytes — TODO
    confirm against the chunk type's ``__call__``).

    Python 2 code: uses ``chunks.next()``, the ``buffer`` builtin, and
    ``str`` as a byte string.
    """
    bs = self.bs
    chunks = self.chunks()
    chunk = chunks.next()
    pos = 0  # write offset into the current chunk's payload
    for src in gen:
        srctype = type(src)
        # Wrap buffer-like inputs in a zero-copy memoryview; anything else
        # is stringified first (str(src) is a byte string under Python 2).
        src = memoryview(src) if srctype in (str, buffer, bytearray, memoryview) else memoryview(str(src))
        slen = len(src)
        try:
            # fast append
            # NOTE(review): this relies on a size-mismatched slice
            # assignment raising ValueError, i.e. payload is presumably a
            # fixed-size buffer (memoryview/mmap) — a plain bytearray would
            # silently grow instead.  Confirm against the chunk type.
            chunk.payload[pos:pos + slen] = src
            pos += slen
        except ValueError:
            # oops - too big - slice & dice
            soff = bs - pos  # bytes of src needed to top off the current chunk
            # pad buffer out to end using first n bytes from src
            chunk.payload[pos:bs] = src[0:soff]
            yield chunk
            chunk = chunks.next()
            pos = 0
            # then carve off full blocks directly from src
            while soff + bs <= slen:
                chunk.payload[0:bs] = src[soff:soff+bs]
                yield chunk
                chunk = chunks.next()
                soff += bs
            # and stash the remainder
            # (pos may be 0 when src ended exactly on a block boundary;
            # the empty slice assignment below is then a no-op)
            pos = slen - soff
            chunk.payload[0:pos] = src[soff:soff+pos]
    # Flush any buffered partial chunk once the input is exhausted — the
    # fast-append path above only yields when a later source overflows.
    if pos:
        yield chunk(pos)
# because every multiprocessing.Process().start() very helpfully
# does a waitpid(WNOHANG) across all known children, and I want
# to use os.wait() to catch exiting children