def win_handle_is_a_console(handle):
"""Returns True if a Windows file handle is a handle to a console."""
# These types are available on linux but not Mac.
# pylint: disable=no-name-in-module,F0401
from ctypes import byref, POINTER, windll, WINFUNCTYPE
from ctypes.wintypes import BOOL, DWORD, HANDLE
FILE_TYPE_CHAR = 0x0002
FILE_TYPE_REMOTE = 0x8000
INVALID_HANDLE_VALUE = DWORD(-1).value
# <http://msdn.microsoft.com/en-us/library/ms683167.aspx>
GetConsoleMode = WINFUNCTYPE(BOOL, HANDLE, POINTER(DWORD))(
('GetConsoleMode', windll.kernel32))
# <http://msdn.microsoft.com/en-us/library/aa364960.aspx>
GetFileType = WINFUNCTYPE(DWORD, DWORD)(('GetFileType', windll.kernel32))
# GetStdHandle returns INVALID_HANDLE_VALUE, NULL, or a valid handle.
if handle == INVALID_HANDLE_VALUE or handle is None:
return False
return (
(GetFileType(handle) & ~FILE_TYPE_REMOTE) == FILE_TYPE_CHAR and
GetConsoleMode(handle, byref(DWORD())))
# Example source code using the Python HANDLE class (ctypes.wintypes.HANDLE)
def _wait_for_handles(handles, timeout=-1):
    """
    Waits for multiple handles. (Similar to 'select') Returns the handle which is ready.
    Returns `None` on timeout.

    :param handles: Sequence of values used to fill a ``HANDLE`` array.
    :param timeout: Value wrapped in ``DWORD`` and passed as the wait
        timeout; the default of -1 wraps to 0xFFFFFFFF.
    :returns: The element of the handle array selected by the wait result,
        or `None` when the result equals ``WAIT_TIMEOUT``.
    :raises WindowsError: When the wait itself fails (WAIT_FAILED).

    http://msdn.microsoft.com/en-us/library/windows/desktop/ms687025(v=vs.85).aspx
    """
    from ctypes import WinError

    arrtype = HANDLE * len(handles)
    handle_array = arrtype(*handles)

    ret = windll.kernel32.WaitForMultipleObjects(
        len(handle_array), handle_array, BOOL(False), DWORD(timeout))

    if ret == WAIT_TIMEOUT:
        return None

    # WAIT_FAILED is 0xFFFFFFFF, which the default c_int restype reports as
    # -1.  Indexing with -1 would silently hand back the *last* handle, so
    # surface the failure instead.
    if ret in (-1, 0xFFFFFFFF):
        raise WinError()

    return handle_array[ret]
def _wait_for_handles(handles, timeout=-1):
    """
    Waits for multiple handles. (Similar to 'select') Returns the handle which is ready.
    Returns `None` on timeout.

    :param handles: Sequence of values used to fill a ``HANDLE`` array.
    :param timeout: Value wrapped in ``DWORD`` and passed as the wait
        timeout; the default of -1 wraps to 0xFFFFFFFF.
    :returns: The element of the handle array selected by the wait result,
        or `None` when the result equals ``WAIT_TIMEOUT``.
    :raises WindowsError: When the wait itself fails (WAIT_FAILED).

    http://msdn.microsoft.com/en-us/library/windows/desktop/ms687025(v=vs.85).aspx
    """
    from ctypes import WinError

    arrtype = HANDLE * len(handles)
    handle_array = arrtype(*handles)

    ret = windll.kernel32.WaitForMultipleObjects(
        len(handle_array), handle_array, BOOL(False), DWORD(timeout))

    if ret == WAIT_TIMEOUT:
        return None

    # WAIT_FAILED is 0xFFFFFFFF, which the default c_int restype reports as
    # -1.  Indexing with -1 would silently hand back the *last* handle, so
    # surface the failure instead.
    if ret in (-1, 0xFFFFFFFF):
        raise WinError()

    return handle_array[ret]
