python类O_NONBLOCK的实例源码

PTY_handler.py 文件源码 项目:bloodyshell 作者: deadPix3l 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, slave=0, pid=os.getpid()):
        # apparently python GC's modules before class instances so, here
        # we have some hax to ensure we can restore the terminal state.
        self.termios, self.fcntl = termios, fcntl

        # open our controlling PTY
        self.pty  = open(os.readlink("/proc/%d/fd/%d" % (pid, slave)), "rb+")

        # store our old termios settings so we can restore after
        # we are finished 
        self.oldtermios = termios.tcgetattr(self.pty)

        # get the current settings se we can modify them
        newattr = termios.tcgetattr(self.pty)

        # set the terminal to uncanonical mode and turn off
        # input echo.
        newattr[3] &= ~termios.ICANON & ~termios.ECHO

        # don't handle ^C / ^Z / ^\
        newattr[6][termios.VINTR] = '\x00'
        newattr[6][termios.VQUIT] = '\x00'
        newattr[6][termios.VSUSP] = '\x00'

        # set our new attributes
        termios.tcsetattr(self.pty, termios.TCSADRAIN, newattr)

        # store the old fcntl flags
        self.oldflags = fcntl.fcntl(self.pty, fcntl.F_GETFL)
        # fcntl.fcntl(self.pty, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
        # make the PTY non-blocking
        fcntl.fcntl(self.pty, fcntl.F_SETFL, self.oldflags | os.O_NONBLOCK)
stream_test.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testOutputStreams(self):
        output_spec = {
            'mode': 'http',
            'method': 'PUT',
            'url': 'http://localhost:%d' % _socket_port
        }

        fd = os.open(_pipepath, os.O_RDONLY | os.O_NONBLOCK)
        adapters = {
            fd: make_stream_push_adapter(output_spec)
        }
        cmd = [sys.executable, _oscript, _pipepath]

        try:
            with captureOutput() as stdpipes:
                run_process(cmd, adapters)
        except Exception:
            print('Stdout/stderr from exception: ')
            print(stdpipes)
            raise
        self.assertEqual(stdpipes, ['start\ndone\n', ''])
        self.assertEqual(len(_req_chunks), 1)
        self.assertEqual(_req_chunks[0], (9, 'a message'))
utils.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _open_ipipes(wds, fifos, input_pipes):
    """
    This will attempt to open the named pipes in the set of ``fifos`` for
    writing, which will only succeed if the subprocess has opened them for
    reading already. This modifies and returns the list of write descriptors,
    the list of waiting fifo names, and the mapping back to input adapters.
    """
    for fifo in fifos.copy():
        try:
            fd = os.open(fifo, os.O_WRONLY | os.O_NONBLOCK)
            input_pipes[fd] = fifos.pop(fifo)
            wds.append(fd)
        except OSError as e:
            if e.errno != errno.ENXIO:
                raise e

    return wds, fifos, input_pipes
tailbot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def follow_file(filename):
    while True:
        try:
            fd = os.open(filename, os.O_RDONLY | os.O_NONBLOCK)
        except OSError:
            yield None
            continue

        try:
            inode = os.fstat(fd).st_ino
            first = True

            while True:
                try:
                    stat = os.stat(filename)
                except OSError:
                    stat = None

                yield first, time.time(), fd
                if stat is None or inode != stat.st_ino:
                    break

                first = False
        finally:
            os.close(fd)
InputDevice.py 文件源码 项目:enigma2 作者: OpenLD 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getInputDevices(self):
        devices = os.listdir("/dev/input/")

        for evdev in devices:
            try:
                buffer = "\0"*512
                self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
                self.name = ioctl(self.fd, EVIOCGNAME(256), buffer)
                self.name = self.name[:self.name.find("\0")]
                if str(self.name).find("Keyboard") != -1:
                    self.name = 'keyboard'
                os.close(self.fd)
            except (IOError,OSError), err:
                print '[iInputDevices] getInputDevices  <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >'
                self.name = None

            if self.name:
                self.Devices[evdev] = {'name': self.name, 'type': self.getInputDeviceType(self.name),'enabled': False, 'configuredName': None }
                if boxtype.startswith('et'):
                    self.setDefaults(evdev) # load default remote control "delay" and "repeat" values for ETxxxx ("QuickFix Scrollspeed Menues" proposed by Xtrend Support)
