def run_webhook(self, webhook_url, **options):
    """
    Register the webhook and serve it with a blocking aiohttp application.

    ``set_webhook`` is always awaited first (with ``options`` forwarded to
    it). If ``webhook_url`` is falsy nothing else happens. Otherwise an
    application is created for the URL's path and served on the host taken
    from the ``HOST`` environment variable (default ``0.0.0.0``) and the
    port taken from ``PORT``, falling back to the port in the URL.

    :Example:

    >>> if __name__ == '__main__':
    >>>     bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken")

    Additional documentation on https://core.telegram.org/bots/api#setwebhook
    """
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(self.set_webhook(webhook_url, **options))

    if not webhook_url:
        return

    parsed = urlparse(webhook_url)
    application = self.create_webhook_app(parsed.path, event_loop)
    # A missing or zero PORT falls through to the port embedded in the URL.
    env_port = int(os.environ.get('PORT', 0))
    web.run_app(
        application,
        host=os.environ.get('HOST', '0.0.0.0'),
        port=env_port or parsed.port,
    )
# Example source code using the Python ``web`` module (aiohttp.web / tornado.web)
async def handle_incoming_webhook(self, request):
    """Handle an incoming webhook and relay its message to XMPP.

    JSON requests are decoded with ``request.json()``; every other content
    type is read as form data with ``request.post()``. The payload must
    provide the ``text``, ``token`` and ``user_name`` keys. Messages with an
    empty ``text`` are dropped; all others are handed to every bridge in
    ``self.bridges``.

    :param request: the incoming aiohttp request.
    :returns: an empty ``aiohttp.web.Response`` in every case.
    :raises KeyError: if the payload lacks one of the expected keys.
    """
    # The body awaits, so this must be declared as a coroutine function.
    if request.content_type == 'application/json':
        payload = await request.json()
    else:
        # TODO: Handle other content types
        payload = await request.post()

    # Discard empty messages
    if payload['text'] == "":
        return aiohttp.web.Response()

    token = payload['token']
    logging.debug("--> Handling incoming request from token "
                  "'{}'...".format(token))
    username = payload['user_name']
    msg = payload['text']
    for bridge in self.bridges:
        bridge.handle_incoming_webhook(token, username, msg)
    return aiohttp.web.Response()
def dps_baglan(request):
    """Open a tuntox tunnel to the node named in the POSTed form data.

    Generator-based coroutine (it uses ``yield from``). Reads the ``kdugum``
    form field as the remote tox id, picks a random local port in
    ``[38000, 40000)``, builds a ``./tuntox`` command line forwarding that
    local port to ``127.0.0.1:33999`` and hands it to ``komutar``.

    :param request: request whose POST body carries the ``kdugum`` field.
    :returns: a JSON response containing ``lokalhost + ":" + <local port>``.
    """
    # Local import keeps the block self-contained.
    import shlex

    data = yield from request.post()
    toxid = data['kdugum']
    print (toxid)
    port = 33999
    lport = random.randrange(38000, 40000)
    # The id comes straight from the client, so quote it before splicing it
    # into a command string. NOTE(review): assumes komutar runs the string
    # through a shell - confirm. Plain ids pass through unchanged.
    komut = ("./tuntox -i " + shlex.quote(str(toxid))
             + " -L " + str(lport) + ":127.0.0.1:" + str(port))
    print ("dugumler aras? tunel ac?l?yor.")
    # Persisting the tunnel id to a file is currently disabled.
    yield from komutar(komut)
    link = lokalhost + ":" + str(lport)
    return web.json_response(data=link)
async def replace(self,
                  registry: CollectorRegistry) -> aiohttp.web.Response:
    '''
    ``replace`` pushes new values for a group of metrics to the push
    gateway.

    .. note::

        All existing metrics with the same grouping key specified in the
        URL will be replaced with the new metrics value.

    :param registry: the registry whose metrics are marshalled with
      ``self.formatter`` and sent as the request body.

    :returns: the response of the HTTP PUT to ``self.path``, already
      released.
    '''
    # Declared ``async`` because the body uses ``async with`` and ``await``.
    async with aiohttp.ClientSession(loop=self.loop) as session:
        payload = self.formatter.marshall(registry)
        resp = await session.put(
            self.path, data=payload, headers=self.headers)
        await resp.release()
    return resp
