def fromtarfile(cls, tarfile):
    """Build the next member object from the header block of *tarfile*.

    One block of BLOCKSIZE bytes is read from ``tarfile.fileobj`` and
    parsed with ``cls.frombuf()`` using the archive's encoding and error
    handler.  The parsed object records where its header started and is
    then handed to its own ``_proc_member()``, whose result is returned.
    """
    stream = tarfile.fileobj
    header = stream.read(BLOCKSIZE)
    member = cls.frombuf(header, tarfile.encoding, tarfile.errors)
    # The header was just consumed, so it began one block before the
    # current stream position.
    member.offset = stream.tell() - BLOCKSIZE
    return member._proc_member(tarfile)
#--------------------------------------------------------------------------
# The following are methods that are called depending on the type of a
# member. The entry point is _proc_member() which can be overridden in a
# subclass to add custom _proc_*() methods. A _proc_*() method MUST
# implement the following
# operations:
# 1. Set self.offset_data to the position where the data blocks begin,
# if there is data that follows.
# 2. Set tarfile.offset to the position where the next member's header will
# begin.
# 3. Return self or another valid TarInfo object.
# Example source code for Python's read()
def _proc_gnulong(self, tarfile):
    """Handle a GNU longname or longlink member.

    The data blocks of this member are read, then the header that
    follows is parsed via ``fromtarfile()``.  The member built from that
    header gets this member's offset and, depending on ``self.type``,
    has its ``name`` or ``linkname`` replaced by the decoded data.

    Raises SubsequentHeaderError when parsing the following header
    raises HeaderError.
    """
    payload = tarfile.fileobj.read(self._block(self.size))

    # Parse the header that comes right after the long-name data.
    try:
        member = self.fromtarfile(tarfile)
    except HeaderError:
        raise SubsequentHeaderError("missing or bad subsequent header")

    # The returned member takes over this pseudo-member's position and
    # receives the long name / link name carried in the payload.
    member.offset = self.offset
    if self.type == GNUTYPE_LONGNAME:
        member.name = nts(payload, tarfile.encoding, tarfile.errors)
    elif self.type == GNUTYPE_LONGLINK:
        member.linkname = nts(payload, tarfile.encoding, tarfile.errors)
    return member
def __next__(self):
    """Produce the next archive member for the iterator.

    While the archive is not fully loaded, members come from the
    TarFile's ``next()`` method; a falsy result marks the archive as
    ``_loaded`` and ends the iteration.  Once loaded, members are taken
    from ``tarfile.members`` by index instead.
    """
    # Fix for SF #1100429: getmembers() may be called while iterating,
    # which flips _loaded; from then on the cached member list is used so
    # the iteration does not stop prematurely.
    archive = self.tarfile
    if archive._loaded:
        try:
            member = archive.members[self.index]
        except IndexError:
            raise StopIteration
    else:
        member = archive.next()
        if not member:
            archive._loaded = True
            raise StopIteration
    self.index += 1
    return member
def read(self, size=None):
    """Return the next *size* bytes from the stream.

    If *size* is None, chunks of ``self.bufsize`` are read through
    ``self._read()`` until it returns an empty value, and everything read
    is returned in one piece.  ``self.pos`` is advanced by the length of
    the returned data.
    """
    if size is None:
        chunks = []
        while True:
            buf = self._read(self.bufsize)
            if not buf:
                break
            chunks.append(buf)
        if chunks:
            # Join with an empty value of the chunks' own type: a
            # hard-coded "" raises TypeError when _read() yields bytes.
            buf = chunks[0][:0].join(chunks)
        elif buf is None:
            # Nothing was read and the EOF marker has no length.
            buf = ""
        # Otherwise keep the empty value _read() returned at EOF.
    else:
        buf = self._read(size)
    self.pos += len(buf)
    return buf
def fromtarfile(cls, tarfile):
    """Build the next member object from the header block of *tarfile*.

    One block of BLOCKSIZE bytes is read from ``tarfile.fileobj`` and
    parsed with ``cls.frombuf()`` using the archive's encoding and error
    handler.  The parsed object records where its header started and is
    then handed to its own ``_proc_member()``, whose result is returned.
    """
    stream = tarfile.fileobj
    header = stream.read(BLOCKSIZE)
    member = cls.frombuf(header, tarfile.encoding, tarfile.errors)
    # The header was just consumed, so it began one block before the
    # current stream position.
    member.offset = stream.tell() - BLOCKSIZE
    return member._proc_member(tarfile)
