def test_and(self):
    """`and` must short-circuit at the first falsy operand and return it."""
    # Side effects prove evaluation order: "c" must never be printed.
    source = "(and (do (print \"a\") 1) (do (print \"b\") 0) (do (print \"c\") 1))"
    captured = StringIO()
    sys.stdout = captured
    value = self.runner.evaluate(self.parser.parse_line(source)[0])
    sys.stdout = sys.__stdout__
    self.assertEqual(value, 0)
    self.assertEqual(captured.getvalue(), "a\nb\n")
    # Falsy chain yields the falsy operand; all-truthy chain yields the last.
    for source, expected in (("(and True False True)", False),
                             ("(and True 1 4)", 4)):
        value = self.runner.evaluate(self.parser.parse_line(source)[0])
        self.assertEqual(value, expected)
Python `sys.__stdout__` usage examples — collected source snippets
def _check_docs(self, module):
    """Run *module*'s doctests, translating doctest errors into test failures."""
    if self._skip:
        # Printing this directly to __stdout__ so that it doesn't get
        # captured by nose.
        message = ("Warning: Skipping doctests for %s because "
                   "pdbpp is installed." % module.__name__)
        print(message, file=sys.__stdout__)
        return
    try:
        doctest.testmod(module, verbose=True, raise_on_error=True,
                        optionflags=self.flags)
    except doctest.UnexpectedException as exc:
        # Re-raise the original exception that fired inside the doctest.
        raise exc.exc_info[1]
    except doctest.DocTestFailure as exc:
        print("Got:")
        print(exc.got)
        raise
def configure_logging(debug):
    '''Sets the data kennel logger to appropriate levels of chattiness.'''
    # Root logger is one notch noisier than the library loggers in both modes.
    levels = {
        '': logging.DEBUG if debug else logging.INFO,
        'datadog.api': logging.INFO if debug else logging.WARNING,
        'requests': logging.INFO if debug else logging.WARNING,
    }
    for logger_name, level in levels.items():
        logging.getLogger(logger_name).setLevel(level)
    # Emit to the real stdout even if sys.stdout has been redirected.
    handler = logging.StreamHandler(sys.__stdout__)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
    logging.getLogger('').addHandler(handler)
def doItConsolicious(opt):
    # Console post-install step: unpack the documentation zip and optionally
    # byte-compile sources, printing progress to the real terminal.
    # `opt` is a mapping with keys 'zipfile', 'ziptargetdir', 'compiledir'.
    # reclaim stdout/stderr from log
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    if opt['zipfile']:
        print 'Unpacking documentation...'
        # n is the progress counter yielded per entry (presumably files
        # remaining -- TODO confirm against zipstream.unzipIter).
        for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
            if n % 100 == 0:
                print n,
            if n % 1000 == 0:
                print
        print 'Done unpacking.'
    if opt['compiledir']:
        print 'Compiling to pyc...'
        import compileall
        compileall.compile_dir(opt["compiledir"])
        print 'Done compiling.'
def test_main_function(self):
    """End-to-end check of CreatePlantUMLFile in stdout mode and file mode."""
    # List files
    file_list = [os.path.join(test_fold, f) for f in self._input_files]
    # Output to string
    with io.StringIO() as io_stream:
        sys.stdout = io_stream
        # Restore stdout even if the call raises; otherwise every later
        # test would write into this (soon closed) StringIO.
        try:
            hpp2plantuml.CreatePlantUMLFile(file_list)
        finally:
            sys.stdout = sys.__stdout__
        io_stream.seek(0)
        # Read string output, exclude final line return
        output_str = io_stream.read()[:-1]
    nt.assert_equal(self._diag_saved_ref, output_str)
    # Output to file
    output_fname = 'output.puml'
    hpp2plantuml.CreatePlantUMLFile(file_list, output_fname)
    with open(output_fname, 'rt') as fid:
        output_fcontent = fid.read()
    nt.assert_equal(self._diag_saved_ref, output_fcontent)
    os.unlink(output_fname)
