def _OpenFileForRead(self, path):
try:
fhandle = self.fhandle = win32file.CreateFile(
path,
win32file.GENERIC_READ,
win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
None,
win32file.OPEN_EXISTING,
win32file.FILE_ATTRIBUTE_NORMAL,
None)
self._closer = weakref.ref(
self, lambda x: win32file.CloseHandle(fhandle))
self.write_enabled = False
return fhandle
except pywintypes.error as e:
raise IOError("Unable to open %s: %s" % (path, e))
python类FILE_ATTRIBUTE_NORMAL的实例源码
def _OpenFileForWrite(self, path):
try:
fhandle = self.fhandle = win32file.CreateFile(
path,
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
None,
win32file.OPEN_EXISTING,
win32file.FILE_ATTRIBUTE_NORMAL,
None)
self.write_enabled = True
self._closer = weakref.ref(
self, lambda x: win32file.CloseHandle(fhandle))
return fhandle
except pywintypes.error as e:
raise IOError("Unable to open %s: %s" % (path, e))
def testFilePointer(self):
# via [ 979270 ] SetFilePointer fails with negative offset
# Create a file in the %TEMP% directory.
filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
f = win32file.CreateFile(filename,
win32file.GENERIC_READ|win32file.GENERIC_WRITE,
0,
None,
win32file.CREATE_ALWAYS,
win32file.FILE_ATTRIBUTE_NORMAL,
0)
try:
#Write some data
data = str2bytes('Some data')
(res, written) = win32file.WriteFile(f, data)
self.failIf(res)
self.assertEqual(written, len(data))
#Move at the beginning and read the data
win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
(res, s) = win32file.ReadFile(f, len(data))
self.failIf(res)
self.assertEqual(s, data)
#Move at the end and read the data
win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
(res, s) = win32file.ReadFile(f, len(data))
self.failIf(res)
self.failUnlessEqual(s, data)
finally:
f.Close()
os.unlink(filename)
def testFilePointer(self):
# via [ 979270 ] SetFilePointer fails with negative offset
# Create a file in the %TEMP% directory.
filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
f = win32file.CreateFile(filename,
win32file.GENERIC_READ|win32file.GENERIC_WRITE,
0,
None,
win32file.CREATE_ALWAYS,
win32file.FILE_ATTRIBUTE_NORMAL,
0)
try:
#Write some data
data = str2bytes('Some data')
(res, written) = win32file.WriteFile(f, data)
self.failIf(res)
self.assertEqual(written, len(data))
#Move at the beginning and read the data
win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
(res, s) = win32file.ReadFile(f, len(data))
self.failIf(res)
self.assertEqual(s, data)
#Move at the end and read the data
win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
(res, s) = win32file.ReadFile(f, len(data))
self.failIf(res)
self.failUnlessEqual(s, data)
finally:
f.Close()
os.unlink(filename)
def __enter__(self):
self.Fd = win32file.CreateFile(self.dest, win32file.GENERIC_WRITE,
win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE | win32file.FILE_SHARE_DELETE,
None, win32file.CREATE_ALWAYS, win32file.FILE_ATTRIBUTE_NORMAL, None)
return self
def run(self):
import win32file
self.file_handle = win32file.CreateFile(self.named_pipe_path,
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0, None,
win32file.OPEN_EXISTING,
0, None)
log.info('Windows named pipe connected')
self.fire_connected()
while True:
# The following code is cleaner, than waiting for an exception while writing to detect pipe closing,
# but causes mpv to hang and crash while closing when closed at the wrong time.
# if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL:
# # pipe was closed
# break
try:
while not self.write_queue.empty():
win32file.WriteFile(self.file_handle, self.write_queue.get_nowait())
except win32file.error:
log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.')
break
size = win32file.GetFileSize(self.file_handle)
if size > 0:
while size > 0:
# pipe has data to read
data = win32file.ReadFile(self.file_handle, 512)
self.on_data(data[1])
size = win32file.GetFileSize(self.file_handle)
else:
time.sleep(1)
log.info('Windows named pipe closed')
win32file.CloseHandle(self.file_handle)
self.file_handle = None
self.fire_disconnected()
def write_mbr_winapi( _file ):
print 'Are you SURE you want to overwrite the MBR?? This will possibly make the volume unbootable.'
response = raw_input( 'Type \"YES\" then Return to continue, anything else then Return to not continue:' )
if response != 'YES':
return
h = None
handles = []
try:
for x in range( num_mbr_handles ):
h = win32file.CreateFile( '\\\\.\\PhysicalDrive0',
win32con.GENERIC_WRITE,
win32file.FILE_SHARE_WRITE,
None,
win32file.OPEN_EXISTING,
win32file.FILE_ATTRIBUTE_NORMAL,
None )
if ( h != win32file.INVALID_HANDLE_VALUE ):
handles.append( h )
f = open( _file, 'rb' )
if f <> None:
fsize = os.path.getsize( _file )
wsize = 512
if fsize > 512:
print 'WARNING: File being written is > 512 bytes, will only write 512...'
wsize = 512
contents = f.read( fsize )
if fsize < 512:
print 'WARNING: Padding file up to 512 bytes, may not have expected results...'
## pad it out to 512 bytes
diff = 512 - 512
for num in xrange( diff ):
contents += 'A'
win32file.WriteFile( h, contents, None )
f.close()
except Exception, e:
print str( e )
print '\tAre you running as Administrator?'
for handle in handles:
win32file.CloseHandle( handle )
#############