python类CreateFile()的实例源码

test_win32pipe.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testTransactNamedPipeAsync(self):
        event = threading.Event()
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        self.startPipeServer(event, 0.5)
        open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE

        hpipe = win32file.CreateFile(self.pipename,
                                     open_mode,
                                     0, # no sharing
                                     None, # default security
                                     win32con.OPEN_EXISTING,
                                     win32con.FILE_FLAG_OVERLAPPED,
                                     None)

        # set to message mode.
        win32pipe.SetNamedPipeHandleState(
                        hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)

        buffer = win32file.AllocateReadBuffer(1024)
        hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped)
        self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING)
        nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
        got = buffer[:nbytes]
        self.failUnlessEqual(got, str2bytes("bar\0foo"))
        event.wait(5)
        self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
test_win32file.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testMoreFiles(self):
        # Create a file in the %TEMP% directory.
        testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
        desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
        # Set a flag to delete the file automatically when it is closed.
        fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
        h = win32file.CreateFile( testName, desiredAccess, win32file.FILE_SHARE_READ, None, win32file.CREATE_ALWAYS, fileFlags, 0)

        # Write a known number of bytes to the file.
        data = str2bytes("z") * 1025

        win32file.WriteFile(h, data)

        self.failUnless(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!")

        # Ensure we can read the data back.
        win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
        hr, read_data = win32file.ReadFile(h, len(data)+10) # + 10 to get anything extra
        self.failUnless(hr==0, "Readfile returned %d" % hr)

        self.failUnless(read_data == data, "Read data is not what we wrote!")

        # Now truncate the file at 1/2 its existing size.
        newSize = len(data)//2
        win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
        win32file.SetEndOfFile(h)
        self.failUnlessEqual(win32file.GetFileSize(h), newSize)

        # GetFileAttributesEx/GetFileAttributesExW tests.
        self.failUnlessEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName))

        attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
        self.failUnless(size==newSize, 
                        "Expected GetFileAttributesEx to return the same size as GetFileSize()")
        self.failUnless(attr==win32file.GetFileAttributes(testName), 
                        "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")

        h = None # Close the file by removing the last reference to the handle!

        self.failUnless(not os.path.isfile(testName), "After closing the file, it still exists!")
test_win32file.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testFilePointer(self):
        # via [ 979270 ] SetFilePointer fails with negative offset

        # Create a file in the %TEMP% directory.
        filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )

        f = win32file.CreateFile(filename,
                                win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                0,
                                None,
                                win32file.CREATE_ALWAYS,
                                win32file.FILE_ATTRIBUTE_NORMAL,
                                0)
        try:
            #Write some data
            data = str2bytes('Some data')
            (res, written) = win32file.WriteFile(f, data)

            self.failIf(res)
            self.assertEqual(written, len(data))

            #Move at the beginning and read the data
            win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
            (res, s) = win32file.ReadFile(f, len(data))

            self.failIf(res)
            self.assertEqual(s, data)

            #Move at the end and read the data
            win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
            (res, s) = win32file.ReadFile(f, len(data))

            self.failIf(res)
            self.failUnlessEqual(s, data)
        finally:
            f.Close()
            os.unlink(filename)
test_win32file.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testFileTimesTimezones(self):
        if not issubclass(pywintypes.TimeType, datetime.datetime):
            # maybe should report 'skipped', but that's not quite right as
            # there is nothing you can do to avoid it being skipped!
            return
        filename = tempfile.mktemp("-testFileTimes")
        now_utc = win32timezone.utcnow()
        now_local = now_utc.astimezone(win32timezone.TimeZoneInfo.local())
        h = win32file.CreateFile(filename,
                                 win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                 0, None, win32file.CREATE_ALWAYS, 0, 0)
        try:
            win32file.SetFileTime(h, now_utc, now_utc, now_utc)
            ct, at, wt = win32file.GetFileTime(h)
            self.failUnlessEqual(now_local, ct)
            self.failUnlessEqual(now_local, at)
            self.failUnlessEqual(now_local, wt)
            # and the reverse - set local, check against utc
            win32file.SetFileTime(h, now_local, now_local, now_local)
            ct, at, wt = win32file.GetFileTime(h)
            self.failUnlessEqual(now_utc, ct)
            self.failUnlessEqual(now_utc, at)
            self.failUnlessEqual(now_utc, wt)
        finally:
            h.close()
            os.unlink(filename)
