def add(self, event, subscriber, append=True):
    """
    Register a subscriber under an event name.

    :param event: The name of an event.
    :param subscriber: The subscriber to register (and call when the
                       event is published).
    :param append: If true, put the subscriber at the end of the event's
                   existing subscriber queue; otherwise put it at the
                   front. Makes no difference for the first subscriber
                   of an event.
    """
    registry = self._subscribers
    if event in registry:
        queue = registry[event]
        # Choose which end of the deque to insert at, then insert.
        insert = queue.append if append else queue.appendleft
        insert(subscriber)
    else:
        # First subscriber for this event: start a fresh queue.
        registry[event] = deque([subscriber])
# Example source code for the Python class deque()
def dispatch_repr(self, obj, recursive):
    """Pick a renderer for ``obj`` based on its type and return its result.

    The ``helper`` object and plain numbers are rendered inline as
    ``<span>`` markup; every other case is delegated to one of this
    object's ``*_repr`` methods.

    :param obj: The object to render.
    :param recursive: Passed through unchanged to the container
                      renderers (list, tuple, set, frozenset, dict,
                      deque).
    :return: Whatever the selected renderer returns.
    """
    if obj is helper:
        return u'<span class="help">%r</span>' % helper
    if isinstance(obj, (integer_types, float, complex)):
        return u'<span class="number">%r</span>' % obj

    # Renderers that only need the object itself.
    if isinstance(obj, string_types):
        return self.string_repr(obj)
    if isinstance(obj, RegexType):
        return self.regex_repr(obj)

    # Container renderers, tried in this order; method names are resolved
    # lazily so only the renderer that is actually needed gets looked up.
    container_renderers = [
        (list, 'list_repr'),
        (tuple, 'tuple_repr'),
        (set, 'set_repr'),
        (frozenset, 'frozenset_repr'),
        (dict, 'dict_repr'),
    ]
    # ``deque`` may be None in this module, hence the guard.
    if deque is not None:
        container_renderers.append((deque, 'deque_repr'))
    for kind, method_name in container_renderers:
        if isinstance(obj, kind):
            return getattr(self, method_name)(obj, recursive)

    # Fallback for everything else.
    return self.object_repr(obj)
def add(self, event, subscriber, append=True):
    """
    Attach a subscriber to an event.

    :param event: The name of an event.
    :param subscriber: The subscriber to attach (and call when the
                       event is published).
    :param append: Whether the subscriber goes to the back (true) or
                   the front (false) of an already existing subscriber
                   queue for the event.
    """
    by_event = self._subscribers

    # No queue yet for this event: create one holding just this subscriber.
    if event not in by_event:
        by_event[event] = deque([subscriber])
        return

    existing = by_event[event]
    if not append:
        existing.appendleft(subscriber)
    else:
        existing.append(subscriber)
def splitby(pred, seq):
    """Split ``seq`` into two lazy iterators according to ``pred``.

    The source is consumed only once. Each returned iterator pulls from
    the shared source on demand; an item that belongs to the *other*
    iterator is parked in that iterator's buffer until it is asked for.

    :param pred: Callable applied to each item; its result is interpreted
                 by truthiness.
    :param seq: Any iterable.
    :return: A ``(true_iter, false_iter)`` pair of generators yielding the
             items for which ``pred`` is truthy / falsy, each in source
             order.
    """
    trues = deque()
    falses = deque()
    iseq = iter(seq)

    def pull(source, pred, thisval, thisbuf, otherbuf):
        while True:
            # Drain anything the sibling iterator parked for us first.
            while thisbuf:
                yield thisbuf.popleft()
            try:
                newitem = next(source)
            except StopIteration:
                # Letting StopIteration escape a generator body is a
                # RuntimeError under PEP 479; end the generator cleanly.
                return
            # Normalise to bool so a truthy non-bool result (e.g. 2) is
            # classified the same way regardless of which side pulls it.
            if bool(pred(newitem)) == thisval:
                yield newitem
            else:
                otherbuf.append(newitem)

    true_iter = pull(iseq, pred, True, trues, falses)
    false_iter = pull(iseq, pred, False, falses, trues)
    return true_iter, false_iter
