python类pause()的实例源码

capture_bci.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def begin(queue, event=None):
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)

  if sys.platform == 'win32':
    win.SetConsoleCtrlHandler(win_handler,1)

  load(queue)
  queue.put('CONNECTED')
  start()

  try:
    # linux signal handling
    while True:
      signal.pause()
  except AttributeError:
    # signal.pause() not implemented on windows
    while not event.is_set():
      time.sleep(1)
    print('event was set in bci, stopping')
    stop(queue)
matplotlib_standalone.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def begin(queue, event=None):
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)

  load(queue)
  start()

  try:
    while True:
      signal.pause()
  except AttributeError:
    # signal.pause() not implemented on windows
   # while not event.is_set():
    while not event:
      time.sleep(1)

    print('event was set, stopping')
    stop()
test_threadsignals.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_signals(self):
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_threadsignals.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_signals(self):
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_threadsignals.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_signals(self):
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_threadsignals.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_signals(self):
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
sessions.py 文件源码 项目:python_gray_8_9_11_12 作者: 3xp10it 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def do_everything (self):
        if "pause" in self.path:
            self.session.pause_flag = True

        if "resume" in self.path:
            self.session.pause_flag = False

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        if "view_crash" in self.path:
            response = self.view_crash(self.path)
        elif "view_pcap" in self.path:
            response = self.view_pcap(self.path)
        else:
            response = self.view_index()

        self.wfile.write(response)
server.py 文件源码 项目:aiogrpc 作者: hubo1016 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main(listen_addrs=['127.0.0.1:9901']):
    s = create_server(listen_addrs)
    print("Server created on", listen_addrs)
    s.start()
    print("Server started")
    import signal
    old1 = signal.signal(signal.SIGINT, signal_handler)
    old2 = signal.signal(signal.SIGTERM, signal_handler)
    import time
    # signal.pause is not valid in windows
    try:
        while True:
            time.sleep(3600 * 24)
    except QuitException:
        print("Quit server")
        shutdown_event = s.stop(5)
        shutdown_event.wait()
    finally:
        signal.signal(signal.SIGINT, old1)
        signal.signal(signal.SIGTERM, old2)
__init__.py 文件源码 项目:picraftzero 作者: WayneKeenan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def stop(self):
        """Spops all animation"""
        if self.fading:
            self.fader.stop()
            self.fading = False

        if self.pulsing:
            self.pulsing = False
            self.pulser.pause()

        if self.blinking:
            self.blinking = False

        #self.gpio_pwm.stop()
        #time.sleep(0.01)
        if self._value:
            self.duty_cycle(100)
        else:
            self.duty_cycle(0)

        #GPIO.output(self.pin, self._value)

        return True
capture_bci.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)
  load(queue=None)
  start()

  signal.pause()
capture_pupil.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def main():
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)
  load(queue=None)

  # Required message for subprocess comms
  print('CONNECTED')

  start()

  signal.pause()
capture_pupil.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def begin(queue):
  signal.signal(signal.SIGTERM, sigterm_handler)

  load(queue)
  queue.put('CONNECTED')
  start()

  signal.pause()
graph_matplotlib.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def begin(queue, event=None):
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)

  if sys.platform == 'win32':
    win.SetConsoleCtrlHandler(win_handler, 1)

  load(queue)
  start()

  while True:
    try:
      signal.pause()
    except AttributeError:
    # signal.pause() not implemented on windows
      while not event.is_set():
        time.sleep(1)
    print('event was set in graphing utility, stopping')
    stop()
matplotlib_standalone.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _graph_lsl(self):
    print('checking if stream has be initialized')
    self.streams = resolve_byprop('name', 'bci', timeout=2.5)
    try:
      self.inlet = StreamInlet(self.streams[0])
    except IndexError:
      raise ValueError('Make sure stream name=bci is opened first.')
    while self.running:
      # initial run
      self.sample, self.timestamp = self.inlet.pull_sample(timeout=5)
    # time correction to sync to local_clock()
      try:
        if self.timestamp is not None and self.sample is not None:
          self.timestamp = self.timestamp + self.inlet.time_correction(timeout=5) 

      except TimeoutError:
        pass
      self.SecondTimes.append(self.sample[1])                         #add time stamps to array 'timeValSeconds'
      self.ProcessedSig.append(self.sample[0])                           #add processed signal values to 'processedSig'

      self.count = self.count + 1

      if((self.count % 20 == 0) and (self.count != 0)):   #every 20 samples (ie ~ 0.10 s) is when plot updates
    self.lineHandle[0].set_ydata(self.ProcessedSig)
    self.lineHandle[0].set_xdata(self.SecondTimes)
    #plt.xlim(0, 5)
    plt.xlim(self.SecondTimes[0], self.SecondTimes[-1])
    plt.ylim(0, 10)
    plt.pause(0.01)

      if(self.count >= 399): 
        self.ProcessedSig.pop(0)    
        self.SecondTimes.pop(0)

    plt.pause(0.01)
    print('closing graphing utility')
    self.inlet.close_stream()
