def run_webhook(self, webhook_url, **options):
    """
    Convenience method for running bots in webhook mode.

    The webhook is first registered through ``self.set_webhook``. When
    ``webhook_url`` is non-empty, the webhook application is then served
    with ``web.run_app`` on the path taken from the URL.

    :param webhook_url: URL to register as the webhook; a falsy value only
        performs the ``set_webhook`` call and then returns.
    :param options: extra keyword arguments forwarded to ``self.set_webhook``.

    :Example:

    >>> if __name__ == '__main__':
    >>>     bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken")

    Additional documentation on https://core.telegram.org/bots/api#setwebhook
    """
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(self.set_webhook(webhook_url, **options))
    if not webhook_url:
        return

    parsed = urlparse(webhook_url)
    webhook_app = self.create_webhook_app(parsed.path, event_loop)
    # The HOST / PORT environment variables win; an unset or zero PORT falls
    # back to the port written in the webhook URL.
    listen_host = os.environ.get('HOST', '0.0.0.0')
    listen_port = int(os.environ.get('PORT', 0)) or parsed.port
    web.run_app(webhook_app, host=listen_host, port=listen_port)
# python类run_app()的实例源码 (example source code using Python's run_app())
def run(self, arguments, settings, app):
    """Serve ``app`` with ``web.run_app``, optionally enabling auto-reload.

    :param arguments: parsed command-line arguments; ``reload``, ``port`` and
        ``host`` are read from it.
    :param settings: mapping consulted for ``address``/``port``, ``host`` and
        ``access_log_format`` when the command line does not supply them.
    :param app: the application handed to ``web.run_app``.
    :return: ``1`` when ``--reload`` was requested but ``aiohttp_autoreload``
        is unavailable, otherwise ``None`` once the server has stopped.
    """
    if arguments.reload:
        if HAS_AUTORELOAD:
            aiohttp_autoreload.start()
        else:
            sys.stderr.write(
                'You must install aiohttp_autoreload for the --reload option to work.\n'
                'Use `pip install aiohttp_autoreload` to install aiohttp_autoreload.\n'
            )
            return 1

    # Command-line values take precedence over the settings mapping.
    listen_port = arguments.port or settings.get('address', settings.get('port'))
    listen_host = arguments.host or settings.get('host', '0.0.0.0')
    try:
        web.run_app(
            app,
            host=listen_host,
            port=listen_port,
            loop=self.get_loop(),
            access_log_format=settings.get('access_log_format'),
        )
    except asyncio.CancelledError:
        # server shut down, we're good here.
        pass
def __main():
    """Initialise configuration and storage, then serve the WeiXin endpoints.

    Returns early (after printing a message) when ``myid3.init`` reports
    failure for ``config.xml``. Otherwise the module-level ``__wxSQL`` is
    replaced with a fresh ``WXSQL`` instance, three routes are registered and
    the application is served on port 6670; the event loop is closed once
    ``web.run_app`` returns.
    """
    global __wxSQL
    if not myid3.init('config.xml'):
        print('?????????')
        return

    event_loop = asyncio.get_event_loop()
    # __initwxmenu(loop)
    __wxSQL = WXSQL()

    application = web.Application(loop=event_loop)
    route_table = (
        ('GET', '/', getIndex),
        ('GET', '/WeiXin', getWX),
        ('POST', '/WeiXin', postWX),
    )
    for http_method, url_path, handler in route_table:
        application.router.add_route(http_method, url_path, handler)

    # Serve on the fixed port 6670.
    web.run_app(application, port=6670)
    event_loop.close()
