python类deque()的实例源码

util.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add(self, event, subscriber, append=True):
        """
        Add a subscriber for an event.

        :param event: The name of an event.
        :param subscriber: The subscriber to be added (and called when the
                           event is published).
        :param append: Whether to append or prepend the subscriber to an
                       existing subscriber list for the event.
        """
        subs = self._subscribers
        if event not in subs:
            subs[event] = deque([subscriber])
        else:
            sq = subs[event]
            if append:
                sq.append(subscriber)
            else:
                sq.appendleft(subscriber)
repr.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def dispatch_repr(self, obj, recursive):
        if obj is helper:
            return u'<span class="help">%r</span>' % helper
        if isinstance(obj, (integer_types, float, complex)):
            return u'<span class="number">%r</span>' % obj
        if isinstance(obj, string_types):
            return self.string_repr(obj)
        if isinstance(obj, RegexType):
            return self.regex_repr(obj)
        if isinstance(obj, list):
            return self.list_repr(obj, recursive)
        if isinstance(obj, tuple):
            return self.tuple_repr(obj, recursive)
        if isinstance(obj, set):
            return self.set_repr(obj, recursive)
        if isinstance(obj, frozenset):
            return self.frozenset_repr(obj, recursive)
        if isinstance(obj, dict):
            return self.dict_repr(obj, recursive)
        if deque is not None and isinstance(obj, deque):
            return self.deque_repr(obj, recursive)
        return self.object_repr(obj)
util.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add(self, event, subscriber, append=True):
        """
        Add a subscriber for an event.

        :param event: The name of an event.
        :param subscriber: The subscriber to be added (and called when the
                           event is published).
        :param append: Whether to append or prepend the subscriber to an
                       existing subscriber list for the event.
        """
        subs = self._subscribers
        if event not in subs:
            subs[event] = deque([subscriber])
        else:
            sq = subs[event]
            if append:
                sq.append(subscriber)
            else:
                sq.appendleft(subscriber)
recipe-578416.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def splitby(pred, seq):

    trues = deque()
    falses = deque()
    iseq = iter(seq)

    def pull(source, pred, thisval, thisbuf, otherbuf):
        while 1:
            while thisbuf:
                yield thisbuf.popleft()
            newitem = next(source)
            # uncomment next line to show that source is processed only once
            # print "pulled", newitem
            if pred(newitem) == thisval:
                yield newitem
            else:
                otherbuf.append(newitem)

    true_iter = pull(iseq, pred, True, trues, falses)
    false_iter = pull(iseq, pred, False, falses, trues)
    return true_iter, false_iter
internals.py 文件源码 项目:Splunk_CBER_App 作者: MHaggis 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _object_hook(dictionary):

        object_view = ObjectView(dictionary)
        stack = deque()
        stack.append((None, None, dictionary))

        while len(stack):
            instance, member_name, dictionary = stack.popleft()

            for name, value in dictionary.iteritems():
                if isinstance(value, dict):
                    stack.append((dictionary, name, value))

            if instance is not None:
                instance[member_name] = ObjectView(dictionary)

        return object_view
ddpg.py 文件源码 项目:ddpg-aigym 作者: stevenpjg 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self,env, is_batch_norm):
        self.env = env 
        self.num_states = env.observation_space.shape[0]
        self.num_actions = env.action_space.shape[0]


        if is_batch_norm:
            self.critic_net = CriticNet_bn(self.num_states, self.num_actions) 
            self.actor_net = ActorNet_bn(self.num_states, self.num_actions)

        else:
            self.critic_net = CriticNet(self.num_states, self.num_actions) 
            self.actor_net = ActorNet(self.num_states, self.num_actions)

        #Initialize Buffer Network:
        self.replay_memory = deque()

        #Intialize time step:
        self.time_step = 0
        self.counter = 0

        action_max = np.array(env.action_space.high).tolist()
        action_min = np.array(env.action_space.low).tolist()        
        action_bounds = [action_max,action_min] 
        self.grad_inv = grad_inverter(action_bounds)
