def testTransactNamedPipeAsync(self):
    """Run an overlapped TransactNamedPipe call against the test pipe server.

    The client opens ``self.pipename`` with FILE_FLAG_OVERLAPPED, switches
    the handle to message read mode, sends ``foo\\0bar`` and expects the
    call to report ERROR_IO_PENDING.  Once the overlapped result is
    available the reply must be ``bar\\0foo``, and the server thread must
    signal ``event`` within five seconds.
    """
    event = threading.Event()
    overlapped = pywintypes.OVERLAPPED()
    overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
    # NOTE(review): the second argument is presumably a delay that forces
    # the transaction to go pending - confirm against startPipeServer.
    self.startPipeServer(event, 0.5)
    open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
    hpipe = win32file.CreateFile(self.pipename,
                                 open_mode,
                                 0,  # no sharing
                                 None,  # default security
                                 win32con.OPEN_EXISTING,
                                 win32con.FILE_FLAG_OVERLAPPED,
                                 None)
    # set to message mode.
    win32pipe.SetNamedPipeHandleState(
        hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
    buffer = win32file.AllocateReadBuffer(1024)
    hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped)
    # assert* replaces the failUnless* aliases, which were removed from
    # unittest in Python 3.12.
    self.assertEqual(hr, winerror.ERROR_IO_PENDING)
    # Block until the overlapped operation finishes, then trim the buffer
    # to the number of bytes actually transferred.
    nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
    got = buffer[:nbytes]
    self.assertEqual(got, str2bytes("bar\0foo"))
    event.wait(5)
    self.assertTrue(event.isSet(), "Pipe server thread didn't terminate")
python类CreateFile()的实例源码
def testMoreFiles(self):
    """Write, read back, truncate and auto-delete a temp file via win32file.

    The file is opened with FILE_FLAG_DELETE_ON_CLOSE, so dropping the last
    reference to the handle at the end is expected to remove it from disk.
    Along the way the test cross-checks GetFileSize, GetFileAttributesEx,
    GetFileAttributesExW and GetFileAttributes against each other.
    """
    # Create a file in the %TEMP% directory.
    testName = os.path.join(win32api.GetTempPath(), "win32filetest.dat")
    desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
    # Set a flag to delete the file automatically when it is closed.
    fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
    h = win32file.CreateFile(testName, desiredAccess, win32file.FILE_SHARE_READ,
                             None, win32file.CREATE_ALWAYS, fileFlags, 0)

    # Write a known number of bytes to the file.
    data = str2bytes("z") * 1025
    win32file.WriteFile(h, data)

    # assert* replaces the failUnless* aliases removed in Python 3.12.
    self.assertTrue(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!")

    # Ensure we can read the data back.
    win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
    hr, read_data = win32file.ReadFile(h, len(data)+10)  # + 10 to get anything extra
    self.assertTrue(hr == 0, "Readfile returned %d" % hr)
    self.assertTrue(read_data == data, "Read data is not what we wrote!")

    # Now truncate the file at 1/2 its existing size.
    newSize = len(data)//2
    win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
    win32file.SetEndOfFile(h)
    self.assertEqual(win32file.GetFileSize(h), newSize)

    # GetFileAttributesEx/GetFileAttributesExW tests.
    self.assertEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName))

    attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
    self.assertTrue(size == newSize,
                    "Expected GetFileAttributesEx to return the same size as GetFileSize()")
    self.assertTrue(attr == win32file.GetFileAttributes(testName),
                    "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")

    h = None  # Close the file by removing the last reference to the handle!

    self.assertFalse(os.path.isfile(testName), "After closing the file, it still exists!")
def testFilePointer(self):
    """Check SetFilePointer with zero and negative offsets.

    Regression test for [ 979270 ] "SetFilePointer fails with negative
    offset": data is written, re-read from FILE_BEGIN, then re-read after
    seeking ``-len(data)`` from FILE_END.  The temp file is always closed
    and removed.
    """
    # Create a file in the %TEMP% directory.
    filename = os.path.join(win32api.GetTempPath(), "win32filetest.dat")

    f = win32file.CreateFile(filename,
                             win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                             0,
                             None,
                             win32file.CREATE_ALWAYS,
                             win32file.FILE_ATTRIBUTE_NORMAL,
                             0)
    try:
        # Write some data
        data = str2bytes('Some data')
        (res, written) = win32file.WriteFile(f, data)

        # assertFalse/assertEqual replace the failIf/failUnlessEqual
        # aliases (removed in Python 3.12) and match the assertEqual calls
        # this test already used.
        self.assertFalse(res)
        self.assertEqual(written, len(data))

        # Move at the beginning and read the data
        win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
        (res, s) = win32file.ReadFile(f, len(data))

        self.assertFalse(res)
        self.assertEqual(s, data)

        # Move at the end and read the data
        win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
        (res, s) = win32file.ReadFile(f, len(data))

        self.assertFalse(res)
        self.assertEqual(s, data)
    finally:
        f.Close()
        os.unlink(filename)
