def remove_ca(self, name):
    """Delete matching CA certificates from the Windows ROOT system store.

    A certificate matches when the first whitespace-separated word of its
    subject Common Name (CN) equals ``name``, compared case-insensitively.

    Args:
        name: CA name to look for.

    Returns:
        Always 0.
    """
    import ctypes
    import ctypes.wintypes

    class CERT_CONTEXT(ctypes.Structure):
        _fields_ = [
            ('dwCertEncodingType', ctypes.wintypes.DWORD),
            ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
            ('cbCertEncoded', ctypes.wintypes.DWORD),
            ('pCertInfo', ctypes.c_void_p),
            ('hCertStore', ctypes.c_void_p),
        ]

    def to_text(value):
        # pyOpenSSL hands back name components as bytes; on Python 3 those
        # never compare equal to str, so decode them first.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
    # ctypes assumes a C int return type, which truncates handles and
    # pointers on 64-bit Windows; declare the pointer-sized types explicitly.
    void_p = ctypes.c_void_p
    dword = ctypes.wintypes.DWORD
    crypt32.CertOpenStore.restype = void_p
    crypt32.CertOpenStore.argtypes = [void_p, dword, void_p, dword, ctypes.c_wchar_p]
    crypt32.CertEnumCertificatesInStore.restype = void_p
    crypt32.CertEnumCertificatesInStore.argtypes = [void_p, void_p]
    crypt32.CertDuplicateCertificateContext.restype = void_p
    crypt32.CertDuplicateCertificateContext.argtypes = [void_p]
    crypt32.CertDeleteCertificateFromStore.argtypes = [void_p]
    crypt32.CertCloseStore.argtypes = [void_p, dword]

    # 10 = CERT_STORE_PROV_SYSTEM_W,
    # 0x4000 = CERT_STORE_OPEN_EXISTING_FLAG,
    # 0x20000 = CERT_SYSTEM_STORE_LOCAL_MACHINE.
    store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
    if not store_handle:
        # Store could not be opened: nothing can be enumerated or removed.
        return 0
    try:
        pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
        while pCertCtx:
            certCtx = CERT_CONTEXT.from_address(pCertCtx)
            certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
            cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
            if hasattr(cert, 'get_subject'):
                cert = cert.get_subject()
            cert_name = next(
                (to_text(v) for k, v in cert.get_components() if to_text(k) == 'CN'),
                '')
            # split() is empty for a whitespace-only CN; guard the [0] access.
            words = cert_name.split() if cert_name else []
            if words and name.lower() == words[0].lower():
                # Delete a duplicate so the original context stays valid for
                # the next enumeration step.
                crypt32.CertDeleteCertificateFromStore(
                    crypt32.CertDuplicateCertificateContext(pCertCtx))
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
    finally:
        # Release the store handle even if parsing a certificate raised.
        crypt32.CertCloseStore(store_handle, 0)
    return 0
# Example source code using Python's wintypes module (ctypes.wintypes)
def get_dnsserver_list():
    """Return the DNS servers configured on this machine.

    On Windows (``os.name == 'nt'``) the list is obtained from
    ``DnsQueryConfig``; otherwise the ``nameserver`` entries of
    ``/etc/resolv.conf`` are returned when that file exists.

    Returns:
        A list of address strings. Empty when the platform is unsupported
        (a warning is logged) or when the Windows query fails.
    """
    if os.name == 'nt':
        import ctypes
        import ctypes.wintypes
        DNS_CONFIG_DNS_SERVER_LIST = 6
        buf = ctypes.create_string_buffer(2048)
        buf_len = ctypes.wintypes.DWORD(len(buf))
        status = ctypes.windll.dnsapi.DnsQueryConfig(
            DNS_CONFIG_DNS_SERVER_LIST, 0, None, None,
            ctypes.byref(buf), ctypes.byref(buf_len))
        if status != 0:
            # Non-zero status means the buffer was not filled reliably.
            return []
        # Buffer layout as read here: a 32-bit count, then 4-byte IPv4 addresses.
        ipcount = struct.unpack('I', buf[0:4])[0]
        # range() works on both Python 2 and 3 (xrange is Python 2 only).
        iplist = [socket.inet_ntoa(buf[i:i+4]) for i in range(4, ipcount*4+4, 4)]
        return iplist
    elif os.path.isfile('/etc/resolv.conf'):
        with open('/etc/resolv.conf', 'rb') as fp:
            content = fp.read()
        # The pattern is a str pattern; on Python 3 the bytes read from the
        # file must be decoded first or re.findall raises TypeError.
        if not isinstance(content, str):
            content = content.decode('utf-8', 'replace')
        return re.findall(r'(?m)^nameserver\s+(\S+)', content)
    else:
        logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name)
        return []
