def testClassify(self):
    """Classify the EEG captured since the previous call and act on the result.

    The data recorded since ``self.testTime`` is band-pass filtered, cut
    into segments spanning ``windowStart`` to ``windowEnd`` and downsampled.
    The stimulus belonging to the segment with the largest value in the
    first column of the classifier's discriminant output gets its pie-menu
    bar grown by ``1/nTestTrial``.  If ``growBar`` returns a true value the
    choice is handed to ``controlPlayer``; otherwise the next test epoch is
    scheduled after ``self.isi`` (multiplied by 1000 to get milliseconds).

    Raises:
        AssertionError: if the number of segments differs from the number
            of choices.
    """
    cap = self.src.getEEGSecs(time.time() - self.testTime)
    self.testTime = time.time()

    cap = self.bandpass(cap)
    seg = cap.segment(start=self.windowStart, end=self.windowEnd)
    seg = self.downsample(seg)

    # if verbose XXX - idfah
    #wx.LogMessage('nSeg: %d' % seg.getNSeg())

    assert seg.getNSeg() == len(self.choices)

    stim = [self.markToStim(mark) for mark in seg.getMarkers()]

    features = self.standardizer.apply(seg.chanEmbed())
    dv = self.classifier.discrim(features)

    # row index of the largest discriminant value in the first column
    winner = np.argmax(dv, axis=0)[0]
    choice = stim[winner]

    if self.pieMenu.growBar(choice, amount=1.0/self.nTestTrial):
        wx.CallAfter(self.controlPlayer, choice)
    else:
        # wx.CallLater takes an integer millisecond count
        wx.CallLater(int(1000.0*self.isi), self.runTestEpoch)
# Example source code for the Python class CallLater()
def testDecision(self, probs):
    """Grow the pie-menu bars from ``probs`` and finish the trial if one is selected.

    Args:
        probs: indexable collection with one entry per item of
            ``self.choices`` (indexed by position).  Each bar grows by
            ``self.gain*(probs[i]-self.loss)``.

    After a single refresh of the menu, ``self.curDecision`` is incremented.
    If ``pieMenu.getSelection()`` returns None another test epoch is queued
    immediately.  Otherwise the selection is highlighted, the source marker
    is incremented by the selection's 1-based index, the confusion matrix
    entry ``[selected, index of self.curChoice]`` is incremented and
    ``testClearTrial`` is scheduled after ``self.pauseSecs`` (times 1000 ms).
    """
    for idx, choice in enumerate(self.choices):
        self.pieMenu.growBar(choice, self.gain*(probs[idx]-self.loss), refresh=False)
    self.pieMenu.refresh()

    self.curDecision += 1

    finalSelection = self.pieMenu.getSelection()
    if finalSelection is None:
        wx.CallAfter(self.runTestEpoch)
        return

    self.pieMenu.clearAllHighlights(refresh=False)
    self.pieMenu.highlight(finalSelection, style='jump', secs=self.pauseSecs)

    finalLabel = self.choices.index(finalSelection)
    self.src.incrementMarker(finalLabel+1)
    self.confusion[finalLabel, self.choices.index(self.curChoice)] += 1.0

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.pauseSecs), self.testClearTrial)
def testClassify(self):
    """Classify the EEG captured since the previous call and act on the result.

    The data recorded since ``self.testTime`` is decimated and cut into
    segments spanning ``windowStart`` to ``windowEnd``.  The stimulus
    belonging to the segment with the largest value in the first column of
    the classifier's discriminant output gets its pie-menu bar grown by
    ``1/nTestTrial``.  If ``growBar`` returns a true value the choice is
    handed to ``moveRobot``; otherwise the next test epoch is scheduled
    after ``self.isi`` (multiplied by 1000 to get milliseconds).

    Raises:
        AssertionError: if the number of segments differs from the number
            of choices.
    """
    cap = self.src.getEEGSecs(time.time() - self.testTime)
    self.testTime = time.time()

    cap = self.decimate(cap)
    seg = cap.segment(start=self.windowStart, end=self.windowEnd)

    assert seg.getNSeg() == len(self.choices)

    stim = [self.markToStim(mark) for mark in seg.getMarkers()]

    features = self.standardizer.apply(seg.chanEmbed())
    dv = self.classifier.discrim(features)

    # row index of the largest discriminant value in the first column
    winner = np.argmax(dv, axis=0)[0]
    choice = stim[winner]

    if self.pieMenu.growBar(choice, amount=1.0/self.nTestTrial):
        wx.CallAfter(self.moveRobot, choice)
    else:
        # wx.CallLater takes an integer millisecond count
        wx.CallLater(int(1000.0*self.isi), self.runTestEpoch)
