python类active_count()的实例源码

zmirror.py 文件源码 项目:zmirror 作者: aploium 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def zmirror_status():
    """Return a plain-text diagnostics page, restricted to local requests.

    Requests whose remote address is set and is not 127.0.0.1 get a 403
    page. Otherwise the page lists the ``cache_info()`` of several cached
    helper functions, the current number of live threads, and the content
    of ``domain_alias_to_target_set``, wrapped in a ``<pre>`` element.
    """
    if request.remote_addr and request.remote_addr != '127.0.0.1':
        return generate_simple_resp_page(b'Only 127.0.0.1 are allowed', 403)
    output = ""
    output += strx('extract_real_url_from_embedded_url', extract_real_url_from_embedded_url.cache_info())
    output += strx('\nis_content_type_streamed', is_mime_streamed.cache_info())
    output += strx('\nembed_real_url_to_embedded_url', embed_real_url_to_embedded_url.cache_info())
    output += strx('\ncheck_global_ua_pass', check_global_ua_pass.cache_info())
    output += strx('\nextract_mime_from_content_type', extract_mime_from_content_type.cache_info())
    output += strx('\nis_content_type_using_cdn', is_content_type_using_cdn.cache_info())
    # Report the cache of is_ua_in_whitelist itself; this row previously
    # repeated the statistics of is_content_type_using_cdn.
    output += strx('\nis_ua_in_whitelist', is_ua_in_whitelist.cache_info())
    output += strx('\nis_mime_represents_text', is_mime_represents_text.cache_info())
    output += strx('\nis_domain_match_glob_whitelist', is_domain_match_glob_whitelist.cache_info())
    output += strx('\nverify_ip_hash_cookie', verify_ip_hash_cookie.cache_info())
    output += strx('\nis_denied_because_of_spider', is_denied_because_of_spider.cache_info())
    output += strx('\nis_ip_not_in_allow_range', is_ip_not_in_allow_range.cache_info())
    output += strx('\n\ncurrent_threads_number', threading.active_count())
    # output += strx('\nclient_requests_text_rewrite', client_requests_text_rewrite.cache_info())
    # output += strx('\nextract_url_path_and_query', extract_url_path_and_query.cache_info())

    output += strx('\n----------------\n')
    output += strx('\ndomain_alias_to_target_set', domain_alias_to_target_set)

    return "<pre>" + output + "</pre>\n"
main.py 文件源码 项目:Qkou_kit 作者: pddg 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        """Drain ``self.queue`` and tweet each item with a growing delay.

        First polls once per second until fewer than three threads are
        alive, then takes items from the queue without blocking until the
        queue raises (i.e. is empty). Before each tweet it sleeps
        ``1.5 ** k`` seconds, where ``k`` starts at 2 and is capped at 12.
        """
        log.debug('[ Start TweetThread ]')
        exponent = 1
        base = float(1.5)
        # NOTE(review): the original comment was garbled; it names
        # GetInfoThread, GetCancelThread and GetNewsThread, so this loop
        # presumably waits for those threads to finish -- confirm.
        while True:
            if active_count() < 3:
                break
            time.sleep(1)

        while True:
            try:
                item = self.queue.get(block=False, timeout=None)
            except Exception:
                # Nothing could be fetched from the queue: stop the thread.
                log.debug('[ End TweetThread ]\n')
                break
            # The exponent grows by one per item until it reaches 12.
            exponent = min(exponent + 1, 12)
            delay = pow(base, exponent)
            time.sleep(delay)
            lib.tweeter.tweet(item)