def _object_hook(dictionary):
    """Wrap ``dictionary`` in an ``ObjectView``, wrapping nested dicts too.

    The dict is walked breadth-first. Every value that is itself a
    ``dict`` is replaced, inside its parent dict, by ``ObjectView(value)``
    once the value's own entries have been scanned for further nested
    dicts. The input dictionary is therefore mutated in place.

    :param dictionary: The dict to wrap.
    :return: ``ObjectView(dictionary)``, constructed before any nested
             value is replaced.
    """
    object_view = ObjectView(dictionary)
    # FIFO work queue of (parent dict, key in parent, dict to process).
    pending = deque([(None, None, dictionary)])
    while pending:
        parent, member_name, current = pending.popleft()
        # ``items()`` works on both Python 2 and 3; ``iteritems()`` exists
        # only on Python 2. ``current`` is not modified while iterating:
        # the replacement below targets the parent.
        for name, value in current.items():
            if isinstance(value, dict):
                pending.append((current, name, value))
        if parent is not None:
            parent[member_name] = ObjectView(current)
    return object_view
def __init__(self,env, is_batch_norm):
    """Set up networks, replay buffer and counters for ``env``.

    :param env: Environment object; its ``observation_space.shape[0]``
                and ``action_space.shape[0]`` give the state and action
                counts, and ``action_space.high`` / ``.low`` give the
                action bounds.
    :param is_batch_norm: If true, the ``*_bn`` network classes are used
                          for the critic and the actor.
    """
    self.env = env
    self.num_states = env.observation_space.shape[0]
    self.num_actions = env.action_space.shape[0]

    # Select the network classes once, then build critic before actor.
    if is_batch_norm:
        critic_cls, actor_cls = CriticNet_bn, ActorNet_bn
    else:
        critic_cls, actor_cls = CriticNet, ActorNet
    self.critic_net = critic_cls(self.num_states, self.num_actions)
    self.actor_net = actor_cls(self.num_states, self.num_actions)

    # Replay buffer starts empty.
    self.replay_memory = deque()

    # Step counters start at zero.
    self.time_step = 0
    self.counter = 0

    # Bounds are handed to grad_inverter as [upper bounds, lower bounds].
    upper = np.array(env.action_space.high).tolist()
    lower = np.array(env.action_space.low).tolist()
    self.grad_inv = grad_inverter([upper, lower])
def __init__ (self, ctx, *args):
    """Initialise the proofreader service object.

    Stores the service identifiers, builds the tuple of supported
    ``Locale`` objects from ``gce.locales``, loads the ``gce`` engine
    and its options, and prepares the result cache.

    :param ctx: Stored as ``self.ctx``; not otherwise used here.
    :param args: Accepted and ignored.
    """
    self.ctx = ctx
    self.ServiceName = "com.sun.star.linguistic2.Proofreader"
    self.ImplementationName = "org.openoffice.comp.pyuno.Lightproof." + gce.pkg
    self.SupportedServiceNames = (self.ServiceName, )

    # One Locale per entry of gce.locales, built from the entry's first
    # three items.
    locale_specs = [gce.locales[key] for key in gce.locales]
    self.locales = tuple(
        Locale(spec[0], spec[1], spec[2]) for spec in locale_specs
    )

    component_ctx = uno.getComponentContext()
    # Initialise the engine, then apply the stored options to it.
    gce.load()
    options = Options.load(component_ctx)
    gce.setOptions(options)

    # Store for results of big paragraphs; lLastRes is bounded to
    # nMaxRes entries.
    self.dResult = {}
    self.nMaxRes = 1500
    self.lLastRes = deque(maxlen=self.nMaxRes)
    self.nRes = 0
# XServiceName method implementations
def level_order(tree, include_all=False):
    """Yield the nodes of ``tree`` level by level (breadth-first).

    :param tree: Root node; nodes must expose ``left`` and ``right``.
    :param include_all: When true, every missing child is replaced by a
                        dummy node created with ``node.__class__()``, so
                        the iterator never terminates.
    """
    pending = deque([tree])
    while pending:
        current = pending.popleft()
        yield current
        # Left child is queued before the right one.
        for side in ("left", "right"):
            child = getattr(current, side)
            if include_all or child:
                pending.append(child or current.__class__())