def KerberosInit():
    """Connect to the LSA and look up the "Kerberos" authentication package.

    Returns:
        tuple: ``(hLsaConnection, dwKerberosAuthenticationPackageId)`` --
        the HANDLE filled in by LsaConnectUntrusted and the DWORD filled in
        by LsaLookupAuthenticationPackage -- or ``(None, None)`` when the
        name buffer cannot be allocated or either LSA call does not return
        STATUS_SUCCESS.
    """
    LPTR = (0x0000 | 0x0040)
    package_name = b"Kerberos"

    # LocalAlloc returns a pointer.  Without an explicit restype ctypes
    # treats the result as a C int, which truncates the address on 64-bit
    # Windows; LocalFree likewise needs a pointer-sized parameter.
    LocalAlloc = windll.kernel32.LocalAlloc
    LocalAlloc.restype = HANDLE
    LocalFree = windll.kernel32.LocalFree
    LocalFree.argtypes = [HANDLE]

    # One extra byte so the copied name stays NUL-terminated.
    name_buffer = LocalAlloc(LPTR, len(package_name) + 1)
    if not name_buffer:
        print("LocalAlloc failed, cannot allocate the package name buffer")
        return None, None
    memmove(name_buffer, package_name, len(package_name))

    hLsaConnection = HANDLE()
    status = LsaConnectUntrusted(byref(hLsaConnection))
    if status != STATUS_SUCCESS:
        print("LsaConnectUntrusted, cannot get LSA handle, error %d " % status)
        LocalFree(name_buffer)
        return None, None

    kerberosPackageName = UNICODE_STRING()
    kerberosPackageName.Length = USHORT(len(package_name))
    kerberosPackageName.MaximumLength = USHORT(len(package_name) + 1)
    kerberosPackageName.Buffer = name_buffer

    dwKerberosAuthenticationPackageId = DWORD(0)
    status = LsaLookupAuthenticationPackage(
        hLsaConnection,
        byref(kerberosPackageName),
        byref(dwKerberosAuthenticationPackageId))
    # The name buffer is only needed for the lookup call itself.
    LocalFree(name_buffer)

    if status == STATUS_SUCCESS:
        return hLsaConnection, dwKerberosAuthenticationPackageId
    # NOTE(review): hLsaConnection is not released on this path -- confirm
    # whether the connection should be closed before giving up.
    return None, None
def WlanScan(hClientHandle, pInterfaceGuid, ssid=""):
    """
    Request a scan for available networks on the indicated interface.

        DWORD WINAPI WlanScan(
            _In_        HANDLE hClientHandle,
            _In_        const GUID *pInterfaceGuid,
            _In_opt_    const PDOT11_SSID pDot11Ssid,
            _In_opt_    const PWLAN_RAW_DATA pIeData,
            _Reserved_  PVOID pReserved
        );

    When ``ssid`` is non-empty it is wrapped in a DOT11_SSID and passed to
    the call; otherwise NULL is passed.  Returns the call's status code.
    Raises Exception when ``ssid`` is longer than DOT11_SSID_MAX_LENGTH or
    when the call does not return ERROR_SUCCESS.
    """
    scan = wlanapi.WlanScan
    scan.restype = DWORD
    scan.argtypes = [HANDLE,
                     POINTER(GUID),
                     POINTER(DOT11_SSID),
                     POINTER(WLAN_RAW_DATA),
                     c_void_p]

    ssid_arg = None
    if ssid:
        ssid_len = len(ssid)
        if ssid_len > DOT11_SSID_MAX_LENGTH:
            raise Exception("SSIDs have a maximum length of 32 characters.")
        ssid_arg = byref(DOT11_SSID(ssid_len, ssid))

    # TODO: Support WLAN_RAW_DATA argument.
    status = scan(hClientHandle, byref(pInterfaceGuid), ssid_arg, None, None)
    if status != ERROR_SUCCESS:
        raise Exception("WlanScan failed.")
    return status