agent.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _communicate(self):
        import fcntl
        oldflags = fcntl.fcntl(self.__inr, fcntl.F_GETFL)
        fcntl.fcntl(self.__inr, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
        while not self._exit:
            events = select([self._agent._conn, self.__inr], [], [], 0.5)
            for fd in events[0]:
                if self._agent._conn == fd:
                    data = self._agent._conn.recv(512)
                    if len(data) != 0:
                        self.__inr.send(data)
                    else:
                        self._close()
                        break
                elif self.__inr == fd:
                    data = self.__inr.recv(512)
                    if len(data) != 0:
                        self._agent._conn.send(data)
                    else:
                        self._close()
                        break
            time.sleep(io_sleep)
cli.py 文件源码 项目:HyperGAN 作者: 255BITS 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def train(self):
        i=0
        if(self.args.ipython):
            fd = sys.stdin.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        while(i < self.total_steps or self.total_steps == -1):
            i+=1
            start_time = time.time()
            self.step()

            if (self.args.save_every != None and
                self.args.save_every != -1 and
                self.args.save_every > 0 and
                i % self.args.save_every == 0):
                print(" |= Saving network")
                self.gan.save(self.save_file)
            if self.args.ipython:
                self.check_stdin()
            end_time = time.time()
Keyboard.py 文件源码 项目:led 作者: rec 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def keyboard(callback, exit='q'):
    fd = sys.stdin.fileno()

    oldterm = termios.tcgetattr(fd)
    newattr = termios.tcgetattr(fd)
    newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
    termios.tcsetattr(fd, termios.TCSANOW, newattr)

    oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

    try:
        while True:
            try:
                ch = sys.stdin.read(1)
                if ch:
                    callback(ch)
                    if ch == exit:
                        break
            except IOError:
                pass
    finally:
        termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
        fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
gadgetfs_phy.py 文件源码 项目:sahara_emulator 作者: bkerler 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _open_endpoint_fd(self, ep):
        '''
        :param ep: USBEndpoint object

        :raises Exception: if no endpoint file found, or failed to open

        .. todo:: detect transfer-type specific endpoint files
        '''
        num = ep.number
        s_dir = 'out' if ep.direction == USBEndpoint.direction_out else 'in'
        filename = 'ep%d%s' % (num, s_dir)
        path = os.path.join(self.gadgetfs_dir, filename)
        fd = os.open(path, os.O_RDWR | os.O_NONBLOCK)
        self.debug('Opened endpoint %d' % (num))
        self.debug('ep: %d dir: %s file: %s fd: %d' % (num, s_dir, filename, fd))
        ep.fd = fd
tundev.py 文件源码 项目:fdslight 作者: fdslight 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def init_func(self, creator_fd, tun_dev_name, *args, **kwargs):
        """
        :param creator_fd:
        :param tun_dev_name:tun ????
        :param subnet:?????????????
        """
        tun_fd = self.__create_tun_dev(tun_dev_name)

        if tun_fd < 3:
            print("error:create tun device failed:%s" % tun_dev_name)
            sys.exit(-1)

        self.__creator_fd = creator_fd
        self.__qos = simple_qos.qos(simple_qos.QTYPE_DST)

        self.set_fileno(tun_fd)
        fcntl.fcntl(tun_fd, fcntl.F_SETFL, os.O_NONBLOCK)
        self.dev_init(tun_dev_name, *args, **kwargs)

        return tun_fd
InputDevice.py 文件源码 项目:enigma2 作者: Openeight 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getInputDevices(self):
        devices = os.listdir("/dev/input/")

        for evdev in devices:
            try:
                buffer = "\0"*512
                self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
                self.name = ioctl(self.fd, EVIOCGNAME(256), buffer)
                self.name = self.name[:self.name.find("\0")]
                os.close(self.fd)
            except (IOError,OSError), err:
                print '[iInputDevices] getInputDevices ' + evdev + ' <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >'
                self.name = None

            if self.name:
                self.Devices[evdev] = {'name': self.name, 'type': self.getInputDeviceType(self.name),'enabled': False, 'configuredName': None }
server.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, host='127.0.0.1', port=23, application=None, encoding='utf-8'):
        assert isinstance(host, text_type)
        assert isinstance(port, int)
        assert isinstance(application, TelnetApplication)
        assert isinstance(encoding, text_type)

        self.host = host
        self.port = port
        self.application = application
        self.encoding = encoding

        self.connections = set()

        self._calls_from_executor = []

        # Create a pipe for inter thread communication.
        self._schedule_pipe = os.pipe()
        fcntl.fcntl(self._schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
posix.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, inputhook=None, selector=AutoSelector):
        assert inputhook is None or callable(inputhook)
        assert issubclass(selector, Selector)

        self.running = False
        self.closed = False
        self._running = False
        self._callbacks = None

        self._calls_from_executor = []
        self._read_fds = {} # Maps fd to handler.
        self.selector = selector()

        # Create a pipe for inter thread communication.
        self._schedule_pipe = os.pipe()
        fcntl.fcntl(self._schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)

        # Create inputhook context.
        self._inputhook_context = InputHookContext(inputhook) if inputhook else None
