python类Event()的实例源码

__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def value(self, timeout=None):
        """Block the calling thread until the task finishes; return its value.

        NB: This method should _not_ be called from a task! It waits on a
        'threading.Event', so it is meant for a regular thread (such as the
        main thread of the user program) waiting for task(s) it created.

        'timeout' is passed to the event's 'wait'. The last value 'yield'ed /
        value of StopIteration of the task is returned once the task has
        finished; None is returned if the wait does not report success.
        """
        lock = self._scheduler._lock
        lock.acquire()
        if self._complete is None:
            # Nobody is waiting yet: install the event that is waited on below.
            self._complete = threading.Event()
        # A '_complete' of 0 means there is nothing to wait for.
        nothing_to_wait_for = self._complete == 0
        lock.release()
        # The (possibly blocking) wait happens after the lock is released.
        if nothing_to_wait_for or self._complete.wait(timeout=timeout) is True:
            return self._value
        return None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def finish(self, timeout=None):
        """Wait, from within a task, for this task to finish; get its value.

        Must be used in a task with 'yield' as
        'value = yield other_task.finish()'.

        'timeout' is passed to the 'wait' of the 'Event' stored in
        '_complete'. The generator's result is the last value 'yield'ed /
        value of StopIteration of the task once it has finished, or None if
        the wait does not report success.

        Raises RuntimeError if '_complete' is neither None, 0 nor an 'Event'.

        NOTE: the result is delivered with 'return value'. The caller still
        sees StopIteration(value), but unlike an explicit
        'raise StopIteration(value)' it is not turned into RuntimeError by
        Python 3.7+ (PEP 479). This form requires Python 3.3 or later.
        """
        if self._complete == 0:
            # Nothing to wait for: hand back the stored value directly.
            return self._value
        if self._complete is None:
            # First waiter: create the event that is waited on below.
            self._complete = Event()
        elif not isinstance(self._complete, Event):
            raise RuntimeError('invalid wait on %s/%s: %s' %
                               (self._name, self._id, type(self._complete)))
        if (yield self._complete.wait(timeout=timeout)) is True:
            return self._value
        return None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def value(self, timeout=None):
        """Wait (blocking the calling thread) for the task and return its value.

        NB: This method should _not_ be called from a task! It is meant for
        the main thread in the user program to wait for (main) task(s) it
        creates, because it blocks on a 'threading.Event'.

        Once the task stops (finishes) executing, its last value is returned.
        If waiting with 'timeout' does not report success, None is returned.
        """
        guard = self._scheduler._lock
        guard.acquire()
        if self._complete == 0:
            # Nothing to wait for: the stored value is returned directly.
            guard.release()
            return self._value
        if self._complete is None:
            # First waiter creates the event that is waited on below.
            self._complete = threading.Event()
        guard.release()
        # Block outside the lock until the event is set or the wait gives up.
        return self._value if self._complete.wait(timeout=timeout) is True else None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def finish(self, timeout=None):
        """Get last value 'yield'ed / value of StopIteration of task. Must be
        used in a task with 'yield' as 'value = yield other_task.finish()'

        'timeout' is passed to the 'wait' of the 'Event' stored in
        '_complete'. Once task stops (finishes) executing, the last value is
        the generator's result; if the wait does not report success the
        result is None.

        Raises RuntimeError if '_complete' is neither None, 0 nor an 'Event'.

        NOTE: the result is delivered with 'return value' rather than
        'raise StopIteration(value)'. The caller observes the same
        StopIteration(value), but an explicit raise inside a generator is
        converted to RuntimeError on Python 3.7+ (PEP 479). 'return' with a
        value in a generator requires Python 3.3 or later.
        """
        value = None
        if self._complete is None:
            # First waiter: create the event and wait on it.
            self._complete = Event()
            if (yield self._complete.wait(timeout=timeout)) is True:
                value = self._value
        elif self._complete == 0:
            # Nothing to wait for: use the stored value directly.
            value = self._value
        elif isinstance(self._complete, Event):
            # An event already exists: wait on that same event.
            if (yield self._complete.wait(timeout=timeout)) is True:
                value = self._value
        else:
            raise RuntimeError('invalid wait on %s/%s: %s' %
                               (self._name, self._id, type(self._complete)))
        return value
