python类ReadFile()的实例源码

win32fileDemo.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def SimpleFileDemo():
    testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file")
    if os.path.exists(testName): os.unlink(testName)
    # Open the file for writing.
    handle = win32file.CreateFile(testName, 
                                  win32file.GENERIC_WRITE, 
                                  0, 
                                  None, 
                                  win32con.CREATE_NEW, 
                                  0, 
                                  None)
    test_data = "Hello\0there".encode("ascii")
    win32file.WriteFile(handle, test_data)
    handle.Close()
    # Open it for reading.
    handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    rc, data = win32file.ReadFile(handle, 1024)
    handle.Close()
    if data == test_data:
        print "Successfully wrote and read a file"
    else:
        raise Exception("Got different data back???")
    os.unlink(testName)
_pollingfile.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def checkWork(self):
        finished = 0
        fullDataRead = []

        while 1:
            try:
                buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
                # finished = (result == -1)
                if not bytesToRead:
                    break
                hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
                fullDataRead.append(data)
            except win32api.error:
                finished = 1
                break

        dataBuf = ''.join(fullDataRead)
        if dataBuf:
            self.receivedCallback(dataBuf)
        if finished:
            self.cleanup()
        return len(dataBuf)
windows_pipe.py 文件源码 项目:gpvdm 作者: roderickmackenzie 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def rod(self,p):
        while(1):
            #print "going to read"
            try:
                res,data = win32file.ReadFile(p, 4096)
                #print res,data
                if res != winerror.ERROR_MORE_DATA:
                    data=data.decode("utf-8") 
                    self.new_data.emit(data)
                else:
                    print("no more data")
                    break
            except:
                print("pipe closed")
                break
#       win32pipe.DisconnectNamedPipe(p)
async_sub.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                return self._close(which)
            except (subprocess.pywintypes.error, Exception):
                if geterror()[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                # Translate newlines. For Python 3.x assume read is text.
                # If bytes then another solution is needed.
                read = read.replace("\r\n", "\n").replace("\r", "\n")
            return read
_win32serialport.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def serialReadEvent(self):
        #get that character we set up
        n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 0)
        if n:
            first = str(self.read_buf[:n])
            #now we should get everything that is already in the buffer
            flags, comstat = win32file.ClearCommError(self._serial.hComPort)
            if comstat.cbInQue:
                win32event.ResetEvent(self._overlappedRead.hEvent)
                rc, buf = win32file.ReadFile(self._serial.hComPort,
                                             win32file.AllocateReadBuffer(comstat.cbInQue),
                                             self._overlappedRead)
                n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 1)
                #handle all the received data:
                self.protocol.dataReceived(first + str(buf[:n]))
            else:
                #handle all the received data:
                self.protocol.dataReceived(first)

        #set up next one
        win32event.ResetEvent(self._overlappedRead.hEvent)
        rc, self.read_buf = win32file.ReadFile(self._serial.hComPort,
                                               win32file.AllocateReadBuffer(1),
                                               self._overlappedRead)
winpexpect.py 文件源码 项目:winpexpect 作者: geertj 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _child_reader(self, handle):
        """INTERNAL: Reader thread that reads stdout/stderr of the child
        process."""
        status = 'data'
        while True:
            try:
                err, data = ReadFile(handle, self.maxread)
                assert err == 0  # not expecting error w/o overlapped io
            except WindowsError, e:
                if e.winerror == ERROR_BROKEN_PIPE:
                    status = 'eof'
                    data = ''
                else:
                    status = 'error'
                    data = e.winerror
            self.child_output.put((handle, status, data))
            if status != 'data':
                break
subprocessng.py 文件源码 项目:autoinjection 作者: ChengWiLL 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except (ValueError, NameError):
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                read = self._translate_newlines(read)
            return read
