python类time()的实例源码

test_manager.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_worker_alarm(manager):
    """Check that SIGALRM fires only for a task that outlives ``task_timeout``."""
    alarms = []

    def on_alarm(signum, frame):
        # Record every delivery of SIGALRM so the assertions can inspect it.
        alarms.append(True)

    signal.signal(signal.SIGALRM, on_alarm)

    @manager.task
    def foo(sleep):
        time.sleep(sleep)

    worker = Worker(manager, task_timeout=1)

    # A 0.1 s task stays under the 1 s timeout: no alarm expected.
    worker.process_one(make_task('foo', args=(0.1,)))
    assert not alarms

    # A 1.1 s task exceeds the timeout: the handler must have run.
    worker.process_one(make_task('foo', args=(1.1,)))
    assert alarms
zhihu_login_requests.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def get_captcha():
    """Download the Zhihu login captcha, show it, and return the typed answer.

    The image is requested through the module-level ``session`` using
    ``header``, saved as ``captcha.jpg`` in the current directory and, when
    PIL can open it, displayed to the user.

    :return: the text the user entered at the prompt.
    """
    import time
    # Millisecond timestamp, sent as the ``r`` query parameter.
    stamp = str(int(time.time()*1000))
    captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(stamp)
    response = session.get(captcha_url, headers=header)
    # The ``with`` block closes the file; no explicit close() is needed.
    with open("captcha.jpg","wb") as f:
        f.write(response.content)

    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except Exception:
        # Showing the image is best-effort: the user can still open the
        # saved file by hand.  Unlike a bare ``except``, this lets
        # KeyboardInterrupt / SystemExit propagate.
        pass

    captcha = input("?????\n>")
    return captcha
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def run_async_at(self, where, gen, *args, **kwargs):
        """Must be used with 'yield' as

        'rtask = yield computation.run_async_at(where, gen, ...)'

        Schedules generator function 'gen', called with 'args' and 'kwargs',
        at remote server 'where'. On success 'rtask' is a (remote) task;
        verify with 'isinstance(rtask, pycos.Task)'. The generator is
        expected to be (mostly) I/O bound and not to consume CPU time.
        Unlike the other 'run' variants, tasks created with 'async' are not
        "tracked" by the scheduler (see online documentation for more
        details).

        'where' may be a string, which is taken to be the IP address of a
        node (the task is then scheduled on a server at that node), or a
        Location instance, which is taken to be a server location (the task
        is then scheduled at that server).

        'gen' must be a generator function, as it is used to run the task at
        the remote location.
        """
        request = self._run_request('run_async', where, 0, gen, *args, **kwargs)
        yield request
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _pulse_proc(self, task=None):
        """For internal use only.

        Daemon task that consumes messages until 'quit' is received.
        'pulse' refreshes the time of the last pulse; a receive timeout
        (message is None) more than ten pulse intervals after the last
        pulse logs a warning; anything else is logged at debug level.
        """
        task.set_daemon()
        recv_timeout = 2 * self._pulse_interval
        last_seen = time.time()
        while True:
            msg = yield task.receive(timeout=recv_timeout)
            if msg == 'pulse':
                last_seen = time.time()
                continue
            if msg == 'quit':
                break
            if msg is None and (time.time() - last_seen) > (10 * self._pulse_interval):
                logger.warning('scheduler may have gone away!')
            else:
                logger.debug('ignoring invalid pulse message')
        self._pulse_task = None
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, name, addr):
            """Create the record for one node, initially in the closed state.

            'name' and 'addr' are stored as given; every other attribute
            starts at its empty / zero value.
            """
            # Identity.
            self.name, self.addr = name, addr

            # Capacity and usage counters.
            self.cpus_used = self.cpus = 0
            self.load = 0.0

            # Nothing is known about the node yet.
            self.platform = None
            self.avail_info = None
            self.task = None

            # Two separate dicts (must not be aliased).
            self.servers = {}
            self.disabled_servers = {}

            self.status = Scheduler.NodeClosed
            self.last_pulse = time.time()

            # Synchronization primitives; the event starts out cleared.
            self.lock = pycos.Lock()
            avail = pycos.Event()
            avail.clear()
            self.avail = avail