test_win32file.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def testSimpleOverlapped(self):
        # Create a file in the %TEMP% directory.
        import win32event
        testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
        desiredAccess = win32file.GENERIC_WRITE
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        # Create the file and write shit-loads of data to it.
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0)
        chunk_data = str2bytes("z") * 0x8000
        num_loops = 512
        expected_size = num_loops * len(chunk_data)
        for i in range(num_loops):
            win32file.WriteFile(h, chunk_data, overlapped)
            win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
            overlapped.Offset = overlapped.Offset + len(chunk_data)
        h.Close()
        # Now read the data back overlapped
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        desiredAccess = win32file.GENERIC_READ
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0)
        buffer = win32file.AllocateReadBuffer(0xFFFF)
        while 1:
            try:
                hr, data = win32file.ReadFile(h, buffer, overlapped)
                win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
                overlapped.Offset = overlapped.Offset + len(data)
                if not data is buffer:
                    self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
            except win32api.error:
                break
        h.Close()
watcherlib.py 文件源码 项目:SDV-Summary 作者: Sketchy502 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def initialize(self):
        self.hDir = win32file.CreateFile(
            self.path_to_watch,
            self.FILE_LIST_DIRECTORY,
            win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
            None,
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_BACKUP_SEMANTICS,
            None)
win32popen.py 文件源码 项目:viewvc 作者: viewvc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def NullFile(inheritable):
  """Create a null file handle."""
  if inheritable:
    sa = pywintypes.SECURITY_ATTRIBUTES()
    sa.bInheritHandle = 1
  else:
    sa = None

  return win32file.CreateFile("nul",
    win32file.GENERIC_READ | win32file.GENERIC_WRITE,
    win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
    sa, win32file.OPEN_EXISTING, 0, None)
fileutils.py 文件源码 项目:xpybuild 作者: xpybuild 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __enter__(self):
                self.Fd = win32file.CreateFile(self.dest, win32file.GENERIC_WRITE, 
                    win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE  | win32file.FILE_SHARE_DELETE, 
                    None, win32file.CREATE_ALWAYS, win32file.FILE_ATTRIBUTE_NORMAL, None)
                return self
platform.py 文件源码 项目:chirp_fork 作者: mach327 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def win32_comports_bruteforce():
    import win32file
    import win32con

    ports = []
    for i in range(1, 257):
        portname = "\\\\.\\COM%i" % i
        try:
            mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
            port = \
                win32file.CreateFile(portname,
                                     mode,
                                     win32con.FILE_SHARE_READ,
                                     None,
                                     win32con.OPEN_EXISTING,
                                     0,
                                     None)
            if portname.startswith("\\"):
                portname = portname[4:]
            ports.append((portname, "Unknown", "Serial"))
            win32file.CloseHandle(port)
            port = None
        except Exception, e:
            pass

    return ports
mpv.py 文件源码 项目:mpv-trakt-sync-daemon 作者: StareInTheAir 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run(self):
        import win32file
        self.file_handle = win32file.CreateFile(self.named_pipe_path,
                                                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                                                0, None,
                                                win32file.OPEN_EXISTING,
                                                0, None)

        log.info('Windows named pipe connected')
        self.fire_connected()

        while True:
            # The following code is cleaner, than waiting for an exception while writing to detect pipe closing,
            # but causes mpv to hang and crash while closing when closed at the wrong time.

            # if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL:
            #     # pipe was closed
            #     break

            try:
                while not self.write_queue.empty():
                    win32file.WriteFile(self.file_handle, self.write_queue.get_nowait())
            except win32file.error:
                log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.')
                break

            size = win32file.GetFileSize(self.file_handle)
            if size > 0:
                while size > 0:
                    # pipe has data to read
                    data = win32file.ReadFile(self.file_handle, 512)
                    self.on_data(data[1])
                    size = win32file.GetFileSize(self.file_handle)
            else:
                time.sleep(1)

        log.info('Windows named pipe closed')
        win32file.CloseHandle(self.file_handle)
        self.file_handle = None

        self.fire_disconnected()
