def __init__(self, oobj=None):
    """Bind this wrapper to a COM object and store it as ``_oleobj_``.

    oobj -- ``None`` to create a new object from ``self.CLSID``; another
            ``DispatchBaseClass`` wrapper, whose underlying object is
            queried for ``self.CLSID``; or anything else, which is stored
            unchanged.

    Raises ``pythoncom.com_error`` when the QueryInterface call fails with
    anything other than ``E_NOINTERFACE``.
    """
    if oobj is None:
        oobj = pythoncom.new(self.CLSID)
    elif isinstance(oobj, DispatchBaseClass):
        try:
            # Must be a valid COM instance.
            oobj = oobj._oleobj_.QueryInterface(self.CLSID, pythoncom.IID_IDispatch)
        except pythoncom.com_error as exc:
            import winerror

            # Some objects (Lotus Notes, for example) refuse this query even
            # though they already are IDispatch. For E_NOINTERFACE only, fall
            # back to the object the other wrapper already holds.
            if exc.hresult != winerror.E_NOINTERFACE:
                raise
            oobj = oobj._oleobj_
    # Write through __dict__ so that __setattr__ is not invoked.
    self.__dict__["_oleobj_"] = oobj
# Provide a prettier name than the CLSID
# Example source code for Python's IID_IDispatch() (i.e. pythoncom.IID_IDispatch)
def TestPyVariant(o, is_generated):
    """Feed a series of explicitly typed VARIANT values to ``_TestPyVariant``.

    o            -- the COM test object; also used as the VT_DISPATCH payload.
    is_generated -- passed through unchanged to the helper functions.

    The final case goes to ``_TestPyVariantFails`` together with
    ``ValueError`` (presumably the exception that case must raise --
    confirm against the helper).
    """
    def check_dispatch(got):
        # The result must hold the type pythoncom registers for IDispatch.
        assert isinstance(got._oleobj_, pythoncom.TypeIIDs[pythoncom.IID_IDispatch])

    plain_cases = (
        (pythoncom.VT_UI1, 1),
        (pythoncom.VT_ARRAY | pythoncom.VT_UI4, [1, 2, 3]),
        (pythoncom.VT_BSTR, "hello"),
        (pythoncom.VT_ARRAY | pythoncom.VT_BSTR, ["hello", "there"]),
    )
    for vt, value in plain_cases:
        _TestPyVariant(o, is_generated, VARIANT(vt, value))

    _TestPyVariant(o, is_generated, VARIANT(pythoncom.VT_DISPATCH, o), check_dispatch)
    _TestPyVariant(o, is_generated, VARIANT(pythoncom.VT_ARRAY | pythoncom.VT_DISPATCH, [o]))

    # An array of variants, each carrying its own explicit type.
    typed_items = [VARIANT(pythoncom.VT_UI4, number) for number in (1, 2, 3)]
    _TestPyVariant(
        o,
        is_generated,
        VARIANT(pythoncom.VT_ARRAY | pythoncom.VT_VARIANT, typed_items),
    )

    # And the failure case.
    _TestPyVariantFails(o, is_generated, VARIANT(pythoncom.VT_UI1, "foo"), ValueError)
def TestVTableMI():
    """Create ``Python.Test.PyCOMTestMI`` and query it for several interfaces.

    The IStream, IStorage and IDispatch queries must succeed. The query for
    the ``IPyCOMTest`` IID may raise ``TypeError``, which is ignored; any
    COM error propagates.
    """
    ob = pythoncom.CoCreateInstance(
        "Python.Test.PyCOMTestMI",
        None,
        pythoncom.CLSCTX_SERVER,
        pythoncom.IID_IUnknown,
    )
    required_iids = (
        pythoncom.IID_IStream,    # This inherits from IStream.
        pythoncom.IID_IStorage,   # This implements IStorage, specifying the IID as a string
        pythoncom.IID_IDispatch,  # IDispatch should always work
    )
    for required in required_iids:
        ob.QueryInterface(required)

    iid = pythoncom.InterfaceNames["IPyCOMTest"]
    try:
        ob.QueryInterface(iid)
    except TypeError:
        # Python cannot actually _use_ this interface yet, so a TypeError is
        # "expected" here. A COM error is not.
        pass