def __init__(
    self,
    normalization_parameters,
    parameters,
):
    """Initialise action-budget quantile state, then the base class.

    :param normalization_parameters: Forwarded unchanged to the base
                                     class initialiser.
    :param parameters: Configuration object; this method reads its
                       ``action_budget``, ``actions`` and ``rl``
                       attributes and forwards it to the base class.
    """
    budget = parameters.action_budget

    # Bounded history sized by the configured window.
    self._quantile_states = collections.deque(maxlen=budget.window_size)
    self._quantile = 100 - budget.action_limit
    self.quantile_value = 0

    # Index of the first entry in ``actions`` equal to the limited action.
    is_limited = np.array(parameters.actions) == budget.limited_action
    self._limited_action = np.argmax(is_limited)

    self._discount_factor = parameters.rl.gamma
    self._quantile_update_rate = budget.quantile_update_rate
    self._quantile_update_frequency = budget.quantile_update_frequency
    self._update_counter = 0

    # NOTE(review): super(self.__class__, self) recurses endlessly if this
    # class is ever subclassed; naming the class explicitly would avoid
    # that, but the class name is not visible from this method.
    super(self.__class__, self).__init__(
        normalization_parameters, parameters
    )
    # Assigned after the base initialiser has run.
    self._max_q = parameters.rl.maxq_learning
def add(self, event, subscriber, append=True):
    """
    Record a subscriber for an event.

    :param event: The name of an event.
    :param subscriber: The subscriber to record (and call when the
                       event is published).
    :param append: True to add the subscriber after the ones already
                   queued for the event, false to add it before them.
    """
    known = self._subscribers
    if event in known:
        waiting = known[event]
        if append:
            waiting.append(subscriber)
        else:
            waiting.appendleft(subscriber)
    else:
        # Nothing queued for this event yet.
        known[event] = deque([subscriber])
def dispatch_repr(self, obj, recursive):
    """Route ``obj`` to the renderer matching its type.

    ``helper`` and numeric values are formatted inline as ``<span>``
    markup; strings, regexes and containers go to the corresponding
    ``*_repr`` method, and anything else goes to ``object_repr``.

    :param obj: The object to render.
    :param recursive: Forwarded only to the container renderers.
    :return: The chosen renderer's return value.
    """
    if obj is helper:
        return u'<span class="help">%r</span>' % helper
    if isinstance(obj, (integer_types, float, complex)):
        return u'<span class="number">%r</span>' % obj
    if isinstance(obj, string_types):
        return self.string_repr(obj)
    if isinstance(obj, RegexType):
        return self.regex_repr(obj)

    # Built-in containers, checked in a fixed order.
    builtin_containers = (
        (list, 'list_repr'),
        (tuple, 'tuple_repr'),
        (set, 'set_repr'),
        (frozenset, 'frozenset_repr'),
        (dict, 'dict_repr'),
    )
    for container_type, renderer_name in builtin_containers:
        if isinstance(obj, container_type):
            renderer = getattr(self, renderer_name)
            return renderer(obj, recursive)

    # ``deque`` may be None in this module, so test for that first.
    if deque is not None and isinstance(obj, deque):
        return self.deque_repr(obj, recursive)

    return self.object_repr(obj)
def rtask_avg_proc(channel, threshold, trend_task, window_size, task=None):
    """Generator task reporting when a moving average crosses a threshold.

    Subscribes ``task`` to ``channel``, waits for a 'start' message, then
    consumes ``(i, n)`` messages. For each one it updates the average of
    the last ``window_size`` values (the window starts filled with
    zeros) and sends ``(i, 'high', avg)`` or ``(i, 'low', avg)`` to
    ``trend_task`` when the average is above ``threshold`` or below
    ``-threshold``. A message whose ``n`` is ``None`` ends the task.

    :param channel: Object whose ``subscribe(task)`` result is yielded.
    :param threshold: Magnitude the average must exceed to be reported.
    :param trend_task: Object with a ``send`` method receiving reports.
    :param window_size: Number of values in the moving window.
    :param task: The task running this generator; presumably supplied by
                 the task framework -- confirm against the caller.
    :return: 0 (as the generator's return value).
    :raises AssertionError: If the first message is not 'start'.
    """
    import collections

    # subscribe to channel (at client)
    yield channel.subscribe(task)
    # Circular buffer pre-filled with zeros, so data[0] is always the
    # value about to fall out of the window.
    data = collections.deque([0.0] * window_size, maxlen=window_size)
    cumsum = 0.0
    # The first message must be the 'start' command. An explicit check is
    # used instead of ``assert`` so the receive still happens when
    # assertions are stripped with ``python -O``.
    command = yield task.receive()
    if command != 'start':
        raise AssertionError("expected 'start' command, got %r" % (command,))
    while True:
        i, n = yield task.receive()
        if n is None:
            break
        # Add the new value and drop the oldest one from the running sum.
        cumsum += n - data[0]
        avg = cumsum / window_size
        if avg > threshold:
            trend_task.send((i, 'high', float(avg)))
        elif avg < -threshold:
            trend_task.send((i, 'low', float(avg)))
        data.append(n)
    # ``raise StopIteration(0)`` inside a generator is turned into a
    # RuntimeError by PEP 479; ``return`` delivers the same value.
    return 0