fileMonitor.py 文件源码 项目:PyHack 作者: lanxia 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def startMonitor(pathToWatch):
    FILE_LIST_DIRECTORY = 0x0001

    hDirectory = win32file.CreateFile(
        pathToWatch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ |
        win32.FILE_SHARE_WRITE |
        win32con.FILE_SHARE_DELETE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG.BACKUP_SEMANTICS,
        None)

    while True:
        try:
            results = win32file.ReadDirectoryChangeW(
                hDirectory,
                1024,
                True,
                win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
                win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
                win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
                win32con.FILE_NOTIFY_CHANGE_SIZE |
                win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
                win32con.FILE_NOTIFY_CHANGE_SECURITY,
                None,
                None
            )

            for action, fileName in results:
                fullFileName = os.path.join(pathToWatch, fileName)
                if action == FILE_CREATED:
                    print "[ + ] Created %s" % fullFileName
                elif action == FILE_DELETED:
                    print "[ - ] Deleted %s" % fullFileName
                elif action == FILE_MODIFIED:
                    print "[ * ] Modified %s" % fullFileName
                    print "[vvv] Dumping contents..."
                    try:
                        fd = open(fullFileName, "rb")
                        contents = fd.read()
                        fd.close()
                        print contents
                        print "[^^^] Dump complete."
                    except:
                        print "[!!!] Failed."

                    fileName, extension = os.path.splitext(fullFileName)

                    if extension in fileTypes:
                        injectCode(fullFileName, extension, contents)

                    elif action == FILE_RENAMED_FROM:
                        print "[ > ] Renamed from: %s" % fullFileName
                    elif action == FILE_RENAMED_TO:
                        print "[ < ] Renamed to: %s" % fullFileName
                    else:
                        print "[???] Unkown: %s" % fullFileName
        except:
            pass
pseudo_ransomware.py 文件源码 项目:threat-research-tools 作者: carbonblack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write_mbr_winapi( _file ):

    print 'Are you SURE you want to overwrite the MBR?? This will possibly make the volume unbootable.'
    response = raw_input( 'Type \"YES\" then Return to continue, anything else then Return to not continue:' )

    if response != 'YES':
        return

    h       = None
    handles = []

    try:

        for x in range( num_mbr_handles ):

            h = win32file.CreateFile( '\\\\.\\PhysicalDrive0',
                    win32con.GENERIC_WRITE,
                    win32file.FILE_SHARE_WRITE,
                    None,
                    win32file.OPEN_EXISTING,
                    win32file.FILE_ATTRIBUTE_NORMAL,
                    None )

            if ( h != win32file.INVALID_HANDLE_VALUE ):
                handles.append( h )

        f = open( _file, 'rb' )

        if f <> None:

            fsize = os.path.getsize( _file )
            wsize = 512

            if fsize > 512:
                print 'WARNING: File being written is > 512 bytes, will only write 512...'
                wsize = 512

            contents = f.read( fsize )

            if fsize < 512:

                print 'WARNING: Padding file up to 512 bytes, may not have expected results...'

                ## pad it out to 512 bytes
                diff = 512 - 512

                for num in xrange( diff ):
                    contents += 'A'

            win32file.WriteFile( h, contents, None )

            f.close()

    except Exception, e:
        print str( e )
        print '\tAre you running as Administrator?'

    for handle in handles:
        win32file.CloseHandle( handle )

#############
winpexpect.py 文件源码 项目:winpexpect 作者: geertj 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _stub(cmd_name, stdin_name, stdout_name, stderr_name):
    """INTERNAL: Stub process that will start up the child process."""
    # Open the 4 pipes (command, stdin, stdout, stderr)
    cmd_pipe = CreateFile(cmd_name, GENERIC_READ|GENERIC_WRITE, 0, None,
                          OPEN_EXISTING, 0, None)
    SetHandleInformation(cmd_pipe, HANDLE_FLAG_INHERIT, 1)
    stdin_pipe = CreateFile(stdin_name, GENERIC_READ, 0, None,
                            OPEN_EXISTING, 0, None)
    SetHandleInformation(stdin_pipe, HANDLE_FLAG_INHERIT, 1)
    stdout_pipe = CreateFile(stdout_name, GENERIC_WRITE, 0, None,
                             OPEN_EXISTING, 0, None)
    SetHandleInformation(stdout_pipe, HANDLE_FLAG_INHERIT, 1)
    stderr_pipe = CreateFile(stderr_name, GENERIC_WRITE, 0, None,
                             OPEN_EXISTING, 0, None)
    SetHandleInformation(stderr_pipe, HANDLE_FLAG_INHERIT, 1)

    # Learn what we need to do..
    header = _read_header(cmd_pipe)
    input = _parse_header(header)
    if 'command' not in input or 'args' not in input:
        ExitProcess(2)

    # http://msdn.microsoft.com/en-us/library/ms682499(VS.85).aspx
    startupinfo = STARTUPINFO()
    startupinfo.dwFlags |= STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW
    startupinfo.hStdInput = stdin_pipe
    startupinfo.hStdOutput = stdout_pipe
    startupinfo.hStdError = stderr_pipe
    startupinfo.wShowWindow = SW_HIDE

    # Grant access so that our parent can open its grandchild.
    if 'parent_sid' in input:
        mysid = _get_current_sid()
        parent = ConvertStringSidToSid(input['parent_sid'])
        sattrs = _create_security_attributes(mysid, parent,
                                             access=PROCESS_ALL_ACCESS)
    else:
        sattrs = None

    try:
        res = CreateProcess(input['command'], input['args'], sattrs, None,
                            True, CREATE_NEW_CONSOLE, os.environ, os.getcwd(),
                            startupinfo)
    except WindowsError, e:
        message = _quote_header(str(e))
        WriteFile(cmd_pipe, 'status=error\nmessage=%s\n\n' % message)
        ExitProcess(3)
    else:
        pid = res[2]

    # Pass back results and exit
    err, nbytes = WriteFile(cmd_pipe, 'status=ok\npid=%s\n\n' % pid)
    ExitProcess(0)