def remove_cert(name):
    """Delete certificates whose subject CN equals ``name`` from the Windows ROOT store.

    Args:
        name: Exact subject Common Name of the certificate(s) to delete.

    Returns:
        0 on Windows (``os.name == 'nt'``), -1 on every other platform.
    """
    if os.name == 'nt':
        import ctypes
        import ctypes.wintypes

        class CERT_CONTEXT(ctypes.Structure):
            _fields_ = [
                ('dwCertEncodingType', ctypes.wintypes.DWORD),
                ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
                ('cbCertEncoded', ctypes.wintypes.DWORD),
                ('pCertInfo', ctypes.c_void_p),
                ('hCertStore', ctypes.c_void_p),
            ]

        def to_text(value):
            # pyOpenSSL hands back name components as bytes; on Python 3
            # those never compare equal to str, so decode them first.
            if isinstance(value, bytes) and not isinstance(value, str):
                return value.decode('utf-8', 'replace')
            return value

        crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
        # ctypes assumes a C int return type, which truncates handles and
        # pointers on 64-bit Windows; declare the pointer-sized types.
        void_p = ctypes.c_void_p
        dword = ctypes.wintypes.DWORD
        crypt32.CertOpenStore.restype = void_p
        crypt32.CertOpenStore.argtypes = [void_p, dword, void_p, dword, ctypes.c_wchar_p]
        crypt32.CertEnumCertificatesInStore.restype = void_p
        crypt32.CertEnumCertificatesInStore.argtypes = [void_p, void_p]
        crypt32.CertDuplicateCertificateContext.restype = void_p
        crypt32.CertDuplicateCertificateContext.argtypes = [void_p]
        crypt32.CertDeleteCertificateFromStore.argtypes = [void_p]
        crypt32.CertCloseStore.argtypes = [void_p, dword]

        # 10 = CERT_STORE_PROV_SYSTEM_W,
        # 0x4000 = CERT_STORE_OPEN_EXISTING_FLAG,
        # 0x20000 = CERT_SYSTEM_STORE_LOCAL_MACHINE.
        store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
        if not store_handle:
            # Store could not be opened: nothing can be enumerated or removed.
            return 0
        try:
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
            while pCertCtx:
                certCtx = CERT_CONTEXT.from_address(pCertCtx)
                certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
                cert = crypto.load_certificate(crypto.FILETYPE_ASN1, certdata)
                if hasattr(cert, 'get_subject'):
                    cert = cert.get_subject()
                cert_name = next(
                    (to_text(v) for k, v in cert.get_components() if to_text(k) == 'CN'),
                    '')
                if cert_name and name == cert_name:
                    # Delete a duplicate so the original context stays valid
                    # for the next enumeration step.
                    crypt32.CertDeleteCertificateFromStore(
                        crypt32.CertDuplicateCertificateContext(pCertCtx))
                pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
        finally:
            # Release the store handle even if parsing a certificate raised.
            crypt32.CertCloseStore(store_handle, 0)
        return 0
    return -1
def notify(class_):
    """
    Broadcast a WM_SETTINGCHANGE message for 'Environment' so that other
    windows learn the environment has changed (following
    http://support.microsoft.com/kb/104011).
    """
    # TODO: Implement Microsoft UIPI (User Interface Privilege Isolation) to
    # elevate privilege to system level so the system gets this notification
    # for now, this must be run as admin to work as expected
    broadcast_result = ctypes.wintypes.DWORD()
    wparam = 0  # must be null
    timeout_ms = 5000
    status = message.SendMessageTimeout(
        message.HWND_BROADCAST, message.WM_SETTINGCHANGE,
        wparam, 'Environment',
        message.SMTO_ABORTIFHUNG, timeout_ms,
        broadcast_result,
    )
    # Hand the call's result to the shared checker from the error module.
    error.handle_nonzero_success(status)
