def init_pprint(self):
    """Activates pretty-printing of output values.

    Installs a ``sys.displayhook`` that, for every non-``None`` value:

    - stores the value in ``builtins._``,
    - formats it with ``pprint.pformat`` at the current terminal width
      (``compact=True`` where pprint supports it, i.e. Python >= 3.4),
    - prints dicts with their keys wrapped by ``purple`` and everything
      else wrapped by ``blue``.

    ``None`` values are ignored entirely (nothing printed, ``_`` untouched).
    """
    # Matches the quoted/parenthesised key part (up to and including ': ')
    # of a pformat()-ed dict so that only the keys get coloured.
    keys_re = re.compile(r'([\'\("]+(.*?[\'\)"]: ))+?')
    color_dict = partial(keys_re.sub, lambda m: purple(m.group()))

    format_func = pprint.pformat
    # Tuple comparison keeps working for any future major version, unlike
    # testing major and minor separately.
    if sys.version_info >= (3, 4):
        format_func = partial(pprint.pformat, compact=True)

    def terminal_width():
        """Return the terminal width in columns, defaulting to 80."""
        try:
            # os.get_terminal_size() returns (columns, lines); it is missing
            # on Python 2 (AttributeError) and raises OSError when stdout is
            # not attached to a terminal.
            return os.get_terminal_size().columns
        except (AttributeError, OSError, ValueError):
            pass
        try:
            # `stty size` prints "<rows> <cols>".
            _rows, cols = map(int, subprocess.check_output(['stty', 'size']).split())
            return cols
        except (OSError, ValueError, subprocess.CalledProcessError):
            return 80

    def pprint_callback(value):
        if value is not None:
            cols = terminal_width()
            builtins._ = value
            formatted = format_func(value, width=cols)
            print(color_dict(formatted) if isinstance(value, dict) else blue(formatted))

    sys.displayhook = pprint_callback
# Example source code for the Python class `_`
def test_original_displayhook(self):
    """Check the behaviour of the built-in ``sys.__displayhook__``.

    Asserts, in order, that the hook:

    - raises ``TypeError`` when called without an argument;
    - writes nothing and leaves ``__builtin__._`` unset for ``None``;
    - writes ``"42\\n"`` and stores 42 in ``__builtin__._`` for 42;
    - raises ``RuntimeError`` once ``sys.stdout`` has been deleted.
    """
    import __builtin__
    savestdout = sys.stdout
    out = cStringIO.StringIO()
    sys.stdout = out
    try:
        dh = sys.__displayhook__
        self.assertRaises(TypeError, dh)
        # Start from a clean slate so the "not set for None" check is valid.
        if hasattr(__builtin__, "_"):
            del __builtin__._
        dh(None)
        self.assertEqual(out.getvalue(), "")
        self.assertTrue(not hasattr(__builtin__, "_"))
        dh(42)
        self.assertEqual(out.getvalue(), "42\n")
        self.assertEqual(__builtin__._, 42)
        del sys.stdout
        self.assertRaises(RuntimeError, dh, 42)
    finally:
        # Always put the real stdout back, even when an assertion above
        # fails; otherwise the rest of the run has a captured or missing
        # sys.stdout.
        sys.stdout = savestdout
def test_original_displayhook(self):
    """Check the behaviour of the built-in ``sys.__displayhook__``.

    Asserts, in order, that the hook:

    - raises ``TypeError`` when called without an argument;
    - writes nothing and leaves ``__builtin__._`` unset for ``None``;
    - writes ``"42\\n"`` and stores 42 in ``__builtin__._`` for 42;
    - raises ``RuntimeError`` once ``sys.stdout`` has been deleted.
    """
    import __builtin__
    savestdout = sys.stdout
    out = cStringIO.StringIO()
    sys.stdout = out
    try:
        dh = sys.__displayhook__
        self.assertRaises(TypeError, dh)
        # Start from a clean slate so the "not set for None" check is valid.
        if hasattr(__builtin__, "_"):
            del __builtin__._
        dh(None)
        self.assertEqual(out.getvalue(), "")
        self.assertTrue(not hasattr(__builtin__, "_"))
        dh(42)
        self.assertEqual(out.getvalue(), "42\n")
        self.assertEqual(__builtin__._, 42)
        del sys.stdout
        self.assertRaises(RuntimeError, dh, 42)
    finally:
        # Always put the real stdout back, even when an assertion above
        # fails; otherwise the rest of the run has a captured or missing
        # sys.stdout.
        sys.stdout = savestdout
