def serialReadEvent(self):
    """Handle completion of the pending one-byte overlapped serial read.

    Collects the data delivered by the outstanding overlapped read, drains
    whatever else is already queued on the COM port, hands the combined
    data to ``self.protocol.dataReceived`` and then arms a fresh one-byte
    overlapped read whose buffer is stored in ``self.read_buf``.
    """
    # Query the result of the read we armed earlier without waiting
    # (last argument 0).
    n = win32file.GetOverlappedResult(
        self._serial.hComPort, self._overlappedRead, 0
    )
    if n:
        # bytes() rather than str(): on Python 3, str() of a buffer slice
        # gives a textual representation instead of the received data,
        # while on Python 2 ``bytes`` is simply an alias of ``str``.
        first = bytes(self.read_buf[:n])
        # Now fetch everything that is already waiting in the input queue.
        _, comstat = win32file.ClearCommError(self._serial.hComPort)
        if comstat.cbInQue:
            win32event.ResetEvent(self._overlappedRead.hEvent)
            _, buf = win32file.ReadFile(
                self._serial.hComPort,
                win32file.AllocateReadBuffer(comstat.cbInQue),
                self._overlappedRead,
            )
            # Wait (last argument 1) until that read has completed.
            n = win32file.GetOverlappedResult(
                self._serial.hComPort, self._overlappedRead, 1
            )
            # Deliver the first chunk together with the drained queue.
            self.protocol.dataReceived(first + bytes(buf[:n]))
        else:
            # Nothing else queued: deliver just the first chunk.
            self.protocol.dataReceived(first)

    # Set up the next one-byte overlapped read.
    win32event.ResetEvent(self._overlappedRead.hEvent)
    _, self.read_buf = win32file.ReadFile(
        self._serial.hComPort,
        win32file.AllocateReadBuffer(1),
        self._overlappedRead,
    )
# Example source code for Python's ResetEvent()
def serialReadEvent(self):
    """Handle completion of the pending one-byte overlapped serial read.

    Collects the data delivered by the outstanding overlapped read, drains
    whatever else is already queued on the COM port, hands the combined
    data to ``self.protocol.dataReceived`` and then arms a fresh one-byte
    overlapped read whose buffer is stored in ``self.read_buf``.
    """
    # Query the result of the read we armed earlier without waiting
    # (last argument 0).
    n = win32file.GetOverlappedResult(
        self._serial.hComPort, self._overlappedRead, 0
    )
    if n:
        # bytes() rather than str(): on Python 3, str() of a buffer slice
        # gives a textual representation instead of the received data,
        # while on Python 2 ``bytes`` is simply an alias of ``str``.
        first = bytes(self.read_buf[:n])
        # Now fetch everything that is already waiting in the input queue.
        _, comstat = win32file.ClearCommError(self._serial.hComPort)
        if comstat.cbInQue:
            win32event.ResetEvent(self._overlappedRead.hEvent)
            _, buf = win32file.ReadFile(
                self._serial.hComPort,
                win32file.AllocateReadBuffer(comstat.cbInQue),
                self._overlappedRead,
            )
            # Wait (last argument 1) until that read has completed.
            n = win32file.GetOverlappedResult(
                self._serial.hComPort, self._overlappedRead, 1
            )
            # Deliver the first chunk together with the drained queue.
            self.protocol.dataReceived(first + bytes(buf[:n]))
        else:
            # Nothing else queued: deliver just the first chunk.
            self.protocol.dataReceived(first)

    # Set up the next one-byte overlapped read.
    win32event.ResetEvent(self._overlappedRead.hEvent)
    _, self.read_buf = win32file.ReadFile(
        self._serial.hComPort,
        win32file.AllocateReadBuffer(1),
        self._overlappedRead,
    )
