def __init__(self, addr="127.0.0.1", port=4444):
    """Wait for one TCP client and attach a pdb session to it.

    A listening socket is bound to ``(addr, port)`` and ``accept()`` blocks
    until a client connects.  ``pdb.Pdb`` is then initialized to read from
    and write to that connection, and ``sys.stdin`` / ``sys.stdout`` are
    rebound to it as well.  The previous stream objects stay available as
    ``self.old_stdin`` and ``self.old_stdout``.
    """
    # Remember the real streams before they are swapped for the socket file.
    self.old_stdout, self.old_stdin = sys.stdout, sys.stdin

    # SO_REUSEADDR lets the webapp reload and bind the same port again.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.skt = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    listener.bind((addr, port))
    listener.listen(1)

    # Blocks here until a debugger client connects.
    connection, _peer = listener.accept()
    stream = connection.makefile('rw')
    pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
    sys.stdout = stream
    sys.stdin = stream
# Example source code for Python's stdin()
def isEmpty():
    """
    Report whether standard input holds nothing but whitespace.

    Lines are pulled from sys.stdin into the module-level _buffer until
    the buffer contains a non-whitespace character (return False) or
    sys.stdin reaches end of file (return True).
    """
    global _buffer
    while not _buffer.strip():
        chunk = sys.stdin.readline()
        if sys.hexversion < 0x03000000:
            # On Python 2 the line is decoded from UTF-8 before buffering.
            chunk = chunk.decode('utf-8')
        if not chunk:
            # readline() returned an empty string: end of input.
            return True
        _buffer = _buffer + chunk
    return False
#-----------------------------------------------------------------------
def hasNextLine():
    """
    Report whether another line can be obtained from standard input.

    If the module-level _buffer already holds text the answer is True.
    Otherwise one line is read from sys.stdin into _buffer, and the
    answer is True exactly when that read produced something.
    """
    global _buffer
    if _buffer != '':
        return True
    # Buffer is empty: try to pull one more line from the stream.
    _buffer = sys.stdin.readline()
    if sys.hexversion < 0x03000000:
        # On Python 2 the line is decoded from UTF-8 before buffering.
        _buffer = _buffer.decode('utf-8')
    return _buffer != ''
#-----------------------------------------------------------------------
def readAll():
    """
    Drain standard input and return everything as one string.

    The result starts with whatever is currently held in the module-level
    _buffer (which is then reset to empty), followed by every remaining
    line of sys.stdin.
    """
    global _buffer
    pieces = [_buffer]
    _buffer = ''
    for raw in sys.stdin:
        if sys.hexversion < 0x03000000:
            # On Python 2 each line is decoded from UTF-8.
            raw = raw.decode('utf-8')
        pieces.append(raw)
    return ''.join(pieces)
#=======================================================================
# For Testing
#=======================================================================
def __init__(self, file):
    """Open the input named by *file* and store it in ``self.infile``.

    :param file: ``''`` selects ``sys.stdin``; a string starting with
        ``http://`` or ``https://`` (case-insensitive) is fetched with
        ``urllib23.urlopen``; a name ending in ``.zip`` is opened as a ZIP
        archive and its first member is read using the password
        ``'infected'``; anything else is opened as a binary file.
    :type file: str

    On failure an error message and the exception are printed and the
    process exits via ``sys.exit()``.
    """
    self.file = file
    lowered = file.lower()
    if file == '':
        self.infile = sys.stdin
    elif lowered.startswith(('http://', 'https://')):
        try:
            # The timeout keyword is only passed on Python >= 2.6.1.
            if sys.hexversion >= 0x020601F0:
                self.infile = urllib23.urlopen(file, timeout=5)
            else:
                self.infile = urllib23.urlopen(file)
        except urllib23.HTTPError:
            print('Error accessing URL %s' % file)
            print(sys.exc_info()[1])
            # NOTE(review): sys.exit() without an argument exits with
            # status 0; kept as-is so existing callers see no change.
            sys.exit()
    elif lowered.endswith('.zip'):
        try:
            self.zipfile = zipfile.ZipFile(file, 'r')
            first_member = self.zipfile.infolist()[0]
            self.infile = self.zipfile.open(first_member, 'r', C2BIP3('infected'))
        except Exception:
            # "except Exception" rather than a bare except, so that
            # KeyboardInterrupt / SystemExit are not reported as file errors.
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    else:
        try:
            self.infile = open(file, 'rb')
        except Exception:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    # Starts empty; nothing in this method pushes onto it.
    self.ungetted = []