def toggleTest(self, event=None):
    """Start testing, or request an early stop if it is already running.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    When ``self.isRunning()`` is true the early-stop flag is set, the test
    button is disabled and a message is logged.  Otherwise the train and
    retrain buttons are disabled, the test button is relabelled 'Stop',
    ``start()`` and ``beforeTest()`` are called and the first test epoch is
    scheduled 2000 ms later.

    Raises:
        Exception: anything raised during start-up is re-raised after the
            early-stop flag is set and the buttons are restored.
    """
    if self.isRunning():
        self.earlyStopFlag = True
        self.testButton.Disable()
        wx.LogMessage('%s: Testing stopped early.' % self.name)
        return

    try:
        self.earlyStopFlag = False

        self.trainButton.Disable()
        self.retrainButton.Disable()
        self.testButton.SetLabel('Stop')

        self.start()
        self.startTime = time.time()

        self.beforeTest()

        # two-second lead-in; wx.CallLater takes an integer millisecond count
        wx.CallLater(2000, self.runTestEpoch)
    except Exception:
        # undo the UI changes before propagating the failure
        self.earlyStopFlag = True

        self.trainButton.Enable()
        self.retrainButton.Enable()
        self.testButton.SetLabel('Test')

        raise
def textInserted(hwnd, start, text):
    """Merge newly inserted text into the pending entry and (re)arm the auto-flush timer.

    Args:
        hwnd: not used by this function.
        start: offset at which ``text`` was inserted; -1 is handled as a
            special value (two consecutive -1 offsets are simply appended).
        text: the inserted text.

    The module-level ``currentEntry`` holds a ``(start, text)`` tuple or
    None.  If the new offset falls outside the pending entry, the pending
    entry is flushed first.  A 100 ms ``wx.CallLater`` timer, stored in the
    module-level ``autoFlushTimer``, flushes the entry if nothing else
    arrives; any previously running timer is stopped.
    """
    global currentEntry, autoFlushTimer
    log.debug("textInserted %r" % text)

    if currentEntry is not None:
        oldStart, oldText = currentEntry
        bothUnpositioned = start == -1 and oldStart == -1
        outsideEntry = start < oldStart or start > (oldStart + len(oldText))
        if not bothUnpositioned and outsideEntry:
            flushCurrentEntry()

    # currentEntry is re-read here because flushCurrentEntry() may have
    # changed it -- presumably it resets it to None; confirm against its
    # definition.
    if currentEntry is None:
        currentEntry = (start, text)
    else:
        oldStart, oldText = currentEntry
        if oldStart == -1 and start == -1:
            currentEntry = (-1, oldText + text)
        else:
            # keep the old text up to the insertion point, then the new text
            currentEntry = (oldStart, oldText[:start - oldStart] + text)

    if autoFlushTimer is not None:
        autoFlushTimer.Stop()
        autoFlushTimer = None

    def autoFlush(*args, **kwargs):
        global autoFlushTimer
        autoFlushTimer = None
        flushCurrentEntry()

    autoFlushTimer = wx.CallLater(100, autoFlush)
