def get_MIB_TCP6TABLE_OWNER_PID_from_buffer(buffer):
    """Map *buffer* onto a MIB_TCP6TABLE_OWNER_PID sized for its own entry count.

    The buffer is first viewed through the generated (fixed-size) structure
    only to read ``dwNumEntries``; a structure whose ``table`` array holds
    exactly that many ``TCP6Connection`` items is then declared on the fly
    and mapped onto the same buffer.

    :param buffer: writable buffer accepted by ``from_buffer``
    :returns: an instance of the dynamically generated structure
    """
    header = windows.generated_def.winstructs.MIB_TCP6TABLE_OWNER_PID.from_buffer(buffer)
    entry_count = header.dwNumEntries

    # Same layout as _MIB_TCP6TABLE_OWNER_PID, with the real table length.
    class _GENERATED_MIB_TCP6TABLE_OWNER_PID(Structure):
        _fields_ = [
            ("dwNumEntries", DWORD),
            ("table", TCP6Connection * entry_count),
        ]

    return _GENERATED_MIB_TCP6TABLE_OWNER_PID.from_buffer(buffer)
python类Structure()的实例源码
def query_working_set(self):
    """Return the working-set entries of the process behind ``self.handle``.

    A first call with a fixed-size information structure is used as a probe
    (presumably it fills ``NumberOfEntries`` with the required count -- TODO
    confirm); an error with ``winerror == 24`` on that call is ignored.
    The query is then attempted up to 10 times with a structure whose
    ``WorkingSetInfo`` array is sized from the latest known entry count.

    :returns: the ``WorkingSetInfo`` array of the successful query, or
        ``None`` if all 10 attempts failed with a size mismatch
    :raises WindowsError: for any failure other than winerror 24
    :raises NtStatusException: for any status other than
        ``STATUS_INFO_LENGTH_MISMATCH``
    """
    # winerror 24 is treated as "buffer too small" throughout this method
    # (ERROR_BAD_LENGTH in the Win32 error table).
    bad_length = 24

    use_64bit_layout = self.bitness == 64 or windows.current_process.bitness == 64
    if use_64bit_layout:
        WSET_BLOCK, probe = EPSAPI_WORKING_SET_BLOCK64, PSAPI_WORKING_SET_INFORMATION64()
    else:
        WSET_BLOCK, probe = EPSAPI_WORKING_SET_BLOCK32, PSAPI_WORKING_SET_INFORMATION32()

    try:
        windows.winproxy.QueryWorkingSet(self.handle, ctypes.byref(probe), ctypes.sizeof(probe))
    except WindowsError as probe_error:
        if probe_error.winerror != bad_length:
            raise

    # NumberOfEntries uses the same ctypes type as WSET_BLOCK.Flags.
    flags_fields = [field for field in WSET_BLOCK._fields_ if field[0] == "Flags"]
    NumberOfEntriesType = flags_fields[0][1]

    for _attempt in range(10):
        class GENERATED_PSAPI_WORKING_SET_INFORMATION(ctypes.Structure):
            _fields_ = [
                ("NumberOfEntries", NumberOfEntriesType),
                ("WorkingSetInfo", WSET_BLOCK * probe.NumberOfEntries),
            ]

        res = GENERATED_PSAPI_WORKING_SET_INFORMATION()
        try:
            if windows.current_process.bitness == 32 and self.bitness == 64:
                # 32-bit caller inspecting a 64-bit target.
                windows.syswow64.NtQueryVirtualMemory_32_to_64(self.handle, 0, MemoryWorkingSetList, res)
            else:
                windows.winproxy.QueryWorkingSet(self.handle, ctypes.byref(res), ctypes.sizeof(res))
        except WindowsError as query_error:
            if query_error.winerror != bad_length:
                raise
            # Too small: retry with the entry count reported by this attempt.
            probe.NumberOfEntries = res.NumberOfEntries
            continue
        except windows.generated_def.ntstatus.NtStatusException as status_error:
            if status_error.code != STATUS_INFO_LENGTH_MISMATCH:
                raise
            probe.NumberOfEntries = res.NumberOfEntries
            continue
        return res.WorkingSetInfo

    # Every attempt reported a size mismatch. Raise ?
    return None