#--------------------------------------------------------------------------
# The following are methods that are called depending on the type of a
# member. The entry point is _proc_member() which can be overridden in a
# subclass to add custom _proc_*() methods. A _proc_*() method MUST
# implement the following
# operations:
# 1. Set self.offset_data to the position where the data blocks begin,
# if there is data that follows.
# 2. Set tarfile.offset to the position where the next member's header will
# begin.
# 3. Return self or another valid TarInfo object.
def _proc_gnulong(self, tarfile):
    """Handle a GNU longname or longlink member.

    The data blocks of this member are read, then the header that
    follows is parsed via ``fromtarfile()``.  The member built from that
    header gets this member's offset and, depending on ``self.type``,
    has its ``name`` or ``linkname`` replaced by the decoded data.

    Raises SubsequentHeaderError when parsing the following header
    raises HeaderError.
    """
    payload = tarfile.fileobj.read(self._block(self.size))

    # Parse the header that comes right after the long-name data.
    try:
        member = self.fromtarfile(tarfile)
    except HeaderError:
        raise SubsequentHeaderError("missing or bad subsequent header")

    # The returned member takes over this pseudo-member's position and
    # receives the long name / link name carried in the payload.
    member.offset = self.offset
    if self.type == GNUTYPE_LONGNAME:
        member.name = nts(payload, tarfile.encoding, tarfile.errors)
    elif self.type == GNUTYPE_LONGLINK:
        member.linkname = nts(payload, tarfile.encoding, tarfile.errors)
    return member
def __next__(self):
    """Produce the next archive member for the iterator.

    While the archive is not fully loaded, members come from the
    TarFile's ``next()`` method; a falsy result marks the archive as
    ``_loaded`` and ends the iteration.  Once loaded, members are taken
    from ``tarfile.members`` by index instead.
    """
    # Fix for SF #1100429: getmembers() may be called while iterating,
    # which flips _loaded; from then on the cached member list is used so
    # the iteration does not stop prematurely.
    archive = self.tarfile
    if archive._loaded:
        try:
            member = archive.members[self.index]
        except IndexError:
            raise StopIteration
    else:
        member = archive.next()
        if not member:
            archive._loaded = True
            raise StopIteration
    self.index += 1
    return member
def input(s, stdout = True, timeout = 2,
          prompt = rgx2nd(('(.+)', 'py3to2 server: \\1'), ),
          subprompt = rgx2nd(('>>> ', '', None), ),
          fndexc = re.compile(r'\WTraceback '),
          ):
    """Send *s* to the py3to2 server process and collect its output.

    *s* is written to ``SERVER.stdin``, followed by a snippet that makes
    the server send SIGINT to CLIENTPID.  The resulting KeyboardInterrupt
    is the signal that output is ready: up to ``_server.bufsize`` bytes
    are then read from ``SERVERIO[0]``.

    :param s: text to send; a falsy value makes the call a no-op.
    :param stdout: when true the output is written to ``sys.stdout`` and
        None is returned, otherwise the output is returned.
    :param timeout: seconds to wait for the interrupt.
    :param prompt: substitution applied to the output; skipped if falsy.
    :param subprompt: substitution applied to the output first.
    :param fndexc: compiled pattern; a match in the output is treated as
        a server-side traceback.
    :raises IOError: if no interrupt arrives within *timeout*, or if the
        output matches *fndexc*.
    """
    self = _server
    if not s:
        return
    SERVER.stdin.write(s)
    try:
        # Presumably CLIENTPID is this process, so the server interrupts
        # the sleep below once it has executed everything written so far.
        SERVER.stdin.write("\n\nimport os, signal; os.kill(CLIENTPID, signal.SIGINT)\n")
        time.sleep(timeout)
        # Reaching this line means the sleep was never interrupted.
        raise IOError('py3to2 server not responding to input: %s'%repr(s))
    except KeyboardInterrupt:
        buf = os.read(SERVERIO[0], self.bufsize)
    buf = subprompt.sub(buf)
    if prompt:
        buf = prompt.sub(buf)
    if fndexc.search(buf):
        raise IOError('py3to2 server input: %s\n%s'%(s, buf))
    if stdout:
        sys.stdout.write(buf)
    else:
        return buf
