python类dup2()的实例源码

capture.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start(self):
        """Begin redirecting ``self.targetfd``.

        The saved descriptor ``self._savefd`` is checked with ``os.fstat``
        first; if that fails a ``ValueError`` is raised.

        When the target is descriptor 0 and no temporary file is set,
        descriptor 0 is pointed at ``devnullpath`` opened read-only.
        Otherwise the temporary file's descriptor replaces the target.
        In both cases, if the instance has an ``_oldsys`` attribute, the
        attribute of ``sys`` named by ``patchsysdict[self.targetfd]`` is
        replaced as well.
        """
        try:
            os.fstat(self._savefd)
        except OSError:
            raise ValueError("saved filedescriptor not valid, "
                "did you call start() twice?")

        stdin_without_tmpfile = self.targetfd == 0 and not self.tmpfile
        if stdin_without_tmpfile:
            # Nothing to read from: feed descriptor 0 from the null device.
            null_fd = os.open(devnullpath, os.O_RDONLY)
            os.dup2(null_fd, 0)
            os.close(null_fd)
            if hasattr(self, '_oldsys'):
                sys_attr = patchsysdict[self.targetfd]
                setattr(sys, sys_attr, DontReadFromInput())
            return

        os.dup2(self.tmpfile.fileno(), self.targetfd)
        if hasattr(self, '_oldsys'):
            sys_attr = patchsysdict[self.targetfd]
            setattr(sys, sys_attr, self.tmpfile)
_twistd_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def daemonize():
    """Detach the current process and silence its standard descriptors.

    Forks twice (the parent of each fork exits immediately via
    ``os._exit(0)``), starts a new session between the forks, sets the
    umask to 0o077 and points descriptors 0, 1 and 2 at ``/dev/null``.

    Raises:
        OSError: if ``os.dup2`` fails for a reason other than ``EBADF``.
    """
    # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent
    os.setsid()
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent again.
    os.umask(0o077)
    null = os.open('/dev/null', os.O_RDWR)
    try:
        for i in range(3):
            try:
                os.dup2(null, i)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
    finally:
        # If a standard descriptor was closed on entry, os.open() may have
        # returned 0, 1 or 2 itself; closing it would undo the redirection.
        if null > 2:
            os.close(null)
popen2.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, cmd, bufsize=-1):
        """Fork a child whose stdout and stderr share one pipe.

        Two pipes are created. In the child, descriptor 0 reads from the
        first pipe and descriptors 1 and 2 both write to the second, then
        ``self._run_child(cmd)`` is called. In the parent, ``self.tochild``
        wraps the write end of the first pipe and ``self.fromchild`` the
        read end of the second, both opened with *bufsize*.
        """
        _cleanup()
        self.cmd = cmd
        to_child_r, to_child_w = os.pipe()
        from_child_r, from_child_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            child_fds = (to_child_r, from_child_w, from_child_w)
            for std_fd, pipe_fd in enumerate(child_fds):
                os.dup2(pipe_fd, std_fd)
            self._run_child(cmd)
        # Drop the child-side pipe ends and wrap the remaining ones.
        os.close(to_child_r)
        self.tochild = os.fdopen(to_child_w, 'w', bufsize)
        os.close(from_child_w)
        self.fromchild = os.fdopen(from_child_r, 'r', bufsize)
basetest.py 文件源码 项目:protoc-gen-lua-bin 作者: u0u0 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, stream, filename):
    """Redirect the file descriptor behind *stream* into *filename*.

    A duplicate of the stream's descriptor is kept in
    ``self._uncaptured_fd``. *filename* is created or truncated with
    mode 0o600 and its descriptor replaces the stream's, after the
    stream has been flushed.

    Args:
        stream: object providing ``fileno()`` and ``flush()``.
        filename: path of the file that receives the output.

    Raises:
        Whatever ``os.open``, ``flush`` or ``os.dup2`` raise; the saved
        duplicate is closed before the exception propagates.
    """
    self._stream = stream
    self._fd = stream.fileno()
    self._filename = filename

    # Keep original stream for later
    self._uncaptured_fd = os.dup(self._fd)

    try:
        # Open file to save stream to
        cap_fd = os.open(self._filename,
                         os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
                         0o600)
        try:
            # Send stream to this file
            self._stream.flush()
            os.dup2(cap_fd, self._fd)
        finally:
            os.close(cap_fd)
    except Exception:
        # Construction failed: do not leak the duplicated descriptor.
        os.close(self._uncaptured_fd)
        raise
