python类F_GETFL的实例源码

twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def _set_nonblocking(self, fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
posix.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
posixfile.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def flags(self, *which):
        import fcntl, os

        if which:
            if len(which) > 1:
                raise TypeError, 'Too many arguments'
            which = which[0]
        else: which = '?'

        l_flags = 0
        if 'n' in which: l_flags = l_flags | os.O_NDELAY
        if 'a' in which: l_flags = l_flags | os.O_APPEND
        if 's' in which: l_flags = l_flags | os.O_SYNC

        file = self._file_

        if '=' not in which:
            cur_fl = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
            if '!' in which: l_flags = cur_fl & ~ l_flags
            else: l_flags = cur_fl | l_flags

        l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFL, l_flags)

        if 'c' in which:
            arg = ('!' not in which)    # 0 is don't, 1 is do close on exec
            l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFD, arg)

        if '?' in which:
            which = ''                  # Return current flags
            l_flags = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
            if os.O_APPEND & l_flags: which = which + 'a'
            if fcntl.fcntl(file.fileno(), fcntl.F_GETFD, 0) & 1:
                which = which + 'c'
            if os.O_NDELAY & l_flags: which = which + 'n'
            if os.O_SYNC & l_flags: which = which + 's'
            return which
asyncore.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, fd, map=None):
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
git_cmd.py 文件源码 项目:scm-workbench 作者: barry-scott 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __setNonBlocking( self, fileobj ):
        fl = fcntl.fcntl( fileobj.fileno(), fcntl.F_GETFL )
        fcntl.fcntl( fileobj.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
utils.py 文件源码 项目:riko 作者: nerevu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def make_blocking(f):
    fd = f.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)

    if flags & O_NONBLOCK:
        blocking = flags & ~O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, blocking)
ptyshell.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def spawn(self, argv=None, term=None):
        if argv is None:
            if 'SHELL' in os.environ:
                argv = [os.environ['SHELL']]
            elif 'PATH' in os.environ: #searching sh in the path. It can be unusual like /system/bin/sh on android
                for shell in ["bash","sh","ksh","zsh","csh","ash"]:
                    for path in os.environ['PATH'].split(':'):
                        fullpath=os.path.join(path.strip(),shell)
                        if os.path.isfile(fullpath):
                            argv=[fullpath]
                            break
                    if argv:
                        break
        if not argv:
            argv= ['/bin/sh']

        if term is not None:
            os.environ['TERM']=term

        master, slave = pty.openpty()
        self.slave=slave
        self.master = os.fdopen(master, 'rb+wb', 0) # open file in an unbuffered mode
        flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
        assert flags>=0
        flags = fcntl.fcntl(self.master, fcntl.F_SETFL , flags | os.O_NONBLOCK)
        assert flags>=0
        self.prog = subprocess.Popen(
            shell=False,
            args=argv,
            stdin=slave,
            stdout=slave,
            stderr=subprocess.STDOUT,
            preexec_fn=prepare
        )
rotationclient.py 文件源码 项目:adbmirror 作者: fhorinek 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        def setNonBlocking(fd):
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)

        MyThread.__init__(self)

        cmd = ["adb", "shell", "pm path jp.co.cyberagent.stf.rotationwatcher"]
        out, __ = Popen(cmd, stdout = PIPE).communicate()
        apk_path = out.split(":")[1].split()[0]

        cmd = ["adb", "shell", "CLASSPATH='%s' exec app_process /system/bin jp.co.cyberagent.stf.rotationwatcher.RotationWatcher" % (apk_path)]
        self.app = Popen(cmd, stdout= PIPE)
        setNonBlocking(self.app.stdout)
pipe_non_blocking.py 文件源码 项目:bpy_lambda 作者: bcongdon 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    # only for compatibility with 'nt' version.
core.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set_nonblock(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
core.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set_block(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
utils.py 文件源码 项目:virt-bootstrap 作者: virt-manager 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def make_async(fd):
    """
    Add the O_NONBLOCK flag to a file descriptor.
    """
    fcntl.fcntl(fd, fcntl.F_SETFL,
                fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
os.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def make_nonblocking(fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
        if not bool(flags & os.O_NONBLOCK):
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            return True
subprocess.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _remove_nonblock_flag(self, fd):
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) & (~os.O_NONBLOCK)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
os.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def make_nonblocking(fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
        if not bool(flags & os.O_NONBLOCK):
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            return True
subprocess.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _remove_nonblock_flag(self, fd):
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) & (~os.O_NONBLOCK)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
util.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def set_non_blocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)
pppd.py 文件源码 项目:hologram-python 作者: hologram-io 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connect(self, timeout = DEFAULT_CONNECT_TIMEOUT):

        self.proc = Popen(self._commands, stdout=PIPE, stderr=STDOUT, universal_newlines=True)

        # set stdout to non-blocking
        fd = self.proc.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        result = [False]

        self._connectThread = threading.Thread(target = self.connectThreadedFunc,
                                               args = [result])
        self._connectThread.daemon = True
        self._connectThread.start()
        self._connectThread.join(timeout)

        # If thread is still alive, tear down pppd connection and kill the thread.
        if self._connectThread.is_alive():
            self._connectThread.join(1)
            self.proc.send_signal(signal.SIGTERM)
            time.sleep(1)

        return result[0]

    # EFFECTS: Establish a cellular connection. Returns true if successful,
    #          false otherwise.


问题


面经


文章

微信
公众号

扫码关注公众号