def _is_gui_available():
    """Tell whether the process window station has the WSF_VISIBLE flag set.

    Returns:
        True when the visible bit is set in the flags reported by
        GetUserObjectInformationW, False otherwise.

    Raises:
        The exception built by ctypes.WinError() when either user32 call
        reports failure.
    """
    UOI_FLAGS = 1
    WSF_VISIBLE = 0x0001

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                    ("fReserved", ctypes.wintypes.BOOL),
                    ("dwFlags", ctypes.wintypes.DWORD)]

    user32 = ctypes.windll.user32
    station = user32.GetProcessWindowStation()
    if not station:
        raise ctypes.WinError()

    flags_info = USEROBJECTFLAGS()
    bytes_needed = ctypes.wintypes.DWORD()
    succeeded = user32.GetUserObjectInformationW(
        station, UOI_FLAGS,
        ctypes.byref(flags_info), ctypes.sizeof(flags_info),
        ctypes.byref(bytes_needed))
    if not succeeded:
        raise ctypes.WinError()

    return (flags_info.dwFlags & WSF_VISIBLE) != 0
def _is_gui_available():
    """Check the WSF_VISIBLE bit of the current process window station.

    Queries user32 for the window station of this process and for its
    UOI_FLAGS information; the result is True only if the visible bit is set.
    A failing user32 call is raised as ctypes.WinError().
    """
    UOI_FLAGS = 1
    WSF_VISIBLE = 0x0001

    class USEROBJECTFLAGS(ctypes.Structure):
        _fields_ = [
            ("fInherit", ctypes.wintypes.BOOL),
            ("fReserved", ctypes.wintypes.BOOL),
            ("dwFlags", ctypes.wintypes.DWORD),
        ]

    api = ctypes.windll.user32
    winsta = api.GetProcessWindowStation()
    if not winsta:
        raise ctypes.WinError()

    info = USEROBJECTFLAGS()
    length_needed = ctypes.wintypes.DWORD()
    if not api.GetUserObjectInformationW(winsta,
                                         UOI_FLAGS,
                                         ctypes.byref(info),
                                         ctypes.sizeof(info),
                                         ctypes.byref(length_needed)):
        raise ctypes.WinError()

    visible_bit = info.dwFlags & WSF_VISIBLE
    return bool(visible_bit)
def isfile_cached(self):
    """Return whether this node names a file, using a per-folder listing cache.

    The first lookup for a parent folder lists that folder once with
    FindFirstFile/FindNextFile and stores the names of its non-directory
    entries on the class, keyed by ``id(self.parent)``. Later lookups for
    the same parent only test membership in that list.

    If the folder cannot be listed, an error is logged and the answer comes
    from ``os.path.isfile``; nothing is cached in that case.
    """
    # optimize for nt.stat calls, assuming there are many files for few folders
    try:
        cache = self.__class__.cache_isfile_cache
    except AttributeError:
        cache = self.__class__.cache_isfile_cache = {}

    key = id(self.parent)
    try:
        c1 = cache[key]
    except KeyError:
        # Collect into a local list and publish it only after a complete
        # scan. Caching an empty or partial list after a failure would make
        # every later lookup in this folder wrongly answer False.
        c1 = []
        curpath = self.parent.abspath()
        findData = ctypes.wintypes.WIN32_FIND_DATAW()
        find = FindFirstFile(TP % curpath, ctypes.byref(findData))
        if find == INVALID_HANDLE_VALUE:
            Logs.error("invalid win32 handle isfile_cached %r" % self.abspath())
            return os.path.isfile(self.abspath())
        try:
            while True:
                # Entries named in UPPER_FOLDERS are skipped
                # (presumably '.' and '..' - verify against its definition).
                if findData.cFileName not in UPPER_FOLDERS:
                    thatsadir = findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY
                    if not thatsadir:
                        c1.append(str(findData.cFileName))
                if not FindNextFile(find, ctypes.byref(findData)):
                    break
        except Exception as e:
            Logs.error('exception while listing a folder %r %r' % (self.abspath(), e))
            return os.path.isfile(self.abspath())
        finally:
            FindClose(find)
        cache[key] = c1
    return self.name in c1