def serialReadEvent(self):
    """Handle completion of the pending one-byte overlapped serial read.

    Collects the data delivered by the outstanding overlapped read, drains
    whatever else is already queued on the COM port, hands the combined
    data to ``self.protocol.dataReceived`` and then arms a fresh one-byte
    overlapped read whose buffer is stored in ``self.read_buf``.
    """
    # Query the result of the read we armed earlier without waiting
    # (last argument 0).
    n = win32file.GetOverlappedResult(
        self._serial.hComPort, self._overlappedRead, 0
    )
    if n:
        # bytes() rather than str(): on Python 3, str() of a buffer slice
        # gives a textual representation instead of the received data,
        # while on Python 2 ``bytes`` is simply an alias of ``str``.
        first = bytes(self.read_buf[:n])
        # Now fetch everything that is already waiting in the input queue.
        _, comstat = win32file.ClearCommError(self._serial.hComPort)
        if comstat.cbInQue:
            win32event.ResetEvent(self._overlappedRead.hEvent)
            _, buf = win32file.ReadFile(
                self._serial.hComPort,
                win32file.AllocateReadBuffer(comstat.cbInQue),
                self._overlappedRead,
            )
            # Wait (last argument 1) until that read has completed.
            n = win32file.GetOverlappedResult(
                self._serial.hComPort, self._overlappedRead, 1
            )
            # Deliver the first chunk together with the drained queue.
            self.protocol.dataReceived(first + bytes(buf[:n]))
        else:
            # Nothing else queued: deliver just the first chunk.
            self.protocol.dataReceived(first)

    # Set up the next one-byte overlapped read.
    win32event.ResetEvent(self._overlappedRead.hEvent)
    _, self.read_buf = win32file.ReadFile(
        self._serial.hComPort,
        win32file.AllocateReadBuffer(1),
        self._overlappedRead,
    )
def Connect(entryName, bUseCallback):
    """Dial the dial-up networking (DUN) entry named *entryName*.

    When *bUseCallback* is true, ``Callback`` is handed to
    ``win32ras.Dial`` and the call waits up to 60000 ms on
    ``callbackEvent``; otherwise ``None`` is passed as the callback.

    Returns the ``(hras, rc)`` pair obtained from ``win32ras.Dial``, with
    ``hras`` replaced by the result of ``HangUp`` wherever a hang-up was
    attempted.  If the entry's dial parameters cannot be read, nothing is
    dialled and ``(None, None)`` is returned (previously this path failed
    with an unbound-local error at the final ``return``).
    """
    if bUseCallback:
        theCallback = Callback
        win32event.ResetEvent(callbackEvent)
    else:
        theCallback = None

    # In order to *use* the username/password of a particular DUN entry,
    # one must explicitly get those params under win95.
    try:
        dp, _ = win32ras.GetEntryDialParams(None, entryName)
    except Exception:
        # Single-argument print(...) behaves identically as a Python 2
        # print statement and as the Python 3 print function.
        print("Couldn't find DUN entry: %s" % entryName)
        return None, None

    hras, rc = win32ras.Dial(
        None, None, (entryName, "", "", dp[3], dp[4], ""), theCallback
    )
    if not bUseCallback and rc != 0:
        print("Could not dial the RAS connection: %s" % win32ras.GetErrorString(rc))
        hras = HangUp(hras)
    # Don't wait here if there's no need to.
    elif (bUseCallback
          and win32event.WaitForSingleObject(callbackEvent, 60000)
          != win32event.WAIT_OBJECT_0):
        print("Gave up waiting for the process to complete!")
        # SDK docs state one must explicitly hang up, even if there's an
        # error.
        try:
            cs = win32ras.GetConnectStatus(hras)
        except Exception:
            # On error, attempt a hang up anyway.
            hras = HangUp(hras)
        else:
            if int(cs[0]) == win32ras.RASCS_Disconnected:
                hras = HangUp(hras)
    return hras, rc
def testResetEvent(self):
    """ResetEvent un-signals an event, returns None, and fails once closed."""
    # Arguments: no security attributes, manual-reset, initially signaled,
    # unnamed.
    event = win32event.CreateEvent(None, True, True, None)
    self.assertSignaled(event)
    res = win32event.ResetEvent(event)
    # assertEqual instead of the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(res, None)
    self.assertNotSignaled(event)
    event.close()
    # Resetting through the closed handle is expected to raise.
    self.assertRaises(pywintypes.error, win32event.ResetEvent, event)