WorkModule.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, parent):
        """Initialise the work module's state and start its processing thread.

        parent -- stored unchanged on 'self.parent'.

        All attributes are assigned before the thread that runs
        'self.Process' is created and started, which is the last step.
        """
        self.parent = parent

        # Lock and event used for handling issues with Reset
        self.reset_lock = threading.Lock()
        self.reset_signal = threading.Event()

        # Thread management
        self.is_running = True
        self.process_thread_released = False
        self.timeout_check_period = 0.1  # seconds

        # Input data processing
        self.empty_queue = False
        self.data_queue = Queue.Queue()

        # Create and start the main thread
        worker = threading.Thread(target=self.Process)
        self.process_thread = worker
        worker.start()
test_08_Messaging.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_EventDevicePortConnection(self):
        """Check that 'MessageReceiverPy_1' ends up with six test messages.

        Launches the BasicTestDevice node, installs, creates and starts the
        MessageTestPy waveform, sleeps two seconds, then queries the receiver
        component. The first queried property must hold six values, each
        containing 'test_message'. The application is always released.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self.assertNotEqual(self._devBooter, None)
        self._domMgr.installApplication("/waveforms/MessageTestPy/MessageTestPy.sad.xml")
        appFact = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(appFact, None)
        app = appFact.create(appFact._get_name(), [], [])
        self.assertNotEqual(app, None)
        try:
            app.start()
            time.sleep(2)
            # Stays None when no component matches, so a missing receiver is
            # reported as a test failure instead of an unbound-name error.
            stuff = None
            for component in app._get_registeredComponents():
                identifier = component.componentObject._get_identifier()
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print(identifier)
                if 'MessageReceiverPy_1' in identifier:
                    stuff = component.componentObject.query([])
            self.assertTrue(stuff is not None, "component 'MessageReceiverPy_1' was not registered")
            recval = any.from_any(stuff[0].value)
            self.assertEqual(6, len(recval))
            for val in recval:
                self.assertTrue('test_message' in val)
        finally:
            # Release the application even when a check above fails.
            app.releaseObject()
test_08_MessagingCpp.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_EventDevicePortConnectionFromPython(self):
        """Check that the receiver in MessageTestPyCpp gets six test messages.

        Launches the BasicTestDevice node, installs, creates and starts the
        MessageTestPyCpp waveform, sleeps two seconds, then queries the
        component whose identifier contains
        'DCE:b1fe6cc1-2562-4878-9a69-f191f89a6ef8'. The first queried property
        must hold six values, each containing 'test_message'. The application
        is always released.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self.assertNotEqual(self._devBooter, None)
        self._domMgr.installApplication("/waveforms/MessageTestPyCpp/MessageTestPyCpp.sad.xml")
        appFact = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(appFact, None)
        app = appFact.create(appFact._get_name(), [], [])
        self.assertNotEqual(app, None)
        try:
            app.start() # kick off events
            time.sleep(2)
            # Stays None when no component matches, so a missing receiver is
            # reported as a test failure instead of an unbound-name error.
            stuff = None
            for component in app._get_registeredComponents():
                identifier = component.componentObject._get_identifier()
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print(identifier)
                if 'DCE:b1fe6cc1-2562-4878-9a69-f191f89a6ef8' in identifier:
                    stuff = component.componentObject.query([])
            self.assertTrue(stuff is not None, "receiver component was not registered")
            recval = any.from_any(stuff[0].value)
            self.assertEqual(6, len(recval))
            for val in recval:
                self.assertTrue('test_message' in val)
        finally:
            # kill producer/consumer, even when a check above fails
            app.releaseObject()