def isfile_cached(self):
    """Check whether this node is a file via a cached listing of its parent folder.

    On the first request for a given parent (keyed by ``id(self.parent)``)
    the folder is enumerated with FindFirstFile/FindNextFile and the names
    of all non-directory entries are stored in a class-level cache. The
    result is then ``self.name in <cached names>``.

    When the enumeration fails, the failure is logged, ``os.path.isfile`` is
    used for the answer, and no listing is cached.
    """
    # optimize for nt.stat calls, assuming there are many files for few folders
    try:
        cache = self.__class__.cache_isfile_cache
    except AttributeError:
        cache = self.__class__.cache_isfile_cache = {}

    parent_key = id(self.parent)
    try:
        c1 = cache[parent_key]
    except KeyError:
        # Do not store the list in the cache until the scan has completed:
        # a failed scan used to leave an empty/partial list behind, which
        # made later lookups in the same folder return False incorrectly.
        c1 = []
        curpath = self.parent.abspath()
        findData = ctypes.wintypes.WIN32_FIND_DATAW()
        find = FindFirstFile(TP % curpath, ctypes.byref(findData))
        if find == INVALID_HANDLE_VALUE:
            Logs.error("invalid win32 handle isfile_cached %r" % self.abspath())
            return os.path.isfile(self.abspath())
        try:
            while True:
                if findData.cFileName not in UPPER_FOLDERS:
                    thatsadir = findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY
                    if not thatsadir:
                        c1.append(str(findData.cFileName))
                if not FindNextFile(find, ctypes.byref(findData)):
                    break
        except Exception as e:
            Logs.error('exception while listing a folder %r %r' % (self.abspath(), e))
            return os.path.isfile(self.abspath())
        finally:
            # The find handle is always released, success or failure.
            FindClose(find)
        cache[parent_key] = c1
    return self.name in c1
def getPreferencesDir():
    """Return the folder SHGetFolderPathW reports for CSIDL_APPDATA, as a pathlib.Path."""
    path_buffer = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
    # The shell writes the folder path into path_buffer.
    ctypes.windll.shell32.SHGetFolderPathW(
        0,
        CSIDL_APPDATA,
        0,
        SHGFP_TYPE_CURRENT,
        path_buffer,
    )
    folder = path_buffer.value
    return pathlib.Path(folder)
def getWindowsDir():
    """Return the folder SHGetFolderPathW reports for CSIDL_WINDOWS, as a pathlib.Path."""
    path_buffer = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
    # The shell writes the folder path into path_buffer.
    ctypes.windll.shell32.SHGetFolderPathW(
        0,
        CSIDL_WINDOWS,
        0,
        SHGFP_TYPE_CURRENT,
        path_buffer,
    )
    folder = path_buffer.value
    return pathlib.Path(folder)
def getProgramFilesDir():
    """Return the folder SHGetFolderPathW reports for CSIDL_PROGRAM_FILES, as a pathlib.Path."""
    path_buffer = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
    # The shell writes the folder path into path_buffer.
    ctypes.windll.shell32.SHGetFolderPathW(
        0,
        CSIDL_PROGRAM_FILES,
        0,
        SHGFP_TYPE_CURRENT,
        path_buffer,
    )
    folder = path_buffer.value
    return pathlib.Path(folder)
def format_system_message(errno):
    """
    Look up the descriptive message for a system error number by
    calling FormatMessageW, and return that message.
    """
    # flag values understood by FormatMessageW (only two are used below)
    ALLOCATE_BUFFER = 0x100
    ARGUMENT_ARRAY = 0x2000
    FROM_HMODULE = 0x800
    FROM_STRING = 0x400
    FROM_SYSTEM = 0x1000
    IGNORE_INSERTS = 0x200

    kernel32 = ctypes.windll.kernel32
    message_buffer = ctypes.wintypes.LPWSTR()
    char_count = kernel32.FormatMessageW(
        # FormatMessageW allocates the buffer (freed below) and is asked
        # for a system error message.
        ALLOCATE_BUFFER | FROM_SYSTEM,
        None,    # source
        errno,   # message id
        0,       # language id
        ctypes.byref(message_buffer),
        0,       # buffer size
        None,    # arguments
    )
    # note the following will cause an infinite loop if GetLastError
    # repeatedly returns an error that cannot be formatted, although
    # this should not happen.
    handle_nonzero_success(char_count)
    text = message_buffer.value
    kernel32.LocalFree(message_buffer)
    return text
def GetTokenInformation(token, information_class):
    """
    Fetch the information of the given class for a token and return the
    result buffer interpreted as a TOKEN_USER structure.
    """
    advapi32 = ctypes.windll.advapi32
    required_size = ctypes.wintypes.DWORD()

    # First call passes no buffer; its result is ignored and only the
    # size written to required_size is used.
    advapi32.GetTokenInformation(
        token, information_class.num, 0, 0, ctypes.byref(required_size))

    buf = ctypes.create_string_buffer(required_size.value)
    succeeded = advapi32.GetTokenInformation(
        token,
        information_class.num,
        ctypes.byref(buf),
        ctypes.sizeof(buf),
        ctypes.byref(required_size),
    )
    handle_nonzero_success(succeeded)

    token_user_ptr = ctypes.cast(buf, ctypes.POINTER(TOKEN_USER))
    return token_user_ptr.contents
