def SimpleFileDemo():
    """Write a small payload to a temp file with win32file and read it back.

    The file lives under ``win32api.GetTempPath()``.  It is created with
    CREATE_NEW, written, closed, reopened with OPEN_EXISTING and read back.
    A success message is printed when the bytes match.

    Raises:
        Exception: if the data read back differs from the data written.
    """
    testName = os.path.join(win32api.GetTempPath(), "win32file_demo_test_file")
    # Remove any leftover from a previous run before creating with CREATE_NEW.
    if os.path.exists(testName):
        os.unlink(testName)
    try:
        # Open the file for writing.
        handle = win32file.CreateFile(testName,
                                      win32file.GENERIC_WRITE,
                                      0,
                                      None,
                                      win32con.CREATE_NEW,
                                      0,
                                      None)
        test_data = "Hello\0there".encode("ascii")
        try:
            win32file.WriteFile(handle, test_data)
        finally:
            # Close even if the write fails so the handle is not leaked.
            handle.Close()
        # Open it for reading.
        handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None,
                                      win32con.OPEN_EXISTING, 0, None)
        try:
            rc, data = win32file.ReadFile(handle, 1024)
        finally:
            handle.Close()
        if data != test_data:
            raise Exception("Got different data back???")
        # Function-call form of print works on both Python 2 and Python 3.
        print("Successfully wrote and read a file")
    finally:
        # Always remove the temp file, including when the check above raises.
        if os.path.exists(testName):
            os.unlink(testName)
python类WriteFile()的实例源码
def testSimpleFiles(self):
    """Round-trip bytes containing an embedded NUL through WriteFile/ReadFile."""
    # mkstemp is used only to reserve a unique name; the file is removed again
    # because it is re-created below with CREATE_NEW.
    fd, filename = tempfile.mkstemp()
    os.close(fd)
    os.unlink(filename)
    handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None,
                                  win32con.CREATE_NEW, 0, None)
    test_data = str2bytes("Hello\0there")
    try:
        win32file.WriteFile(handle, test_data)
        handle.Close()
        # Try and open for read
        handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None,
                                      win32con.OPEN_EXISTING, 0, None)
        rc, data = win32file.ReadFile(handle, 1024)
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(data, test_data)
    finally:
        handle.Close()
        try:
            os.unlink(filename)
        except os.error:
            # Best-effort cleanup: a failed delete must not mask the test result.
            pass
# A simple test using normal read/write operations.
def _IOCPServerThread(self, handle, port, drop_overlapped_reference):
    """Server-side body of the I/O completion port pipe test.

    Starts an overlapped ConnectNamedPipe on ``handle`` and then waits on
    ``port`` with GetQueuedCompletionStatus.

    Args:
        handle: pipe handle passed to ConnectNamedPipe/ReadFile/WriteFile.
        port: completion port passed to GetQueuedCompletionStatus.
        drop_overlapped_reference: when true, the OVERLAPPED object is
            released before waiting and the wait is expected to raise
            RuntimeError; the handle is then closed and the method returns.
    """
    overlapped = pywintypes.OVERLAPPED()
    win32pipe.ConnectNamedPipe(handle, overlapped)
    if drop_overlapped_reference:
        # Be naughty - the overlapped object is now dead, but
        # GetQueuedCompletionStatus will still find it.  Our check of
        # reference counting should catch that error.
        overlapped = None
        # even if we fail, be sure to close the handle; prevents hangs
        # on Vista 64...
        try:
            # assertRaises replaces failUnlessRaises (removed in Python 3.12).
            self.assertRaises(RuntimeError,
                              win32file.GetQueuedCompletionStatus, port, -1)
        finally:
            handle.Close()
        return

    result = win32file.GetQueuedCompletionStatus(port, -1)
    # The last element of the result is expected to be the very same
    # OVERLAPPED object that was passed to ConnectNamedPipe.
    ol2 = result[-1]
    self.assertTrue(ol2 is overlapped)
    # Echo whatever was received back over the same handle.
    data = win32file.ReadFile(handle, 512)[1]
    win32file.WriteFile(handle, data)
