def DoTestInterpInThread(cookie):
    """Exercise a COM interpreter object from the calling thread.

    Initializes COM on this thread, fetches the interface registered under
    ``cookie`` from the object returned by ``CreateGIT()``, wraps it with
    ``win32com.client.Dispatch``, runs ``TestInterp`` on it and prints the
    id of this thread next to the id reported by the interpreter object.

    Any exception is reported with ``traceback.print_exc()`` instead of
    being propagated to the caller.

    Args:
        cookie: Value handed to ``GetInterfaceFromGlobal`` to look up the
            interface.
    """
    try:
        pythoncom.CoInitialize()
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            # A single parenthesized argument prints the same text on Python 2 and 3.
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop our COM references first, then balance CoInitialize even
            # when one of the calls above raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
python类CoInitialize()的实例源码
def DoTestInterpInThread(cookie):
    """Exercise a COM interpreter object from the calling thread.

    Initializes COM on this thread, fetches the interface registered under
    ``cookie`` from the object returned by ``CreateGIT()``, wraps it with
    ``win32com.client.Dispatch``, runs ``TestInterp`` on it and prints the
    id of this thread next to the id reported by the interpreter object.

    Any exception is reported with ``traceback.print_exc()`` instead of
    being propagated to the caller.

    Args:
        cookie: Value handed to ``GetInterfaceFromGlobal`` to look up the
            interface.
    """
    try:
        pythoncom.CoInitialize()
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            # A single parenthesized argument prints the same text on Python 2 and 3.
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop our COM references first, then balance CoInitialize even
            # when one of the calls above raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def DoTestInterpInThread(cookie):
    """Exercise a COM interpreter object from the calling thread.

    Initializes COM on this thread, fetches the interface registered under
    ``cookie`` from the object returned by ``CreateGIT()``, wraps it with
    ``win32com.client.Dispatch``, runs ``TestInterp`` on it and prints the
    id of this thread next to the id reported by the interpreter object.

    Any exception is reported with ``traceback.print_exc()`` instead of
    being propagated to the caller.

    Args:
        cookie: Value handed to ``GetInterfaceFromGlobal`` to look up the
            interface.
    """
    try:
        pythoncom.CoInitialize()
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            # A single parenthesized argument prints the same text on Python 2 and 3.
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop our COM references first, then balance CoInitialize even
            # when one of the calls above raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def DoTestInterpInThread(cookie):
    """Exercise a COM interpreter object from the calling thread.

    Initializes COM on this thread, fetches the interface registered under
    ``cookie`` from the object returned by ``CreateGIT()``, wraps it with
    ``win32com.client.Dispatch``, runs ``TestInterp`` on it and prints the
    id of this thread next to the id reported by the interpreter object.

    Any exception is reported with ``traceback.print_exc()`` instead of
    being propagated to the caller.

    Args:
        cookie: Value handed to ``GetInterfaceFromGlobal`` to look up the
            interface.
    """
    try:
        pythoncom.CoInitialize()
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()
            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
            TestInterp(interp)
            interp.Exec("import win32api")
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop our COM references first, then balance CoInitialize even
            # when one of the calls above raised.
            interp = None
            GIT = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def _doTestInThread(self, interp):
    """Use ``interp`` from the calling thread with COM initialized.

    When the module-level ``freeThreaded`` flag is true, ``interp`` is first
    passed through ``pythoncom.CoGetInterfaceAndReleaseStream`` and wrapped
    with ``win32com.client.Dispatch``; otherwise it is used as given. The
    object is then asked to execute ``import win32api``.

    Args:
        interp: The interpreter object, or the stream it is unmarshaled
            from when ``freeThreaded`` is true.
    """
    pythoncom.CoInitialize()
    try:
        if freeThreaded:
            interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
        interp.Exec("import win32api")
    finally:
        # Release our reference before tearing COM down, and make sure the
        # CoInitialize above is balanced even if a call failed.
        interp = None
        pythoncom.CoUninitialize()