def __init__(self,
             registry: Registry = None,
             loop: BaseEventLoop = None) -> None:
    '''
    Set up the Prometheus metrics service.

    :param registry: A :class:`CollectorRegistry` instance that will
      hold all the metrics for this service. If no registry is specified
      then a new default ``Registry`` is created.

    :param loop: The event loop instance to use. If no loop is specified
      then the default event loop will be retrieved.

    :raises: Exception if the registry object is not an instance of the
      Registry type.
    '''
    self.loop = loop if loop else asyncio.get_event_loop()

    registry_given = registry is not None
    if registry_given and not isinstance(registry, Registry):
        message = 'registry must be a Registry, got: {}'.format(registry)
        raise Exception(message)
    self.registry = registry if registry else Registry()

    # Server-side state; everything starts out unset.
    self._svr = None  # type: Server
    self._svc = None  # type: aiohttp.web.Application
    self._handler = None  # type: aiohttp.web.RequestHandlerFactory
    self._https = None  # type: bool
def __init__(self, app_or_server, *, scheme=sentinel, host=sentinel,
             cookie_jar=None, **kwargs):
    """Build a test client around an application or an existing test server.

    A ``BaseTestServer`` is used as-is and must not be combined with an
    explicit ``scheme`` or ``host``. An ``Application`` is wrapped in a new
    ``TestServer`` (defaults: ``http`` on ``127.0.0.1``). Remaining keyword
    arguments are forwarded to ``ClientSession``.

    :raises ValueError: scheme/host given together with a test server.
    :raises TypeError: ``app_or_server`` is neither of the accepted types.
    """
    if isinstance(app_or_server, BaseTestServer):
        if not (scheme is sentinel and host is sentinel):
            raise ValueError("scheme and host are mutable exclusive "
                             "with TestServer parameter")
        server = app_or_server
    elif isinstance(app_or_server, Application):
        if scheme is sentinel:
            scheme = "http"
        if host is sentinel:
            host = '127.0.0.1'
        server = TestServer(app_or_server, scheme=scheme, host=host)
    else:
        raise TypeError("app_or_server should be either web.Application "
                        "or TestServer instance")

    self._server = server
    self._loop = server._loop

    # Without a caller-supplied jar, create one in "unsafe" mode.
    if cookie_jar is not None:
        jar = cookie_jar
    else:
        jar = aiohttp.CookieJar(unsafe=True, loop=self._loop)
    self._session = ClientSession(loop=self._loop, cookie_jar=jar, **kwargs)

    self._closed = False
    self._responses = []
    self._websockets = []
def test_start_runserver_app_instance(tmpworkdir, loop):
    """runserver() accepts an app.py that exposes an Application instance.

    Writes a minimal app.py into the temporary working directory, runs
    ``runserver`` against it and checks the returned auxiliary app: its
    type, its port (8001) and that exactly one startup and one shutdown
    hook are registered.
    """
    # The handler body must be indented or the generated app.py is not
    # valid Python.
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web
async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')
app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py', host='foobar.com')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    assert len(aux_app.on_startup) == 1
    assert len(aux_app.on_shutdown) == 1
def test_start_runserver_no_loop_argument(tmpworkdir, loop):
    """runserver() accepts an app factory that takes no ``loop`` argument.

    Writes an app.py whose ``app`` is a zero-argument factory function, runs
    ``runserver`` against it and checks the returned auxiliary app: its
    type, its port (8001) and that exactly one startup and one shutdown
    hook are registered.
    """
    # Both function bodies must be indented or the generated app.py is not
    # valid Python.
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web
async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')
def app():
    a = web.Application()
    a.router.add_get('/', hello)
    return a