def test_original_displayhook(self):
    """Check the behaviour of the built-in ``sys.__displayhook__``.

    Asserts, in order, that the hook:

    - raises ``TypeError`` when called without an argument;
    - writes nothing and leaves ``__builtin__._`` unset for ``None``;
    - writes ``"42\\n"`` and stores 42 in ``__builtin__._`` for 42;
    - raises ``RuntimeError`` once ``sys.stdout`` has been deleted.
    """
    import __builtin__
    savestdout = sys.stdout
    out = cStringIO.StringIO()
    sys.stdout = out
    try:
        dh = sys.__displayhook__
        self.assertRaises(TypeError, dh)
        # Start from a clean slate so the "not set for None" check is valid.
        if hasattr(__builtin__, "_"):
            del __builtin__._
        dh(None)
        self.assertEqual(out.getvalue(), "")
        self.assertTrue(not hasattr(__builtin__, "_"))
        dh(42)
        self.assertEqual(out.getvalue(), "42\n")
        self.assertEqual(__builtin__._, 42)
        del sys.stdout
        self.assertRaises(RuntimeError, dh, 42)
    finally:
        # Always put the real stdout back, even when an assertion above
        # fails; otherwise the rest of the run has a captured or missing
        # sys.stdout.
        sys.stdout = savestdout
def test_original_displayhook(self):
    """Check the behaviour of the built-in ``sys.__displayhook__``.

    Asserts, in order, that the hook:

    - raises ``TypeError`` when called without an argument;
    - writes nothing and leaves ``__builtin__._`` unset for ``None``;
    - writes ``"42\\n"`` and stores 42 in ``__builtin__._`` for 42;
    - raises ``RuntimeError`` once ``sys.stdout`` has been deleted.
    """
    import __builtin__
    savestdout = sys.stdout
    out = cStringIO.StringIO()
    sys.stdout = out
    try:
        dh = sys.__displayhook__
        self.assertRaises(TypeError, dh)
        # Start from a clean slate so the "not set for None" check is valid.
        if hasattr(__builtin__, "_"):
            del __builtin__._
        dh(None)
        self.assertEqual(out.getvalue(), "")
        self.assertTrue(not hasattr(__builtin__, "_"))
        dh(42)
        self.assertEqual(out.getvalue(), "42\n")
        self.assertEqual(__builtin__._, 42)
        del sys.stdout
        self.assertRaises(RuntimeError, dh, 42)
    finally:
        # Always put the real stdout back, even when an assertion above
        # fails; otherwise the rest of the run has a captured or missing
        # sys.stdout.
        sys.stdout = savestdout
def gettext_translate(s):
    """Translate *s* by delegating to ``catalogs.translate``.

    Intended as the thread-safe stand-in for ``_()``: per the original
    notes, ``catalogs`` resolves the thread-local translation function
    (NOTE(review): ``catalogs`` is defined outside this block -- confirm).

    :param s: The message to translate.
    :return: Whatever ``catalogs.translate(s)`` returns.
    """
    translated = catalogs.translate(s)
    return translated
# Inject the _() function into the builtins. You could also inject an N_()
# function for no-op translation markers.
def handle_request():
    """
    Treat a request, with i18n strings.

    Prints the translations of 'bli', 'bla' and 'blo' on one line,
    separated by single spaces, then sleeps for a random duration of less
    than one second to simulate further work.
    """
    # Fetch and print the translated strings.
    # This is the interesting bit, from a client's point-of-view.
    # Joining first and printing one parenthesised expression produces the
    # same output under the Python 2 print statement and the Python 3 print
    # function (assumes _() returns strings -- TODO confirm).
    print(' '.join([_('bli'), _('bla'), _('blo')]))
    # Do something else.
    time.sleep(random.random())