def get_terminal_size(fallback=(80, 24)):
    """
    Return tuple containing columns and rows of controlling terminal, trying harder
    than shutil.get_terminal_size to find a tty before returning fallback.

    stdout, stderr and stdin are probed in that order; the first stream whose
    TIOCGWINSZ ioctl succeeds wins.  Theoretically they could be different ttys
    giving the wrong measurement, but the common case is simply that IO is piped,
    in which case *fallback* is returned unchanged.
    """
    for stream in (sys.__stdout__, sys.__stderr__, sys.__stdin__):
        # sys.__std*__ can be None (pythonw, frozen GUI apps) -- the original
        # code would raise AttributeError on .fileno() instead of falling back.
        if stream is None:
            continue
        try:
            # Make WINSIZE call to terminal; ValueError covers closed/detached
            # streams whose fileno() is unusable.
            data = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00" * 4)
        except (IOError, OSError, ValueError):
            pass
        else:
            # Unpack two shorts from ioctl call (rows first, then columns)
            lines, columns = struct.unpack("hh", data)
            break
    else:
        columns, lines = fallback
    return columns, lines
def become_daemon(self, root_dir='/'):
    # Detach this process and run it as a daemon via the classic double
    # fork: parent exits, setsid() starts a new session, and a second fork
    # guarantees we are not a session leader (so no controlling tty can be
    # reacquired).  All std streams are pointed at a null device.
    if os.fork() != 0: # launch child and ...
        os._exit(0) # kill off parent
    os.setsid()
    os.chdir(root_dir)
    os.umask(0)
    if os.fork() != 0: # fork again so we are not a session leader
        os._exit(0)
    sys.stdin.close()
    # NOTE(review): rebinding sys.__stdin__/__stdout__/__stderr__ is unusual;
    # presumably so later "restores" to the __std*__ names also stay on the
    # null device -- confirm intent before changing.
    sys.__stdin__ = sys.stdin
    sys.stdout.close()
    sys.stdout = sys.__stdout__ = _NullDevice()
    sys.stderr.close()
    sys.stderr = sys.__stderr__ = _NullDevice()
    # Best-effort close of every inherited file descriptor.
    for fd in range(1024):
        try:
            os.close(fd)
        except OSError:
            pass
def find_pep8_errors(cls, filename=None, lines=None):
    """Run pep8 over *filename*/*lines*; return formatted error strings."""
    try:
        # pep8's Checker reports to stdout, so capture it for parsing.
        sys.stdout = cStringIO.StringIO()
        config = {}
        # Ignore long lines on test files, as the test names can get long
        # when following our test naming standards.
        if cls._is_test(filename):
            config['ignore'] = ['E501']
        pep8.Checker(filename=filename, lines=lines, **config).check_all()
        output = sys.stdout.getvalue()
    finally:
        sys.stdout = sys.__stdout__
    errors = []
    # Each report line looks like "path:line:col: CODE description".
    for report_line in output.split('\n'):
        parts = report_line.split(' ', 2)
        if len(parts) != 3:
            continue
        location, error, desc = parts
        line_no = location.split(':')[1]
        errors.append('%s ln:%s %s' % (error, line_no, desc))
    return errors
def emit_func(func, o=sys.__stdout__, d=None):
    """Emits all items in the data store in a format such that it can be sourced by a shell.

    func -- name of the shell function to emit, together with every plain
            variable and every shell function it transitively depends on.
    o    -- writable stream (defaults to the real stdout).
    d    -- data store; a fresh one is created per call when omitted.
    """
    # Sentinel default: a `d=init()` default argument is evaluated once at
    # import time and the resulting store silently shared between calls.
    if d is None:
        d = init()
    # Emit every plain (non-internal, non-function) variable first.
    keys = (key for key in d.keys() if not key.startswith("__") and not d.getVarFlag(key, "func", False))
    for key in keys:
        emit_var(key, o, d, False)
        o.write('\n')
    emit_var(func, o, d, False) and o.write('\n')
    # Seed the worklist with functions/vars referenced by `func` itself.
    newdeps = bb.codeparser.ShellParser(func, logger).parse_shell(d.getVar(func, True))
    newdeps |= set((d.getVarFlag(func, "vardeps", True) or "").split())
    seen = set()
    # Breadth-first over the dependency closure, emitting each shell
    # function exactly once.
    while newdeps:
        deps = newdeps
        seen |= deps
        newdeps = set()
        for dep in deps:
            # Only shell (non-python) functions can be sourced by a shell.
            if d.getVarFlag(dep, "func", False) and not d.getVarFlag(dep, "python", False):
                emit_var(dep, o, d, False) and o.write('\n')
                newdeps |= bb.codeparser.ShellParser(dep, logger).parse_shell(d.getVar(dep, True))
                newdeps |= set((d.getVarFlag(dep, "vardeps", True) or "").split())
        newdeps -= seen