"""
    })
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    assert len(aux_app.on_startup) == 1
    assert len(aux_app.on_shutdown) == 1
def test_serve_main_app_app_instance(tmpworkdir, loop, mocker):
    """serve_main_app() serves an app.py exposing an Application instance.

    The loop is scheduled to stop after half a second so that
    ``serve_main_app`` returns. Afterwards the test checks that the loop
    was closed, that ``create_server`` was called for ``0.0.0.0:8000`` with
    a backlog of 128, and that ``modify_main_app`` received the config.
    """
    # The handler body must be indented or the generated app.py is not
    # valid Python.
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web
async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')
app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    mocker.spy(loop, 'create_server')
    mock_modify_main_app = mocker.patch('aiohttp_devtools.runserver.serve.modify_main_app')
    loop.call_later(0.5, loop.stop)

    config = Config(app_path='app.py')
    serve_main_app(config, '/dev/tty')

    assert loop.is_closed()
    loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8000, backlog=128)
    mock_modify_main_app.assert_called_with(mock.ANY, config)
def jsproxy_get(self, request):
    """Handle GET requests to jsproxy, decoding encrypted query strings.

    Generator-based coroutine. The query string is decrypted through
    ``self.session.tx_decrypt_uri`` and the plaintext is logged; the
    request path is then fetched from ``self.target_url`` and the upstream
    status, headers and body are returned to the caller.

    :param request: the incoming aiohttp request.
    :returns: an ``aiohttp.web.Response`` mirroring the upstream response,
        minus its ``Content-Encoding`` header.
    """
    loop = asyncio.get_event_loop()
    client = aiohttp.ClientSession(loop=loop)
    try:
        plaintext, _ = yield from self.session.tx_decrypt_uri(
            request.query_string)
        LOGGER.info('*** SEND_PLAINTEXT_URL: {}'.format(plaintext))
        resp = yield from client.get(self.target_url + request.path)
        try:
            data = yield from resp.read()
        finally:
            yield from resp.release()
        # Compare case-insensitively, as forward_request does.
        # NOTE(review): Content-Encoding is presumably dropped because the
        # body read above is already decoded - confirm.
        headers = {
            name: value
            for name, value in resp.headers.items()
            if name.lower() != 'content-encoding'
        }
    finally:
        # Close the client session even when decrypting or fetching fails.
        yield from client.close()
    return aiohttp.web.Response(status=resp.status, headers=headers, body=data)
def forward_request(self, request):
    """Handle any other requests.

    Generator-based coroutine. Re-issues the request's method and path
    against ``self.target_url`` and returns the upstream status, headers
    and body to the caller.

    :param request: the incoming aiohttp request.
    :returns: an ``aiohttp.web.Response`` mirroring the upstream response,
        minus its ``Content-Encoding`` header.
    """
    loop = asyncio.get_event_loop()
    client = aiohttp.ClientSession(loop=loop)
    try:
        resp = yield from client.request(
            request.method,
            self.target_url + request.path)
        try:
            data = yield from resp.read()
        finally:
            yield from resp.release()
        # NOTE(review): Content-Encoding is presumably dropped because the
        # body read above is already decoded - confirm.
        headers = {
            name: value
            for name, value in resp.headers.items()
            if name.lower() != 'content-encoding'
        }
    finally:
        # Close the client session even when the upstream request fails.
        yield from client.close()
    return aiohttp.web.Response(status=resp.status, headers=headers, body=data)
async def webhook_handle(self, request):
    """
    aiohttp.web handler for processing webhooks.

    Decodes the request body as JSON, passes the decoded update to
    ``self._process_update`` and answers with an empty response.

    :Example:

    >>> from aiohttp import web
    >>> app = web.Application()
    >>> app.router.add_route('POST', '/webhook', bot.webhook_handle)
    """
    # Declared ``async`` because the body awaits ``request.json()``.
    update = await request.json()
    self._process_update(update)
    return web.Response()
def create_webhook_app(self, path, loop=None):
    """
    Shorthand for creating an aiohttp.web.Application with the webhook
    handler registered.

    :param path: URL path on which ``self.webhook_handle`` receives POSTs.
    :param loop: event loop passed through to ``web.Application``.
    :returns: the configured application.
    """
    application = web.Application(loop=loop)
    router = application.router
    router.add_route('POST', path, self.webhook_handle)
    return application
async def upload(self, files: Sequence[str]):
    """Upload local files to this kernel.

    Builds a POST request to ``/kernel/<kernel_id>/upload`` whose content is
    one ``aiohttp.web.FileField`` per path (field name ``src``, content type
    ``application/octet-stream``) and sends it asynchronously.

    :param files: paths of the files to upload.
    :returns: whatever ``Request.asend()`` resolves to.
    """
    # Local import keeps the block self-contained.
    from contextlib import ExitStack

    rqst = Request('POST', '/kernel/{}/upload'.format(self.kernel_id))
    # Every opened file is registered on the stack so it is closed once the
    # request has been sent, or if opening a later file fails.
    with ExitStack() as stack:
        rqst.content = [
            # name  filename  file  content_type  headers
            aiohttp.web.FileField(
                'src', path, stack.enter_context(open(path, 'rb')),
                'application/octet-stream', None
            ) for path in files
        ]
        return (await rqst.asend())
# only supported in AsyncKernel
def handle_get_root(self, request):
    """Answer a request for the root with an HTTP 302 to the dashboard index page."""
    redirect = aiohttp.web.HTTPFound('/dashboard/index.html')
    return redirect
def get_special_handler(self, handler_func):
    """Wrap a special handler into a Tornado-compatible handler class.

    :param handler_func: zero-argument callable returning a
        ``(status_code, reason, headers, content)`` tuple.
    :returns: a ``tornado.web.RequestHandler`` subclass (the class itself,
        not an instance) whose GET calls ``handler_func``.
    """
    class SpecialHandler(tornado.web.RequestHandler):
        """Wrapper handler for special resources."""

        def get(self):
            response = handler_func()
            status_code, reason, headers, content = response
            self.set_status(status_code, reason)
            for header in headers.items():
                self.set_header(*header)
            # The content is text; it is written out as UTF-8 bytes.
            self.write(content.encode('utf-8'))

    return SpecialHandler
def __init__(self, routes, app_name, loop=None):
    """Create the aiohttp application and register the given routes.

    :param routes: iterable of argument tuples, each unpacked into
        ``router.add_route``.
    :param app_name: stored unchanged on the instance.
    :param loop: event loop to keep; defaults to the current event loop.
    """
    self.app = aiohttp.web.Application()
    self.app_name = app_name
    # TODO(seirl): integrate with HANDLED_URLS
    add_route = self.app.router.add_route
    for route in routes:
        add_route(*route)
    self.loop = loop if loop else asyncio.get_event_loop()
def run(self, **kwargs):
    """Serve ``self.app`` with ``aiohttp.web.run_app`` on ``self.loop``.

    A do-nothing callable is passed as ``print``; any extra keyword
    arguments are forwarded to ``run_app``.
    """
    def _discard(*_):
        return None

    aiohttp.web.run_app(self.app, loop=self.loop, print=_discard, **kwargs)
def sendMessage(request):
    """Send a fixed greeting to the friend whose number is in the URL.

    The friend number is read from the ``arkadasno`` route parameter. The
    message is only sent when the friend's connection status is truthy, in
    which case the send result is appended to the reply text.

    :returns: a ``web.Response`` with the UTF-8 encoded status text.
    """
    friend_arg = request.match_info.get('arkadasno')
    text = "Mesaj gönderildi--> {}".format(friend_arg)
    friend_no = int(friend_arg)
    if tox.friend_get_connection_status(friend_no):
        outcome = tox.friend_send_message(
            friend_no, 0, "selam ben milis p2p servisiyim.")
        text = text + "-" + str(outcome)
    return web.Response(body=text.encode('utf-8'))
def deleteFriend(request):
    """Delete the friend whose number is in the URL and save the profile.

    The friend number is read from the ``arkadasno`` route parameter. After
    the deletion the tox save data is written out through
    ``ProfileHelper.save_profile``.

    :returns: a ``web.Response`` with the UTF-8 encoded status text,
        including the result of the delete call.
    """
    friend_arg = request.match_info.get('arkadasno')
    text = "Dugum silindi--> {}".format(friend_arg)
    outcome = tox.friend_delete(int(friend_arg))
    ProfileHelper.save_profile(tox.get_savedata())
    text = text + "-" + str(outcome)
    return web.Response(body=text.encode('utf-8'))
def guncelle(request):
    """Run ``git pull`` and return its log as a small HTML page.

    Generator-based coroutine. The command redirects its output to
    ``guncelleme.log``, which is then read back and embedded in the page.

    :returns: a ``web.Response`` whose body is the page encoded with the
        module-level ``kodsayfasi`` encoding.
    """
    komut = "git pull > guncelleme.log"
    yield from komutar(komut)
    # Context manager so the log file handle is always closed.
    with open("guncelleme.log", "r") as log_file:
        log = log_file.read()
    loghtml = "<html>güncellendi:<p>" + log + "<p><a href='/'>ana sayfa</a> </html>"
    return web.Response(body=loghtml.encode(kodsayfasi))
def dugumler(request):
    """List all friends as ``<number>-<name>`` lines.

    :returns: a ``web.Response`` whose body is the listing encoded with the
        module-level ``kodsayfasi`` encoding.
    """
    # The friend list already holds friend numbers, so each one is passed
    # straight to friend_get_name. Using it as an index into the list would
    # break once the numbers are no longer contiguous (e.g. after a delete).
    lines = [
        str(num) + "-" + tox.friend_get_name(num) + "\n"
        for num in tox.self_get_friend_list()
    ]
    text = "".join(lines)
    return web.Response(body=text.encode(kodsayfasi))
async def add(self,
              registry: CollectorRegistry) -> aiohttp.web.Response:
    '''
    Add works like replace, but only metrics with the same name as the
    newly pushed metrics are replaced.

    :param registry: the registry whose metrics are marshalled with
      ``self.formatter`` and sent as the request body.

    :returns: the response of the HTTP POST to ``self.path``, already
      released.
    '''
    # Declared ``async`` because the body uses ``async with`` and ``await``.
    async with aiohttp.ClientSession(loop=self.loop) as session:
        payload = self.formatter.marshall(registry)
        resp = await session.post(
            self.path, data=payload, headers=self.headers)
        await resp.release()
    return resp
async def delete(self,
                 registry: CollectorRegistry) -> aiohttp.web.Response:
    '''
    ``delete`` deletes metrics from the push gateway. All metrics with
    the grouping key specified in the URL are deleted.

    :param registry: the registry whose metrics are marshalled with
      ``self.formatter`` and sent as the request body.

    :returns: the response of the HTTP DELETE to ``self.path``, already
      released.
    '''
    # Declared ``async`` because the body uses ``async with`` and ``await``.
    async with aiohttp.ClientSession(loop=self.loop) as session:
        payload = self.formatter.marshall(registry)
        resp = await session.delete(
            self.path, data=payload, headers=self.headers)
        await resp.release()
    return resp
def accepts(self,
            request: aiohttp.web.Request) -> Set[str]:
    '''
    Return the set of accept items found in the request headers.

    Every ``Accept`` header value containing ``;`` is split on it and each
    part is stripped; values without ``;`` are added unchanged.

    :param request: the incoming request.

    :returns: the collected items; empty when the request carries no
      ``Accept`` header.
    '''
    accepts = set()  # type: Set[str]
    # Supply a default: without one, getall raises KeyError when the header
    # is missing.
    accept_headers = request.headers.getall(ACCEPT, [])
    logger.debug('accept: {}'.format(accept_headers))
    for accept_items in accept_headers:
        if ';' in accept_items:
            accept_items = [i.strip() for i in accept_items.split(';')]
        else:
            accept_items = [accept_items]
        accepts.update(accept_items)
    return accepts
def get_app(self, loop):
    """
    Hook for subclasses: override this to return the
    aiohttp.web.Application object under test.

    The base implementation does nothing and returns ``None``.

    :param loop: the event_loop to use
    :type loop: asyncio.BaseEventLoop
    """
    return None  # pragma: no cover
async def test_run_app_test_client(tmpworkdir, test_client):
    """An app built from SIMPLE_APP and modified for dev mode still serves '/'.

    Imports the app factory through ``Config``, applies ``modify_main_app``
    and checks with the test client that ``GET /`` answers 200 with the
    body ``hello world``.
    """
    # Declared ``async`` because the test awaits the client calls.
    mktree(tmpworkdir, SIMPLE_APP)
    config = Config(app_path='app.py')
    app_factory = config.import_app_factory()
    app = app_factory()
    modify_main_app(app, config)
    assert isinstance(app, aiohttp.web.Application)
    cli = await test_client(app)
    r = await cli.get('/')
    assert r.status == 200
    text = await r.text()
    assert text == 'hello world'
def make_svg(self, request):
    """Render the board described by the request's query parameters as SVG.

    Query parameters read from ``request.GET``:

    * ``fen`` (required) - underscores count as spaces; only the first
      space-separated field, cut to its first eight ``/``-separated parts,
      is used.
    * ``size`` - integer clamped to the range 16..1024, default 360.
    * ``lastMove`` / ``lastmove`` - optional move in UCI notation.
    * ``check`` - optional square name.
    * ``arrows`` - optional comma-separated list parsed by ``arrow``.
    * ``orientation`` - ``black`` flips the board.

    :returns: the result of ``chess.svg.board`` styled with ``self.css``.
    :raises aiohttp.web.HTTPBadRequest: for a missing ``fen`` or any
        parameter that fails to parse.
    """
    query = request.GET

    try:
        fen_fields = query["fen"].replace("_", " ").split(" ", 1)
        placement = "/".join(fen_fields[0].split("/")[0:8])
        board = chess.BaseBoard(placement)
    except KeyError:
        raise aiohttp.web.HTTPBadRequest(reason="fen required")
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="invalid fen")

    try:
        requested_size = int(query.get("size", 360))
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="size is not a number")
    size = min(max(requested_size, 16), 1024)

    try:
        # Both spellings are accepted; lacking both means "no last move".
        uci = query.get("lastMove") or query["lastmove"]
        lastmove = chess.Move.from_uci(uci)
    except KeyError:
        lastmove = None
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="lastMove is not a valid uci move")

    try:
        check = chess.SQUARE_NAMES.index(query["check"])
    except KeyError:
        check = None
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="check is not a valid square name")

    arrows = []
    try:
        for spec in query.get("arrows", "").split(","):
            spec = spec.strip()
            if spec:
                arrows.append(arrow(spec))
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="invalid arrow")

    flipped = query.get("orientation", "white") == "black"

    return chess.svg.board(
        board,
        coordinates=False,
        flipped=flipped,
        lastmove=lastmove,
        check=check,
        arrows=arrows,
        size=size,
        style=self.css,
    )
def render_svg(self, request):
    """Return the SVG produced by ``make_svg`` as an ``image/svg+xml`` response."""
    svg_text = self.make_svg(request)
    return aiohttp.web.Response(text=svg_text, content_type="image/svg+xml")
def render_png(self, request):
    """Convert the SVG produced by ``make_svg`` to PNG and return it as ``image/png``."""
    png_bytes = cairosvg.svg2png(bytestring=self.make_svg(request))
    return aiohttp.web.Response(body=png_bytes, content_type="image/png")