def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered from a greenlet.

    The view copies the top of ``flask._request_ctx_stack`` and wraps a
    checker function in a greenlet.  The checker is only run after the
    test client call has returned; it asserts that ``flask.request`` and
    ``flask.current_app`` are falsy until the copied context is entered,
    populated while inside it, and that ``flask.request`` is falsy again
    afterwards.
    """
    app = flask.Flask(__name__)
    pending = []

    @app.route('/')
    def index():
        copied_ctx = flask._request_ctx_stack.top.copy()

        def check_context():
            # Nothing is bound before the copied context is entered.
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with copied_ctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            # Leaving the ``with`` block unbinds the request again.
            self.assert_false(flask.request)
            return 42

        pending.append(greenlet(check_context))
        return 'Hello World!'

    response = app.test_client().get('/?foo=bar')
    self.assert_equal(response.data, b'Hello World!')

    # Run the deferred checker now that the test request has returned.
    self.assert_equal(pending[0].run(), 42)
python类greenlet()的实例源码
def testBenchmark():
    """Spawn three ``test.count`` greenlets and report greenlet counts.

    Prints the number of live ``greenlet`` objects found by the garbage
    collector before and after spawning, plus the time taken to spawn,
    then sleeps for five seconds.

    ``print`` is called with a single parenthesised argument so the
    function parses on Python 3 as well as Python 2; the emitted text is
    unchanged.
    """
    import time

    def printThreadNum():
        # Count every greenlet instance the garbage collector tracks.
        import gc
        from greenlet import greenlet
        objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
        print("Greenlets: %s" % len(objs))

    printThreadNum()
    test = TestNoblock()
    s = time.time()
    for i in range(3):
        gevent.spawn(test.count, i + 1)
    print("Created in %.3fs" % (time.time() - s))
    printThreadNum()
    # NOTE(review): presumably gives the spawned greenlets time to run;
    # whether this sleep yields to gevent depends on monkey-patching that
    # is not visible here -- confirm.
    time.sleep(5)
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` with a greenlet.

    The view decorates a checker with
    ``flask.copy_current_request_context`` and wraps it in a greenlet.
    The checker is only run after the test client call has returned and
    asserts that the request and application are available inside it.

    The previous version also assigned
    ``flask._request_ctx_stack.top.copy()`` to a local that was never
    read; that unused assignment (and its use of a private attribute) has
    been dropped because the decorator is what this test exercises.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42

        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the deferred checker now that the test request has returned.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def resolve(self, host, port, family):
    """Resolve ``host``/``port`` via ``self.resolver`` from a child greenlet.

    The current greenlet hands ``self.resolver.resolve`` its own
    ``switch`` as the callback, then switches to its parent; the value
    returned here is whatever that parent switch delivers (per the
    original docstring, a list of (family, address) pairs).

    Must be called on a greenlet that has a parent (asserted).
    """
    child_gr = greenlet.getcurrent()
    parent_gr = child_gr.parent
    assert parent_gr is not None, "Should be on child greenlet"

    def handler(exc_typ, exc_val, exc_tb):
        # If netutil.Resolver is configured to use TwistedResolver,
        # report its DomainError as a socket.gaierror instead.
        if DomainError and issubclass(exc_typ, DomainError):
            exc_typ, exc_val = socket.gaierror, socket.gaierror(str(exc_val))

        # Depending on the resolver implementation, we could be on any
        # thread or greenlet. Return to the loop's thread and raise the
        # exception on the calling greenlet from there.
        rethrow = functools.partial(child_gr.throw, exc_typ, exc_val, exc_tb)
        self.io_loop.add_callback(rethrow)

        # A true return value tells the stack context not to propagate.
        return True

    with stack_context.ExceptionStackContext(handler):
        self.resolver.resolve(host, port, family, callback=child_gr.switch)

    return parent_gr.switch()
def refresh(self):
    """Refresh ``self.rsc`` and schedule the next refresh.

    Must be called on a greenlet that has a parent (asserted).
    ``self.refreshed.set()`` is always called once the refresh attempt
    finishes.  ``pymongo.errors.AutoReconnect`` is ignored and the next
    refresh is still scheduled; any other exception makes the method
    return without scheduling.
    """
    assert greenlet.getcurrent().parent is not None, (
        "Should be on child greenlet")

    try:
        self.rsc.refresh()
    except pymongo.errors.AutoReconnect:
        # Fall through and schedule another attempt.
        pass
    except:
        # RSC has been collected or there was an unexpected error.
        # NOTE(review): the bare ``except`` also catches BaseException
        # subclasses -- confirm that is intended.
        return
    finally:
        # Switch to greenlets blocked in wait_for_refresh().
        self.refreshed.set()

    next_run = time.time() + self._refresh_interval
    self.timeout_obj = self.io_loop.add_timeout(next_run, self.async_refresh)
def __del__(self):
    """Close the cursor on deletion if it still looks alive.

    This MotorCursor is deleted on whatever greenlet does the last
    decref, or (if it's referenced from a cycle) whichever is current
    when the GC kicks in. We may need to send the server a killCursors
    message, but in Motor only direct children of the main greenlet can
    do I/O.
    """
    # Quick check whether the cursor is still alive on the server.
    if not self.cursor_id or not self.alive:
        return

    if greenlet.getcurrent().parent is None:
        # We're on the main greenlet, start the operation on a child.
        self.close()
    else:
        # We're on a child greenlet, send the message.
        self.delegate.close()
# Paper over some differences between PyMongo Cursor and CommandCursor.
def refresh(self):
    """Refresh ``self.rsc`` and schedule the next refresh.

    Must be called on a greenlet that has a parent (asserted).
    ``self.refreshed.set()`` is always called once the refresh attempt
    finishes.  ``pymongo.errors.AutoReconnect`` is ignored and the next
    refresh is still scheduled; any other exception makes the method
    return without scheduling.
    """
    assert greenlet.getcurrent().parent is not None, (
        "Should be on child greenlet")

    try:
        self.rsc.refresh()
    except pymongo.errors.AutoReconnect:
        # Fall through and schedule another attempt.
        pass
    except:
        # RSC has been collected or there was an unexpected error.
        # NOTE(review): the bare ``except`` also catches BaseException
        # subclasses -- confirm that is intended.
        return
    finally:
        # Switch to greenlets blocked in wait_for_refresh().
        self.refreshed.set()

    next_run = time.time() + self._refresh_interval
    self.timeout_obj = self.io_loop.add_timeout(next_run, self.async_refresh)
def __del__(self):
    """Close the cursor on deletion if it still looks alive.

    This MotorCursor is deleted on whatever greenlet does the last
    decref, or (if it's referenced from a cycle) whichever is current
    when the GC kicks in. We may need to send the server a killCursors
    message, but in Motor only direct children of the main greenlet can
    do I/O.
    """
    # Quick check whether the cursor is still alive on the server.
    if not self.cursor_id or not self.alive:
        return

    if greenlet.getcurrent().parent is None:
        # We're on the main greenlet, start the operation on a child.
        self.close()
    else:
        # We're on a child greenlet, send the message.
        self.delegate.close()
# Paper over some differences between PyMongo Cursor and CommandCursor.
def resolve(self, host, port, family):
    """Resolve ``host``/``port`` via ``self.resolver`` from a child greenlet.

    The current greenlet hands ``self.resolver.resolve`` its own
    ``switch`` as the callback, then switches to its parent; the value
    returned here is whatever that parent switch delivers (per the
    original docstring, a list of (family, address) pairs).

    Must be called on a greenlet that has a parent (asserted).
    """
    child_gr = greenlet.getcurrent()
    parent_gr = child_gr.parent
    assert parent_gr is not None, "Should be on child greenlet"

    def handler(exc_typ, exc_val, exc_tb):
        # If netutil.Resolver is configured to use TwistedResolver,
        # report its DomainError as a socket.gaierror instead.
        if DomainError and issubclass(exc_typ, DomainError):
            exc_typ, exc_val = socket.gaierror, socket.gaierror(str(exc_val))

        # Depending on the resolver implementation, we could be on any
        # thread or greenlet. Return to the loop's thread and raise the
        # exception on the calling greenlet from there.
        rethrow = functools.partial(child_gr.throw, exc_typ, exc_val, exc_tb)
        self.io_loop.add_callback(rethrow)

        # A true return value tells the stack context not to propagate.
        return True

    with stack_context.ExceptionStackContext(handler):
        self.resolver.resolve(host, port, family, callback=child_gr.switch)

    return parent_gr.switch()
def refresh(self):
    """Refresh ``self.rsc`` and schedule the next refresh.

    Must be called on a greenlet that has a parent (asserted).
    ``self.refreshed.set()`` is always called once the refresh attempt
    finishes.  ``pymongo.errors.AutoReconnect`` is ignored and the next
    refresh is still scheduled; any other exception makes the method
    return without scheduling.
    """
    assert greenlet.getcurrent().parent is not None, (
        "Should be on child greenlet")

    try:
        self.rsc.refresh()
    except pymongo.errors.AutoReconnect:
        # Fall through and schedule another attempt.
        pass
    except:
        # RSC has been collected or there was an unexpected error.
        # NOTE(review): the bare ``except`` also catches BaseException
        # subclasses -- confirm that is intended.
        return
    finally:
        # Switch to greenlets blocked in wait_for_refresh().
        self.refreshed.set()

    next_run = time.time() + self._refresh_interval
    self.timeout_obj = self.io_loop.add_timeout(next_run, self.async_refresh)
def resolve(self, host, port, family):
    """Resolve ``host``/``port`` via ``self.resolver`` from a child greenlet.

    The current greenlet hands ``self.resolver.resolve`` its own
    ``switch`` as the callback, then switches to its parent; the value
    returned here is whatever that parent switch delivers (per the
    original docstring, a list of (family, address) pairs).

    Must be called on a greenlet that has a parent (asserted).
    """
    child_gr = greenlet.getcurrent()
    parent_gr = child_gr.parent
    assert parent_gr is not None, "Should be on child greenlet"

    def handler(exc_typ, exc_val, exc_tb):
        # If netutil.Resolver is configured to use TwistedResolver,
        # report its DomainError as a socket.gaierror instead.
        if DomainError and issubclass(exc_typ, DomainError):
            exc_typ, exc_val = socket.gaierror, socket.gaierror(str(exc_val))

        # Depending on the resolver implementation, we could be on any
        # thread or greenlet. Return to the loop's thread and raise the
        # exception on the calling greenlet from there.
        rethrow = functools.partial(child_gr.throw, exc_typ, exc_val, exc_tb)
        self.io_loop.add_callback(rethrow)

        # A true return value tells the stack context not to propagate.
        return True

    with stack_context.ExceptionStackContext(handler):
        self.resolver.resolve(host, port, family, callback=child_gr.switch)

    return parent_gr.switch()
def refresh(self):
    """Refresh ``self.rsc`` and schedule the next refresh.

    Must be called on a greenlet that has a parent (asserted).
    ``self.refreshed.set()`` is always called once the refresh attempt
    finishes.  ``pymongo.errors.AutoReconnect`` is ignored and the next
    refresh is still scheduled; any other exception makes the method
    return without scheduling.
    """
    assert greenlet.getcurrent().parent is not None, (
        "Should be on child greenlet")

    try:
        self.rsc.refresh()
    except pymongo.errors.AutoReconnect:
        # Fall through and schedule another attempt.
        pass
    except:
        # RSC has been collected or there was an unexpected error.
        # NOTE(review): the bare ``except`` also catches BaseException
        # subclasses -- confirm that is intended.
        return
    finally:
        # Switch to greenlets blocked in wait_for_refresh().
        self.refreshed.set()

    next_run = time.time() + self._refresh_interval
    self.timeout_obj = self.io_loop.add_timeout(next_run, self.async_refresh)
def __del__(self):
    """Close the cursor on deletion if it still looks alive.

    This MotorCursor is deleted on whatever greenlet does the last
    decref, or (if it's referenced from a cycle) whichever is current
    when the GC kicks in. We may need to send the server a killCursors
    message, but in Motor only direct children of the main greenlet can
    do I/O.
    """
    # Quick check whether the cursor is still alive on the server.
    if not self.cursor_id or not self.alive:
        return

    if greenlet.getcurrent().parent is None:
        # We're on the main greenlet, start the operation on a child.
        self.close()
    else:
        # We're on a child greenlet, send the message.
        self.delegate.close()
# Paper over some differences between PyMongo Cursor and CommandCursor.
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered from a greenlet.

    The view copies the top of ``flask._request_ctx_stack`` and wraps a
    checker function in a greenlet.  The checker is only run after the
    test client call has returned; it asserts that ``flask.request`` and
    ``flask.current_app`` are falsy until the copied context is entered,
    populated while inside it, and that ``flask.request`` is falsy again
    afterwards.
    """
    app = flask.Flask(__name__)
    pending = []

    @app.route('/')
    def index():
        copied_ctx = flask._request_ctx_stack.top.copy()

        def check_context():
            # Nothing is bound before the copied context is entered.
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with copied_ctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            # Leaving the ``with`` block unbinds the request again.
            self.assert_false(flask.request)
            return 42

        pending.append(greenlet(check_context))
        return 'Hello World!'

    response = app.test_client().get('/?foo=bar')
    self.assert_equal(response.data, b'Hello World!')

    # Run the deferred checker now that the test request has returned.
    self.assert_equal(pending[0].run(), 42)
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` with a greenlet.

    The view decorates a checker with
    ``flask.copy_current_request_context`` and wraps it in a greenlet.
    The checker is only run after the test client call has returned and
    asserts that the request and application are available inside it.

    The previous version also assigned
    ``flask._request_ctx_stack.top.copy()`` to a local that was never
    read; that unused assignment (and its use of a private attribute) has
    been dropped because the decorator is what this test exercises.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42

        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the deferred checker now that the test request has returned.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def test_greenlet_tracing(self):
    """Check the events passed to a ``greenlet.settrace`` callback.

    Runs one greenlet that returns normally and one that raises
    ``SomeError`` while a recording trace function is installed, then
    compares the recorded argument tuples with the expected sequence.
    The previous trace function is always restored.
    """
    main = greenlet.getcurrent()
    recorded = []

    def record(*args):
        recorded.append(args)

    def finishes():
        pass

    def raises():
        raise SomeError()

    previous = greenlet.settrace(record)
    try:
        g1 = greenlet.greenlet(finishes)
        g1.switch()
        g2 = greenlet.greenlet(raises)
        self.assertRaises(SomeError, g2.switch)
    finally:
        greenlet.settrace(previous)

    expected = [
        ('switch', (main, g1)),
        ('switch', (g1, main)),
        ('switch', (main, g2)),
        ('throw', (g2, main)),
    ]
    self.assertEqual(recorded, expected)
def test_exception_disables_tracing(self):
    """Check that a raising trace function gets uninstalled.

    A greenlet is started and parked before tracing is enabled.  With a
    trace function that records its arguments and then raises
    ``SomeError`` installed, switching back into the greenlet must raise
    ``SomeError``, leave ``greenlet.gettrace()`` equal to ``None`` and
    record exactly one event.
    """
    main = greenlet.getcurrent()
    recorded = []

    def record_then_fail(*args):
        recorded.append(args)
        raise SomeError()

    def park():
        main.switch()

    g = greenlet.greenlet(park)
    # Start the greenlet untraced; it switches straight back to main.
    g.switch()

    previous = greenlet.settrace(record_then_fail)
    try:
        self.assertRaises(SomeError, g.switch)
        self.assertEqual(greenlet.gettrace(), None)
    finally:
        greenlet.settrace(previous)

    expected = [('switch', (main, g))]
    self.assertEqual(recorded, expected)
def test_setparent(self):
    """Check ``_test_extension.test_setparent`` on a nested greenlet.

    ``outer`` starts an ``inner`` greenlet, which parks itself, and then
    hands ``inner`` back to the caller.  ``inner`` is passed to the C
    extension helper, whose result is expected to equal ``None``.  The
    ``.parent`` attribute is looked up afresh at every switch because the
    helper is expected to reparent the greenlet.
    """
    def outer():
        def inner():
            me = greenlet.getcurrent()
            me.parent.switch()

            # This final switch should go back to the main greenlet, since
            # the test_setparent() function in the C extension should have
            # reparented this greenlet.
            me.parent.switch()
            raise AssertionError("Should never have reached this code")

        child = greenlet.greenlet(inner)
        child.switch()

        me = greenlet.getcurrent()
        me.parent.switch(child)
        me.parent.throw(
            AssertionError("Should never reach this code"))

    handed_back = greenlet.greenlet(outer).switch()
    outcome = _test_extension.test_setparent(handed_back)
    self.assertEqual(None, outcome)
def test_throw(self):
    """Check that ``_test_extension.test_throw`` raises in the greenlet.

    A greenlet is parked inside a ``try`` block, then handed to the C
    extension helper.  Exactly one ``ValueError`` carrying the message
    ``'take that sucka!'`` must have been caught inside the greenlet.
    """
    caught = []

    def parked():
        try:
            greenlet.getcurrent().parent.switch()
        except ValueError as exc:
            caught.append(exc)
        except greenlet.GreenletExit:
            raise AssertionError

    g = greenlet.greenlet(parked)
    # Run the greenlet up to its switch back to the parent.
    g.switch()
    _test_extension.test_throw(g)

    self.assertEqual(len(caught), 1)
    error = caught[0]
    self.assertTrue(
        isinstance(error, ValueError),
        "ValueError was not raised in foo()")
    self.assertEqual(
        str(error),
        'take that sucka!',
        "message doesn't match")
def test_threaded_leak(self):
    """Check that a finished thread's main greenlet is collected.

    Two threads each store a weak reference to their current greenlet.
    After the threads are joined, ``self.recycle_threads()`` and a
    garbage collection are run, and every weak reference must be dead.
    """
    refs = []

    def worker():
        # only main greenlet present
        refs.append(weakref.ref(greenlet.getcurrent()))

    for _ in range(2):
        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()
        # Drop our reference to the thread before the next round.
        del thread

    greenlet.getcurrent()  # update ts_current
    self.recycle_threads()
    greenlet.getcurrent()  # update ts_current
    gc.collect()
    greenlet.getcurrent()  # update ts_current

    for ref in refs:
        self.assertTrue(ref() is None)
def test_threaded_adv_leak(self):
    """Check collection of a thread's greenlet that references finished ones.

    Each of two threads attaches a list to its current greenlet, runs two
    extra greenlets that append themselves to that list, and stores a
    weak reference to its current greenlet.  After the threads are
    joined, ``self.recycle_threads()`` and a garbage collection are run,
    and every weak reference must be dead.
    """
    refs = []

    def worker():
        # main and additional *finished* greenlets
        finished = []
        greenlet.getcurrent().ll = finished

        def additional():
            finished.append(greenlet.getcurrent())

        for _ in range(2):
            greenlet.greenlet(additional).switch()
        refs.append(weakref.ref(greenlet.getcurrent()))

    for _ in range(2):
        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()
        # Drop our reference to the thread before the next round.
        del thread

    greenlet.getcurrent()  # update ts_current
    self.recycle_threads()
    greenlet.getcurrent()  # update ts_current
    gc.collect()
    greenlet.getcurrent()  # update ts_current

    for ref in refs:
        self.assertTrue(ref() is None)
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def __new__(cls, func, sentinel=''):
    """Drive ``func`` in a child greenlet and yield what it hands back.

    Because this body contains ``yield``, calling it produces a generator
    rather than an instance of ``cls``; the ``cls`` instance built below
    is the stream object that gets passed to ``func``.

    :param func: callable invoked as ``func(stream)`` inside the child
                 greenlet; ``stream.close()`` is called once it returns.
    :param sentinel: stored unchanged as ``stream.sentinel``.
    :raises RuntimeError: when the module-level ``greenlet`` name is
                          ``None`` (raised once iteration starts).
    """
    if greenlet is None:
        raise RuntimeError('IterI requires greenlet support')

    stream = object.__new__(cls)
    stream._parent = greenlet.getcurrent()
    stream._buffer = []
    stream.closed = False
    stream.sentinel = sentinel
    stream.pos = 0

    def feed_stream():
        func(stream)
        stream.close()

    child = greenlet.greenlet(feed_stream, stream._parent)

    # Each switch resumes the child; a falsy result ends the iteration,
    # otherwise the first element of the result is yielded.
    chunk = child.switch()
    while chunk:
        yield chunk[0]
        chunk = child.switch()
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered from a greenlet.

    The view copies the top of ``flask._request_ctx_stack`` and wraps a
    checker function in a greenlet.  The checker is only run after the
    test client call has returned; it asserts that ``flask.request`` and
    ``flask.current_app`` are falsy until the copied context is entered,
    populated while inside it, and that ``flask.request`` is falsy again
    afterwards.
    """
    app = flask.Flask(__name__)
    pending = []

    @app.route('/')
    def index():
        copied_ctx = flask._request_ctx_stack.top.copy()

        def check_context():
            # Nothing is bound before the copied context is entered.
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with copied_ctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            # Leaving the ``with`` block unbinds the request again.
            self.assert_false(flask.request)
            return 42

        pending.append(greenlet(check_context))
        return 'Hello World!'

    response = app.test_client().get('/?foo=bar')
    self.assert_equal(response.data, b'Hello World!')

    # Run the deferred checker now that the test request has returned.
    self.assert_equal(pending[0].run(), 42)