def WlanGetAvailableNetworkList(hClientHandle, pInterfaceGuid):
    """
    Retrieve the list of available networks on a wireless LAN interface.

        DWORD WINAPI WlanGetAvailableNetworkList(
            _In_        HANDLE hClientHandle,
            _In_        const GUID *pInterfaceGuid,
            _In_        DWORD dwFlags,
            _Reserved_  PVOID pReserved,
            _Out_       PWLAN_AVAILABLE_NETWORK_LIST *ppAvailableNetworkList
        );

    The flags argument is always 0.  Returns the
    POINTER(WLAN_AVAILABLE_NETWORK_LIST) object handed to the call by
    reference; raises Exception when the call does not return
    ERROR_SUCCESS.
    """
    get_networks = wlanapi.WlanGetAvailableNetworkList
    get_networks.restype = DWORD
    get_networks.argtypes = [HANDLE,
                             POINTER(GUID),
                             DWORD,
                             c_void_p,
                             POINTER(POINTER(WLAN_AVAILABLE_NETWORK_LIST))]

    network_list_ptr = pointer(WLAN_AVAILABLE_NETWORK_LIST())
    status = get_networks(hClientHandle,
                          byref(pInterfaceGuid),
                          0,
                          None,
                          byref(network_list_ptr))
    if status == ERROR_SUCCESS:
        return network_list_ptr
    raise Exception("WlanGetAvailableNetworkList failed.")
def WlanGetProfileList(hClientHandle, pInterfaceGuid):
    """
    Retrieve the list of profiles in preference order.

        DWORD WINAPI WlanGetProfileList(
            _In_        HANDLE hClientHandle,
            _In_        const GUID *pInterfaceGuid,
            _Reserved_  PVOID pReserved,
            _Out_       PWLAN_PROFILE_INFO_LIST *ppProfileList
        );

    Returns the POINTER(WLAN_PROFILE_INFO_LIST) object handed to the call
    by reference; raises Exception when the call does not return
    ERROR_SUCCESS.
    """
    get_profiles = wlanapi.WlanGetProfileList
    get_profiles.restype = DWORD
    get_profiles.argtypes = [HANDLE,
                             POINTER(GUID),
                             c_void_p,
                             POINTER(POINTER(WLAN_PROFILE_INFO_LIST))]

    profile_list_ptr = pointer(WLAN_PROFILE_INFO_LIST())
    status = get_profiles(hClientHandle,
                          byref(pInterfaceGuid),
                          None,
                          byref(profile_list_ptr))
    if status == ERROR_SUCCESS:
        return profile_list_ptr
    raise Exception("WlanGetProfileList failed.")
def WlanGetProfile(hClientHandle, pInterfaceGuid, profileName):
    """
    Retrieve all information about a specified wireless profile.

        DWORD WINAPI WlanGetProfile(
            _In_         HANDLE hClientHandle,
            _In_         const GUID *pInterfaceGuid,
            _In_         LPCWSTR strProfileName,
            _Reserved_   PVOID pReserved,
            _Out_        LPWSTR *pstrProfileXml,
            _Inout_opt_  DWORD *pdwFlags,
            _Out_opt_    PDWORD pdwGrantedAccess
        );

    The flags argument is initialised to WLAN_PROFILE_GET_PLAINTEXT_KEY.
    Returns the LPWSTR object that received the profile XML; the flags and
    granted-access outputs are discarded.  Raises Exception when the call
    does not return ERROR_SUCCESS.
    """
    get_profile = wlanapi.WlanGetProfile
    get_profile.restype = DWORD
    get_profile.argtypes = [HANDLE,
                            POINTER(GUID),
                            LPCWSTR,
                            c_void_p,
                            POINTER(LPWSTR),
                            POINTER(DWORD),
                            POINTER(DWORD)]

    granted_access = DWORD()
    profile_xml = LPWSTR()
    profile_flags = DWORD(WLAN_PROFILE_GET_PLAINTEXT_KEY)

    status = get_profile(hClientHandle,
                         byref(pInterfaceGuid),
                         profileName,
                         None,
                         byref(profile_xml),
                         byref(profile_flags),
                         byref(granted_access))
    if status == ERROR_SUCCESS:
        return profile_xml
    raise Exception("WlanGetProfile failed.")