def description_of(lines, name='stdin'):
    """
    Return a string describing the probable encoding of a file or
    list of strings.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Name of file or collection of lines
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when an
        encoding was detected, otherwise ``'<name>: no result'``.
    :rtype: str
    """
    detector = UniversalDetector()
    for line in lines:
        detector.feed(line)
        # chardet's documented usage stops feeding once the detector
        # reports it is done, which avoids reading the rest of the input.
        if detector.done:
            break
    detector.close()
    result = detector.result
    if result['encoding']:
        return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
                                                     result['confidence'])
    return '{0}: no result'.format(name)
def description_of(lines, name='stdin'):
    """
    Return a string describing the probable encoding of a file or
    list of strings.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Name of file or collection of lines
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when an
        encoding was detected, otherwise ``'<name>: no result'``.
    :rtype: str
    """
    detector = UniversalDetector()
    for line in lines:
        detector.feed(line)
        # chardet's documented usage stops feeding once the detector
        # reports it is done, which avoids reading the rest of the input.
        if detector.done:
            break
    detector.close()
    result = detector.result
    if result['encoding']:
        return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
                                                     result['confidence'])
    return '{0}: no result'.format(name)
def main():
    """Read a subunit stream from stdin and report on it via stdout.

    The stream is fanned out to a ``Starts`` reporter, a per-test outcome
    printer and a ``StreamSummary``.  Returns 1 when
    ``count_tests('status', '.*')`` is zero or the summary is not
    successful, and 0 otherwise.
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    on_outcome = functools.partial(
        show_outcome, sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly)
    outcomes = testtools.StreamToDict(on_outcome)
    summary = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult([starts, outcomes, summary])

    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # Always close the run, even if processing the stream raised.
        fanout.stopTestRun()

    ran_nothing = count_tests('status', '.*') == 0
    if ran_nothing:
        print("The test run didn't actually run any tests")
        return 1
    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    if summary.wasSuccessful():
        return 0
    return 1
def __init__(self, addr="127.0.0.1", port=4444):
    """Wait for one TCP client and attach a pdb session to it.

    A listening socket is bound to ``(addr, port)`` and ``accept()`` blocks
    until a client connects.  ``pdb.Pdb`` is then initialized to read from
    and write to that connection, and ``sys.stdin`` / ``sys.stdout`` are
    rebound to it as well.  The previous stream objects stay available as
    ``self.old_stdin`` and ``self.old_stdout``.
    """
    # Remember the real streams before they are swapped for the socket file.
    self.old_stdout, self.old_stdin = sys.stdout, sys.stdin

    # SO_REUSEADDR lets the webapp reload and bind the same port again.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.skt = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    listener.bind((addr, port))
    listener.listen(1)

    # Blocks here until a debugger client connects.
    connection, _peer = listener.accept()
    stream = connection.makefile('rw')
    pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
    sys.stdout = stream
    sys.stdin = stream
def _run(self):
    """Evaluate sentences read from standard input, one output line each.

    A TensorFlow session is opened and restored through
    ``self.io.restore_session``.  Sentences are then read from
    ``sys.stdin`` until ``read_from_file`` returns ``None``.  A sentence
    whose reported length is below 2 is answered with ``-9999``; any other
    sentence is passed to ``run_epoch`` and the result is printed with
    ``%f``.
    """
    with tf.Session() as session:
        self.io.restore_session(session)
        sentence = SingleSentenceData()
        counter = SpeedCounter().start()
        while True:
            length = sentence.read_from_file(sys.stdin, self.io.w2id)
            if length is None:
                break
            if length < 2:
                # Too short to evaluate: emit the placeholder value.
                print(-9999)
                continue
            score = run_epoch(session, self.test_model, sentence)
            counter.next()
            show_progress = self.params.progress and counter.val % 20 == 0
            if show_progress:
                # "\r" with end="" rewrites the same stderr line each time.
                print("\rLoglikes per secs: %f" % counter.speed, end="", file=sys.stderr)
            print("%f" % score)
def default(self, line):
    """Execute *line* as a Python statement in the current frame.

    A single leading ``!`` is stripped.  The line is compiled in
    ``'single'`` mode and executed with ``self.curframe.f_globals`` and
    ``self.curframe_locals``.  While it runs, ``sys.stdin``,
    ``sys.stdout`` and ``sys.displayhook`` are replaced by this object's
    own ``stdin``, ``stdout`` and ``displayhook``; they are restored
    afterwards.

    Any exception raised while compiling or executing is reported on
    ``self.stdout`` as ``*** <ExceptionName>: <value>`` and not re-raised.
    """
    if line[:1] == '!':
        line = line[1:]
    frame_locals = self.curframe_locals
    frame_globals = self.curframe.f_globals
    try:
        code = compile(line + '\n', '<stdin>', 'single')
        save_stdout = sys.stdout
        save_stdin = sys.stdin
        save_displayhook = sys.displayhook
        try:
            sys.stdin = self.stdin
            sys.stdout = self.stdout
            sys.displayhook = self.displayhook
            # Call form of exec: accepted by both Python 2 and Python 3.
            exec(code, frame_globals, frame_locals)
        finally:
            sys.stdout = save_stdout
            sys.stdin = save_stdin
            sys.displayhook = save_displayhook
    except:  # noqa: E722 - intentionally reports anything the statement raises
        exc_type, exc_value = sys.exc_info()[:2]
        if isinstance(exc_type, str):
            # Legacy string exceptions carry their name directly.
            exc_type_name = exc_type
        else:
            exc_type_name = exc_type.__name__
        self.stdout.write('*** %s: %s\n' % (exc_type_name, exc_value))
def _test():
    """Simple test program to disassemble a file.

    With no argument, or with ``-`` or an empty string, the source is read
    from standard input and compiled under the name ``<stdin>``; otherwise
    the single argument names the file to read.  More than one argument
    prints a usage message to stderr and exits with status 2.
    """
    args = sys.argv[1:]
    if len(args) > 1:
        sys.stderr.write("usage: python dis.py [-|file]\n")
        sys.exit(2)
    fn = args[0] if args else None
    if not fn or fn == "-":
        fn = None
    if fn is None:
        source = sys.stdin.read()
        fn = "<stdin>"
    else:
        # The with-block closes the file even if read() raises.
        with open(fn) as f:
            source = f.read()
    code = compile(source, fn, "exec")
    dis(code)
def s_unload(self, *args):
    """Unload the module.

    Removes it from the restricted environment's sys.modules dictionary.

    Code running inside the restricted environment calls this method
    implicitly.  A subclass may override it to change the policies the
    restricted environment enforces.

    It behaves like r_unload(), except that it has access to the
    restricted versions of the standard I/O streams sys.stdin,
    sys.stderr, and sys.stdout.
    """
    unload = self.r_unload
    return self.s_apply(unload, args)
# Restricted open(...)
def _raw_input(prompt="", stream=None, input=None):
# A raw_input() replacement that doesn't save the string in the
# GNU readline history.
if not stream:
stream = sys.stderr
if not input:
input = sys.stdin
prompt = str(prompt)
if prompt:
stream.write(prompt)
stream.flush()
# NOTE: The Python C API calls flockfile() (and unlock) during readline.
line = input.readline()
if not line:
raise EOFError
if line[-1] == '\n':
line = line[:-1]
return line
def __call__(self, string):
    """Turn a command-line argument into a file object.

    ``"-"`` maps to ``_sys.stdin`` when the mode contains ``'r'`` and to
    ``_sys.stdout`` when it contains ``'w'``; any other mode raises
    ValueError.  Every other argument is opened as a file name with the
    configured mode and buffer size; an IOError is re-raised as
    ArgumentTypeError.
    """
    if string != '-':
        # all other arguments are used as file names
        try:
            return open(string, self._mode, self._bufsize)
        except IOError as e:
            message = _("can't open '%s': %s")
            raise ArgumentTypeError(message % (string, e))

    # the special argument "-" means sys.std{in,out}
    if 'r' in self._mode:
        return _sys.stdin
    if 'w' in self._mode:
        return _sys.stdout
    msg = _('argument "-" with mode %r') % self._mode
    raise ValueError(msg)
def __init__(self, completekey='tab', stdin=None, stdout=None):
"""Instantiate a line-oriented interpreter framework.
The optional argument 'completekey' is the readline name of a
completion key; it defaults to the Tab key. If completekey is
not None and the readline module is available, command completion
is done automatically. The optional arguments stdin and stdout
specify alternate input and output file objects; if not specified,
sys.stdin and sys.stdout are used.
"""
import sys
if stdin is not None:
self.stdin = stdin
else:
self.stdin = sys.stdin
if stdout is not None:
self.stdout = stdout
else:
self.stdout = sys.stdout
self.cmdqueue = []
self.completekey = completekey
def expand_args(args, flist):
    """Return a copy of *args* extended with the names listed in *flist*.

    :param args: initial list of names; it is copied, not modified.
    :param flist: path of a file holding one name per line, ``'-'`` to
        read the names from standard input, or a false value to add
        nothing.
    :return: a new list containing *args* followed by the names read.
    :raises IOError: if the list file cannot be read; a message is printed
        before the exception propagates.
    """
    expanded = args[:]
    if not flist:
        return expanded
    try:
        fd = sys.stdin if flist == '-' else open(flist)
        try:
            for line in iter(fd.readline, ''):
                # Strip only a real line terminator so that a final line
                # without a trailing newline keeps its last character.
                expanded.append(line[:-1] if line.endswith('\n') else line)
        finally:
            # Close files opened here; leave standard input alone.
            if fd is not sys.stdin:
                fd.close()
    except IOError:
        print("Error reading file list %s" % flist)
        raise
    return expanded
def interact(self):
    """Interaction function, emulates a very dumb telnet client.

    On ``win32`` the work is delegated to ``self.mt_interact()``.
    Otherwise the method loops on ``select`` over this connection and
    ``sys.stdin``: data from the connection is written to ``sys.stdout``
    and lines from ``sys.stdin`` are sent with ``self.write``.  The loop
    ends when the connection raises EOFError or stdin reaches end of file.
    """
    if sys.platform == "win32":
        self.mt_interact()
        return
    while True:
        readable, _, _ = select.select([self, sys.stdin], [], [])
        if self in readable:
            try:
                text = self.read_eager()
            except EOFError:
                # Parenthesized form works as a statement on Python 2 and
                # as a function call on Python 3, with identical output.
                print('*** Connection closed by remote host ***')
                break
            if text:
                sys.stdout.write(text)
                sys.stdout.flush()
        if sys.stdin in readable:
            line = sys.stdin.readline()
            if not line:
                break
            self.write(line)
def test():
    """Small test program.

    Parses ``sys.argv`` for ``-d``/``-u`` (decode), ``-e`` (encode, the
    default) and ``-t`` (run ``test1()`` and return).  The selected
    function is applied to the named file, or to standard input when no
    file or ``-`` is given, writing to standard output.  An invalid option
    prints the error and a usage message to stderr and exits with
    status 2.
    """
    import sys
    import getopt
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'deut')
    except getopt.error as msg:
        # Redirect so the following prints land on stderr.
        sys.stdout = sys.stderr
        print(msg)
        print("""usage: %s [-d|-e|-u|-t] [file|-]