def pkt_verify(parent, rcv_pkt, exp_pkt):
    # Assert that the received packet equals the expected one.  On mismatch,
    # log both packets (hex dump plus scapy dissection) before assertEqual
    # raises.  Returns rcv_pkt unchanged so calls can be chained.
    if str(exp_pkt) != str(rcv_pkt):
        logging.error("ERROR: Packet match failed.")
        logging.debug("Expected (" + str(len(exp_pkt)) + ")")
        logging.debug(str(exp_pkt).encode('hex'))
        # scapy's show() prints to stdout; capture it for the debug log.
        sys.stdout = tmpout = StringIO()
        exp_pkt.show()
        sys.stdout = sys.__stdout__
        logging.debug(tmpout.getvalue())
        logging.debug("Received (" + str(len(rcv_pkt)) + ")")
        logging.debug(str(rcv_pkt).encode('hex'))
        sys.stdout = tmpout = StringIO()
        Ether(rcv_pkt).show()
        sys.stdout = sys.__stdout__
        logging.debug(tmpout.getvalue())
    parent.assertEqual(str(exp_pkt), str(rcv_pkt),
                       "Packet match error")
    return rcv_pkt
def quit(self, *args):
    # Tear down the GUI: restore the real std streams, stop the display
    # loop (best effort), and destroy the Tk root window.
    # restore stdout before signaling the run thread to exit!
    # it can get stuck in trying to dump to the redirected text message area
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    try:
        self.display.quit()
    except:
        # Best effort: the display may already be gone at shutdown.
        pass
    self.root.destroy()
    print "* All done!"
    # sys.exit() # force quit!
    return
# ----------------------------------------------------------
def doItConsolicious(opt):
    # Console post-install step: unpack the documentation zip and optionally
    # byte-compile sources, printing progress to the real terminal.
    # `opt` is a mapping with keys 'zipfile', 'ziptargetdir', 'compiledir'.
    # reclaim stdout/stderr from log
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    if opt['zipfile']:
        print 'Unpacking documentation...'
        # n is the progress counter yielded per entry (presumably files
        # remaining -- TODO confirm against zipstream.unzipIter).
        for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
            if n % 100 == 0:
                print n,
            if n % 1000 == 0:
                print
        print 'Done unpacking.'
    if opt['compiledir']:
        print 'Compiling to pyc...'
        import compileall
        compileall.compile_dir(opt["compiledir"])
        print 'Done compiling.'
def shutdown(self, c):
    '''
    Shutdown this process

    Acknowledges the shutdown request over connection `c`, restores the
    std streams if they were redirected, runs finalizers, terminates and
    joins child processes, then exits with code 0.  Any failure during
    teardown is printed, but exit(0) always runs.
    '''
    try:
        try:
            util.debug('manager received shutdown message')
            # Acknowledge the request before tearing anything down.
            c.send(('#RETURN', None))
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            # Run the higher-priority finalizers (priority >= 0) first...
            util._run_finalizers(0)
            for p in active_children():
                util.debug('terminating a child process of manager')
                p.terminate()
            # Second pass joins the children just terminated.
            for p in active_children():
                util.debug('terminating a child process of manager')
                p.join()
            # ...then the remaining finalizers.
            util._run_finalizers()
            util.info('manager exiting with exitcode 0')
        except:
            import traceback
            traceback.print_exc()
    finally:
        exit(0)
def __dir__(self):
    """List attributes of the real stdout so the proxy introspects sanely."""
    real_stdout = sys.__stdout__
    return dir(real_stdout)