def _whenPlaying(self):
if self.isPlaying:
self.elapsedTime = time.time() - self.initTime
total = self.elapsedTime + self.sndOffset
try:
pos = total / self.sndDurSecs
except:
pos = 0
if pos < 1:
self.chrono.SetLabel(self.secs2Time(total))
self.timeLeft.SetLabel("- "+ self.secs2Time(self.sndDurSecs-total))
self.sndView.refreshPos(pos)
wx.CallLater(5, self._whenPlaying)
elif pos >= 1:
self.sndEnd()
def trainEpoch(self):
    """Present the next training choice and schedule its clearing.

    When ``self.curChoices`` is empty it is refilled with a shuffled copy
    of ``self.choices`` and ``self.curTrial`` is incremented.  The next
    choice is popped, highlighted with the 'pop' style, and the source
    marker is set to the choice's 1-based index in ``self.choices``.
    ``trainClearTrial`` runs after ``self.trainTrialSecs`` (times 1000 ms).
    """
    if len(self.curChoices) == 0:
        # start a new trial with a fresh random ordering of all choices
        self.curChoices = copy.copy(self.choices)
        np.random.shuffle(self.curChoices)
        self.curTrial += 1

    choice = self.curChoices.pop()
    self.pieMenu.highlight(choice, style='pop')

    self.src.setMarker(self.choices.index(choice)+1.0)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.trainTrialSecs), self.trainClearTrial)
def trainClearTrial(self, event=None):
    """Clear the highlight and marker, then schedule the next step of training.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    After ``self.pauseSecs`` (times 1000 ms) ``endTrain`` is called if the
    last trial (``curTrial == nTrainTrial``) has no choices left; otherwise
    ``runTrainEpoch`` is called.
    """
    self.pieMenu.clearAllHighlights()
    self.src.setMarker(0.0)

    finished = self.curTrial == self.nTrainTrial and len(self.curChoices) == 0
    nextStep = self.endTrain if finished else self.runTrainEpoch

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.pauseSecs), nextStep)
def testEpoch(self):
    """Run one step of a test trial.

    When ``self.curDecision`` is -1 the marker is reset to 0, the test
    target is highlighted, ``curDecision`` is incremented and
    ``runTestEpoch`` is scheduled after ``1.1*self.width`` (times 1000 ms).
    Otherwise ``testClassify`` is scheduled after ``1.1*self.decisionSecs``
    (times 1000 ms).
    """
    # wx.CallLater takes an integer millisecond count, hence the int() below
    if self.curDecision == -1:
        self.src.setMarker(0.0)
        self.highlightTestTarget()
        self.curDecision += 1
        wx.CallLater(int(1000.0*self.width*1.1), self.runTestEpoch)
    else:
        # a little extra at the end to make sure we get the last segment
        wx.CallLater(int(1000.0*self.decisionSecs*1.1), self.testClassify)
def testClassify(self):
    """Classify the most recent window of EEG and update the pie menu.

    ``self.width + 0.9*self.pauseSecs`` of data is fetched, copied as
    float32, and the leading ``0.9*self.pauseSecs`` is trimmed off again.
    The remaining data becomes a single segment, converted with
    ``powerize`` when ``self.method`` is 'Welch Power' and used raw
    otherwise.  The standardized data is labelled by the classifier and the
    bar of the matching choice grows by ``self.gain``.

    ``self.curDecision`` is then incremented.  If ``pieMenu.getSelection()``
    returns None another test epoch is queued immediately.  Otherwise the
    selection is highlighted, the source marker is incremented by the
    selection's 1-based index, the confusion matrix entry
    ``[selected, index of self.curChoice]`` is incremented and
    ``testClearTrial`` is scheduled after ``self.pauseSecs`` (times 1000 ms).
    """
    # a little extra (0.9*self.pauseSecs) for edge effects in filter XXX - idfah
    cap = self.src.getEEGSecs(self.width+0.9*self.pauseSecs).copy(dtype=np.float32) # get rid of copy and dtype after implementing in source XXX - idfah
    cap = cap.trim(start=0.9*self.pauseSecs)

    seg = cap.segmentSingle()

    if self.method == 'Welch Power':
        # the first value returned by powerize is not needed here
        _, testData = self.powerize((seg,))
    else:
        testData = (seg.data,)

    testDataStd = self.stand.apply(testData)[0]

    label = self.classifier.label(testDataStd)[0]
    selection = self.choices[label]

    self.pieMenu.growBar(selection, self.gain, refresh=True)

    self.curDecision += 1

    finalSelection = self.pieMenu.getSelection()
    if finalSelection is None:
        wx.CallAfter(self.runTestEpoch)
        return

    self.pieMenu.clearAllHighlights(refresh=False)
    self.pieMenu.highlight(finalSelection, style='jump', secs=self.pauseSecs)

    finalLabel = self.choices.index(finalSelection)
    self.src.incrementMarker(finalLabel+1)
    self.confusion[finalLabel, self.choices.index(self.curChoice)] += 1.0

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.pauseSecs), self.testClearTrial)
def testClearTrial(self):
    """Reset the pie menu after a test trial and schedule the next step.

    The bars are zeroed, highlights cleared and ``self.curDecision`` reset
    to -1.  After ``self.pauseSecs`` (times 1000 ms) ``endTest`` is called
    if the last trial (``curTrial == nTestTrial``) has no choices left;
    otherwise ``runTestEpoch`` is called.
    """
    self.pieMenu.zeroBars(refresh=False)
    self.pieMenu.clearAllHighlights()

    self.curDecision = -1

    finished = self.curTrial == self.nTestTrial and len(self.curChoices) == 0
    nextStep = self.endTest if finished else self.runTestEpoch

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.pauseSecs), nextStep)
def trainClearStim(self, event=None):
    """Clear the stimulus highlight and marker, then schedule the next training epoch.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    ``runTrainEpoch`` is called after ``self.isi`` (times 1000 ms).
    """
    self.pieMenu.clearAllHighlights()
    self.src.setMarker(0.0)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.isi), self.runTrainEpoch)
