python类__stdout__()的实例源码

test_unit.py 文件源码 项目:clopure 作者: vbkaisetsu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_and(self):
        code = "(and (do (print \"a\") 1) (do (print \"b\") 0) (do (print \"c\") 1))"
        tree = self.parser.parse_line(code)
        io = StringIO()
        sys.stdout = io
        result = self.runner.evaluate(tree[0])
        sys.stdout = sys.__stdout__
        self.assertEqual(result, 0)
        self.assertEqual(io.getvalue(), "a\nb\n")
        code = "(and True False True)"
        tree = self.parser.parse_line(code)
        result = self.runner.evaluate(tree[0])
        self.assertEqual(result, False)
        code = "(and True 1 4)"
        tree = self.parser.parse_line(code)
        result = self.runner.evaluate(tree[0])
        self.assertEqual(result, 4)
test_doctests.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _check_docs(self, module):
        if self._skip:
            # Printing this directly to __stdout__ so that it doesn't get
            # captured by nose.
            print("Warning: Skipping doctests for %s because "
                  "pdbpp is installed." % module.__name__, file=sys.__stdout__)
            return
        try:
            doctest.testmod(
                module,
                verbose=True,
                raise_on_error=True,
                optionflags=self.flags,
            )
        except doctest.UnexpectedException as e:
            raise e.exc_info[1]
        except doctest.DocTestFailure as e:
            print("Got:")
            print(e.got)
            raise
util.py 文件源码 项目:data_kennel 作者: amplify-education 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def configure_logging(debug):
    '''Sets the data kennel logger to appropriate levels of chattiness.'''
    default_logger = logging.getLogger('')
    datadog_logger = logging.getLogger('datadog.api')
    requests_logger = logging.getLogger('requests')
    if debug:
        default_logger.setLevel(logging.DEBUG)
        datadog_logger.setLevel(logging.INFO)
        requests_logger.setLevel(logging.INFO)
    else:
        default_logger.setLevel(logging.INFO)
        datadog_logger.setLevel(logging.WARNING)
        requests_logger.setLevel(logging.WARNING)

    stream_handler = logging.StreamHandler(sys.__stdout__)
    stream_handler.setLevel(logging.DEBUG)
    stream_handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
    default_logger.addHandler(stream_handler)
tkunzip.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def doItConsolicious(opt):
    # reclaim stdout/stderr from log
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    if opt['zipfile']:
        print 'Unpacking documentation...'
        for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
            if n % 100 == 0:
                print n,
            if n % 1000 == 0:
                print
        print 'Done unpacking.'

    if opt['compiledir']:
        print 'Compiling to pyc...'
        import compileall
        compileall.compile_dir(opt["compiledir"])
        print 'Done compiling.'
test_hpp2plantuml.py 文件源码 项目:hpp2plantuml 作者: thibaultmarin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_main_function(self):

        # List files
        file_list = [os.path.join(test_fold, f) for f in self._input_files]

        # Output to string
        with io.StringIO() as io_stream:
            sys.stdout = io_stream
            hpp2plantuml.CreatePlantUMLFile(file_list)
            io_stream.seek(0)
            # Read string output, exclude final line return
            output_str = io_stream.read()[:-1]
        sys.stdout = sys.__stdout__
        nt.assert_equal(self._diag_saved_ref, output_str)

        # Output to file
        output_fname = 'output.puml'
        hpp2plantuml.CreatePlantUMLFile(file_list, output_fname)
        output_fcontent = ''
        with open(output_fname, 'rt') as fid:
            output_fcontent = fid.read()
        nt.assert_equal(self._diag_saved_ref, output_fcontent)
        os.unlink(output_fname)
style50.py 文件源码 项目:style50 作者: cs50 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_terminal_size(fallback=(80, 24)):
    """
    Return tuple containing columns and rows of controlling terminal, trying harder
    than shutil.get_terminal_size to find a tty before returning fallback.

    Theoretically, stdout, stderr, and stdin could all be different ttys that could
    cause us to get the wrong measurements (instead of using the fallback) but the much more
    common case is that IO is piped.
    """
    for stream in [sys.__stdout__, sys.__stderr__, sys.__stdin__]:
        try:
            # Make WINSIZE call to terminal
            data = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00\x00\00\x00")
        except (IOError, OSError):
            pass
        else:
            # Unpack two shorts from ioctl call
            lines, columns = struct.unpack("hh", data)
            break
    else:
        columns, lines = fallback

    return columns, lines