def WlanQueryInterface(hClientHandle, pInterfaceGuid, OpCode):
    """
    Query a parameter of a wireless LAN interface via
    wlanapi.WlanQueryInterface.

        DWORD WINAPI WlanQueryInterface(
            _In_        HANDLE hClientHandle,
            _In_        const GUID *pInterfaceGuid,
            _In_        WLAN_INTF_OPCODE OpCode,
            _Reserved_  PVOID pReserved,
            _Out_       PDWORD pdwDataSize,
            _Out_       PVOID *ppData,
            _Out_opt_   PWLAN_OPCODE_VALUE_TYPE pWlanOpcodeValueType
        );

    ``OpCode.value`` is looked up in WLAN_INTF_OPCODE_DICT and the result in
    WLAN_INTF_OPCODE_TYPE_DICT to choose the ctypes type of the returned
    data.

    Returns the ``ppData`` pointer object that was passed to the call; the
    data-size and value-type outputs are discarded.  Raises Exception when
    the call does not return ERROR_SUCCESS.
    """
    func_ref = wlanapi.WlanQueryInterface
    #TODO: Next two lines sketchy due to incomplete implementation.
    # An opcode missing from either table fails right here, before any
    # call is made.
    opcode_name = WLAN_INTF_OPCODE_DICT[OpCode.value]
    return_type = WLAN_INTF_OPCODE_TYPE_DICT[opcode_name]
    # argtypes is rebuilt on every call because the sixth parameter's type
    # depends on the opcode.
    func_ref.argtypes = [HANDLE,
                         POINTER(GUID),
                         WLAN_INTF_OPCODE,
                         c_void_p,
                         POINTER(DWORD),
                         POINTER(POINTER(return_type)),
                         POINTER(WLAN_OPCODE_VALUE_TYPE)]
    func_ref.restype = DWORD
    pdwDataSize = DWORD()
    ppData = pointer(return_type())
    pWlanOpcodeValueType = WLAN_OPCODE_VALUE_TYPE()
    # NOTE(review): the three out-arguments are passed without byref();
    # this presumably relies on ctypes converting an instance to a pointer
    # when argtypes declares POINTER(type) -- confirm.
    result = func_ref(hClientHandle,
                      byref(pInterfaceGuid),
                      OpCode,
                      None,
                      pdwDataSize,
                      ppData,
                      pWlanOpcodeValueType)
    if result != ERROR_SUCCESS:
        raise Exception("WlanQueryInterface failed.")
    return ppData
def get(intFolder):
    """Return the path written by _SHGetFolderPath for folder id ``intFolder``.

    The call's return code is not inspected; whatever the buffer holds
    after the call is returned.
    """
    _SHGetFolderPath.argtypes = [
        _HWND, _ctypes.c_int, _HANDLE, _DWORD, _LPCWSTR,
    ]
    path_buffer = _cub(_MAX_PATH)
    _SHGetFolderPath(0, intFolder, 0, 0, path_buffer)
    return path_buffer.value
