def __init__(self, logCallback):
MSstore = r"Software\Microsoft\SystemCertificates"
GPstore = r"Software\Policy\Microsoft\SystemCertificates"
self.regKeys = {
"CU_STORE": [reg.HKEY_CURRENT_USER, MSstore],
"LM_STORE": [reg.HKEY_LOCAL_MACHINE, MSstore],
"USER_STORE": [reg.HKEY_USERS, MSstore],
"CU_POLICY_STORE": [reg.HKEY_CURRENT_USER, GPstore],
"LM_POLICY_STORE": [reg.HKEY_LOCAL_MACHINE, GPstore]
}
self.logCallback = logCallback
python类HKEY_USERS的实例源码
def _watch_thread_dispatcher(self):
MSstore = r"Software\Microsoft\SystemCertificates"
GPstore = r"Software\Policy\Microsoft\SystemCertificates"
regKeys = {
"CU_STORE": [win32con.HKEY_CURRENT_USER, MSstore],
"LM_STORE": [win32con.HKEY_LOCAL_MACHINE, MSstore],
"USER_STORE": [win32con.HKEY_USERS, MSstore],
"CU_POLICY_STORE": [win32con.HKEY_CURRENT_USER, GPstore],
"LM_POLICY_STORE": [win32con.HKEY_LOCAL_MACHINE, GPstore]
}
watchKeys = self.database.get_watch_keys()
for regKey in watchKeys:
self._log("Dispatcher preparing watch thread for key: %s" % regKey, messageType="DEBUG")
key = regKey.split("/")
storeName = key.pop(0)
additionalValue = "\\%s" % "\\".join(key)
keystore = regKeys[storeName]
keyName = keystore[1] + additionalValue
t = threading.Thread(target=self._watch_thread, args=(keystore[0], keyName, regKey,
self._watch_thread_callback,))
self.watchThreads.append(t)
self._log("Thread prepared.", messageType="DEBUG")
self._log("Launching %d threads..." % len(self.watchThreads), messageType="DEBUG")
for t in self.watchThreads:
t.start()
self._log("Dispatcher completed.", messageType="DEBUG")
return
def get_data(rootkey, key, value):
"""This method acts as a wrapper for the internal __get_data method.
Args:
root_key (str): The root key as abbreviated string.
Valid values: [hklm, hkcr, hkcu, hku, hkpd, hkcc].
key (str): The subkey starting from the root key.
e.g.: SYSTEM\CurrentControlSet\Services\Tcpip\Parameters
value (str): The value to query.
e.g.: DhcpNameServer
Returns:
str. It returns the retrieved data if the value is correct,
or an empty string otherwise.
"""
rks = [rk.split()[0] for rk in RegistryUtils.ROOT_KEYS]
if rootkey == rks[0]:
return RegistryUtils.__get_data(_winreg.HKEY_LOCAL_MACHINE, key, value)
elif rootkey == rks[1]:
return RegistryUtils.__get_data(_winreg.HKEY_CLASSES_ROOT, key, value)
elif rootkey == rks[2]:
return RegistryUtils.__get_data(_winreg.HKEY_CURRENT_USER, key, value)
elif rootkey == rks[3]:
return RegistryUtils.__get_data(_winreg.HKEY_USERS, key, value)
elif rootkey == rks[4]:
return RegistryUtils.__get_data(_winreg.HKEY_PERFORMANCE_DATA, key, value)
elif rootkey == rks[5]:
return RegistryUtils.__get_data(_winreg.HKEY_CURRENT_CONFIG, key, value)
else:
logging.error('Incorrect registry root key value: {0}. Valid values: {1}'.format(rootkey, RegistryUtils.ROOT_KEYS))
return ''
def get_key_values(rootkey, key):
"""This method acts as a wrapper for the internal __get_key_values method.
Args:
root_key (str): The root key as abbreviated string.
Valid values: [hklm, hkcr, hkcu, hku, hkpd, hkcc].
key (str): The subkey starting from the root key.
e.g.: SYSTEM\CurrentControlSet\Services\Tcpip\Parameters
Returns:
list. It returns the retrieved values and subkeys
or an empty list if data could not be retrieved.
"""
rks = [rk.split()[0] for rk in RegistryUtils.ROOT_KEYS]
if rootkey == rks[0]:
return RegistryUtils.__get_key_values(_winreg.HKEY_LOCAL_MACHINE, key)
elif rootkey == rks[1]:
return RegistryUtils.__get_key_values(_winreg.HKEY_CLASSES_ROOT, key)
elif rootkey == rks[2]:
return RegistryUtils.__get_key_values(_winreg.HKEY_CURRENT_USER, key)
elif rootkey == rks[3]:
return RegistryUtils.__get_key_values(_winreg.HKEY_USERS, key)
elif rootkey == rks[4]:
return RegistryUtils.__get_key_values(_winreg.HKEY_PERFORMANCE_DATA, key)
elif rootkey == rks[5]:
return RegistryUtils.__get_key_values(_winreg.HKEY_CURRENT_CONFIG, key)
else:
logging.error('Incorrect registry root key value: {0}. Valid values: {1}'.format(rootkey, RegistryUtils.ROOT_KEYS))
return []