def SimpleFileDemo():
    """Write a small payload to a temp file with win32file and read it back.

    The file lives under ``win32api.GetTempPath()``.  It is created with
    CREATE_NEW, written, closed, reopened with OPEN_EXISTING and read back.
    A success message is printed when the bytes match.

    Raises:
        Exception: if the data read back differs from the data written.
    """
    testName = os.path.join(win32api.GetTempPath(), "win32file_demo_test_file")
    # Remove any leftover from a previous run before creating with CREATE_NEW.
    if os.path.exists(testName):
        os.unlink(testName)
    try:
        # Open the file for writing.
        handle = win32file.CreateFile(testName,
                                      win32file.GENERIC_WRITE,
                                      0,
                                      None,
                                      win32con.CREATE_NEW,
                                      0,
                                      None)
        test_data = "Hello\0there".encode("ascii")
        try:
            win32file.WriteFile(handle, test_data)
        finally:
            # Close even if the write fails so the handle is not leaked.
            handle.Close()
        # Open it for reading.
        handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None,
                                      win32con.OPEN_EXISTING, 0, None)
        try:
            rc, data = win32file.ReadFile(handle, 1024)
        finally:
            handle.Close()
        if data != test_data:
            raise Exception("Got different data back???")
        print("Successfully wrote and read a file")
    finally:
        # Always remove the temp file, including when the check above raises.
        if os.path.exists(testName):
            os.unlink(testName)
def testSimpleFiles(self):
    """Round-trip bytes containing an embedded NUL through WriteFile/ReadFile."""
    # mkstemp is used only to reserve a unique name; the file is removed again
    # because it is re-created below with CREATE_NEW.
    fd, filename = tempfile.mkstemp()
    os.close(fd)
    os.unlink(filename)
    handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None,
                                  win32con.CREATE_NEW, 0, None)
    test_data = str2bytes("Hello\0there")
    try:
        win32file.WriteFile(handle, test_data)
        handle.Close()
        # Try and open for read
        handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None,
                                      win32con.OPEN_EXISTING, 0, None)
        rc, data = win32file.ReadFile(handle, 1024)
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(data, test_data)
    finally:
        handle.Close()
        try:
            os.unlink(filename)
        except os.error:
            # Best-effort cleanup: a failed delete must not mask the test result.
            pass
# A simple test using normal read/write operations.
def _IOCPServerThread(self, handle, port, drop_overlapped_reference):
    """Server-side body of the I/O completion port pipe test.

    Starts an overlapped ConnectNamedPipe on ``handle`` and then waits on
    ``port`` with GetQueuedCompletionStatus.

    Args:
        handle: pipe handle passed to ConnectNamedPipe/ReadFile/WriteFile.
        port: completion port passed to GetQueuedCompletionStatus.
        drop_overlapped_reference: when true, the OVERLAPPED object is
            released before waiting and the wait is expected to raise
            RuntimeError; the handle is then closed and the method returns.
    """
    overlapped = pywintypes.OVERLAPPED()
    win32pipe.ConnectNamedPipe(handle, overlapped)
    if drop_overlapped_reference:
        # Be naughty - the overlapped object is now dead, but
        # GetQueuedCompletionStatus will still find it.  Our check of
        # reference counting should catch that error.
        overlapped = None
        # even if we fail, be sure to close the handle; prevents hangs
        # on Vista 64...
        try:
            # assertRaises replaces failUnlessRaises (removed in Python 3.12).
            self.assertRaises(RuntimeError,
                              win32file.GetQueuedCompletionStatus, port, -1)
        finally:
            handle.Close()
        return

    result = win32file.GetQueuedCompletionStatus(port, -1)
    # The last element of the result is expected to be the very same
    # OVERLAPPED object that was passed to ConnectNamedPipe.
    ol2 = result[-1]
    self.assertTrue(ol2 is overlapped)
    # Echo whatever was received back over the same handle.
    data = win32file.ReadFile(handle, 512)[1]
    win32file.WriteFile(handle, data)