# A thread class. This would be provided by the web framework,
# normally.
def init_prompt(self):
    """Set coloured ``sys.ps1`` / ``sys.ps2`` prompts.

    The primary prompt is green on Python 2 and yellow otherwise; the
    continuation prompt is always red.  When the ``SSH_CONNECTION``
    environment variable is set, both prompts are prefixed with
    ``[<host>]``, where ``<host>`` is the third whitespace-separated field
    of that variable.
    """
    if sys.version_info.major == 2:
        prompt_color = green
    else:
        prompt_color = yellow

    # Plain prompts first; they stay in effect if the SSH handling below
    # does not apply.
    sys.ps1 = prompt_color('>>> ', readline_workaround=True)
    sys.ps2 = red('... ', readline_workaround=True)

    ssh_connection = os.getenv('SSH_CONNECTION')
    if not ssh_connection:
        return

    # - over a remote connection, tag both prompts with this host's address.
    #   The variable must contain exactly four fields.
    _client_ip, _client_port, this_host, _server_port = ssh_connection.split()
    host_tag = '[{}]'.format(this_host)
    sys.ps1 = prompt_color(host_tag + '>>> ', readline_workaround=True)
    sys.ps2 = red(host_tag + '... ', readline_workaround=True)
def improved_rlcompleter(self):
    """Build a readline completer function on top of ``rlcompleter``.

    Besides the stock ``rlcompleter`` behaviour (against ``self.locals``)
    the returned function also offers:

    - module-name completion when the line starts with ``import``/``from``,
    - pathname completion (via ``glob``) when the text contains a ``/``
      and the stock completer found nothing,
    - insertion of ``self.tab`` when the line is empty.

    As a side effect ``/`` is removed from readline's completer delimiters.

    :return: A ``complete(text, state)`` callable.
    """
    base_completer = rlcompleter.Completer(namespace=self.locals)

    # - dropping '/' from the delimiters keeps whole paths in `text`, so a
    #   path completion can be recognised below
    current_delims = readline.get_completer_delims()
    readline.set_completer_delims(current_delims.replace('/', ''))

    known_modules = frozenset(
        mod_name for _finder, mod_name, _is_pkg in pkgutil.iter_modules())

    def complete_wrapper(text, state):
        buffer_text = readline.get_line_buffer().strip()
        if not buffer_text:
            # Nothing typed yet: a tab once, then no further candidates.
            return self.tab if state <= 0 else None

        if state == 0:
            # Candidates are computed on the first call only; later calls
            # index into the list cached on the completer.
            if buffer_text.startswith(('import', 'from')):
                base_completer.matches = [
                    mod_name for mod_name in known_modules
                    if mod_name.startswith(text)]
            elif base_completer.complete(text, state) is None and '/' in text:
                base_completer.matches = glob.glob(text + '*')

        try:
            candidate = base_completer.matches[state]
        except IndexError:
            return None
        # Keywords get a trailing space.
        suffix = ' ' if keyword.iskeyword(candidate) else ''
        return '{}{}'.format(candidate, suffix)

    return complete_wrapper
def testVerify(spec, pub, priv, closed, proxy):
    """
    Runs one test case against verify.verifyDEP() and reports the outcome.

    The DEP is generated with run_test.runTest() and then verified twice
    through _testVerify(): once with its fourth argument False and once
    with True (reported as "without parsing" / "with parsing"). Both runs
    must agree on the result and on the string form of the message.

    Besides the elements understood by run_test.runTest(), the
    specification may carry "expectedException" (name of the exception
    verifyDEP() is expected to throw; if omitted no exception may be
    thrown) and "exceptionReceipt" (the receipt at which the exception is
    expected; if omitted it may occur anywhere in the generated DEP).

    :param spec: The test case specification as a dict structure.
    :param pub: The public key or certificate. For a closed system a public
    key must be used, for an open system a certificate must be used.
    :param priv: The private key used to sign the generated receipts.
    :param closed: Indicates whether the system is a closed system (True) or
    an open system (False).
    :param proxy: An object implementing RKSVVerificationProxyI. This will
    be used to do the actual verification.
    :return: A TestVerifyResult and an error message. If the result is OK,
    the message is None. Failure to generate the DEP yields ERROR together
    with the raised exception.
    """
    try:
        device_count = spec['numberOfSignatureDevices']
        # Every signature device shares the same key pair.
        keymat = [(pub, priv)] * device_count
        deps, cc = run_test.runTest(spec, keymat, closed)
    except Exception as err:
        return TestVerifyResult.ERROR, err

    plain_result, plain_msg = _testVerify(spec, deps, cc, False, proxy)
    parsed_result, parsed_msg = _testVerify(spec, deps, cc, True, proxy)

    outcomes_differ = (plain_result != parsed_result
            or str(plain_msg) != str(parsed_msg))
    if not outcomes_differ:
        return plain_result, plain_msg

    # The two runs disagree: an ERROR on either side wins over FAIL.
    if TestVerifyResult.ERROR in (plain_result, parsed_result):
        combined = TestVerifyResult.ERROR
    else:
        combined = TestVerifyResult.FAIL
    mismatch = _('Result mismatch: without parsing {}:>{}<, with parsing {}:>{}<').format(
            plain_result.name, plain_msg, parsed_result.name, parsed_msg)
    return combined, Exception(mismatch)