def BeginThreadsSimpleMarshal(self, numThreads):
    """Creates multiple threads using simple (but slower) marshalling.

    Single interpreter object, but a new stream is created per thread.

    numThreads -- number of worker threads to start.

    Returns a ``(threads, events)`` tuple: the started ``threading.Thread``
    objects and, in the same order, the event handles handed to them (the
    handles the threads will set when complete).
    """
    interp = win32com.client.Dispatch("Python.Interpreter")
    events = []
    threads = []
    for _ in range(numThreads):
        hEvent = win32event.CreateEvent(None, 0, 0, None)
        events.append(hEvent)
        # Each thread gets its own marshalling stream for the interpreter.
        interpStream = pythoncom.CoMarshalInterThreadInterfaceInStream(
            pythoncom.IID_IDispatch, interp._oleobj_
        )
        t = threading.Thread(target=self._testInterpInThread, args=(hEvent, interpStream))
        # Daemon thread, so errors don't cause a shutdown hang. The `daemon`
        # attribute replaces the deprecated Thread.setDaemon().
        t.daemon = True
        t.start()
        threads.append(t)
    # Drop our own reference to the interpreter object.
    interp = None
    return threads, events
#
# NOTE - this doesn't quite work - I'm not even sure it should, but Greg reckons
# you should be able to avoid the marshal per thread!
# I think that refers to CoMarshalInterface though...
def BeginThreadsFastMarshal(self, numThreads):
    """Creates multiple threads using fast (but complex) marshalling.

    The marshal stream is created once, and each thread uses the same stream.
    The stream is only created when the module-level ``freeThreaded`` flag is
    true; otherwise every thread receives the interpreter object itself.

    numThreads -- number of worker threads to start.

    Returns a ``(threads, events)`` tuple: the started ``threading.Thread``
    objects and, in the same order, the event handles handed to them (the
    handles the threads will set when complete).
    """
    interp = win32com.client.Dispatch("Python.Interpreter")
    if freeThreaded:
        interp = pythoncom.CoMarshalInterThreadInterfaceInStream(
            pythoncom.IID_IDispatch, interp._oleobj_
        )
    events = []
    threads = []
    for _ in range(numThreads):
        hEvent = win32event.CreateEvent(None, 0, 0, None)
        t = threading.Thread(target=self._testInterpInThread, args=(hEvent, interp))
        # Daemon thread, so errors don't cause a shutdown hang. The `daemon`
        # attribute replaces the deprecated Thread.setDaemon().
        t.daemon = True
        t.start()
        events.append(hEvent)
        threads.append(t)
    return threads, events
def TestGatewayInheritance():
    """Walk a wrapped ``Dummy`` object through a chain of QueryInterface calls.

    Every newly obtained pointer is handed to ``FailObjectIdentity`` together
    with each pointer obtained before it (presumably that helper reports a
    failure when the two are not the same COM object -- confirm).
    """
    # By default, wrap() creates and discards a temporary object. That is not
    # necessary, it is just how wrap is currently implemented. Because the
    # temporary is discarded correctly, it does not affect this test.
    storage = wrap(Dummy(), pythoncom.IID_IPersistStorage)

    unknown = storage.QueryInterface(pythoncom.IID_IUnknown)
    FailObjectIdentity(storage, unknown, "IID_IPersistStorage->IID_IUnknown")

    dispatch = unknown.QueryInterface(pythoncom.IID_IDispatch)
    FailObjectIdentity(unknown, dispatch, "IID_IUnknown->IID_IDispatch")
    FailObjectIdentity(storage, dispatch, "IID_IPersistStorage->IID_IDispatch")

    storage_again = dispatch.QueryInterface(pythoncom.IID_IPersistStorage)
    FailObjectIdentity(storage, storage_again, "IID_IPersistStorage->IID_IPersistStorage(2)")
    FailObjectIdentity(unknown, storage_again, "IID_IUnknown->IID_IPersistStorage(2)")
    FailObjectIdentity(dispatch, storage_again, "IID_IDispatch->IID_IPersistStorage(2)")

    persist = storage_again.QueryInterface(pythoncom.IID_IPersist)
    FailObjectIdentity(storage, persist, "IID_IPersistStorage->IID_IPersist")
    FailObjectIdentity(unknown, persist, "IID_IUnknown->IID_IPersist")
    FailObjectIdentity(dispatch, persist, "IID_IDispatch->IID_IPersist")
    FailObjectIdentity(storage_again, persist, "IID_IPersistStorage(2)->IID_IPersist")