test_08_MessagingCpp.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_EventDevicePortConnectionCppOnly(self):
        """Check that 'MessageReceiverCpp_1' ends up with six test messages.

        Launches the BasicTestDevice node, installs, creates and starts the
        MessageTestCpp waveform, sleeps two seconds, then queries the receiver
        component. The first queried property must hold six values, each
        containing 'test_message'. The application is always released.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self.assertNotEqual(self._devBooter, None)
        self._domMgr.installApplication("/waveforms/MessageTestCpp/MessageTestCpp.sad.xml")
        appFact = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(appFact, None)
        app = appFact.create(appFact._get_name(), [], [])
        self.assertNotEqual(app, None)
        try:
            app.start() # kick off events
            time.sleep(2)
            # Stays None when no component matches, so a missing receiver is
            # reported as a test failure instead of an unbound-name error.
            stuff = None
            for component in app._get_registeredComponents():
                identifier = component.componentObject._get_identifier()
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print(identifier)
                if 'MessageReceiverCpp_1' in identifier:
                    stuff = component.componentObject.query([])
            self.assertTrue(stuff is not None, "component 'MessageReceiverCpp_1' was not registered")
            recval = any.from_any(stuff[0].value)
            self.assertEqual(6, len(recval))
            for val in recval:
                self.assertTrue('test_message' in val)
        finally:
            # kill producer/consumer, even when a check above fails
            app.releaseObject()
test_08_EventChannelManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_ECM_CppComponent(self):
        """Create the ECM1 waveform and compare the counts it reports.

        Launches the BasicTestDevice node, installs and creates the ECM1
        application, then passes it to '_process_results'. The three values
        returned must all be non-None and the first must equal each of the
        other two.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        launched = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self._devBooter, self._devMgr = launched
        self.assertNotEqual(self._devBooter, None)

        self._domMgr.installApplication("/waveforms/ECM1/ECM1.sad.xml")
        factory = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(factory, None)
        self.app = factory.create(factory._get_name(), [], [])
        self.assertNotEqual(self.app, None)

        limit, sent, received = self._process_results(self.app)
        for count in (limit, sent, received):
            self.assertNotEquals(count, None)
        for other in (sent, received):
            self.assertEquals(limit, other)
test_08_EventChannelManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_ECM_PythonComponent(self):
        """Create the ECM2 waveform and compare the counts it reports.

        Launches the BasicTestDevice node, installs and creates the ECM2
        application, then passes it to '_process_results'. The three values
        returned must all be non-None and the first must equal each of the
        other two.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self.assertNotEqual(self._devBooter, None)
        self._domMgr.installApplication("/waveforms/ECM2/ECM2.sad.xml")
        appFact = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(appFact, None)
        self.app = appFact.create(appFact._get_name(), [], [])
        # Same guard the sibling ECM tests apply before using the application.
        self.assertNotEqual(self.app, None)
        mlimit, mxmit, mrecv = self._process_results( self.app )
        self.assertNotEqual(mlimit, None )
        self.assertNotEqual(mxmit, None )
        self.assertNotEqual(mrecv, None )
        self.assertEqual(mlimit, mxmit )
        self.assertEqual(mlimit, mrecv )
test_08_EventChannelManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_ECM_PythonComponent_Callbacks(self):
        """Create the ECM2 waveform and compare its counts with 'enablecb=True'.

        Launches the BasicTestDevice node, installs and creates the ECM2
        application, then calls '_process_results' with 'enablecb=True'. The
        three values returned must all be non-None and the first must equal
        each of the other two.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        node_xml = "/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml"
        self._devBooter, self._devMgr = self.launchDeviceManager(node_xml, self._domMgr)
        self.assertNotEqual(self._devBooter, None)

        self._domMgr.installApplication("/waveforms/ECM2/ECM2.sad.xml")
        factory = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(factory, None)
        self.app = factory.create(factory._get_name(), [], [])
        self.assertNotEqual(self.app, None)

        results = self._process_results(self.app, enablecb=True)
        limit, sent, received = results
        self.assertNotEquals(limit, None)
        self.assertNotEquals(sent, None)
        self.assertNotEquals(received, None)
        self.assertEquals(limit, sent)
        self.assertEquals(limit, received)