def input(s, stdout = True, timeout = 2,
          prompt = rgx2nd(('(.+)', 'py3to2 server: \\1'), ),
          subprompt = rgx2nd(('>>> ', '', None), ),
          fndexc = re.compile(r'\WTraceback '),
          ):
    """Send *s* to the py3to2 server process and collect its output.

    *s* is written to ``SERVER.stdin``, followed by a snippet that makes
    the server send SIGINT to CLIENTPID.  The resulting KeyboardInterrupt
    is the signal that output is ready: up to ``_server.bufsize`` bytes
    are then read from ``SERVERIO[0]``.

    :param s: text to send; a falsy value makes the call a no-op.
    :param stdout: when true the output is written to ``sys.stdout`` and
        None is returned, otherwise the output is returned.
    :param timeout: seconds to wait for the interrupt.
    :param prompt: substitution applied to the output; skipped if falsy.
    :param subprompt: substitution applied to the output first.
    :param fndexc: compiled pattern; a match in the output is treated as
        a server-side traceback.
    :raises IOError: if no interrupt arrives within *timeout*, or if the
        output matches *fndexc*.
    """
    self = _server
    if not s:
        return
    SERVER.stdin.write(s)
    try:
        # Presumably CLIENTPID is this process, so the server interrupts
        # the sleep below once it has executed everything written so far.
        SERVER.stdin.write("\n\nimport os, signal; os.kill(CLIENTPID, signal.SIGINT)\n")
        time.sleep(timeout)
        # Reaching this line means the sleep was never interrupted.
        raise IOError('py3to2 server not responding to input: %s'%repr(s))
    except KeyboardInterrupt:
        buf = os.read(SERVERIO[0], self.bufsize)
    buf = subprompt.sub(buf)
    if prompt:
        buf = prompt.sub(buf)
    if fndexc.search(buf):
        raise IOError('py3to2 server input: %s\n%s'%(s, buf))
    if stdout:
        sys.stdout.write(buf)
    else:
        return buf
def write(self, n=65536):
    """
    Pump data through the connector when its output side is ready.

    Reads at most n bytes from the input end (this may block) and passes
    them to the output end. When nothing could be read, the connector is
    closed instead.

    :param n The maximum number of bytes to write.
    :type n int
    :returns: Whatever the output's write() returns, or 0 when the
        connector was closed because no data was available.
    """
    chunk = self.input.read(n)
    if not chunk:
        self.close()
        return 0
    return self.output.write(chunk)
def read(self, n=65536):
    """
    Pump data through the connector when its input side is ready.

    Reads at most n bytes from the input end and forwards them to the
    output end. When nothing could be read, the connector is closed
    instead.

    :param n The maximum number of bytes to read.
    :type n int
    :returns: The number of bytes read, or 0 when the connector was closed.
    """
    chunk = self.input.read(n)
    if not chunk:
        self.close()
        return 0
    self.output.write(chunk)
    # TODO PushAdapter/Writers should return number of bytes actually
    # written.
    return len(chunk)
def write(self, buf):
    """
    Write a chunk of data to the output stream in accordance with the
    chunked transfer encoding protocol.

    Each chunk goes out as its length in hexadecimal, CRLF, the data, CRLF.
    Empty chunks are skipped: a zero-length chunk is the end-of-body
    marker of the protocol and is only sent by close().

    If sending fails, the server response is reported on stderr, the
    connection is closed, the writer is marked closed and the original
    exception is re-raised.
    """
    if not len(buf):
        # Sending b'0\r\n\r\n' here would terminate the request body early.
        return
    try:
        self.conn.send(hex(len(buf))[2:].encode('utf-8'))
        self.conn.send(b'\r\n')
        self.conn.send(buf)
        self.conn.send(b'\r\n')
    except Exception:
        try:
            resp = self.conn.getresponse()
            sys.stderr.write(
                'Exception while sending HTTP chunk to %s, status was %s, '
                'message was:\n%s\n' % (self.output_spec['url'], resp.status,
                                        resp.read()))
        finally:
            # Release the connection even when fetching the response
            # fails too.
            self.conn.close()
            self._closed = True
        raise
