def _notify(self):
    """Signal queued completion objects until ``self.stopping`` becomes true.

    Each pass of the loop either pops the leftmost entry of ``self.out_q``
    and calls ``set()`` on it, or, when the queue is empty, clears
    ``self.out_q_has_data`` and waits on it before checking again.

    Any ``BaseException`` raised inside the loop is routed to
    ``self._error()`` instead of propagating to the caller.
    """
    try:
        while True:
            if self.stopping:
                break
            if self.out_q:
                # Hand the oldest finished entry back to whoever waits on it.
                self.out_q.popleft().set()
            else:
                # Nothing to deliver yet: wait for jobs to complete.
                self.out_q_has_data.clear()
                self.out_q_has_data.wait()
    except BaseException:
        self._error()
python类wait()的实例源码
def get(self, block=True, timeout=None):
    """Return the resolved value, fetching it from ``self._future`` once.

    A value is considered already resolved when ``self._value`` does not
    compare equal to ``self.NoValue``; in that case it is returned without
    touching the future.

    Args:
        block: When true, wait on the future with ``gevent.wait`` and then
            read it with a plain ``get()``. When false, ``block`` and
            ``timeout`` are forwarded to the future's ``get``.
        timeout: Passed to ``gevent.wait`` in blocking mode, or to the
            future's ``get`` in non-blocking mode.

    Returns:
        The cached or freshly fetched value.
    """
    already_resolved = not self._value == self.NoValue
    if not already_resolved:
        if block:
            gevent.wait([self._future], timeout)
            # NOTE(review): this follow-up get() receives no timeout, so it
            # presumably can still block after the wait above times out --
            # confirm against the future's API.
            fetched = self._future.get()
        else:
            fetched = self._future.get(block, timeout)
        self._value = fetched
    return self._value
def retry_with_recovery(
        protocol,
        data,
        receiver_address,
        event_stop,
        event_healthy,
        event_unhealthy,
        backoff):
    """ Keep sending data to a healthy node until it is acknowledged.

    The loop ends when `event_stop` is set or when `retry` reports an
    acknowledgement. While `event_unhealthy` is set no packet is sent;
    the task blocks in `wait_recovery` instead.

    Note:
        backoff must be an infinite iterator, otherwise this task will
        become a hot loop.

    Returns:
        The last result of `retry` (initially False), or None when the stop
        event is found set right after waiting for recovery.
    """
    # The underlying unhealthy event gets cleared elsewhere, so the combined
    # event below has to be cleared by hand as well (see the loop body).
    stop_or_unhealthy = event_first_of(
        event_stop,
        event_unhealthy,
    )

    acknowledged = False
    while True:
        if event_stop.is_set() or acknowledged:
            break

        # Never send packets to an unhealthy node, and never wait for it to
        # come back once the message has already been acknowledged.
        if event_unhealthy.is_set():
            wait_recovery(
                event_stop,
                event_healthy,
            )

            # wait_recovery is assumed to have returned because unhealthy
            # was cleared; resetting the combined event is safe because the
            # stop event is re-checked immediately afterwards.
            stop_or_unhealthy.clear()

            if event_stop.is_set():
                return

        # retry gives up as soon as stop_or_unhealthy fires, which lets this
        # task either wait for recovery or quit. The same backoff iterator is
        # reused on purpose so timeouts resume from where they left off.
        acknowledged = retry(
            protocol,
            data,
            receiver_address,
            stop_or_unhealthy,
            backoff,
        )

    return acknowledged