def testVerifyMulti(specs, groupLabel, crt, pub, priv, tcDefaultSize,
        proxy):
    """
    Runs every given test case through testVerify(), lazily.

    On top of what testVerify() reads, each specification may contain
    "closedSystem" (True for a closed system, False -- the default -- for
    an open one), "simulationRunLabel" and "turnoverCounterSize". A
    specification without a run label is not executed; it is reported as
    an ERROR instead. Being a generator, results become available one test
    case at a time.

    :param specs: A list or generator with test specifications as dict
    structures.
    :param groupLabel: A label to indicate which group the tests belong to
    as a string.
    :param crt: The certificate used to sign the generated receipts if an
    open system is used.
    :param pub: The public key used to sign the generated receipts if a
    closed system is used.
    :param priv: The private key belonging to the given certificate and
    public key.
    :param tcDefaultSize: The turnover counter size in bytes to use if no
    size is given in the test specification.
    :param proxy: An object implementing RKSVVerificationProxyI. This will
    be used to do the actual verification.
    :yield: A tuple containing (in order) the test case's name, the group
    label, whether the system is closed (True) or open (False), the
    turnover counter size in bytes, the TestVerifyResult and the error
    message or None if no error occurred.
    """
    for spec in specs:
        label = spec.get('simulationRunLabel', 'Unknown')
        tc_size = spec.get('turnoverCounterSize', tcDefaultSize)
        closed = spec.get('closedSystem', False)

        if label != 'Unknown':
            # Closed systems verify against the bare public key, open
            # systems against the certificate.
            key_or_cert = pub if closed else crt
            result, msg = testVerify(spec, key_or_cert, priv, closed, proxy)
        else:
            result, msg = TestVerifyResult.ERROR, _('No run label')

        yield (label, groupLabel, closed, tc_size, result, msg)
def printTestVerifySummary(results):
    """
    Prints a one-line summary of a test run.

    :param results: A sized sequence of result tuples whose fifth element
    (index 4) is a TestVerifyResult, e.g. the tuples yielded by
    testVerifyMulti() collected into a list.
    """
    outcomes = [entry[4] for entry in results]
    nFails = outcomes.count(TestVerifyResult.FAIL)
    nErrors = outcomes.count(TestVerifyResult.ERROR)
    summary = _('{} tests run, {} failed, {} errors')
    print(summary.format(len(results), nFails, nErrors))
def verify(self, fd, keyStore, aesKey, inState, registerIdx, chunksize):
    """
    Verifies the DEP readable from fd with translations switched off.

    The _() function of __builtin__ and of every listed project module is
    replaced by an identity function for the duration of the call, so that
    error messages come out untranslated and can be compared. The original
    functions are restored afterwards, also when verification raises.

    :return: The state returned by verify.verifyParsedDEP().
    """
    # Note: inside this method `verify` still refers to the verify module.
    translated_modules = (
            __builtin__,
            depparser,
            key_store,
            receipt,
            verification_state,
            verify,
            verify_receipt,
    )

    # Save the _() functions.
    saved_translators = [module._ for module in translated_modules]

    def untranslated(text):
        return text

    # Temporarily disable translations to make sure error
    # messages match.
    for module in translated_modules:
        module._ = untranslated

    try:
        parser = depparser.IncrementalDEPParser.fromFd(fd, True)
        return verify.verifyParsedDEP(parser, keyStore, aesKey, inState,
                registerIdx, self.pool, self.nprocs, chunksize)
    finally:
        for module, translator in zip(translated_modules, saved_translators):
            module._ = translator