def close(self):
    """
    Close the output stream. Called after the last data is sent.

    Sends the terminating zero-length chunk, reads the server response
    and raises if the status is a redirect (3xx) or an error (>= 400).
    The connection is closed and the writer marked as closed in every
    case, so calling close() again is a no-op.
    """
    if self._closed:
        return

    try:
        self.conn.send(b'0\r\n\r\n')
        resp = self.conn.getresponse()
        if resp.status >= 300 and resp.status < 400:
            raise Exception('Redirects are not supported for streaming '
                            'requests at this time. %d to Location: %s' % (
                                resp.status, resp.getheader('Location')))
        if resp.status >= 400:
            raise Exception(
                'HTTP stream output to %s failed with status %d. Response '
                'was: %s' % (
                    self.output_spec['url'], resp.status, resp.read()))
    finally:
        self.conn.close()
        # Record the closed state; without it the guard above never fires
        # and a second close() would send on a closed connection.
        self._closed = True
def keep_reading(self):
    """Output thread method for the process.

    Loops until ``self.stop`` is set: waits up to five seconds for
    ``self.master`` to become readable, decodes what was read as UTF-8
    and hands the text to ``self.output_transcoder``.  The stop flag is
    raised as soon as the process reports an exit status.
    """
    while not self.stop:
        if self.process.poll() is not None:
            # The process has ended; this pass is the last one.
            self.stop = True

        ready, _, _ = select.select([self.master], [], [], 5)
        if not ready:
            continue

        # New content is available on the master side.
        chunk = os.read(self.master, 1024)
        text = chunk.decode('UTF-8', errors='replace')
        log_debug("RAW", repr(text))
        log_debug("PID", os.getenv('BASHPID'))
        self.output_transcoder.decode(text)
        # log_debug("{} >> {}".format(int(time.time()), repr(text)))
def run(self):
    """Shuttle data between the message queue and ``self.master``.

    Every pass polls ``self.master`` without blocking.  Anything readable
    is emitted through ``self.RECV_LINE``.  Queued input lines are written
    to ``self.master`` followed by a newline.  The loop ends when the
    string 'exit' is taken from the queue.

    A dequeued line is held until the descriptor accepts it, so it is not
    lost (and ``task_done()`` is not skipped) when the descriptor is
    momentarily not writable.  While a line is held, no further queue
    entries, including 'exit', are processed.
    """
    pending = None   # line taken from the queue but not yet written
    encoding = None
    while True:
        if pending is None and not self.messages.empty():
            # Expects a message to contain either the string 'exit'
            # or a line of input in a tuple: ('input', None)
            message = self.messages.get()
            if message == 'exit':
                self.messages.task_done()
                break
            pending, encoding = message

        writefd = [] if pending is None else [self.master]
        r, w, _ = select.select([self.master], writefd, [], 0)
        if r:
            # Read when the binary has new output for us (sometimes this
            # came from us writing).  Reads up to a kilobyte at once.
            line = os.read(self.master, 1024)
            self.RECV_LINE.emit(line)
        if w:
            payload = pending + ("\n" if isinstance(pending, str) else b"\n")
            if not isinstance(payload, bytes):
                # os.write() needs bytes.  Assumes the second tuple item
                # names the text encoding; falls back to UTF-8 when unset.
                payload = payload.encode(encoding or 'utf-8')
            os.write(self.master, payload)
            self.messages.task_done()
            pending = None
def read(self, size=None):
    """Return the next *size* bytes from the stream.

    If *size* is None, chunks of ``self.bufsize`` are read through
    ``self._read()`` until it returns an empty value, and everything read
    is returned in one piece.  ``self.pos`` is advanced by the length of
    the returned data.
    """
    if size is None:
        chunks = []
        while True:
            buf = self._read(self.bufsize)
            if not buf:
                break
            chunks.append(buf)
        if chunks:
            # Join with an empty value of the chunks' own type: a
            # hard-coded "" raises TypeError when _read() yields bytes.
            buf = chunks[0][:0].join(chunks)
        elif buf is None:
            # Nothing was read and the EOF marker has no length.
            buf = ""
        # Otherwise keep the empty value _read() returned at EOF.
    else:
        buf = self._read(size)
    self.pos += len(buf)
    return buf