def NtQueryVirtualMemory_32_to_64(ProcessHandle, BaseAddress, MemoryInformationClass, MemoryInformation=NeededParameter, MemoryInformationLength=0, ReturnLength=None):
    """Call the underlying ``ctypes_function`` after filling in convenience defaults.

    - ``ReturnLength`` defaults to a reference to a fresh ``ULONG``.
    - ``MemoryInformationLength`` left at 0 is replaced by
      ``ctypes.sizeof(MemoryInformation)`` when ``MemoryInformation`` is not None.
    - a ``ctypes.Structure`` instance given as ``MemoryInformation`` is passed
      by reference.

    :returns: whatever ``NtQueryVirtualMemory_32_to_64.ctypes_function`` returns
    """
    if ReturnLength is None:
        ReturnLength = byref(ULONG())

    if MemoryInformation is not None:
        if MemoryInformationLength == 0:
            # Size must be taken before the structure is wrapped by byref().
            MemoryInformationLength = ctypes.sizeof(MemoryInformation)

    if isinstance(MemoryInformation, ctypes.Structure):
        MemoryInformation = byref(MemoryInformation)

    native_call = NtQueryVirtualMemory_32_to_64.ctypes_function
    return native_call(
        ProcessHandle,
        BaseAddress,
        MemoryInformationClass,
        MemoryInformation,
        MemoryInformationLength,
        ReturnLength,
    )
def is_union_type(x):
    """Tell whether the class *x* is ``ctypes.Union`` or derives from it."""
    union_base = ctypes.Union
    return issubclass(x, union_base)
# ### My types ### #
# # 64bits pointer types # #
# I know direct inheritance from _SimpleCData seems bad
# But it seems to be the only way to have the normal
# ctypes.Structure way of working (need to investigate)
def create_remote_array(subtype, len):
    """Build an array class whose items are read from another target on access.

    The returned class derives from ``_ctypes.Array`` (``_type_`` is
    *subtype*, ``_length_`` is *len*) but is constructed from a base address
    and a target object instead of holding local storage. Items are fetched
    one at a time through ``RemoteStructure``.

    Indexing follows the usual sequence rules: negative indexes count from
    the end, out-of-range indexes raise ``IndexError`` and slice objects
    yield a plain list of items.

    :param subtype: ctypes type of one array element
    :param len: number of elements (the name shadows the builtin; kept for
        backward compatibility with keyword callers)
    :returns: the generated ``RemoteArray`` class
    """
    length = len  # alias so the builtin-shadowing name is not used below

    try:
        integer_types = (int, long)  # Python 2
    except NameError:
        integer_types = (int,)  # Python 3: 'long' no longer exists

    slice_type = type(slice(0))

    class RemoteArray(_ctypes.Array):
        _length_ = length
        _type_ = subtype

        def __init__(self, addr, target):
            self._base_addr = addr
            self.target = target

        def __getitem__(self, index):
            if isinstance(index, slice_type):
                # Python 3 (and extended slices on Python 2) arrive here.
                return [self[i] for i in range(*index.indices(length))]
            if not isinstance(index, integer_types):
                raise NotImplementedError("RemoteArray slice __getitem__")
            position = index + length if index < 0 else index
            # Reject both ends: a negative position would otherwise read
            # memory located before the array's base address.
            if position < 0 or position >= length:
                raise IndexError("Access to {0} for a RemoteArray of size {1}".format(index, length))
            item_addr = self._base_addr + (ctypes.sizeof(subtype) * position)

            # TODO: do better ?
            # Wrap the element in a one-field structure so RemoteStructure
            # can perform the read for any subtype.
            class TST(ctypes.Structure):
                _fields_ = [("TST", subtype)]

            return RemoteStructure.from_structure(TST)(item_addr, target=self.target).TST

        def __getslice__(self, start, stop):
            # Python 2 still routes simple slices here.
            stop = min(stop, length)
            start = max(start, 0)
            return [self[i] for i in range(start, stop)]

    return RemoteArray