def TestObjectFromWindow():
    """Try to get a COM object back from an Internet Explorer window handle.

    Checks that ObjectFromLresult can return the COM object for an HWND (see
    KB Q249232). The test is currently disabled: it returns right after the
    first child-window lookup, so everything after the loop is unreachable
    and kept only for reference.
    """
    # Locating the HWND is different from what the KB article says...
    hwnd = win32gui.FindWindow('IEFrame', None)
    child_classes = (
        'TabWindowClass',
        'Shell DocObject View',
        'Internet Explorer_Server',
    )
    for child_class in child_classes:
        hwnd = win32gui.FindWindowEx(hwnd, 0, child_class, None)
        # ack - not working for markh on Vista with IE8 (or maybe it is the
        # lack of the 'accessibility' components mentioned in Q249232);
        # either way - not working!
        # NOTE(review): the source lost its indentation; this early return is
        # placed inside the loop, directly after the comment above -- confirm.
        return

    # But here is the point - once you have an 'Internet Explorer_Server',
    # you can send a message and use ObjectFromLresult to get it back.
    msg = win32gui.RegisterWindowMessage("WM_HTML_GETOBJECT")
    rc, result = win32gui.SendMessageTimeout(
        hwnd, msg, 0, 0, win32con.SMTO_ABORTIFHUNG, 1000
    )
    ob = pythoncom.ObjectFromLresult(result, pythoncom.IID_IDispatch, 0)
    doc = Dispatch(ob)
    # Just to prove it works, set the background color of the document.
    for color in "red green blue orange white".split():
        doc.bgColor = color
        time.sleep(0.2)
def test(fn):
    """Register an interpreter in the GIT, start workers via ``fn`` and wait.

    fn -- callable invoked as ``fn(4, cookie)``; its return value is used as
          the list of event handles to wait on.

    The loop pumps waiting messages while it waits, prints a progress line on
    every 2000 ms timeout, and stops once as many wake-ups as there are events
    have been seen, or on KeyboardInterrupt. Afterwards the GIT registration
    is revoked.
    """
    print("The main thread is %d" % (win32api.GetCurrentThreadId()))
    git = CreateGIT()
    interp = win32com.client.Dispatch("Python.Interpreter")
    cookie = git.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
    events = fn(4, cookie)
    finished = 0
    while True:
        try:
            rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
            if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                # One of the event handles was signalled.
                finished += 1
                if finished >= len(events):
                    break
            elif rc == win32event.WAIT_OBJECT_0 + len(events):
                # A message arrived. Pumping is critical - otherwise the whole
                # apartment model demo will hang.
                pythoncom.PumpWaitingMessages()
            else:
                # Timeout
                print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
        except KeyboardInterrupt:
            break
    git.RevokeInterfaceFromGlobal(cookie)
    del interp
    del git
def Advise(self, pUnk):
    """Create a connection to a client and return its cookie.

    pUnk -- the client's object; it is queried for the first entry of
            ``self._connect_interfaces_``.

    Raises ``Exception(scode=olectl.CONNECT_E_NOCONNECTION)`` when that query
    fails with a COM error.
    NOTE(review): ``Exception`` is called with a ``scode`` keyword, so it is
    presumably the COM server exception class rather than the builtin --
    confirm against the file's imports.
    """
    try:
        interface = pUnk.QueryInterface(
            self._connect_interfaces_[0], pythoncom.IID_IDispatch
        )
    except pythoncom.com_error:
        raise Exception(scode=olectl.CONNECT_E_NOCONNECTION)
    # Allocate the next cookie and remember the client's interface under it.
    self.cookieNo += 1
    self.connections[self.cookieNo] = interface
    return self.cookieNo