def test_latency(apps, tokens, num_transfers, amount):
    """Run sequential transfers per token concurrently and print timing stats.

    For every token (paired by position with an entry of ``apps``) a greenlet
    issues ``num_transfers`` transfers of ``amount`` one after another to the
    last node of the first 2-hop path found in that app's channel graph. Once
    all greenlets have signalled completion, the total number of transfers,
    transfers per second, average latency and elapsed time are printed.
    """

    def start_transfers(app_index, token, transfer_count):
        # Launch the transfers for one token; the returned event is set when
        # the last transfer of that token has been waited on.
        app = apps[app_index]
        channel_graph = app.raiden.token_to_channelgraph[token]
        candidate_paths = channel_graph.get_paths_of_length(
            source=app.raiden.address,
            num_hops=2,
        )
        target = candidate_paths[0][-1]
        done = gevent.event.Event()

        def _transfer():
            api = app.raiden.api
            for _ in range(transfer_count):
                pending = api.transfer_async(
                    token,
                    amount,
                    target,
                    1  # TODO: fill in identifier
                )
                pending.wait()
            done.set()

        gevent.spawn(_transfer)
        return done

    # Start all transfers
    start_time = time.time()
    finished_events = [
        start_transfers(idx, curr_token, num_transfers)
        for idx, curr_token in enumerate(tokens)
    ]

    # Wait until the transfers for all tokens are done
    gevent.wait(finished_events)
    elapsed = time.time() - start_time

    completed_transfers = num_transfers * len(tokens)
    tps = completed_transfers / elapsed
    latency = elapsed / completed_transfers
    summary = 'Completed {} transfers. tps:{:.5} latency:{:.5} time:{:.5}s'
    print(summary.format(completed_transfers, tps, latency, elapsed))
def processItem(args, contentArgs=None):
    """Execute one blog command described by ``args``.

    Supported values of ``args.command`` are ``post``, ``delete``,
    ``update`` and ``get``; any other value does nothing.

    Args:
        args: Parsed options; supplies the credentials, blog id/url and the
            per-command fields read below.
        contentArgs: Optional object whose ``updateFileWithPostId`` is called
            with the id of a newly created post.

    Returns:
        0 on success, -1 when an ``AccessTokenRefreshError`` is raised.
    """
    blogger = EasyBlogger(args.clientid, args.secret, args.blogid,
                          args.url)
    command = args.command
    try:
        if command == "post":
            created = blogger.post(args.title,
                                   args.content or args.file,
                                   args.labels,
                                   args.filters,
                                   isDraft=not args.publish,
                                   fmt=args.format)
            createdId = created['id']
            logger.debug("Created post: %s", createdId)
            if contentArgs:
                contentArgs.updateFileWithPostId(createdId)
            print(created['url'])
        elif command == 'delete':
            logger.debug("Deleting post: %s", args.postIds)
            for doomedId in args.postIds:
                blogger.deletePost(doomedId)
        elif command == 'update':
            logger.debug("Updating post: %s", args.postId)
            updated = blogger.updatePost(
                args.postId,
                args.title,
                args.content or args.file,
                args.labels,
                args.filters,
                isDraft=not args.publish,
                fmt=args.format)
            print(updated['url'])
        elif command == "get":
            # Pick the lookup mode in priority order: id, query, url, labels.
            if args.postId:
                lookup = {'postId': args.postId}
            elif args.query:
                lookup = {'query': args.query, 'maxResults': args.count}
            elif args.u:
                lookup = {'url': args.u}
            else:
                lookup = {'labels': args.labels, 'maxResults': args.count}
            posts = blogger.getPosts(**lookup)

            # Print every post in its own greenlet and wait for all of them.
            jobs = []
            for item in posts:
                jobs.append(gevent.spawn(
                    printPosts, item, args.fields, args.doc, args.tofiles))
            gevent.wait(jobs)
    except AccessTokenRefreshError:
        # Raised when the credentials have been revoked by the user or have
        # expired.
        print('The credentials have been revoked or expired, please re-run'
              ' the application to re-authorize')
        return -1
    return 0
def _wait(self):
    """Block in ``gevent.wait()`` called without arguments.

    The result of the call is discarded.
    NOTE(review): with no arguments gevent presumably waits until the event
    loop has nothing left to run -- confirm against the gevent docs.
    """
    gevent.wait()
def main(filename):
    """Read a species CSV and spawn one ``post_species`` greenlet per row.

    Each row is expected to hold, in order: scientific name, common name,
    family and taxonomic group. A common name of ``'null'`` is replaced by
    the scientific name. The tags passed along are the family, the group and
    the extra keywords registered for the lower-cased group.

    After every 100 spawned rows, and once more at the end, ``gevent.wait()``
    is called before continuing.

    Returns:
        Always 0.
    """
    print('Importing {}'.format(filename))

    # Extra search keywords keyed by lower-cased taxonomic group.
    extra_tags = {
        'conifers': [
            'conifer', 'plant', 'land plant', 'botany'],
        'reptiles': [
            'reptile', 'animal', 'cold blood', 'cold bloded', 'vertebrate',
            'fauna'],
        'turtles (non-marine)': [
            'turtle', 'animal', 'non-marine', 'cold blood', 'cold bloded',
            'vertebrate', 'fauna'],
        'butterflies': [
            'butterfly', 'animal', 'insect', 'moths and butterflies', 'fauna',
            'invertebrate'],
        'dragonflies': [
            'dragonfly', 'animal', 'insect', 'dragonflies and damseflies',
            'invertebrate', 'fauna'],
        'mammals': [
            'mammal', 'animal', 'warm blood', 'warm blooded', 'vertebrate',
            'fauna'],
        'birds': [
            'bird', 'animal', 'warm blood', 'warm blooded', 'vertebrate',
            'fauna'],
        'amphibians': [
            'amfibian', 'animal', 'vertebrate', 'fauna'],
        'sphingid moths': [
            'sphingid moth', 'moth', 'animal', 'insect', 'invertebrate',
            'fauna', 'moths and butterflies'],
        'bumblebees': [
            'bumblebee', 'bee', 'bees', 'animal', 'insect', 'invertebrate'],
    }

    batch_size = 100
    with open(filename, newline='') as f:
        # "Scientific Name","Common Name","Family","Taxonomic Group"
        reader = csv.reader(f, delimiter=',', quotechar='"')
        for row_number, row in enumerate(reader, start=1):
            scientific = row[0]
            common = row[1]
            if common == 'null':
                common = scientific
            family = row[2]
            group = row[3]
            tags = [family, group] + extra_tags[group.lower()]
            gevent.spawn(post_species, scientific, common, tags)
            # Let the current batch finish before spawning more greenlets.
            if row_number % batch_size == 0:
                gevent.wait()
    gevent.wait()
    return 0
def test_pickle(graph):
    """Pickle a network interrupted mid-run, then resume the restored copy.

    The graph generates 5 packets, passes them through a slow component
    (0.1 delay) into a counter whose outputs are discarded. The run is cut
    off by a 0.35 timeout, at which point 4 packets are expected to have been
    counted. The network is pickled in that state, terminated, and the
    unpickled copy is resumed and expected to process the fifth packet.
    """
    import pickle

    graph.add_component("Generate", GenerateTestData, COUNT=5)
    graph.add_component("Pass", SlowPass, DELAY=0.1)
    count = graph.add_component("Counter", Counter)
    graph.add_component("Discard1", Discard)
    dis2 = graph.add_component("Discard2", Discard)

    for source_port, target_port in (
            ("Generate.OUT", "Pass.IN"),
            ("Pass.OUT", "Counter.IN"),
            ("Counter.COUNT", "Discard1.IN"),
            ("Counter.OUT", "Discard2.IN")):
        graph.connect(source_port, target_port)

    net = Network(graph)
    netrunner = gevent.spawn(net.go)
    try:
        with gevent.Timeout(0.35):
            gevent.wait([netrunner])
    except gevent.Timeout:
        print(count.execute)

    seen_before_stop = ['000005', '000004', '000003', '000002']
    seen_after_resume = seen_before_stop + ['000001']

    assert count.count == 4
    assert dis2.values == seen_before_stop

    # dump before terminating to get the runner statuses
    data = pickle.dumps(net)
    # FIXME: do we need to auto-terminate inside wait_for_all if there is an error?
    net.terminate()
    net.wait_for_all()
    # gevent.wait([netrunner]) # this causes more packets to be sent. no good.

    net2 = pickle.loads(data)
    assert net2.graph.component('Counter').count == 4
    assert net2.graph.component('Discard2').values == seen_before_stop

    net2.go(resume=True)
    assert net2.graph.component('Counter').count == 5
    assert net2.graph.component('Discard2').values == seen_after_resume
# FIXME: test the case where a packet is lost due to being shut-down
# packet counting should catch it. to test, use this component, which
# can be killed while it sleeps holding a packet:
# @component
# @outport("OUT")
# @inport("IN")
# @inport("DELAY", type=float, required=True)
# def SlowPass(IN, DELAY, OUT):
# """
# Pass a stream of packets to an output stream with a delay between packets
# """
# delay = DELAY.receive_once()
# for p in IN:
# time.sleep(delay)
# OUT.send(p)