def testFileTimesTimezones(self):
    """Check that file times compare equal across UTC and local time zones.

    Times are set as UTC and compared against the equivalent local-zone
    value, then set as local and compared against UTC.  The test silently
    returns when ``pywintypes.TimeType`` is not a ``datetime`` subclass.
    """
    if not issubclass(pywintypes.TimeType, datetime.datetime):
        # maybe should report 'skipped', but that's not quite right as
        # there is nothing you can do to avoid it being skipped!
        return
    filename = tempfile.mktemp("-testFileTimes")
    now_utc = win32timezone.utcnow()
    now_local = now_utc.astimezone(win32timezone.TimeZoneInfo.local())
    h = win32file.CreateFile(filename,
                             win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                             0, None, win32file.CREATE_ALWAYS, 0, 0)
    try:
        win32file.SetFileTime(h, now_utc, now_utc, now_utc)
        ct, at, wt = win32file.GetFileTime(h)
        # assertEqual replaces failUnlessEqual, removed in Python 3.12.
        self.assertEqual(now_local, ct)
        self.assertEqual(now_local, at)
        self.assertEqual(now_local, wt)
        # and the reverse - set local, check against utc
        win32file.SetFileTime(h, now_local, now_local, now_local)
        ct, at, wt = win32file.GetFileTime(h)
        self.assertEqual(now_utc, ct)
        self.assertEqual(now_utc, at)
        self.assertEqual(now_utc, wt)
    finally:
        h.close()
        os.unlink(filename)
def testSimpleOverlapped(self):
    """Write and then read a large temp file using OVERLAPPED structures.

    512 chunks of 0x8000 bytes are written, waiting on the overlapped
    event after each call and advancing ``overlapped.Offset`` by hand.
    The file is then re-opened and read into a pre-allocated buffer until
    ReadFile raises ``win32api.error``; each ReadFile must hand back the
    very buffer object it was given.
    """
    import win32event

    # Create a file in the %TEMP% directory.
    testName = os.path.join(win32api.GetTempPath(), "win32filetest.dat")
    chunk_data = str2bytes("z") * 0x8000
    num_loops = 512

    overlapped = pywintypes.OVERLAPPED()
    overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
    # Create the file and write a large amount of data to it.
    h = win32file.CreateFile(testName, win32file.GENERIC_WRITE, 0, None,
                             win32file.CREATE_ALWAYS, 0, 0)
    try:
        for i in range(num_loops):
            win32file.WriteFile(h, chunk_data, overlapped)
            win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
            overlapped.Offset = overlapped.Offset + len(chunk_data)
    finally:
        # Close even if a write fails: the file is opened with share mode 0.
        h.Close()

    # Now read the data back overlapped
    overlapped = pywintypes.OVERLAPPED()
    overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
    h = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None,
                             win32file.OPEN_EXISTING, 0, 0)
    try:
        buffer = win32file.AllocateReadBuffer(0xFFFF)
        while 1:
            try:
                hr, data = win32file.ReadFile(h, buffer, overlapped)
                win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
                overlapped.Offset = overlapped.Offset + len(data)
                if data is not buffer:
                    self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
            except win32api.error:
                # ReadFile raising is what terminates the loop.
                break
    finally:
        # self.fail() raises, so without this the handle would stay open.
        h.Close()
def initialize(self):
    """Open ``self.path_to_watch`` and store the resulting handle in ``self.hDir``.

    The path is opened with ``self.FILE_LIST_DIRECTORY`` access, full
    read/write/delete sharing and FILE_FLAG_BACKUP_SEMANTICS.
    """
    share_everything = (win32con.FILE_SHARE_READ
                        | win32con.FILE_SHARE_WRITE
                        | win32con.FILE_SHARE_DELETE)
    create_args = (
        self.path_to_watch,
        self.FILE_LIST_DIRECTORY,
        share_everything,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None,
    )
    self.hDir = win32file.CreateFile(*create_args)