def fromtarfile(cls, tarfile):
    """Build the next member object from the header block of *tarfile*.

    One block of BLOCKSIZE bytes is read from ``tarfile.fileobj`` and
    parsed with ``cls.frombuf()`` using the archive's encoding and error
    handler.  The parsed object records where its header started and is
    then handed to its own ``_proc_member()``, whose result is returned.
    """
    stream = tarfile.fileobj
    header = stream.read(BLOCKSIZE)
    member = cls.frombuf(header, tarfile.encoding, tarfile.errors)
    # The header was just consumed, so it began one block before the
    # current stream position.
    member.offset = stream.tell() - BLOCKSIZE
    return member._proc_member(tarfile)
#--------------------------------------------------------------------------
# The following are methods that are called depending on the type of a
# member. The entry point is _proc_member() which can be overridden in a
# subclass to add custom _proc_*() methods. A _proc_*() method MUST
# implement the following
# operations:
# 1. Set self.offset_data to the position where the data blocks begin,
# if there is data that follows.
# 2. Set tarfile.offset to the position where the next member's header will
# begin.
# 3. Return self or another valid TarInfo object.
def _proc_gnulong(self, tarfile):
    """Handle a GNU longname or longlink member.

    The data blocks of this member are read, then the header that
    follows is parsed via ``fromtarfile()``.  The member built from that
    header gets this member's offset and, depending on ``self.type``,
    has its ``name`` or ``linkname`` replaced by the decoded data.

    Raises SubsequentHeaderError when parsing the following header
    raises HeaderError.
    """
    payload = tarfile.fileobj.read(self._block(self.size))

    # Parse the header that comes right after the long-name data.
    try:
        member = self.fromtarfile(tarfile)
    except HeaderError:
        raise SubsequentHeaderError("missing or bad subsequent header")

    # The returned member takes over this pseudo-member's position and
    # receives the long name / link name carried in the payload.
    member.offset = self.offset
    if self.type == GNUTYPE_LONGNAME:
        member.name = nts(payload, tarfile.encoding, tarfile.errors)
    elif self.type == GNUTYPE_LONGLINK:
        member.linkname = nts(payload, tarfile.encoding, tarfile.errors)
    return member
def __next__(self):
    """Produce the next archive member for the iterator.

    While the archive is not fully loaded, members come from the
    TarFile's ``next()`` method; a falsy result marks the archive as
    ``_loaded`` and ends the iteration.  Once loaded, members are taken
    from ``tarfile.members`` by index instead.
    """
    # Fix for SF #1100429: getmembers() may be called while iterating,
    # which flips _loaded; from then on the cached member list is used so
    # the iteration does not stop prematurely.
    archive = self.tarfile
    if archive._loaded:
        try:
            member = archive.members[self.index]
        except IndexError:
            raise StopIteration
    else:
        member = archive.next()
        if not member:
            archive._loaded = True
            raise StopIteration
    self.index += 1
    return member
def getchar(echo):
    """Read one keypress (up to 32 bytes) from the terminal in raw mode.

    Input comes from stdin when it is a tty, otherwise from ``/dev/tty``.
    The terminal settings are restored afterwards.  With *echo* true and
    stdout attached to a tty, the keypress is written back to stdout.

    The raw bytes are passed to ``_translate_ch_to_exc()`` and the return
    value is those bytes decoded with the best encoding for stdin, using
    'replace' for undecodable input.  If the terminal attributes cannot
    be read or set (``termios.error``), nothing is read and an empty
    string is returned.
    """
    if not isatty(sys.stdin):
        f = open('/dev/tty')
        fd = f.fileno()
    else:
        fd = sys.stdin.fileno()
        f = None
    # Default for the termios.error path, where no read takes place.
    ch = b''
    try:
        try:
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = os.read(fd, 32)
                if echo and isatty(sys.stdout):
                    # A text-mode stdout rejects bytes, so decode first
                    # unless bytes and str are the same type.
                    if isinstance(ch, str):
                        sys.stdout.write(ch)
                    else:
                        sys.stdout.write(
                            ch.decode(get_best_encoding(sys.stdin), 'replace'))
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                sys.stdout.flush()
        except termios.error:
            pass
    finally:
        # Close the tty handle on every path, including a failing
        # tcgetattr().
        if f is not None:
            f.close()
    _translate_ch_to_exc(ch)
    return ch.decode(get_best_encoding(sys.stdin), 'replace')