def __getattribute__(self, name):
    """Proxy attribute access to the thread-local stream (or real stdout)."""
    if name == '__members__':
        return dir(sys.__stdout__)
    # Threads with no bound stream fall back to the real stdout.
    target = getattr(_local, 'stream', sys.__stdout__)
    return getattr(target, name)
def __repr__(self):
    """Report the repr of the real stdout stream."""
    real_stdout = sys.__stdout__
    return repr(real_stdout)
# add the threaded stream as display hook
Source file: operators.py
Project: Blender-WMO-import-export-scripts
Author: WowDevTools
Views: 24 · Bookmarks: 0 · Likes: 0 · Comments: 0
def execute(self, context):
    """Fill WowMaterial.Texture1 on selected objects' materials by locating
    each material's image texture inside the loaded WoW game-data archives."""
    if not hasattr(bpy, "wow_game_data"):
        print("\n\n### Loading game data ###")
        bpy.ops.scene.load_wow_filesystem()
    game_data = bpy.wow_game_data
    for ob in bpy.context.selected_objects:
        mesh = ob.data
        for i in range(len(mesh.materials)):
            # Only materials with an image texture and no Texture1 set yet.
            if mesh.materials[i].active_texture is not None \
            and not mesh.materials[i].WowMaterial.Texture1 \
            and mesh.materials[i].active_texture.type == 'IMAGE' \
            and mesh.materials[i].active_texture.image is not None:
                # Start from the image's absolute path with a .blp extension.
                path = (os.path.splitext(bpy.path.abspath(mesh.materials[i].active_texture.image.filepath))[0] + ".blp", "")
                rest_path = ""
                # Peel one leading path component per iteration, testing
                # progressively longer archive-relative suffixes until one
                # exists in the game data (or the path is exhausted).
                while True:
                    path = os.path.split(path[0])
                    if not path[1]:
                        print("\nTexture <<{}>> not found.".format(mesh.materials[i].active_texture.image.filepath))
                        break
                    rest_path = os.path.join(path[1], rest_path)
                    rest_path = rest_path[:-1] if rest_path.endswith("\\") else rest_path
                    # Silence read_file()'s console chatter.
                    # NOTE(review): a devnull handle is opened per iteration
                    # and never closed (fd leak), and on the success branch
                    # stdout may stay redirected -- confirm and fix.
                    sys.stdout = open(os.devnull, 'w')
                    if game_data.read_file(rest_path):
                        mesh.materials[i].WowMaterial.Texture1 = rest_path
                        break
                    sys.stdout = sys.__stdout__
    self.report({'INFO'}, "Done filling texture paths")
    return {'FINISHED'}
def __init__(self, log, prefix=""):
    """Tee-writer state: a log sink, a per-line prefix, and the real console."""
    self.log = log
    self.prefix = prefix
    # Keep a handle on the original interpreter stdout for pass-through writes.
    self.console = sys.__stdout__
def init(ctx):
    """Redirect stdout/stderr into LOGFILE while still echoing to the original
    terminal streams, and repoint waf's stderr-bound log handlers."""
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        # Directory already exists (or cannot be created) - best effort.
        pass
    if hasattr(os, 'O_NOINHERIT'):
        # Windows: keep the log fd from leaking into child processes.
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr
    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    def wrap(stream):
        # Only ttys get ANSI handling; piped streams pass through untouched.
        if stream.isatty():
            return ansiterm.AnsiTerm(stream)
        return stream
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)
    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            # Handlers that wrote to the old stderr must follow the redirect.
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
def init(ctx):
    """Redirect stdout/stderr into LOGFILE while still echoing to the original
    terminal streams, and repoint waf's stderr-bound log handlers."""
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        # Directory already exists (or cannot be created) - best effort.
        pass
    if hasattr(os, 'O_NOINHERIT'):
        # Windows: keep the log fd from leaking into child processes.
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr
    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    def wrap(stream):
        # Only ttys get ANSI handling; piped streams pass through untouched.
        if stream.isatty():
            return ansiterm.AnsiTerm(stream)
        return stream
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)
    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            # Handlers that wrote to the old stderr must follow the redirect.
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
def init(ctx):
    """Redirect stdout/stderr into LOGFILE (no ANSI wrapping in this variant)
    and repoint waf's stderr-bound log handlers."""
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        # Directory already exists (or cannot be created) - best effort.
        pass
    if hasattr(os, 'O_NOINHERIT'):
        # Windows: keep the log fd from leaking into child processes.
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr
    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    sys.stdout = log_to_file(sys.__stdout__, fileobj, filename)
    sys.stderr = log_to_file(sys.__stderr__, fileobj, filename)
    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            # Handlers that wrote to the old stderr must follow the redirect.
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
def __dir__(self):
    """List attributes of the real stdout so the proxy introspects sanely."""
    real_stdout = sys.__stdout__
    return dir(real_stdout)