def _doTestInThread(self, interp):
    """Use ``interp`` from the calling thread with COM initialized.

    When the module-level ``freeThreaded`` flag is true, ``interp`` is first
    passed through ``pythoncom.CoGetInterfaceAndReleaseStream`` and wrapped
    with ``win32com.client.Dispatch``; otherwise it is used as given. The
    object is then asked to execute ``import win32api``.

    Args:
        interp: The interpreter object, or the stream it is unmarshaled
            from when ``freeThreaded`` is true.
    """
    pythoncom.CoInitialize()
    try:
        if freeThreaded:
            interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
        interp.Exec("import win32api")
    finally:
        # Release our reference before tearing COM down, and make sure the
        # CoInitialize above is balanced even if a call failed.
        interp = None
        pythoncom.CoUninitialize()
def servers(self, opc_host='localhost'):
    """Return list of available OPC servers

    Args:
        opc_host: Host name passed to ``GetOPCServers``.

    Returns:
        The entries returned by ``GetOPCServers`` with ``None`` values
        removed.

    Raises:
        OPCError: if the COM call fails; the message is built with
            ``self._get_error_str``.
    """
    try:
        pythoncom.CoInitialize()
        servers = self._opc.GetOPCServers(opc_host)
        return [s for s in servers if s is not None]
    except pythoncom.com_error as err:
        # "except ... as" and call-style raise parse on Python 2.6+ and 3.
        error_msg = 'servers: %s' % self._get_error_str(err)
        raise OPCError(error_msg)
def _detectRunningProcesses(self):
    """Return one ``'<ProcessId>;<Name>'`` string per WMI ``Win32_Process`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(proc.ProcessId, proc.Name)
        for proc in connection.Win32_Process()
    ]
def _detectServices(self):
    """Return one ``'<Name>;<StartMode>'`` string per WMI ``Win32_Service`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(svc.Name, str(svc.StartMode))
        for svc in connection.Win32_Service()
    ]
def _detectUsers(self):
    """Return one ``'<Name>;<type>;<Enabled|Disabled>'`` string per user account.

    Accounts come from WMI ``Win32_UserAccount``. ``<type>`` is the text
    after the first dot in ``str(AccountType(...))`` (presumably an enum
    member name -- confirm against the ``AccountType`` definition).
    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    entries = []
    for account in connection.Win32_UserAccount():
        name = account.Name
        kind = str(AccountType(account.AccountType)).split('.')[1]
        state = 'Disabled' if account.Disabled else 'Enabled'
        entries.append('{0};{1};{2}'.format(name, kind, state))
    return entries
def _detectDevices(self):
    """Return one ``'<Name>;<Manufacturer>'`` string per WMI ``Win32_PnPEntity`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(device.Name, device.Manufacturer)
        for device in connection.Win32_PnPEntity()
    ]
def make_COM_connecter():
    """Create and return an ``ADODB.Connection`` COM object.

    COM is initialized first when the module-level ``onWin32`` flag is true.

    Returns:
        The object returned by ``Dispatch('ADODB.Connection')``.

    Raises:
        api.InterfaceError: if COM initialization or the Dispatch call fails.
    """
    try:
        if onWin32:
            pythoncom.CoInitialize()  # v2.1 Paj
        c = Dispatch('ADODB.Connection')  # connect _after_ CoInitialize v2.1.1 adamvan
    except Exception:
        # Only real errors are translated; KeyboardInterrupt and SystemExit
        # pass through untouched.
        raise api.InterfaceError("Windows COM Error: Dispatch('ADODB.Connection') failed.")
    return c
def _doTestInThread(self, interp):
    """Use ``interp`` from the calling thread with COM initialized.

    When the module-level ``freeThreaded`` flag is true, ``interp`` is first
    passed through ``pythoncom.CoGetInterfaceAndReleaseStream`` and wrapped
    with ``win32com.client.Dispatch``; otherwise it is used as given. The
    object is then asked to execute ``import win32api``.

    Args:
        interp: The interpreter object, or the stream it is unmarshaled
            from when ``freeThreaded`` is true.
    """
    pythoncom.CoInitialize()
    try:
        if freeThreaded:
            interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
        interp.Exec("import win32api")
    finally:
        # Release our reference before tearing COM down, and make sure the
        # CoInitialize above is balanced even if a call failed.
        interp = None
        pythoncom.CoUninitialize()