# 64bits pointers
def _handle_field_getattr(self, ftype, fosset, fsize):
    """Read one field from the target and return it in the most usable form.

    ``fsize`` bytes are read at ``self._base_addr + fosset`` and then
    interpreted according to ``ftype``; the checks below are order-sensitive.

    :param ftype: ctypes (or remote) type of the field
    :param fosset: offset of the field from ``self._base_addr``
    :param fsize: size of the field in bytes
    """
    field_addr = self._base_addr + fosset
    s = self._target.read_memory(field_addr, fsize)

    # Types with a dedicated remote counterpart: return its .value.
    remote_types = self._field_type_to_remote_type
    if ftype in remote_types:
        return remote_types[ftype].from_buffer_with_target(bytearray(s), target=self._target).value

    # Pointer flavours: native pointer, pointer into a remote 64-bit
    # process, pointer into a remote 32-bit process.
    pointer_wrappers = (
        (_ctypes._Pointer, RemoteStructurePointer),
        (RemotePtr64, RemoteStructurePointer64),
        (RemotePtr32, RemoteStructurePointer32),
    )
    for pointer_base, wrapper in pointer_wrappers:
        if issubclass(ftype, pointer_base):
            return wrapper.from_buffer_with_target_and_ptr_type(bytearray(s), target=self._target, ptr_type=ftype)

    # Structure|Union already transformed into its remote form.
    if issubclass(ftype, RemoteStructureUnion):
        return ftype(field_addr, self._target)
    # Plain structure / union that must be transformed first.
    if issubclass(ftype, ctypes.Structure):
        return RemoteStructure.from_structure(ftype)(field_addr, self._target)
    if issubclass(ftype, ctypes.Union):
        return RemoteUnion.from_structure(ftype)(field_addr, self._target)

    if issubclass(ftype, _ctypes.Array):
        element_type = ftype._type_
        # Character arrays are returned as strings cut at the first NUL.
        if element_type == ctypes.c_char:  # Use issubclass instead ?
            return s.split("\x00", 1)[0]
        elif element_type == ctypes.c_wchar:  # Use issubclass instead ?
            # Decode from utf16 (size / 2 characters), load into a wchar
            # array, then cut at the first "\x00".
            # Something smarter is probably possible here.
            return (element_type * (fsize / 2)).from_buffer_copy(s.decode('utf16'))[:].split("\x00", 1)[0]
        return create_remote_array(element_type, ftype._length_)(field_addr, self._target)

    # Normal types.
    # Follow the ctypes convention: .value is applied only to types that
    # inherit directly from _SimpleCData (relied upon in pe_parse).
    direct_simple_type = _SimpleCData in ftype.__bases__
    loaded = ftype.from_buffer(bytearray(s))
    if not direct_simple_type:
        return loaded
    return loaded.value
def _create_vtable(self, interface):
    """Build a table of native callbacks for the methods of *interface*.

    For every ``(index, name, method)`` yielded by
    ``self.extract_methods_order(interface)`` the attribute ``self.<name>``
    is wrapped in a ``WINFUNCTYPE`` callback whose signature is the method's
    return type, a leading ``PVOID`` ('this'), then the method's argument types.

    :returns: ``(vtable, callbacks)`` -- a structure with one ``c_void_p``
        field per method, and the list of callback objects it points to
    """
    callbacks = []
    method_names = []
    for _index, method_name, prototype in self.extract_methods_order(interface):
        python_impl = getattr(self, method_name)
        # PVOID is 'this'
        signature = [prototype.restype, PVOID]
        signature.extend(prototype.argtypes)
        callbacks.append(ctypes.WINFUNCTYPE(*signature)(python_impl))
        method_names.append(method_name)

    class Vtable(ctypes.Structure):
        _fields_ = [(name, ctypes.c_void_p) for name in method_names]

    slots = [ctypes.cast(callback, ctypes.c_void_p) for callback in callbacks]
    return Vtable(*slots), callbacks