def NullFile(inheritable):
    """Return a read/write handle to the ``nul`` device.

    When *inheritable* is true the handle is created with a
    SECURITY_ATTRIBUTES whose ``bInheritHandle`` is 1; otherwise no
    security attributes are passed.
    """
    security = None
    if inheritable:
        security = pywintypes.SECURITY_ATTRIBUTES()
        security.bInheritHandle = 1
    access = win32file.GENERIC_READ | win32file.GENERIC_WRITE
    sharing = win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE
    handle = win32file.CreateFile("nul", access, sharing, security,
                                  win32file.OPEN_EXISTING, 0, None)
    return handle
def __enter__(self):
    """Create (or overwrite) ``self.dest`` for writing and return ``self``.

    The handle is kept in ``self.Fd``; the file is opened with
    CREATE_ALWAYS and read/write/delete sharing.
    """
    share_flags = (win32file.FILE_SHARE_READ
                   | win32file.FILE_SHARE_WRITE
                   | win32file.FILE_SHARE_DELETE)
    self.Fd = win32file.CreateFile(
        self.dest,
        win32file.GENERIC_WRITE,
        share_flags,
        None,
        win32file.CREATE_ALWAYS,
        win32file.FILE_ATTRIBUTE_NORMAL,
        None,
    )
    return self
def win32_comports_bruteforce():
    """Probe COM1..COM256 and return the ports that could be opened.

    Each candidate is opened through its ``\\\\.\\COMn`` device path; a port
    that opens successfully is recorded and immediately closed again.

    Returns:
        A list of ``(name, "Unknown", "Serial")`` tuples, where *name* is
        the bare port name such as ``"COM3"``.
    """
    import win32file
    import win32con
    ports = []
    mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
    for i in range(1, 257):
        try:
            port = win32file.CreateFile("\\\\.\\COM%i" % i,
                                        mode,
                                        win32con.FILE_SHARE_READ,
                                        None,
                                        win32con.OPEN_EXISTING,
                                        0,
                                        None)
        except win32file.error:
            # The port could not be opened; skip it.
            continue
        # The device path always carries the 4-character "\\.\" prefix, so
        # the recorded name is simply the bare COMn form.
        ports.append(("COM%i" % i, "Unknown", "Serial"))
        try:
            win32file.CloseHandle(port)
        except win32file.error:
            # Best effort: the port is already recorded, and a failed close
            # must not abort the scan of the remaining ports.
            pass
    return ports
def run(self):
    """Service the Windows named pipe until it appears to be closed.

    Opens ``self.named_pipe_path``, fires the connected callback, then
    loops: flush everything queued in ``self.write_queue`` to the pipe,
    and hand any readable data to ``self.on_data`` in chunks of up to 512
    bytes.  A write failure is treated as the pipe having been closed.
    When the loop ends for any reason, including an error while reading,
    the handle is closed, ``self.file_handle`` is reset to ``None`` and the
    disconnected callback fires.
    """
    import win32file
    self.file_handle = win32file.CreateFile(self.named_pipe_path,
                                            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                                            0, None,
                                            win32file.OPEN_EXISTING,
                                            0, None)
    log.info('Windows named pipe connected')
    self.fire_connected()
    try:
        while True:
            # The following code is cleaner, than waiting for an exception while writing to detect pipe closing,
            # but causes mpv to hang and crash while closing when closed at the wrong time.
            # if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL:
            #     # pipe was closed
            #     break
            try:
                while not self.write_queue.empty():
                    win32file.WriteFile(self.file_handle, self.write_queue.get_nowait())
            except win32file.error:
                log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.')
                break

            size = win32file.GetFileSize(self.file_handle)
            if size <= 0:
                # Nothing to read; back off before polling again.
                time.sleep(1)
                continue
            while size > 0:
                # pipe has data to read
                data = win32file.ReadFile(self.file_handle, 512)
                self.on_data(data[1])
                size = win32file.GetFileSize(self.file_handle)
    finally:
        # Runs on the normal break path and when GetFileSize/ReadFile/on_data
        # raise, so the handle is never leaked and listeners always learn
        # about the disconnect.  Any such exception still propagates.
        log.info('Windows named pipe closed')
        win32file.CloseHandle(self.file_handle)
        self.file_handle = None
        self.fire_disconnected()