_pollingfile.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def checkWork(self):
        finished = 0
        fullDataRead = []

        while 1:
            try:
                buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
                # finished = (result == -1)
                if not bytesToRead:
                    break
                hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
                fullDataRead.append(data)
            except win32api.error:
                finished = 1
                break

        dataBuf = ''.join(fullDataRead)
        if dataBuf:
            self.receivedCallback(dataBuf)
        if finished:
            self.cleanup()
        return len(dataBuf)
_win32serialport.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def serialReadEvent(self):
        #get that character we set up
        n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 0)
        if n:
            first = str(self.read_buf[:n])
            #now we should get everything that is already in the buffer
            flags, comstat = win32file.ClearCommError(self._serial.hComPort)
            if comstat.cbInQue:
                win32event.ResetEvent(self._overlappedRead.hEvent)
                rc, buf = win32file.ReadFile(self._serial.hComPort,
                                             win32file.AllocateReadBuffer(comstat.cbInQue),
                                             self._overlappedRead)
                n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 1)
                #handle all the received data:
                self.protocol.dataReceived(first + str(buf[:n]))
            else:
                #handle all the received data:
                self.protocol.dataReceived(first)

        #set up next one
        win32event.ResetEvent(self._overlappedRead.hEvent)
        rc, self.read_buf = win32file.ReadFile(self._serial.hComPort,
                                               win32file.AllocateReadBuffer(1),
                                               self._overlappedRead)