def WriteFile(handle, data, ol=None):
    """Write ``data`` to ``handle`` through kernel32.WriteFile.

    Args:
        handle: OS file handle passed straight to kernel32.WriteFile.
        data: payload; it is passed through ``encode()`` before writing.
        ol: optional OVERLAPPED argument forwarded unchanged.

    Returns:
        A ``(error_code, bytes_written)`` tuple.  ``error_code`` is 0 when
        the call reports success, otherwise the value of GetLastError().
    """
    # Encode once and size the write from the encoded payload; the length of
    # the un-encoded data can differ from the number of bytes actually sent.
    # NOTE(review): assumes encode() returns a bytes-like object - confirm.
    payload = encode(data)
    c_written = DWORD()
    success = ctypes.windll.kernel32.WriteFile(
        handle,
        ctypes.create_string_buffer(payload),
        len(payload),
        ctypes.byref(c_written),
        ol,
    )
    # GetLastError() is only meaningful after a failed call; on success it may
    # still hold a stale code from an earlier, unrelated API call.
    error_code = 0 if success else ctypes.windll.kernel32.GetLastError()
    return error_code, c_written.value
def send(self, input):
    """Write ``input`` to the child's stdin without going through the file object.

    Returns:
        None when there is no stdin, the result of ``self._close('stdin')``
        when the pipe turns out to be closed or broken, otherwise the number
        of bytes written as reported by WriteFile.
    """
    if not self.stdin:
        return None

    try:
        x = msvcrt.get_osfhandle(self.stdin.fileno())
        (errCode, written) = WriteFile(x, input)
    except ValueError:
        return self._close('stdin')
    except Exception as why:
        # The original clause also named subprocess.pywintypes.error, but a
        # stock ``subprocess`` module has no ``pywintypes`` attribute, so
        # evaluating it would itself raise; ``Exception`` already covers it.
        # Read the code from ``args``: exception objects are not subscriptable
        # on Python 3.  109 is the Windows ERROR_BROKEN_PIPE code.
        if why.args and why.args[0] in (109, errno.ESHUTDOWN):
            return self._close('stdin')
        raise

    return written
def checkWork(self):
    """Flush as much of ``self.outQueue`` to ``self.writePipe`` as possible.

    Returns:
        The number of bytes written during this call.
    """
    numBytesWritten = 0
    if not self.outQueue:
        if self.disconnecting:
            self.writeConnectionLost()
            return 0
        try:
            # Zero-length write used as a probe: an error here means the pipe
            # can no longer be written to.  The payload is bytes (b'') so the
            # call is also valid on Python 3.
            win32file.WriteFile(self.writePipe, b'', None)
        except pywintypes.error:
            self.writeConnectionLost()
            return numBytesWritten
    while self.outQueue:
        data = self.outQueue.pop(0)
        try:
            errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
                                                         data, None)
        except win32api.error:
            self.writeConnectionLost()
            break
        else:
            # assert not errCode, "wtf an error code???"
            numBytesWritten += nBytesWritten
            if len(data) > nBytesWritten:
                # Partial write: put the unsent tail back at the front and
                # stop for now.
                self.outQueue.insert(0, data[nBytesWritten:])
                break
    else:
        # Reached only when the loop was not left via ``break``, i.e. the
        # queue is empty.
        resumed = self.bufferEmpty()
        if not resumed and self.disconnecting:
            self.writeConnectionLost()
    return numBytesWritten
def write(self, data):
    """Queue ``data`` or start an overlapped write on the serial port.

    Empty data is ignored.  While a write is already in progress the data is
    appended to ``self.outQueue``; otherwise the in-progress flag is set and
    the data is handed to WriteFile immediately.
    """
    if not data:
        return
    if self.writeInProgress:
        self.outQueue.append(data)
        return
    self.writeInProgress = 1
    win32file.WriteFile(self._serial.hComPort, data, self._overlappedWrite)