def startMonitor(pathToWatch):
    """Watch *pathToWatch* (recursively) and report file-system changes.

    Runs forever: each batch returned by ReadDirectoryChangesW is printed
    as created / deleted / modified / renamed events.  For modified files
    the contents are dumped, and when the extension is listed in
    ``fileTypes`` the file is passed to ``injectCode``.

    Args:
        pathToWatch: directory to monitor.
    """
    FILE_LIST_DIRECTORY = 0x0001
    hDirectory = win32file.CreateFile(
        pathToWatch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ |
        win32con.FILE_SHARE_WRITE |  # was `win32.FILE_SHARE_WRITE` (undefined name)
        win32con.FILE_SHARE_DELETE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,  # was `FILE_FLAG.BACKUP_SEMANTICS`
        None)
    # Loop-invariant notification mask, hoisted out of the polling loop.
    notifyFilter = (win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
                    win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
                    win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
                    win32con.FILE_NOTIFY_CHANGE_SIZE |
                    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
                    win32con.FILE_NOTIFY_CHANGE_SECURITY)
    while True:
        try:
            # The API is ReadDirectoryChangesW; the original misspelling
            # raised AttributeError, which the bare except then hid.
            results = win32file.ReadDirectoryChangesW(
                hDirectory,
                1024,
                True,
                notifyFilter,
                None,
                None
            )
            for action, fileName in results:
                fullFileName = os.path.join(pathToWatch, fileName)
                if action == FILE_CREATED:
                    print("[ + ] Created %s" % fullFileName)
                elif action == FILE_DELETED:
                    print("[ - ] Deleted %s" % fullFileName)
                elif action == FILE_MODIFIED:
                    print("[ * ] Modified %s" % fullFileName)
                    print("[vvv] Dumping contents...")
                    # Reset per event so a failed read can never reuse the
                    # contents of a previously dumped file.
                    contents = None
                    try:
                        fd = open(fullFileName, "rb")
                        try:
                            contents = fd.read()
                        finally:
                            fd.close()
                        print(contents)
                        print("[^^^] Dump complete.")
                    except Exception:
                        print("[!!!] Failed.")
                    extension = os.path.splitext(fullFileName)[1]
                    if contents is not None and extension in fileTypes:
                        injectCode(fullFileName, extension, contents)
                elif action == FILE_RENAMED_FROM:
                    print("[ > ] Renamed from: %s" % fullFileName)
                elif action == FILE_RENAMED_TO:
                    print("[ < ] Renamed to: %s" % fullFileName)
                else:
                    print("[???] Unkown: %s" % fullFileName)
        except Exception:
            # Deliberately best-effort: one bad batch must not stop the
            # monitor.  Unlike a bare `except:`, this lets Ctrl+C
            # (KeyboardInterrupt) end the loop.
            pass
def write_mbr_winapi( _file ):
    """Overwrite the first 512 bytes of PhysicalDrive0 with the contents of *_file*.

    DESTRUCTIVE: nothing is written unless the user types ``YES`` at the
    prompt.  ``num_mbr_handles`` handles are opened on the drive and the
    write goes through the last one opened.  Exactly 512 bytes are written:
    longer files are truncated and shorter ones are padded with ``A``.
    Errors are printed rather than raised, and every opened handle is
    closed before returning.

    Args:
        _file: path of the file whose bytes are written.
    """
    print('Are you SURE you want to overwrite the MBR?? This will possibly make the volume unbootable.')
    response = raw_input( 'Type \"YES\" then Return to continue, anything else then Return to not continue:' )
    if response != 'YES':
        return
    h = None
    handles = []
    try:
        for x in range( num_mbr_handles ):
            h = win32file.CreateFile( '\\\\.\\PhysicalDrive0',
                                      win32con.GENERIC_WRITE,
                                      win32file.FILE_SHARE_WRITE,
                                      None,
                                      win32file.OPEN_EXISTING,
                                      win32file.FILE_ATTRIBUTE_NORMAL,
                                      None )
            if h != win32file.INVALID_HANDLE_VALUE:
                handles.append( h )
        wsize = 512
        f = open( _file, 'rb' )
        try:
            fsize = os.path.getsize( _file )
            if fsize > wsize:
                print('WARNING: File being written is > 512 bytes, will only write 512...')
            # Read at most one sector, so the truncation promised by the
            # warning above really happens (the original read and wrote the
            # whole file).
            contents = f.read( wsize )
        finally:
            f.close()
        if len( contents ) < wsize:
            print('WARNING: Padding file up to 512 bytes, may not have expected results...')
            ## pad it out to 512 bytes (the original computed 512 - 512,
            ## i.e. never padded at all)
            contents += b'A' * ( wsize - len( contents ) )
        win32file.WriteFile( h, contents, None )
    except Exception as e:
        print(str( e ))
        print('\tAre you running as Administrator?')
    finally:
        for handle in handles:
            win32file.CloseHandle( handle )