server.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, host='127.0.0.1', port=23, application=None, encoding='utf-8'):
        assert isinstance(host, text_type)
        assert isinstance(port, int)
        assert isinstance(application, TelnetApplication)
        assert isinstance(encoding, text_type)

        self.host = host
        self.port = port
        self.application = application
        self.encoding = encoding

        self.connections = set()

        self._calls_from_executor = []

        # Create a pipe for inter thread communication.
        self._schedule_pipe = os.pipe()
        fcntl.fcntl(self._schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
posix.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, inputhook=None, selector=AutoSelector):
        assert inputhook is None or callable(inputhook)
        assert issubclass(selector, Selector)

        self.running = False
        self.closed = False
        self._running = False
        self._callbacks = None

        self._calls_from_executor = []
        self._read_fds = {} # Maps fd to handler.
        self.selector = selector()

        # Create a pipe for inter thread communication.
        self._schedule_pipe = os.pipe()
        fcntl.fcntl(self._schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)

        # Create inputhook context.
        self._inputhook_context = InputHookContext(inputhook) if inputhook else None
dockertty.py 文件源码 项目:dockertty 作者: JetMuffin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start_pty(self, *args):
        if not self.container_id:
            self.send_error_and_close("Error: container not found.")
            return
        try:
            # create a pseudo terminal of container by command:
            # `docker exec -ti <container_id> /bin/sh -c '[ -x /bin/bash ] && /bin/bash || /bin/sh'`
            # and then set the stream to non-blocking mode.
            pty = PtyProcessUnicode.spawn(
                ['docker', 'exec', '-ti', self.container_id, '/bin/sh', '-c',
                 'echo $$ > /tmp/sh.pid.{} && [ -x /bin/bash ] && /bin/bash || /bin/sh'.format(self.uuid)])
            flags = fcntl(pty.fileobj, F_GETFL)
            fcntl(pty.fileobj, F_SETFL, flags | O_NONBLOCK)

            setattr(self, "pty", pty)
            TerminalSocketHandler.clients.update({self: pty})
            logger.info('Connect to console of container {}'.format(self.container_id))
        except Exception as e:
            self.send_error_and_close("Error: cannot start console: {}".format(e))