test_08_EventChannelManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_ECM_JavaComponent(self):
        """Create the ECM3 waveform and compare the counts it reports.

        Launches the BasicTestDevice node, installs and creates the ECM3
        application, then passes it to '_process_results'. The three values
        returned must all be non-None and the first must equal each of the
        other two.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        launched = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self._devBooter, self._devMgr = launched
        self.assertNotEqual(self._devBooter, None)

        self._domMgr.installApplication("/waveforms/ECM3/ECM3.sad.xml")
        factory = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(factory, None)
        self.app = factory.create(factory._get_name(), [], [])
        self.assertNotEqual(self.app, None)

        limit, sent, received = self._process_results(self.app)
        for count in (limit, sent, received):
            self.assertNotEquals(count, None)
        for other in (sent, received):
            self.assertEquals(limit, other)
test_08_EventChannelManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_ECM_JavaComponent_Callbacks(self):
        """Create the ECM3 waveform and compare its counts with 'enablecb=True'.

        Launches the BasicTestDevice node, installs and creates the ECM3
        application, then calls '_process_results' with 'enablecb=True'. The
        three values returned must all be non-None and the first must equal
        each of the other two.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        node_xml = "/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml"
        self._devBooter, self._devMgr = self.launchDeviceManager(node_xml, self._domMgr)
        self.assertNotEqual(self._devBooter, None)

        self._domMgr.installApplication("/waveforms/ECM3/ECM3.sad.xml")
        factory = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(factory, None)
        self.app = factory.create(factory._get_name(), [], [])
        self.assertNotEqual(self.app, None)

        results = self._process_results(self.app, enablecb=True)
        limit, sent, received = results
        self.assertNotEquals(limit, None)
        self.assertNotEquals(sent, None)
        self.assertNotEquals(received, None)
        self.assertEquals(limit, sent)
        self.assertEquals(limit, received)
test_08_MessagingJava.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_EventDevicePortConnectionFromPython(self):
        """Check that the receiver in MessageTestPyJava gets six test messages.

        Launches the BasicTestDevice node, installs, creates and starts the
        MessageTestPyJava waveform, sleeps two seconds, then queries the
        component whose identifier contains
        'DCE:b1fe6cc1-2562-4878-9a69-f191f89a6ef8'. The first queried property
        must hold six values, each containing 'test_message'. The application
        is always released.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self.assertNotEqual(self._devBooter, None)
        self._domMgr.installApplication("/waveforms/MessageTestPyJava/MessageTestPyJava.sad.xml")
        appFact = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(appFact, None)
        app = appFact.create(appFact._get_name(), [], [])
        self.assertNotEqual(app, None)
        try:
            app.start() # kick off events
            time.sleep(2)
            # Stays None when no component matches, so a missing receiver is
            # reported as a test failure instead of an unbound-name error.
            stuff = None
            for component in app._get_registeredComponents():
                identifier = component.componentObject._get_identifier()
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print(identifier)
                if 'DCE:b1fe6cc1-2562-4878-9a69-f191f89a6ef8' in identifier:
                    stuff = component.componentObject.query([])
            self.assertTrue(stuff is not None, "receiver component was not registered")
            recval = any.from_any(stuff[0].value)
            self.assertEqual(6, len(recval))
            for val in recval:
                self.assertTrue('test_message' in val)
        finally:
            # kill producer/consumer, even when a check above fails
            app.releaseObject()
test_08_MessagingJava.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_EventDevicePortConnectionJavaOnly(self):
        """Check that 'EventReceiveJava_1' ends up with six test messages.

        Launches the BasicTestDevice node, installs and creates the
        MessageTestJava waveform, fetches its registered components, starts
        it and sleeps two seconds. The receiver is then queried for its
        'received_messages' property, which must hold six values, each
        containing 'test_message'. The application is always released.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self.assertNotEqual(self._devBooter, None)
        self._domMgr.installApplication("/waveforms/MessageTestJava/MessageTestJava.sad.xml")
        appFact = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(appFact, None)
        app = appFact.create(appFact._get_name(), [], [])
        self.assertNotEqual(app, None)
        try:
            # The component list is taken before the application is started.
            components = app._get_registeredComponents()
            app.start() # kick off events
            time.sleep(2)
            # Stays None when no component matches, so a missing receiver is
            # reported as a test failure instead of an unbound-name error.
            stuff = None
            for component in components:
                identifier = component.componentObject._get_identifier()
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print(identifier)
                if 'EventReceiveJava_1' in identifier:
                    stuff = component.componentObject.query([CF.DataType("received_messages", any.to_any(None))])
            self.assertTrue(stuff is not None, "component 'EventReceiveJava_1' was not registered")
            recval = any.from_any(stuff[0].value)
            self.assertEqual(6, len(recval))
            for val in recval:
                self.assertTrue('test_message' in val)
        finally:
            # kill producer/consumer, even when a check above fails
            app.releaseObject()