pig_chase_baseline.py 文件源码 项目:malmo-challenge 作者: Kaixhin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
pig_chase_dqn_top_down.py 文件源码 项目:malmo-challenge 作者: Kaixhin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
pig_chase_dqn.py 文件源码 项目:malmo-challenge 作者: Kaixhin 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
pig_chase_baseline.py 文件源码 项目:malmo-challenge 作者: Microsoft 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
pig_chase_dqn_top_down.py 文件源码 项目:malmo-challenge 作者: Microsoft 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
pig_chase_dqn.py 文件源码 项目:malmo-challenge 作者: Microsoft 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
test_multi_thread_handler.py 文件源码 项目:wqueue 作者: waltsu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_executes_events(self):
        # Events queued before start() must each be delivered to the
        # function registered for their queue name once the handler runs.
        queue = Queue()

        first_queue_mock = Mock()
        queue_name = "queue"

        multi_thread_handler = MultiThreadHandler(queue)
        multi_thread_handler.add_function(queue_name, first_queue_mock)

        event_count = 10
        # Enqueue exactly as many events as the call-count assertion
        # below expects, so the two numbers cannot drift apart.
        for _ in range(event_count):
            queue.put_nowait(Event(queue_name, "some data"))

        try:
            multi_thread_handler.start()
            wait_until_success(lambda: self.assertEqual(first_queue_mock.call_count, event_count))

            thread_count = get_config()["handlers"]["multi_thread"]["thread_count"]
            # At least the configured worker threads plus the thread
            # running this test; assertGreaterEqual reports both values
            # on failure.
            self.assertGreaterEqual(threading.active_count(), thread_count + 1)
        finally:
            # Always stop the handler so its threads do not outlive the test.
            multi_thread_handler.stop()
test_multi.py 文件源码 项目:execnet 作者: pytest-dev 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_safe_terminate(execmodel):
    """Check that every kill callback fires when terminators are too slow.

    Ten (term, kill) pairs are passed to ``safe_terminate`` with a second
    argument of 1 (presumably a timeout in seconds -- confirm) while each
    ``term`` sleeps for 3 seconds. All ten ``kill`` callbacks must have
    run, and afterwards the thread count must be back at its starting
    value. Only meaningful for the "threading" backend.
    """
    if execmodel.backend != "threading":
        pytest.xfail("execution model %r does not support task count" %
                     execmodel.backend)
    import threading
    baseline = threading.active_count()
    kill_calls = []

    def term():
        py.std.time.sleep(3)

    def kill():
        kill_calls.append(1)

    pairs = [(term, kill)] * 10
    safe_terminate(execmodel, 1, pairs)
    assert len(kill_calls) == 10
    # Let finished helper threads wind down before counting again.
    sleep(0.1)
    py.std.gc.collect()
    assert execmodel.active_count() == baseline
test_multi.py 文件源码 项目:execnet 作者: pytest-dev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_safe_terminate2(execmodel):
    """Check that no kill callback fires when terminators return at once.

    Ten (term, kill) pairs are passed to ``safe_terminate`` with a second
    argument of 3 (presumably a timeout in seconds -- confirm) and each
    ``term`` returns immediately. No ``kill`` callback may have run, and
    afterwards the thread count must be back at its starting value.
    Only meaningful for the "threading" backend.
    """
    if execmodel.backend != "threading":
        pytest.xfail("execution model %r does not support task count" %
                     execmodel.backend)
    import threading
    baseline = threading.active_count()
    kill_calls = []

    def term():
        return

    def kill():
        kill_calls.append(1)

    pairs = [(term, kill)] * 10
    safe_terminate(execmodel, 3, pairs)
    assert len(kill_calls) == 0
    # Let finished helper threads wind down before counting again.
    sleep(0.1)
    py.std.gc.collect()
    assert threading.active_count() == baseline