def Connect(entryName, bUseCallback):
    """Dial the dial-up networking (DUN) entry named *entryName*.

    When *bUseCallback* is true, ``Callback`` is handed to
    ``win32ras.Dial`` and the call waits up to 60000 ms on
    ``callbackEvent``; otherwise ``None`` is passed as the callback.

    Returns the ``(hras, rc)`` pair obtained from ``win32ras.Dial``, with
    ``hras`` replaced by the result of ``HangUp`` wherever a hang-up was
    attempted.  If the entry's dial parameters cannot be read, nothing is
    dialled and ``(None, None)`` is returned (previously this path failed
    with an unbound-local error at the final ``return``).
    """
    if bUseCallback:
        theCallback = Callback
        win32event.ResetEvent(callbackEvent)
    else:
        theCallback = None

    # In order to *use* the username/password of a particular DUN entry,
    # one must explicitly get those params under win95.
    try:
        dp, _ = win32ras.GetEntryDialParams(None, entryName)
    except Exception:
        print("Couldn't find DUN entry: %s" % entryName)
        return None, None

    hras, rc = win32ras.Dial(
        None, None, (entryName, "", "", dp[3], dp[4], ""), theCallback
    )
    if not bUseCallback and rc != 0:
        print("Could not dial the RAS connection:", win32ras.GetErrorString(rc))
        hras = HangUp(hras)
    # Don't wait here if there's no need to.
    elif (bUseCallback
          and win32event.WaitForSingleObject(callbackEvent, 60000)
          != win32event.WAIT_OBJECT_0):
        print("Gave up waiting for the process to complete!")
        # SDK docs state one must explicitly hang up, even if there's an
        # error.
        try:
            cs = win32ras.GetConnectStatus(hras)
        except Exception:
            # On error, attempt a hang up anyway.
            hras = HangUp(hras)
        else:
            if int(cs[0]) == win32ras.RASCS_Disconnected:
                hras = HangUp(hras)
    return hras, rc
def testResetEvent(self):
    """ResetEvent un-signals an event, returns None, and fails once closed."""
    # Arguments: no security attributes, manual-reset, initially signaled,
    # unnamed.
    event = win32event.CreateEvent(None, True, True, None)
    self.assertSignaled(event)
    res = win32event.ResetEvent(event)
    # assertEqual instead of the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(res, None)
    self.assertNotSignaled(event)
    event.close()
    # Resetting through the closed handle is expected to raise.
    self.assertRaises(pywintypes.error, win32event.ResetEvent, event)
def __init__(self, tagName='trader_msg', size = 1024*1024):
    """Map the tagged shared-memory region and open the two named events.

    tagName -- tag name given to ``mmap.mmap`` for the mapping.
    size    -- length of the mapping in bytes (defaults to 1024*1024).

    Both events are opened by name ("aauto_trigger" and "python_trigger"),
    so they must already exist, and both are reset before returning.
    """
    # fileno -1: the mapping is not backed by a file on disk.
    self.fm = mmap.mmap(-1, size, tagname=tagName, access=mmap.ACCESS_WRITE)

    full_access = win32event.EVENT_ALL_ACCESS
    self.eventAu = win32event.OpenEvent(full_access, 0, "aauto_trigger")
    self.eventPy = win32event.OpenEvent(full_access, 0, "python_trigger")

    # NOTE(review): 0xFFFFFFFF presumably means "wait forever" for whoever
    # consumes timeOut; it is not used in this method -- confirm.
    self.timeOut = 0xFFFFFFFF

    # Put both events into the non-signaled state.
    for handle in (self.eventAu, self.eventPy):
        win32event.ResetEvent(handle)
def sendCmd(self, cmd_id, arg=None):
    """Write a command into ``self.fm`` and signal ``self.eventAu``.

    cmd_id -- integer, packed with struct format 'i' at offset 0.
    arg    -- optional payload written directly after the command id;
              skipped when falsy.
    """
    shared = self.fm
    shared.seek(0)
    # Command id first, as a packed native int.
    shared.write(struct.pack('i', cmd_id))
    # Optional payload follows the command id.
    if arg:
        shared.write(arg)
    # Reset eventPy, then set eventAu.
    win32event.ResetEvent(self.eventPy)
    win32event.SetEvent(self.eventAu)