def testEpoch(self):
    """Present the next test stimulus and schedule its clearing.

    The next stimulus is popped from ``self.curStimList``, its marker
    (``stimToMark``) is written to the source and it is highlighted with
    the 'jump' style.  ``testClearStim`` runs after ``self.si``
    (times 1000 ms).
    """
    curStim = self.curStimList.pop()
    self.src.setMarker(self.stimToMark(curStim))
    self.pieMenu.highlight(curStim, style='jump')

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.si), self.testClearStim)
def testClearStim(self, event=None):
    """Clear the stimulus highlight and marker, then classify or continue.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    When ``self.curStimList`` is exhausted it is re-initialised and
    ``testClassify`` is scheduled after ``1.05*self.windowEnd``
    (times 1000 ms).  Otherwise ``runTestEpoch`` is scheduled after
    ``self.isi`` (times 1000 ms).
    """
    self.pieMenu.clearAllHighlights()
    self.src.setMarker(0.0)

    # wx.CallLater takes an integer millisecond count, hence the int() below
    if len(self.curStimList) == 0:
        self.initCurStimList()
        wx.CallLater(int(1000.0*self.windowEnd*1.05), self.testClassify)
    else:
        wx.CallLater(int(1000.0*self.isi), self.runTestEpoch)
def controlPlayer(self, choice):
    """Highlight ``choice`` and carry out the matching media-player command.

    Args:
        choice: one of 'Play', 'Preview', or the album/song labels built
            from the module-level ``leftArrow``/``rightArrow`` strings.

    'Play' and 'Preview' call the player and zero the bars after 2000 ms;
    they do not schedule another test epoch here.  The four album/song
    navigation choices call the player, clear the pie menu after 1000 ms
    and start the next test epoch after 2000 ms.

    Raises:
        Exception: if ``choice`` is not one of the known labels (raised
            after the choice has already been highlighted).
    """
    self.pieMenu.highlight(choice, style='pop')

    # wx.CallLater takes an integer millisecond count
    def moveOn():
        wx.CallLater(1000, self.clearPieMenu)
        wx.CallLater(2000, self.runTestEpoch)

    if choice == 'Play':
        self.mplayer.play()
        wx.CallLater(2000, self.pieMenu.zeroBars)
    elif choice == 'Album ' + rightArrow:
        self.mplayer.forAlbum()
        moveOn()
    elif choice == leftArrow + ' Album':
        self.mplayer.rewAlbum()
        moveOn()
    elif choice == 'Song ' + rightArrow:
        self.mplayer.forSong()
        moveOn()
    elif choice == leftArrow + ' Song':
        self.mplayer.rewSong()
        moveOn()
    elif choice == 'Preview':
        self.mplayer.preview()
        wx.CallLater(2000, self.pieMenu.zeroBars)
    else:
        raise Exception('Invalid choice: %s.' % str(choice))