def _transform_args_(self, args, kwArgs, dispid, lcid, wFlags, serviceProvider):
ret = []
for arg in args:
arg_type = type(arg)
if arg_type == IDispatchType:
import win32com.client
arg = win32com.client.Dispatch(arg)
elif arg_type == IUnknownType:
try:
import win32com.client
arg = win32com.client.Dispatch(arg.QueryInterface(pythoncom.IID_IDispatch))
except pythoncom.error:
pass # Keep it as IUnknown
ret.append(arg)
return tuple(ret), kwArgs
def _GetGoodDispatch(IDispatch, clsctx=pythoncom.CLSCTX_SERVER):
    """Turn ``IDispatch`` into something usable as a dispatch object.

    IDispatch -- a ``PyIDispatchType`` instance (returned as-is), an instance
                 of ``_GoodDispatchTypes`` (connected to, or created when
                 connecting fails), or any other object (its ``_oleobj_``
                 attribute is returned when present, else the object itself).
    clsctx    -- class context passed to ``CoCreateInstance``.
    """
    # Quick return for the most common case.
    if isinstance(IDispatch, PyIDispatchType):
        return IDispatch
    if not isinstance(IDispatch, _GoodDispatchTypes):
        # May already be a wrapped class.
        return getattr(IDispatch, "_oleobj_", IDispatch)
    try:
        return pythoncom.connect(IDispatch)
    except pythoncom.ole_error:
        # Could not connect, so create an instance instead.
        return pythoncom.CoCreateInstance(
            IDispatch, None, clsctx, pythoncom.IID_IDispatch
        )
def _get_good_single_object_(self, ob, userName=None, ReturnCLSID=None):
    """Wrap ``ob`` with ``self._wrap_dispatch_`` when it is a dispatch object.

    A ``PyIDispatchType`` is wrapped directly. A ``PyIUnknownType`` is
    queried for IDispatch first: on success the result is wrapped, on a COM
    error the original object is returned. Any other value is returned
    unchanged.
    """
    if not isinstance(ob, PyIDispatchType):
        if not isinstance(ob, PyIUnknownType):
            return ob
        try:
            ob = ob.QueryInterface(pythoncom.IID_IDispatch)
        except pythoncom.com_error:
            # It is an IUnknown but not an IDispatch, so just let it through.
            return ob
    # Make a new instance of (probably this) class.
    return self._wrap_dispatch_(ob, userName, ReturnCLSID)
def Moniker(Pathname, clsctx=pythoncom.CLSCTX_ALL):
    """Python friendly version of GetObject's moniker functionality.

    Parses ``Pathname`` as a display name, binds the resulting moniker to an
    IDispatch object, and returns the result of ``__WrapDispatch`` for it.
    """
    moniker, _unused, bind_ctx = pythoncom.MkParseDisplayName(Pathname)
    bound = moniker.BindToObject(bind_ctx, None, pythoncom.IID_IDispatch)
    return __WrapDispatch(bound, Pathname, clsctx=clsctx)
def TestObjectSemantics(ob):
    """Check equality and ordering semantics of a wrapped COM object.

    ob -- a wrapper exposing the underlying COM pointer as ``_oleobj_``.

    Fails with ``AssertionError`` when an equality check does not hold, and
    raises ``error`` when ordering against ``None`` unexpectedly succeeds on
    Python 3. Prints a confirmation line on success.
    """
    # A convenient place to test some of our equality semantics.
    assert ob == ob._oleobj_
    assert not ob != ob._oleobj_
    # Same test again, but lhs and rhs reversed.
    assert ob._oleobj_ == ob
    assert not ob._oleobj_ != ob
    # Same tests but against different pointers. COM identity rules should
    # still ensure all works.
    assert ob._oleobj_ == ob._oleobj_.QueryInterface(pythoncom.IID_IUnknown)
    assert not ob._oleobj_ != ob._oleobj_.QueryInterface(pythoncom.IID_IUnknown)

    # `!= None` (rather than `is not None`) is deliberate: these lines
    # exercise the objects' comparison operators.
    assert ob._oleobj_ != None
    assert None != ob._oleobj_
    assert ob != None
    assert None != ob
    if sys.version_info > (3, 0):
        # Ordering against None must raise TypeError. The failure is raised
        # from `else` so the TypeError handler can never swallow it.
        try:
            ob < None
        except TypeError:
            pass
        else:
            raise error("Expected type error")
        try:
            None < ob
        except TypeError:
            pass
        else:
            raise error("Expected type error")

    assert ob._oleobj_.QueryInterface(pythoncom.IID_IUnknown) == ob._oleobj_
    assert not ob._oleobj_.QueryInterface(pythoncom.IID_IUnknown) != ob._oleobj_
    assert ob._oleobj_ == ob._oleobj_.QueryInterface(pythoncom.IID_IDispatch)
    assert not ob._oleobj_ != ob._oleobj_.QueryInterface(pythoncom.IID_IDispatch)
    assert ob._oleobj_.QueryInterface(pythoncom.IID_IDispatch) == ob._oleobj_
    assert not ob._oleobj_.QueryInterface(pythoncom.IID_IDispatch) != ob._oleobj_
    # print() call form: valid on Python 3 and, with one argument, on Python 2.
    print("Object semantic tests passed")