#############
def _stub(cmd_name, stdin_name, stdout_name, stderr_name):
    """INTERNAL: Stub process that will start up the child process.

    Opens the four named pipes, marks each handle inheritable, reads a
    header from the command pipe and launches the requested command with
    its standard handles wired to the stdin/stdout/stderr pipes.  The
    outcome is reported on the command pipe and the stub then exits via
    ExitProcess: 0 on success, 2 when the header lacks ``command`` or
    ``args``, 3 when CreateProcess raises WindowsError.

    Args:
        cmd_name: name of the command (control) pipe.
        stdin_name: name of the pipe used as the child's stdin.
        stdout_name: name of the pipe used as the child's stdout.
        stderr_name: name of the pipe used as the child's stderr.
    """
    def open_inheritable(pipe_name, access):
        # Open an existing pipe and flag the handle as inheritable.
        handle = CreateFile(pipe_name, access, 0, None, OPEN_EXISTING, 0, None)
        SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 1)
        return handle

    # Open the 4 pipes (command, stdin, stdout, stderr)
    cmd_pipe = open_inheritable(cmd_name, GENERIC_READ|GENERIC_WRITE)
    stdin_pipe = open_inheritable(stdin_name, GENERIC_READ)
    stdout_pipe = open_inheritable(stdout_name, GENERIC_WRITE)
    stderr_pipe = open_inheritable(stderr_name, GENERIC_WRITE)

    # Learn what we need to do..
    request = _parse_header(_read_header(cmd_pipe))
    if 'command' not in request or 'args' not in request:
        ExitProcess(2)

    # http://msdn.microsoft.com/en-us/library/ms682499(VS.85).aspx
    startupinfo = STARTUPINFO()
    startupinfo.dwFlags |= STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW
    startupinfo.hStdInput = stdin_pipe
    startupinfo.hStdOutput = stdout_pipe
    startupinfo.hStdError = stderr_pipe
    startupinfo.wShowWindow = SW_HIDE

    # Grant access so that our parent can open its grandchild.
    if 'parent_sid' in request:
        mysid = _get_current_sid()
        parent = ConvertStringSidToSid(request['parent_sid'])
        sattrs = _create_security_attributes(mysid, parent,
                                             access=PROCESS_ALL_ACCESS)
    else:
        sattrs = None

    try:
        res = CreateProcess(request['command'], request['args'], sattrs, None,
                            True, CREATE_NEW_CONSOLE, os.environ, os.getcwd(),
                            startupinfo)
    except WindowsError as e:
        message = _quote_header(str(e))
        WriteFile(cmd_pipe, 'status=error\nmessage=%s\n\n' % message)
        ExitProcess(3)
    else:
        pid = res[2]

    # Pass back results and exit
    WriteFile(cmd_pipe, 'status=ok\npid=%s\n\n' % pid)
    ExitProcess(0)