daemonizer.py 文件源码 项目:SameKeyProxy 作者: xzhou 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def become_daemon(self, root_dir='/'):
        if os.fork() != 0:  # launch child and ...
            os._exit(0)  # kill off parent
        os.setsid()
        os.chdir(root_dir)
        os.umask(0)
        if os.fork() != 0: # fork again so we are not a session leader
            os._exit(0)
        sys.stdin.close()
        sys.__stdin__ = sys.stdin
        sys.stdout.close()
        sys.stdout = sys.__stdout__ = _NullDevice()
        sys.stderr.close()
        sys.stderr = sys.__stderr__ = _NullDevice()
        for fd in range(1024):
            try:
                os.close(fd)
            except OSError:
                pass
test_coding_standards.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def find_pep8_errors(cls, filename=None, lines=None):
        try:
            sys.stdout = cStringIO.StringIO()
            config = {}

            # Ignore long lines on test files, as the test names can get long
            # when following our test naming standards.
            if cls._is_test(filename):
                config['ignore'] = ['E501']

            checker = pep8.Checker(filename=filename, lines=lines,
                                   **config)
            checker.check_all()
            output = sys.stdout.getvalue()
        finally:
            sys.stdout = sys.__stdout__

        errors = []
        for line in output.split('\n'):
            parts = line.split(' ', 2)
            if len(parts) == 3:
                location, error, desc = parts
                line_no = location.split(':')[1]
                errors.append('%s ln:%s %s' % (error, line_no, desc))
        return errors
data.py 文件源码 项目:isar 作者: ilbers 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def emit_func(func, o=sys.__stdout__, d = init()):
    """Emits all items in the data store in a format such that it can be sourced by a shell."""

    keys = (key for key in d.keys() if not key.startswith("__") and not d.getVarFlag(key, "func", False))
    for key in keys:
        emit_var(key, o, d, False)

    o.write('\n')
    emit_var(func, o, d, False) and o.write('\n')
    newdeps = bb.codeparser.ShellParser(func, logger).parse_shell(d.getVar(func, True))
    newdeps |= set((d.getVarFlag(func, "vardeps", True) or "").split())
    seen = set()
    while newdeps:
        deps = newdeps
        seen |= deps
        newdeps = set()
        for dep in deps:
            if d.getVarFlag(dep, "func", False) and not d.getVarFlag(dep, "python", False):
               emit_var(dep, o, d, False) and o.write('\n')
               newdeps |=  bb.codeparser.ShellParser(dep, logger).parse_shell(d.getVar(dep, True))
               newdeps |= set((d.getVarFlag(dep, "vardeps", True) or "").split())
        newdeps -= seen
testutils.py 文件源码 项目:fabric-oftest 作者: opencord 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def pkt_verify(parent, rcv_pkt, exp_pkt):
    if str(exp_pkt) != str(rcv_pkt):
        logging.error("ERROR: Packet match failed.")
        logging.debug("Expected (" + str(len(exp_pkt)) + ")")
        logging.debug(str(exp_pkt).encode('hex'))
        sys.stdout = tmpout = StringIO()
        exp_pkt.show()
        sys.stdout = sys.__stdout__
        logging.debug(tmpout.getvalue())
        logging.debug("Received (" + str(len(rcv_pkt)) + ")")
        logging.debug(str(rcv_pkt).encode('hex'))
        sys.stdout = tmpout = StringIO()
        Ether(rcv_pkt).show()
        sys.stdout = sys.__stdout__
        logging.debug(tmpout.getvalue())
    parent.assertEqual(str(exp_pkt), str(rcv_pkt),
                       "Packet match error")

    return rcv_pkt
pmfiberbiref.py 文件源码 项目:pm-fiber-birefringence 作者: sid5432 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def quit(self, *args):
        # restore stdout before signaling the run thread to exit!
        # it can get stuck in trying to dump to the redirected text message area
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__

        try:
            self.display.quit()
        except:
            pass

        self.root.destroy()
        print "* All done!"
        # sys.exit() # force quit!

        return

    # ----------------------------------------------------------
tkunzip.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def doItConsolicious(opt):
    # reclaim stdout/stderr from log
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    if opt['zipfile']:
        print 'Unpacking documentation...'
        for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
            if n % 100 == 0:
                print n,
            if n % 1000 == 0:
                print
        print 'Done unpacking.'

    if opt['compiledir']:
        print 'Compiling to pyc...'
        import compileall
        compileall.compile_dir(opt["compiledir"])
        print 'Done compiling.'
managers.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                import traceback
                traceback.print_exc()
        finally:
            exit(0)
console.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __dir__(self):
        return dir(sys.__stdout__)
console.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __getattribute__(self, name):
        if name == '__members__':
            return dir(sys.__stdout__)
        try:
            stream = _local.stream
        except AttributeError:
            stream = sys.__stdout__
        return getattr(stream, name)
console.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __repr__(self):
        return repr(sys.__stdout__)