def serialWriteEvent(self):
    """Start the next queued write, or clear the in-progress flag if none is left."""
    try:
        pending = self.outQueue.pop(0)
    except IndexError:
        # Nothing left to send: mark the writer as idle.
        self.writeInProgress = 0
        return
    win32file.WriteFile(self._serial.hComPort, pending, self._overlappedWrite)
def WriteFile(handle, s):
    """ASCII-encode ``s`` and forward it to ``_WriteFile``, returning its result."""
    encoded = s.encode('ascii')
    return _WriteFile(handle, encoded)
def send(self, input):
    """Write ``input`` to the child's stdin without going through the file object.

    Returns:
        None when there is no stdin, the result of ``self._close('stdin')``
        when the pipe turns out to be closed or broken, otherwise the number
        of bytes written as reported by WriteFile.
    """
    if not self.stdin:
        return None

    try:
        x = msvcrt.get_osfhandle(self.stdin.fileno())
        (errCode, written) = WriteFile(x, input)
    except ValueError:
        return self._close('stdin')
    except Exception as why:
        # ``except ..., why`` is Python 2-only syntax; ``as`` works on 2.6+
        # and 3.x.  The original clause also named
        # subprocess.pywintypes.error, but a stock ``subprocess`` module has
        # no ``pywintypes`` attribute; ``Exception`` already covers it.
        # Read the code from ``args``: exception objects are not subscriptable
        # on Python 3.  109 is the Windows ERROR_BROKEN_PIPE code.
        if why.args and why.args[0] in (109, errno.ESHUTDOWN):
            return self._close('stdin')
        raise

    return written
def checkWork(self):
    """Flush as much of ``self.outQueue`` to ``self.writePipe`` as possible.

    Returns:
        The number of bytes written during this call.
    """
    numBytesWritten = 0
    if not self.outQueue:
        if self.disconnecting:
            self.writeConnectionLost()
            return 0
        try:
            # Zero-length write used as a probe: an error here means the pipe
            # can no longer be written to.  The payload is bytes (b'') so the
            # call is also valid on Python 3.
            win32file.WriteFile(self.writePipe, b'', None)
        except pywintypes.error:
            self.writeConnectionLost()
            return numBytesWritten
    while self.outQueue:
        data = self.outQueue.pop(0)
        try:
            errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
                                                         data, None)
        except win32api.error:
            self.writeConnectionLost()
            break
        else:
            # assert not errCode, "wtf an error code???"
            numBytesWritten += nBytesWritten
            if len(data) > nBytesWritten:
                # Partial write: put the unsent tail back at the front and
                # stop for now.
                self.outQueue.insert(0, data[nBytesWritten:])
                break
    else:
        # Reached only when the loop was not left via ``break``, i.e. the
        # queue is empty.
        resumed = self.bufferEmpty()
        if not resumed and self.disconnecting:
            self.writeConnectionLost()
    return numBytesWritten
def write(self, data):
    """Queue ``data`` or start an overlapped write on the serial port.

    Empty data is ignored.  While a write is already in progress the data is
    appended to ``self.outQueue``; otherwise the in-progress flag is set and
    the data is handed to WriteFile immediately.
    """
    if not data:
        return
    if self.writeInProgress:
        self.outQueue.append(data)
        return
    self.writeInProgress = 1
    win32file.WriteFile(self._serial.hComPort, data, self._overlappedWrite)
def serialWriteEvent(self):
    """Start the next queued write, or clear the in-progress flag if none is left."""
    try:
        pending = self.outQueue.pop(0)
    except IndexError:
        # Nothing left to send: mark the writer as idle.
        self.writeInProgress = 0
        return
    win32file.WriteFile(self._serial.hComPort, pending, self._overlappedWrite)