Grammalecte.py 文件源码 项目:grammalecte 作者: seeschloss 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def __init__ (self, ctx, *args):
        self.ctx = ctx
        self.ServiceName = "com.sun.star.linguistic2.Proofreader"
        self.ImplementationName = "org.openoffice.comp.pyuno.Lightproof." + gce.pkg
        self.SupportedServiceNames = (self.ServiceName, )
        self.locales = []
        for i in gce.locales:
            l = gce.locales[i]
            self.locales.append(Locale(l[0], l[1], l[2]))
        self.locales = tuple(self.locales)
        xCurCtx = uno.getComponentContext()
        # init
        gce.load()
        # GC options
        # opt_handler.load(xCurCtx)
        dOpt = Options.load(xCurCtx)
        gce.setOptions(dOpt)
        # store for results of big paragraphs
        self.dResult = {}
        self.nMaxRes = 1500
        self.lLastRes = deque(maxlen=self.nMaxRes)
        self.nRes = 0

    # XServiceName method implementations
kdtree.py 文件源码 项目:pytorch-nec 作者: mjacar 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def level_order(tree, include_all=False):
    """ Returns an iterator over the tree in level-order
    If include_all is set to True, empty parts of the tree are filled
    with dummy entries and the iterator becomes infinite. """

    q = deque()
    q.append(tree)
    while q:
        node = q.popleft()
        yield node

        if include_all or node.left:
            q.append(node.left or node.__class__())

        if include_all or node.right:
            q.append(node.right or node.__class__())
limited_discrete_action_trainer.py 文件源码 项目:BlueWhale 作者: caffe2 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(
        self,
        normalization_parameters,
        parameters,
    ):
        self._quantile_states = collections.deque(
            maxlen=parameters.action_budget.window_size
        )
        self._quantile = 100 - parameters.action_budget.action_limit
        self.quantile_value = 0
        self._limited_action = np.argmax(
            np.array(parameters.actions) ==
            parameters.action_budget.limited_action
        )
        self._discount_factor = parameters.rl.gamma
        self._quantile_update_rate = \
            parameters.action_budget.quantile_update_rate
        self._quantile_update_frequency = \
            parameters.action_budget.quantile_update_frequency
        self._update_counter = 0
        super(self.__class__,
              self).__init__(normalization_parameters, parameters)
        self._max_q = parameters.rl.maxq_learning
util.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def add(self, event, subscriber, append=True):
        """
        Add a subscriber for an event.

        :param event: The name of an event.
        :param subscriber: The subscriber to be added (and called when the
                           event is published).
        :param append: Whether to append or prepend the subscriber to an
                       existing subscriber list for the event.
        """
        subs = self._subscribers
        if event not in subs:
            subs[event] = deque([subscriber])
        else:
            sq = subs[event]
            if append:
                sq.append(subscriber)
            else:
                sq.appendleft(subscriber)
repr.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def dispatch_repr(self, obj, recursive):
        if obj is helper:
            return u'<span class="help">%r</span>' % helper
        if isinstance(obj, (integer_types, float, complex)):
            return u'<span class="number">%r</span>' % obj
        if isinstance(obj, string_types):
            return self.string_repr(obj)
        if isinstance(obj, RegexType):
            return self.regex_repr(obj)
        if isinstance(obj, list):
            return self.list_repr(obj, recursive)
        if isinstance(obj, tuple):
            return self.tuple_repr(obj, recursive)
        if isinstance(obj, set):
            return self.set_repr(obj, recursive)
        if isinstance(obj, frozenset):
            return self.frozenset_repr(obj, recursive)
        if isinstance(obj, dict):
            return self.dict_repr(obj, recursive)
        if deque is not None and isinstance(obj, deque):
            return self.deque_repr(obj, recursive)
        return self.object_repr(obj)
dispycos_client6_channel.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def rtask_avg_proc(channel, threshold, trend_task, window_size, task=None):
    import collections
    # subscribe to channel (at client)
    yield channel.subscribe(task)
    # create circular buffer
    data = collections.deque(maxlen=window_size)
    for i in range(window_size):
        data.append(0.0)
    cumsum = 0.0
    # first message is 'start' command; see 'client_proc'
    assert (yield task.receive()) == 'start'
    while True:
        i, n = yield task.receive()
        if n is None:
            break
        cumsum += (n - data[0])
        avg = (cumsum / window_size)
        if avg > threshold:
            trend_task.send((i, 'high', float(avg)))
        elif avg < -threshold:
            trend_task.send((i, 'low', float(avg)))
        data.append(n)
    raise StopIteration(0)