def __init__(self, *args):
    """
    Build the envelope from an OGREnvelope structure, from a 4-element
    tuple or list, or from 4 individual arguments.

    Raises GDALException for a wrong argument count, a sequence that does
    not hold 4 elements, or a minimum greater than the matching maximum;
    raises TypeError when a single argument has an unsupported type.
    """
    arg_count = len(args)
    if arg_count == 4:
        # Individual parameters passed in; coerce each one to float.
        self._from_sequence([float(value) for value in args])
    elif arg_count == 1:
        single = args[0]
        if isinstance(single, OGREnvelope):
            # OGREnvelope (a ctypes Structure) was passed in.
            self._envelope = single
        elif isinstance(single, (tuple, list)):
            # A sequence was passed in; it must hold exactly 4 elements.
            if len(single) != 4:
                raise GDALException('Incorrect number of tuple elements (%d).' % len(single))
            self._from_sequence(single)
        else:
            raise TypeError('Incorrect type of argument: %s' % str(type(single)))
    else:
        raise GDALException('Incorrect number (%d) of arguments.' % len(args))

    # Checking the x,y coordinates
    if self.min_x > self.max_x:
        raise GDALException('Envelope minimum X > maximum X.')
    if self.min_y > self.max_y:
        raise GDALException('Envelope minimum Y > maximum Y.')
operating_system_helper.py 文件源码
项目:spread-knowledge-repository
作者: danieldev13
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def __init__(self, uuidstr):
    """Fill Data1..Data4 from the textual UUID *uuidstr*.

    ``UUID.fields`` yields six integers: the first three go to ``Data1``,
    ``Data2`` and ``Data3``, the next two to ``Data4[0]`` and ``Data4[1]``,
    and the last one is spread over ``Data4[2]``..``Data4[7]`` one byte at
    a time, most significant byte first.
    """
    parsed = UUID(uuidstr)
    ctypes.Structure.__init__(self)
    first, second, third, fourth, fifth, node = parsed.fields
    self.Data1 = first
    self.Data2 = second
    self.Data3 = third
    self.Data4[0] = fourth
    self.Data4[1] = fifth
    for position in range(2, 8):
        # position 2 takes the highest byte (shift 40), position 7 the lowest.
        shift = (7 - position) * 8
        self.Data4[position] = (node >> shift) & 0xff
def new_struct_type(self, name):
    """Declare a new 'struct' type called *name*, based on ``ctypes.Structure``."""
    struct_base = ctypes.Structure
    return self._new_struct_or_union('struct', name, struct_base)
def __init__(self, *args):
    """
    Build the envelope from an OGREnvelope structure, from a 4-element
    tuple or list, or from 4 individual arguments.

    Raises GDALException for a wrong argument count, a sequence that does
    not hold 4 elements, or a minimum greater than the matching maximum;
    raises TypeError when a single argument has an unsupported type.
    """
    arg_count = len(args)
    if arg_count == 4:
        # Individual parameters passed in; coerce each one to float.
        self._from_sequence([float(value) for value in args])
    elif arg_count == 1:
        single = args[0]
        if isinstance(single, OGREnvelope):
            # OGREnvelope (a ctypes Structure) was passed in.
            self._envelope = single
        elif isinstance(single, (tuple, list)):
            # A sequence was passed in; it must hold exactly 4 elements.
            if len(single) != 4:
                raise GDALException('Incorrect number of tuple elements (%d).' % len(single))
            self._from_sequence(single)
        else:
            raise TypeError('Incorrect type of argument: %s' % str(type(single)))
    else:
        raise GDALException('Incorrect number (%d) of arguments.' % len(args))

    # Checking the x,y coordinates
    if self.min_x > self.max_x:
        raise GDALException('Envelope minimum X > maximum X.')
    if self.min_y > self.max_y:
        raise GDALException('Envelope minimum Y > maximum Y.')