def TestDeviceNotifications(dir_names):
    """Register for device notifications and pump messages until interrupted.

    A window of class ``test_devicenotify`` is created with
    ``OnDeviceChange`` handling WM_DEVICECHANGE.  Notifications are
    registered for the USB device interface and for a directory handle
    opened on every entry of *dir_names*.  The function then pumps window
    messages until an exception such as KeyboardInterrupt (Ctrl+C) ends the
    loop; the window is destroyed and the class unregistered on the way
    out.

    Args:
        dir_names: iterable of directory paths to watch; may be empty.
    """
    wc = win32gui.WNDCLASS()
    wc.lpszClassName = 'test_devicenotify'
    wc.style = win32con.CS_GLOBALCLASS | win32con.CS_VREDRAW | win32con.CS_HREDRAW
    wc.hbrBackground = win32con.COLOR_WINDOW + 1
    wc.lpfnWndProc = {win32con.WM_DEVICECHANGE: OnDeviceChange}
    win32gui.RegisterClass(wc)
    try:
        hwnd = win32gui.CreateWindow(wc.lpszClassName,
                                     'Testing some devices',
                                     # no need for it to be visible.
                                     win32con.WS_CAPTION,
                                     100, 100, 900, 900, 0, 0, 0, None)
        try:
            hdevs = []
            # Watch for all USB device notifications
            dev_filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
                GUID_DEVINTERFACE_USB_DEVICE)
            hdev = win32gui.RegisterDeviceNotification(hwnd, dev_filter,
                                                       win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
            hdevs.append(hdev)
            # and create handles for all specified directories
            for d in dir_names:
                hdir = win32file.CreateFile(d,
                                            winnt.FILE_LIST_DIRECTORY,
                                            winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE,
                                            None,  # security attributes
                                            win32con.OPEN_EXISTING,
                                            win32con.FILE_FLAG_BACKUP_SEMANTICS |  # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME.
                                            win32con.FILE_FLAG_OVERLAPPED,
                                            None)

                dev_filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir)
                hdev = win32gui.RegisterDeviceNotification(hwnd, dev_filter,
                                                           win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
                hdevs.append(hdev)

            # now start a message pump and wait for messages to be delivered.
            print("Watching %d handles - press Ctrl+C to terminate, or" % len(hdevs))
            print("add and remove some USB devices...")
            if not dir_names:
                print("(Note you can also pass paths to watch on the command-line - eg,")
                print("pass the root of an inserted USB stick to see events specific to")
                print("that volume)")
            while 1:
                win32gui.PumpWaitingMessages()
                time.sleep(0.01)
        finally:
            # The loop above only ends by exception, so the cleanup has to
            # live in `finally` to run at all.
            win32gui.DestroyWindow(hwnd)
    finally:
        win32gui.UnregisterClass(wc.lpszClassName, None)
def testFileTimes(self):
    """Check GetFileTime on a fresh file and a SetFileTime round trip.

    A new file's creation and write times must fall between "now" and one
    second later.  All three times are then set two minutes ahead and read
    back; the read-back values are only compared when
    ``pywintypes.TimeType`` is a ``datetime`` subclass.
    """
    if issubclass(pywintypes.TimeType, datetime.datetime):
        from win32timezone import TimeZoneInfo
        now = datetime.datetime.now(tz=TimeZoneInfo.local())
        nowish = now + datetime.timedelta(seconds=1)
        later = now + datetime.timedelta(seconds=120)
    else:
        rc, tzi = win32api.GetTimeZoneInformation()
        bias = tzi[0]
        if rc == 2:  # daylight-savings is in effect.
            bias += tzi[-1]
        bias *= 60  # minutes to seconds...
        tick = int(time.time())
        now = pywintypes.Time(tick+bias)
        nowish = pywintypes.Time(tick+bias+1)
        later = pywintypes.Time(tick+bias+120)

    filename = tempfile.mktemp("-testFileTimes")
    # Windows docs the 'last time' isn't valid until the last write
    # handle is closed - so create the file, then re-open it to check.
    open(filename, "w").close()
    f = win32file.CreateFile(filename, win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                             0, None,
                             win32con.OPEN_EXISTING, 0, None)
    try:
        ct, at, wt = win32file.GetFileTime(f)
        # assertTrue/assertEqual replace the failUnless* aliases, which
        # were removed from unittest in Python 3.12.
        self.assertTrue(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
        self.assertTrue(now <= ct <= nowish, (now, ct))
        self.assertTrue(wt >= now, "File was written-to in the past now=%s, written=%s" % (now, wt))
        self.assertTrue(now <= wt <= nowish, (now, wt))

        # Now set the times.
        win32file.SetFileTime(f, later, later, later)
        # Get them back.
        ct, at, wt = win32file.GetFileTime(f)
        # XXX - the builtin PyTime type appears to be out by a dst offset.
        # just ignore that type here...
        if issubclass(pywintypes.TimeType, datetime.datetime):
            self.assertEqual(ct, later)
            self.assertEqual(at, later)
            self.assertEqual(wt, later)

    finally:
        f.Close()
        os.unlink(filename)