arm.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _activate_command(self, cmd):
        """Record `cmd` as the active state and signal it via `cmd_event`.

        The previously active state is kept in `last_state`. A 'run' command
        sets the shared `threading.Event` instance and a 'stop' command clears
        it; any other command only updates the recorded states.
        """
        previous = self.active_state
        self.last_state = previous
        self.active_state = cmd
        log.info("[arm._activate_command] last_state='{0}' state='{1}'".format(
            self.last_state, cmd))

        state = self.active_state
        if state == 'stop':
            log.info("[arm._activate_command] STOP")
            self.cmd_event.clear()
        elif state == 'run':
            log.info("[arm._activate_command] START RUN")
            self.cmd_event.set()
periodic_executor.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, interval, min_interval, target, name=None):
        """Run a target function periodically on a background thread.

        If the target's return value is false, the executor stops.

        The constructor only records its arguments and initial state; no
        thread is created here.

        :Parameters:
          - `interval`: Seconds between calls to `target`.
          - `min_interval`: Minimum seconds between calls if `wake` is
            called very often.
          - `target`: A function.
          - `name`: A name to give the underlying thread.
        """
        self._name = name
        self._target = target
        self._interval = interval
        self._min_interval = min_interval

        # No thread yet, and the executor is not stopped.
        self._thread = None
        self._stopped = False

        # A plain boolean is used to know when to wake: threading.Event and
        # its internal condition variable are expensive in Python 2, see
        # PYTHON-983. The executor's design is constrained by several Python
        # issues, see "periodic_executor.rst" in this repository.
        self._event = False
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_single_connection(self):
        """
        Test a single connection with sequential requests.

        Each response callback sends the next query on request id 0; the
        tenth callback closes the connection and sets the event the test is
        blocked on.
        """
        conn = self.get_connection()
        query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
        finished = Event()

        def send_query(completed):
            # Issue one query whose callback carries the running count.
            conn.send_msg(
                QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
                request_id=0,
                cb=partial(cb, completed))

        def cb(count, *args, **kwargs):
            count += 1
            if count < 10:
                send_query(count)
                return
            conn.close()
            finished.set()

        send_query(0)
        finished.wait()
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_single_connection_pipelined_requests(self):
        """
        Test a single connection with pipelined requests.

        One hundred queries are sent with request ids 0..99 without waiting
        in between; each callback marks its slot, and the callback that finds
        every slot marked closes the connection and sets the event the test
        is blocked on.
        """
        total = 100
        conn = self.get_connection()
        query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
        responses = [False] * total
        all_done = Event()

        def cb(response_list, request_num, *args, **kwargs):
            response_list[request_num] = True
            if not all(response_list):
                return
            conn.close()
            all_done.set()

        for request_id in range(total):
            on_response = partial(cb, responses, request_id)
            message = QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE)
            conn.send_msg(message, request_id=request_id, cb=on_response)

        all_done.wait()
