def get_current_process():
hwnd = user32.GetForegroundWindow()
pid = c_ulong(0)
user32.GetWindowThreadProcessId(hwnd, byref(pid))
#process_id = "%d" % pid.value
executable = create_string_buffer("\x00" * 512)
h_process = kernel32.OpenProcess(0x400 | 0x10, False, pid)
psapi.GetModuleBaseNameA(h_process,None,byref(executable),512)
window_title = create_string_buffer("\x00" * 512)
length = user32.GetWindowTextA(hwnd, byref(window_title),512)
kernel32.CloseHandle(hwnd)
kernel32.CloseHandle(h_process)
#return "[ PID: %s - %s - %s ]" % (process_id, executable.value, window_title.value)
return executable.value, window_title.value
python类c_ulong()的实例源码
awa_common_api_wrapper.py 文件源码
项目:creator-system-test-framework
作者: CreatorDev
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def AwaObjectLinkArray_IsValid(self, array, index):
self._lib.AwaObjectLinkArray_IsValid.restype = c_bool
self._lib.AwaObjectLinkArray_IsValid.argtypes = [c_void_p, c_ulong]
return self._lib.AwaObjectLinkArray_IsValid(array, index)
# * @}
def clear_screen(self, param):
mode = to_int(param, 0)
sbinfo = self.screen_buffer_info()
if mode == 1: # Clear from begining of screen to cursor position
clear_start = COORD(0, 0)
clear_length = sbinfo.CursorPosition.X * sbinfo.CursorPosition.Y
elif mode == 2: # Clear entire screen and return cursor to home
clear_start = COORD(0, 0)
clear_length = sbinfo.Size.X * sbinfo.Size.Y
windll.kernel32.SetConsoleCursorPosition(self.hconsole, clear_start)
else: # Clear from cursor position to end of screen
clear_start = sbinfo.CursorPosition
clear_length = ((sbinfo.Size.X - sbinfo.CursorPosition.X) + sbinfo.Size.X * (sbinfo.Size.Y - sbinfo.CursorPosition.Y))
chars_written = c_ulong()
windll.kernel32.FillConsoleOutputCharacterW(self.hconsole, c_wchar(' '), clear_length, clear_start, byref(chars_written))
windll.kernel32.FillConsoleOutputAttribute(self.hconsole, sbinfo.Attributes, clear_length, clear_start, byref(chars_written))
def writeconsole(self, txt):
chars_written = c_ulong()
writeconsole = windll.kernel32.WriteConsoleA
if isinstance(txt, _type):
writeconsole = windll.kernel32.WriteConsoleW
# MSDN says that there is a shared buffer of 64 KB for the console
# writes. Attempt to not get ERROR_NOT_ENOUGH_MEMORY, see waf issue #746
done = 0
todo = len(txt)
chunk = 32<<10
while todo != 0:
doing = min(chunk, todo)
buf = txt[done:done+doing]
r = writeconsole(self.hconsole, buf, doing, byref(chars_written), None)
if r == 0:
chunk >>= 1
continue
done += doing
todo -= doing
def clear_screen(self, param):
mode = to_int(param, 0)
sbinfo = self.screen_buffer_info()
if mode == 1: # Clear from begining of screen to cursor position
clear_start = COORD(0, 0)
clear_length = sbinfo.CursorPosition.X * sbinfo.CursorPosition.Y
elif mode == 2: # Clear entire screen and return cursor to home
clear_start = COORD(0, 0)
clear_length = sbinfo.Size.X * sbinfo.Size.Y
windll.kernel32.SetConsoleCursorPosition(self.hconsole, clear_start)
else: # Clear from cursor position to end of screen
clear_start = sbinfo.CursorPosition
clear_length = ((sbinfo.Size.X - sbinfo.CursorPosition.X) + sbinfo.Size.X * (sbinfo.Size.Y - sbinfo.CursorPosition.Y))
chars_written = c_ulong()
windll.kernel32.FillConsoleOutputCharacterW(self.hconsole, c_wchar(' '), clear_length, clear_start, byref(chars_written))
windll.kernel32.FillConsoleOutputAttribute(self.hconsole, sbinfo.Attributes, clear_length, clear_start, byref(chars_written))
def clear_screen(self, param):
mode = to_int(param, 0)
sbinfo = self.screen_buffer_info()
if mode == 1: # Clear from begining of screen to cursor position
clear_start = COORD(0, 0)
clear_length = sbinfo.CursorPosition.X * sbinfo.CursorPosition.Y
elif mode == 2: # Clear entire screen and return cursor to home
clear_start = COORD(0, 0)
clear_length = sbinfo.Size.X * sbinfo.Size.Y
windll.kernel32.SetConsoleCursorPosition(self.hconsole, clear_start)
else: # Clear from cursor position to end of screen
clear_start = sbinfo.CursorPosition
clear_length = ((sbinfo.Size.X - sbinfo.CursorPosition.X) + sbinfo.Size.X * (sbinfo.Size.Y - sbinfo.CursorPosition.Y))
chars_written = c_ulong()
windll.kernel32.FillConsoleOutputCharacterW(self.hconsole, c_wchar(' '), clear_length, clear_start, byref(chars_written))
windll.kernel32.FillConsoleOutputAttribute(self.hconsole, sbinfo.Attributes, clear_length, clear_start, byref(chars_written))
def writeconsole(self, txt):
chars_written = c_ulong()
writeconsole = windll.kernel32.WriteConsoleA
if isinstance(txt, _type):
writeconsole = windll.kernel32.WriteConsoleW
# MSDN says that there is a shared buffer of 64 KB for the console
# writes. Attempt to not get ERROR_NOT_ENOUGH_MEMORY, see waf issue #746
done = 0
todo = len(txt)
chunk = 32<<10
while todo != 0:
doing = min(chunk, todo)
buf = txt[done:done+doing]
r = writeconsole(self.hconsole, buf, doing, byref(chars_written), None)
if r == 0:
chunk >>= 1
continue
done += doing
todo -= doing
def _ram(self):
kernel32 = ctypes.windll.kernel32
c_ulong = ctypes.c_ulong
class MEMORYSTATUS(ctypes.Structure):
_fields_ = [
('dwLength', c_ulong),
('dwMemoryLoad', c_ulong),
('dwTotalPhys', c_ulong),
('dwAvailPhys', c_ulong),
('dwTotalPageFile', c_ulong),
('dwAvailPageFile', c_ulong),
('dwTotalVirtual', c_ulong),
('dwAvailVirtual', c_ulong)
]
memoryStatus = MEMORYSTATUS()
memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUS)
kernel32.GlobalMemoryStatus(ctypes.byref(memoryStatus))
return (memoryStatus.dwTotalPhys, memoryStatus.dwAvailPhys)
def write_bytes(self, address, data):
address = int(address)
if not self.isProcessOpen:
raise ProcessException("Can't write_bytes(%s, %s), process %s is not open" % (address, data, self.pid))
buffer = create_string_buffer(data)
sizeWriten = c_ulong(0)
bufferSize = sizeof(buffer) - 1
_address = address
_length = bufferSize + 1
try:
old_protect = self.VirtualProtectEx(_address, _length, PAGE_EXECUTE_READWRITE)
except:
pass
res = windll.kernel32.WriteProcessMemory(self.h_process, address, buffer, bufferSize, byref(sizeWriten))
try:
self.VirtualProtectEx(_address, _length, old_protect)
except:
pass
return res
def get_window_prop(dpy, window, prop_name, max_size=2):
"""
Returns (nitems, property) of specified window or (-1, None) if anything fails.
Returned 'property' is POINTER(c_void_p) and has to be freed using X.free().
"""
prop_atom = intern_atom(dpy, prop_name, False)
type_return, format_return = Atom(), Atom()
nitems, bytes_after = c_ulong(), c_ulong()
prop = c_void_p()
if SUCCESS == get_window_property(dpy, window,
prop_atom, 0, max_size, False, ANYPROPERTYTYPE,
byref(type_return), byref(format_return), byref(nitems),
byref(bytes_after), byref(prop)):
return nitems.value, prop
return -1, None
def get_current_process():
hwnd = user32.GetForegroundWindow()
pid = c_ulong(0)
user32.GetWindowThreadProcessId(hwnd, byref(pid))
#process_id = "%d" % pid.value
executable = create_string_buffer("\x00" * 512)
h_process = kernel32.OpenProcess(0x400 | 0x10, False, pid)
psapi.GetModuleBaseNameA(h_process,None,byref(executable),512)
window_title = create_string_buffer("\x00" * 512)
length = user32.GetWindowTextA(hwnd, byref(window_title),512)
kernel32.CloseHandle(hwnd)
kernel32.CloseHandle(h_process)
#return "[ PID: %s - %s - %s ]" % (process_id, executable.value, window_title.value)
return executable.value, window_title.value
def win32_version():
import ctypes
class OSVERSIONINFOEXW(ctypes.Structure):
_fields_ = [('dwOSVersionInfoSize', ctypes.c_ulong),
('dwMajorVersion', ctypes.c_ulong),
('dwMinorVersion', ctypes.c_ulong),
('dwBuildNumber', ctypes.c_ulong),
('dwPlatformId', ctypes.c_ulong),
('szCSDVersion', ctypes.c_wchar*128),
('wServicePackMajor', ctypes.c_ushort),
('wServicePackMinor', ctypes.c_ushort),
('wSuiteMask', ctypes.c_ushort),
('wProductType', ctypes.c_byte),
('wReserved', ctypes.c_byte)]
"""
Get's the OS major and minor versions. Returns a tuple of
(OS_MAJOR, OS_MINOR).
"""
os_version = OSVERSIONINFOEXW()
os_version.dwOSVersionInfoSize = ctypes.sizeof(os_version)
retcode = ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version))
if retcode != 0:
raise Exception("Failed to get OS version")
return os_version.dwMajorVersion
def win32_version():
import ctypes
class OSVERSIONINFOEXW(ctypes.Structure):
_fields_ = [('dwOSVersionInfoSize', ctypes.c_ulong),
('dwMajorVersion', ctypes.c_ulong),
('dwMinorVersion', ctypes.c_ulong),
('dwBuildNumber', ctypes.c_ulong),
('dwPlatformId', ctypes.c_ulong),
('szCSDVersion', ctypes.c_wchar*128),
('wServicePackMajor', ctypes.c_ushort),
('wServicePackMinor', ctypes.c_ushort),
('wSuiteMask', ctypes.c_ushort),
('wProductType', ctypes.c_byte),
('wReserved', ctypes.c_byte)]
"""
Get's the OS major and minor versions. Returns a tuple of
(OS_MAJOR, OS_MINOR).
"""
os_version = OSVERSIONINFOEXW()
os_version.dwOSVersionInfoSize = ctypes.sizeof(os_version)
retcode = ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version))
if retcode != 0:
raise Exception("Failed to get OS version")
return os_version.dwMajorVersion
awa_common_api_wrapper.py 文件源码
项目:creator-system-test-framework
作者: CreatorDev
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def AwaOpaqueArray_IsValid(self, array, index):
self._lib.AwaOpaqueArray_IsValid.restype = c_bool
self._lib.AwaOpaqueArray_IsValid.argtypes = [c_void_p, c_ulong]
return self._lib.AwaOpaqueArray_IsValid(array, index)
def get_current_process():
hwnd = user32.GetForegroundWindow()
pid = c_ulong(0)
user32.GetWindowThreadProcessId(hwnd, byref(pid))
#process_id = "%d" % pid.value
executable = create_string_buffer("\x00" * 512)
h_process = kernel32.OpenProcess(0x400 | 0x10, False, pid)
psapi.GetModuleBaseNameA(h_process,None,byref(executable),512)
window_title = create_string_buffer("\x00" * 512)
length = user32.GetWindowTextA(hwnd, byref(window_title),512)
kernel32.CloseHandle(hwnd)
kernel32.CloseHandle(h_process)
#return "[ PID: %s - %s - %s ]" % (process_id, executable.value, window_title.value)
return executable.value, window_title.value
def get_property_keys(self):
"""
Get all device property keys
:return: Iterable of device property keys
"""
if self._handle is None:
with self.open():
return self.get_property_keys()
handle, dev_info, _ = self._handle
required_size = ctypes.c_ulong()
if not _setupapi.SetupDiGetDevicePropertyKeys(handle, ctypes.byref(dev_info), None, 0,
ctypes.byref(required_size), 0):
err_no = ctypes.GetLastError()
if err_no == 122: # ERROR_INSUFFICIENT_BUFFER
# noinspection SpellCheckingInspection
devpkeys = (DevicePropertyKey * required_size.value)()
if _setupapi.SetupDiGetDevicePropertyKeys(handle, ctypes.byref(dev_info), ctypes.byref(devpkeys),
required_size.value, None, 0):
return list(devpkeys)
err_no = ctypes.GetLastError()
raise WindowsError(err_no, ctypes.FormatError(err_no))
return []
def _mem_info():
kernel32 = ctypes.windll.kernel32
c_ulong = ctypes.c_ulong
class MEMORYSTATUS(ctypes.Structure):
_fields_ = [
('dwLength', c_ulong),
('dwMemoryLoad', c_ulong),
('dwTotalPhys', c_ulong),
('dwAvailPhys', c_ulong),
('dwTotalPageFile', c_ulong),
('dwAvailPageFile', c_ulong),
('dwTotalVirtual', c_ulong),
('dwAvailVirtual', c_ulong)
]
memoryStatus = MEMORYSTATUS()
memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUS)
kernel32.GlobalMemoryStatus(ctypes.byref(memoryStatus))
return (memoryStatus.dwTotalPhys, memoryStatus.dwAvailPhys)
def set_color(fg_col, bg_col):
global last_fg_col
global last_bg_col
if fg_col == last_fg_col and bg_col == last_bg_col:
return
last_fg_col = fg_col
last_bg_col = bg_col
if is_windows:
# convert from (rgb+2) to bgr
fg_col = rgb3_to_bgr3(fg_col-2)
bg_col = rgb3_to_bgr3(bg_col-2)
col_attr = fg_col | (bg_col << 4)
stdout_handle = ctypes.windll.kernel32.GetStdHandle(ctypes.c_ulong(-11))
ctypes.windll.kernel32.SetConsoleTextAttribute(stdout_handle, col_attr)
else:
color = str(fg_col + 28)
sys.stdout.write('\x1b['+color+'m')
color = str(bg_col + 38)
sys.stdout.write('\x1b['+color+'m')
# TODO: any other encodings to check for?
def getch_impl():
# TODO: This windows impl keeps pipes/redirects from working. Need ReadFile for that,
# with more complicated handling (personally, I'm just going to keep using unix/cygwin
# for pipe-y debug stuff...)
# TODO: Windows escape seqs via ReadConsoleInput, convert to VT100 seqs for more commonality.
if is_windows:
stdin_handle = ctypes.windll.kernel32.GetStdHandle(ctypes.c_ulong(-10))
one_char_buf = ctypes.c_uint32()
chars_read = ctypes.c_uint32()
# NOTE: W version of this function == ERROR_NOACCESS after text color set in photopia!?
result = ctypes.windll.kernel32.ReadConsoleA(stdin_handle,
ctypes.byref(one_char_buf),
1,
ctypes.byref(chars_read),
0)
if result == 0 or chars_read.value != 1:
last_err = ctypes.windll.kernel32.GetLastError()
print('LAST ERR', last_err)
err('failed to read console')
return chr(one_char_buf.value)
else: #Unix
return sys.stdin.read(1)
def win32_version():
import ctypes
class OSVERSIONINFOEXW(ctypes.Structure):
_fields_ = [('dwOSVersionInfoSize', ctypes.c_ulong),
('dwMajorVersion', ctypes.c_ulong),
('dwMinorVersion', ctypes.c_ulong),
('dwBuildNumber', ctypes.c_ulong),
('dwPlatformId', ctypes.c_ulong),
('szCSDVersion', ctypes.c_wchar*128),
('wServicePackMajor', ctypes.c_ushort),
('wServicePackMinor', ctypes.c_ushort),
('wSuiteMask', ctypes.c_ushort),
('wProductType', ctypes.c_byte),
('wReserved', ctypes.c_byte)]
"""
Get's the OS major and minor versions. Returns a tuple of
(OS_MAJOR, OS_MINOR).
"""
os_version = OSVERSIONINFOEXW()
os_version.dwOSVersionInfoSize = ctypes.sizeof(os_version)
retcode = ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version))
if retcode != 0:
raise Exception("Failed to get OS version")
return os_version.dwMajorVersion
def DevControl(self, controlType,value):
'''
This function is used to change the display properties of the ALP.
The default values are assigned during device allocation by AllocateSequence.
Usage: Control(self, controlType, value)
PARAMETERS
----------
controlType: ctypes c_ulong
Specifies the type of value to set.
SEE ALSO
--------
See AlpDevControl in the ALP API description for control types.
'''
self._checkError(self._ALPLib.AlpDevControl(self.ALP_ID, controlType, ct.c_long(value)),'Error sending request.')
def DevControlEx(self, controlType, userStruct):
'''
Data objects that do not fit into a simple 32-bit number can be written using this function. Meaning and
layout of the data depend on the ControlType.
Usage: Control(self, controlType, value)
PARAMETERS
----------
controlType : ctypes c_ulong
Specifies the type of value to set.
userStruct : tAlpDynSynchOutGate structure
It contains synch parameters.
SEE ALSO
--------
See AlpDevControlEx in the ALP API description for control types.
'''
self._checkError(self._ALPLib.AlpDevControlEx(self.ALP_ID, controlType, userStruct.byref()),'Error sending request.')
def ProjControl(self, controlType, value):
'''
This function controls the system parameters that are in effect for all sequences. These parameters
are maintained until they are modified again or until the ALP is freed. Default values are in effect after
ALP allocation. All parameters can be read out using the AlpProjInquire function.
This function is only allowed if the ALP is in idle wait state (ALP_PROJ_IDLE), which can be enforced
by the AlpProjHalt function.
Usage: Control(self, controlType, value)
PARAMETERS
----------
controlType : attribute flag (ctypes c_ulong)
Specify the paramter to set.
value : c_double
Value of the parameter to set.
SEE ALSO
--------
See AlpProjControl in the ALP API description for control types.
'''
self._checkError(self._ALPLib.AlpProjControl(self.ALP_ID, controlType, ct.c_long(value)),'Error sending request.')
def ProjControlEx(self, controlType, pointerToStruct):
'''
Data objects that do not fit into a simple 32-bit number can be written using this function. These
objects are unique to the ALP device, so they may affect display of all sequences.
Meaning and layout of the data depend on the ControlType.
Usage: Control(self, controlType, value)
PARAMETERS
----------
controlType : attribute flag (ctypes c_ulong)
Specify the paramter to set.
pointerToStruct : ctypes POINTER
Pointer to a tFlutWrite structure. Create a tFlutWrite object and pass it to the function using ctypes.byref
(Requires importing ctypes)
SEE ALSO
--------
See AlpProjControlEx in the ALP API description for control types.
'''
self._checkError(self._ALPLib.AlpProjContro(self.ALP_ID, controlType, pointerToStruct),'Error sending request.')
def consume(self, amount):
'''Consume ``amount`` bytes from the given provider.'''
resultBuffer = (ctypes.c_char*amount)()
amount,resultAmount = ctypes.c_ulong(amount),ctypes.c_ulong(amount)
result = k32.ReadFile(
self.handle, ctypes.byref(resultBuffer),
amount, ctypes.byref(resultAmount),
None
)
if (result == 0) or (resultAmount.value == 0 and amount > 0):
e = OSError(win32error.getLastErrorTuple())
raise error.ConsumeError(self,self.offset,amount,resultAmount.value, exception=e)
if resultAmount.value == amount:
self.offset += resultAmount.value
return str(resultBuffer.raw)
def pids_from_process_name_list(self, namelist):
proclist = []
pidlist = []
buf = create_string_buffer(1024 * 1024)
p = cast(buf, c_void_p)
retlen = c_ulong(0)
retval = NTDLL.NtQuerySystemInformation(5, buf, 1024 * 1024, byref(retlen))
if retval:
return []
proc = cast(p, POINTER(SYSTEM_PROCESS_INFORMATION)).contents
while proc.NextEntryOffset:
p.value += proc.NextEntryOffset
proc = cast(p, POINTER(SYSTEM_PROCESS_INFORMATION)).contents
proclist.append((proc.ImageName.Buffer[:proc.ImageName.Length/2], proc.UniqueProcessId))
for proc in proclist:
lowerproc = proc[0].lower()
for name in namelist:
if lowerproc == name:
pidlist.append(proc[1])
break
return pidlist
def is_critical(self):
"""Determines if process is 'critical' or not, so we can prevent
terminating it
"""
if not self.h_process:
self.open()
if self.critical:
return True
NT_SUCCESS = lambda val: val >= 0
val = c_ulong(0)
retlen = c_ulong(0)
ret = NTDLL.NtQueryInformationProcess(self.h_process, 29, byref(val), sizeof(val), byref(retlen))
if NT_SUCCESS(ret) and val.value:
return True
return False
def get_parent_pid(self):
"""Get the Parent Process ID."""
if not self.h_process:
self.open()
NT_SUCCESS = lambda val: val >= 0
pbi = (ULONG_PTR * 6)()
size = c_ulong()
# Set return value to signed 32bit integer.
NTDLL.NtQueryInformationProcess.restype = c_int
ret = NTDLL.NtQueryInformationProcess(self.h_process,
0,
byref(pbi),
sizeof(pbi),
byref(size))
if NT_SUCCESS(ret) and size.value == sizeof(pbi):
return pbi[5]
return None
def start(self):
"""Start watching the directory for changes."""
self._directory_handle = ctypes.windll.kernel32.CreateFileW(
ctypes.c_wchar_p(self._directory),
ctypes.c_ulong(_FILE_LIST_DIRECTORY),
ctypes.c_ulong(_FILE_SHARE_READ |
_FILE_SHARE_WRITE),
None,
ctypes.c_ulong(_OPEN_EXISTING),
# required to monitor changes.
ctypes.c_ulong(_FILE_FLAG_BACKUP_SEMANTICS),
None)
if self._directory_handle == _INVALID_HANDLE_VALUE:
raise ctypes.WinError()
self._thread = threading.Thread(
target=self._monitor, name='Win32 File Watcher')
self._thread.start()
def _monitor(self):
buff = ctypes.create_string_buffer(_BUFF_SIZE)
while not self._stop.isSet():
size_returned = ctypes.c_ulong(0)
result = ctypes.windll.kernel32.ReadDirectoryChangesW(
self._directory_handle,
buff,
ctypes.c_ulong(_BUFF_SIZE),
True, # recursive.
ctypes.c_ulong(_FILE_NOTIFY_CHANGE_ANY),
ctypes.byref(size_returned),
None,
None) # this is a blocking call.
if result == 0 and ctypes.GetLastError() == _ERROR_NOTIFY_ENUM_DIR:
logging.warning('Buffer overflow while monitoring for file changes.')
# we need to notify that something changed anyway
with self._lock:
self._change_set |= {'Unknown file'}
if result != 0 and size_returned.value != 0:
additional_changes = _parse_buffer(buff)
with self._lock:
self._change_set |= additional_changes
self._change_event.set()