def __init__(self):
    """Initialise the structure and set every size/bit-depth/rate field to 0."""
    ctypes.Structure.__init__(self)
    zeroed_fields = (
        "width",
        "height",
        "red_bits",
        "green_bits",
        "blue_bits",
        "refresh_rate",
    )
    for field_name in zeroed_fields:
        setattr(self, field_name, 0)
def __init__(self):
    """Initialise the structure with no channel data and a size of 0.

    Each colour channel has two attributes, ``<channel>`` and
    ``<channel>_array``; both start as ``None``.
    """
    ctypes.Structure.__init__(self)
    for channel in ("red", "green", "blue"):
        setattr(self, channel, None)
        setattr(self, channel + "_array", None)
    self.size = 0
def new_struct_type(self, name):
    """Declare a new 'struct' type called *name*, based on ``ctypes.Structure``."""
    struct_base = ctypes.Structure
    return self._new_struct_or_union('struct', name, struct_base)
def __init__(self, *args):
    """
    Build the envelope from an OGREnvelope structure, from a 4-element
    tuple or list, or from 4 individual arguments.

    Raises GDALException for a wrong argument count, a sequence that does
    not hold 4 elements, or a minimum greater than the matching maximum;
    raises TypeError when a single argument has an unsupported type.
    """
    arg_count = len(args)
    if arg_count == 4:
        # Individual parameters passed in; coerce each one to float.
        self._from_sequence([float(value) for value in args])
    elif arg_count == 1:
        single = args[0]
        if isinstance(single, OGREnvelope):
            # OGREnvelope (a ctypes Structure) was passed in.
            self._envelope = single
        elif isinstance(single, (tuple, list)):
            # A sequence was passed in; it must hold exactly 4 elements.
            if len(single) != 4:
                raise GDALException('Incorrect number of tuple elements (%d).' % len(single))
            self._from_sequence(single)
        else:
            raise TypeError('Incorrect type of argument: %s' % str(type(single)))
    else:
        raise GDALException('Incorrect number (%d) of arguments.' % len(args))

    # Checking the x,y coordinates
    if self.min_x > self.max_x:
        raise GDALException('Envelope minimum X > maximum X.')
    if self.min_y > self.max_y:
        raise GDALException('Envelope minimum Y > maximum Y.')