def pipe_non_blocking_set(fd):
    """Set PIPE_NOWAIT on the pipe behind C runtime file descriptor ``fd``.

    Returns True when SetNamedPipeHandleState reports success; otherwise
    prints ``WinError()`` and returns False.
    """
    # Imports and constants stay local to avoid polluting the module
    # name-space.
    # thanks to: http://stackoverflow.com/questions/34504970
    import msvcrt

    from ctypes import windll, byref, wintypes, WinError, POINTER
    from ctypes.wintypes import HANDLE, DWORD, BOOL

    lpdword = POINTER(DWORD)
    nowait_mode = wintypes.DWORD(0x00000001)  # PIPE_NOWAIT

    set_state = windll.kernel32.SetNamedPipeHandleState
    set_state.argtypes = [HANDLE, lpdword, lpdword, lpdword]
    set_state.restype = BOOL

    os_handle = msvcrt.get_osfhandle(fd)
    if set_state(os_handle, byref(nowait_mode), None, None):
        return True

    print(WinError())
    return False
def CreateNamedPipe(name, open_mode, pipe_mode, max_instances, out_buffer_size, in_buffer_size, default_time_out, security_attributes):
    """See: CreateNamedPipe function
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

    kernel32 exports this API only under its suffixed names, so the ANSI
    entry point (CreateNamedPipeA) is used when ``name`` is a byte string
    and the wide entry point (CreateNamedPipeW) otherwise.

    Returns the call's result wrapped in ``wintypes.HANDLE``.  The value is
    not checked here; the caller must compare it against
    INVALID_HANDLE_VALUE.
    """
    if isinstance(name, bytes):
        CreateNamedPipe_Fn = windll.kernel32.CreateNamedPipeA
        name_type = wintypes.LPCSTR
    else:
        CreateNamedPipe_Fn = windll.kernel32.CreateNamedPipeW
        name_type = wintypes.LPCWSTR

    CreateNamedPipe_Fn.argtypes = [
        name_type,              # LPCTSTR lpName,
        wintypes.DWORD,         # _In_ DWORD dwOpenMode,
        wintypes.DWORD,         # _In_ DWORD dwPipeMode,
        wintypes.DWORD,         # _In_ DWORD nMaxInstances,
        wintypes.DWORD,         # _In_ DWORD nOutBufferSize,
        wintypes.DWORD,         # _In_ DWORD nInBufferSize,
        wintypes.DWORD,         # _In_ DWORD nDefaultTimeOut,
        LPSECURITY_ATTRIBUTES   # _In_opt_ LPSECURITY_ATTRIBUTES lpSecurityAttributes
    ]
    CreateNamedPipe_Fn.restype = wintypes.HANDLE

    handle = wintypes.HANDLE(CreateNamedPipe_Fn(
        name,
        open_mode,
        pipe_mode,
        max_instances,
        out_buffer_size,
        in_buffer_size,
        default_time_out,
        security_attributes
    ))
    return handle
def ConnectNamedPipe(named_pipe, overlapped):
    """See: ConnectNamedPipe function
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365146(v=vs.85).aspx

    Returns the call's result wrapped in ``wintypes.BOOL``.
    """
    connect = windll.kernel32.ConnectNamedPipe
    connect.restype = wintypes.BOOL
    connect.argtypes = [
        wintypes.HANDLE,  # _In_ HANDLE hNamedPipe,
        LPOVERLAPPED,     # _Inout_opt_ LPOVERLAPPED lpOverlapped
    ]
    return wintypes.BOOL(connect(named_pipe, overlapped))
def ReadFile(file, buffer, number_of_bytes_to_read, number_of_bytes_read, overlapped):
    """See: ReadFile function
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365467(v=vs.85).aspx

    Returns the call's result wrapped in ``wintypes.BOOL``.
    """
    ReadFile_Fn = windll.kernel32.ReadFile
    ReadFile_Fn.argtypes = [
        wintypes.HANDLE,  # _In_ HANDLE hFile,
        LPVOID,           # _Out_ LPVOID lpBuffer,
        wintypes.DWORD,   # _In_ DWORD nNumberOfBytesToRead,
        LPDWORD,          # _Out_opt_ LPDWORD lpNumberOfBytesRead,
        LPOVERLAPPED      # _Inout_opt_ LPOVERLAPPED lpOverlapped
    ]
    ReadFile_Fn.restype = wintypes.BOOL
    ret = wintypes.BOOL(ReadFile_Fn(
        file,
        buffer,
        number_of_bytes_to_read,
        number_of_bytes_read,
        overlapped
    ))
    # The result used to be computed and then dropped, so every caller got
    # None and could not tell success from failure.
    return ret
