def create_app(loop=None):
    """Assemble and return the aiohttp application.

    The application gets an encrypted-cookie session middleware followed by
    ``error_middleware``, a Jinja2 file-system template loader, every route
    listed in ``routes.routes`` and, when ``settings.DEBUG`` is truthy, a
    static-file route.

    :param loop: event loop for the application; ``settings.loop`` is used
        when ``None`` is given.
    :return: the configured ``web.Application``.
    """
    cookie_storage = EncryptedCookieStorage(settings.COOKIE_SECRET)
    middlewares = [session_middleware(cookie_storage), error_middleware]
    event_loop = settings.loop if loop is None else loop
    app = web.Application(middlewares=middlewares, loop=event_loop)

    template_loader = jinja2.FileSystemLoader(settings.TEMPLATES_PATH)
    aiohttp_jinja2.setup(app, loader=template_loader)

    # Each entry carries positional arguments at index 0 and keyword
    # arguments at index 1 for ``add_route``.
    for entry in routes.routes:
        positional, keyword = entry[0], entry[1]
        app.router.add_route(*positional, **keyword)

    if settings.DEBUG:
        app.router.add_static(settings.STATIC_URL, settings.STATIC_PATH)

    return app
# Example source code for the Python class Application()
async def create_server(loop, config_mod_name):
    """Create the database pool, build the web application and start serving.

    The function awaits other coroutines, so it must itself be a coroutine
    (``async def``); a plain ``def`` containing ``await`` does not compile.

    :param loop: event loop used for the pool, the application and the server.
    :param config_mod_name: dotted name of the configuration module; it must
        expose ``db_config`` and ``jinja2_config`` mappings.
    :return: the server object returned by ``loop.create_server``.
    :raises ImportError: if the configuration module cannot be imported.
    """
    import importlib

    # import_module returns the named (leaf) module directly; an ImportError
    # simply propagates to the caller.
    config = importlib.import_module(config_mod_name)

    await create_pool(loop, **config.db_config)
    app = web.Application(loop=loop, middlewares=[
        logger_factory, auth_factory, data_factory, response_factory])
    add_routes(app, 'app.route')
    add_routes(app, 'app.api')
    add_routes(app, 'app.api_v2')
    add_static(app)
    init_jinja2(app, filters=dict(datetime=datetime_filter, marked=marked_filter), **config.jinja2_config)
    server = await loop.create_server(app.make_handler(), '127.0.0.1', 9900)
    logging.info('server started at http://127.0.0.1:9900...')
    return server
def create_app(loop):
    """Build the sample application used to exercise the swagger helpers.

    :param loop: event loop handed to ``web.Application``.
    :return: the application with all handlers and the swagger endpoints
        registered.
    """
    app = web.Application(loop=loop)
    app["config"] = {"test": "foo"}
    app.router.add_route('GET', '/', handle)

    # Handlers registered through ``add_route``.
    for view in (multiple_query_params, multiply, get_id, header_response):
        add_route(app, view)

    # Handlers registered through ``route``.
    for view in (config, get_optional, body_and_header, error, api_exception):
        route(app, view)

    # Must stay last so that every route above is taken into account when
    # the swagger handler is constructed.
    add_swagger(app, "/swagger.json", "/swagger")
    return app
def setup_routes(app: web.Application) -> None:
    """Register every webapi view on ``app.router``.

    Each view class is looked up on the ``view`` module and mounted with the
    ``'*'`` method wildcard, in the order listed below.
    """
    endpoints = (
        ('/active_monitor/', 'ActiveMonitorView'),
        ('/active_monitor_alert/', 'ActiveMonitorAlertView'),
        ('/active_monitor_contact/', 'ActiveMonitorContactView'),
        ('/active_monitor_contact_group/', 'ActiveMonitorContactGroupView'),
        ('/active_monitor_def/', 'ActiveMonitorDefView'),
        ('/active_monitor_def_arg/', 'ActiveMonitorDefArgView'),
        ('/monitor_group/', 'MonitorGroupView'),
        ('/monitor_group_active_monitor/', 'MonitorGroupActiveMonitorView'),
        ('/monitor_group_contact/', 'MonitorGroupContactView'),
        ('/monitor_group_contact_group/', 'MonitorGroupContactGroupView'),
        ('/contact/', 'ContactView'),
        ('/contact_group/', 'ContactGroupView'),
        ('/contact_group_contact/', 'ContactGroupContactView'),
        ('/metadata/', 'MetadataView'),
        ('/bindata/', 'BindataView'),
        ('/statistics/', 'StatisticsView'),
    )
    for path, view_name in endpoints:
        # The attribute is resolved right before registration, one by one.
        app.router.add_route('*', path, getattr(view, view_name))
def initialize(loop: asyncio.AbstractEventLoop, port: int, username: str, password: str, dbcon: DBConnection,
               active_monitor_manager: ActiveMonitorManager) -> None:
    """Initialize the webapi listener.

    Resets the ``num_calls`` statistic, builds the application with the
    logging, error-handler and basic-auth middleware factories, stores the
    shared objects on the application and schedules a server on
    ``0.0.0.0:port`` as a task on ``loop``.
    """
    stats.set('num_calls', 0, 'WEBAPI')

    app = web.Application(
        loop=loop,
        logger=log.logger,
        middlewares=[
            middleware.logging_middleware_factory,
            middleware.error_handler_middleware_factory,
            middleware.basic_auth_middleware_factory,
        ],
    )

    # Objects shared with the views through the application mapping.
    shared = (
        ('username', username),
        ('password', password),
        ('dbcon', dbcon),
        ('active_monitor_manager', active_monitor_manager),
    )
    for key, value in shared:
        app[key] = value

    setup_routes(app)

    # create_server() only builds a coroutine; the task makes it run.
    loop.create_task(loop.create_server(app.make_handler(), '0.0.0.0', port))
    log.msg('Webapi listening on port %s' % port)
def get_app(self, static_serve=False) -> web.Application:
    """
    Build the aiohttp application that handles this bot's webhook.

    The base application comes from ``get_app(self, static_serve=...)``; a
    POST route for the path component of ``self.webhook`` is then added, plus
    optional startup/cleanup hooks that set/unset the webhook.

    NOTE(review): inside this function the name ``get_app`` resolves to the
    module-level ``get_app``, not to this method - confirm that is intended.
    """
    application = get_app(self, static_serve=static_serve)

    # webhook handler, mounted at the URL's path
    application.router.add_post(urlparse(self.webhook).path, self.webhook_handle)

    # viber webhook (un)registration, tied to the application lifecycle
    if self._unset_webhook_on_cleanup:
        application.on_cleanup.append(lambda a: a.bot.api.unset_webhook())
    if self._set_webhook_on_startup:
        application.on_startup.append(lambda a: a.bot.set_webhook_on_startup())

    return application
# File source: test_aiohttp_swaggerify.py
# Project: aiohttp_swaggerify
# Author: dchaplinsky
# Project source
# File source
# Views: 21
# Favorites: 0
# Likes: 0
# Comments: 0
async def test_validate_fullfledged(test_client, loop):
    """The generated ``/swagger.json`` must validate against the reference schema.

    Declared ``async def`` because the body awaits the test client; ``await``
    inside a plain ``def`` is a syntax error.
    """
    app = web.Application(loop=loop)
    app.router.add_get('/', full_fledged_handler)
    app.router.add_post('/', full_fledged_handler)
    app = swaggerify(
        app,
        basePath="/",
        host="127.0.0.1:8080"
    )

    client = await test_client(app)
    resp = await client.get('/swagger.json')
    assert resp.status == 200
    text = await resp.json()

    # schema_validate is expected to return None for a valid document.
    with open("tests/validate_swagger.json", "r") as fp:
        assert schema_validate(text, json.load(fp)) is None
def aiohttp_server(loop, addr):
    """Return the ``create_server`` coroutine for a fixed-payload HTTP server.

    ``GET /{size}`` answers with ``size`` bytes of ``b'X'``; ``GET /`` uses
    1024 bytes. Payloads are memoised in ``_RESP_CACHE`` by size.

    :param loop: event loop that will own the server.
    :param addr: positional arguments forwarded to ``loop.create_server``
        after the protocol factory.
    :return: the not-yet-awaited result of ``loop.create_server``.
    """
    async def handle(request):
        size = int(request.match_info.get('size', 1024))
        body = _RESP_CACHE.get(size)
        if body is None:
            # Build once, then reuse for every request of this size.
            body = _RESP_CACHE[size] = b'X' * size
        return web.Response(body=body)

    app = web.Application(loop=loop)
    for path in ('/{size}', '/'):
        app.router.add_route('GET', path, handle)

    return loop.create_server(app.make_handler(), *addr)
def auth_through_token(app: web.Application, handler):
    """Middleware factory that authenticates requests by ``X-Auth-Token``.

    The returned handler builds an identity client from the request's token
    and ``project_id`` path parameter and calls ``authenticate`` on it. Any
    exception raised while doing so yields a 401 JSON response; otherwise the
    request is passed on to ``handler``.

    :param app: the application the middleware is installed on (unused here).
    :param handler: the next handler in the chain.
    :return: the wrapping request handler.
    """
    async def middleware_handler(request: web.Request):
        token = request.headers.get("X-Auth-Token")
        project = request.match_info.get('project_id')
        conf = config.Config.config_instance()

        try:
            token_auth = identity.Token(conf.auth_url,
                                        token=token,
                                        project_id=project)
            auth_session = session.Session(auth=token_auth)
            identity_client = client.Client(session=auth_session,
                                            project_id=project)
            identity_client.authenticate(token=token)
        except Exception as ex:
            # Every failure while authenticating is reported as 401.
            reason = "Not authorized. Reason: {}".format(str(ex))
            return web.json_response(status=401,
                                     data={"error": {"message": reason}})

        return await handler(request)

    return middleware_handler
def setUp(self):
    """Install a fresh event loop and start the dummy Mattermost API server.

    The previous loop is remembered in ``self._old_loop``. The dummy server
    listens on the host/port taken from ``self.GOOD_WEBHOOK_URL`` and accepts
    POSTs on ``/hooks/{webhook_id}``.
    """
    self._old_loop = asyncio.get_event_loop()
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    # Debugging aids, disabled by default:
    #   logging.basicConfig(level=logging.DEBUG)
    #   self.loop.set_debug(True)

    self.parsed_url = urlparse(self.GOOD_WEBHOOK_URL)

    api_app = web.Application()
    webhook_view = partial(handle_incoming_webhook,
                           webhook_urls=[self.parsed_url.path])
    api_app.router.add_post("/hooks/{webhook_id}", webhook_view)

    protocol_factory = api_app.make_handler()
    # netloc is "host:port"; the pieces become create_server's host and port.
    host_and_port = self.parsed_url.netloc.split(":")
    server_coro = self.loop.create_server(protocol_factory, *host_and_port)
    self.dummy_mattermost_api_server = self.loop.run_until_complete(server_coro)
async def get_app():
    """Create the web application backed by a Redis connection.

    Declared ``async def`` because the Redis connection is awaited; ``await``
    inside a plain ``def`` is a syntax error.

    :return: the application with templates, routes, static files and a
        shutdown hook that closes the Redis connection.
    """
    redis = await aioredis.create_redis(('localhost', 6379,), db=1)

    app = web.Application()
    app['redis'] = redis
    aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates/'),
                         context_processors=(static_processor,))

    app.router.add_route('GET', '/', handlers.index)
    app.router.add_route('GET', '/login', handlers.login_task)
    app.router.add_route('POST', '/login', handlers.login)
    app.router.add_static('/static', 'static')

    async def close_redis(app):
        # Shutdown hook: release the connection stored on the application.
        app['redis'].close()

    app.on_shutdown.append(close_redis)
    return app
async def test_invalid_request(test_client, loop):
    """Malformed GET/POST requests must be answered with a 400 JSON error.

    Declared ``async def`` because the body awaits the test client; ``await``
    inside a plain ``def`` is a syntax error.
    """
    app = web.Application(loop=loop)
    app.router.add_post('/', hello)
    app.router.add_get('/', hello)
    client = await test_client(app)

    async def assert_malformed(resp):
        # Shared expectations for every malformed request below.
        assert resp.status == 400
        text = await resp.json()
        assert 'Request is malformed' in text["error"]

    await assert_malformed(await client.get('/'))
    await assert_malformed(await client.post('/'))
    await assert_malformed(await client.post('/', data="123afasdf"))
def test_websocket_wrapper_authentication_error():
    """Connecting without a ``token`` query parameter must fail the handshake.

    The server is always shut down in a ``finally`` block so that port 9000
    is released even when the assertion fails; other tests bind the same port.
    """
    loop = asyncio.get_event_loop()

    @websocket("/", authenticate=True)
    def handler(ws, params, **kwargs):
        ws.send_str("hello, world!")

    async def start_server(loop):
        app = Application()
        app.router.add_route(*handler.route)
        return await loop.create_server(app.make_handler(), "localhost", 9000)

    async def go(loop):
        srv = await start_server(loop)
        try:
            with pytest.raises(WSServerHandshakeError):
                client = await ws_connect("http://localhost:9000")
                # Only reached if the handshake unexpectedly succeeds.
                await client.close()
        finally:
            srv.close()
            await srv.wait_closed()

    loop.run_until_complete(go(loop))
def test_websocket_wrapper_invalid_token_error():
    """Connecting with the token ``ooo`` must fail the handshake.

    The server is always shut down in a ``finally`` block so that port 9000
    is released even when the assertion fails; other tests bind the same port.
    """
    loop = asyncio.get_event_loop()

    @websocket("/", authenticate=True)
    def handler(ws, params, **kwargs):
        ws.send_str("hello, world!")

    async def start_server(loop):
        app = Application()
        app.router.add_route(*handler.route)
        return await loop.create_server(app.make_handler(), "localhost", 9000)

    async def go(loop):
        srv = await start_server(loop)
        try:
            with pytest.raises(WSServerHandshakeError):
                client = await ws_connect("http://localhost:9000?token=ooo")
                # Only reached if the handshake unexpectedly succeeds.
                await client.close()
        finally:
            srv.close()
            await srv.wait_closed()

    loop.run_until_complete(go(loop))
def test_websocket_pubsub_wrapper_authentication_error():
    """A pub/sub websocket without a ``token`` parameter must fail the handshake.

    The server is always shut down in a ``finally`` block so that port 9000
    is released even when the assertion fails; other tests bind the same port.
    """
    loop = asyncio.get_event_loop()

    @websocket_pubsub("/", authenticate=True)
    def handler(ws, params, **kwargs):
        ws.send_str("hello, world!")

    async def start_server(loop):
        app = Application()
        app.router.add_route(*handler.route)
        return await loop.create_server(app.make_handler(), "localhost", 9000)

    async def go(loop):
        srv = await start_server(loop)
        try:
            with pytest.raises(WSServerHandshakeError):
                client = await ws_connect("http://localhost:9000")
                # Only reached if the handshake unexpectedly succeeds.
                await client.close()
        finally:
            srv.close()
            await srv.wait_closed()

    loop.run_until_complete(go(loop))
def test_websocket_pubsub_wrapper_invalid_token_error():
    """A pub/sub websocket with the token ``ooo`` must fail the handshake.

    The server is always shut down in a ``finally`` block so that port 9000
    is released even when the assertion fails; other tests bind the same port.
    """
    loop = asyncio.get_event_loop()

    @websocket_pubsub("/", authenticate=True)
    def handler(ws, params, **kwargs):
        ws.send_str("hello, world!")

    async def start_server(loop):
        app = Application()
        app.router.add_route(*handler.route)
        return await loop.create_server(app.make_handler(), "localhost", 9000)

    async def go(loop):
        srv = await start_server(loop)
        try:
            with pytest.raises(WSServerHandshakeError):
                client = await ws_connect("http://localhost:9000?token=ooo")
                # Only reached if the handshake unexpectedly succeeds.
                await client.close()
        finally:
            srv.close()
            await srv.wait_closed()

    loop.run_until_complete(go(loop))
def aiohttp_start(bot, name, port, certfile, requesthandlerclass, group,
                  callback=None):
    """Schedule an aiohttp server for ``bot`` on the current event loop.

    :param bot: object passed to ``requesthandlerclass``.
    :param name: host name/address to listen on.
    :param port: TCP port to listen on.
    :param certfile: certificate chain file; when truthy the server uses TLS.
    :param requesthandlerclass: callable building the request handler, whose
        ``addroutes`` method registers routes on the application router.
    :param group: identifier handed to ``aiohttp_started`` and to
        ``tracking.register_aiohttp_web``.
    :param callback: optional value forwarded to ``aiohttp_started``.
    """
    requesthandler = requesthandlerclass(bot)

    app = web.Application()
    requesthandler.addroutes(app.router)
    handler = app.make_handler()

    if certfile:
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext.load_cert_chain(certfile)
    else:
        sslcontext = None

    loop = asyncio.get_event_loop()
    server = loop.create_server(handler, name, port, ssl=sslcontext)

    # ``asyncio.async`` no longer parses (``async`` is a reserved word since
    # Python 3.7); ``ensure_future`` is its direct replacement.
    asyncio.ensure_future(server).add_done_callback(
        functools.partial(aiohttp_started, handler=handler, app=app,
                          group=group, callback=callback))

    tracking.register_aiohttp_web(group)
def __init__(self, config=None, *, loop=None):
    """Set up the bot: configuration, plugins, web application, HTTP session.

    :param config: configuration mapping; an empty dict is used when falsy.
        ``config['sirbot']['user-agent']`` is read for the session headers.
    :param loop: event loop to use; the current event loop when falsy.
    """
    self.config = config if config else {}
    self._configure()
    logger.info('Initializing Sir Bot-a-lot')

    self._loop = loop if loop else asyncio.get_event_loop()

    # Runtime bookkeeping, filled in later.
    self._tasks = {}
    self._dispatcher = None
    self._pm = None
    self._plugins = {}
    self._start_priority = defaultdict(list)

    self._import_plugins()

    # The web application drives the bot's start/stop through its hooks.
    self._app = web.Application(loop=self._loop)
    self._app.on_startup.append(self._start)
    self._app.on_cleanup.append(self._stop)

    self._initialize_plugins()
    self._register_factory()

    user_agent = self.config['sirbot']['user-agent']
    self._session = aiohttp.ClientSession(
        loop=self._loop, headers={'User-Agent': user_agent})

    logger.info('Sir Bot-a-lot Initialized')
def __main():
    """Initialise the service from ``config.xml`` and run the web application.

    If ``myid3.init`` reports failure a message is printed and nothing is
    started. Otherwise the module-level ``__wxSQL`` is created, the index and
    ``/WeiXin`` routes are registered and the application is served on port
    6670; the loop is closed once ``run_app`` returns.
    """
    global __wxSQL

    if not myid3.init('config.xml'):
        print('?????????')
        return

    loop = asyncio.get_event_loop()
    # Menu initialisation is currently disabled: __initwxmenu(loop)
    __wxSQL = WXSQL()

    app = web.Application(loop=loop)
    route_table = (
        ('GET', '/', getIndex),
        ('GET', '/WeiXin', getWX),
        ('POST', '/WeiXin', postWX),
    )
    for method, path, view in route_table:
        app.router.add_route(method, path, view)

    web.run_app(app, port=6670)  # serve on port 6670
    loop.close()
async def test_set_get(loop, test_client):
    """HTTP storage can read ``test/1`` while writing raises ``StorageError``.

    Declared ``async def`` because the body uses ``await`` and ``async with``,
    which are syntax errors inside a plain ``def``.
    """
    app = web.Application()
    app.router.add_get(
        '/test/1',
        lambda x: web.json_response(["Python"]))
    client = await test_client(app)
    url = client.make_url('/')
    data = 'Python'

    config = MergeDict(
        storage=MergeDict(
            cls='aioworkers.storage.http.Storage',
            prefix=str(url),
            semaphore=1,
            format='json',
        ),
    )

    async with Context(config=config, loop=loop) as context:
        storage = context.storage
        assert data in await storage.get('test/1')
        # The test application only registers a GET route for this path.
        with pytest.raises(StorageError):
            await storage.set('test/1', data)
def make_app(loop):
    """Start the REST server described by the ``REST`` config section.

    :param loop: event loop used to run ``create_server`` to completion.
    :return: the application, or ``None`` when no ``ServerIP`` is configured.
        The tuple ``(handler, srv, loop)`` is stored under ``app['cocrawler']``.
    """
    # TODO switch this to socket.getaddrinfo() -- see https://docs.python.org/3/library/socket.html
    host = config.read('REST', 'ServerIP')
    if host is None:
        return None
    port = int(config.read('REST', 'ServerPort'))

    app = web.Application()
    for path, view in (('/', frontpage), ('/api/{name}', api)):
        app.router.add_get(path, view)

    handler = app.make_handler()
    srv = loop.run_until_complete(loop.create_server(handler, host, port))
    LOGGER.info('REST serving on %s', srv.sockets[0].getsockname())

    app['cocrawler'] = (handler, srv, loop)
    return app
def init_app(event_source, *, loop=None):
    """Init the application server.

    Args:
        event_source: object the ``Dispatcher`` is built from.
        loop: event loop to schedule the dispatcher on; defaults to the
            current event loop when omitted.

    Return:
        An aiohttp application.
    """
    import asyncio

    # ``loop`` is optional in the signature, so resolve the default before
    # calling ``create_task`` on it.
    if loop is None:
        loop = asyncio.get_event_loop()

    dispatcher = Dispatcher(event_source)

    # Schedule the dispatcher
    loop.create_task(dispatcher.publish())

    app = web.Application(loop=loop)
    app['dispatcher'] = dispatcher
    app.router.add_get(EVENTS_ENDPOINT, websocket_handler)
    return app
def __init__(self, *, config, controller, loop):
    """Build the API application and start its server.

    :param config: mapping providing ``"host"`` and ``"port"``.
    :param controller: object stored under ``app["controller"]``.
    :param loop: event loop for the application and the server.

    NOTE(review): when ``loop`` is already running the ``create_server``
    coroutine is created but not scheduled here, and ``self.srv`` is
    ``None`` - confirm the caller starts the server in that case.
    """
    self.config = config
    self.loop = loop

    # start server
    logger.info("Starting API server ...")
    self.app = web.Application(
        loop=loop,
        middlewares=[error_overrides({405: self.handle_405}), auth_middleware],
    )
    self.app["controller"] = controller

    router = self.app.router
    router.add_get("/", self.index_handler)
    router.add_get("/api/v1/controldata", self.control_get)
    router.add_put("/api/v1/controldata", self.control_put)
    router.add_post("/api/v1/session", self.login, expect_handler=web.Request.json)

    self.handler = self.app.make_handler()
    server_coro = self.loop.create_server(
        self.handler, self.config["host"], self.config["port"])
    if loop.is_running():
        self.srv = None
    else:
        self.srv = loop.run_until_complete(server_coro)
    logger.info("... API server started up")
def build_app() -> Application:
    """Create the demo application whose pages are rendered from templates.

    Every route is served by ``render_view`` bound to an async-enabled Jinja
    environment rooted at ``TEMPLATES_DIR``.
    """
    app = Application()
    jinja = Environment(
        loader=FileSystemLoader(str(TEMPLATES_DIR)),
        enable_async=True,
    )

    add_get = app.router.add_get
    add_post = app.router.add_post
    # (registrar, path, arguments for render_view after the environment)
    pages = (
        (add_get, '/', ('index.html',)),
        (add_get, '/html/', ('form.html',)),
        (add_post, '/form/', ('data.html', process_form)),
        (add_get, '/js/', ('js.html',)),
        (add_get, '/cookie/', ('data.html', process_cookies)),
        (add_get, '/actions/', ('actions.html',)),
        (add_get, '/screenshot/', ('screenshot.html',)),
        (add_get, '/file/', ('file_form.html',)),
        (add_post, '/file/', ('file_data.html', process_file_form)),
    )
    for register, path, view_args in pages:
        register(path, render_view(jinja, *view_args))

    return app
def main():
    """Serve ``GET /count/{key}`` on 0.0.0.0:8080 until interrupted.

    After the loop stops (Ctrl-C included) open connections are finished,
    the server is closed, the application is finished and the loop is closed.
    """
    app = web.Application()
    app.router.add_route('GET', '/count/{key}', count_endpoint)

    loop = asyncio.get_event_loop()
    handler = app.make_handler()
    srv = loop.run_until_complete(
        loop.create_server(handler, '0.0.0.0', 8080))
    log.info('serving on %s', srv.sockets[0].getsockname())

    run = loop.run_until_complete
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; continue with shutdown.
        pass
    finally:
        run(handler.finish_connections(1.0))
        srv.close()
        run(srv.wait_closed())
        run(app.finish())
    loop.close()
def main():
    """Serve the ``count`` and ``fibonacci`` endpoints on 0.0.0.0:8081.

    Runs until interrupted; afterwards open connections are finished, the
    server is closed, the application is finished and the loop is closed.
    """
    app = web.Application()
    for pattern, view in ((r'/count/{key}', count),
                          (r'/fibonacci/{n:\d+}', fibonacci)):
        app.router.add_route('GET', pattern, view)

    loop = asyncio.get_event_loop()
    handler = app.make_handler()
    srv = loop.run_until_complete(
        loop.create_server(handler, '0.0.0.0', 8081))
    log.info('serving on %s', srv.sockets[0].getsockname())

    run = loop.run_until_complete
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; continue with shutdown.
        pass
    finally:
        run(handler.finish_connections(1.0))
        srv.close()
        run(srv.wait_closed())
        run(app.finish())
    loop.close()
async def init(loop):
    """Build the application (sessions, Mongo, templates, admin, routes).

    Declared ``async def`` because the Mongo setup is awaited. The
    application is created once, with both the loop and the session
    middleware, instead of building an instance that is immediately
    replaced.

    :param loop: event loop for the application and the Mongo setup.
    :return: ``(app, host, port)`` with host/port read from the config file.
    """
    conf = load_config(str(PROJ_ROOT / 'config' / 'config.yml'))

    cookie_storage = SimpleCookieStorage()
    app = web.Application(
        loop=loop, middlewares=[session_middleware(cookie_storage)])

    mongo = await setup_mongo(app, conf, loop)
    setup_jinja(app)

    admin = setup_admin(app, mongo)
    app.router.add_subapp('/admin', admin)
    app.router.add_static('/static', path=str(PROJ_ROOT / 'static'))

    # setup views and routes
    handler = SiteHandler(mongo)
    setup_routes(app, handler, PROJ_ROOT)

    host, port = conf['host'], conf['port']
    return app, host, port
async def init(loop):
    """Build the application (Postgres, templates, admin, routes).

    Declared ``async def`` because the Postgres setup is awaited; ``await``
    inside a plain ``def`` is a syntax error.

    :param loop: event loop for the application and the Postgres setup.
    :return: ``(app, host, port)`` with host/port read from the config file.
    """
    # load config from yaml file
    conf = load_config(str(PROJ_ROOT / 'config' / 'dev.yml'))

    # setup application and extensions
    app = web.Application(loop=loop)
    pg = await setup_pg(app, conf, loop)

    # init modules
    aiohttp_jinja2.setup(
        app, loader=jinja2.FileSystemLoader(str(TEMPLATES_ROOT)))

    admin = setup_admin(app, pg)
    app.router.add_subapp('/admin/', admin)

    # setup views and routes
    handler = SiteHandler(pg)
    add_route = app.router.add_route
    add_route('GET', '/', handler.index)
    app.router.add_static('/static', path=str(PROJ_ROOT / 'static'))

    host, port = conf['host'], conf['port']
    return app, host, port
async def init(loop):
    """Create the database pool, build the application and start the server.

    Declared ``async def`` because the pool and the server are awaited;
    ``await`` inside a plain ``def`` is a syntax error.

    :param loop: event loop for the pool, the application and the server.
    :return: the server object returned by ``loop.create_server``.
    """
    # Create the database connection pool.
    await orm.create_pool(loop=loop, host='127.0.0.1', port=3306, user='www', password='www', db='awesome')

    # Create the application with its middleware factories.
    app = web.Application(loop=loop, middlewares=[
        logger_factory, auth_factory, response_factory
    ])

    # Initialise the jinja2 templates with the datetime filter.
    init_jinja2(app, filters=dict(datetime=datetime_filter))

    # Register the request handlers; 'handlers' names the handlers module.
    add_routes(app, 'handlers')
    add_static(app)

    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9000)
    logging.info('server started at http://127.0.0.1:9000...')
    return srv
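# NOTE: the non-ASCII text of the comments below was lost to an encoding error;
# the wording is a best-effort reconstruction from the surviving fragments.
# asyncio programming is built around an event loop: obtain a reference to the event loop from asyncio,
# then hand it the coroutines to run in order to get asynchronous IO.
# First, obtain the event loop.
# get_event_loop() is described in the Python documentation, section 18.5.2.5.
# get_event_loop() => returns an event loop object (for example an 'asyncio.windows_events._WindowsSelectorEventLoop'), which implements the AbstractEventLoop interface.
# If no event loop can be obtained an exception is raised; get_event_loop() does not return None.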
def setup_app(loop=None, js_profiles_path=None):
    """Create the application, optionally loading one JavaScript profile.

    When ``js_profiles_path`` is given, every ``.js`` file directly inside
    that directory is concatenated (each followed by a newline) into a single
    profile named after the directory.

    :param loop: event loop handed to ``web.Application``.
    :param js_profiles_path: directory holding the profile's ``.js`` files.
    :return: the application with ``'chrome-driver'`` and ``'js-profiles'``
        set, the shutdown hook installed and the routes registered.
    """
    app = web.Application(loop=loop, middlewares=[error_middleware])

    js_profiles = {}
    if js_profiles_path:
        # Only the top-level directory is inspected (first fwalk entry).
        root, _, files, _ = next(os.fwalk(js_profiles_path))
        # normpath drops a trailing separator, which would otherwise make
        # the profile name an empty string.
        profile_name = os.path.basename(os.path.normpath(root))
        log.debug('adding profile "{}"'.format(profile_name))

        chunks = []
        for f in files:
            if os.path.splitext(f)[1] != '.js':
                continue
            # Close each file as soon as it has been read.
            with open(os.path.join(root, f)) as js_file:
                chunks.append('{}\n'.format(js_file.read()))
        js_profiles[profile_name] = ''.join(chunks)

    app.on_shutdown.append(on_shutdown)

    c = Chrome(host=HOST, port=PORT)
    app['chrome-driver'] = c
    app['js-profiles'] = js_profiles

    setup_routes(app)
    return app