linuxjsdev.py 文件源码 项目:crazyswarm 作者: USC-ACTLab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def open(self):
        if self._f:
            raise Exception("{} at {} is already "
                            "opened".format(self.name, self._f_name))

        self._f = open("/dev/input/js{}".format(self.num), "rb")
        fcntl.fcntl(self._f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

        # Get number of axis and button
        val = ctypes.c_int()
        if fcntl.ioctl(self._f.fileno(), JSIOCGAXES, val) != 0:
            self._f.close()
            self._f = None
            raise Exception("Failed to read number of axes")

        self.axes = list(0 for i in range(val.value))
        if fcntl.ioctl(self._f.fileno(), JSIOCGBUTTONS, val) != 0:
            self._f.close()
            self._f = None
            raise Exception("Failed to read number of axes")

        self.buttons = list(0 for i in range(val.value))
        self.__initvalues()
wurlitzer.py 文件源码 项目:ipybind 作者: aldanor 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _setup_pipe(self, name):
        from fcntl import fcntl, F_GETFL, F_SETFL

        real_fd = getattr(sys, '__%s__' % name).fileno()
        save_fd = os.dup(real_fd)
        self._save_fds[name] = save_fd

        pipe_out, pipe_in = os.pipe()
        os.dup2(pipe_in, real_fd)
        os.close(pipe_in)
        self._real_fds[name] = real_fd

        # make pipe_out non-blocking
        flags = fcntl(pipe_out, F_GETFL)
        fcntl(pipe_out, F_SETFL, flags | os.O_NONBLOCK)
        return pipe_out
copytool_monitor.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def open_fifo(self):
        try:
            os.mkfifo(self.copytool.event_fifo)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise e

        pids = lsof(file=self.copytool.event_fifo)
        readers = set()
        writers = set()
        for pid, files in pids.items():
            for file, info in files.items():
                if 'r' in info['mode']:
                    readers.add(pid)
                if 'w' in info['mode']:
                    writers.add(pid)

        if readers:
            raise FifoReaderConflict(readers)

        self.reader_fd = os.open(self.copytool.event_fifo,
                                 os.O_RDONLY | os.O_NONBLOCK)
        copytool_log.info("Opened %s for reading" % self.copytool.event_fifo)
agent.py 文件源码 项目:SublimeRemoteGDB 作者: summerwinter 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _communicate(self):
        import fcntl
        oldflags = fcntl.fcntl(self.__inr, fcntl.F_GETFL)
        fcntl.fcntl(self.__inr, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
        while not self._exit:
            events = select([self._agent._conn, self.__inr], [], [], 0.5)
            for fd in events[0]:
                if self._agent._conn == fd:
                    data = self._agent._conn.recv(512)
                    if len(data) != 0:
                        self.__inr.send(data)
                    else:
                        self._close()
                        break
                elif self.__inr == fd:
                    data = self.__inr.recv(512)
                    if len(data) != 0:
                        self._agent._conn.send(data)
                    else:
                        self._close()
                        break
            time.sleep(io_sleep)
bpf.py 文件源码 项目:scapy-bpf 作者: guedou 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set_nonblock(self, set_flag=True):
        """Set the non blocking flag on the socket"""

        # Get the current flags
        if self.fd_flags is None:
            try:
                self.fd_flags = fcntl.fcntl(self.ins, fcntl.F_GETFL)
            except IOError, err:
                warning("Can't get flags on this file descriptor !")
                return

        # Set the non blocking flag
        if set_flag:
            new_fd_flags = self.fd_flags | os.O_NONBLOCK
        else:
            new_fd_flags = self.fd_flags & ~os.O_NONBLOCK

        try:
            fcntl.fcntl(self.ins, fcntl.F_SETFL, new_fd_flags)
            self.fd_flags = new_fd_flags
        except:
            warning("Can't set flags on this file descriptor !")
subprocessng.py 文件源码 项目:autoinjection 作者: ChengWiLL 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            flags = fcntl.fcntl(conn, fcntl.F_GETFL)
            if not conn.closed:
                fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)

            try:
                if not select.select([conn], [], [], 0)[0]:
                    return ''

                r = conn.read(maxsize)
                if not r:
                    return self._close(which)

                if self.universal_newlines:
                    r = self._translate_newlines(r)
                return r
            finally:
                if not conn.closed:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)
rogueinabox.py 文件源码 项目:rogueinabox 作者: rogueinabox 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def open_terminal(command="bash", columns=80, lines=24):
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        path, *args = shlex.split(command)
        args = [path] + args
        env = dict(TERM="linux", LC_ALL="en_GB.UTF-8",
                   COLUMNS=str(columns), LINES=str(lines))
        try:
            os.execvpe(path, args, env)
        except FileNotFoundError:
            print("Could not find the executable in %s. Press any key to exit." % path)
            exit()

    # set non blocking read
    flag = fcntl.fcntl(master_fd, fcntl.F_GETFD)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines), p_pid, p_out