def send_ioctl(self, ioctl, inbuf, inbufsiz, outbuf, outbufsiz):
    """See: DeviceIoControl function
    http://msdn.microsoft.com/en-us/library/aa363216(v=vs.85).aspx

    Issues the control code on ``self.handle`` with no OVERLAPPED
    structure and returns ``(status, bytes_returned)``, where
    ``bytes_returned`` is the DWORD object the call wrote into.
    """
    device_io_control = windll.kernel32.DeviceIoControl
    device_io_control.restype = wintypes.BOOL
    device_io_control.argtypes = [
        wintypes.HANDLE,  # _In_ HANDLE hDevice
        wintypes.DWORD,   # _In_ DWORD dwIoControlCode
        wintypes.LPVOID,  # _In_opt_ LPVOID lpInBuffer
        wintypes.DWORD,   # _In_ DWORD nInBufferSize
        wintypes.LPVOID,  # _Out_opt_ LPVOID lpOutBuffer
        wintypes.DWORD,   # _In_ DWORD nOutBufferSize
        LPDWORD,          # _Out_opt_ LPDWORD lpBytesReturned
        LPOVERLAPPED,     # _Inout_opt_ LPOVERLAPPED lpOverlapped
    ]

    print(ioctl)

    # The DWORD receives the byte count; it is handed over by reference.
    bytes_returned = wintypes.DWORD(0)
    status = device_io_control(
        self.handle,
        ioctl,
        inbuf,
        inbufsiz,
        outbuf,
        outbufsiz,
        ctypes.byref(bytes_returned),
        None,
    )
    return status, bytes_returned
def regkey_exists(rootkey, subkey):
    """Return True if ``subkey`` under ``rootkey`` can be opened for querying.

    The key is opened through RegOpenKeyExW with KEY_QUERY_VALUE access; a
    zero result counts as "exists".  The handle is closed again before
    returning.
    """
    res_handle = HANDLE()
    res = RegOpenKeyExW(
        rootkey, subkey, 0, _winreg.KEY_QUERY_VALUE, byref(res_handle)
    )
    if res:
        # Open failed: res_handle was never filled in, so there is nothing
        # to close.
        return False
    RegCloseKey(res_handle)
    return True
def get_user_config_dir():
    """Platform specific directory for user configuration.

    On Windows (``os.name == 'nt'``) the folder reported by
    SHGetFolderPathW for CSIDL 26 is returned; if that call does not
    return 0, the ``APPDATA`` environment variable is used instead.  On
    every other platform the user's home directory is returned.

    :returns: returns the user configuration directory.
    :rtype: string
    :raises KeyError: on Windows when the shell call fails and ``APPDATA``
        is not set.
    """
    if os.name != 'nt':
        return os.path.expanduser('~')

    try:
        csidl_appdata = 26
        shgetfolderpath = windll.shell32.SHGetFolderPathW
        shgetfolderpath.argtypes = [
            wintypes.HWND, ctypes.c_int,
            wintypes.HANDLE, wintypes.DWORD, wintypes.LPCWSTR]
        # create_unicode_buffer lives in ctypes.  It was only reachable as
        # wintypes.create_unicode_buffer on Python 2, where wintypes
        # star-imports ctypes.
        path_buf = ctypes.create_unicode_buffer(wintypes.MAX_PATH)
        result = shgetfolderpath(0, csidl_appdata, 0, 0, path_buf)
        if result == 0:
            return path_buf.value
    except ImportError:
        pass
    return os.environ['APPDATA']
