def SimpleFileDemo():
testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file")
if os.path.exists(testName): os.unlink(testName)
# Open the file for writing.
handle = win32file.CreateFile(testName,
win32file.GENERIC_WRITE,
0,
None,
win32con.CREATE_NEW,
0,
None)
test_data = "Hello\0there".encode("ascii")
win32file.WriteFile(handle, test_data)
handle.Close()
# Open it for reading.
handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
rc, data = win32file.ReadFile(handle, 1024)
handle.Close()
if data == test_data:
print "Successfully wrote and read a file"
else:
raise Exception("Got different data back???")
os.unlink(testName)
python类OPEN_EXISTING的实例源码
def CopyFileToCe(src_name, dest_name, progress = None):
sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
bytes=0
try:
dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
try:
while 1:
hr, data = win32file.ReadFile(sh, 2048)
if not data:
break
wincerapi.CeWriteFile(dh, data)
bytes = bytes + len(data)
if progress is not None: progress(bytes)
finally:
pass
dh.Close()
finally:
sh.Close()
return bytes
def testTransactNamedPipeBlocking(self):
event = threading.Event()
self.startPipeServer(event)
open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
hpipe = win32file.CreateFile(self.pipename,
open_mode,
0, # no sharing
None, # default security
win32con.OPEN_EXISTING,
0, # win32con.FILE_FLAG_OVERLAPPED,
None)
# set to message mode.
win32pipe.SetNamedPipeHandleState(
hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), 1024, None)
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testSimpleFiles(self):
fd, filename = tempfile.mkstemp()
os.close(fd)
os.unlink(filename)
handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
test_data = str2bytes("Hello\0there")
try:
win32file.WriteFile(handle, test_data)
handle.Close()
# Try and open for read
handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
rc, data = win32file.ReadFile(handle, 1024)
self.assertEquals(data, test_data)
finally:
handle.Close()
try:
os.unlink(filename)
except os.error:
pass
# A simple test using normal read/write operations.
def setUp(self):
self.watcher_threads = []
self.watcher_thread_changes = []
self.dir_names = []
self.dir_handles = []
for i in range(self.num_test_dirs):
td = tempfile.mktemp("-test-directory-changes-%d" % i)
os.mkdir(td)
self.dir_names.append(td)
hdir = win32file.CreateFile(td,
ntsecuritycon.FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ,
None, # security desc
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS |
win32con.FILE_FLAG_OVERLAPPED,
None)
self.dir_handles.append(hdir)
changes = []
t = threading.Thread(target=self._watcherThreadOverlapped,
args=(td, hdir, changes))
t.start()
self.watcher_threads.append(t)
self.watcher_thread_changes.append(changes)
def CopyFileToCe(src_name, dest_name, progress = None):
sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
bytes=0
try:
dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
try:
while 1:
hr, data = win32file.ReadFile(sh, 2048)
if not data:
break
wincerapi.CeWriteFile(dh, data)
bytes = bytes + len(data)
if progress is not None: progress(bytes)
finally:
pass
dh.Close()
finally:
sh.Close()
return bytes
def SimpleFileDemo():
testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file")
if os.path.exists(testName): os.unlink(testName)
# Open the file for writing.
handle = win32file.CreateFile(testName,
win32file.GENERIC_WRITE,
0,
None,
win32con.CREATE_NEW,
0,
None)
test_data = "Hello\0there".encode("ascii")
win32file.WriteFile(handle, test_data)
handle.Close()
# Open it for reading.
handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
rc, data = win32file.ReadFile(handle, 1024)
handle.Close()
if data == test_data:
print("Successfully wrote and read a file")
else:
raise Exception("Got different data back???")
os.unlink(testName)
def testTransactNamedPipeBlockingBuffer(self):
# Like testTransactNamedPipeBlocking, but a pre-allocated buffer is
# passed (not really that useful, but it exercises the code path)
event = threading.Event()
self.startPipeServer(event)
open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
hpipe = win32file.CreateFile(self.pipename,
open_mode,
0, # no sharing
None, # default security
win32con.OPEN_EXISTING,
0, # win32con.FILE_FLAG_OVERLAPPED,
None)
# set to message mode.
win32pipe.SetNamedPipeHandleState(
hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
buffer = win32file.AllocateReadBuffer(1024)
hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, None)
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testSimpleFiles(self):
fd, filename = tempfile.mkstemp()
os.close(fd)
os.unlink(filename)
handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
test_data = str2bytes("Hello\0there")
try:
win32file.WriteFile(handle, test_data)
handle.Close()
# Try and open for read
handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
rc, data = win32file.ReadFile(handle, 1024)
self.assertEquals(data, test_data)
finally:
handle.Close()
try:
os.unlink(filename)
except os.error:
pass
# A simple test using normal read/write operations.
def changeTimestamp(path,fileName,daylimit):
print '[*] Inside changeTimestamp'
dates = {}
fileName = os.path.join(path, fileName)
dates['tdata'] = getDate(fileName,daylimit)
dates['ctime'] = datetime.utcfromtimestamp(os.path.getctime(fileName))
# print "[*] Original time: ",str(dates['ctime'])
if __use_win_32:
filehandle = win32file.CreateFile(fileName, win32file.GENERIC_WRITE, 0, None, win32con.OPEN_EXISTING, 0, None)
win32file.SetFileTime(filehandle, dates['tdata'],dates['tdata'],dates['tdata'])
filehandle.close()
print "[*] Timestamps changed!!"
else:
os.utime(fileName, (time.mktime(dates['tdata'].utctimetuple()),)*2)
dates['mtime'] = datetime.utcfromtimestamp(os.path.getmtime(fileName))
# print "[*] Modified time: ",str(dates['mtime'])
def __init__(self):
self.pty = Pty()
self.ready_f = Future()
self._input_ready_callbacks = []
self.loop = get_event_loop()
self.stdout_handle = win32file.CreateFile(
self.pty.conout_name(),
win32con.GENERIC_READ,
0,
win32security.SECURITY_ATTRIBUTES(),
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_OVERLAPPED,
0)
self.stdin_handle = win32file.CreateFile(
self.pty.conin_name(),
win32con.GENERIC_WRITE,
0,
win32security.SECURITY_ATTRIBUTES(),
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_OVERLAPPED,
0)
self._buffer = []
def watchos():
#get path or maintain current path of app
FILE_LIST_DIRECTORY = 0x0001
try: path_to_watch = myos.get() or "."
except: path_to_watch = "."
path_to_watch = os.path.abspath(path_to_watch)
textbox.insert(END, "Watching %s at %s" % (path_to_watch, time.asctime()) + "\n\n")
# FindFirstChangeNotification sets up a handle for watching
# file changes.
while 1:
hDir = win32file.CreateFile (
path_to_watch,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
change_handle = win32file.ReadDirectoryChangesW (
hDir,
1024,
True,#Heap Size include_subdirectories,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
win32con.FILE_NOTIFY_CHANGE_SIZE |
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
win32con.FILE_NOTIFY_CHANGE_SECURITY,
None,
None
)
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
results = change_handle
for action, files in results:
full_filename = os.path.join(path_to_watch, files)
theact = ACTIONS.get(action, "Unknown")
textbox.insert(END, str(full_filename) + "\t" + str(theact) +"\n")
def w32CreateFile(name, access=GENERIC_READ|GENERIC_WRITE,
flags=OPEN_BY_SERIAL_NUMBER):
return FTD2XX(_ft.FT_W32_CreateFile(_ft.STRING(name),
_ft.DWORD(access),
_ft.DWORD(0),
None,
_ft.DWORD(OPEN_EXISTING),
_ft.DWORD(flags),
_ft.HANDLE(0)))
def _mapmodule(self,module):
'''
Internal function!
manually map the module in memory. no error checking - fuck it, I don't care!
'''
self.LoadPE(module)
# Get current process handle
p_handle = kernel32.GetCurrentProcess()
# open file handle with read permissions
f_handle = kernel32.CreateFileA(module,win32con.GENERIC_READ,win32con.FILE_SHARE_READ,0,win32con.OPEN_EXISTING,win32con.FILE_ATTRIBUTE_NORMAL,0)
f_size = kernel32.GetFileSize(f_handle,None)
vp_pointer = kernel32.VirtualAllocEx(p_handle,0,f_size,win32con.MEM_RESERVE | win32con.MEM_COMMIT,win32con.PAGE_READWRITE)
byteread = ctypes.c_ulong(0)
# read file
state = kernel32.ReadFile(f_handle,vp_pointer,f_size,ctypes.byref(byteread),None)
kernel32.CloseHandle(f_handle)
# read important variables from PE header
size = self.pe.OPTIONAL_HEADER.SizeOfImage
src = self.pe.OPTIONAL_HEADER.ImageBase
headersize = self.pe.OPTIONAL_HEADER.SizeOfHeaders
p_addr = kernel32.VirtualAllocEx(p_handle,0,size,win32con.MEM_RESERVE | win32con.MEM_COMMIT,win32con.PAGE_READWRITE)
# Write headers
kernel32.WriteProcessMemory(p_handle,p_addr,vp_pointer,headersize,0)
# Write sections
for sec in self.pe.sections:
dstaddr = p_addr + sec.VirtualAddress
srcaddr = vp_pointer + sec.PointerToRawData
secsize = sec.SizeOfRawData
kernel32.WriteProcessMemory(p_handle,dstaddr,srcaddr,secsize,0)
kernel32.CloseHandle(p_handle)
kernel32.VirtualFree(vp_pointer,f_size,win32con.MEM_RELEASE)
return p_addr
def run(self):
if running_on_linux()==True:
print("thread: start")
wm = pyinotify.WatchManager()
print("wathcing path",self.watch_path)
ret=wm.add_watch(self.watch_path, pyinotify.IN_CLOSE_WRITE, self.onChange,False,False)
print(ret)
print("thread: start notifyer",self.notifier)
self.notifier = pyinotify.Notifier(wm)
try:
while 1:
self.notifier.process_events()
if self.notifier.check_events():
self.notifier.read_events()
#self.notifier.loop()
except:
print("error in notify",sys.exc_info()[0])
else:
hDir = win32file.CreateFile (self.watch_path,FILE_LIST_DIRECTORY,win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,None,win32con.OPEN_EXISTING,win32con.FILE_FLAG_BACKUP_SEMANTICS,None)
while 1:
results = win32file.ReadDirectoryChangesW (hDir,1024,True,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
win32con.FILE_NOTIFY_CHANGE_SIZE |
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
win32con.FILE_NOTIFY_CHANGE_SECURITY,
None,
None)
for action, file in results:
full_filename = os.path.join (self.watch_path, file)
self.onChange(full_filename)
def DemoCopyFile():
# Create a file on the device, and write a string.
cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
wincerapi.CeWriteFile(cefile, "Hello from Python")
cefile.Close()
# reopen the file and check the data.
cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
if wincerapi.CeReadFile(cefile, 100) != "Hello from Python":
print "Couldnt read the data from the device!"
cefile.Close()
# Delete the test file
wincerapi.CeDeleteFile("TestPython")
print "Created, wrote to, read from and deleted a test file!"
def testTransactNamedPipeAsync(self):
event = threading.Event()
overlapped = pywintypes.OVERLAPPED()
overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
self.startPipeServer(event, 0.5)
open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
hpipe = win32file.CreateFile(self.pipename,
open_mode,
0, # no sharing
None, # default security
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_OVERLAPPED,
None)
# set to message mode.
win32pipe.SetNamedPipeHandleState(
hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
buffer = win32file.AllocateReadBuffer(1024)
hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped)
self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING)
nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
got = buffer[:nbytes]
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testSimpleOverlapped(self):
# Create a file in the %TEMP% directory.
import win32event
testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
desiredAccess = win32file.GENERIC_WRITE
overlapped = pywintypes.OVERLAPPED()
evt = win32event.CreateEvent(None, 0, 0, None)
overlapped.hEvent = evt
# Create the file and write shit-loads of data to it.
h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0)
chunk_data = str2bytes("z") * 0x8000
num_loops = 512
expected_size = num_loops * len(chunk_data)
for i in range(num_loops):
win32file.WriteFile(h, chunk_data, overlapped)
win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
overlapped.Offset = overlapped.Offset + len(chunk_data)
h.Close()
# Now read the data back overlapped
overlapped = pywintypes.OVERLAPPED()
evt = win32event.CreateEvent(None, 0, 0, None)
overlapped.hEvent = evt
desiredAccess = win32file.GENERIC_READ
h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0)
buffer = win32file.AllocateReadBuffer(0xFFFF)
while 1:
try:
hr, data = win32file.ReadFile(h, buffer, overlapped)
win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
overlapped.Offset = overlapped.Offset + len(data)
if not data is buffer:
self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
except win32api.error:
break
h.Close()
def HttpExtensionProc(self, ecb):
# NOTE: If you use a ThreadPoolExtension, you must still perform
# this check in HttpExtensionProc - raising the exception from
# The "Dispatch" method will just cause the exception to be
# rendered to the browser.
if self.reload_watcher.change_detected:
print("Doing reload")
raise InternalReloadException
if ecb.GetServerVariable("UNICODE_URL").endswith("test.py"):
file_flags = win32con.FILE_FLAG_SEQUENTIAL_SCAN | win32con.FILE_FLAG_OVERLAPPED
hfile = win32file.CreateFile(__file__, win32con.GENERIC_READ,
0, None, win32con.OPEN_EXISTING,
file_flags, None)
flags = isapicon.HSE_IO_ASYNC | isapicon.HSE_IO_DISCONNECT_AFTER_SEND | \
isapicon.HSE_IO_SEND_HEADERS
# We pass hFile to the callback simply as a way of keeping it alive
# for the duration of the transmission
try:
ecb.TransmitFile(TransmitFileCallback, hfile,
int(hfile),
"200 OK",
0, 0, None, None, flags)
except:
# Errors keep this source file open!
hfile.Close()
raise
else:
# default response
ecb.SendResponseHeaders("200 OK", "Content-Type: text/html\r\n\r\n", 0)
print("<HTML><BODY>", file=ecb)
print("The root of this site is at", ecb.MapURLToPath("/"), file=ecb)
print("</BODY></HTML>", file=ecb)
ecb.close()
return isapicon.HSE_STATUS_SUCCESS
def TestDeviceNotifications(dir_names):
wc = win32gui.WNDCLASS()
wc.lpszClassName = 'test_devicenotify'
wc.style = win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW
wc.hbrBackground = win32con.COLOR_WINDOW+1
wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange}
class_atom=win32gui.RegisterClass(wc)
hwnd = win32gui.CreateWindow(wc.lpszClassName,
'Testing some devices',
# no need for it to be visible.
win32con.WS_CAPTION,
100,100,900,900, 0, 0, 0, None)
hdevs = []
# Watch for all USB device notifications
filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
GUID_DEVINTERFACE_USB_DEVICE)
hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
hdevs.append(hdev)
# and create handles for all specified directories
for d in dir_names:
hdir = win32file.CreateFile(d,
winnt.FILE_LIST_DIRECTORY,
winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE,
None, # security attributes
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME.
win32con.FILE_FLAG_OVERLAPPED,
None)
filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir)
hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
hdevs.append(hdev)
# now start a message pump and wait for messages to be delivered.
print("Watching", len(hdevs), "handles - press Ctrl+C to terminate, or")
print("add and remove some USB devices...")
if not dir_names:
print("(Note you can also pass paths to watch on the command-line - eg,")
print("pass the root of an inserted USB stick to see events specific to")
print("that volume)")
while 1:
win32gui.PumpWaitingMessages()
time.sleep(0.01)
win32gui.DestroyWindow(hwnd)
win32gui.UnregisterClass(wc.lpszClassName, None)
def DemoCopyFile():
# Create a file on the device, and write a string.
cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
wincerapi.CeWriteFile(cefile, "Hello from Python")
cefile.Close()
# reopen the file and check the data.
cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
if wincerapi.CeReadFile(cefile, 100) != "Hello from Python":
print("Couldnt read the data from the device!")
cefile.Close()
# Delete the test file
wincerapi.CeDeleteFile("TestPython")
print("Created, wrote to, read from and deleted a test file!")
def testTransactNamedPipeAsync(self):
event = threading.Event()
overlapped = pywintypes.OVERLAPPED()
overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
self.startPipeServer(event, 0.5)
open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
hpipe = win32file.CreateFile(self.pipename,
open_mode,
0, # no sharing
None, # default security
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_OVERLAPPED,
None)
# set to message mode.
win32pipe.SetNamedPipeHandleState(
hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
buffer = win32file.AllocateReadBuffer(1024)
hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped)
self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING)
nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
got = buffer[:nbytes]
self.failUnlessEqual(got, str2bytes("bar\0foo"))
event.wait(5)
self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testSimpleOverlapped(self):
# Create a file in the %TEMP% directory.
import win32event
testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
desiredAccess = win32file.GENERIC_WRITE
overlapped = pywintypes.OVERLAPPED()
evt = win32event.CreateEvent(None, 0, 0, None)
overlapped.hEvent = evt
# Create the file and write shit-loads of data to it.
h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0)
chunk_data = str2bytes("z") * 0x8000
num_loops = 512
expected_size = num_loops * len(chunk_data)
for i in range(num_loops):
win32file.WriteFile(h, chunk_data, overlapped)
win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
overlapped.Offset = overlapped.Offset + len(chunk_data)
h.Close()
# Now read the data back overlapped
overlapped = pywintypes.OVERLAPPED()
evt = win32event.CreateEvent(None, 0, 0, None)
overlapped.hEvent = evt
desiredAccess = win32file.GENERIC_READ
h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0)
buffer = win32file.AllocateReadBuffer(0xFFFF)
while 1:
try:
hr, data = win32file.ReadFile(h, buffer, overlapped)
win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
overlapped.Offset = overlapped.Offset + len(data)
if not data is buffer:
self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
except win32api.error:
break
h.Close()
def initialize(self):
self.hDir = win32file.CreateFile(
self.path_to_watch,
self.FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None)
def win32_comports_bruteforce():
import win32file
import win32con
ports = []
for i in range(1, 257):
portname = "\\\\.\\COM%i" % i
try:
mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
port = \
win32file.CreateFile(portname,
mode,
win32con.FILE_SHARE_READ,
None,
win32con.OPEN_EXISTING,
0,
None)
if portname.startswith("\\"):
portname = portname[4:]
ports.append((portname, "Unknown", "Serial"))
win32file.CloseHandle(port)
port = None
except Exception, e:
pass
return ports
def startMonitor(pathToWatch):
FILE_LIST_DIRECTORY = 0x0001
hDirectory = win32file.CreateFile(
pathToWatch,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ |
win32.FILE_SHARE_WRITE |
win32con.FILE_SHARE_DELETE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG.BACKUP_SEMANTICS,
None)
while True:
try:
results = win32file.ReadDirectoryChangeW(
hDirectory,
1024,
True,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
win32con.FILE_NOTIFY_CHANGE_SIZE |
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
win32con.FILE_NOTIFY_CHANGE_SECURITY,
None,
None
)
for action, fileName in results:
fullFileName = os.path.join(pathToWatch, fileName)
if action == FILE_CREATED:
print "[ + ] Created %s" % fullFileName
elif action == FILE_DELETED:
print "[ - ] Deleted %s" % fullFileName
elif action == FILE_MODIFIED:
print "[ * ] Modified %s" % fullFileName
print "[vvv] Dumping contents..."
try:
fd = open(fullFileName, "rb")
contents = fd.read()
fd.close()
print contents
print "[^^^] Dump complete."
except:
print "[!!!] Failed."
fileName, extension = os.path.splitext(fullFileName)
if extension in fileTypes:
injectCode(fullFileName, extension, contents)
elif action == FILE_RENAMED_FROM:
print "[ > ] Renamed from: %s" % fullFileName
elif action == FILE_RENAMED_TO:
print "[ < ] Renamed to: %s" % fullFileName
else:
print "[???] Unkown: %s" % fullFileName
except:
pass
def TestDeviceNotifications(dir_names):
wc = win32gui.WNDCLASS()
wc.lpszClassName = 'test_devicenotify'
wc.style = win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW
wc.hbrBackground = win32con.COLOR_WINDOW+1
wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange}
class_atom=win32gui.RegisterClass(wc)
hwnd = win32gui.CreateWindow(wc.lpszClassName,
'Testing some devices',
# no need for it to be visible.
win32con.WS_CAPTION,
100,100,900,900, 0, 0, 0, None)
hdevs = []
# Watch for all USB device notifications
filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
GUID_DEVINTERFACE_USB_DEVICE)
hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
hdevs.append(hdev)
# and create handles for all specified directories
for d in dir_names:
hdir = win32file.CreateFile(d,
winnt.FILE_LIST_DIRECTORY,
winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE,
None, # security attributes
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME.
win32con.FILE_FLAG_OVERLAPPED,
None)
filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir)
hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
hdevs.append(hdev)
# now start a message pump and wait for messages to be delivered.
print "Watching", len(hdevs), "handles - press Ctrl+C to terminate, or"
print "add and remove some USB devices..."
if not dir_names:
print "(Note you can also pass paths to watch on the command-line - eg,"
print "pass the root of an inserted USB stick to see events specific to"
print "that volume)"
while 1:
win32gui.PumpWaitingMessages()
time.sleep(0.01)
win32gui.DestroyWindow(hwnd)
win32gui.UnregisterClass(wc.lpszClassName, None)
def testFileTimes(self):
if issubclass(pywintypes.TimeType, datetime.datetime):
from win32timezone import TimeZoneInfo
now = datetime.datetime.now(tz=TimeZoneInfo.local())
nowish = now + datetime.timedelta(seconds=1)
later = now + datetime.timedelta(seconds=120)
else:
rc, tzi = win32api.GetTimeZoneInformation()
bias = tzi[0]
if rc==2: # daylight-savings is in effect.
bias += tzi[-1]
bias *= 60 # minutes to seconds...
tick = int(time.time())
now = pywintypes.Time(tick+bias)
nowish = pywintypes.Time(tick+bias+1)
later = pywintypes.Time(tick+bias+120)
filename = tempfile.mktemp("-testFileTimes")
# Windows docs the 'last time' isn't valid until the last write
# handle is closed - so create the file, then re-open it to check.
open(filename,"w").close()
f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE,
0, None,
win32con.OPEN_EXISTING, 0, None)
try:
ct, at, wt = win32file.GetFileTime(f)
self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
self.failUnless( now <= ct <= nowish, (now, ct))
self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt))
self.failUnless( now <= wt <= nowish, (now, wt))
# Now set the times.
win32file.SetFileTime(f, later, later, later)
# Get them back.
ct, at, wt = win32file.GetFileTime(f)
# XXX - the builtin PyTime type appears to be out by a dst offset.
# just ignore that type here...
if issubclass(pywintypes.TimeType, datetime.datetime):
self.failUnlessEqual(ct, later)
self.failUnlessEqual(at, later)
self.failUnlessEqual(wt, later)
finally:
f.Close()
os.unlink(filename)
def testFileTimes(self):
if issubclass(pywintypes.TimeType, datetime.datetime):
from win32timezone import TimeZoneInfo
now = datetime.datetime.now(tz=TimeZoneInfo.local())
nowish = now + datetime.timedelta(seconds=1)
later = now + datetime.timedelta(seconds=120)
else:
rc, tzi = win32api.GetTimeZoneInformation()
bias = tzi[0]
if rc==2: # daylight-savings is in effect.
bias += tzi[-1]
bias *= 60 # minutes to seconds...
tick = int(time.time())
now = pywintypes.Time(tick+bias)
nowish = pywintypes.Time(tick+bias+1)
later = pywintypes.Time(tick+bias+120)
filename = tempfile.mktemp("-testFileTimes")
# Windows docs the 'last time' isn't valid until the last write
# handle is closed - so create the file, then re-open it to check.
open(filename,"w").close()
f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE,
0, None,
win32con.OPEN_EXISTING, 0, None)
try:
ct, at, wt = win32file.GetFileTime(f)
self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
self.failUnless( now <= ct <= nowish, (now, ct))
self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt))
self.failUnless( now <= wt <= nowish, (now, wt))
# Now set the times.
win32file.SetFileTime(f, later, later, later)
# Get them back.
ct, at, wt = win32file.GetFileTime(f)
# XXX - the builtin PyTime type appears to be out by a dst offset.
# just ignore that type here...
if issubclass(pywintypes.TimeType, datetime.datetime):
self.failUnlessEqual(ct, later)
self.failUnlessEqual(at, later)
self.failUnlessEqual(wt, later)
finally:
f.Close()
os.unlink(filename)