cluster.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None,
                 retry_policy=RetryPolicy(), row_factory=None, load_balancer=None, start_time=None, speculative_execution_plan=None):
        """Store the request state, build the query plan and start the timer.

        Arguments are stored on the instance, with these fallbacks when a
        falsy value is passed:

        - `row_factory` falls back to `session.row_factory`.
        - `load_balancer` falls back to
          `session.cluster._default_load_balancing_policy`.
        - `start_time` falls back to `time.time()`.
        - `speculative_execution_plan` falls back to the value
          `self._spec_execution_plan` already has (presumably a class-level
          default -- confirm against the class body).

        NOTE(review): the default `retry_policy=RetryPolicy()` is evaluated
        once, when the function is defined, so every call that omits
        `retry_policy` shares the same `RetryPolicy` instance.
        """
        self.session = session
        # TODO: normalize handling of retry policy and row factory
        self.row_factory = row_factory or session.row_factory
        self._load_balancer = load_balancer or session.cluster._default_load_balancing_policy
        self.message = message
        self.query = query
        self.timeout = timeout
        self._retry_policy = retry_policy
        self._metrics = metrics
        self.prepared_statement = prepared_statement
        self._callback_lock = Lock()
        self._start_time = start_time or time.time()
        # Called after the attributes above are assigned and before the ones
        # below; the helper's own requirements are not visible here, so keep
        # this ordering.
        self._make_query_plan()
        self._event = Event()
        self._errors = {}
        self._callbacks = []
        self._errbacks = []
        self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
        self.attempted_hosts = []
        # Last step: the timer is started once all state is in place.
        self._start_timer()
annotation.py 文件源码 项目:active_stream 作者: flinder 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, data, train_threshold=1):
        """Set up the annotator from the shared `data` mapping.

        data -- mapping read for 'database', 'events' -> 'train_model',
                'queues' -> 'annotation_response' / 'messages' and 'socket'.
        train_threshold -- stored as-is on `self.train_threshold`.

        The parent initialiser is called with name='Annotator' (presumably a
        `threading.Thread` base -- confirm against the class definition).
        """
        super(Annotator, self).__init__(name='Annotator')

        # Shared resources taken from `data`
        self.database = data['database']
        self.train = data['events']['train_model']
        queues = data['queues']
        self.annotation_response = queues['annotation_response']
        self.socket = data['socket']
        self.message_queue = queues['messages']

        # Control and configuration
        self.stoprequest = threading.Event()
        self.train_threshold = train_threshold
        self.first = True

        # Bookkeeping
        self.n_positive = self.n_negative = False
        self.n_trainer_triggered = 0
        self.annotated_text = {}
        self.clf_performance = dict.fromkeys(
                ('true_positive', 'true_negative',
                 'false_positive', 'false_negative'), 0)
monitor.py 文件源码 项目:active_stream 作者: flinder 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, data, streamer, classifier, annotator):
        """Set up the monitor from the shared `data` mapping and collaborators.

        data -- mapping read for 'database', 'socket' and 'queues' ->
                'most_important_features' / 'limit' / 'messages'.
        streamer, classifier, annotator -- stored as `self.streamer`,
                `self.clf` and `self.annotator`.

        The parent initialiser is called with name='Monitor' (presumably a
        `threading.Thread` base -- confirm against the class definition).
        """
        super(Monitor, self).__init__(name='Monitor')

        # Shared resources taken from `data`
        self.database = data['database']
        self.socket = data['socket']
        queues = data['queues']
        self.mif_queue = queues['most_important_features']
        self.limit_queue = queues['limit']
        self.message_queue = queues['messages']

        # Collaborating objects
        self.streamer = streamer
        self.clf = classifier
        self.annotator = annotator

        # Control and bookkeeping
        self.stoprequest = threading.Event()
        self.report_interval = 0.3
        self.mif = None
        self.last_count = 0
        self.missed = 0
        self.counts = []