# add the threaded stream as display hook
operators.py 文件源码 项目:Blender-WMO-import-export-scripts 作者: WowDevTools 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def execute(self, context):

        if not hasattr(bpy, "wow_game_data"):
            print("\n\n### Loading game data ###")
            bpy.ops.scene.load_wow_filesystem()

        game_data = bpy.wow_game_data

        for ob in bpy.context.selected_objects:
            mesh = ob.data
            for i in range(len(mesh.materials)):
                if mesh.materials[i].active_texture is not None \
                        and not mesh.materials[i].WowMaterial.Texture1 \
                        and mesh.materials[i].active_texture.type == 'IMAGE' \
                        and mesh.materials[i].active_texture.image is not None:
                    path = (os.path.splitext(bpy.path.abspath(mesh.materials[i].active_texture.image.filepath))[0] + ".blp", "")
                    rest_path = ""

                    while True:
                        path = os.path.split(path[0])
                        if not path[1]:
                            print("\nTexture <<{}>> not found.".format(mesh.materials[i].active_texture.image.filepath))
                            break

                        rest_path = os.path.join(path[1], rest_path)
                        rest_path = rest_path[:-1] if rest_path.endswith("\\") else rest_path

                        sys.stdout = open(os.devnull, 'w')

                        if game_data.read_file(rest_path):
                            mesh.materials[i].WowMaterial.Texture1 = rest_path
                            break

                        sys.stdout = sys.__stdout__

            self.report({'INFO'}, "Done filling texture paths")

        return {'FINISHED'}
logging.py 文件源码 项目:Harmonbot 作者: Harmon758 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, log, prefix = ""):
        self.console = sys.__stdout__
        self.log = log
        self.prefix = prefix
build_logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def init(ctx):
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    def wrap(stream):
        if stream.isatty():
            return ansiterm.AnsiTerm(stream)
        return stream
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)

    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
build_logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def init(ctx):
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    def wrap(stream):
        if stream.isatty():
            return ansiterm.AnsiTerm(stream)
        return stream
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)

    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
build_logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def init(ctx):
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    sys.stdout = log_to_file(sys.__stdout__, fileobj, filename)
    sys.stderr = log_to_file(sys.__stderr__, fileobj, filename)

    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
console.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __dir__(self):
        return dir(sys.__stdout__)
console.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __getattribute__(self, name):
        if name == '__members__':
            return dir(sys.__stdout__)
        try:
            stream = _local.stream
        except AttributeError:
            stream = sys.__stdout__
        return getattr(stream, name)
console.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __repr__(self):
        return repr(sys.__stdout__)


# add the threaded stream as display hook
populate.py 文件源码 项目:LL1-Academy 作者: H-Huang 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        if (options['reset_db']):
            call_command('cleardatabase')

        print("Grammar objects initially in database: {}".format(Grammar.objects.count()))
        #Number of randomly generated grammars
        num = options['num']

        #Number variables this run will include. 
        #For example [2,3] will run the script to generate
        #grammars with 2 variables and 3 variables
        nVariables = [2, 3]

        nonTerminals = ['A','B','C','D']
        terminals = ['x','y','z','w']



        if options['silent']:
            sys.stdout = open(os.devnull, "w")

        for n in nVariables:
            start_time = time.time()
            mg = MassGrammarGenerator.MassGrammarGenerator(n)
            mg.run(num,nonTerminals[:n],terminals)
            print("{}Variables: {} seconds---".format(n,(time.time() - start_time)))

        if options['silent']:
            sys.stdout = sys.__stdout__

        print("Grammar objects finally in database: {}".format(Grammar.objects.count()))
console.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __dir__(self):
        return dir(sys.__stdout__)
console.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __getattribute__(self, name):
        if name == '__members__':
            return dir(sys.__stdout__)
        try:
            stream = _local.stream
        except AttributeError:
            stream = sys.__stdout__
        return getattr(stream, name)
console.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __repr__(self):
        return repr(sys.__stdout__)


# add the threaded stream as display hook
scriptcwl.py 文件源码 项目:scriptcwl 作者: NLeSC 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def quiet():
    # save stdout/stderr
    # Jupyter doesn't support setting it back to
    # sys.__stdout__ and sys.__stderr__
    _sys_stdout = sys.stdout
    _sys_stderr = sys.stderr
    # Divert stdout and stderr to devnull
    sys.stdout = sys.stderr = open(os.devnull, "w")
    try:
        yield
    finally:
        # Revert back to standard stdout/stderr
        sys.stdout = _sys_stdout
        sys.stderr = _sys_stderr
console.py 文件源码 项目:gui_tool 作者: UAVCAN 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _finalize(self):
        sys.stdout = sys.__stdout__
        logging.root.removeHandler(self._log_handler)
        self._log_handler.close()
        logger.info('Jupyter window finalized successfully')


问题


面经


文章

微信
公众号

扫码关注公众号