basetest.py 文件源码 项目:protoc-gen-lua-bin 作者: u0u0 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, stream, filename):
    """Redirect the file descriptor behind *stream* into *filename*.

    A duplicate of the stream's descriptor is kept in
    ``self._uncaptured_fd``. *filename* is created or truncated with
    mode 0o600 and its descriptor replaces the stream's, after the
    stream has been flushed.

    Args:
        stream: object providing ``fileno()`` and ``flush()``.
        filename: path of the file that receives the output.

    Raises:
        Whatever ``os.open``, ``flush`` or ``os.dup2`` raise; the saved
        duplicate is closed before the exception propagates.
    """
    self._stream = stream
    self._fd = stream.fileno()
    self._filename = filename

    # Keep original stream for later
    self._uncaptured_fd = os.dup(self._fd)

    try:
        # Open file to save stream to
        cap_fd = os.open(self._filename,
                         os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
                         0o600)
        try:
            # Send stream to this file
            self._stream.flush()
            os.dup2(cap_fd, self._fd)
        finally:
            os.close(cap_fd)
    except Exception:
        # Construction failed: do not leak the duplicated descriptor.
        os.close(self._uncaptured_fd)
        raise
daemon.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def daemonize(self):
        """Put the process in the background with two forks.

        The parent of each fork exits with status 0; a failed fork writes
        a message to stderr and exits with status 1. Between the forks the
        working directory becomes ``/``, a new session is started and the
        umask is cleared. Afterwards the standard streams are redirected
        to the paths ``self.stdin``, ``self.stdout`` and ``self.stderr``,
        ``self.delpid`` is registered with ``atexit`` and the pid is
        written to ``self.pidfile``.
        """
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        os.chdir("/")
        os.setsid()
        os.umask(0)

        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        sys.stdout.flush()
        sys.stderr.flush()
        # Open at descriptor level: equivalent to modes 'r' and 'a+', and
        # it avoids the unbuffered text-mode open Python 3 rejects.
        append_flags = os.O_RDWR | os.O_APPEND | os.O_CREAT
        si = os.open(self.stdin, os.O_RDONLY)
        so = os.open(self.stdout, append_flags, 0o666)
        se = os.open(self.stderr, append_flags, 0o666)
        redirects = (
            (si, sys.stdin.fileno()),
            (so, sys.stdout.fileno()),
            (se, sys.stderr.fileno()),
        )
        for source, target in redirects:
            if source != target:
                os.dup2(source, target)
        # The temporary descriptors are no longer needed once duplicated.
        std_fds = [target for _, target in redirects]
        for source, _ in redirects:
            if source not in std_fds:
                os.close(source)

        atexit.register(self.delpid)
        pid = str(os.getpid())
        with open(self.pidfile, 'w+') as pid_file:
            pid_file.write("%s\n" % pid)
daemon.py 文件源码 项目:amadash 作者: ipartola 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def daemonize(double_fork=True):
    '''Puts process in the background using usual UNIX best practices.

    Sets the umask to 0o22, changes directory to ``/``, optionally forks
    once (when *double_fork* is true), starts a new session, forks again,
    calls ``close_open_files()`` and finally points stdin, stdout and
    stderr at ``os.devnull``. The parent of each fork exits via
    ``os._exit(0)``.

    Raises:
        Exception: if changing the umask or either fork fails.
    '''

    try:
        os.umask(0o22)
    except Exception as e:
        raise Exception("Unable to change file creation mask: %s" % e)

    os.chdir('/')

    # First fork
    if double_fork:
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            raise Exception("Error on first fork: [%d] %s" % (e.errno, e.strerror,))

    os.setsid()

    # Second fork
    try:
        pid = os.fork()
        if pid > 0:
            os._exit(0)
    except OSError as e:
        raise Exception("Error on second fork: [%d] %s" % (e.errno, e.strerror,))

    close_open_files()

    # One descriptor on the null device is enough for all three streams.
    null_fd = os.open(os.devnull, os.O_RDWR)
    std_fds = [sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno()]
    for target in std_fds:
        if target != null_fd:
            os.dup2(null_fd, target)
    # Keep it only if os.open() handed back one of the standard slots.
    if null_fd not in std_fds:
        os.close(null_fd)
fd_redirector.py 文件源码 项目:human-rl 作者: gsastry 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start(self):
        """Begin capturing what is written to ``self.fd``.

        A duplicate of the descriptor is stored in ``self.oldhandle``, a
        pipe is created (ends stored in ``self.piper`` / ``self.pipew``)
        and the write end replaces ``self.fd``. Does nothing when the
        redirection is already active.
        """
        if self.started:
            return

        self.oldhandle = os.dup(self.fd)
        read_end, write_end = os.pipe()
        self.piper, self.pipew = read_end, write_end
        os.dup2(self.pipew, self.fd)
        # self.fd now refers to the pipe; the extra write end can go.
        os.close(self.pipew)

        self.started = True