def remove_windows_ca(name):
    """Remove certificates whose subject CN equals ``name`` from the Windows ROOT store.

    Best effort: any failure is logged as a warning instead of being raised.

    Args:
        name: Exact subject Common Name of the certificate(s) to delete.
    """
    import ctypes
    import ctypes.wintypes

    class CERT_CONTEXT(ctypes.Structure):
        _fields_ = [
            ('dwCertEncodingType', ctypes.wintypes.DWORD),
            ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
            ('cbCertEncoded', ctypes.wintypes.DWORD),
            ('pCertInfo', ctypes.c_void_p),
            ('hCertStore', ctypes.c_void_p),
        ]

    def to_text(value):
        # pyOpenSSL hands back name components as bytes; on Python 3 those
        # never compare equal to str, so decode them first.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    try:
        crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
        # ctypes assumes a C int return type, which truncates handles and
        # pointers on 64-bit Windows; declare the pointer-sized types.
        void_p = ctypes.c_void_p
        dword = ctypes.wintypes.DWORD
        crypt32.CertOpenStore.restype = void_p
        crypt32.CertOpenStore.argtypes = [void_p, dword, void_p, dword, ctypes.c_wchar_p]
        crypt32.CertEnumCertificatesInStore.restype = void_p
        crypt32.CertEnumCertificatesInStore.argtypes = [void_p, void_p]
        crypt32.CertDuplicateCertificateContext.restype = void_p
        crypt32.CertDuplicateCertificateContext.argtypes = [void_p]
        crypt32.CertDeleteCertificateFromStore.argtypes = [void_p]
        crypt32.CertCloseStore.argtypes = [void_p, dword]

        # 10 = CERT_STORE_PROV_SYSTEM_W,
        # 0x4000 = CERT_STORE_OPEN_EXISTING_FLAG,
        # 0x20000 = CERT_SYSTEM_STORE_LOCAL_MACHINE.
        store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
        if not store_handle:
            # Reported through the warning below rather than failing silently.
            raise ctypes.WinError()
        try:
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
            while pCertCtx:
                certCtx = CERT_CONTEXT.from_address(pCertCtx)
                certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
                cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
                if hasattr(cert, 'get_subject'):
                    cert = cert.get_subject()
                cert_name = next(
                    (to_text(v) for k, v in cert.get_components() if to_text(k) == 'CN'),
                    '')
                if cert_name and name == cert_name:
                    # Delete a duplicate so the original context stays valid
                    # for the next enumeration step.
                    crypt32.CertDeleteCertificateFromStore(
                        crypt32.CertDuplicateCertificateContext(pCertCtx))
                pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
        finally:
            # Release the store handle even if parsing a certificate raised.
            crypt32.CertCloseStore(store_handle, 0)
    except Exception as e:
        logging.warning('CertUtil.remove_windows_ca failed: %r', e)