win32gui_devicenotify.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def TestDeviceNotifications(dir_names):
    wc = win32gui.WNDCLASS()
    wc.lpszClassName = 'test_devicenotify'
    wc.style =  win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW
    wc.hbrBackground = win32con.COLOR_WINDOW+1
    wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange}
    class_atom=win32gui.RegisterClass(wc)
    hwnd = win32gui.CreateWindow(wc.lpszClassName,
        'Testing some devices',
        # no need for it to be visible.
        win32con.WS_CAPTION,
        100,100,900,900, 0, 0, 0, None)

    hdevs = []
    # Watch for all USB device notifications
    filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
                                        GUID_DEVINTERFACE_USB_DEVICE)
    hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
                                               win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
    hdevs.append(hdev)
    # and create handles for all specified directories
    for d in dir_names:
        hdir = win32file.CreateFile(d, 
                                    winnt.FILE_LIST_DIRECTORY, 
                                    winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE,
                                    None, # security attributes
                                    win32con.OPEN_EXISTING,
                                    win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME.
                                    win32con.FILE_FLAG_OVERLAPPED,
                                    None)

        filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir)
        hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
                                          win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
        hdevs.append(hdev)

    # now start a message pump and wait for messages to be delivered.
    print "Watching", len(hdevs), "handles - press Ctrl+C to terminate, or"
    print "add and remove some USB devices..."
    if not dir_names:
        print "(Note you can also pass paths to watch on the command-line - eg,"
        print "pass the root of an inserted USB stick to see events specific to"
        print "that volume)"
    while 1:
        win32gui.PumpWaitingMessages()
        time.sleep(0.01)
    win32gui.DestroyWindow(hwnd)
    win32gui.UnregisterClass(wc.lpszClassName, None)
test_win32file.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testFileTimes(self):
        if issubclass(pywintypes.TimeType, datetime.datetime):
            from win32timezone import TimeZoneInfo
            now = datetime.datetime.now(tz=TimeZoneInfo.local())
            nowish = now + datetime.timedelta(seconds=1)
            later = now + datetime.timedelta(seconds=120)
        else:
            rc, tzi = win32api.GetTimeZoneInformation()
            bias = tzi[0]
            if rc==2: # daylight-savings is in effect.
                bias += tzi[-1]
            bias *= 60 # minutes to seconds...
            tick = int(time.time())
            now = pywintypes.Time(tick+bias)
            nowish = pywintypes.Time(tick+bias+1)
            later = pywintypes.Time(tick+bias+120)

        filename = tempfile.mktemp("-testFileTimes")
        # Windows docs the 'last time' isn't valid until the last write
        # handle is closed - so create the file, then re-open it to check.
        open(filename,"w").close()
        f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                 0, None,
                                 win32con.OPEN_EXISTING, 0, None)
        try:
            ct, at, wt = win32file.GetFileTime(f)
            self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
            self.failUnless( now <= ct <= nowish, (now, ct))
            self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt))
            self.failUnless( now <= wt <= nowish, (now, wt))

            # Now set the times.
            win32file.SetFileTime(f, later, later, later)
            # Get them back.
            ct, at, wt = win32file.GetFileTime(f)
            # XXX - the builtin PyTime type appears to be out by a dst offset.
            # just ignore that type here...
            if issubclass(pywintypes.TimeType, datetime.datetime):
                self.failUnlessEqual(ct, later)
                self.failUnlessEqual(at, later)
                self.failUnlessEqual(wt, later)

        finally:
            f.Close()
            os.unlink(filename)


问题


面经


文章

微信
公众号

扫码关注公众号