-d, -u: decode
-e: encode (default)
-t: encode and decode string 'Aladdin:open sesame'""" % sys.argv[0])
        sys.exit(2)
    func = encode
    for o, a in opts:
        if o == '-e':
            func = encode
        if o == '-d':
            func = decode
        if o == '-u':
            func = decode
        if o == '-t':
            test1()
            return
    if args and args[0] != '-':
        with open(args[0], 'rb') as f:
            func(f, sys.stdout)
    else:
        func(sys.stdin, sys.stdout)
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Compute a unified diff between the contents of filename and content2.

    When content2 is omitted, the second side is read from standard input
    and the resulting diff is also written out, which is how the function
    is used from the command line:

    echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt

    That invocation produces:

    ---
    +++
    @@ -1 +1 @@
    -123
    +456

    Returns the (exit_code, diff) pair from compute_unified_diff.
    """
    from_stdin = content2 is None
    if from_stdin:
        # Read binary input stream
        econtent2 = bytestr(rawstream(sys.stdin).read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if from_stdin:
        write('\n'.join(diff))
    return exit_code, diff
def main():
    """Parse the command line and start the language server.

    ``--mode stdio`` (the default) serves a single ``LangServer`` over
    stdin/stdout.  ``--mode tcp`` starts a ``ThreadingTCPServer`` on
    ``0.0.0.0`` at the port given by ``--addr`` (default 2087) and serves
    until interrupted.  Any other mode value does nothing.
    """
    parser = argparse.ArgumentParser(description="")
    parser.add_argument("--mode", default="stdio",
                        help="communication (stdio|tcp)")
    parser.add_argument("--addr", default=2087,
                        help="server listen (tcp)", type=int)
    args = parser.parse_args()
    mode = args.mode

    if mode == "tcp":
        host, addr = "0.0.0.0", args.addr
        log("Accepting TCP connections on {}:{}".format(host, addr))
        ThreadingTCPServer.allow_reuse_address = True
        server = ThreadingTCPServer((host, addr), LangserverTCPTransport)
        try:
            server.serve_forever()
        finally:
            server.shutdown()
    elif mode == "stdio":
        log("Reading on stdin, writing on stdout")
        transport = ReadWriter(sys.stdin, sys.stdout)
        server = LangServer(conn=transport)
        server.listen()
def __init__(self, fileOrPath, ttFont, progress=None, quiet=None):
if fileOrPath == '-':
fileOrPath = sys.stdin
if not hasattr(fileOrPath, "read"):
self.file = open(fileOrPath, "rb")
self._closeStream = True
else:
# assume readable file object
self.file = fileOrPath
self._closeStream = False
self.ttFont = ttFont
self.progress = progress
if quiet is not None:
from fontTools.misc.loggingTools import deprecateArgument
deprecateArgument("quiet", "configure logging instead")
self.quiet = quiet
self.root = None
self.contentStack = []
self.stackSize = 0
def main():
    """Re-serialize a JSON document with sorted keys and indentation.

    With no arguments the document is read from stdin and written to
    stdout; one argument names the input file; two arguments name the
    input and output files.  Any other argument count exits with a usage
    string, and a ValueError while loading exits with that error.
    """
    argc = len(sys.argv)
    if argc not in (1, 2, 3):
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    infile = sys.stdin if argc == 1 else open(sys.argv[1], 'r')
    outfile = open(sys.argv[2], 'w') if argc == 3 else sys.stdout

    with infile:
        try:
            obj = json.load(infile,
                            object_pairs_hook=json.OrderedDict,
                            use_decimal=True)
        except ValueError as err:
            raise SystemExit(err)
    with outfile:
        json.dump(obj, outfile, sort_keys=True, indent=' ', use_decimal=True)
        outfile.write('\n')
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
    """ Process data.

    Dispatches to ``_process_protocol_v1`` when *argv* has more than one
    element and to ``_process_protocol_v2`` otherwise.

    :param argv: Command line arguments.
    :type argv: list or tuple

    :param ifile: Input data file.
    :type ifile: file

    :param ofile: Output data file.
    :type ofile: file

    :return: :const:`None`
    :rtype: NoneType

    """
    has_arguments = len(argv) > 1
    handler = self._process_protocol_v1 if has_arguments else self._process_protocol_v2
    handler(argv, ifile, ofile)
def print(self, f=None):
    """Write a text rendering of the map, then the next move, to *f*.

    Each map row becomes one line of 4-character-wide cells: ``M`` for a
    mountain, ``C`` for a neutral city, ``.`` for a neutral tile, the army
    count for a tile owned by us and the negated army count for any other
    owner.  A ``move=...`` line and an empty line follow the grid.

    :param f: writable text stream; defaults to ``sys.stdout``.  (The
        previous default was ``sys.stdin``, which is not a writable
        stream.)
    """
    if f is None:
        f = sys.stdout
    for r in range(self.mapHeight):
        cells = []
        for c in range(self.mapWidth):
            # The map is stored row-major in flat lists.
            idx = r * self.mapWidth + c
            if self.terrain[idx] == Terrain.MOUNTAIN:
                cell = 'M'
            elif self.terrain[idx] == Terrain.NEUTRAL_CITY:
                cell = 'C'
            elif self.owner[idx] == Owner.NEUTRAL:
                cell = '.'
            elif self.owner[idx] == Owner.OURS:
                cell = str(self.armies[idx])
            else:
                cell = str(-self.armies[idx])
            # Pad to 4 columns; longer values are left untouched.
            cells.append(cell.ljust(4))
        print("".join(cells), file=f)
    print("move=%s" % str(self.nextMove), file=f)
    print("", file=f)
def read_input_features(l, inp=sys.stdin):
    """Read *l* CSV rows into feature and label arrays.

    :param l: number of samples; sizes the first dimension of both arrays.
    :param inp: an iterable of lines, or a path string that is opened and
        read instead.  Defaults to standard input.
    :return: ``(xs, ys)``, two ``int16`` arrays of shape ``(l, flen)`` and
        ``(l, n*n*classes)``, filled row by row from ``parse_csv_row_xy``.
    """
    if isinstance(inp, str):
        with open(inp, 'r') as f:
            # Forward the sample count: the earlier call passed only the
            # file, which bound it to ``l`` and left ``inp`` as stdin.
            return read_input_features(l, f)
    print("%d samples" % l, file=sys.stderr)
    xs = np.zeros((l, flen), np.int16)
    ys = np.zeros((l, n*n*classes), np.int16)
    for count, line in enumerate(inp, 1):
        xs[count - 1, :], ys[count - 1, :] = parse_csv_row_xy(line)
        if count % 10000 == 0:
            print("%d read from disk" % count, file=sys.stderr)
    return xs, ys
def testWithIO(inp, out, f):
    """Calls the function `f` with ``sys.stdin`` changed to `inp`
    and ``sys.stdout`` changed to `out`.  They are restored when `f`
    returns or raises.  This function returns whatever `f` returns.

    When the ``PYPNG_TEST_TMP`` environment variable is set and `out` has
    a ``getvalue`` method, the captured output is also written to
    ``<caller name>.png``, provided ``mycallersname()`` returns a name.
    """
    import os
    # Save the streams before entering the try block so the finally
    # clause never refers to a name that was not yet bound.
    oldin, oldout = sys.stdin, sys.stdout
    try:
        sys.stdin, sys.stdout = inp, out
        x = f()
    finally:
        sys.stdin = oldin
        sys.stdout = oldout
    if os.environ.get('PYPNG_TEST_TMP') and hasattr(out, 'getvalue'):
        name = mycallersname()
        if name:
            # The with-block closes the dump file even if write() fails.
            with open(name + '.png', 'wb') as w:
                w.write(out.getvalue())
    return x
def description_of(lines, name='stdin'):
    """
    Return a string describing the probable encoding of a file or
    list of strings.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Name of file or collection of lines
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when an
        encoding was detected, otherwise ``'<name>: no result'``.
    :rtype: str
    """
    detector = UniversalDetector()
    for line in lines:
        detector.feed(line)
        # chardet's documented usage stops feeding once the detector
        # reports it is done, which avoids reading the rest of the input.
        if detector.done:
            break
    detector.close()
    result = detector.result
    if result['encoding']:
        return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
                                                     result['confidence'])
    return '{0}: no result'.format(name)
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
    """ Process data.

    Dispatches to ``_process_protocol_v1`` when *argv* has more than one
    element and to ``_process_protocol_v2`` otherwise.

    :param argv: Command line arguments.
    :type argv: list or tuple

    :param ifile: Input data file.
    :type ifile: file

    :param ofile: Output data file.
    :type ofile: file

    :return: :const:`None`
    :rtype: NoneType

    """
    has_arguments = len(argv) > 1
    handler = self._process_protocol_v1 if has_arguments else self._process_protocol_v2
    handler(argv, ifile, ofile)