httpd.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def shutdown(self, wait=True):
        """Close the http server; meant to be called by the user program.

        With 'wait' true, the server is closed only after poll_sec (plus
        half a second) so the http client can fetch all pending updates.
        Called from within a pycos task, the wait runs in a new task;
        otherwise the calling thread sleeps.
        """
        if not wait:
            self._server.shutdown()
            self._server.server_close()
            return

        pycos.logger.info('HTTP server waiting for %s seconds for client updates '
                          'before quitting', self._poll_sec)

        if not pycos.Pycos().cur_task():
            # Not inside a task: blocking the caller is fine.
            time.sleep(self._poll_sec + 0.5)
            self._server.shutdown()
            self._server.server_close()
            return

        # Inside a task: defer the close to a separate task.
        def _shutdown(task=None):
            yield task.sleep(self._poll_sec + 0.5)
            self._server.shutdown()
            self._server.server_close()

        pycos.Task(_shutdown)
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _pulse_proc(self, task=None):
        """For internal use only.

        Daemon task that consumes messages until 'quit' is received.
        'pulse' refreshes the time of the last pulse; a receive timeout
        (message is None) more than ten pulse intervals after the last
        pulse logs a warning; anything else is logged at debug level.
        """
        task.set_daemon()
        recv_timeout = 2 * self._pulse_interval
        last_seen = time.time()
        while True:
            msg = yield task.receive(timeout=recv_timeout)
            if msg == 'pulse':
                last_seen = time.time()
                continue
            if msg == 'quit':
                break
            if msg is None and (time.time() - last_seen) > (10 * self._pulse_interval):
                logger.warning('scheduler may have gone away!')
            else:
                logger.debug('ignoring invalid pulse message')
        self._pulse_task = None
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, name, addr):
            """Create the record for one node, initially in the closed state.

            'name' and 'addr' are stored as given; every other attribute
            starts at its empty / zero value.
            """
            # Identity.
            self.name, self.addr = name, addr

            # Capacity and usage counters.
            self.cpus_used = self.cpus = 0
            self.load = 0.0

            # Nothing is known about the node yet.
            self.platform = None
            self.avail_info = None
            self.task = None

            # Two separate dicts (must not be aliased).
            self.servers = {}
            self.disabled_servers = {}

            self.status = Scheduler.NodeClosed
            self.last_pulse = time.time()

            # Synchronization primitives; the event starts out cleared.
            self.lock = pycos.Lock()
            avail = pycos.Event()
            avail.clear()
            self.avail = avail
httpd.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def shutdown(self, wait=True):
        """Close the http server; meant to be called by the user program.

        With 'wait' true, the server is closed only after poll_sec (plus
        half a second) so the http client can fetch all pending updates.
        Called from within a pycos task, the wait runs in a new task;
        otherwise the calling thread sleeps.
        """
        if not wait:
            self._server.shutdown()
            self._server.server_close()
            return

        pycos.logger.info('HTTP server waiting for %s seconds for client updates '
                          'before quitting', self._poll_sec)

        if not pycos.Pycos().cur_task():
            # Not inside a task: blocking the caller is fine.
            time.sleep(self._poll_sec + 0.5)
            self._server.shutdown()
            self._server.server_close()
            return

        # Inside a task: defer the close to a separate task.
        def _shutdown(task=None):
            yield task.sleep(self._poll_sec + 0.5)
            self._server.shutdown()
            self._server.server_close()

        pycos.Task(_shutdown)
