python类TCIFLUSH的实例源码

a_sync.py 文件源码 项目:a_sync 作者: notion 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def a_input(prompt: str) -> str:
    """Async input prompt."""
    readable = []  # type: List[int]
    print(prompt, end='')
    sys.stdout.flush()
    while not readable:
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        try:
            await asyncio.sleep(0.1)
        except futures.CancelledError:
            print("input cancelled...")
            termios.tcflush(sys.stdin, termios.TCIFLUSH)
            raise
    return sys.stdin.readline().rstrip()


# [ Classes ]
serialposix.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
unix_console.py 文件源码 项目:pyrepl 作者: dajose 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def forgetinput(self):
        termios.tcflush(self.input_fd, termios.TCIFLUSH)
serialposix.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
serialposix.py 文件源码 项目:gcodeplot 作者: arpruss 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
paramiko_ssh.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def missing_host_key(self, client, hostname, key):

        if C.HOST_KEY_CHECKING:

            self.connection.connection_lock()

            old_stdin = sys.stdin
            sys.stdin = self._new_stdin

            # clear out any premature input on sys.stdin
            tcflush(sys.stdin, TCIFLUSH)

            fingerprint = hexlify(key.get_fingerprint())
            ktype = key.get_name()

            inp = input(AUTHENTICITY_MSG % (hostname, ktype, fingerprint))
            sys.stdin = old_stdin

            self.connection.connection_unlock()

            if inp not in ['yes','y','']:
                raise AnsibleError("host connection rejected by user")

        key._added_by_ansible_this_time = True

        # existing implementation below:
        client._host_keys.add(hostname, key.get_name(), key)

        # host keys are actually saved in close() function below
        # in order to control ordering.


# keep connection objects on a per host basis to avoid repeated attempts to reconnect
serialposix.py 文件源码 项目:bitio 作者: whaleygeek 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
serialposix.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
serial_adapter.py 文件源码 项目:Parlay 作者: PromenadeSoftware 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def flushInput(self):
        """Clear input buffer, discarding all that is in the buffer."""
        termios.tcflush(self._f.fileno(), termios.TCIFLUSH)
serialposix.py 文件源码 项目:ddt4all 作者: cedricp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
serialposix.py 文件源码 项目:mt7687-serial-uploader 作者: will127534 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
unix_console.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def forgetinput(self):
        termios.tcflush(self.input_fd, termios.TCIFLUSH)
serialposix.py 文件源码 项目:Jackal_Velodyne_Duke 作者: MengGuo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
serialposix.py 文件源码 项目:Jackal_Velodyne_Duke 作者: MengGuo 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)
paramiko_ssh.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def missing_host_key(self, client, hostname, key):

        if all((C.HOST_KEY_CHECKING, not C.PARAMIKO_HOST_KEY_AUTO_ADD)):

            if C.USE_PERSISTENT_CONNECTIONS:
                raise AnsibleConnectionFailure('rejected %s host key for host %s: %s' % (key.get_name(), hostname, hexlify(key.get_fingerprint())))

            self.connection.connection_lock()

            old_stdin = sys.stdin
            sys.stdin = self._new_stdin

            # clear out any premature input on sys.stdin
            tcflush(sys.stdin, TCIFLUSH)

            fingerprint = hexlify(key.get_fingerprint())
            ktype = key.get_name()

            inp = input(AUTHENTICITY_MSG % (hostname, ktype, fingerprint))
            sys.stdin = old_stdin

            self.connection.connection_unlock()

            if inp not in ['yes', 'y', '']:
                raise AnsibleError("host connection rejected by user")

        key._added_by_ansible_this_time = True

        # existing implementation below:
        client._host_keys.add(hostname, key.get_name(), key)

        # host keys are actually saved in close() function below
        # in order to control ordering.


# keep connection objects on a per host basis to avoid repeated attempts to reconnect
randpool.py 文件源码 项目:helios-server-mixnet 作者: RunasSudo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getch(self):
                termios.tcflush(0, termios.TCIFLUSH) # XXX Leave this in?
                return os.read(self._fd, 1)
randpool.py 文件源码 项目:helios-server-mixnet 作者: RunasSudo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def close(self, delay = 0):
                if delay:
                    time.sleep(delay)
                    termios.tcflush(self._fd, termios.TCIFLUSH)
                termios.tcsetattr(self._fd, termios.TCSAFLUSH, self._old)
randpool.py 文件源码 项目:helios-server-mixnet 作者: RunasSudo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getch(self):
                termios.tcflush(0, termios.TCIFLUSH) # XXX Leave this in?
                return os.read(self._fd, 1)
randpool.py 文件源码 项目:helios-server-mixnet 作者: RunasSudo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def close(self, delay = 0):
                if delay:
                    time.sleep(delay)
                    termios.tcflush(self._fd, termios.TCIFLUSH)
                termios.tcsetattr(self._fd, termios.TCSAFLUSH, self._old)
serialposix.py 文件源码 项目:ftcommunity-apps 作者: ftCommunity 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise portNotOpenError
        termios.tcflush(self.fd, termios.TCIFLUSH)


问题


面经


文章

微信
公众号

扫码关注公众号