def remove_windows_ca(name):
    """Remove certificates whose subject CN equals ``name`` from the Windows ROOT store.

    Best effort: any failure is reported through ``xlog.warning`` instead of
    being raised.

    Args:
        name: Exact subject Common Name of the certificate(s) to delete.
    """
    import ctypes
    import ctypes.wintypes

    class CERT_CONTEXT(ctypes.Structure):
        _fields_ = [
            ('dwCertEncodingType', ctypes.wintypes.DWORD),
            ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
            ('cbCertEncoded', ctypes.wintypes.DWORD),
            ('pCertInfo', ctypes.c_void_p),
            ('hCertStore', ctypes.c_void_p),
        ]

    def to_text(value):
        # pyOpenSSL hands back name components as bytes; on Python 3 those
        # never compare equal to str, so decode them first.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    try:
        crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
        # ctypes assumes a C int return type, which truncates handles and
        # pointers on 64-bit Windows; declare the pointer-sized types.
        void_p = ctypes.c_void_p
        dword = ctypes.wintypes.DWORD
        crypt32.CertOpenStore.restype = void_p
        crypt32.CertOpenStore.argtypes = [void_p, dword, void_p, dword, ctypes.c_wchar_p]
        crypt32.CertEnumCertificatesInStore.restype = void_p
        crypt32.CertEnumCertificatesInStore.argtypes = [void_p, void_p]
        crypt32.CertDuplicateCertificateContext.restype = void_p
        crypt32.CertDuplicateCertificateContext.argtypes = [void_p]
        crypt32.CertDeleteCertificateFromStore.argtypes = [void_p]
        crypt32.CertCloseStore.argtypes = [void_p, dword]

        # 10 = CERT_STORE_PROV_SYSTEM_W,
        # 0x4000 = CERT_STORE_OPEN_EXISTING_FLAG,
        # 0x20000 = CERT_SYSTEM_STORE_LOCAL_MACHINE.
        store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
        if not store_handle:
            # Reported through the warning below rather than failing silently.
            raise ctypes.WinError()
        try:
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
            while pCertCtx:
                certCtx = CERT_CONTEXT.from_address(pCertCtx)
                certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
                cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
                if hasattr(cert, 'get_subject'):
                    cert = cert.get_subject()
                cert_name = next(
                    (to_text(v) for k, v in cert.get_components() if to_text(k) == 'CN'),
                    '')
                if cert_name and name == cert_name:
                    # Delete a duplicate so the original context stays valid
                    # for the next enumeration step.
                    crypt32.CertDeleteCertificateFromStore(
                        crypt32.CertDuplicateCertificateContext(pCertCtx))
                pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
        finally:
            # Release the store handle even if parsing a certificate raised.
            crypt32.CertCloseStore(store_handle, 0)
    except Exception as e:
        xlog.warning('CertUtil.remove_windows_ca failed: %r', e)
def remove_windows_ca(name):
    """Delete every ROOT-store certificate whose subject CN is exactly ``name``.

    Errors never propagate to the caller; they are reported with
    ``xlog.warning``.

    Args:
        name: Subject Common Name of the certificate(s) to delete.
    """
    import ctypes
    import ctypes.wintypes

    class CERT_CONTEXT(ctypes.Structure):
        _fields_ = [
            ('dwCertEncodingType', ctypes.wintypes.DWORD),
            ('pbCertEncoded', ctypes.POINTER(ctypes.wintypes.BYTE)),
            ('cbCertEncoded', ctypes.wintypes.DWORD),
            ('pCertInfo', ctypes.c_void_p),
            ('hCertStore', ctypes.c_void_p),
        ]

    def to_text(value):
        # pyOpenSSL hands back name components as bytes; on Python 3 those
        # never compare equal to str, so decode them first.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    try:
        crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
        # Without explicit types ctypes treats return values as C int, which
        # truncates handles and pointers on 64-bit Windows.
        void_p = ctypes.c_void_p
        dword = ctypes.wintypes.DWORD
        crypt32.CertOpenStore.restype = void_p
        crypt32.CertOpenStore.argtypes = [void_p, dword, void_p, dword, ctypes.c_wchar_p]
        crypt32.CertEnumCertificatesInStore.restype = void_p
        crypt32.CertEnumCertificatesInStore.argtypes = [void_p, void_p]
        crypt32.CertDuplicateCertificateContext.restype = void_p
        crypt32.CertDuplicateCertificateContext.argtypes = [void_p]
        crypt32.CertDeleteCertificateFromStore.argtypes = [void_p]
        crypt32.CertCloseStore.argtypes = [void_p, dword]

        # 10 = CERT_STORE_PROV_SYSTEM_W,
        # 0x4000 = CERT_STORE_OPEN_EXISTING_FLAG,
        # 0x20000 = CERT_SYSTEM_STORE_LOCAL_MACHINE.
        store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
        if not store_handle:
            # Surfaces as a logged warning via the handler below.
            raise ctypes.WinError()
        try:
            pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, None)
            while pCertCtx:
                certCtx = CERT_CONTEXT.from_address(pCertCtx)
                certdata = ctypes.string_at(certCtx.pbCertEncoded, certCtx.cbCertEncoded)
                cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certdata)
                if hasattr(cert, 'get_subject'):
                    cert = cert.get_subject()
                cert_name = next(
                    (to_text(v) for k, v in cert.get_components() if to_text(k) == 'CN'),
                    '')
                if cert_name and name == cert_name:
                    # The duplicate is what gets deleted, so pCertCtx remains
                    # usable for continuing the enumeration.
                    crypt32.CertDeleteCertificateFromStore(
                        crypt32.CertDuplicateCertificateContext(pCertCtx))
                pCertCtx = crypt32.CertEnumCertificatesInStore(store_handle, pCertCtx)
        finally:
            # Always close the store, including when a certificate fails to parse.
            crypt32.CertCloseStore(store_handle, 0)
    except Exception as e:
        xlog.warning('CertUtil.remove_windows_ca failed: %r', e)
