def _consume(self, msg):
    """Validate an incoming message and pass its JSON payload to ``_on_message``.

    A message whose content type is not ``application/json``, or whose body
    cannot be JSON-decoded, is rejected without requeueing.  For a valid
    message, ``self._ack`` is scheduled on the IOLoop to run with the
    handler's future once it completes.

    :param msg: message exposing ``content_type``, ``body`` and ``reject``
    :returns: ``False`` when the content type is refused, otherwise ``None``
    """
    loop = tornado.ioloop.IOLoop.instance()

    if msg.content_type != 'application/json':
        LOG.warn('invalid content-type header.'
                 ' only json content is acceptable.'
                 ' message rejected.')
        msg.reject(requeue=False)
        return False

    try:
        payload = json_decode(msg.body)
    except ValueError as err:
        # Reject first, then report why the body could not be decoded.
        msg.reject(requeue=False)
        LOG.warn('malformed json message: %s. reason: %s '
                 'message rejected.' % (msg.body, err))
        return

    pending = maybe_future(self._on_message(payload))

    def acknowledge(done):
        self._ack(done, msg)

    loop.add_future(pending, acknowledge)
# Source code examples for Python's maybe_future()
def _read_chunked_body(self, delegate):
    """Read a chunked-encoded body from ``self.stream`` and feed ``delegate``.

    Written as a generator: every ``yield`` hands a stream read (or the
    delegate's result) to whatever drives the generator and resumes with its
    value.  NOTE(review): presumably wrapped by ``gen.coroutine`` outside
    this view -- confirm.

    :param delegate: object whose ``data_received(chunk)`` receives body data
    :raises httputil.HTTPInputError: if a chunk-size line is not purely
        hexadecimal, if the cumulative size exceeds ``self._max_body_size``,
        or if a chunk is not terminated by CRLF
    """
    # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
    hex_digits = b"0123456789abcdefABCDEF"
    total_size = 0
    while True:
        size_line = yield self.stream.read_until(b"\r\n", max_bytes=64)
        size_field = size_line.strip()
        # int(x, 16) on its own also accepts "0x10", "+10" or "1_0" and raises
        # a bare ValueError on junk; only plain hex digits are a valid size.
        if not size_field or size_field.translate(None, hex_digits):
            raise httputil.HTTPInputError("invalid chunk size")
        chunk_len = int(size_field, 16)
        if chunk_len == 0:
            return
        total_size += chunk_len
        if total_size > self._max_body_size:
            raise httputil.HTTPInputError("chunked body too large")
        bytes_to_read = chunk_len
        while bytes_to_read:
            chunk = yield self.stream.read_bytes(
                min(bytes_to_read, self.params.chunk_size), partial=True)
            bytes_to_read -= len(chunk)
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    yield gen.maybe_future(delegate.data_received(chunk))
        # chunk ends with \r\n; validate explicitly because an assert would
        # be stripped when Python runs with -O.
        crlf = yield self.stream.read_bytes(2)
        if crlf != b"\r\n":
            raise httputil.HTTPInputError("improperly terminated chunk")
def _read_chunked_body(self, delegate):
    """Read a chunked-encoded body from ``self.stream`` and feed ``delegate``.

    Written as a generator: every ``yield`` hands a stream read (or the
    delegate's result) to whatever drives the generator and resumes with its
    value.  NOTE(review): presumably wrapped by ``gen.coroutine`` outside
    this view -- confirm.

    :param delegate: object whose ``data_received(chunk)`` receives body data
    :raises httputil.HTTPInputError: if a chunk-size line is not purely
        hexadecimal, if the cumulative size exceeds ``self._max_body_size``,
        or if a chunk is not terminated by CRLF
    """
    # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
    hex_digits = b"0123456789abcdefABCDEF"
    total_size = 0
    while True:
        size_line = yield self.stream.read_until(b"\r\n", max_bytes=64)
        size_field = size_line.strip()
        # int(x, 16) on its own also accepts "0x10", "+10" or "1_0" and raises
        # a bare ValueError on junk; only plain hex digits are a valid size.
        if not size_field or size_field.translate(None, hex_digits):
            raise httputil.HTTPInputError("invalid chunk size")
        chunk_len = int(size_field, 16)
        if chunk_len == 0:
            return
        total_size += chunk_len
        if total_size > self._max_body_size:
            raise httputil.HTTPInputError("chunked body too large")
        bytes_to_read = chunk_len
        while bytes_to_read:
            chunk = yield self.stream.read_bytes(
                min(bytes_to_read, self.params.chunk_size), partial=True)
            bytes_to_read -= len(chunk)
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    yield gen.maybe_future(delegate.data_received(chunk))
        # chunk ends with \r\n; validate explicitly because an assert would
        # be stripped when Python runs with -O.
        crlf = yield self.stream.read_bytes(2)
        if crlf != b"\r\n":
            raise httputil.HTTPInputError("improperly terminated chunk")