def make_COM_connecter():
    """Create and return an ``ADODB.Connection`` COM object.

    COM is initialized first when the module-level ``onWin32`` flag is true.

    Returns:
        The object returned by ``Dispatch('ADODB.Connection')``.

    Raises:
        api.InterfaceError: if COM initialization or the Dispatch call fails.
    """
    try:
        if onWin32:
            pythoncom.CoInitialize()  # v2.1 Paj
        c = Dispatch('ADODB.Connection')  # connect _after_ CoInitialize v2.1.1 adamvan
    except Exception:
        # Only real errors are translated; KeyboardInterrupt and SystemExit
        # pass through untouched.
        raise api.InterfaceError("Windows COM Error: Dispatch('ADODB.Connection') failed.")
    return c
def _doTestInThread(self, interp):
    """Use ``interp`` from the calling thread with COM initialized.

    When the module-level ``freeThreaded`` flag is true, ``interp`` is first
    passed through ``pythoncom.CoGetInterfaceAndReleaseStream`` and wrapped
    with ``win32com.client.Dispatch``; otherwise it is used as given. The
    object is then asked to execute ``import win32api``.

    Args:
        interp: The interpreter object, or the stream it is unmarshaled
            from when ``freeThreaded`` is true.
    """
    pythoncom.CoInitialize()
    try:
        if freeThreaded:
            interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)
        interp.Exec("import win32api")
    finally:
        # Release our reference before tearing COM down, and make sure the
        # CoInitialize above is balanced even if a call failed.
        interp = None
        pythoncom.CoUninitialize()
def run(self):
    """Loop forever, terminating processes whose name matches ``BadProcesses``.

    Each pass enumerates WMI ``Win32_Process`` and compares every process
    name, case-insensitively, against each entry of ``BadProcesses``
    (substring match). All exceptions raised while handling a process are
    silently ignored. This method never returns.

    NOTE(review): this code force-kills other processes and deletes their
    executable files from disk. Confirm that this behavior is intended and
    authorized before keeping or shipping it.

    NOTE(review): the source indentation was lost; the nesting below is the
    most plausible reading and must be checked against the upstream file.
    """
    pythoncom.CoInitialize()
    self.drmwmi = wmi.WMI()
    while (True):
        for process in self.drmwmi.Win32_Process():
            for selectedProcess in BadProcesses:
                try:
                    if selectedProcess.lower() in process.Name.lower():
                        try:
                            # Resolve the on-disk path of the matching process.
                            handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, False, process.ProcessId)
                            filename = win32process.GetModuleFileNameEx(handle, 0)
                            # Skipped entirely when DEBUG_MODE is set.
                            if os.path.isfile(filename) and not DEBUG_MODE:
                                execute("taskkill", ("/F", "/IM", filename), True)
                                time.sleep(random.randint(1, 4))
                                os.remove(filename)
                        except Exception as e:
                            pass
                        process.Terminate()
                except Exception as e:
                    pass
        # Random pause between scans.
        time.sleep(random.randint(1, 10))
def _detectRunningProcesses(self):
    """Return one ``'<ProcessId>;<Name>'`` string per WMI ``Win32_Process`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(proc.ProcessId, proc.Name)
        for proc in connection.Win32_Process()
    ]
def _detectServices(self):
    """Return one ``'<Name>;<StartMode>'`` string per WMI ``Win32_Service`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(svc.Name, str(svc.StartMode))
        for svc in connection.Win32_Service()
    ]
def _detectUsers(self):
    """Return one ``'<Name>;<type>;<Enabled|Disabled>'`` string per user account.

    Accounts come from WMI ``Win32_UserAccount``. ``<type>`` is the text
    after the first dot in ``str(AccountType(...))`` (presumably an enum
    member name -- confirm against the ``AccountType`` definition).
    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    entries = []
    for account in connection.Win32_UserAccount():
        name = account.Name
        kind = str(AccountType(account.AccountType)).split('.')[1]
        state = 'Disabled' if account.Disabled else 'Enabled'
        entries.append('{0};{1};{2}'.format(name, kind, state))
    return entries
def _detectDevices(self):
    """Return one ``'<Name>;<Manufacturer>'`` string per WMI ``Win32_PnPEntity`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(device.Name, device.Manufacturer)
        for device in connection.Win32_PnPEntity()
    ]