# This generator function is sent to remote dispycos process to save
# the received data in a file (on the remote peer).
def rtask_avg_proc(channel, threshold, trend_task, window_size, task=None):
    """Generator task reporting when a moving average crosses a threshold.

    Subscribes ``task`` to ``channel``, waits for a 'start' message, then
    consumes ``(i, n)`` messages. For each one it updates the average of
    the last ``window_size`` values (the window starts filled with
    zeros) and sends ``(i, 'high', avg)`` or ``(i, 'low', avg)`` to
    ``trend_task`` when the average is above ``threshold`` or below
    ``-threshold``. A message whose ``n`` is ``None`` ends the task.

    :param channel: Object whose ``subscribe(task)`` result is yielded.
    :param threshold: Magnitude the average must exceed to be reported.
    :param trend_task: Object with a ``send`` method receiving reports.
    :param window_size: Number of values in the moving window.
    :param task: The task running this generator; presumably supplied by
                 the task framework -- confirm against the caller.
    :return: 0 (as the generator's return value).
    :raises AssertionError: If the first message is not 'start'.
    """
    import collections

    # subscribe to channel (at client)
    yield channel.subscribe(task)
    # Circular buffer pre-filled with zeros, so data[0] is always the
    # value about to fall out of the window.
    data = collections.deque([0.0] * window_size, maxlen=window_size)
    cumsum = 0.0
    # The first message must be the 'start' command. An explicit check is
    # used instead of ``assert`` so the receive still happens when
    # assertions are stripped with ``python -O``.
    command = yield task.receive()
    if command != 'start':
        raise AssertionError("expected 'start' command, got %r" % (command,))
    while True:
        i, n = yield task.receive()
        if n is None:
            break
        # Add the new value and drop the oldest one from the running sum.
        cumsum += n - data[0]
        avg = cumsum / window_size
        if avg > threshold:
            trend_task.send((i, 'high', float(avg)))
        elif avg < -threshold:
            trend_task.send((i, 'low', float(avg)))
        data.append(n)
    # ``raise StopIteration(0)`` inside a generator is turned into a
    # RuntimeError by PEP 479; ``return`` delivers the same value.
    return 0
# This generator function is sent to remote dispycos process to save
# the received data in a file (on the remote peer).
def __init__(self, task):
    """Set up message categorization for task 'task'.

    Starts with no categorizer functions and a single empty queue,
    stored under the key ``None``.
    """
    uncategorized = collections.deque()
    self._categorize = []
    self._categories = {None: uncategorized}
    self._task = task
def rtask_avg_proc(channel, threshold, trend_task, window_size, task=None):
    """Generator task reporting when a moving average crosses a threshold.

    Subscribes ``task`` to ``channel``, waits for a 'start' message, then
    consumes ``(i, n)`` messages. For each one it updates the average of
    the last ``window_size`` values (the window starts filled with
    zeros) and sends ``(i, 'high', avg)`` or ``(i, 'low', avg)`` to
    ``trend_task`` when the average is above ``threshold`` or below
    ``-threshold``. A message whose ``n`` is ``None`` ends the task.

    :param channel: Object whose ``subscribe(task)`` result is yielded.
    :param threshold: Magnitude the average must exceed to be reported.
    :param trend_task: Object with a ``send`` method receiving reports.
    :param window_size: Number of values in the moving window.
    :param task: The task running this generator; presumably supplied by
                 the task framework -- confirm against the caller.
    :return: 0 (as the generator's return value).
    :raises AssertionError: If the first message is not 'start'.
    """
    import collections

    # subscribe to channel (at client)
    yield channel.subscribe(task)
    # Circular buffer pre-filled with zeros, so data[0] is always the
    # value about to fall out of the window.
    data = collections.deque([0.0] * window_size, maxlen=window_size)
    cumsum = 0.0
    # The first message must be the 'start' command. An explicit check is
    # used instead of ``assert`` so the receive still happens when
    # assertions are stripped with ``python -O``.
    command = yield task.receive()
    if command != 'start':
        raise AssertionError("expected 'start' command, got %r" % (command,))
    while True:
        i, n = yield task.receive()
        if n is None:
            break
        # Add the new value and drop the oldest one from the running sum.
        cumsum += n - data[0]
        avg = cumsum / window_size
        if avg > threshold:
            trend_task.send((i, 'high', float(avg)))
        elif avg < -threshold:
            trend_task.send((i, 'low', float(avg)))
        data.append(n)
    # ``raise StopIteration(0)`` inside a generator is turned into a
    # RuntimeError by PEP 479; ``return`` delivers the same value.
    return 0