fd_redirector.py 文件源码 项目:human-rl 作者: gsastry 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def stop(self):
        """Undo the redirection and hand back what was captured.

        Returns the text read from the pipe's read end, or an empty
        string when no redirection is active.
        """
        if not self.started:
            return ''

        self.flush()
        # Put the saved descriptor back in place of the pipe.
        os.dup2(self.oldhandle, self.fd)
        os.close(self.oldhandle)
        reader = os.fdopen(self.piper, 'r')
        captured = reader.read()
        reader.close()

        self.started = False
        return captured
fd_redirector.py 文件源码 项目:human-rl 作者: gsastry 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def start(self):
        """Begin capturing what is written to ``self.fd``.

        A duplicate of the descriptor is stored in ``self.oldhandle``, a
        pipe is created (ends stored in ``self.piper`` / ``self.pipew``)
        and the write end replaces ``self.fd``. Does nothing when the
        redirection is already active.
        """
        if self.started:
            return

        self.oldhandle = os.dup(self.fd)
        read_end, write_end = os.pipe()
        self.piper, self.pipew = read_end, write_end
        os.dup2(self.pipew, self.fd)
        # self.fd now refers to the pipe; the extra write end can go.
        os.close(self.pipew)

        self.started = True
fd_redirector.py 文件源码 项目:human-rl 作者: gsastry 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def stop(self):
        """Undo the redirection and hand back what was captured.

        Returns the text read from the pipe's read end, or an empty
        string when no redirection is active.
        """
        if not self.started:
            return ''

        self.flush()
        # Put the saved descriptor back in place of the pipe.
        os.dup2(self.oldhandle, self.fd)
        os.close(self.oldhandle)
        reader = os.fdopen(self.piper, 'r')
        captured = reader.read()
        reader.close()

        self.started = False
        return captured
popen2.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def __init__(self, cmd, capturestderr=False, bufsize=-1):
        """Start *cmd* in a forked child connected through pipes.

        'cmd' is the shell command to execute in a sub-process.  On UNIX
        it may be a sequence, in which case the arguments are passed
        directly to the program without shell intervention (as with
        os.spawnv()); a string is passed to the shell (as with
        os.system()).  When 'capturestderr' is true the child's standard
        error is captured too (default: false).  'bufsize', if given, is
        the size of the I/O buffers to/from the child process.

        After construction ``tochild`` and ``fromchild`` are file objects
        for the child's stdin and stdout; ``childerr`` is a file object
        for its stderr, or None when stderr is not captured.
        """
        _cleanup()
        self.cmd = cmd
        stdin_r, stdin_w = os.pipe()
        stdout_r, stdout_w = os.pipe()
        if capturestderr:
            stderr_r, stderr_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            os.dup2(stdin_r, 0)
            os.dup2(stdout_w, 1)
            if capturestderr:
                os.dup2(stderr_w, 2)
            self._run_child(cmd)
        # Release the child-side ends and wrap the remaining ones.
        os.close(stdin_r)
        self.tochild = os.fdopen(stdin_w, 'w', bufsize)
        os.close(stdout_w)
        self.fromchild = os.fdopen(stdout_r, 'r', bufsize)
        if not capturestderr:
            self.childerr = None
            return
        os.close(stderr_w)
        self.childerr = os.fdopen(stderr_r, 'r', bufsize)
daemon.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def freopen(f, mode, stream):
    """Make the file descriptor behind *stream* refer to the file *f*.

    Args:
        f: path of the file to open.
        mode: mode string passed to ``open``.
        stream: object whose ``fileno()`` names the descriptor to replace.
    """
    newfd = stream.fileno()
    with open(f, mode) as oldf:
        # dup2 closes newfd itself if it is open, so no separate close is
        # needed; that close would raise when newfd is already closed.
        os.dup2(oldf.fileno(), newfd)