attract-repel.py 文件源码 项目:attract-repel 作者: nmrksic 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def create_vector_dictionary(self):
        """
        Extracts the current word vectors from TensorFlow embeddings and (if print_simlex=True) prints their SimLex scores.

        Rebuilds ``self.word_vectors`` as a mapping from each word in
        ``self.inverted_index`` to its normalised row of ``self.W_dynamic``.

        Returns the ``(score_simlex, score_wordsim)`` pair from
        ``simlex_scores`` when ``log_scores_over_time`` or ``print_simlex``
        is set, and the placeholder ``(1.0, 1.0)`` otherwise.
        """
        [current_vectors] = self.sess.run([self.W_dynamic])

        self.word_vectors = {
            self.inverted_index[idx]: normalise_vector(current_vectors[idx, :])
            for idx in range(self.vocabulary_size)
        }

        if self.log_scores_over_time or self.print_simlex:
            (score_simlex, score_wordsim) = simlex_scores(self.word_vectors, self.print_simlex)
            return (score_simlex, score_wordsim)

        return (1.0, 1.0)
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def service_restarted(self, sentry_unit, service, filename,
                          pgrep_full=None, sleep_time=20):
        """Report whether a service appears to have been restarted.

           After sleeping ``sleep_time`` seconds, the service's process
           start time is compared with the last modification time of
           ``filename`` (for example that service's config file); a start
           time at or after the mtime counts as a restart.

           Returns True or False.
           """
        # /!\ DEPRECATION WARNING (beisner):
        # No before-time is known here, which makes this check racy.
        # validate_service_config_changed should be used instead.

        # NOTE(beisner) pgrep_full is not implemented any more because pidof
        # replaced pgrep.  It is still forwarded so that the deprecation
        # WARNS are emitted.  lp1474030
        self.log.warn('DEPRECATION WARNING:  use '
                      'validate_service_config_changed instead of '
                      'service_restarted due to known races.')

        time.sleep(sleep_time)
        proc_started = self._get_proc_start_time(sentry_unit, service,
                                                 pgrep_full)
        file_changed = self._get_file_mtime(sentry_unit, filename)
        return bool(proc_started >= file_changed)
streams.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def add_to_redis(content, through, keys):
    """Add content to a list of Redis ordered sets.

    Each key that does not yet contain ``content.id`` gets it added with the
    current Unix time (whole seconds) as score, and the matching "throughs"
    hash records ``through.id`` for it.  Keys already containing the content
    are left untouched.

    :param content: Content object to add
    :param through: Content through object. For example on shares, this is the linked share content object
    :param keys: List of keys to add to
    """
    if not keys:
        return
    r = get_redis_connection()
    for key in keys:
        # Only add if not in the set already
        # This stops shares popping up more than once, for example.
        # ZRANK is zero-based and yields None for a missing member, so the
        # check must be "is None": a plain truthiness test would treat the
        # member at rank 0 as absent and re-add it.
        if r.zrank(key, content.id) is None:
            r.zadd(key, int(time.time()), content.id)
            r.hset(BaseStream.get_throughs_key(key), content.id, through.id)
Gateway.py 文件源码 项目:kiota 作者: Morteo 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def start(self):
    """Run the gateway loop: connect, then poll and push until an OSError occurs.

    An OSError raised by connect() or by the polling loop is logged, the
    configured "wait_to_reconnect" delay is slept, and the whole sequence
    starts again.
    """

    while True:
      try:
        self.connect()
        while True:
          # Earlier experiments with extra sleeps, left disabled:
          #time.sleep(0.01) # attempt to reduce number of OSError: [Errno 104] ECONNRESET 
          self.client.check_msg()
          #time.sleep(0.01) # attempt to reduce number of OSError: [Errno 104] ECONNRESET 
          self.push()
          # Short pause between poll/push cycles.
          time.sleep(0.01)
      except OSError as e:
          Util.log(self,"failed to connect, retrying....", e)
          time.sleep(self.config["wait_to_reconnect"])

    # NOTE(review): unreachable -- the outer ``while True`` has no break,
    # so the client is never disconnected here.
    self.client.disconnect()