def __getattribute__(self, name):
    """Proxy attribute access to the thread-local stream (or real stdout)."""
    if name == '__members__':
        return dir(sys.__stdout__)
    # Threads with no bound stream fall back to the real stdout.
    target = getattr(_local, 'stream', sys.__stdout__)
    return getattr(target, name)
def __repr__(self):
    """Report the repr of the real stdout stream."""
    real_stdout = sys.__stdout__
    return repr(real_stdout)
# add the threaded stream as display hook
def handle(self, *args, **options):
    """Generate random grammars, optionally resetting the DB first and
    muting generator output when --silent is given."""
    if (options['reset_db']):
        call_command('cleardatabase')
    print("Grammar objects initially in database: {}".format(Grammar.objects.count()))
    # Number of randomly generated grammars
    num = options['num']
    # Number variables this run will include.
    # For example [2,3] will run the script to generate
    # grammars with 2 variables and 3 variables
    nVariables = [2, 3]
    nonTerminals = ['A','B','C','D']
    terminals = ['x','y','z','w']
    devnull = None
    if options['silent']:
        # Keep a handle so the devnull file can be closed afterwards
        # (previously it was opened and leaked).
        devnull = open(os.devnull, "w")
        sys.stdout = devnull
    try:
        for n in nVariables:
            start_time = time.time()
            mg = MassGrammarGenerator.MassGrammarGenerator(n)
            mg.run(num, nonTerminals[:n], terminals)
            print("{}Variables: {} seconds---".format(n, (time.time() - start_time)))
    finally:
        # Restore stdout and release the fd even if generation raises.
        if devnull is not None:
            sys.stdout = sys.__stdout__
            devnull.close()
    print("Grammar objects finally in database: {}".format(Grammar.objects.count()))
def __dir__(self):
    """List attributes of the real stdout so the proxy introspects sanely."""
    real_stdout = sys.__stdout__
    return dir(real_stdout)
def __getattribute__(self, name):
    """Proxy attribute access to the thread-local stream (or real stdout)."""
    if name == '__members__':
        return dir(sys.__stdout__)
    # Threads with no bound stream fall back to the real stdout.
    target = getattr(_local, 'stream', sys.__stdout__)
    return getattr(target, name)
def __repr__(self):
    """Report the repr of the real stdout stream."""
    real_stdout = sys.__stdout__
    return repr(real_stdout)
# add the threaded stream as display hook
def quiet():
    """Generator (context-manager body) that silences stdout/stderr.

    Saves the current streams rather than restoring to sys.__stdout__/
    sys.__stderr__, because Jupyter replaces sys.stdout/sys.stderr and does
    not support setting them back to the dunder originals.  The devnull
    handle is closed on exit so the file descriptor is not leaked.
    """
    # save stdout/stderr
    _sys_stdout = sys.stdout
    _sys_stderr = sys.stderr
    # Divert stdout and stderr to devnull
    devnull = open(os.devnull, "w")
    sys.stdout = sys.stderr = devnull
    try:
        yield
    finally:
        # Revert back to standard stdout/stderr
        sys.stdout = _sys_stdout
        sys.stderr = _sys_stderr
        # Release the devnull file descriptor.
        devnull.close()
def _finalize(self):
    # Restore the real stdout, then detach and close this window's log
    # handler (remove before close so no record hits a closed handler).
    sys.stdout = sys.__stdout__
    logging.root.removeHandler(self._log_handler)
    self._log_handler.close()
    logger.info('Jupyter window finalized successfully')