socket_server.py 文件源码 项目:salt-toaster 作者: openSUSE 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def daemonize(stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
    '''
    Daemonize current script

    Forks twice (each parent exits with status 0; a failed fork writes a
    message to stderr and exits with status 1). Between the forks the
    working directory becomes ``/``, the umask is cleared and a new
    session is started. Missing parent directories of the three paths
    are created, then the standard streams are redirected to them.

    Args:
        stdin: path opened read-only for standard input.
        stdout: path opened for appending for standard output.
        stderr: path opened for appending for standard error.
    '''
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write ("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror) )
        sys.exit(1)

    os.chdir("/")
    os.umask(0)
    os.setsid()

    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write ("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror) )
        sys.exit(1)

    # Create a parent directory only when the path has one and it is
    # missing; an empty dirname means a bare file name.
    for path in (stdin, stdout, stderr):
        parent = os.path.dirname(path)
        if parent and not os.path.isdir(parent):
            os.makedirs(parent)

    # Open at descriptor level: equivalent to modes 'r' and 'a+', and it
    # avoids the unbuffered text-mode open Python 3 rejects.
    append_flags = os.O_RDWR | os.O_APPEND | os.O_CREAT
    si = os.open(stdin, os.O_RDONLY)
    so = os.open(stdout, append_flags, 0o666)
    se = os.open(stderr, append_flags, 0o666)
    redirects = (
        (si, sys.stdin.fileno()),
        (so, sys.stdout.fileno()),
        (se, sys.stderr.fileno()),
    )
    for source, target in redirects:
        if source != target:
            os.dup2(source, target)
    # The temporary descriptors are no longer needed once duplicated.
    std_fds = [target for _, target in redirects]
    for source, _ in redirects:
        if source not in std_fds:
            os.close(source)
gamma.py 文件源码 项目:bosh-azure-template 作者: cf-platform-eng 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def cli(ctx, index):
    """Entry point: set up logging, then run every sub-command if none was given.

    Stores the result of ``get_settings()`` and *index* in ``ctx.meta``,
    starts a ``tee`` process writing to ``/home/<username>/pcf_install.log``
    and points stdout and stderr at its stdin. When no sub-command was
    invoked, each command registered on the group is invoked in sorted
    name order.
    """
    # Get settings from CustomScriptForLinux extension configurations
    waagent.LoggerInit('/var/log/waagent.log', '/dev/null')
    ctx.meta["settings"] = settings = get_settings()
    ctx.meta["index-file"] = index

    group = ctx.command
    commands = sorted(group.commands)

    # start logging to stdout and install.log
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

    username = settings["username"]
    log_file = os.path.join("/home", username, "pcf_install.log")

    tee = Popen(["tee", log_file], stdin=PIPE)
    os.dup2(tee.stdin.fileno(), sys.stdout.fileno())
    os.dup2(tee.stdin.fileno(), sys.stderr.fileno())

    if ctx.invoked_subcommand is None:
        for command in commands:
            # Strip the two-digit ordering prefix, e.g. "01_".
            command = re.sub(r'\d{2}_', '', command)
            print("Running {0}...".format(command))
            # NOTE(review): eval() resolves the stripped command name to an
            # object in this module. The names come from the commands
            # registered on this group; confirm they can never be derived
            # from external input, or switch to an explicit lookup.
            ctx.invoke(eval(command))
daemon.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def redirect(self):
        """Point the standard streams at this object's files.

        stdout and stderr are flushed first. Then the descriptor of
        ``self.input`` replaces stdin's, and the descriptor of
        ``self.output`` replaces both stdout's and stderr's.
        """
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
        replacements = (
            (self.input, sys.stdin),
            (self.output, sys.stdout),
            (self.output, sys.stderr),
        )
        for source, target in replacements:
            os.dup2(source.fileno(), target.fileno())
daemon.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def redirect(self):
        """Point the standard streams at this object's files.

        stdout and stderr are flushed first. Then the descriptor of
        ``self.input`` replaces stdin's, and the descriptor of
        ``self.output`` replaces both stdout's and stderr's.
        """
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
        replacements = (
            (self.input, sys.stdin),
            (self.output, sys.stdout),
            (self.output, sys.stderr),
        )
        for source, target in replacements:
            os.dup2(source.fileno(), target.fileno())
daemon.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def redirect(self):
        """Point the standard streams at this object's files.

        stdout and stderr are flushed first. Then the descriptor of
        ``self.input`` replaces stdin's, and the descriptor of
        ``self.output`` replaces both stdout's and stderr's.
        """
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
        replacements = (
            (self.input, sys.stdin),
            (self.output, sys.stdout),
            (self.output, sys.stderr),
        )
        for source, target in replacements:
            os.dup2(source.fileno(), target.fileno())
daemon.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def redirect(self):
        """Point the standard streams at this object's files.

        stdout and stderr are flushed first. Then the descriptors of
        ``self.stdin``, ``self.stdout`` and ``self.stderr`` replace those
        of the corresponding ``sys`` streams, in that order.
        """
        for stream in (sys.stdout, sys.stderr):
            stream.flush()

        replacements = (
            (self.stdin, sys.stdin),
            (self.stdout, sys.stdout),
            (self.stderr, sys.stderr),
        )
        for source, target in replacements:
            os.dup2(source.fileno(), target.fileno())

    # private only
daemon.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def redirect(self):
        """Point the standard streams at this object's files.

        stdout and stderr are flushed first. Then the descriptor of
        ``self.input`` replaces stdin's, and the descriptor of
        ``self.output`` replaces both stdout's and stderr's.
        """
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
        replacements = (
            (self.input, sys.stdin),
            (self.output, sys.stdout),
            (self.output, sys.stderr),
        )
        for source, target in replacements:
            os.dup2(source.fileno(), target.fileno())


问题


面经


文章

微信
公众号

扫码关注公众号