def run(cmdargs=None, logprefix=''):
    """Parse arguments, start the replication workers and serve the app.

    :param cmdargs: optional list of command-line arguments; when falsy the
        parser falls back to its default argument source.
    :param logprefix: text prepended to every log record's format string.

    The queue directory is created if missing, the deletion log is opened
    inside it, and the replication workers are started before the web
    application runs. The workers are always torn down afterwards, even when
    the server exits with an exception.
    """
    if cmdargs:
        config.args = config.parser.parse_args(args=cmdargs)
    else:
        config.args = config.parser.parse_args()

    logging.basicConfig(
        format=logprefix + '{asctime} {levelname} {name}: {message}',
        style='{',
        level=logging.DEBUG if config.args.verbose else logging.WARNING
    )

    # exist_ok avoids the check-then-create race of a separate exists() test:
    # another process creating the directory in between no longer crashes us.
    os.makedirs(config.args.queue, exist_ok=True)
    replication.dellog = DeletionLog(os.path.join(config.args.queue, 'deletion.log'))

    logger.info('Starting up...')
    loop = asyncio.get_event_loop()
    start_replication_workers(loop)
    try:
        app = create_app(loop)
        app.on_response_prepare.append(on_response_prepare)
        web.run_app(app, port=config.args.port, host=config.args.host, print=logger.info)
    finally:
        # Run the teardown even if the server raised, so the workers started
        # above are not left running.
        logger.info('Starting to tear down workers...')
        stop_replication_workers(loop)
    logger.info('Goodbye.')
def main():
    """Command-line entry point: load a program module and serve it.

    The positional ``module`` argument is passed to
    ``CCApplication.initialize``. ``--host`` and ``--port`` are forwarded to
    ``web.run_app`` only when given, so its own defaults apply otherwise.
    """
    # enable_request_logging()
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('module', help='Module used as source for programs')
    arg_parser.add_argument('--host')
    arg_parser.add_argument('--port', type=int)
    parsed = arg_parser.parse_args()

    # Collect only the listen options the user actually supplied.
    run_options = {}
    for option in ('host', 'port'):
        value = getattr(parsed, option)
        if value is not None:
            run_options[option] = value

    application = CCApplication()
    application.initialize(parsed.module)
    web.run_app(application, **run_options)
def main():
    """Reset root logging, build the application and serve it.

    Existing root-logger handlers are removed so that ``logging.basicConfig``
    takes effect (it does nothing when the root logger already has handlers).
    Log timestamps are rendered in UTC. The application returned by ``init()``
    is then served with a custom access-log format.
    """
    loop = asyncio.get_event_loop()

    root = logging.getLogger()
    # Iterate over a copy: removeHandler() mutates root.handlers, and removing
    # while iterating the live list skips every other handler.
    for handler in list(root.handlers):
        root.removeHandler(handler)

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(levelname)-8s [%(asctime)s.%(msecs)03d] '
               '(%(name)s): %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logging.Formatter.converter = time.gmtime

    app = loop.run_until_complete(init())

    # More useful log format than default.
    # The HTTP request header is spelled "Referer"; "Referrer" never matches
    # a real header, so that field was always logged as empty.
    log_format = '%a (%{X-Real-IP}i) %t "%r" %s %b %Tf ' \
                 '"%{Referer}i" "%{User-Agent}i"'
    web.run_app(app, access_log_format=log_format)
def run_server_idsfree(config: IdsFreeRunServeRemoveModel):
    """
    Start the web server described by *config*.

    Steps performed:

    - Unless ``config.skip_check_requisites`` is set, run
      ``check_remote_requisites(config)`` to completion on the event loop
    - Store the configuration object and its dict form on ``app`` under the
      ``IDSFREE_CONFIG`` and ``GLOBAL_CONFIG`` keys
    - Serve ``app`` on ``config.listen_addr`` / ``config.listen_port``

    NOTE(review): ``app`` is not defined in this function; presumably it is a
    module-level application object -- confirm.
    """
    assert isinstance(config, IdsFreeRunServeRemoveModel)

    event_loop = asyncio.get_event_loop()

    # Check remote Docker version
    if not config.skip_check_requisites:
        event_loop.run_until_complete(check_remote_requisites(config))

    app['IDSFREE_CONFIG'] = config
    app['GLOBAL_CONFIG'] = config_to_dict(config)

    # Launch attacks
    bind_address = config.listen_addr
    bind_port = int(config.listen_port)
    web.run_app(app, host=bind_address, port=bind_port)