def send(self, string, flags=0):
    """Write ``string`` to ``self._handle`` and return the byte count reported.

    ``flags`` is accepted but not used by this implementation.
    """
    result = win32file.WriteFile(self._handle, string)
    # WriteFile yields an (error code, bytes written) pair; only the count is
    # returned to the caller.
    _err, nbytes = result
    return nbytes
def WriteFile(handle, data, ol=None):
    """Write ``data`` to ``handle`` through kernel32.WriteFile.

    Args:
        handle: OS file handle passed straight to kernel32.WriteFile.
        data: payload; it is passed through ``encode()`` before writing.
        ol: optional OVERLAPPED argument forwarded unchanged.

    Returns:
        A ``(error_code, bytes_written)`` tuple.  ``error_code`` is 0 when
        the call reports success, otherwise the value of GetLastError().
    """
    # Encode once and size the write from the encoded payload; the length of
    # the un-encoded data can differ from the number of bytes actually sent.
    # NOTE(review): assumes encode() returns a bytes-like object - confirm.
    payload = encode(data)
    c_written = DWORD()
    success = ctypes.windll.kernel32.WriteFile(
        handle,
        ctypes.create_string_buffer(payload),
        len(payload),
        ctypes.byref(c_written),
        ol,
    )
    # GetLastError() is only meaningful after a failed call; on success it may
    # still hold a stale code from an earlier, unrelated API call.
    error_code = 0 if success else ctypes.windll.kernel32.GetLastError()
    return error_code, c_written.value
def send(self, input):
    """Write ``input`` to the child's stdin without going through the file object.

    Returns:
        None when there is no stdin, the result of ``self._close('stdin')``
        when the pipe turns out to be closed or broken, otherwise the number
        of bytes written as reported by WriteFile.
    """
    if not self.stdin:
        return None

    try:
        x = msvcrt.get_osfhandle(self.stdin.fileno())
        (errCode, written) = WriteFile(x, input)
    except ValueError:
        return self._close('stdin')
    except Exception as why:
        # The original clause also named subprocess.pywintypes.error, but a
        # stock ``subprocess`` module has no ``pywintypes`` attribute, so
        # evaluating it would itself raise; ``Exception`` already covers it.
        # Read the code from ``args``: exception objects are not subscriptable
        # on Python 3.  109 is the Windows ERROR_BROKEN_PIPE code.
        if why.args and why.args[0] in (109, errno.ESHUTDOWN):
            return self._close('stdin')
        raise

    return written
def _serverThread(self, pipe_handle, event, wait_time):
    """Serve a single named-pipe connection, then signal ``event``.

    Args:
        pipe_handle: server end of the pipe; closed before returning.
        event: set once the exchange has completed successfully.
        wait_time: seconds to sleep between reading the request and
            writing the reply.
    """
    try:
        # just do one connection and terminate.
        hr = win32pipe.ConnectNamedPipe(pipe_handle)
        # assertTrue/assertEqual replace the failUnless* aliases, which were
        # removed from unittest in Python 3.12.
        self.assertTrue(hr in (0, winerror.ERROR_PIPE_CONNECTED), "Got error code 0x%x" % (hr,))
        hr, got = win32file.ReadFile(pipe_handle, 100)
        self.assertEqual(got, str2bytes("foo\0bar"))
        time.sleep(wait_time)
        win32file.WriteFile(pipe_handle, str2bytes("bar\0foo"))
    finally:
        # Release the pipe even when a check or an I/O call fails.
        pipe_handle.Close()
    # Deliberately outside the ``finally``: the event is only set after a
    # fully successful exchange, exactly as before.
    event.set()