def collect(self):
    """Run the configured WMI query and yield one dict per result.

    The WMI object is obtained from ``self.plugin_args.baseobj`` and the
    query text from ``self.plugin_args.query``.

    Yields:
        ``dict(Result=WmiResult(result))`` for each item of the query result.

    Raises:
        plugin.PluginError: if executing the query or iterating over its
            results raises ``pythoncom.com_error``.
    """
    # Needs to be called if using com from a thread.
    pythoncom.CoInitialize()
    wmi_obj = win32com.client.GetObject(self.plugin_args.baseobj)

    # This allows our WMI to do some extra things, in particular
    # it gives it access to find the executable path for all processes.
    wmi_obj.Security_.Privileges.AddAsString("SeDebugPrivilege")

    # Run query
    try:
        query_results = wmi_obj.ExecQuery(self.plugin_args.query)
    except pythoncom.com_error as e:
        raise plugin.PluginError(
            "Failed to run WMI query \'%s\' err was %s" % (
                self.plugin_args.query, e))

    # Extract results from the returned COMObject and return dicts.
    try:
        for result in query_results:
            yield dict(Result=WmiResult(result))
    except pythoncom.com_error as e:
        # Query first, error second, matching the placeholders in the
        # message (the two arguments used to be swapped).
        raise plugin.PluginError(
            "WMI query data error on query \'%s\' err was %s" %
            (self.plugin_args.query, e))