test_retry.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _test_should_sleep(self, seconds_left, slept):
        """Assert that ``Retry._sleep`` sleeps for, and returns, ``slept``.

        The deadline is ``seconds_left`` after ``self.now``; ``random.randint``
        and ``time.sleep`` are patched so the outcome is deterministic.
        """
        attempt, timeout, interval = 5, 20, 3
        randint = 2
        retry = h_retry.Retry(mock.Mock(), timeout=timeout, interval=interval)
        deadline = self.now + seconds_left

        with mock.patch('random.randint') as m_randint, \
                mock.patch('time.sleep') as m_sleep:
            m_randint.return_value = randint

            result = retry._sleep(deadline, attempt, _EX2())

            self.assertEqual(slept, result)
            m_randint.assert_called_once_with(1, 2 ** attempt - 1)
            m_sleep.assert_called_once_with(slept)
vif_pool.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _populate_pool(self, pool_key, pod, subnets):
        """Top up the port pool for ``pool_key`` when it is below the minimum.

        Nothing happens if the pool was updated less than
        ``ports_pool_update_frequency`` seconds ago, or if it already holds
        at least ``ports_pool_min`` ports.  Otherwise a batch of VIFs is
        requested and registered as available for this pool.
        """
        # REVISIT(ltomasbo): Drop the subnets parameter and get the information
        # from the pool_key, which will be required when multi-network is
        # supported
        pool_conf = oslo_cfg.CONF.vif_pool

        now = time.time()
        last_update = self._last_update.get(pool_key, 0)
        if now - pool_conf.ports_pool_update_frequency < last_update:
            LOG.info("Not enough time since the last pool update")
            return
        self._last_update[pool_key] = now

        pool_size = self._get_pool_size(pool_key)
        if pool_size >= pool_conf.ports_pool_min:
            return

        # Request at least one full batch, or more if the gap is larger.
        num_ports = max(pool_conf.ports_pool_batch,
                        pool_conf.ports_pool_min - pool_size)
        new_vifs = self._drv_vif.request_vifs(
            pod=pod,
            project_id=pool_key[1],
            subnets=subnets,
            security_groups=list(pool_key[2]),
            num_ports=num_ports)
        for vif in new_vifs:
            self._existing_vifs[vif.id] = vif
            available = self._available_ports_pools.setdefault(pool_key, [])
            available.append(vif.id)
tox_xmlrpc_client.py 文件源码 项目:toxxmlrpc 作者: merlink01 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start(self):
        """Start the client and make sure the configured server is a friend.

        When a server id is set and no existing friend's public key is
        contained in it, a friend request carrying the password is sent.
        """
        self.client.start()
        #~ if not self.disable_auto_login:
            #~ while self.client.status == 'offline':
                #~ time.sleep(1)
            #~ logger.info('Client: %s'%self.client.status)
        if not self.server_id:
            logger.info('No Server ID given')
            return

        for friend in self.client.get_friend_list():
            if self.client.friend_get_public_key(friend) in self.server_id:
                logger.info('Server already in added')
                break
        else:
            # The loop ended without a match: the server is not a friend yet.
            self.client.friend_add_with_request(self.server_id,self.password)
            logger.info('Started Friend request to Server')
tox_xmlrpc_client.py 文件源码 项目:toxxmlrpc 作者: merlink01 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __request(self,methodname,args):
        """Send one XML-RPC call through the client and return its result.

        The call is serialized with xmlrpclib, sent, and the reply is polled
        for until it arrives or ``self.timeout`` seconds have elapsed.
        ``self.exec_lock`` is held for the whole exchange and is always
        released, including when sending or receiving raises.

        :param methodname: name of the remote method.
        :param args: arguments for the remote method.
        :return: the first value of the decoded response.
        :raises IOError: when sending fails or no reply arrives in time.
        """
        logger.info('Execute: %s%s'%(methodname,repr(args)))
        data = xmlrpclib.dumps(args,methodname,allow_none=True)

        self.exec_lock.acquire()
        try:
            if not self.client.data_send(0,data,self.timeout):
                logger.warning('Raising Error, Timeout reached')
                # Call form of raise: valid on both Python 2 and Python 3.
                raise IOError('Timeout')

            recdata = None
            time_to_wait = int(time.time()) + self.timeout
            while not recdata:
                timenow = int(time.time())
                if timenow > time_to_wait:
                    logger.warning('Raising Error, Timeout reached')
                    raise IOError('Timeout')
                recdata = self.client.data_recv()
                if not recdata:
                    # Nothing yet: pause briefly before polling again.
                    time.sleep(0.1)
        finally:
            self.exec_lock.release()

        returndata = xmlrpclib.loads(recdata['data'],use_datetime=True)
        logger.info('got %s'%str(returndata))
        return returndata[0][0]