def init(storage):
    """Populate ``storage`` with a process handle and the LoadLibraryA address.

    Keys written:
        'process_pid':    the dwProcessId of a freshly created
                          PROCESS_INFORMATION structure.
        'process_handle': result of OpenProcess for that pid (0 on failure).
        'LoadLibraryA':   result of GetProcAddress for "LoadLibraryA" in
                          kernel32.dll (0 on failure).

    Also publishes the structure class as ``wintypes.PROCESS_INFORMATION``.

    Returns:
        True, always.
    """
    import ctypes
    import ctypes.wintypes as wintypes

    class __PROCESS_INFORMATION(ctypes.Structure):
        """see:
        http://msdn.microsoft.com/en-us/library/windows/desktop/ms684873(v=vs.85).aspx
        """
        _fields_ = [("hProcess", wintypes.HANDLE),
                    ("hThread", wintypes.HANDLE),
                    ("dwProcessId", wintypes.DWORD),
                    ("dwThreadId", wintypes.DWORD),]
    wintypes.PROCESS_INFORMATION = __PROCESS_INFORMATION

    # Use a private library object so the prototypes below do not leak into
    # other users of ctypes.windll.kernel32.  All three functions return
    # pointer-sized values, which the default c_int restype would truncate
    # on 64-bit Windows.
    kernel32 = ctypes.WinDLL("kernel32")
    kernel32.OpenProcess.argtypes = [
        wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
    kernel32.OpenProcess.restype = wintypes.HANDLE
    kernel32.GetModuleHandleA.argtypes = [wintypes.LPCSTR]
    kernel32.GetModuleHandleA.restype = wintypes.HMODULE
    kernel32.GetProcAddress.argtypes = [wintypes.HMODULE, wintypes.LPCSTR]
    kernel32.GetProcAddress.restype = ctypes.c_void_p

    # NOTE(review): the structure is never filled in by any call, so this
    # pid is always 0 -- confirm which process was meant to be opened.
    pid = wintypes.PROCESS_INFORMATION().dwProcessId
    PROCESS_ALL_ACCESS = (0x000F0000 | 0x00100000 | 0xFFF)
    handle = kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)

    storage['process_pid'] = pid
    # Pointer restypes yield None for NULL; keep reporting failure as 0.
    storage['process_handle'] = handle or 0

    # The *A entry points take ANSI strings, hence byte literals.
    module_handle = kernel32.GetModuleHandleA(b"kernel32.dll")
    load_library_a = kernel32.GetProcAddress(module_handle, b"LoadLibraryA")
    storage['LoadLibraryA'] = load_library_a or 0
    return True
def pipe_non_blocking_set(fd):
    """Set PIPE_NOWAIT on the pipe behind C runtime file descriptor ``fd``.

    Returns True when SetNamedPipeHandleState reports success; otherwise
    prints ``WinError()`` and returns False.
    """
    # Imports and constants stay local to avoid polluting the module
    # name-space.
    # thanks to: http://stackoverflow.com/questions/34504970
    import msvcrt

    from ctypes import windll, byref, wintypes, WinError, POINTER
    from ctypes.wintypes import HANDLE, DWORD, BOOL

    lpdword = POINTER(DWORD)
    nowait_mode = wintypes.DWORD(0x00000001)  # PIPE_NOWAIT

    set_state = windll.kernel32.SetNamedPipeHandleState
    set_state.argtypes = [HANDLE, lpdword, lpdword, lpdword]
    set_state.restype = BOOL

    os_handle = msvcrt.get_osfhandle(fd)
    if set_state(os_handle, byref(nowait_mode), None, None):
        return True

    print(WinError())
    return False
def Close(self):
    """Close the wrapped handle once and reset ``self.value`` to 0.

    Nothing happens when ``self.value`` is falsy or equals
    ``HANDLE(-1).value``.
    """
    current = self.value
    if not current or current == HANDLE(-1).value:
        return
    self.CloseHandle(self)
    self.value = 0