def get(self):
    """Run the behavior named by the ``behavior`` query argument.

    Crossdock sends GET requests with query params to initiate a test.  The
    response body is a JSON list: the entries collected by the response
    writer on success, a single SKIPPED entry when the behavior is unknown,
    or a single FAILED entry carrying the exception text.
    """
    behavior_name = self.get_query_argument('behavior')
    writer = ResponseWriter()
    call_kwargs = {'respw': writer}
    for option in ('server', 'transport', 'encoding'):
        call_kwargs[option] = self.get_query_argument(option, None)

    handler = BEHAVIORS.get(behavior_name)
    if handler is None:
        skipped = [{
            "status": SKIPPED,
            "output": "Not implemented",
        }]
        self.write(json.dumps(skipped))
        return

    try:
        yield gen.maybe_future(handler(**call_kwargs))
        self.write(json.dumps(writer.entries))
    except Exception as exc:
        failure = [{
            "status": FAILED,
            "output": "%s" % exc
        }]
        self.write(json.dumps(failure))
def process_sayHello(self, seqid, iprot, oprot):
    """Serve one ``sayHello`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = sayHello_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = sayHello_result()
    outcome = yield gen.maybe_future(self._handler.sayHello())
    reply.success = outcome

    oprot.writeMessageBegin("sayHello", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES
def process_sayHello(self, seqid, iprot, oprot):
    """Serve one ``sayHello`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = sayHello_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = sayHello_result()
    outcome = yield gen.maybe_future(self._handler.sayHello())
    reply.success = outcome

    oprot.writeMessageBegin("sayHello", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
def process_getData(self, seqid, iprot, oprot):
    """Serve one ``getData`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = getData_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = getData_result()
    outcome = yield gen.maybe_future(self._handler.getData(request.input))
    reply.success = outcome

    oprot.writeMessageBegin("getData", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES
def process_sayHello(self, seqid, iprot, oprot):
    """Serve one ``sayHello`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = sayHello_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = sayHello_result()
    outcome = yield gen.maybe_future(self._handler.sayHello())
    reply.success = outcome

    oprot.writeMessageBegin("sayHello", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
def process_getData(self, seqid, iprot, oprot):
    """Serve one ``getData`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = getData_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = getData_result()
    outcome = yield gen.maybe_future(self._handler.getData(request.input))
    reply.success = outcome

    oprot.writeMessageBegin("getData", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES
def async_fetch(self, task, callback=None):
    """Do one fetch and report its result.

    The fetcher is picked from the task: ``data:`` URLs go to
    ``data_fetch``; otherwise ``task['fetch']['fetch_type']`` selects the
    phantomjs, splash or plain HTTP fetcher.  Any exception is logged and
    converted into a result by ``handle_error``.  The result is passed to
    ``callback`` and ``self.on_result`` and then delivered via ``gen.Return``.

    :param task: task mapping; ``url`` defaults to ``'data:,'``
    :param callback: called as ``callback(type, task, result)``; defaults to
        ``self.send_result``
    """
    url = task.get('url', 'data:,')
    if callback is None:
        callback = self.send_result

    # Label of the fetcher in use; stays 'None' if selection itself fails.
    kind = 'None'
    began_at = time.time()
    try:
        if url.startswith('data:'):
            kind = 'data'
            result = yield gen.maybe_future(self.data_fetch(url, task))
        else:
            fetch_type = task.get('fetch', {}).get('fetch_type')
            if fetch_type in ('js', 'phantomjs'):
                kind = 'phantomjs'
                result = yield self.phantomjs_fetch(url, task)
            elif fetch_type in ('splash', ):
                kind = 'splash'
                result = yield self.splash_fetch(url, task)
            else:
                kind = 'http'
                result = yield self.http_fetch(url, task)
    except Exception as err:
        logger.exception(err)
        result = self.handle_error(kind, url, task, began_at, err)

    callback(kind, task, result)
    self.on_result(kind, task, result)
    raise gen.Return(result)
def can_fetch(self, user_agent, url):
    """Check the site's robots.txt to decide whether ``url`` may be fetched.

    Parsed robots.txt files are cached per network location in
    ``self.robots_txt_cache`` and reloaded once older than
    ``self.robot_txt_age`` seconds.  If loading robots.txt fails with an
    HTTP error, the error is logged and an empty rule set is parsed.

    :param user_agent: user agent string checked against the rules
    :param url: URL to check; also used to locate ``/robots.txt``
    :returns: the parser's ``can_fetch`` verdict, delivered via ``gen.Return``
    """
    domain = urlsplit(url).netloc

    robot_txt = None
    if domain in self.robots_txt_cache:
        cached = self.robots_txt_cache[domain]
        # Reuse the cached parser only while it is still fresh.
        if time.time() - cached.mtime() <= self.robot_txt_age:
            robot_txt = cached

    if robot_txt is None:
        robot_txt = RobotFileParser()
        try:
            response = yield gen.maybe_future(self.http_client.fetch(
                urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
            content = response.body
        except tornado.httpclient.HTTPError as e:
            logger.error('load robots.txt from %s error: %r', domain, e)
            content = b''

        # Only bytes can be decoded: calling .decode() on a text string
        # raises AttributeError on Python 3, and the body may be empty/None.
        if not content:
            content = ''
        elif isinstance(content, bytes):
            content = content.decode('utf8', 'ignore')

        robot_txt.parse(content.splitlines())
        self.robots_txt_cache[domain] = robot_txt

    raise gen.Return(robot_txt.can_fetch(user_agent, url))
def mock_tornado(*args, **kwargs):
    """Create a ``mock.Mock`` whose call result defaults to a future.

    All arguments are forwarded to ``mock.Mock``.  When no positional
    arguments are given and ``return_value`` is absent or falsy, the mock's
    return value is set to ``gen.maybe_future(mock_tornado)``.
    """
    stub = mock.Mock(*args, **kwargs)
    caller_configured = bool(args) or bool(kwargs.get('return_value'))
    if not caller_configured:
        stub.return_value = gen.maybe_future(mock_tornado)
    return stub
def test_wait_event(self):
    """A waiter registered with ``wait_for_event`` receives the matching event.

    The bot's event source is replaced by a mock that yields one test event
    and then raises ``TestException``, which ends ``bot.start()``.
    """
    bot = AB.Bot()
    test_event = {'unittest': True}
    waiter = bot.wait_for_event(**test_event)
    bot.event_to_chat = mock_tornado()
    bot._get_next_event = mock_tornado(
        side_effect=[gen.maybe_future(test_event), TestException])
    try:
        yield bot.start()
    except TestException:
        # Expected: the second mocked call raises to stop the bot.
        pass
    event = yield waiter
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(event, test_event)
def _read_fixed_body(self, content_length, delegate):
    """Read exactly ``content_length`` body bytes and feed them to ``delegate``.

    Data is read from ``self.stream`` in pieces of at most
    ``self.params.chunk_size`` bytes.  Each piece is forwarded to
    ``delegate.data_received`` unless writing has finished and this is not
    the client side.

    :param content_length: number of body bytes still expected
    :param delegate: object whose ``data_received(body)`` receives the data
    """
    remaining = content_length
    while remaining > 0:
        wanted = min(self.params.chunk_size, remaining)
        body = yield self.stream.read_bytes(wanted, partial=True)
        remaining -= len(body)
        should_deliver = (not self._write_finished) or self.is_client
        if should_deliver:
            with _ExceptionLoggingContext(app_log):
                yield gen.maybe_future(delegate.data_received(body))
def data_received(self, chunk):
    """Forward ``chunk`` to the wrapped delegate, decompressing it if needed.

    Without a decompressor the chunk is passed through unchanged.  Otherwise
    it is decompressed in pieces of at most ``self._chunk_size`` bytes, each
    non-empty piece is forwarded, and the decompressor's unconsumed tail is
    fed back in until nothing is left.

    :param chunk: raw body data as received
    """
    if not self._decompressor:
        yield gen.maybe_future(self._delegate.data_received(chunk))
        return

    pending = chunk
    while pending:
        inflated = self._decompressor.decompress(pending, self._chunk_size)
        if inflated:
            yield gen.maybe_future(self._delegate.data_received(inflated))
        pending = self._decompressor.unconsumed_tail
def _read_fixed_body(self, content_length, delegate):
    """Read exactly ``content_length`` body bytes and feed them to ``delegate``.

    Data is read from ``self.stream`` in pieces of at most
    ``self.params.chunk_size`` bytes.  Each piece is forwarded to
    ``delegate.data_received`` unless writing has finished and this is not
    the client side.

    :param content_length: number of body bytes still expected
    :param delegate: object whose ``data_received(body)`` receives the data
    """
    remaining = content_length
    while remaining > 0:
        wanted = min(self.params.chunk_size, remaining)
        body = yield self.stream.read_bytes(wanted, partial=True)
        remaining -= len(body)
        should_deliver = (not self._write_finished) or self.is_client
        if should_deliver:
            with _ExceptionLoggingContext(app_log):
                yield gen.maybe_future(delegate.data_received(body))
def _async_request(self, task):
    """Async request.

    Performs the request described by ``task`` and, when the task names a
    processing callback, hands the response to ``self.processor`` and stores
    the extracted results and follow-up tasks.

    A failed request is logged and yields ``None``; processing is skipped in
    that case because there is no response to process.

    :param task: task mapping with ``url`` and optional ``fetch`` /
        ``process`` sub-mappings
    :returns: the request result (``None`` on failure), via ``gen.Return``
    """
    url = task.get('url')
    request_failed = False
    if url.startswith('first_task'):
        result = yield gen.maybe_future(self._fake_request(url, task))
    else:
        # Bind a value up front so a failed request cannot leave ``result``
        # undefined for the code below.
        result = None
        try:
            if task.get('fetch', {}).get('fetch_type') == 'js':
                result = yield self._phantomjs_request(url, task)
            else:
                result = yield self._http_request(url, task)
        except Exception as e:
            logger.exception(e)
            request_failed = True

    if not request_failed and task.get('process', {}).get('callback'):
        results, follows, db_name, coll_name = self.processor.handle_result(task, result)
        if results:
            # put results to resultdb
            self.processor.put_results(results, db_name, coll_name, task)
        if follows:
            # put new tasks to newtask_queue
            self.put_follows(follows)
    raise gen.Return(result)
def process_startTrace(self, seqid, iprot, oprot):
    """Serve one ``startTrace`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = startTrace_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = startTrace_result()
    outcome = yield gen.maybe_future(self._handler.startTrace(request.request))
    reply.success = outcome

    oprot.writeMessageBegin("startTrace", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
def process_joinTrace(self, seqid, iprot, oprot):
    """Serve one ``joinTrace`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = joinTrace_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = joinTrace_result()
    outcome = yield gen.maybe_future(self._handler.joinTrace(request.request))
    reply.success = outcome

    oprot.writeMessageBegin("joinTrace", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES
def submit(fn, io_loop, *args, **kwargs):
    """Submit Tornado Coroutine to the given IOLoop.

    ``fn(*args, **kwargs)`` is invoked from a callback added to ``io_loop``;
    its outcome (result or exception) is copied onto the returned future.

    :param fn: Tornado Coroutine to execute
    :param io_loop: Tornado IOLoop where to schedule the coroutine
    :param args: Args to pass to coroutine
    :param kwargs: Kwargs to pass to coroutine
    :returns concurrent.futures.Future: future result of coroutine
    """
    future = Future()

    def on_done(tornado_future):
        """
        Set tornado.Future results to the concurrent.Future.
        :param tornado_future:
        """
        exception = tornado_future.exception()
        # Compare against None so a falsy exception object is still an error.
        if exception is None:
            future.set_result(tornado_future.result())
        else:
            future.set_exception(exception)

    def execute():
        """Execute fn on the IOLoop."""
        try:
            result = gen.maybe_future(fn(*args, **kwargs))
        except Exception as exc:
            # The function we ran didn't return a future and instead raised
            # an exception. Report it directly: on_done only ever forwards
            # the exception object, so no intermediate Tornado future (and
            # no Future.set_exc_info, absent on newer Tornado) is needed.
            future.set_exception(exc)
        else:
            result.add_done_callback(on_done)

    io_loop.add_callback(execute)
    return future
def process_submitZipkinBatch(self, seqid, iprot, oprot):
    """Serve one ``submitZipkinBatch`` call: decode, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = submitZipkinBatch_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = submitZipkinBatch_result()
    outcome = yield gen.maybe_future(self._handler.submitZipkinBatch(request.spans))
    reply.success = outcome

    oprot.writeMessageBegin("submitZipkinBatch", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES
def process_emitZipkinBatch(self, seqid, iprot, oprot):
    """Serve one ``emitZipkinBatch`` call: decode the request and run the handler.

    No reply is written; ``seqid`` and ``oprot`` are not used here.

    :param seqid: sequence id of the call (unused)
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol (unused)
    """
    request = emitZipkinBatch_args()
    request.read(iprot)
    iprot.readMessageEnd()

    pending = gen.maybe_future(self._handler.emitZipkinBatch(request.spans))
    yield pending
# HELPER FUNCTIONS AND STRUCTURES
def process_ping(self, seqid, iprot, oprot):
    """Serve one ``ping`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = ping_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = ping_result()
    outcome = yield gen.maybe_future(self._handler.ping())
    reply.success = outcome

    oprot.writeMessageBegin("ping", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES
def handle_stream(self, stream, address):
    """Serve thrift calls arriving on one client stream until it closes.

    Frames are read one at a time, decoded by the processor and answered
    with either the call result or an application exception.  An
    end-of-file transport error ends the loop quietly; any other exception
    is logged and the transport is closed.

    :param stream: client stream wrapped by the transport
    :param address: ``(host, port)`` pair of the connected client
    """
    host, port = address
    transport = TTornadoStreamTransport(
        host=host, port=port, stream=stream,
        io_loop=self.io_loop, read_timeout=self.transport_read_timeout)
    try:
        out_proto = self._oprot_factory.get_protocol(transport)
        in_proto = self._iprot_factory.get_protocol(TMemoryBuffer())

        while not transport.stream.closed():
            # TODO: maybe read multiple frames in advance for concurrency
            try:
                frame = yield transport.read_frame()
            except TTransportException as err:
                if err.type != TTransportException.END_OF_FILE:
                    raise
                break

            in_proto.trans.setvalue(frame)
            api, seqid, result, call = self._processor.process_in(in_proto)
            if isinstance(result, TApplicationException):
                self._processor.send_exception(out_proto, api, result, seqid)
                continue

            try:
                result.success = yield gen.maybe_future(call())
            except Exception as err:
                # raise if api don't have throws
                self._processor.handle_exception(err, result)
            self._processor.send_result(out_proto, api, result, seqid)
    except Exception:
        logger.exception('thrift exception in handle_stream')
        transport.close()

    logger.info('client disconnected %s:%d', host, port)
def start_kernel(self, kernel_id=None, *args, **kwargs):
    """Start a kernel through the parent class and create a session for it.

    :param kernel_id: accepted for signature compatibility; it is not
        forwarded to the parent implementation
    :param args: positional arguments forwarded to the parent ``start_kernel``
    :param kwargs: keyword arguments forwarded to the parent ``start_kernel``
        and to ``kernel_session_manager.create_session``
    :returns: the kernel id produced by the parent ``start_kernel``,
        delivered via ``gen.Return``
    """
    # kernel_name is only used for this debug line, so a missing key must
    # not abort the start with a KeyError.
    self.log.debug("RemoteMappingKernelManager.start_kernel: {}".format(kwargs.get('kernel_name')))
    started_id = yield gen.maybe_future(
        super(RemoteMappingKernelManager, self).start_kernel(*args, **kwargs))
    self.parent.kernel_session_manager.create_session(started_id, **kwargs)
    raise gen.Return(started_id)
def process_echo(self, seqid, iprot, oprot):
    """Serve one ``echo`` call: decode the request, run the handler, reply.

    :param seqid: sequence id written back in the reply header
    :param iprot: input protocol the call arguments are read from
    :param oprot: output protocol the reply is written to and flushed on
    """
    request = echo_args()
    request.read(iprot)
    iprot.readMessageEnd()

    reply = echo_result()
    outcome = yield gen.maybe_future(self._handler.echo(request.str))
    reply.success = outcome

    oprot.writeMessageBegin("echo", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES