def rtask_avg_proc(channel, threshold, trend_task, window_size, task=None):
import collections
# subscribe to channel (at client)
yield channel.subscribe(task)
# create circular buffer
data = collections.deque(maxlen=window_size)
for i in range(window_size):
data.append(0.0)
cumsum = 0.0
# first message is 'start' command; see 'client_proc'
assert (yield task.receive()) == 'start'
while True:
i, n = yield task.receive()
if n is None:
break
cumsum += (n - data[0])
avg = (cumsum / window_size)
if avg > threshold:
trend_task.send((i, 'high', float(avg)))
elif avg < -threshold:
trend_task.send((i, 'low', float(avg)))
data.append(n)
raise StopIteration(0)
# This generator function is sent to remote dispycos process to save
# the received data in a file (on the remote peer).
评论列表
文章目录