matplotlib_standalone.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)
  load(queue=None)
  start()

  try:
    signal.pause()
  except AttributeError:
    while True:
      time.sleep(1)

  stop()
graph_realtime.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
  signal.signal(signal.SIGINT, sigint_handler)
  signal.signal(signal.SIGTERM, sigterm_handler)
  load(queue=None)
  start()

  try:
    signal.pause()
  except AttributeError:
    while True:
      time.sleep(1)

  stop()
signal_threads.py 文件源码 项目:pymotw3 作者: reingart 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def wait_for_signal():
    print('Waiting for signal in',
          threading.currentThread().name)
    signal.pause()
    print('Done waiting')

# Start a thread that will not receive the signal
argosd.py 文件源码 项目:argosd 作者: danielkoster 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        """Starts all runners and processes."""
        logging.info('ArgosD starting')

        self._create_database()

        logging.info('Starting taskscheduler')
        self.taskscheduler.run()

        logging.info('Starting taskrunner')
        self.taskrunner.run()

        logging.info('Starting API')
        self.api.run()

        if settings.TELEGRAM_BOT_TOKEN:
            logging.info('Starting telegrambot')
            self.bot.run()

        # Stop everything when a SIGTERM is received
        signal.signal(signal.SIGTERM, self._handle_signal)

        logging.info('ArgosD running')

        # Wait for a signal. This causes our main thread to remain alive,
        # which is needed to properly process any signals.
        signal.pause()
runbuilds.py 文件源码 项目:isar 作者: ilbers 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle_noargs(self, **options):
        self.runbuild()

        signal.signal(signal.SIGUSR1, lambda sig, frame: None)

        while True:
            signal.pause()
            self.runbuild()
radio.py 文件源码 项目:orangepi-radio 作者: thk4711 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def signal_term_handler(signal, frame):
        print('Exiting ...')
        sys.exit(0)

#-----------------------------------------------------------------#
#              main program                                       #
#-----------------------------------------------------------------#
#signal.pause()
test_threadsignals.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_signal.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
rockettm_server.py 文件源码 项目:RocketTM 作者: kianxineki 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def main():
    # start basicevents
    default_handler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    run(finish_tasks.is_set)
    list_process = []
    for queue in settings.queues:
        for x in range(queue['concurrency']):
            p = Worker(event_kill, ip=settings.RABBITMQ_IP, **queue)
            logging.info("start process worker: %s queue: %s" % (p,
                                                                 queue))
            list_process.append(p)
            p.start()

    signal.signal(signal.SIGINT, default_handler)

    try:
        signal.pause()
    except:
        print("init stop")
        event_kill.set()

    for x in list_process:
        x.join()
    print("finish tasks")
    finish_tasks.set()
test_signal.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
test_signal.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
test_threadsignals.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
session_runner.py 文件源码 项目:karton 作者: karton 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    pid = os.getpid()
    print('Started with PID %d' % pid)

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, terminate)

    with open('/tmp/karton-session-runner.pid', 'w') as pid_file:
        pid_file.write(str(pid))

    while True:
        signal.pause()
        print('Got a signal, continuing.')
main.py 文件源码 项目:CodeLabs 作者: TheIoTLearningInitiative 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():

    try:

        mq2 = threading.Thread(name='fMq2', target=fMq2)
        mq2.setDaemon(True)
        mq5 = threading.Thread(name='fMq5', target=fMq5)
        mq5.setDaemon(True)
        mq2.start()
        mq5.start()
        signal.pause()

    except(KeyboardInterrupt, SystemExit):

        print 'Exiting program'
test_signal.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.


问题


面经


文章

微信
公众号

扫码关注公众号