def to_rect(coords):
    """Builds a windows RECT from a set of coordinates.

    The coordinates are passed positionally to the RECT constructor, so
    [x1, y1, x2, y2] fills the structure's fields in that order.

    Args:
        coords: List of coordinates in the form [x1, y1, x2, y2].

    Returns:
        A windows RECT structure.
    """
    return ctypes.wintypes.RECT(*coords)
def CreateNamedPipe(name, open_mode, pipe_mode, max_instances, out_buffer_size, in_buffer_size, default_time_out, security_attributes):
    """See: CreateNamedPipe function
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

    Calls the ANSI variant (CreateNamedPipeA). A text ``name`` is encoded
    with the 'mbcs' codec before the call; byte strings are passed through.

    Returns:
        The value returned by the API, wrapped in a wintypes.HANDLE.
    """
    # kernel32 exports only CreateNamedPipeA / CreateNamedPipeW and ctypes
    # does not choose between them, so looking up the bare name fails with
    # AttributeError. The LPCSTR name type below selects the ANSI variant.
    CreateNamedPipe_Fn = windll.kernel32.CreateNamedPipeA
    CreateNamedPipe_Fn.argtypes = [
        wintypes.LPCSTR,        # LPCTSTR lpName,
        wintypes.DWORD,         # _In_ DWORD dwOpenMode,
        wintypes.DWORD,         # _In_ DWORD dwPipeMode,
        wintypes.DWORD,         # _In_ DWORD nMaxInstances,
        wintypes.DWORD,         # _In_ DWORD nOutBufferSize,
        wintypes.DWORD,         # _In_ DWORD nInBufferSize,
        wintypes.DWORD,         # _In_ DWORD nDefaultTimeOut,
        LPSECURITY_ATTRIBUTES   # _In_opt_ LPSECURITY_ATTRIBUTES lpSecurityAttributes
    ]
    CreateNamedPipe_Fn.restype = wintypes.HANDLE
    # LPCSTR rejects text strings on Python 3; encode them for the ANSI API.
    if isinstance(name, type(u'')):
        name = name.encode('mbcs')
    handle = wintypes.HANDLE(CreateNamedPipe_Fn(
        name,
        open_mode,
        pipe_mode,
        max_instances,
        out_buffer_size,
        in_buffer_size,
        default_time_out,
        security_attributes
    ))
    return handle
def ConnectNamedPipe(named_pipe, overlapped):
    """See: ConnectNamedPipe function
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365146(v=vs.85).aspx

    Returns the API result wrapped in a wintypes.BOOL.
    """
    connect = windll.kernel32.ConnectNamedPipe
    # Prototype: BOOL ConnectNamedPipe(HANDLE hNamedPipe, LPOVERLAPPED lpOverlapped)
    connect.argtypes = [wintypes.HANDLE, LPOVERLAPPED]
    connect.restype = wintypes.BOOL
    succeeded = connect(named_pipe, overlapped)
    return wintypes.BOOL(succeeded)
def ReadFile(file, buffer, number_of_bytes_to_read, number_of_bytes_read, overlapped):
"""See: ReadFile function
https://msdn.microsoft.com/en-us/library/windows/desktop/aa365467(v=vs.85).aspx
"""
ReadFile_Fn = windll.kernel32.ReadFile
ReadFile_Fn.argtypes = [
wintypes.HANDLE, # _In_ HANDLE hFile,
LPVOID, # _Out_ LPVOID lpBuffer,
wintypes.DWORD, # _In_ DWORD nNumberOfBytesToRead,
LPDWORD, # _Out_opt_ LPDWORD lpNumberOfBytesRead,
LPOVERLAPPED # _Inout_opt_ LPOVERLAPPED lpOverlapped
]
ReadFile_Fn.restype = wintypes.BOOL
ret = wintypes.BOOL(ReadFile_Fn(
file,
buffer,
number_of_bytes_to_read,
number_of_bytes_read,
overlapped
))