def testMoreFiles(self):
    """Exercise write, read-back, truncation and attribute queries on one file.

    The file is opened with FILE_FLAG_DELETE_ON_CLOSE, and the final check
    verifies that it is gone once the handle reference has been dropped.
    """
    # Create a file in the %TEMP% directory.
    testName = os.path.join(win32api.GetTempPath(), "win32filetest.dat")
    desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
    # Set a flag to delete the file automatically when it is closed.
    fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
    h = win32file.CreateFile(testName, desiredAccess, win32file.FILE_SHARE_READ, None,
                             win32file.CREATE_ALWAYS, fileFlags, 0)

    # Write a known number of bytes to the file.
    data = str2bytes("z") * 1025

    win32file.WriteFile(h, data)

    # The assert* methods below replace the failUnless* aliases, which were
    # removed from unittest in Python 3.12.
    self.assertTrue(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!")

    # Ensure we can read the data back.
    win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
    hr, read_data = win32file.ReadFile(h, len(data)+10)  # + 10 to get anything extra
    self.assertTrue(hr == 0, "Readfile returned %d" % hr)

    self.assertTrue(read_data == data, "Read data is not what we wrote!")

    # Now truncate the file at 1/2 its existing size.
    newSize = len(data)//2
    win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
    win32file.SetEndOfFile(h)
    self.assertEqual(win32file.GetFileSize(h), newSize)

    # GetFileAttributesEx/GetFileAttributesExW tests.
    self.assertEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName))

    attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
    self.assertTrue(size == newSize,
                    "Expected GetFileAttributesEx to return the same size as GetFileSize()")
    self.assertTrue(attr == win32file.GetFileAttributes(testName),
                    "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")

    h = None  # Close the file by removing the last reference to the handle!

    self.assertTrue(not os.path.isfile(testName), "After closing the file, it still exists!")
def testFilePointer(self):
    """Check SetFilePointer with FILE_BEGIN and with a negative FILE_END offset."""
    # via [ 979270 ] SetFilePointer fails with negative offset

    # Create a file in the %TEMP% directory.
    filename = os.path.join(win32api.GetTempPath(), "win32filetest.dat")

    f = win32file.CreateFile(filename,
                             win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                             0,
                             None,
                             win32file.CREATE_ALWAYS,
                             win32file.FILE_ATTRIBUTE_NORMAL,
                             0)
    try:
        # Write some data
        data = str2bytes('Some data')
        (res, written) = win32file.WriteFile(f, data)

        # assertFalse/assertEqual replace failIf/failUnlessEqual, which were
        # removed from unittest in Python 3.12.
        self.assertFalse(res)
        self.assertEqual(written, len(data))

        # Move at the beginning and read the data
        win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
        (res, s) = win32file.ReadFile(f, len(data))

        self.assertFalse(res)
        self.assertEqual(s, data)

        # Move at the end and read the data
        win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
        (res, s) = win32file.ReadFile(f, len(data))

        self.assertFalse(res)
        self.assertEqual(s, data)
    finally:
        f.Close()
        os.unlink(filename)
def _serverThread(self, pipe_handle, event, wait_time):
    """Serve a single named-pipe connection, then signal ``event``.

    Args:
        pipe_handle: server end of the pipe; closed before returning.
        event: set once the exchange has completed successfully.
        wait_time: seconds to sleep between reading the request and
            writing the reply.
    """
    try:
        # just do one connection and terminate.
        hr = win32pipe.ConnectNamedPipe(pipe_handle)
        # assertTrue/assertEqual replace the failUnless* aliases, which were
        # removed from unittest in Python 3.12.
        self.assertTrue(hr in (0, winerror.ERROR_PIPE_CONNECTED), "Got error code 0x%x" % (hr,))
        hr, got = win32file.ReadFile(pipe_handle, 100)
        self.assertEqual(got, str2bytes("foo\0bar"))
        time.sleep(wait_time)
        win32file.WriteFile(pipe_handle, str2bytes("bar\0foo"))
    finally:
        # Release the pipe even when a check or an I/O call fails.
        pipe_handle.Close()
    # Deliberately outside the ``finally``: the event is only set after a
    # fully successful exchange, exactly as before.
    event.set()