def SetSite(self, unknown):
    """Store the hosting Internet Explorer application in ``self.webbrowser``.

    unknown -- the site object, or a false value to release the reference
               that is currently held.
    """
    if not unknown:
        # Lose all references.
        self.webbrowser = None
        return
    # First get a command target...
    cmdtarget = unknown.QueryInterface(axcontrol.IID_IOleCommandTarget)
    # ...then travel over to a service provider...
    serviceprovider = cmdtarget.QueryInterface(pythoncom.IID_IServiceProvider)
    # ...and finally ask for the Internet Explorer application, which is
    # returned as a dispatch object.
    browser = serviceprovider.QueryService(
        '{0002DF05-0000-0000-C000-000000000046}', pythoncom.IID_IDispatch
    )
    self.webbrowser = win32com.client.Dispatch(browser)
def Advise(self, pUnk):
    """Create a connection to a client and return its cookie.

    pUnk -- the client's object; it is queried for the first entry of
            ``self._connect_interfaces_``.

    Raises ``Exception(scode=olectl.CONNECT_E_NOCONNECTION)`` when that query
    fails with a COM error.
    NOTE(review): ``Exception`` is called with a ``scode`` keyword, so it is
    presumably the COM server exception class rather than the builtin --
    confirm against the file's imports.
    """
    try:
        interface = pUnk.QueryInterface(
            self._connect_interfaces_[0], pythoncom.IID_IDispatch
        )
    except pythoncom.com_error:
        raise Exception(scode=olectl.CONNECT_E_NOCONNECTION)
    # Allocate the next cookie and remember the client's interface under it.
    self.cookieNo += 1
    self.connections[self.cookieNo] = interface
    return self.cookieNo
def _transform_args_(self, args, kwArgs, dispid, lcid, wFlags, serviceProvider):
ret = []
for arg in args:
arg_type = type(arg)
if arg_type == IDispatchType:
import win32com.client
arg = win32com.client.Dispatch(arg)
elif arg_type == IUnknownType:
try:
import win32com.client
arg = win32com.client.Dispatch(arg.QueryInterface(pythoncom.IID_IDispatch))
except pythoncom.error:
pass # Keep it as IUnknown
ret.append(arg)
return tuple(ret), kwArgs
def _GetGoodDispatch(IDispatch, clsctx=pythoncom.CLSCTX_SERVER):
    """Turn ``IDispatch`` into something usable as a dispatch object.

    IDispatch -- a ``PyIDispatchType`` instance (returned as-is), an instance
                 of ``_GoodDispatchTypes`` (connected to, or created when
                 connecting fails), or any other object (its ``_oleobj_``
                 attribute is returned when present, else the object itself).
    clsctx    -- class context passed to ``CoCreateInstance``.
    """
    # Quick return for the most common case.
    if isinstance(IDispatch, PyIDispatchType):
        return IDispatch
    if not isinstance(IDispatch, _GoodDispatchTypes):
        # May already be a wrapped class.
        return getattr(IDispatch, "_oleobj_", IDispatch)
    try:
        return pythoncom.connect(IDispatch)
    except pythoncom.ole_error:
        # Could not connect, so create an instance instead.
        return pythoncom.CoCreateInstance(
            IDispatch, None, clsctx, pythoncom.IID_IDispatch
        )