python类O_NONBLOCK的实例源码

__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def open(self):
        self._pipe.open(os.O_RDONLY | os.O_NONBLOCK)
__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def open(self):
        self._pipe.open(os.O_WRONLY | os.O_NONBLOCK)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _set_nonblocking(self, fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
posix.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _set_nonblocking(self, fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
posix.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _set_nonblocking(self, fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
server.py 文件源码 项目:pypilot 作者: pypilot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, connection):
        connection.setblocking(0)
        #//fcntl.fcntl(connection.fileno(), fcntl.F_SETFD, os.O_NONBLOCK)
        # somehow it's much slower to baseclass ?!?
        #super(LineBufferedNonBlockingSocket, self).__init__(connection.fileno())
        self.b = linebuffer.LineBuffer(connection.fileno())

        self.socket = connection
        self.out_buffer = ''
        self.pollout = select.poll()
        self.pollout.register(connection, select.POLLOUT)
        self.sendfail_msg = self.sendfail_cnt = 0
EventQueueEmptyEventHandler.py 文件源码 项目:py-enarksh 作者: SetBased 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def init(cls):
        """
        Creates a pipe for waking up a select call when a signal has been received.
        """
        cls.__wake_up_pipe = os.pipe()
        fcntl.fcntl(cls.__wake_up_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)

        signal.set_wakeup_fd(EventQueueEmptyEventHandler.__wake_up_pipe[1])

    # ------------------------------------------------------------------------------------------------------------------
posix.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
fdesc.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setNonBlocking(fd):
    """Make a fd non-blocking."""
    flags = fcntl.fcntl(fd, FCNTL.F_GETFL)
    flags = flags | os.O_NONBLOCK
    fcntl.fcntl(fd, FCNTL.F_SETFL, flags)
fdesc.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setBlocking(fd):
    """Make a fd blocking."""
    flags = fcntl.fcntl(fd, FCNTL.F_GETFL)
    flags = flags & ~os.O_NONBLOCK
    fcntl.fcntl(fd, FCNTL.F_SETFL, flags)
asyncore.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def __init__(self, fd, map=None):
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
serialposix.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened."""
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        self.fd = None
        # open
        try:
            self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as msg:
            self.fd = None
            raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg))
        #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)  # set blocking

        try:
            self._reconfigure_port(force_update=True)
        except:
            try:
                os.close(self.fd)
            except:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.fd = None
            raise
        else:
            self.is_open = True
        if not self._dsrdtr:
            self._update_dtr_state()
        if not self._rtscts:
            self._update_rts_state()
        self.reset_input_buffer()
serialposix.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def nonblocking(self):
        """internal - not portable!"""
        if not self.is_open:
            raise portNotOpenError
        fcntl.fcntl(self.fd, fcntl.F_SETFL, os.O_NONBLOCK)
serialposix.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _reconfigure_port(self, force_update=True):
        """Set communication parameters on opened port."""
        super(VTIMESerial, self)._reconfigure_port()
        fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # clear O_NONBLOCK

        if self._inter_byte_timeout is not None:
            vmin = 1
            vtime = int(self._inter_byte_timeout * 10)
        else:
            vmin = 0
            vtime = int(self._timeout * 10)
        try:
            orig_attr = termios.tcgetattr(self.fd)
            iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr
        except termios.error as msg:      # if a port is nonexistent but has a /dev file, it'll fail here
            raise serial.SerialException("Could not configure port: %s" % msg)

        if vtime < 0 or vtime > 255:
            raise ValueError('Invalid vtime: %r' % vtime)
        cc[termios.VTIME] = vtime
        cc[termios.VMIN] = vmin

        termios.tcsetattr(
                self.fd,
                termios.TCSANOW,
                [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
git_cmd.py 文件源码 项目:scm-workbench 作者: barry-scott 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __setNonBlocking( self, fileobj ):
        fl = fcntl.fcntl( fileobj.fileno(), fcntl.F_GETFL )
        fcntl.fcntl( fileobj.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
utils.py 文件源码 项目:riko 作者: nerevu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def make_blocking(f):
    fd = f.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)

    if flags & O_NONBLOCK:
        blocking = flags & ~O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, blocking)


问题


面经


文章

微信
公众号

扫码关注公众号