def showTrainSymbol(self):
    """Select the training symbol for the current repetition on the grid.

    The symbol is taken from ``self.trainText`` at position
    ``(self.curRep-1)`` modulo the text length; a blank character is
    replaced by ``grid.space``.  Its grid location is stored in
    ``self.curRow``/``self.curCol`` and selected.  The highlight is removed
    after ``self.pause`` and ``trainClearStim`` runs after
    ``self.pause + self.windowEnd`` (both times 1000 ms).
    """
    # random, no bottom row
    #self.curRow = np.random.randint(0,5)
    #self.curCol = np.random.randint(0,6)

    # pick the one symbol needed instead of translating the whole text
    sym = self.trainText[(self.curRep-1) % len(self.trainText)]
    if sym == ' ':
        sym = grid.space

    self.curRow, self.curCol = self.gridSpeller.getGridLocation(sym)

    self.gridSpeller.selectSymbol(self.curRow, self.curCol)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.pause), self.gridSpeller.removeHighlight)
    wx.CallLater(int(1000.0*(self.pause+self.windowEnd)), self.trainClearStim)
def trainEpoch(self):
    """Advance grid-speller training by one stimulus or one repetition.

    With stimuli left in ``self.curStimList`` the next one is popped, its
    marker written to the source and the matching row (stimulus value up
    to ``self.nRows``) or column highlighted; ``trainClearStim`` runs after
    ``self.si`` (times 1000 ms).

    With an empty list ``self.curRep`` is incremented.  Once it exceeds
    ``len(self.trainText)`` the highlight is removed and ``endTrain`` is
    scheduled; otherwise the list is re-initialised and ``showTrainSymbol``
    is scheduled.  Both use a delay of ``1.1*self.windowEnd``
    (times 1000 ms).
    """
    # wx.CallLater takes an integer millisecond count, hence the int() below

    # if stim list still had elements to show
    if len(self.curStimList) != 0:
        # grab next symbol index and set marker
        curStim = self.curStimList.pop()
        self.src.setMarker(self.stimToMark(curStim))

        # highlight row or column
        if curStim <= self.nRows:
            self.gridSpeller.highlightRow(curStim - 1)
        else:
            self.gridSpeller.highlightCol(curStim - self.nRows - 1)

        # clear after si seconds
        wx.CallLater(int(1000.0*self.si), self.trainClearStim)
        return

    # the stim list is empty: increment current repetition
    self.curRep += 1

    # if we have done all reps, then quit
    if self.curRep > len(self.trainText):
        self.gridSpeller.removeHighlight()
        ##wx.CallLater(1000.0*self.windowEnd*1.1-1000.0*self.si, self.endTrain)
        wx.CallLater(int(1000.0*self.windowEnd*1.1), self.endTrain)

    # otherwise, reset stim list and show another training symbol
    else:
        self.initCurStimList()
        #self.showTrainSymbol()
        ##wx.CallLater(1000.0*self.windowEnd*1.1-1000.0*self.si, self.showTrainSymbol)
        wx.CallLater(int(1000.0*self.windowEnd*1.1), self.showTrainSymbol)
def trainClearStim(self, event=None):
    """Remove the grid highlight and marker, then schedule the next training epoch.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    ``runTrainEpoch`` is called after ``self.isi`` (times 1000 ms).
    """
    self.gridSpeller.removeHighlight()
    self.src.setMarker(0.0)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.isi), self.runTrainEpoch)