def testMoreFiles(self):
    """Exercise write, read-back, truncation and attribute queries on one file.

    The file is opened with FILE_FLAG_DELETE_ON_CLOSE, and the final check
    verifies that it is gone once the handle reference has been dropped.
    """
    # Create a file in the %TEMP% directory.
    testName = os.path.join(win32api.GetTempPath(), "win32filetest.dat")
    desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
    # Set a flag to delete the file automatically when it is closed.
    fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
    h = win32file.CreateFile(testName, desiredAccess, win32file.FILE_SHARE_READ, None,
                             win32file.CREATE_ALWAYS, fileFlags, 0)

    # Write a known number of bytes to the file.
    data = str2bytes("z") * 1025

    win32file.WriteFile(h, data)

    # The assert* methods below replace the failUnless* aliases, which were
    # removed from unittest in Python 3.12.
    self.assertTrue(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!")

    # Ensure we can read the data back.
    win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
    hr, read_data = win32file.ReadFile(h, len(data)+10)  # + 10 to get anything extra
    self.assertTrue(hr == 0, "Readfile returned %d" % hr)

    self.assertTrue(read_data == data, "Read data is not what we wrote!")

    # Now truncate the file at 1/2 its existing size.
    newSize = len(data)//2
    win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
    win32file.SetEndOfFile(h)
    self.assertEqual(win32file.GetFileSize(h), newSize)

    # GetFileAttributesEx/GetFileAttributesExW tests.
    self.assertEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName))

    attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
    self.assertTrue(size == newSize,
                    "Expected GetFileAttributesEx to return the same size as GetFileSize()")
    self.assertTrue(attr == win32file.GetFileAttributes(testName),
                    "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")

    h = None  # Close the file by removing the last reference to the handle!

    self.assertTrue(not os.path.isfile(testName), "After closing the file, it still exists!")
def testFilePointer(self):
    """Check SetFilePointer with FILE_BEGIN and with a negative FILE_END offset."""
    # via [ 979270 ] SetFilePointer fails with negative offset

    # Create a file in the %TEMP% directory.
    filename = os.path.join(win32api.GetTempPath(), "win32filetest.dat")

    f = win32file.CreateFile(filename,
                             win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                             0,
                             None,
                             win32file.CREATE_ALWAYS,
                             win32file.FILE_ATTRIBUTE_NORMAL,
                             0)
    try:
        # Write some data
        data = str2bytes('Some data')
        (res, written) = win32file.WriteFile(f, data)

        # assertFalse/assertEqual replace failIf/failUnlessEqual, which were
        # removed from unittest in Python 3.12.
        self.assertFalse(res)
        self.assertEqual(written, len(data))

        # Move at the beginning and read the data
        win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
        (res, s) = win32file.ReadFile(f, len(data))

        self.assertFalse(res)
        self.assertEqual(s, data)

        # Move at the end and read the data
        win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
        (res, s) = win32file.ReadFile(f, len(data))

        self.assertFalse(res)
        self.assertEqual(s, data)
    finally:
        f.Close()
        os.unlink(filename)
def send(self, input):
    """Write ``input`` to the child's stdin without going through the file object.

    Returns:
        None when there is no stdin, the result of ``self._close('stdin')``
        when the pipe turns out to be closed or broken, otherwise the number
        of bytes written as reported by WriteFile.
    """
    if not self.stdin:
        return None

    try:
        x = msvcrt.get_osfhandle(self.stdin.fileno())
        (errCode, written) = WriteFile(x, input)
    except ValueError:
        return self._close('stdin')
    except Exception as why:
        # ``except ..., why`` is Python 2-only syntax; ``as`` works on 2.6+
        # and 3.x.  The original clause also named
        # subprocess.pywintypes.error, but a stock ``subprocess`` module has
        # no ``pywintypes`` attribute; ``Exception`` already covers it.
        # Read the code from ``args``: exception objects are not subscriptable
        # on Python 3.  109 is the Windows ERROR_BROKEN_PIPE code.
        if why.args and why.args[0] in (109, errno.ESHUTDOWN):
            return self._close('stdin')
        raise

    return written