training.py 文件源码 项目:drl.pth 作者: seba-1511 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test(args, env, agent):
    """Run ``args.n_test_iter`` test episodes and return their total rewards.

    With ``args.record`` set, the environment is wrapped in a Monitor that
    writes to a time-stamped directory under ``./videos/``.  A summary is
    printed through ``print_stats`` once all episodes are done.
    """
    if args.record:
        if 'env' in vars(args):
            run_tag = args.env + str(time())
        else:
            run_tag = str(time())
        env = wrappers.Monitor(env, './videos/' + run_tag + '/')

    test_rewards = []
    test_steps = 0
    test_start = time()
    for _ in range(args.n_test_iter):
        state = env.reset()
        episode_reward = 0.0
        done = False
        while not done:
            test_steps += 1
            action, _ = agent.forward(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward
        test_rewards.append(episode_reward)

    print_stats('Test', test_rewards, args.n_test_iter,
                time() - test_start, test_steps, 0, agent)
    return test_rewards
eval.py 文件源码 项目:DREAM 作者: LaceyChen17 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def eval_pred(dr_model, ub):
    '''
        evaluate dream model for predicting next basket on all training users
        in batches

        For every user, the dynamic representation at the user's last
        basket is multiplied with the transposed item embedding matrix to
        obtain one score per item.

        Returns ``(score_u, id_u)``: the per-user score arrays and the
        matching user ids, in the same order.
    '''
    item_embedding = dr_model.encode.weight
    dr_model.eval()
    dr_hidden = dr_model.init_hidden(dr_model.config.batch_size)
    start_time = time()
    id_u, score_u = [], [] # user's id, user's score
    for batch_idx, x in enumerate(batchify(ub, dr_model.config.batch_size)):
        print(batch_idx)
        baskets, lens, uids = x
        _, dynamic_user, _ = dr_model(baskets, lens, dr_hidden)# shape: batch_size, max_len, embedding_size
        dr_hidden = repackage_hidden(dr_hidden)
        # Distinct names here so the batch index is not shadowed.
        for uid, length, du in zip(uids, lens, dynamic_user):
            du_latest = du[length - 1].unsqueeze(0) # shape: 1, embedding_size
            score_up = torch.mm(du_latest, item_embedding.t()) # shape: 1, num_item
            score_u.append(score_up.cpu().data.numpy())
            id_u.append(uid)
    elapsed = time() - start_time
    # The ':' introduces the format spec; without it str.format raises.
    print('[Predicting] Elapsed: {:02.2f}'.format(elapsed))
    return score_u, id_u
train.py 文件源码 项目:DREAM 作者: LaceyChen17 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def evaluate_dream():
    """Evaluate the module-level ``dr_model`` on ``test_ub``.

    Prints one log line with the current ``epoch``, the mean time per batch
    in milliseconds and the mean BPR loss, and returns that mean loss.
    """
    dr_model.eval()
    dr_hidden = dr_model.init_hidden(dr_config.batch_size)

    total_loss = 0
    start_time = time()
    num_batchs = ceil(len(test_ub) / dr_config.batch_size)
    for baskets, lens, _ in batchify(test_ub, dr_config.batch_size):
        dynamic_user, _ = dr_model(baskets, lens, dr_hidden)
        loss = bpr_loss(baskets, dynamic_user, dr_model.encode.weight, dr_config)
        dr_hidden = repackage_hidden(dr_hidden)
        total_loss += loss.data

    # Logging
    elapsed = (time() - start_time) * 1000 / num_batchs
    total_loss = total_loss[0] / num_batchs
    report = '[Evaluation]| Epochs {:3d} | Elapsed {:02.2f} | Loss {:05.2f} |'
    print(report.format(epoch, elapsed, total_loss))
    return total_loss
nbNetBase.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def check_fd(self):
        '''Periodically close connections whose read has timed out.

        Runs forever.  On every pass, each connection that is in the
        "read" state, has a read start time, and has been reading for at
        least its ``read_itime`` seconds is switched to "closing" and handed
        to the state machine.  The loop then sleeps 60 seconds.
        '''
        while True:
            # Iterate over a snapshot of the keys: presumably state_machine()
            # removes closed fds from conn_state (verify), and mutating a
            # dict while iterating its live key view raises on Python 3.
            for fd in list(self.conn_state.keys()):
                sock_state = self.conn_state.get(fd)
                if sock_state is None:
                    # Entry disappeared since the snapshot was taken.
                    continue
                # The fd has been in the read state for read_itime or longer.
                if sock_state.state == "read" and sock_state.read_stime \
                    and (time.time() - sock_state.read_stime) >= sock_state.read_itime:
                    # Let the state machine close this fd.
                    sock_state.state = "closing"
                    self.state_machine(fd)
            # Wait before the next pass.
            time.sleep(60)

#}}}
#{{{fork_processes
nbNetBase.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def check_fd(self):
        '''Periodically close connections whose read has timed out.

        Runs forever.  On every pass, each connection that is in the
        "read" state, has a read start time, and has been reading for at
        least its ``read_itime`` seconds is switched to "closing" and handed
        to the state machine.  The loop then sleeps 60 seconds.
        '''
        while True:
            # Iterate over a snapshot of the keys: presumably state_machine()
            # removes closed fds from conn_state (verify), and mutating a
            # dict while iterating its live key view raises on Python 3.
            for fd in list(self.conn_state.keys()):
                sock_state = self.conn_state.get(fd)
                if sock_state is None:
                    # Entry disappeared since the snapshot was taken.
                    continue
                # The fd has been in the read state for read_itime or longer.
                if sock_state.state == "read" and sock_state.read_stime \
                    and (time.time() - sock_state.read_stime) >= sock_state.read_itime:
                    # Let the state machine close this fd.
                    sock_state.state = "closing"
                    self.state_machine(fd)
            # Wait before the next pass.
            time.sleep(60)

#}}}
#{{{fork_processes
si.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def uptime(self):
        """Read /proc/uptime and report up time, idle time and idle rate.

        Returns a dict with the formatted up time ('up'), the formatted
        idle time ('idle') and the idle/up percentage ('idle_rate').
        """
        with open('/proc/uptime', 'r') as f:
            uptime, idletime = f.readline().split()

        up_seconds = int(float(uptime))
        idle_seconds = int(float(idletime))

        # in some machine like Linode VPS, idle time may bigger than up time
        if idle_seconds > up_seconds:
            idle_seconds = idle_seconds/multiprocessing.cpu_count()
            # in some VPS, this value may still bigger than up time
            # may be the domain 0 machine has more cores
            # we calculate approximately for it
            if idle_seconds > up_seconds:
                for divisor in range(2,10):
                    scaled = idle_seconds/divisor
                    if scaled < up_seconds:
                        idle_seconds = scaled
                        break

        fmt = '{days} ? {hours} ?? {minutes} ? {seconds} ?'
        uptime_string = strfdelta(datetime.timedelta(seconds = up_seconds), fmt)
        idletime_string = strfdelta(datetime.timedelta(seconds = idle_seconds), fmt)
        return {
            'up': uptime_string,
            'idle': idletime_string,
            'idle_rate': div_percent(idle_seconds, up_seconds),
        }
piksi_driver.py 文件源码 项目:piksi_ros 作者: uscresl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def setup_pubsub(self):
        """Create the node's publishers (and the TF broadcaster if needed).

        The "~fix" publisher is plain; "~spp_fix", "~rtk_fix", "~odom" and
        "~time" are wrapped in DiagnosedPublisher with shared frequency and
        timestamp parameters.  The ephemeris and observations publishers and
        the TF broadcaster are only created when their flags are set.
        """
        freq_bounds = {'min':self.diag_update_freq, 'max':self.diag_update_freq}
        freq_params = diagnostic_updater.FrequencyStatusParam(
            freq_bounds, self.diag_freq_tolerance, self.diag_window_size)
        time_params = diagnostic_updater.TimeStampStatusParam(
            self.diag_min_delay, self.diag_max_delay)

        def diagnosed(topic, msg_type):
            # One publisher on `topic`, monitored with the shared parameters.
            return diagnostic_updater.DiagnosedPublisher(
                rospy.Publisher(topic, msg_type, queue_size=1000),
                self.diag_updater, freq_params, time_params)

        self.pub_fix = rospy.Publisher("~fix", NavSatFix, queue_size=1000)

        self.pub_spp_fix = diagnosed("~spp_fix", NavSatFix)
        self.pub_rtk_fix = diagnosed("~rtk_fix", NavSatFix)
        #self.pub_rtk = diagnostic_updater.DiagnosedPublisher(rospy.Publisher("~rtk_odom", Odometry, queue_size=1000), self.diag_updater, freq_params, time_params)
        self.pub_odom = diagnosed("~odom", Odometry)
        self.pub_time = diagnosed("~time", TimeReference)

        if self.publish_utm_rtk_tf or self.publish_rtk_child_tf:
            self.tf_br = tf2_ros.TransformBroadcaster()

        if self.publish_ephemeris:
            self.pub_eph = rospy.Publisher("~ephemeris", Ephemeris, queue_size=1000)

        if self.publish_observations:
            self.pub_obs = rospy.Publisher('~observations', Observations, queue_size=1000)
OSC3.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def append(self, argument, typehint = None):
        """Appends data to the bundle, creating an OSCMessage to encapsulate
        the provided argument unless this is already an OSCMessage.
        Any newly created OSCMessage inherits the OSCBundle's address at the time of creation.
        If 'argument' is an iterable, its elements will be encapsulated by a single OSCMessage.
        Finally, 'argument' can be (or contain) a dict, which will be 'converted' to an OSCMessage;
          - if 'addr' appears in the dict, its value overrides the OSCBundle's address
          - if 'args' appears in the dict, its value(s) become the OSCMessage's arguments
        Dict subclasses are treated the same way as plain dicts.
        """
        if isinstance(argument, OSCMessage):
            binary = OSCBlob(argument.getBinary())
        else:
            msg = OSCMessage(self.address)
            # isinstance (not an exact type comparison) so that dict
            # subclasses are recognised as message descriptions too.
            if isinstance(argument, dict):
                if 'addr' in argument:
                    msg.setAddress(argument['addr'])
                if 'args' in argument:
                    msg.append(argument['args'], typehint)
            else:
                msg.append(argument, typehint)

            binary = OSCBlob(msg.getBinary())

        # Each appended message is stored as one blob ('b') element.
        self.message += binary
        self.typetags += 'b'
OSC3.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def OSCTimeTag(time):
    """Convert a time in floating seconds to its
    OSC binary representation

    A value that is not greater than zero is encoded as the fixed pair
    (0, 1); otherwise the whole seconds (offset by NTP_epoch) and the
    fraction (scaled by NTP_units_per_second) are packed as two big-endian
    unsigned 32-bit integers.
    """
    if not time > 0:
        return struct.pack('>LL', 0, 1)

    fract, secs = math.modf(time)
    whole = int(secs - NTP_epoch)
    return struct.pack('>LL', whole, int(fract * NTP_units_per_second))

######
#
# OSCMessage decoding functions
#
######
OSC2.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def append(self, argument, typehint = None):
        """Appends data to the bundle, creating an OSCMessage to encapsulate
        the provided argument unless this is already an OSCMessage.
        Any newly created OSCMessage inherits the OSCBundle's address at the time of creation.
        If 'argument' is an iterable, its elements will be encapsulated by a single OSCMessage.
        Finally, 'argument' can be (or contain) a dict, which will be 'converted' to an OSCMessage;
          - if 'addr' appears in the dict, its value overrides the OSCBundle's address
          - if 'args' appears in the dict, its value(s) become the OSCMessage's arguments
        Dict subclasses are treated the same way as plain dicts.
        """
        if isinstance(argument, OSCMessage):
            binary = OSCBlob(argument.getBinary())
        else:
            msg = OSCMessage(self.address)
            # isinstance (not an exact comparison with types.DictType) so
            # that dict subclasses are recognised as message descriptions.
            if isinstance(argument, dict):
                if 'addr' in argument:
                    msg.setAddress(argument['addr'])
                if 'args' in argument:
                    msg.append(argument['args'], typehint)
            else:
                msg.append(argument, typehint)

            binary = OSCBlob(msg.getBinary())

        # Each appended message is stored as one blob ('b') element.
        self.message += binary
        self.typetags += 'b'
OSC2.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def OSCTimeTag(time):
    """Convert a time in floating seconds to its
    OSC binary representation

    A value that is not greater than zero is encoded as the fixed pair
    (0, 1); otherwise the whole seconds (offset by NTP_epoch) and the
    fraction (scaled by NTP_units_per_second) are packed as two big-endian
    unsigned 32-bit integers.
    """
    if not time > 0:
        return struct.pack('>LL', long(0), long(1))

    fract, secs = math.modf(time)
    whole = long(secs - NTP_epoch)
    return struct.pack('>LL', whole, long(fract * NTP_units_per_second))

######
#
# OSCMessage decoding functions
#
######
cli.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def forwarder(tasks, interval, batch_size, source, dest):
    '''Forward items from one storage to another.

    Items are taken from the source store in batches of ``batch_size`` and
    put into the destination store.  If the put fails, the batch is put
    back into the source, the error is logged and re-raised.  When a batch
    is empty the loop sleeps for ``interval`` seconds.
    '''
    from .utils import RunFlag, load_manager, redis_client
    from .store import QueueStore
    log = logging.getLogger('dsq.forwarder')

    if not (tasks or source):
        print('--tasks or --source must be provided')
        sys.exit(1)

    if source:
        src_store = QueueStore(redis_client(source))
    else:
        src_store = load_manager(tasks).queue
    dst_store = QueueStore(redis_client(dest))

    run = RunFlag()
    while run:
        batch = src_store.take_many(batch_size)
        if not (batch['schedule'] or batch['queues']):
            # Nothing to forward right now.
            time.sleep(interval)
            continue
        try:
            dst_store.put_many(batch)
        except Exception:
            # Return the batch to the source before propagating the error.
            src_store.put_many(batch)
            log.exception('Forward error')
            raise
worker.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def process(self, queue_list, burst=False):  # pragma: no cover
        """Pop and process tasks from ``queue_list`` until told to stop.

        The loop ends when the run flag turns false, a task raises
        StopWorker, no task is available in ``burst`` mode, or the worker
        has been running longer than ``self.lifetime``.  The manager is
        closed on the way out.
        """
        signal.signal(signal.SIGALRM, self.alarm_handler)

        run = RunFlag()
        started_at = time()
        while run:
            task = self.manager.pop(queue_list, 1)
            if not task:
                if burst:
                    break
            else:
                try:
                    self.process_one(task)
                except StopWorker:
                    break

            if self.lifetime and time() - started_at > self.lifetime:
                break

        self.manager.close()


问题


面经


文章

微信
公众号

扫码关注公众号