def testEpoch(self):
    """Present the next grid stimulus during testing and schedule its clearing.

    The next stimulus is popped from ``self.curStimList`` and its marker
    written to the source.  Stimulus values up to ``self.nRows`` highlight
    row ``curStim - 1``; larger values highlight column
    ``curStim - nRows - 1``.  ``testClearStim`` runs after ``self.si``
    (times 1000 ms).
    """
    curStim = self.curStimList.pop()
    self.src.setMarker(self.stimToMark(curStim))

    if curStim <= self.nRows:
        self.gridSpeller.highlightRow(curStim - 1)
    else:
        self.gridSpeller.highlightCol(curStim - self.nRows - 1)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.si), self.testClearStim)
def testClearStim(self, event=None):
    """Remove the grid highlight and marker, then classify or continue.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    When ``self.curStimList`` is exhausted it is re-initialised and
    ``testClassify`` is scheduled after ``1.1*self.windowEnd``
    (times 1000 ms).  Otherwise ``runTestEpoch`` is scheduled after
    ``self.isi`` (times 1000 ms).
    """
    self.gridSpeller.removeHighlight()
    self.src.setMarker(0.0)

    # wx.CallLater takes an integer millisecond count, hence the int() below
    if len(self.curStimList) == 0:
        self.initCurStimList()
        wx.CallLater(int(1000.0*self.windowEnd*1.1), self.testClassify)
    else:
        wx.CallLater(int(1000.0*self.isi), self.runTestEpoch)
def trainEpoch(self):
    """Present the next training choice and schedule its clearing.

    When ``self.curChoices`` is empty it is refilled with a shuffled copy
    of ``self.choices`` and ``self.curTrial`` is incremented.  The next
    choice is popped, highlighted with the 'pop' style, and the source
    marker is set to the choice's 1-based index in ``self.choices``.
    ``trainClearTrial`` runs after ``self.trainTrialSecs`` (times 1000 ms).
    """
    if len(self.curChoices) == 0:
        # start a new trial with a fresh random ordering of all choices
        self.curChoices = copy.copy(self.choices)
        np.random.shuffle(self.curChoices)
        self.curTrial += 1

    choice = self.curChoices.pop()
    self.pieMenu.highlight(choice, style='pop')

    self.src.setMarker(self.choices.index(choice)+1.0)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.trainTrialSecs), self.trainClearTrial)
def trainClearTrial(self, event=None):
    """Clear the highlight and marker, then schedule the next step of training.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    After ``self.pauseSecs`` (times 1000 ms) ``endTrain`` is called if the
    last trial (``curTrial == nTrainTrial``) has no choices left; otherwise
    ``runTrainEpoch`` is called.
    """
    self.pieMenu.clearAllHighlights()
    self.src.setMarker(0.0)

    finished = self.curTrial == self.nTrainTrial and len(self.curChoices) == 0
    nextStep = self.endTrain if finished else self.runTrainEpoch

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.pauseSecs), nextStep)
def testEpoch(self):
    """Run one step of a test trial.

    When ``self.gameActive`` is false and ``self.curDecision`` is -1 the
    marker is reset to 0, the test target is highlighted, ``curDecision``
    is incremented and ``runTestEpoch`` is scheduled after
    ``1.1*self.pauseSecs`` (times 1000 ms).  In every other case
    ``testClassify`` is scheduled after ``1.1*self.decisionSecs``
    (times 1000 ms).
    """
    # wx.CallLater takes an integer millisecond count, hence the int() below
    if not self.gameActive and self.curDecision == -1:
        self.src.setMarker(0.0)
        self.highlightTestTarget()
        self.curDecision += 1
        wx.CallLater(int(1000.0*self.pauseSecs*1.1), self.runTestEpoch)
    else:
        wx.CallLater(int(1000.0*self.decisionSecs*1.1), self.testClassify)
