def capture_output(data=None):
"""
with capture_output as (stdout, stderr):
some_action()
print stdout.getvalue(), stderr.getvalue()
"""
in_ = BytesIO(data or b"")
err = BytesIO()
out = BytesIO()
old_in = sys.stdin
old_err = sys.stderr
old_out = sys.stdout
sys.stdin = in_
sys.stderr = err
sys.stdout = out
try:
yield (out, err)
finally:
sys.stdin = old_in
sys.stderr = old_err
sys.stdout = old_out
python类stdin()的实例源码
def __init__(self, addr="127.0.0.1", port=4444):
"""Initialize the socket and initialize pdb."""
# Backup stdin and stdout before replacing them by the socket handle
self.old_stdout = sys.stdout
self.old_stdin = sys.stdin
# Open a 'reusable' socket to let the webapp reload on the same port
self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.skt.bind((addr, port))
self.skt.listen(1)
(clientsocket, address) = self.skt.accept()
handle = clientsocket.makefile('rw')
pdb.Pdb.__init__(self, completekey='tab', stdin=handle, stdout=handle)
sys.stdout = sys.stdin = handle
def __init__(self, addr="127.0.0.1", port=4444):
"""Initialize the socket and initialize pdb."""
# Backup stdin and stdout before replacing them by the socket handle
self.old_stdout = sys.stdout
self.old_stdin = sys.stdin
# Open a 'reusable' socket to let the webapp reload on the same port
self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.skt.bind((addr, port))
self.skt.listen(1)
(clientsocket, address) = self.skt.accept()
handle = clientsocket.makefile('rw')
pdb.Pdb.__init__(self, completekey='tab', stdin=handle, stdout=handle)
sys.stdout = sys.stdin = handle
def __init__(self, addr="127.0.0.1", port=4444):
"""Initialize the socket and initialize pdb."""
# Backup stdin and stdout before replacing them by the socket handle
self.old_stdout = sys.stdout
self.old_stdin = sys.stdin
# Open a 'reusable' socket to let the webapp reload on the same port
self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.skt.bind((addr, port))
self.skt.listen(1)
(clientsocket, address) = self.skt.accept()
handle = clientsocket.makefile('rw')
pdb.Pdb.__init__(self, completekey='tab', stdin=handle, stdout=handle)
sys.stdout = sys.stdin = handle
def __init__(self, addr="127.0.0.1", port=4444):
"""Initialize the socket and initialize pdb."""
# Backup stdin and stdout before replacing them by the socket handle
self.old_stdout = sys.stdout
self.old_stdin = sys.stdin
# Open a 'reusable' socket to let the webapp reload on the same port
self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.skt.bind((addr, port))
self.skt.listen(1)
(clientsocket, address) = self.skt.accept()
handle = clientsocket.makefile('rw')
pdb.Pdb.__init__(self, completekey='tab', stdin=handle, stdout=handle)
sys.stdout = sys.stdin = handle
def main():
import sys
parser = argparse.ArgumentParser(description="pysnappy driver")
parser.add_argument("-f", "--file", help="Input file", default=sys.stdin)
parser.add_argument("-b", "--bytesize",
help="Bitesize for streaming reads",
type=int, default=65536)
parser.add_argument("-o", "--output", help="Output file",
default=sys.stdout)
group = parser.add_mutually_exclusive_group()
group.add_argument("-c", "--compress", action="store_true")
group.add_argument("-d", "--decompress", action="store_false")
parser.add_argument("-t", "--framing", help="Framing format",
choices=["framing2", "hadoop"], default="framing2")
args = parser.parse_args()
run(args)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
"""
Return a string describing the probable encoding of a file or
list of strings.
:param lines: The lines to get the encoding of.
:type lines: Iterable of bytes
:param name: Name of file or collection of lines
:type name: str
"""
u = UniversalDetector()
for line in lines:
u.feed(line)
u.close()
result = u.result
if result['encoding']:
return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
result['confidence'])
else:
return '{0}: no result'.format(name)
def __call__(self, string):
# the special argument "-" means sys.std{in,out}
if string == '-':
if 'r' in self._mode:
return _sys.stdin
elif 'w' in self._mode:
return _sys.stdout
else:
msg = _('argument "-" with mode %r' % self._mode)
raise ValueError(msg)
try:
# all other arguments are used as file names
if self._bufsize:
return open(string, self._mode, self._bufsize)
else:
return open(string, self._mode)
except IOError:
err = _sys.exc_info()[1]
message = _("can't open '%s': %s")
raise ArgumentTypeError(message % (string, err))
def testWithIO(inp, out, f):
"""Calls the function `f` with ``sys.stdin`` changed to `inp`
and ``sys.stdout`` changed to `out`. They are restored when `f`
returns. This function returns whatever `f` returns.
"""
import os
try:
oldin,sys.stdin = sys.stdin,inp
oldout,sys.stdout = sys.stdout,out
x = f()
finally:
sys.stdin = oldin
sys.stdout = oldout
if os.environ.get('PYPNG_TEST_TMP') and hasattr(out,'getvalue'):
name = mycallersname()
if name:
w = open(name+'.png', 'wb')
w.write(out.getvalue())
w.close()
return x
def from_command_line(cls, *args, **keys):
params = list()
for name, param in cls.params():
if name not in keys:
params.append((name, param))
bot_name = inspect.getmodulename(inspect.stack()[1][1])
if "ABUSEHELPER_CONF_FROM_STDIN" in os.environ:
defaults = dict(pickle.load(sys.stdin))
defaults.setdefault("bot_name", bot_name)
added = cls._from_dict(params, **defaults)
else:
added = cls._from_sys_argv(params, bot_name=bot_name)
added.update(keys)
return cls(*args, **added)
def get_key(self, block=True, timeout=None):
if timeout:
check = select.select([sys.stdin], [], [], timeout)[0]
if len(check) == 0:
return None
ret = self.stdscrs[-1].getkey()
elif block:
ret = self.stdscrs[-1].getkey()
else:
self.stdscrs[-1].nodelay(1)
try:
ret = self.stdscrs[-1].getkey()
if ret == curses.ERR:
ret = None
except:
ret = None
finally:
self.stdscrs[-1].nodelay(0)
if len(ret) == 1:
if ord(ret) < 0x20:
ret = "^{}".format(chr(ord(ret) + ord('@')))
elif ord(ret) == 0x7f:
ret = "^?"
return ret
def shutdown(self):
"""Revert stdin and stdout, close the socket."""
sys.stdout = self.old_stdout
sys.stdin = self.old_stdin
self.skt.close()
self.set_continue()
def run():
# REVISIT(ivc): current CNI implementation provided by this package is
# experimental and its primary purpose is to enable development of other
# components (e.g. functional tests, service/LBaaSv2 support)
cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
args = ['--config-file', cni_conf.kuryr_conf]
try:
if cni_conf.debug:
args.append('-d')
except AttributeError:
pass
config.init(args)
config.setup_logging()
# Initialize o.vo registry.
k_objects.register_locally_defined_vifs()
os_vif.initialize()
if CONF.cni_daemon.daemon_enabled:
runner = cni_api.CNIDaemonizedRunner()
else:
runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
LOG.info("Using '%s' ", runner.__class__.__name__)
def _timeout(signum, frame):
runner._write_dict(sys.stdout, {
'msg': 'timeout',
'code': k_const.CNI_TIMEOUT_CODE,
})
LOG.debug('timed out')
sys.exit(1)
signal.signal(signal.SIGALRM, _timeout)
signal.alarm(_CNI_TIMEOUT)
status = runner.run(os.environ, cni_conf, sys.stdout)
LOG.debug("Exiting with status %s", status)
if status:
sys.exit(status)