posix.py 文件源码 项目:rice 作者: randy3k 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, inputhook=None, selector=AutoSelector):
        assert inputhook is None or callable(inputhook)
        assert issubclass(selector, Selector)

        super(PosixEventLoop, self).__init__()

        self.closed = False
        self._running = False

        self._calls_from_executor = []
        self._read_fds = {}  # Maps fd to handler.
        self.selector = selector()

        self._signal_handler_mappings = {}  # signal: previous_handler

        # Create a pipe for inter thread communication.
        self._schedule_pipe = os.pipe()
        fcntl.fcntl(self._schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
        self.selector.register(self._schedule_pipe[0])

        # Create inputhook context.
        self._inputhook_context = InputHookContext(inputhook) if inputhook else None
gdbcontroller.py 文件源码 项目:pygdbmi 作者: cs01 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _make_non_blocking(file_obj):
    """make file object non-blocking
    Windows doesn't have the fcntl module, but someone on
    stack overflow supplied this code as an answer, and it works
    http://stackoverflow.com/a/34504971/2893090"""

    if USING_WINDOWS:
        LPDWORD = POINTER(DWORD)
        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
        SetNamedPipeHandleState.restype = BOOL

        h = msvcrt.get_osfhandle(file_obj.fileno())

        res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
        if res == 0:
            raise ValueError(WinError())

    else:
        # Set the file status flag (F_SETFL) on the pipes to be non-blocking
        # so we can attempt to read from a pipe with no new data without locking
        # the program up
        fcntl.fcntl(file_obj, fcntl.F_SETFL, os.O_NONBLOCK)
async_sub.py 文件源码 项目:AIFun 作者: Plottel 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            flags = fcntl.fcntl(conn, fcntl.F_GETFL)
            if not conn.closed:
                fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)

            try:
                if not select.select([conn], [], [], 0)[0]:
                    return ''

                r = conn.read(maxsize)
                if not r:
                    return self._close(which)

                if self.universal_newlines:
                    r = r.replace("\r\n", "\n").replace("\r", "\n")
                return r
            finally:
                if not conn.closed:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)

################################################################################
InputDevice.py 文件源码 项目:enigma2 作者: BlackHole 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getInputDevices(self):
        devices = sorted(os.listdir("/dev/input/"))

        for evdev in devices:
            try:
                buffer = "\0"*512
                self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
                self.name = ioctl(self.fd, EVIOCGNAME(256), buffer)
                self.name = self.name[:self.name.find("\0")]
                if str(self.name).find("Keyboard") != -1:
                    self.name = 'keyboard'
                os.close(self.fd)
            except (IOError,OSError), err:
                print "[InputDevice] Error: evdev='%s' getInputDevices <ERROR: ioctl(EVIOCGNAME): '%s'>" % (evdev, str(err))
                self.name = None

            if self.name:
                devtype = self.getInputDeviceType(self.name)
                print "[InputDevice] Found: evdev='%s', name='%s', type='%s'" % (evdev, self.name, devtype)
                self.Devices[evdev] = {'name': self.name, 'type': devtype, 'enabled': False, 'configuredName': None }
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, fd):
            """'fd' is either a file object (e.g., obtained with 'open')
            or a file number (e.g., obtained with socket's fileno()).
            """
            if hasattr(fd, 'fileno'):
                self._fd = fd
                self._fileno = fd.fileno()
            elif isinstance(fd, int):
                self._fd, self._fileno = None, self._fd
            else:
                raise ValueError('invalid file descriptor')
            self._pycos = Pycos.scheduler()
            if self._pycos:
                self._notifier = self._pycos._notifier
                if hasattr(fd, '_fileno'):  # assume it is AsyncSocket
                    self._notifier.unregister(fd)
            else:
                self._notifier = None
            self._timeout = None
            self._read_task = None
            self._read_fn = None
            self._write_task = None
            self._write_fn = None
            self._buflist = []
            flags = fcntl.fcntl(self._fileno, fcntl.F_GETFL)
            fcntl.fcntl(self._fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, fd):
            """'fd' is either a file object (e.g., obtained with 'open')
            or a file number (e.g., obtained with socket's fileno()).
            """
            if hasattr(fd, 'fileno'):
                self._fd = fd
                self._fileno = fd.fileno()
            elif isinstance(fd, int):
                self._fd, self._fileno = None, self._fd
            else:
                raise ValueError('invalid file descriptor')
            self._pycos = Pycos.scheduler()
            if self._pycos:
                self._notifier = self._pycos._notifier
                if hasattr(fd, '_fileno'):  # assume it is AsyncSocket
                    self._notifier.unregister(fd)
            else:
                self._notifier = None
            self._timeout = None
            self._read_task = None
            self._read_fn = None
            self._write_task = None
            self._write_fn = None
            self._buflist = []
            flags = fcntl.fcntl(self._fileno, fcntl.F_GETFL)
            fcntl.fcntl(self._fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
asyncore.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, fd, map=None):
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)


问题


面经


文章

微信
公众号

扫码关注公众号