# This generator function is sent to remote dispycos process to save
# the received data in a file (on the remote peer).
def __init__(self, task):
    """Prepare to categorize messages sent to task 'task'.

    The category table initially holds one empty queue, keyed by
    ``None``, and the list of categorizer functions is empty.
    """
    self._task = task
    self._categorize = list()
    default_queue = collections.deque()
    self._categories = {None: default_queue}
def receive(self, category=None, timeout=None, alarm_value=None):
    """Similar to 'receive' of Task, except it retrieves (waiting, if
    necessary) messages in given 'category'.

    If a message for 'category' is already queued it is returned at
    once. Otherwise messages are received from the underlying task one
    at a time and run through the categorizer functions in order: a
    message whose category equals 'category' is returned; a message
    with a different, non-None category is queued under that category;
    a message no categorizer claims is queued under ``None``.

    :param category: Category of message wanted.
    :param timeout: Forwarded to the task's ``receive``; when truthy it
                    is reduced by the elapsed time after each message
                    that is not returned.
    :param alarm_value: Forwarded to the task's ``receive``; a received
                        message equal to it is returned immediately.
    :return: The message (as the generator's return value).
    """
    queued = self._categories.get(category, None)
    if queued:
        # ``return`` instead of ``raise StopIteration(msg)``: PEP 479
        # turns a StopIteration raised inside a generator into a
        # RuntimeError. The same applies to the returns below.
        return queued.popleft()
    if timeout:
        start = _time()
    while True:
        msg = yield self._task.receive(timeout=timeout, alarm_value=alarm_value)
        if msg == alarm_value:
            return msg
        for categorize in self._categorize:
            c = categorize(msg)
            if c == category:
                return msg
            if c is not None:
                # Belongs to another category: park it there.
                bucket = self._categories.get(c, None)
                if not bucket:
                    bucket = self._categories[c] = collections.deque()
                bucket.append(msg)
                break
        else:
            # No categorizer claimed the message.
            self._categories[None].append(msg)
        if timeout:
            # Charge the time spent on this message against the timeout.
            now = _time()
            timeout -= now - start
            start = now
def ntscInitPrev():
    """Reset the module-level ``ntscOutput`` to a deque holding four
    ``(128, 128, 128)`` triples."""
    global ntscOutput
    # Tuples are immutable, so repeating one entry is safe; this also
    # avoids ``xrange``, which exists only on Python 2.
    ntscOutput = deque([(128, 128, 128)] * 4)
def ntscInitPrev():
    """Reset the module-level ``ntscOutput`` to a deque holding four
    ``(128, 128, 128)`` triples."""
    global ntscOutput
    # Tuples are immutable, so repeating one entry is safe; this also
    # avoids ``xrange``, which exists only on Python 2.
    ntscOutput = deque([(128, 128, 128)] * 4)
def make_names_deque():
    """Read ``yob2015.txt`` as CSV and collect the "name" column.

    Rows are parsed with ``csv.DictReader`` using the module-level
    ``fields`` as column names. Each name is pushed onto the left end of
    the deque, so the result is in reverse file order.

    :return: A deque of the names.
    """
    collected = deque()
    with open("yob2015.txt") as source:
        rows = csv.DictReader(source, fieldnames=fields)
        # extendleft pushes items one at a time onto the left end, which
        # is the same as calling appendleft for each row.
        collected.extendleft(row["name"] for row in rows)
    return collected
# 101 ms