def main(filename = 'postgresql2websocket.conf'):
    """Load the configuration file and serve the websocket application.

    :param filename: path of the INI-style configuration file; its ``[web]``
        section must provide ``host`` and ``port``.
    :raises SystemExit: with status 1 when the configuration file cannot be
        read.

    The event loop is closed when the server stops, including on Ctrl+C.
    """
    config = configparser.ConfigParser()
    if not config.read(filename):
        print("Unable to read %s" % filename)
        # Raise SystemExit directly: the bare exit() helper is injected by the
        # `site` module for interactive use and is not guaranteed to exist.
        raise SystemExit(1)

    loop = asyncio.get_event_loop()
    app = loop.run_until_complete(init_app(config))
    app['websockets'] = []
    app.on_shutdown.append(on_shutdown)
    try:
        web.run_app(app,
            host = config.get('web', 'host'),
            port = config.getint('web', 'port'),
        )
    except KeyboardInterrupt:
        pass
    finally:
        loop.close()
def run(self, host, port):
    """Bind routes, index SQL views, enable CORS and start the server.

    :param host: interface passed to ``web.run_app``.
    :param port: port passed to ``web.run_app``.

    Every routed view that subclasses ``AbstractSQLView`` is recorded in
    ``self.tables`` and ``self.permissions`` under its ``table_name``, and its
    permission object gets a back-reference to this application.
    """
    from .view import AbstractSQLView

    self.route.bind()

    for _name, view_cls in self.route.views:
        if not issubclass(view_cls, AbstractSQLView):
            continue
        self.tables[view_cls.table_name] = view_cls
        self.permissions[view_cls.table_name] = view_cls.permission
        view_cls.permission.app = self

    # Configure default CORS settings.
    permissive = aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
    )
    cors = aiohttp_cors.setup(self._raw_app, defaults={"*": permissive})

    # Configure CORS on all routes. The routes are snapshotted into a list
    # before iterating; static resources are left out.
    all_routes = list(self._raw_app.router.routes())
    for route in all_routes:
        if type(route.resource) is not StaticResource:
            cors.add(route)

    web.run_app(host=host, port=port, app=self._raw_app)
def run(self):
    """Dispatch the sub-command named in ``self.argv[1]``.

    Built-in sub-commands are ``runserver``, ``routes``, ``help`` and
    ``shell``; any other name is looked up in ``self.app.commands`` and, if
    present, its coroutine is run to completion with the remaining arguments.
    A missing sub-command is treated as ``help``. A name that matches nothing
    is ignored.
    """
    command = self.argv[1] if len(self.argv) > 1 else 'help'

    if command == 'runserver':
        try:
            host, port_text = self.argv[2].split(':')
            # A zero port falls back to the default 8080.
            port = int(port_text) or 8080
        except (IndexError, ValueError):
            print('WARNING! Incorrect host:port - using default settings.')
            host, port = '0.0.0.0', 8080
        web.run_app(self.app, host=host, port=port, loop=self.app.loop)
        return

    if command == 'routes':
        from djaio.ext.routes import print_routes
        print_routes(self.app)
        return

    if command == 'help':
        rule = '=' * 60
        program = self.argv[0].rsplit('/', 1)[-1]
        print(rule)
        print('Usage: {} <command> <options>'.format(program))
        print('Available commands:')
        print(' * help - shows this message')
        print(' * runserver host:port - runs web server')
        for name, spec in self.app.commands.items():
            print(' * {} <options> - {}'.format(name, spec.get('description')))
        print(rule)
        return

    if command == 'shell':
        import IPython
        IPython.start_ipython(argv=[])
        return

    if command in self.app.commands:
        extra_args = self.argv[2:]
        coro_func = self.app.commands[command].get('func')
        self.app.loop.run_until_complete(coro_func(self.app, *extra_args))