def _detectRunningProcesses(self):
    """Return one ``'<ProcessId>;<Name>'`` string per WMI ``Win32_Process`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(proc.ProcessId, proc.Name)
        for proc in connection.Win32_Process()
    ]
def _detectServices(self):
    """Return one ``'<Name>;<StartMode>'`` string per WMI ``Win32_Service`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(svc.Name, str(svc.StartMode))
        for svc in connection.Win32_Service()
    ]
def _detectUsers(self):
    """Return one ``'<Name>;<type>;<Enabled|Disabled>'`` string per user account.

    Accounts come from WMI ``Win32_UserAccount``. ``<type>`` is the text
    after the first dot in ``str(AccountType(...))`` (presumably an enum
    member name -- confirm against the ``AccountType`` definition).
    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    entries = []
    for account in connection.Win32_UserAccount():
        name = account.Name
        kind = str(AccountType(account.AccountType)).split('.')[1]
        state = 'Disabled' if account.Disabled else 'Enabled'
        entries.append('{0};{1};{2}'.format(name, kind, state))
    return entries
def _detectDevices(self):
    """Return one ``'<Name>;<Manufacturer>'`` string per WMI ``Win32_PnPEntity`` entry.

    COM is initialized on the calling thread before WMI is queried.
    """
    pythoncom.CoInitialize()
    connection = wmi.WMI()
    return [
        '{0};{1}'.format(device.Name, device.Manufacturer)
        for device in connection.Win32_PnPEntity()
    ]
def __init__(self):
    """Initialize COM on the calling thread and attach to PowerPoint.

    The dispatch object for ``PowerPoint.Application`` is stored on
    ``self.app``.
    """
    # Reference left by the original author (the accompanying note was
    # unreadable): http://www.cnblogs.com/AlgorithmDot/p/3386972.html
    pythoncom.CoInitialize()
    powerpoint = win32com.client.Dispatch("PowerPoint.Application")
    self.app = powerpoint
def refresh_information(self):
    '''
    Reload the list of destination domain name sets for the selected rule.

    The controls are disabled through ``set_access_settings_and_commands``
    while loading. If the ``FPC.Root`` COM object cannot be created or
    queried, a message is shown in the status bar and the controls stay
    disabled. Otherwise the list, the operations widget and both progress
    bars are reset, the list is refilled, and the controls are re-enabled.

    :return: None
    '''
    self.set_access_settings_and_commands(False)
    try:
        pythoncom.CoInitialize()
        tmg_root = win32com.client.Dispatch('FPC.Root')
        isa_array = tmg_root.GetContainingArray()
    except pythoncom.pywintypes.com_error:
        self.statusBar.showMessage('??????????? ??????????? ? Forefront TMG', -1)
        self.set_access_settings_and_commands(False)
        return

    selected_rule = isa_array.ArrayPolicy.PolicyRules.Item(self.choice_ruleName.currentText())
    name_sets = selected_rule.AccessProperties.DestinationDomainNameSets

    # Reset every widget that shows state from a previous load.
    self.domainNameSetsTMG.clear()
    self.operations.clear()
    self.progressBar.setValue(0)
    self.progressBar_download.setValue(0)

    # The COM collection is indexed starting at 1.
    index = 1
    while index <= name_sets.Count:
        set_name = name_sets.Item(index).Name
        self.domainNameSetsTMG.addItem(QtWidgets.QListWidgetItem(set_name))
        index += 1

    self.set_access_settings_and_commands(True)
def __init__(self, url=None, timeOut=3000):
    """ Start Internet Explorer through COM and load the first page.

        When no URL is supplied, about:blank (a blank page) is opened.

        parameters:
            [url]     - url to navigate to initially
            [timeOut] - how many 100mS increments to wait, 10 = 1sec, 100=10sec
        returns:
            Nothing
    """
    #pythoncom.CoInitialize()

    # Behaviour switches and navigation state.
    self.showDebugging = True        # Show debug print lines?
    self.colorHighlight = "#F6F7AD"  # Set to None to turn off highlighting
    self.frameName = None            # The current frame name or index. Nested frames are
                                     # supported in the format frame1.frame2.frame3
    self.formName = None             # The current form name or index
    self.busyTuner = 1               # Number of consecutive checks to verify document is no longer busy.

    browser = win32com.client.dynamic.Dispatch('InternetExplorer.Application')
    self._ie = browser
    browser.Navigate(url if url else 'about:blank')

    self._timeOut = timeOut

    # Show the window together with its menu bar, tool bar and address bar.
    browser.Visible = 1
    #self._ie.resizable = 1
    #self._ie.fullscreen = 1
    browser.MenuBar = 1
    browser.ToolBar = 1
    browser.AddressBar = 1

    self.timer = datetime.datetime.now()
def __init__(self, opc_class=None, client_name=None):
    """Instantiate OPC automation class

    Args:
        opc_class: One or more automation class names separated by ``;``.
            They are tried in order and the first one that can be
            dispatched is used. When omitted, the ``OPC_CLASS`` environment
            variable is used, falling back to the module-level
            ``OPC_CLASS`` constant.
        client_name: Stored on ``self.client_name``.

    Raises:
        OPCError: if none of the listed classes can be dispatched; the
            message comes from the failure of the last class tried.
    """
    self.callback_queue = Queue.Queue()

    pythoncom.CoInitialize()

    if opc_class is None:
        opc_class = os.environ.get('OPC_CLASS', OPC_CLASS)

    opc_class_list = opc_class.split(';')
    last_index = len(opc_class_list) - 1

    for i, c in enumerate(opc_class_list):
        try:
            self._opc = win32com.client.gencache.EnsureDispatch(c, 0)
            self.opc_class = c
            break
        except pythoncom.com_error as err:
            # Only give up once the final candidate has failed as well.
            if i == last_index:
                error_msg = 'Dispatch: %s' % self._get_error_str(err)
                raise OPCError(error_msg)

    self._event = win32event.CreateEvent(None, 0, 0, None)

    self.opc_server = None
    # NOTE(review): hard-coded host address -- confirm this is intentional.
    self.opc_host = '128.100.1.21'
    self.client_name = client_name
    self._groups = {}
    self._group_tags = {}
    self._group_valid_tags = {}
    self._group_server_handles = {}
    self._group_handles_tag = {}
    self._group_hooks = {}
    self._open_serv = None
    self._open_self = None
    self._open_host = None
    self._open_port = None
    self._open_guid = None
    self._prev_serv_time = None
    self._tx_id = 0
    self.trace = None
    self.cpu = None