Win32NativeWifiApiTests.py 文件源码 项目:win32wifi 作者: kedos 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testWlanRegisterNotification(self):
        """Register a WLAN notification callback and expect it within 5 seconds.

        First checks that at least one wireless interface is enumerated, then
        registers a callback that sets an event and fails the test if the
        event is still unset after waiting up to five seconds.
        """
        import threading

        handle = WlanOpenHandle()
        iface_list = WlanEnumInterfaces(handle).contents
        array_type = iface_list.InterfaceInfo._type_ * iface_list.NumberOfItems
        wlan_iface_info_list = array_type.from_address(
            addressof(iface_list.InterfaceInfo))
        self.assertGreaterEqual(
            len(wlan_iface_info_list), 1,
            "We expect at least one wireless interface.")

        notified = threading.Event()

        def callback(wnd, p):
            notified.set()

        # The result is bound to a local although it is not used afterwards --
        # presumably to keep the registration alive during the wait; confirm.
        registration = WlanRegisterNotification(handle, callback)
        notified.wait(5)

        if notified.is_set():
            return
        self.fail("Didn't receive any notification.")
imaplib2.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, parent, name=None, callback=None, cb_arg=None, cb_self=False):
        """Create a request and allocate the next tag from `parent`.

        parent   -- object providing `tagpre` and `tagnum`; its `tagnum` is
                    incremented by one.
        name     -- stored as-is.
        callback -- function called to process the result.
        cb_arg   -- optional argument passed to `callback`.
        cb_self  -- when true, the callback argument becomes the tuple
                    `(self, cb_arg)` so the callback gets a self reference.
        """
        self.parent = parent
        self.name = name
        self.callback = callback
        self.callback_arg = (self, cb_arg) if cb_self else cb_arg

        # Tag is the parent's prefix followed by its current counter value.
        self.tag = '%s%s' % (parent.tagpre, parent.tagnum)
        parent.tagnum += 1

        self.ready = threading.Event()
        self.response = self.aborted = self.data = None
test_consumer.py 文件源码 项目:seqlog 作者: tintoy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_batchsize_2_pre_fill(self):
        """Two records queued before start arrive as one batch of two.

        The queue is filled before the consumer (batch_size=2) is started.
        The test fails if the handler has not signalled a batch within two
        seconds; the consumer is stopped in every case.
        """
        record_queue = Queue()
        record_queue.put("Item1")
        record_queue.put("Item2")

        batch_received = Event()

        def handler(record_batch):
            assert len(record_batch) == 2, \
                "Incorrect batch size (expected 2, but found {}.".format(len(record_batch))

            batch_received.set()

        consumer = QueueConsumer("Test Consumer", record_queue, handler, batch_size=2)
        consumer.start()
        try:
            # Event.wait() takes seconds and returns False on timeout. The
            # result must be asserted, otherwise a batch that never arrives
            # (or a handler that raised before set()) goes unnoticed.
            assert batch_received.wait(timeout=2), \
                "No batch was received within 2 seconds."
        finally:
            # Stop the consumer even when the check above fails.
            consumer.stop()
test_consumer.py 文件源码 项目:seqlog 作者: tintoy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_batchsize_2_post_fill(self):
        """Two records queued after start arrive as one batch of two.

        The consumer (batch_size=2) is started on an empty queue and the
        records are added afterwards. The test fails if the handler has not
        signalled a batch within two seconds; the consumer is stopped in
        every case.
        """
        record_queue = Queue()

        batch_received = Event()

        def handler(record_batch):
            assert len(record_batch) == 2, \
                "Incorrect batch size (expected 2, but found {}.".format(len(record_batch))

            batch_received.set()

        consumer = QueueConsumer("Test Consumer", record_queue, handler, batch_size=2)
        consumer.start()
        try:
            record_queue.put("Item1")
            record_queue.put("Item2")

            # Event.wait() takes seconds and returns False on timeout. The
            # result must be asserted, otherwise a batch that never arrives
            # (or a handler that raised before set()) goes unnoticed.
            assert batch_received.wait(timeout=2), \
                "No batch was received within 2 seconds."
        finally:
            # Stop the consumer even when the check above fails.
            consumer.stop()

    #
    # With flush timeout
    #