halremote.py 文件源码 项目:piphat 作者: bschousek 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def main():
    """Run a BasicClass instance until Ctrl-C, then shut it down cleanly.

    After ``stop()`` is requested, the function polls until the calling
    thread is the only one left alive and exits with status 0.
    """
    remote = BasicClass()

    print('starting')
    remote.start()

    # Idle in short sleeps until the user interrupts with Ctrl-C.
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        pass

    print('stopping threads')
    remote.stop()

    # Wait for all other threads to terminate.
    while True:
        if threading.active_count() <= 1:
            break
        time.sleep(0.1)

    print('threads stopped')
    sys.exit(0)
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a low-level thread that registers itself
        # through threading.current_thread(), asserts that two threads are
        # counted, then forks and asserts that the child counts only one.
        code = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The script is passed with "-c" to assert_python_ok (a test helper
        # defined elsewhere, expected to run it in a child interpreter);
        # it must produce no output on stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, '')
        self.assertEqual(err, '')
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a low-level thread that registers itself
        # through threading.current_thread(), asserts that two threads are
        # counted, then forks and asserts that the child counts only one.
        code = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The script is passed with "-c" to assert_python_ok (a test helper
        # defined elsewhere, expected to run it in a child interpreter);
        # it must produce no output on stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, '')
        self.assertEqual(err, '')
test_base.py 文件源码 项目:rteeg 作者: kaczmarj 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_BaseStream_connect():
    """Check that BaseStream.connect starts exactly one named thread.

    Also checks that a second ``connect`` call raises RuntimeError. The
    polling thread is released in a ``finally`` clause so that a failing
    assertion does not leave it running.
    """
    event = threading.Event()

    def dummy_func():
        # Keep the thread alive until the test signals it to stop.
        while not event.is_set():
            time.sleep(1.)

    base = BaseStream()
    n_threads_0 = threading.active_count()
    try:
        base.connect(dummy_func, "TEST")
        n_threads_1 = threading.active_count()
        # Check that a thread was started.
        assert n_threads_1 - n_threads_0 == 1, "Thread not started."

        # Check that the thread was created and named properly; a missing
        # name is reported as an assertion failure, not an IndexError.
        names = [t.name for t in threading.enumerate() if t.name == "TEST"]
        assert names, "Thread not named properly."

        # Check that connect method only allows one connection.
        with pytest.raises(RuntimeError):
            base.connect(dummy_func, "SECOND_TEST")
    finally:
        # Clean up: let dummy_func return even if an assertion failed.
        event.set()
firewall-controller.py 文件源码 项目:p4c_firewall 作者: open-nfpsw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def natAndSend(self,packet,port):
        """Rewrite the packet's source address/port and send it out.

        The IP source is set to ``self.router_ext_ip`` and the TCP source
        port to ``port``; checksums are dropped and the packet is rebuilt
        from its string form so they get recomputed. The result, prefixed
        with six zero bytes, is sent on ``self.controller_port`` and
        ``port`` is printed.
        """
        # NAT for first packet - This removes the need to dynamically add the 3rd rule (Controller Rule)
        # A static rule in P4 now sends packets received back form the controller out the external port.
        packet[IP].src = self.router_ext_ip
        packet[TCP].sport = port

        # Deleting the checksum fields and re-parsing the serialized
        # packet makes Scapy recalculate them.
        del packet[IP].chksum
        del packet[TCP].chksum
        packet = packet.__class__(str(packet))

        # packet already processed so don't process again - resubmit so reason won't be valid anymore
        new_p_str = '\x00' * 6 + str(packet)

        #print "threading.active_count " + str(threading.active_count())
        s = conf.L2socket(iface=self.controller_port)
        try:
            # sendp(new_p_str, iface=self.controller_port, verbose=0)
            s.send(new_p_str)
        finally:
            # A socket is opened per call; close it explicitly so file
            # descriptors are not left for the garbage collector.
            s.close()
        print(port)
test_threading.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a low-level thread that registers itself
        # through threading.current_thread(), asserts that two threads are
        # counted, then forks and asserts that the child counts only one.
        code = """if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The script is passed with "-c" to assert_python_ok (a test helper
        # defined elsewhere, expected to run it in a child interpreter);
        # it must produce no output on stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')
weather_gui.py 文件源码 项目:Weather-App 作者: Tomasz-Kluczkowski 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def imperial_pushed(self):
        """Switch the units to imperial and refresh the units buttons.

        Nothing happens unless the current units are "metric". After the
        switch, the display is redrawn in imperial units only when data
        is present, the error status is 0 and fewer than two threads are
        alive.

        Returns:
            None
        """
        units_var = self.v_link["var_units"]
        if units_var.get() != "metric":
            return

        units_var.set("imperial")
        self.controller.update_buttons()

        # All three conditions are evaluated up front, as before.
        has_data = self.controller.data_present
        no_error = self.v_link["error_status"] == 0
        no_worker = threading.active_count() < 2
        if not (has_data and no_error and no_worker):
            return

        # Remember the scroll position before the display is rebuilt.
        self.v_link["scrollbar_offset"] = self.yscrollbar.get()
        self.controller.show_display("imperial")
weather_gui.py 文件源码 项目:Weather-App 作者: Tomasz-Kluczkowski 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def begin_get_report(self):
        """Kick off fetching of the weather report in a worker thread.

        The request goes to the controller's ``get_report``, which runs in
        a newly started thread. Nothing is done when no location has been
        entered or when more than one thread is already alive.

        Returns:
            None
        """
        # No location entered: nothing to look up.
        if self.v_link["var_loc"].get() == "":
            return
        # An active sub thread is still running: do not start another.
        if threading.active_count() > 1:
            return

        # Clear any error status message.
        self.v_link["error_message"] = ""
        self.v_link["var_status"].set("Gathering data, please wait...")

        # Request a report using a Mediating Controller in a new thread.
        worker = threading.Thread(target=self.controller.get_report)
        worker.start()
jboss.py 文件源码 项目:python_pen 作者: RASSec 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check(i,total):
    """Probe one base URL for the JBoss JMXInvokerServlet endpoint.

    Requests ``i + '/invoker/JMXInvokerServlet'`` with a 5 second timeout.
    When the response status is 200 and the body contains ``jboss``, the
    URL is printed, the global counter ``eu`` is incremented and the URL
    is appended to ``good_jboss.txt``. Any request failure is reported as
    a timeout. ``total`` is accepted for the caller's convenience and is
    not used here.
    """
    global eu
    try:
        r = requests.get(i + '/invoker/JMXInvokerServlet', timeout=5)
        status = r.status_code
        c = r.content.count(b'jboss')
    except Exception:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        print('%s Timeout' % i)
        status = 0
        c = 0
    if status == 200 and c != 0:
        print('%s Exists!!!!!' % i)
        eu += 1
        # The context manager closes the file even if the write fails.
        with open("good_jboss.txt", 'a') as f:
            f.write(i + '\n')
jboss.py 文件源码 项目:python_pen 作者: RASSec 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def main():
    """Check every URL listed in ``8080.txt`` using daemon threads.

    One ``check`` thread is started per line, with fewer than 200 threads
    alive at any time. After the last thread has been started the
    function waits until all of them have finished and prints a
    completion timestamp. The global counter ``eu`` is reset to 0 first.
    """
    global eu
    eu = 0
    # Read the list once and close the file right away.
    with open("8080.txt") as url_file:
        urls = [line.strip('\n') for line in url_file]
    total = len(urls)
    print('Total URLs:%d' % total)
    for i in urls:
        t = threading.Thread(target=check, args=(i, total))
        t.setDaemon(True)
        total -= 1
        # Throttle: wait (without spinning) while at the thread cap.
        while threading.active_count() >= 200:
            time.sleep(0.1)
        os.system("title Spider,Current threads: %d,URLs left: %d,URLs exists:%d" % (threading.active_count(), total, eu))
        # Every URL gets its thread started, including the last one,
        # which the previous loop structure never launched.
        t.start()
    # The workers are daemon threads, so wait for them before returning.
    while threading.active_count() > 1:
        time.sleep(1)
    print('All Done at %s' % time.strftime("%Y-%m-%d[%H.%M.%S]"))
webadmin.py 文件源码 项目:python_pen 作者: RASSec 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def main():
    """Check every URL listed in ``10000.txt`` using daemon threads.

    One ``check`` thread is started per line, with fewer than 200 threads
    alive at any time. After the last thread has been started the
    function waits until all of them have finished and prints a
    completion timestamp. The global counter ``eu`` is reset to 0 first.
    """
    global eu
    eu = 0
    # Read the list once and close the file right away.
    with open("10000.txt") as url_file:
        urls = [line.strip('\n') for line in url_file]
    total = len(urls)
    print('Total URLs:%d' % total)
    for i in urls:
        t = threading.Thread(target=check, args=(i, total))
        t.setDaemon(True)
        total -= 1
        # Throttle: wait (without spinning) while at the thread cap.
        while threading.active_count() >= 200:
            time.sleep(0.1)
        os.system("title Spider,Current threads: %d,URLs left: %d,URLs exists:%d" % (threading.active_count(), total, eu))
        # Every URL gets its thread started, including the last one,
        # which the previous loop structure never launched.
        t.start()
    # The workers are daemon threads, so wait for them before returning.
    while threading.active_count() > 1:
        time.sleep(1)
    print('All Done at %s' % time.strftime("%Y-%m-%d[%H.%M.%S]"))
wdcp.py 文件源码 项目:python_pen 作者: RASSec 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def main():
    """Check every URL listed in ``8080.txt`` using daemon threads.

    One ``check`` thread is started per line, with fewer than 200 threads
    alive at any time. After the last thread has been started the
    function waits until all of them have finished and prints a
    completion timestamp. The global counter ``eu`` is reset to 0 first.
    """
    global eu
    eu = 0
    # Read the list once and close the file right away.
    with open("8080.txt") as url_file:
        urls = [line.strip('\n') for line in url_file]
    total = len(urls)
    print('Total URLs:%d' % total)
    for i in urls:
        t = threading.Thread(target=check, args=(i, total))
        t.setDaemon(True)
        total -= 1
        # Throttle: wait (without spinning) while at the thread cap.
        while threading.active_count() >= 200:
            time.sleep(0.1)
        os.system("title Spider,Current threads: %d,URLs left: %d,URLs exists:%d" % (threading.active_count(), total, eu))
        # Every URL gets its thread started, including the last one,
        # which the previous loop structure never launched.
        t.start()
    # The workers are daemon threads, so wait for them before returning.
    while threading.active_count() > 1:
        time.sleep(1)
    print('All Done at %s' % time.strftime("%Y-%m-%d[%H.%M.%S]"))
test_threading.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a low-level thread that registers itself
        # through threading.current_thread(), asserts that two threads are
        # counted, then forks and asserts that the child counts only one.
        code = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The script is passed with "-c" to assert_python_ok (a test helper
        # defined elsewhere, expected to run it in a child interpreter);
        # it must produce no output on stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, '')
        self.assertEqual(err, '')
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a low-level thread that registers itself
        # through threading.current_thread(), asserts that two threads are
        # counted, then forks and asserts that the child counts only one.
        code = """if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The script is passed with "-c" to assert_python_ok (a test helper
        # defined elsewhere, expected to run it in a child interpreter);
        # it must produce no output on stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')
test_threading.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a low-level thread that registers itself
        # through threading.current_thread(), asserts that two threads are
        # counted, then forks and asserts that the child counts only one.
        code = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The script is passed with "-c" to assert_python_ok (a test helper
        # defined elsewhere, expected to run it in a child interpreter);
        # it must produce no output on stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, '')
        self.assertEqual(err, '')
pig_chase_experiment.py 文件源码 项目:malmo-challenge 作者: rhaps0dy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
pig_chase_dqn_top_down.py 文件源码 项目:malmo-challenge 作者: rhaps0dy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
pig_chase_dqn.py 文件源码 项目:malmo-challenge 作者: rhaps0dy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run_experiment(agents_def):
    """Start one daemon thread per agent definition and wait for them.

    Each entry of ``agents_def`` is handed to ``agent_factory`` as keyword
    arguments. Exactly two entries are required (checked with ``assert``).
    The call then blocks until at most two threads are alive, or until
    the user presses Ctrl-C.
    """
    n_agents = len(agents_def)
    assert n_agents == 2, \
        'Not enough agents (required: 2, got: %d)' % n_agents

    workers = []
    for spec in agents_def:
        worker = Thread(target=agent_factory, kwargs=spec)
        worker.daemon = True
        worker.start()

        # Give the server time to start (the pause follows the role-0 agent).
        if spec['role'] == 0:
            sleep(1)

        workers.append(worker)

    try:
        # Wait until only the challenge agent is left.
        while True:
            if active_count() <= 2:
                break
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a low-level thread that registers itself
        # through threading.current_thread(), asserts that two threads are
        # counted, then forks and asserts that the child counts only one.
        code = """if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The script is passed with "-c" to assert_python_ok (a test helper
        # defined elsewhere, expected to run it in a child interpreter);
        # it must produce no output on stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')


问题


面经


文章

微信
公众号

扫码关注公众号