def _argparse():
    """Build and return the command-line parser for the bh_tsne wrapper.

    Defaults come from the module-level constants; input and output
    default to ``stdin`` and ``stdout``.  ``--use_pca`` and ``--no_pca``
    both write to the ``use_pca`` destination.
    """
    parser = ArgumentParser('bh_tsne Python wrapper')
    add = parser.add_argument
    add('-d', '--no_dims', type=int, default=DEFAULT_NO_DIMS)
    add('-p', '--perplexity', type=float, default=DEFAULT_PERPLEXITY)
    # 0.0 for theta is equivalent to vanilla t-SNE
    add('-t', '--theta', type=float, default=DEFAULT_THETA)
    add('-r', '--randseed', type=int, default=EMPTY_SEED)
    add('-n', '--initial_dims', type=int, default=INITIAL_DIMENSIONS)
    add('-v', '--verbose', action='store_true')
    add('-i', '--input', type=FileType('r'), default=stdin)
    add('-o', '--output', type=FileType('w'), default=stdout)
    add('--use_pca', action='store_true')
    add('--no_pca', dest='use_pca', action='store_false')
    parser.set_defaults(use_pca=DEFAULT_USE_PCA)
    add('-m', '--max_iter', type=int, default=DEFAULT_MAX_ITERATIONS)
    return parser