def process_edit_cmd(self, arg=''):
    """{EDIT_CMD} [object|filename]
    Open {EDITOR} with session history, provided filename or
    object's source file.
    - without arguments, a temporary file containing session history is
    created and opened in {EDITOR}. On quitting the editor, all
    the non commented lines in the file are executed.
    - with a filename argument, the file is opened in the editor. On
    close, you are returned back to the interpreter.
    - with an object name argument, an attempt is made to lookup the
    source file of the object and it is opened if found. Else the
    argument is treated as a filename.
    """
    # NOTE(review): the {EDIT_CMD}/{EDITOR} placeholders suggest this
    # docstring is run through str.format() to build help text -- keep any
    # literal braces doubled; confirm against the help command.
    line_num_opt = ''
    if arg:
        obj = self.lookup(arg)
        try:
            if obj:
                filename = inspect.getsourcefile(obj)
                line_no = inspect.getsourcelines(obj)[1]
                line_num_opt = config['LINE_NUM_OPT'].format(line_no=line_no)
            else:
                filename = arg
        except (IOError, TypeError, NameError) as e:
            # The source of the object could not be determined; report the
            # error instead of opening the editor.
            return self.writeline(e)
    else:
        # - make a list of all lines in session history, commenting
        # any non-blank lines.
        filename = self._mktemp_buffer("# {}".format(line) if line else ''
                                       for line in (line.strip('\n') for line in self.session_history))

    # - shell out to the editor
    os.system('{} {} {}'.format(config['EDITOR'], line_num_opt, filename))

    # - if arg was not provided (we edited session history), execute
    # it in the current namespace
    if not arg:
        try:
            self._exec_from_file(filename)
        finally:
            # Remove the temporary buffer even when executing it fails, so
            # a bad edit does not leave stray files behind.
            os.unlink(filename)
def process_sh_cmd(self, cmd):
    """{SH_EXEC} [cmd [args ...] | {{fmt string}}]
    Escape to {SHELL} or execute `cmd` in {SHELL}
    - without arguments, the current interpreter will be suspended
    and you will be dropped in a {SHELL} prompt. Use fg to return.
    - with arguments, the text will be executed in {SHELL} and the
    output/error will be displayed. Additionally '_' will contain
    a named tuple with the (<stdout>, <stderror>, <return_code>)
    for the execution of the command.
    You may pass strings from the global namespace to the command
    line using the `.format()` syntax. for example:
    >>> filename = '/does/not/exist'
    >>> !ls {{filename}}
    ls: cannot access /does/not/exist: No such file or directory
    >>> _
    CmdExec(out='', err='ls: cannot access /does/not/exist: No such file or directory\n', rc=2)
    """
    if not cmd:
        if os.getenv('SSH_CONNECTION'):
            # Per the author's note: when the interpreter was started
            # directly over ssh (e.g. `ssh -t host -- python`), suspending
            # it does not land in a shell, so one is spawned explicitly.
            os.system(config['SHELL'])
        else:
            # Suspend this interpreter; the parent shell takes over.
            os.kill(os.getpid(), signal.SIGSTOP)
        return

    try:
        # Names from the interpreter namespace are substituted before the
        # line is split into an argument vector.
        argv = shlex.split(cmd.format(**self.locals))
        if argv[0] == 'cd':
            # `cd` has to change *this* process' directory; without a
            # target it goes to ${HOME}.
            target = ' '.join(argv[1:]) or '${HOME}'
            os.chdir(os.path.expanduser(os.path.expandvars(target)))
        else:
            process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = process.communicate()
            rc = process.returncode
            # Any stderr output is shown instead of stdout.
            if err:
                shown = red(err.decode('utf-8'))
            else:
                shown = green(out.decode('utf-8'), bold=False)
            print(shown)
            result_type = namedtuple('CmdExec', ['out', 'err', 'rc'])
            builtins._ = result_type(out, err, rc)
    except:
        # Deliberately catches everything: any failure is reported as a
        # traceback in the console rather than propagating.
        self.showtraceback()