async_sub.py 文件源码 项目:AIFun 作者: Plottel 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                return self._close(which)
            except (subprocess.pywintypes.error, Exception):
                if geterror()[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                # Translate newlines. For Python 3.x assume read is text.
                # If bytes then another solution is needed.
                read = read.replace("\r\n", "\n").replace("\r", "\n")
            return read
pysynch.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def CopyFileToCe(src_name, dest_name, progress = None):
    sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    bytes=0
    try:
        dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
        try:
            while 1:
                hr, data = win32file.ReadFile(sh, 2048)
                if not data:
                    break
                wincerapi.CeWriteFile(dh, data)
                bytes = bytes + len(data)
                if progress is not None: progress(bytes)
        finally:
            pass
            dh.Close()
    finally:
        sh.Close()
    return bytes
test_win32file.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testSimpleFiles(self):
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        os.unlink(filename)
        handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
        test_data = str2bytes("Hello\0there")
        try:
            win32file.WriteFile(handle, test_data)
            handle.Close()
            # Try and open for read
            handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
            rc, data = win32file.ReadFile(handle, 1024)
            self.assertEquals(data, test_data)
        finally:
            handle.Close()
            try:
                os.unlink(filename)
            except os.error:
                pass

    # A simple test using normal read/write operations.
test_win32file.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _IOCPServerThread(self, handle, port, drop_overlapped_reference):
        overlapped = pywintypes.OVERLAPPED()
        win32pipe.ConnectNamedPipe(handle, overlapped)
        if drop_overlapped_reference:
            # Be naughty - the overlapped object is now dead, but
            # GetQueuedCompletionStatus will still find it.  Our check of
            # reference counting should catch that error.
            overlapped = None
            # even if we fail, be sure to close the handle; prevents hangs
            # on Vista 64...
            try:
                self.failUnlessRaises(RuntimeError,
                                      win32file.GetQueuedCompletionStatus, port, -1)
            finally:
                handle.Close()
            return

        result = win32file.GetQueuedCompletionStatus(port, -1)
        ol2 = result[-1]
        self.failUnless(ol2 is overlapped)
        data = win32file.ReadFile(handle, 512)[1]
        win32file.WriteFile(handle, data)
pysynch.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def CopyFileToCe(src_name, dest_name, progress = None):
    sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    bytes=0
    try:
        dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
        try:
            while 1:
                hr, data = win32file.ReadFile(sh, 2048)
                if not data:
                    break
                wincerapi.CeWriteFile(dh, data)
                bytes = bytes + len(data)
                if progress is not None: progress(bytes)
        finally:
            pass
            dh.Close()
    finally:
        sh.Close()
    return bytes
win32fileDemo.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def SimpleFileDemo():
    testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file")
    if os.path.exists(testName): os.unlink(testName)
    # Open the file for writing.
    handle = win32file.CreateFile(testName, 
                                  win32file.GENERIC_WRITE, 
                                  0, 
                                  None, 
                                  win32con.CREATE_NEW, 
                                  0, 
                                  None)
    test_data = "Hello\0there".encode("ascii")
    win32file.WriteFile(handle, test_data)
    handle.Close()
    # Open it for reading.
    handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    rc, data = win32file.ReadFile(handle, 1024)
    handle.Close()
    if data == test_data:
        print("Successfully wrote and read a file")
    else:
        raise Exception("Got different data back???")
    os.unlink(testName)
test_win32file.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testSimpleFiles(self):
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        os.unlink(filename)
        handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
        test_data = str2bytes("Hello\0there")
        try:
            win32file.WriteFile(handle, test_data)
            handle.Close()
            # Try and open for read
            handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
            rc, data = win32file.ReadFile(handle, 1024)
            self.assertEquals(data, test_data)
        finally:
            handle.Close()
            try:
                os.unlink(filename)
            except os.error:
                pass

    # A simple test using normal read/write operations.
test_win32file.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _IOCPServerThread(self, handle, port, drop_overlapped_reference):
        overlapped = pywintypes.OVERLAPPED()
        win32pipe.ConnectNamedPipe(handle, overlapped)
        if drop_overlapped_reference:
            # Be naughty - the overlapped object is now dead, but
            # GetQueuedCompletionStatus will still find it.  Our check of
            # reference counting should catch that error.
            overlapped = None
            # even if we fail, be sure to close the handle; prevents hangs
            # on Vista 64...
            try:
                self.failUnlessRaises(RuntimeError,
                                      win32file.GetQueuedCompletionStatus, port, -1)
            finally:
                handle.Close()
            return

        result = win32file.GetQueuedCompletionStatus(port, -1)
        ol2 = result[-1]
        self.failUnless(ol2 is overlapped)
        data = win32file.ReadFile(handle, 512)[1]
        win32file.WriteFile(handle, data)
subprocessng.py 文件源码 项目:Eagle 作者: magerx 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except (ValueError, NameError):
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                read = self._translate_newlines(read)
            return read
subprocessng.py 文件源码 项目:Helix 作者: 3lackrush 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except (ValueError, NameError):
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                read = self._translate_newlines(read)
            return read
subprocessng.py 文件源码 项目:autoscan 作者: b01u 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except (ValueError, NameError):
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                read = self._translate_newlines(read)
            return read
_pollingfile.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def checkWork(self):
        finished = 0
        fullDataRead = []

        while 1:
            try:
                buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
                # finished = (result == -1)
                if not bytesToRead:
                    break
                hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
                fullDataRead.append(data)
            except win32api.error:
                finished = 1
                break

        dataBuf = b''.join(fullDataRead)
        if dataBuf:
            self.receivedCallback(dataBuf)
        if finished:
            self.cleanup()
        return len(dataBuf)
_win32serialport.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def serialReadEvent(self):
        #get that character we set up
        n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 0)
        if n:
            first = str(self.read_buf[:n])
            #now we should get everything that is already in the buffer
            flags, comstat = win32file.ClearCommError(self._serial.hComPort)
            if comstat.cbInQue:
                win32event.ResetEvent(self._overlappedRead.hEvent)
                rc, buf = win32file.ReadFile(self._serial.hComPort,
                                             win32file.AllocateReadBuffer(comstat.cbInQue),
                                             self._overlappedRead)
                n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 1)
                #handle all the received data:
                self.protocol.dataReceived(first + str(buf[:n]))
            else:
                #handle all the received data:
                self.protocol.dataReceived(first)

        #set up next one
        win32event.ResetEvent(self._overlappedRead.hEvent)
        rc, self.read_buf = win32file.ReadFile(self._serial.hComPort,
                                               win32file.AllocateReadBuffer(1),
                                               self._overlappedRead)