# This generator function is sent to remote dispycos process to save
# the received data in a file (on the remote peer).
dispycos_client6_channel.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def rtask_avg_proc(channel, threshold, trend_task, window_size, task=None):
    import collections
    # subscribe to channel (at client)
    yield channel.subscribe(task)
    # create circular buffer
    data = collections.deque(maxlen=window_size)
    for i in range(window_size):
        data.append(0.0)
    cumsum = 0.0
    # first message is 'start' command; see 'client_proc'
    assert (yield task.receive()) == 'start'
    while True:
        i, n = yield task.receive()
        if n is None:
            break
        cumsum += (n - data[0])
        avg = (cumsum / window_size)
        if avg > threshold:
            trend_task.send((i, 'high', float(avg)))
        elif avg < -threshold:
            trend_task.send((i, 'low', float(avg)))
        data.append(n)
    raise StopIteration(0)


# This generator function is sent to remote dispycos process to save
# the received data in a file (on the remote peer).
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, task):
        """Categorize messages to task 'task'.
        """
        self._task = task
        self._categories = {None: collections.deque()}
        self._categorize = []
dispycos_client6_channel.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def rtask_avg_proc(channel, threshold, trend_task, window_size, task=None):
    import collections
    # subscribe to channel (at client)
    yield channel.subscribe(task)
    # create circular buffer
    data = collections.deque(maxlen=window_size)
    for i in range(window_size):
        data.append(0.0)
    cumsum = 0.0
    # first message is 'start' command; see 'client_proc'
    assert (yield task.receive()) == 'start'
    while True:
        i, n = yield task.receive()
        if n is None:
            break
        cumsum += (n - data[0])
        avg = (cumsum / window_size)
        if avg > threshold:
            trend_task.send((i, 'high', float(avg)))
        elif avg < -threshold:
            trend_task.send((i, 'low', float(avg)))
        data.append(n)
    raise StopIteration(0)


# This generator function is sent to remote dispycos process to save
# the received data in a file (on the remote peer).
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, task):
        """Categorize messages to task 'task'.
        """
        self._task = task
        self._categories = {None: collections.deque()}
        self._categorize = []
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def receive(self, category=None, timeout=None, alarm_value=None):
        """Similar to 'receive' of Task, except it retrieves (waiting, if
        necessary) messages in given 'category'.
        """
        # assert Pycos.cur_task() == self._task
        c = self._categories.get(category, None)
        if c:
            msg = c.popleft()
            raise StopIteration(msg)
        if timeout:
            start = _time()
        while 1:
            msg = yield self._task.receive(timeout=timeout, alarm_value=alarm_value)
            if msg == alarm_value:
                raise StopIteration(msg)
            for categorize in self._categorize:
                c = categorize(msg)
                if c == category:
                    raise StopIteration(msg)
                if c is not None:
                    bucket = self._categories.get(c, None)
                    if not bucket:
                        bucket = self._categories[c] = collections.deque()
                    bucket.append(msg)
                    break
            else:
                self._categories[None].append(msg)
            if timeout:
                now = _time()
                timeout -= now - start
                start = now
ntsc-640-err.py 文件源码 项目:SHR-NTSC 作者: dschmenk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ntscInitPrev():
    global ntscOutput
    ntscOutput = deque([(128, 128, 128) for p in xrange(4)])
ntsc-640.py 文件源码 项目:SHR-NTSC 作者: dschmenk 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ntscInitPrev():
    global ntscOutput
    ntscOutput = deque([(128, 128, 128) for p in xrange(4)])
backup_analyze.py 文件源码 项目:STA141C 作者: clarkfitzg 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def make_names_deque():
    names_deque = deque()
    with open("yob2015.txt") as f:
        reader = csv.DictReader(f, fieldnames=fields)
        for row in reader:
            names_deque.appendleft(row["name"])
    return names_deque


# 101 ms


问题


面经


文章

微信
公众号

扫码关注公众号