test_consumer.py 文件源码 项目:seqlog 作者: tintoy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_batchsize_3_post_fill_flush_timeout(self):
        """A partial batch is delivered when the auto-flush timeout elapses.

        The consumer uses batch_size=3 and auto_flush_timeout=0.2. Two records
        are queued, then the test sleeps 0.3 seconds before queueing a third.
        The handler asserts a batch of two -- presumably the first two
        records, flushed by the timeout before the third arrives. The test
        fails if no batch is signalled within two seconds; the consumer is
        stopped in every case.
        """
        record_queue = Queue()

        batch_received = Event()

        def handler(record_batch):
            assert len(record_batch) == 2, \
                "Incorrect batch size (expected 2, but found {}.".format(len(record_batch))

            batch_received.set()

        consumer = QueueConsumer("Test Consumer", record_queue, handler, batch_size=3, auto_flush_timeout=0.2)
        consumer.start()
        try:
            record_queue.put("Item1")
            record_queue.put("Item2")
            # A float literal: '300 / 1000' is 0 under Python 2 integer
            # division, which would skip the pause entirely.
            sleep(0.3)
            record_queue.put("Item3")

            # Event.wait() takes seconds and returns False on timeout. The
            # result must be asserted, otherwise a batch that never arrives
            # (or a handler that raised before set()) goes unnoticed.
            assert batch_received.wait(timeout=2), \
                "No batch was received within 2 seconds."
        finally:
            # Stop the consumer even when the check above fails.
            consumer.stop()
ioloop_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_add_callback_while_closing(self):
        # Issue #635: add_callback() should raise a clean exception
        # if called while another thread is closing the IOLoop.
        closing = threading.Event()
        other_ioloop = IOLoop()

        def run_then_close():
            # Runs on the helper thread: start the loop with a stop already
            # scheduled, then announce and perform the close.
            other_ioloop.add_callback(other_ioloop.stop)
            other_ioloop.start()
            closing.set()
            other_ioloop.close(all_fds=True)

        closer = threading.Thread(target=run_then_close)
        closer.start()
        closing.wait()
        # Keep adding callbacks from this thread; a RuntimeError, if raised,
        # must carry the expected message and ends the loop.
        for _ in range(1000):
            try:
                other_ioloop.add_callback(lambda: None)
            except RuntimeError as exc:
                self.assertEqual("IOLoop is closing", str(exc))
                break
ioloop_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_add_callback_while_closing(self):
        # Issue #635: add_callback() should raise a clean exception
        # if called while another thread is closing the IOLoop.
        closing = threading.Event()
        other_ioloop = IOLoop()

        def close_from_thread():
            # Helper-thread body: run the loop until its scheduled stop,
            # signal the main thread, then close the loop.
            other_ioloop.add_callback(other_ioloop.stop)
            other_ioloop.start()
            closing.set()
            other_ioloop.close(all_fds=True)

        worker = threading.Thread(target=close_from_thread)
        worker.start()
        closing.wait()

        def noop():
            return None

        # Up to 1000 attempts; stop at the first RuntimeError after checking
        # its message.
        for _ in range(1000):
            try:
                other_ioloop.add_callback(noop)
            except RuntimeError as err:
                self.assertEqual("IOLoop is closing", str(err))
                break
ioloop_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_add_callback_while_closing(self):
        # Issue #635: add_callback() should raise a clean exception
        # if called while another thread is closing the IOLoop.
        closing = threading.Event()
        other_ioloop = IOLoop()

        def shutdown_loop():
            # Executed on a second thread: the loop stops itself through the
            # callback queued here, after which it is closed.
            other_ioloop.add_callback(other_ioloop.stop)
            other_ioloop.start()
            closing.set()
            other_ioloop.close(all_fds=True)

        background = threading.Thread(target=shutdown_loop)
        background.start()
        closing.wait()
        # Race add_callback() against the close; the first RuntimeError seen
        # must have the expected message, and then the attempts stop.
        remaining = 1000
        while remaining > 0:
            remaining -= 1
            try:
                other_ioloop.add_callback(lambda: None)
            except RuntimeError as error:
                self.assertEqual("IOLoop is closing", str(error))
                break


问题


面经


文章

微信
公众号

扫码关注公众号