def makeDataObject(num):
    """
    Creates the ctypes data object used by both processes - defines the layout
    of the shared memory.

    The layout holds three float arrays sized from *num* (``num*3``,
    ``num//10*3`` and ``num//100*3`` elements) followed by three
    ``c_ssize_t`` cursors.

    :param num: base element count used to size the arrays
    :returns: the generated ``DataBlock`` structure class
    """
    class DataBlock(ctypes.Structure):
        _fields_ = [
            ("y", ctypes.c_float * (num * 3)),
            ("y10", ctypes.c_float * (num // 10 * 3)),
            ("y100", ctypes.c_float * (num // 100 * 3)),
            ("endpoint", ctypes.c_ssize_t),
            ("startpoint", ctypes.c_ssize_t),
            ("pltcurrent", ctypes.c_ssize_t),
        ]

    # The original returned ``num``, discarding the layout it had just built.
    return DataBlock
def __init__(self, ArcName=None, ArcNameW=u'', OpenMode=RAR_OM_LIST):
    """Initialise the structure together with a 64 KiB comment buffer.

    The buffer is kept on the instance as ``CmtBuf``; its address and size
    are stored in the ``_CmtBuf`` and ``CmtBufSize`` fields.
    """
    self.CmtBuf = ctypes.c_buffer(64*1024)
    field_values = dict(ArcName=ArcName, ArcNameW=ArcNameW, OpenMode=OpenMode)
    field_values['_CmtBuf'] = ctypes.addressof(self.CmtBuf)
    field_values['CmtBufSize'] = ctypes.sizeof(self.CmtBuf)
    ctypes.Structure.__init__(self, **field_values)
def __init__(self):
    """Initialise the structure together with a 64 KiB comment buffer.

    The buffer is kept on the instance as ``CmtBuf``; its address and size
    are stored in the ``_CmtBuf`` and ``CmtBufSize`` fields.
    """
    self.CmtBuf = ctypes.c_buffer(64*1024)
    buffer_address = ctypes.addressof(self.CmtBuf)
    buffer_size = ctypes.sizeof(self.CmtBuf)
    ctypes.Structure.__init__(self, _CmtBuf=buffer_address, CmtBufSize=buffer_size)
def new_struct_type(self, name):
    """Declare a new 'struct' type called *name*, based on ``ctypes.Structure``."""
    struct_base = ctypes.Structure
    return self._new_struct_or_union('struct', name, struct_base)
def import_windows_ca(common_name, certfile):
    """Add the certificate stored in *certfile* to the Windows "ROOT" store.

    The file may hold raw DER bytes or a PEM block (detected by a leading
    ``-----``). If a certificate whose hash equals ``CertUtil.ca_thumbprint``
    is already in the store nothing is added. When the add fails and this
    module is not running as ``__main__``, the current file is re-launched
    through ``win32elevate.elevateAdminRun``.

    :param common_name: not used by this function
    :param certfile: path of the certificate file to import
    :returns: ``False`` if the store cannot be opened; ``True`` if the
        certificate is already present, was added, or an elevated run was
        started; ``False`` if the add failed while running as ``__main__``
    :raises AssertionError: if ``CertUtil.ca_thumbprint`` is empty
    """
    import ctypes

    with open(certfile, 'rb') as fp:
        certdata = fp.read()
    if certdata.startswith(b'-----'):
        # PEM: keep only the base64 payload between the markers and decode it.
        begin = b'-----BEGIN CERTIFICATE-----'
        end = b'-----END CERTIFICATE-----'
        payload = certdata[certdata.find(begin) + len(begin):certdata.find(end)]
        certdata = base64.b64decode(b''.join(payload.strip().splitlines()))

    crypt32 = ctypes.WinDLL(b'crypt32.dll'.decode())
    store_handle = crypt32.CertOpenStore(10, 0, 0, 0x4000 | 0x20000, b'ROOT'.decode())
    if not store_handle:
        return False

    CERT_FIND_HASH = 0x10000
    X509_ASN_ENCODING = 0x00000001

    class CRYPT_HASH_BLOB(ctypes.Structure):
        _fields_ = [('cbData', ctypes.c_ulong), ('pbData', ctypes.c_char_p)]

    # The store is closed on every exit path; the original leaked the handle
    # when the certificate was already installed.
    try:
        assert CertUtil.ca_thumbprint
        crypt_hash = CRYPT_HASH_BLOB(20, binascii.a2b_hex(CertUtil.ca_thumbprint.replace(':', '')))
        crypt_handle = crypt32.CertFindCertificateInStore(
            store_handle, X509_ASN_ENCODING, 0, CERT_FIND_HASH, ctypes.byref(crypt_hash), None)
        if crypt_handle:
            # Already installed: release the context that was found.
            crypt32.CertFreeCertificateContext(crypt_handle)
            return True
        ret = crypt32.CertAddEncodedCertificateToStore(
            store_handle, 0x1, certdata, len(certdata), 4, None)
    finally:
        crypt32.CertCloseStore(store_handle, 0)

    if not ret and __name__ != "__main__":
        # The add failed: retry by re-running this file through the
        # elevation helper.
        import win32elevate
        win32elevate.elevateAdminRun(os.path.abspath(__file__))
        return True
    return bool(ret)