def run(self) -> None:
    """Serve ``self.app`` on ``self.host``:``self.port`` using ``self.loop``."""
    web.run_app(
        self.app,
        host=self.host,
        port=self.port,
        loop=self.loop,
    )
def main():
    """Serve the static demo application with the index middleware.

    ``--occlude-root`` additionally registers an explicit ``GET /`` handler
    before the static route is added.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        '--occlude-root',
        action='store_true',
        help='Add an explicit handler function for /.',
    )
    options = cli.parse_args()

    application = web.Application(middlewares=[IndexMiddleware()])
    router = application.router
    router.add_route('GET', '/test/', make_handler('test'))
    if options.occlude_root:
        router.add_route('GET', '/', make_handler('I occluded /index.html.'))
    router.add_static('/', APP_PATH / 'static')

    web.run_app(application)
def main():
    """Mount the hello-world SOAP service under ``/say_hello/`` and serve it
    on port 8080."""
    soap_application = Application(
        [HelloWorldService],
        tns='aiohttp_spyne.examples.hello',
        in_protocol=Soap11(validator='lxml'),
        out_protocol=Soap11(),
    )

    hello_subapp = AioApplication(soap_application)
    # Template stored on the sub-application under the 'test_text' key.
    hello_subapp['test_text'] = "Hello, %s"

    root_app = web.Application()
    root_app.add_subapp('/say_hello/', hello_subapp)
    web.run_app(root_app, port=8080)
def spyne_app_process():
    """Serve a minimal SOAP echo service on ``0.0.0.0:PORT``.

    The application is created with a ``client_max_size`` of 2 MiB.
    """
    # Service with one RPC, `ping`, which returns its argument unchanged.
    class TestService(ServiceBase):
        @rpc(Unicode, _returns=Unicode)
        def ping(self, data):
            return data

    soap_application = Application(
        [TestService],
        tns='aiohttp_spyne.tests.test',
        in_protocol=Soap11(validator='lxml'),
        out_protocol=Soap11(),
    )
    two_mebibytes = 2 * 1024 ** 2
    aio_app = AioApplication(soap_application, client_max_size=two_mebibytes)
    web.run_app(aio_app, host="0.0.0.0", port=PORT)
def serve(serve_root, subdirectory, port, asset_file=None):
    """Run the development server while watching ``serve_root`` for changes.

    :param serve_root: directory that is served and watched recursively.
    :param subdirectory: forwarded to ``create_app``.
    :param port: port the server listens on.
    :param asset_file: optional value forwarded to ``create_app``.

    The file-system observer is always stopped and joined when the server
    exits; Ctrl+C is swallowed.
    """
    application = create_app(serve_root, subdirectory=subdirectory, asset_file=asset_file)

    # TODO in theory file watching could be replaced by accessing tool_chain.source_map
    watcher = Observer()
    change_handler = DevServerEventEventHandler(application, serve_root)
    watcher.schedule(change_handler, str(serve_root), recursive=True)
    watcher.start()

    logger.info('Started dev server at http://localhost:%s, use Ctrl+C to quit', port)

    def _discard(msg):
        # Silences run_app's own startup output; the logger call above
        # already announced the server.
        return None

    try:
        web.run_app(application, port=port, print=_discard)
    except KeyboardInterrupt:
        pass
    finally:
        watcher.stop()
        watcher.join()
def run(self, arguments, settings, app):
    """Serve ``app`` on the port stored under ``settings['address']``.

    ``arguments`` is accepted but not used.
    """
    listen_port = settings['address']
    web.run_app(app, port=listen_port)
def main():
    """Configure logging, bind the listening socket, prefork and serve.

    ``options.listen`` must be an ``http://host:port`` or ``unix:<path>`` URL.
    After binding, ``options.prefork - 1`` child processes are forked; every
    process then serves the application on the shared socket.

    :return: ``1`` when the listening scheme is not supported, otherwise
        ``None`` once the server has stopped.
    """
    log_level = logging.DEBUG if options.debug else logging.INFO
    if not options.syslog:
        coloredlogs.install(level=log_level,
                            fmt='[%(levelname).1s %(asctime)s %(module)s:%(lineno)d] %(message)s',
                            datefmt='%y%m%d %H:%M:%S')
    else:
        syslog.enable_system_logging(level=log_level,
                                     fmt='vj4[%(process)d] %(programname)s %(levelname).1s %(message)s')
    logging.getLogger('sockjs').setLevel(logging.WARNING)

    url = urllib.parse.urlparse(options.listen)
    if url.scheme == 'http':
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        host, port_str = url.netloc.rsplit(':', 1)
        sock.bind((host, int(port_str)))
    elif url.scheme == 'unix':
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Remove a stale socket file left by a previous run, if any.
        try:
            os.remove(url.path)
        except FileNotFoundError:
            pass
        sock.bind(url.path)
    else:
        _logger.error('Invalid listening scheme %s', url.scheme)
        return 1

    for _ in range(1, options.prefork):
        pid = os.fork()
        if not pid:
            # Child process: stop forking and go serve.
            break
        # Bind the child's pid at registration time. A closure over `pid`
        # would see only its final value, so every handler would signal the
        # last child (or pid 0, i.e. the whole process group, inside a child
        # that inherited the handler).
        atexit.register(os.kill, pid, signal.SIGTERM)

    web.run_app(app.Application(), sock=sock, access_log=None, shutdown_timeout=0)
def _core(socket, backend_host, backend_watchdog,
          debug, identity, identity_key, i_am_john, cache_size):
    """Build the core application and serve it on a unix socket.

    :param socket: path of the unix socket to listen on.
    :param backend_host: backend address given to the components factory.
    :param backend_watchdog: forwarded to the components factory.
    :param debug: when truthy, enable asyncio debug mode; otherwise the
        logger stream level is set to ``WARNING``.
    :param identity: identity to load at startup (ignored if ``i_am_john``).
    :param identity_key: readable object holding the identity's key; when
        falsy the key is fetched through a backend login instead.
    :param i_am_john: when truthy, load the built-in John Doe identity.
    :param cache_size: forwarded to the components factory.
    """
    app = unix_socket_app.UnixSocketApplication()
    components = core_components_factory(app, backend_host, backend_watchdog, cache_size)
    dispatcher = components.get_dispatcher()
    register_core_api(app, dispatcher)

    startup_hook = None
    # TODO: remove me once RSA key loading and backend handling are easier
    if i_am_john:

        async def load_identity(app):
            from parsec.core.identity import EIdentityLoad
            intent = EIdentityLoad(JOHN_DOE_IDENTITY, JOHN_DOE_PRIVATE_KEY)
            await asyncio_perform(dispatcher, Effect(intent))
            print('Welcome back M. Doe')

        startup_hook = load_identity
    elif identity:

        async def load_identity(app):
            from parsec.core.identity import EIdentityLoad, EIdentityLogin
            # The password is prompted for before the key is read or the
            # login effect is built.
            if identity_key:
                print("Reading %s's key from `%s`" % (identity, identity_key))
                password = getpass()
                eff = Effect(EIdentityLoad(identity, identity_key.read(), password))
            else:
                print("Fetching %s's key from backend privkey store." % (identity))
                password = getpass()
                eff = Effect(EIdentityLogin(identity, password))
            await asyncio_perform(dispatcher, eff)
            print('Connected as %s' % identity)

        startup_hook = load_identity

    if startup_hook is not None:
        app.on_startup.append(startup_hook)

    if debug:
        asyncio.get_event_loop().set_debug(True)
    else:
        logger_stream.level = WARNING

    print('Starting parsec core on %s (connecting to backend %s)' % (socket, backend_host))
    unix_socket_app.run_app(app, path=socket)
    print('Bye ;-)')
def _backend(host, port, pubkeys, no_client_auth, store, block_store, debug):
    """Build the backend application and serve it over HTTP.

    :param host: listen host; falls back to ``$HOST`` and then ``localhost``.
    :param port: listen port; falls back to ``$PORT`` and then ``6777``.
    :param pubkeys: accepted but not used in this function.
    :param no_client_auth: accepted but not used in this function.
    :param store: ``postgres://`` URL selecting the PostgreSQL store; when
        falsy a mocked in-memory store is used.
    :param block_store: block store prefix/URL; when falsy an in-memory block
        store API is registered under ``/blockstore``.
    :param debug: when truthy, enable asyncio debug mode; otherwise the
        logger stream level is set to ``WARNING``.
    :raises SystemExit: when ``store`` is set but is not a PostgreSQL URL.
    """
    host = host or environ.get('HOST', 'localhost')
    port = port or int(environ.get('PORT', 6777))
    app = web.Application()

    if not block_store:
        block_store = '/blockstore'
        register_in_memory_block_store_api(app, prefix=block_store)

    if not store:
        store_type = 'mocked in memory'
        backend_components = mocked_components_factory(block_store)
    elif store.startswith('postgres://'):
        store_type = 'PostgreSQL'
        backend_components = postgresql_components_factory(app, store, block_store)
    else:
        raise SystemExit('Unknown store `%s` (should be a postgresql db url).' % store)

    dispatcher = backend_components.get_dispatcher()
    register_backend_api(app, dispatcher)
    register_start_api(app, dispatcher)

    if debug:
        asyncio.get_event_loop().set_debug(True)
    else:
        logger_stream.level = WARNING

    # TODO: remove me once RSA key loading and backend handling are easier
    async def insert_john(app):
        from parsec.backend.pubkey import EPubKeyGet, EPubKeyAdd
        dispatcher = backend_components.get_dispatcher()
        # Add John Doe's public key only when the lookup reports it missing.
        try:
            await asyncio_perform(dispatcher, Effect(EPubKeyGet(JOHN_DOE_IDENTITY)))
        except PubKeyNotFound:
            add_key = Effect(EPubKeyAdd(JOHN_DOE_IDENTITY, JOHN_DOE_PUBLIC_KEY))
            await asyncio_perform(dispatcher, add_key)

    app.on_startup.append(insert_john)

    print('Starting parsec backend on %s:%s with store %s' % (host, port, store_type))
    web.run_app(app, host=host, port=port)
    print('Bye ;-)')
def handle(self, *args, **kwargs):
    """Serve the file download/upload endpoints on ``localhost:8080``.

    Positional and keyword arguments are accepted but not used. A stream
    handler is attached to ``LOGGER`` and its level set to ``DEBUG`` before
    the server starts.
    """
    LOGGER.addHandler(logging.StreamHandler())
    LOGGER.setLevel(logging.DEBUG)

    application = web.Application(loop=asyncio.get_event_loop())
    router = application.router
    router.add_get('/file/{id}/', download)
    router.add_put('/file/{id}/', upload)

    web.run_app(application, host='localhost', port=8080)
def handle(self, *args, **kwargs):
    """Serve the file download/upload endpoints on ``localhost:8080``.

    Positional and keyword arguments are accepted but not used. A stream
    handler is attached to ``LOGGER`` and its level set to ``DEBUG`` before
    the server starts.
    """
    LOGGER.addHandler(logging.StreamHandler())
    LOGGER.setLevel(logging.DEBUG)

    application = web.Application(loop=asyncio.get_event_loop())
    router = application.router
    router.add_get('/file/{id}/', download)
    router.add_put('/file/{id}/', upload)

    web.run_app(application, host='localhost', port=8080)
def handle(self, *args, **kwargs):
    """Serve the file download/upload endpoints on ``localhost:8080``.

    Positional and keyword arguments are accepted but not used. A stream
    handler is attached to ``LOGGER`` and its level set to ``DEBUG`` before
    the server starts.
    """
    LOGGER.addHandler(logging.StreamHandler())
    LOGGER.setLevel(logging.DEBUG)

    application = web.Application(loop=asyncio.get_event_loop())
    router = application.router
    router.add_get('/file/{id}/', download)
    router.add_put('/file/{id}/', upload)

    web.run_app(application, host='localhost', port=8080)
def run_webserver(app, port=PORT):
    """Configure templates and routes on ``app``, then serve it.

    :param app: application to configure and run.
    :param port: port to listen on; defaults to the module-level ``PORT``.

    Templates are loaded from the current working directory and static files
    are served from ``./static`` under ``/static/``.
    """
    aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(os.curdir))
    app.router.add_route('GET', '/', handler)
    app.router.add_route('GET', '/{max_pkgs}', handler)
    app.router.add_static('/static/', path='./static')
    # Honour the caller's `port`; the global PORT was previously always used,
    # silently ignoring the argument.
    web.run_app(app, port=port)
def main():
    """Build the application with ``get_app`` and serve it on ``PORT``."""
    application = get_app()
    web.run_app(application, port=PORT)
def run(self, host: str = '0.0.0.0', port: int = 8080):
    """
    Start sirbot

    Run the plugin configuration to completion on the bot's event loop,
    then start the aiohttp.web.Application.

    Args:
        host (str): host
        port (int): port
    """
    plugin_setup = self._configure_plugins()
    self._loop.run_until_complete(plugin_setup)
    web.run_app(self._app, host=host, port=port)  # pragma: no cover
def main():
    """Enable debug logging, run ``init`` to completion and serve the
    application it returns on the host and port it returns."""
    # init logging
    logging.basicConfig(level=logging.DEBUG)

    event_loop = asyncio.get_event_loop()
    init_result = event_loop.run_until_complete(init(event_loop))
    application, listen_host, listen_port = init_result
    web.run_app(application, host=listen_host, port=listen_port)
def run_forever(self, print=print, **kwargs):
    """Serve this application on the host and port from ``self.config.http``.

    :param print: callable handed to ``web.run_app`` for its console output.
    :param kwargs: further options for ``web.run_app``; any ``host`` or
        ``port`` entry is overridden by the configured values.
    """
    http_config = self.config.http
    kwargs.update(host=http_config.host, port=http_config.port, print=print)
    web.run_app(self, **kwargs)
def serve(opts):
    """Serve the application for a real or mocked board on ``opts.port``.

    ``opts.mock`` selects ``MockEiBotBoard``; otherwise the board comes from
    ``EiBotBoard.find()``. The board is always closed when this returns.
    """
    board = MockEiBotBoard() if opts.mock else EiBotBoard.find()
    try:
        application = make_app(board)
        web.run_app(application, port=opts.port)
    finally:
        board.close()
def start(locust, options):
    """Set up ``app`` and serve it with ``web.run_app``.

    ``locust`` and ``options`` are accepted but not used here.
    NOTE(review): ``app`` is not defined in this function; presumably it is a
    module-level application object -- confirm.
    """
    # Previous implementation, kept for reference:
    # wsgi.WSGIServer((options.web_host, options.port), app, log=None).serve_forever()
    setup_app(app)
    web.run_app(app)
def main():
    """Enable debug logging, run ``init`` to completion and serve the
    application it returns on the host and port it returns."""
    # init logging
    logging.basicConfig(level=logging.DEBUG)

    event_loop = asyncio.get_event_loop()
    init_result = event_loop.run_until_complete(init(event_loop))
    application, listen_host, listen_port = init_result
    web.run_app(application, host=listen_host, port=listen_port)