TestCmd.py 文件源码 项目:gyp 作者: electron 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            #if self.universal_newlines:
            #    read = self._translate_newlines(read)
            return read
win32.py 文件源码 项目:ptterm 作者: jonathanslenders 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def add_input_ready_callback(self, callback):
        """
        Add a new callback to be called for when there's input ready to read.
        """
        def poll():
            while True:
                try:
                    status, from_pipe = win32file.ReadFile(self.stdout_handle, 65536)
                except Exception:
                    # The pipe has ended.
                    self.ready_f.set_result(None)
                    return

                result = from_pipe.decode('utf-8', 'ignore')
                self._buffer.append(result)
                self.loop.call_from_executor(callback)

        self.loop.run_in_executor(poll, _daemon=True)
        return

        self._input_ready_callbacks.append(callback)
Expect.py 文件源码 项目:lib9 作者: Jumpscale 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _recv(self, which, maxsize):
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                return self._close(which)
            except (subprocess.pywintypes.error, Exception) as why:
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                read = self._translate_newlines(read)
            return read
async_sub.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def ReadFile(handle, desired_bytes, ol = None):
            c_read = DWORD()
            buffer = ctypes.create_string_buffer(desired_bytes+1)
            success = ctypes.windll.kernel32.ReadFile(handle, buffer, desired_bytes, ctypes.byref(c_read), ol)
            buffer[c_read.value] = null_byte
            return ctypes.windll.kernel32.GetLastError(), decode(buffer.value)
recipe-173976.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def readcom1line(comhandle):
    err = 0
    char = None
    line = ''
    while err == 0 and char != '\r':
        err, char = win32file.ReadFile(comhandle, 1, None)
        if err == 0:
            if char == '\n':
                pass # eat newlines
            else:
                line = line + char
        else:
            break
    return line
_win32serialport.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def __init__(self, protocol, deviceNameOrPortNumber, reactor, 
        baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE,
        stopbits = STOPBITS_ONE, xonxoff = 0, rtscts = 0):
        self._serial = serial.Serial(deviceNameOrPortNumber, baudrate=baudrate,
                                     bytesize=bytesize, parity=parity,
                                     stopbits=stopbits, timeout=None,
                                     xonxoff=xonxoff, rtscts=rtscts)
        self.flushInput()
        self.flushOutput()
        self.reactor = reactor
        self.protocol = protocol
        self.outQueue = []
        self.closed = 0
        self.closedNotifies = 0
        self.writeInProgress = 0

        self.protocol = protocol
        self._overlappedRead = win32file.OVERLAPPED()
        self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
        self._overlappedWrite = win32file.OVERLAPPED()
        self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)

        self.reactor.addEvent(self._overlappedRead.hEvent, self, 'serialReadEvent')
        self.reactor.addEvent(self._overlappedWrite.hEvent, self, 'serialWriteEvent')

        self.protocol.makeConnection(self)

        flags, comstat = win32file.ClearCommError(self._serial.hComPort)
        rc, self.read_buf = win32file.ReadFile(self._serial.hComPort,
                                               win32file.AllocateReadBuffer(1),
                                               self._overlappedRead)
winpexpect.py 文件源码 项目:winpexpect 作者: geertj 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ReadFile(handle, size):
        err, data = _ReadFile(handle, size)
        return err, data.decode('ascii')
winpexpect.py 文件源码 项目:winpexpect 作者: geertj 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _read_header(handle, bufsize=4096):
    """INTERNAL: read a stub header from a handle."""
    header = ''
    while '\n\n' not in header:
        err, data = ReadFile(handle, bufsize)
        header += data
    return header


问题


面经


文章

微信
公众号

扫码关注公众号