def testClearTrial(self):
    """Reset the pie menu after a test trial and schedule the next step.

    The bars are zeroed, highlights cleared and ``self.curDecision`` reset
    to -1.  After ``self.pauseSecs`` (times 1000 ms) ``endTest`` is called
    if the last trial (``curTrial == nTestTrial``) has no choices left;
    otherwise ``runTestEpoch`` is called.
    """
    self.pieMenu.zeroBars(refresh=False)
    self.pieMenu.clearAllHighlights()

    self.curDecision = -1

    finished = self.curTrial == self.nTestTrial and len(self.curChoices) == 0
    nextStep = self.endTest if finished else self.runTestEpoch

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.pauseSecs), nextStep)
def trainEpoch(self):
    """Present the next training stimulus, or end training after the last trial.

    When ``self.curStimList`` is empty, ``self.curTrainTrial`` is
    incremented and the list re-initialised; if the trial count has reached
    ``self.nTrainTrial``, ``endTrain`` is scheduled after
    ``1.05*self.windowEnd`` (times 1000 ms) and nothing is presented.

    Otherwise the next stimulus is popped, its marker written to the source
    and it is highlighted with the 'jump' style; ``trainClearStim`` runs
    after ``self.si`` (times 1000 ms).
    """
    # wx.CallLater takes an integer millisecond count, hence the int() below
    if len(self.curStimList) == 0:
        self.curTrainTrial += 1
        self.initCurStimList()

        if self.curTrainTrial >= self.nTrainTrial:
            wx.CallLater(int(1000.0*self.windowEnd*1.05), self.endTrain)
            return

    curStim = self.curStimList.pop()
    self.src.setMarker(self.stimToMark(curStim))
    self.pieMenu.highlight(curStim, style='jump')

    wx.CallLater(int(1000.0*self.si), self.trainClearStim)
def trainClearStim(self, event=None):
    """Clear the stimulus highlight and marker, then schedule the next training epoch.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    ``runTrainEpoch`` is called after ``self.isi`` (times 1000 ms).
    """
    self.pieMenu.clearAllHighlights()
    self.src.setMarker(0.0)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.isi), self.runTrainEpoch)
def testEpoch(self):
    """Present the next test stimulus and schedule its clearing.

    The next stimulus is popped from ``self.curStimList``, its marker
    (``stimToMark``) is written to the source and it is highlighted with
    the 'jump' style.  ``testClearStim`` runs after ``self.si``
    (times 1000 ms).
    """
    curStim = self.curStimList.pop()
    self.src.setMarker(self.stimToMark(curStim))
    self.pieMenu.highlight(curStim, style='jump')

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.si), self.testClearStim)
def testClearStim(self, event=None):
    """Clear the stimulus highlight and marker, then classify or continue.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    When ``self.curStimList`` is exhausted it is re-initialised and
    ``testClassify`` is scheduled after ``1.05*self.windowEnd``
    (times 1000 ms).  Otherwise ``runTestEpoch`` is scheduled after
    ``self.isi`` (times 1000 ms).
    """
    self.pieMenu.clearAllHighlights()
    self.src.setMarker(0.0)

    # wx.CallLater takes an integer millisecond count, hence the int() below
    if len(self.curStimList) == 0:
        self.initCurStimList()
        wx.CallLater(int(1000.0*self.windowEnd*1.05), self.testClassify)
    else:
        wx.CallLater(int(1000.0*self.isi), self.runTestEpoch)
def trainEpoch(self):
    """Present the next training choice and schedule its clearing.

    The next choice is popped from ``self.curChoices``, highlighted with
    the 'pop' style, and the source marker is set to the choice's 1-based
    index in ``self.choices``.  ``trainClearTrial`` runs after
    ``self.trialSecs`` (times 1000 ms).
    """
    choice = self.curChoices.pop()
    self.pieMenu.highlight(choice, style='pop')

    self.src.setMarker(self.choices.index(choice)+1.0)

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.trialSecs), self.trainClearTrial)
def trainClearTrial(self, event=None):
    """Clear the highlight and marker, then continue or finish training.

    Args:
        event: unused; accepted so the method can serve as an event handler.

    After ``self.iti`` (times 1000 ms) ``runTrainEpoch`` is called while
    ``self.curChoices`` still has entries; otherwise ``endTrain`` is called.
    """
    self.pieMenu.clearAllHighlights()
    self.src.setMarker(0.0)

    nextStep = self.runTrainEpoch if len(self.curChoices) > 0 else self.endTrain

    # wx.CallLater takes an integer millisecond count
    wx.CallLater(int(1000.0*self.iti), nextStep)