def send(self, input):
    """Write ``input`` to the child's stdin without going through the file object.

    Returns:
        None when there is no stdin, the result of ``self._close('stdin')``
        when the pipe turns out to be closed or broken, otherwise the number
        of bytes written as reported by WriteFile.
    """
    if not self.stdin:
        return None

    try:
        x = msvcrt.get_osfhandle(self.stdin.fileno())
        (errCode, written) = WriteFile(x, input)
    except ValueError:
        return self._close('stdin')
    except Exception as why:
        # ``except ..., why`` is Python 2-only syntax; ``as`` works on 2.6+
        # and 3.x.  The original clause also named
        # subprocess.pywintypes.error, but a stock ``subprocess`` module has
        # no ``pywintypes`` attribute; ``Exception`` already covers it.
        # Read the code from ``args``: exception objects are not subscriptable
        # on Python 3.  109 is the Windows ERROR_BROKEN_PIPE code.
        if why.args and why.args[0] in (109, errno.ESHUTDOWN):
            return self._close('stdin')
        raise

    return written
def send(self, input):
    """Write ``input`` to the child's stdin without going through the file object.

    Returns:
        None when there is no stdin, the result of ``self._close('stdin')``
        when the pipe turns out to be closed or broken, otherwise the number
        of bytes written as reported by WriteFile.
    """
    if not self.stdin:
        return None

    try:
        x = msvcrt.get_osfhandle(self.stdin.fileno())
        (errCode, written) = WriteFile(x, input)
    except ValueError:
        return self._close('stdin')
    except Exception as why:
        # ``except ..., why`` is Python 2-only syntax; ``as`` works on 2.6+
        # and 3.x.  The original clause also named
        # subprocess.pywintypes.error, but a stock ``subprocess`` module has
        # no ``pywintypes`` attribute; ``Exception`` already covers it.
        # Read the code from ``args``: exception objects are not subscriptable
        # on Python 3.  109 is the Windows ERROR_BROKEN_PIPE code.
        if why.args and why.args[0] in (109, errno.ESHUTDOWN):
            return self._close('stdin')
        raise

    return written
def checkWork(self):
    """Push queued data from ``self.outQueue`` into ``self.writePipe``.

    Returns:
        The number of bytes written during this call.
    """
    written = 0
    if not self.outQueue:
        if self.disconnecting:
            self.writeConnectionLost()
            return 0
        try:
            # Zero-length write used as a probe; an error means the pipe can
            # no longer be written to.
            win32file.WriteFile(self.writePipe, b'', None)
        except pywintypes.error:
            self.writeConnectionLost()
            return written

    # Tracks whether the queue was emptied without an error or a partial write.
    drained = True
    while self.outQueue:
        chunk = self.outQueue.pop(0)
        try:
            _status, sent = win32file.WriteFile(self.writePipe, chunk, None)
        except win32api.error:
            self.writeConnectionLost()
            drained = False
            break
        # assert not errCode, "wtf an error code???"
        written += sent
        if sent < len(chunk):
            # Partial write: requeue the unsent tail at the front and stop.
            self.outQueue.insert(0, chunk[sent:])
            drained = False
            break

    if drained:
        resumed = self.bufferEmpty()
        if self.disconnecting and not resumed:
            self.writeConnectionLost()
    return written
def write(self, data):
    """Queue ``data`` or start an overlapped write on the serial port.

    Empty data is ignored.  While a write is already in progress the data is
    appended to ``self.outQueue``; otherwise the in-progress flag is set and
    the data is handed to WriteFile immediately.
    """
    if not data:
        return
    if self.writeInProgress:
        self.outQueue.append(data)
        return
    self.writeInProgress = 1
    win32file.WriteFile(self._serial.hComPort, data, self._overlappedWrite)