def validate(self, data, request):
    """
    POST the body of C{data} to the validator service at C{self.uri}.

    C{data} is treated as a raw HTTP message: everything up to and including
    the first blank line (CRLF CRLF) is dropped. The remainder is encoded with
    C{encodeFormData} under C{self.arg} and sent with a copy of the original
    request's headers, minus C{cookie}, C{referer} and C{host}.

    @param data: the raw message text. If it contains no header/body
        separator it is used unchanged.
    @param request: the original request; only its C{received_headers}
        mapping is read.
    @return: the result of C{client.getPage}.
    """
    # We need only the body.  find() returns -1 when the separator is
    # missing; slicing at -1 + 4 would silently drop three characters.
    separator_at = data.find('\r\n\r\n')
    if separator_at != -1:
        data = data[separator_at + 4:]
    # Build the request for the validator service, using the
    # original request as the base.  Work on a copy so the caller's
    # request headers are not modified.
    headers = dict(request.received_headers)
    data, contentType = encodeFormData(self.arg, data)
    headers['content-type'] = contentType
    headers.pop('cookie', None)
    headers.pop('referer', None)
    headers.pop('host', None)
    return client.getPage(
        self.uri, method='POST', headers=headers, postdata=data
    )
# Example source code using Python's getPage()
def doRequest(self, service, body=missing, type=pyamf.AMF3, raw=False,
              decode=True):
    """
    Send a request through C{self.getPage} and return its result.

    Unless C{raw} is true, C{body} is wrapped in a list (an empty list when
    it is the C{self.missing} sentinel), placed in a remoting request for
    C{service} under the envelope key C{'/1'}, and encoded. When C{raw} is
    true, C{body} is passed to C{self.getPage} as given.

    If C{decode} is true, a callback running C{remoting.decode} on the
    result is added before returning.
    """
    payload = body
    if not raw:
        arguments = [] if body is self.missing else [body]
        envelope = remoting.Envelope(type)
        envelope['/1'] = remoting.Request(service, body=arguments)
        payload = remoting.encode(envelope).getvalue()

    deferred = self.getPage(payload)
    if decode:
        deferred.addCallback(lambda response: remoting.decode(response))
    return deferred
def _next_request(self):
    """
    Feed start requests to the scheduler and launch pending downloads.

    First drains ``self.start_requests`` into the scheduler. Then, while
    fewer than 5 requests are in progress and the scheduler is not empty,
    starts a download per scheduled request. Each download's result goes to
    ``self._handle_downloader_output``, the request is removed from
    ``self.inprogress``, and this method is called again. When nothing is in
    progress and the scheduler is empty, ``self._closewait`` is fired.
    """
    # Move every start request into the scheduler.
    while self.start_requests:
        try:
            pending = next(self.start_requests)
        except StopIteration:
            self.start_requests = None
            continue
        self.scheduler.enqueue_request(pending)

    # At most 5 downloads run at the same time.
    while len(self.inprogress) < 5 and self.scheduler.size() > 0:
        req = self.scheduler.next_request()
        if not req:
            break
        self.inprogress.add(req)
        download = getPage(bytes(req.url, encoding='utf-8'))
        download.addBoth(self._handle_downloader_output, req)
        download.addBoth(
            lambda _result, finished: self.inprogress.remove(finished), req)
        download.addBoth(lambda _result: self._next_request())

    idle = len(self.inprogress) == 0 and self.scheduler.size() == 0
    if idle:
        self._closewait.callback(None)
def _get_page(self, method, url, data=None):
    """
    Fetch ``url`` with a JSON content type and hand back the decoded body.

    This is a generator: it yields the result of ``getPage`` and delivers
    the parsed object through ``defer.returnValue`` (presumably it is
    wrapped by an inlineCallbacks-style decorator outside this view --
    confirm).

    Raises ``RestError`` with the HTTP status and response when ``getPage``
    fails with ``Error``, with ``-1`` and the exception text for any other
    failure, and with ``-1`` and the raw response when the body cannot be
    decoded as UTF-8 JSON.
    """
    json_headers = {b'Content-Type': b'application/json'}
    try:
        response = yield getPage(
            url, method=method, headers=json_headers, postdata=data)
    except Error as e:
        raise RestError(int(e.status), e.response)
    except Exception as e:
        raise RestError(-1, str(e))

    try:
        obj = json.loads(response.decode('utf-8'))
    except Exception:
        raise RestError(-1, response)
    defer.returnValue(obj)
# rpc wrapper, only usable for QRest class members
def test_afterFoundGet(self):
    """
    Enabling unsafe redirection behaviour overwrites the method of
    redirected C{POST} requests with C{GET}.
    """
    url = self.getURL('extendedRedirect?code=302')

    # A factory built without the flag must have it switched off.
    factory = client.HTTPClientFactory(
        url, followRedirect=True, method=b"POST")
    self.assertFalse(
        factory.afterFoundGet,
        "By default, afterFoundGet must be disabled")

    def checkMethod(page):
        self.assertEqual(
            self.extendedRedirect.lastMethod,
            b"GET",
            "With afterFoundGet, the HTTP method must change to GET")

    d = client.getPage(
        url, followRedirect=True, afterFoundGet=True, method=b"POST")
    d.addCallback(checkMethod)
    return d
def test_downloadAfterFoundGet(self):
    """
    Passing C{True} for C{afterFoundGet} to L{client.downloadPage} invokes
    the same kind of redirect handling as passing that argument to
    L{client.getPage} invokes.
    """
    url = self.getURL('extendedRedirect?code=302')

    def checkMethod(page):
        self.assertEqual(
            self.extendedRedirect.lastMethod,
            b"GET",
            "With afterFoundGet, the HTTP method must change to GET")

    d = client.downloadPage(
        url, "downloadTemp",
        followRedirect=True, afterFoundGet=True, method=b"POST")
    d.addCallback(checkMethod)
    return d
def test_getPageDeprecated(self):
    """
    L{client.getPage} is deprecated.
    """
    # Serve an empty text/plain resource on an ephemeral local port.
    site = server.Site(Data(b'', 'text/plain'))
    port = reactor.listenTCP(0, site, interface="127.0.0.1")
    portno = port.getHost().port
    self.addCleanup(port.stopListening)
    url = networkString("http://127.0.0.1:%d" % (portno,))

    d = client.getPage(url)

    warningInfo = self.flushWarnings([self.test_getPageDeprecated])
    self.assertEqual(len(warningInfo), 1)
    warning = warningInfo[0]
    self.assertEqual(warning['category'], DeprecationWarning)
    self.assertEqual(
        warning['message'],
        "twisted.web.client.getPage was deprecated in "
        "Twisted 16.7.0; please use https://pypi.org/project/treq/ or twisted.web.client.Agent instead")

    # Only the warning matters here; swallow any failure of the request.
    return d.addErrback(lambda _: None)
def _get_page(self, url):
    """
    Fetch ``url`` with ``client.getPage`` using the stored cookies.

    ``self._cookies`` is reset to an empty dict first when the
    ``HLS_RESET_COOKIES`` environment variable is set. A ``Referer`` header
    is sent when ``self.referer`` is truthy. On success the cookies are
    logged at debug level and the content is passed through; on failure the
    URL and the error are logged and the error is passed through.
    """
    def log_cookies(content):
        logging.debug("Cookies: %r" % self._cookies)
        return content

    def report_failure(failure, failed_url):
        logging.error(failed_url)
        log.err(failure)
        return failure

    url = url.encode("utf-8")
    if 'HLS_RESET_COOKIES' in os.environ:
        self._cookies = {}

    headers = {'Referer': self.referer} if self.referer else {}

    d = client.getPage(url, cookies=self._cookies, headers=headers)
    d.addCallback(log_cookies)
    d.addErrback(report_failure, url)
    return d
def login(self):
    """
    POST the stored credentials to the ClientLogin URL.

    Returns ``None`` without doing anything when the username or password
    is missing. Otherwise returns a new ``Deferred`` that is handed to
    ``self.loginFinished`` (on success) or ``self.loginFailed`` (on
    failure) together with the response.
    """
    print("[GoogleReader] login")
    if not (self.username and self.password):
        return

    headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8'}
    data = {
        'service': 'reader',
        'Email': self.username,
        'Passwd': self.password,
        'source': 'enigma2-plugin-extensions-simplerss',
        'continue': 'http://www.google.com/',
    }

    result = Deferred()
    request = getPage(
        'https://www.google.com/accounts/ClientLogin',
        method='POST',
        headers=headers,
        postdata=urlencode(data),
    )
    request.addCallback(self.loginFinished, result)
    request.addErrback(self.loginFailed, result)
    return result
def _login(self, callback=None):
    """
    Make sure there is a current login, then continue with ``callback``.

    If a login timestamp exists that is younger than 9.5 minutes and the
    stored sid is not the all-zero sid, the timestamp is renewed and
    ``callback(None)`` is called directly. Otherwise the login page is
    requested; its result is passed to ``self._md5Login`` and a failure to
    ``self._oldLogin``, each together with ``callback``.

    ``callback`` is optional: when it is ``None`` the renew branch simply
    refreshes the timestamp instead of failing on ``None(None)``.
    """
    self.debug("")
    if self._callScreen:
        self._callScreen.updateStatus(_("login"))
    if self._md5LoginTimestamp and ((time.time() - self._md5LoginTimestamp) < float(9.5 * 60)) and self._md5Sid != '0000000000000000':  # new login after 9.5 minutes inactivity
        self.debug("[FritzCallFBF] renew timestamp: " + time.ctime(self._md5LoginTimestamp) + " time: " + time.ctime())
        self._md5LoginTimestamp = time.time()
        # callback defaults to None, so it must not be called blindly.
        if callback is not None:
            callback(None)
    else:
        self.debug("not logged in or outdated login")
        # http://fritz.box/cgi-bin/webcm?getpage=../html/login_sid.xml
        parms = urlencode({'getpage': '../html/login_sid.xml'})
        url = "http://%s/cgi-bin/webcm" % (config.plugins.FritzCall.hostname.value)
        self.debug("'" + url + "' parms: '" + parms + "'")
        getPage(url,
            method="POST",
            headers={'Content-Type': "application/x-www-form-urlencoded", 'Content-Length': str(len(parms))},
            postdata=parms).addCallback(lambda x: self._md5Login(callback, x)).addErrback(lambda x: self._oldLogin(callback, x))
def getCalls(self, callScreen, callback, callType):
    """
    Store the call screen and call type, then obtain the call list.

    - If the last fetch is more than 180 seconds old, the timestamp is
      renewed and ``self._login`` is called with a continuation that runs
      ``self._getCalls(callback, x)``.
    - Otherwise, if ``self._callList`` is empty, ``self._getCalls1`` is
      called.
    - Otherwise ``self._gotPageCalls`` is called with the existing list.
    """
    #
    # call sequence must be:
    # - login
    #       - getPage -> _gotPageLogin
    # - loginCallback (_getCalls)
    #       - getPage -> _getCalls1
    self.debug("")
    self._callScreen = callScreen
    self._callType = callType

    if (time.time() - self._callTimestamp) > 180:
        self.debug("outdated data, login and get new ones: " + time.ctime(self._callTimestamp) + " time: " + time.ctime())
        self._callTimestamp = time.time()
        self._login(lambda x: self._getCalls(callback, x))
        return

    if not self._callList:
        self.debug("time is ok, but no callList")
        self._getCalls1(callback)
        return

    self.debug("time is ok, callList is ok")
    self._gotPageCalls(callback)
def _selectFritzBoxPhonebook(self, html):
    """
    Check the login response for an error, then request the phonebook
    selection page.

    If ``html`` contains the error-message paragraph, the message text is
    passed to ``self._errorLoad`` (prefixed with ``'Login: '``) and nothing
    else happens. Otherwise the selection page is POSTed for with the
    current sid; the result goes to ``self._loadFritzBoxPhonebook`` and a
    failure to ``self._errorLoad``.
    """
    # first check for login error
    if html:
        marker = '<p class="errorMessage">FEHLER: '
        start = html.find(marker)
        if start != -1:
            start = start + len(marker)
            end = html.find('</p>', start)
            if end == -1:
                # No closing tag: report everything after the marker.
                end = len(html)
            # This must be a slice; html[start, end] would index the
            # string with a tuple and raise TypeError.
            self._errorLoad('Login: ' + html[start:end])
            return
    # look for phonebook called dreambox or Dreambox
    parms = urlencode({
        'sid':self._md5Sid,
        })
    url = "http://%s/fon_num/fonbook_select.lua" % (config.plugins.FritzCall.hostname.value)
    self.debug("[FritzCallFBF_05_27] _selectPhonebook: '" + url + "' parms: '" + parms + "'")
    getPage(url,
        method = "POST",
        agent = USERAGENT,
        headers = {'Content-Type': "application/x-www-form-urlencoded", 'Content-Length': str(len(parms))},
        postdata = parms).addCallback(self._loadFritzBoxPhonebook).addErrback(self._errorLoad)
def _loadFritzBoxPhonebook(self, html):
    """
    Find the id of the configured phonebook in ``html`` and request its
    entries.

    The id is taken from the ``uiBookid`` label followed by the configured
    phonebook name; when no such label is found, id ``1`` is used. The
    entry list is then POSTed for; the result goes to
    ``self._parseFritzBoxPhonebook`` and a failure to ``self._errorLoad``.
    """
    # Firmware 05.27 onwards
    # look for phonebook called [dD]reambox and get bookid
    pattern = r'.*<label for="uiBookid:([\d]+)">' + config.plugins.FritzCall.fritzphonebookName.value
    found = re.match(pattern, html, re.S)
    if not found:
        bookid = 1
    else:
        bookid = found.group(1)
        self.debug("[FritzCallFBF_05_27] _loadFritzBoxPhonebook: found dreambox phonebook %s", bookid)

    # http://192.168.178.1/fon_num/fonbook_list.lua?sid=2faec13b0000f3a2
    parms = urlencode({
        'bookid':bookid,
        'sid':self._md5Sid,
        })
    url = "http://%s/fon_num/fonbook_list.lua" % (config.plugins.FritzCall.hostname.value)
    self.debug("[FritzCallFBF_05_27] _loadFritzBoxPhonebookNew: '" + url + "' parms: '" + parms + "'")

    headers = {'Content-Type': "application/x-www-form-urlencoded", 'Content-Length': str(len(parms))}
    request = getPage(url, method="POST", agent=USERAGENT, headers=headers, postdata=parms)
    request.addCallback(self._parseFritzBoxPhonebook)
    request.addErrback(self._errorLoad)
def _getInfo(self, callback, html):
    """
    Check the login response for an error, then request the home page.

    If ``html`` contains the error-message paragraph, the message text is
    passed to ``self._errorGetInfo`` (prefixed with ``'Login: '``) and
    nothing else happens. Otherwise ``self._readBlacklist`` is called and
    the home page is POSTed for with the current sid; the result goes to
    ``self._okGetInfo(callback, result)`` and a failure to
    ``self._errorGetInfo``.
    """
    self.debug("[FritzCallFBF_05_27] _getInfo: verify login")
    if html:
        marker = '<p class="errorMessage">FEHLER: '
        start = html.find(marker)
        if start != -1:
            start = start + len(marker)
            end = html.find('</p>', start)
            if end == -1:
                # No closing tag: report everything after the marker.
                end = len(html)
            # This must be a slice; html[start, end] would index the
            # string with a tuple and raise TypeError.
            self._errorGetInfo('Login: ' + html[start:end])
            return

    self._readBlacklist()

    url = "http://%s/home/home.lua" % config.plugins.FritzCall.hostname.value
    parms = urlencode({
        'sid':self._md5Sid
        })
    self.debug("[FritzCallFBF_05_27] _getInfo url: '" + url + "' parms: '" + parms + "'")
    getPage(url,
        method = "POST",
        agent = USERAGENT,
        headers = {
            'Content-Type': "application/x-www-form-urlencoded",
            'Content-Length': str(len(parms))},
        postdata = parms).addCallback(lambda x:self._okGetInfo(callback, x)).addErrback(self._errorGetInfo)
def _loadFritzBoxPhonebook(self, html, md5Sid):
    """
    Find the id of the configured phonebook in ``html`` and select it.

    The id is taken from the ``uiBookid`` label followed by the configured
    phonebook name; when no such label is found, id ``1`` is used. The
    selection is then POSTed with ``md5Sid``; the result goes to
    ``self._parseFritzBoxPhonebook`` and a failure to ``self._errorLoad``,
    each together with ``md5Sid``.
    """
    # Firmware 05.27 onwards
    # look for phonebook called [dD]reambox and get bookid
    pattern = r'.*<label for="uiBookid:([\d]+)">' + config.plugins.FritzCall.fritzphonebookName.value
    found = re.match(pattern, html, re.S)
    bookid = found.group(1) if found else 1
    self.debug("phonebook %s", bookid)

    # http://192.168.178.1/fon_num/fonbook_list.lua?sid=2faec13b0000f3a2
    parms = urlencode({
        'bookid':bookid,
        'sid':md5Sid,
        'cancel':'',
        'apply':'uiApply',
        })
    url = "http://%s/fon_num/fonbook_select.lua" % (config.plugins.FritzCall.hostname.value)
    self.debug(url + "?" + parms)

    headers = {'Content-Type': "application/x-www-form-urlencoded", 'Content-Length': str(len(parms))}
    request = getPage(url, method="POST", agent=USERAGENT, headers=headers, postdata=parms)
    request.addCallback(self._parseFritzBoxPhonebook, md5Sid)
    request.addErrback(self._errorLoad, md5Sid)
def _dial(self, number, md5Sid):
    """
    POST a click-to-dial request for ``number`` using ``md5Sid``.

    The configured extension is sent as the dial port. The result goes to
    ``self._okDial`` and a failure to ``self._errorDial``, each together
    with ``md5Sid``.
    """
    url = "http://%s/cgi-bin/webcm" % config.plugins.FritzCall.hostname.value
    parms = urlencode({
        'getpage':'../html/de/menus/menu2.html',
        'var:pagename':'fonbuch',
        'var:menu':'home',
        'telcfg:settings/UseClickToDial':'1',
        'telcfg:settings/DialPort':config.plugins.FritzCall.extension.value,
        'telcfg:command/Dial':number,
        'sid':md5Sid
        })
    self.info("url: " + url + "?" + parms)

    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Content-Length': str(len(parms)),
    }
    request = getPage(url, method="POST", agent=USERAGENT, headers=headers, postdata=parms)
    request.addCallback(self._okDial, md5Sid)
    request.addErrback(self._errorDial, md5Sid)
def _getInfo(self, callback, md5Sid):
    """
    Start a blacklist read via login, then request the home page.

    ``self._login`` is called with ``self._readBlacklist`` as its callback.
    The home page is then POSTed for with ``md5Sid``; the result goes to
    ``self._okGetInfo(callback, result, md5Sid)`` and a failure to
    ``self._errorGetInfo`` together with ``md5Sid``.
    """
    self.debug("verify login")
    self._login(self._readBlacklist)

    url = "http://%s/home/home.lua" % config.plugins.FritzCall.hostname.value
    parms = urlencode({
        'sid':md5Sid
        })
    self.debug("url: " + url + "?" + parms)

    def gotInfo(x):
        return self._okGetInfo(callback, x, md5Sid)

    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Content-Length': str(len(parms)),
    }
    request = getPage(url, method="POST", agent=USERAGENT, headers=headers, postdata=parms)
    request.addCallback(gotInfo)
    request.addErrback(self._errorGetInfo, md5Sid)
def _reset(self, md5Sid):
    """
    Close the call screen, if any, and POST a reboot request.

    The result goes to ``self._okReset`` and a failure to
    ``self._errorReset``, each together with ``md5Sid``.
    """
    if self._callScreen:
        self._callScreen.close()

    url = "http://%s/system/reboot.lua" % config.plugins.FritzCall.hostname.value
    parms = urlencode({
        'reboot':'',
        'sid':md5Sid
        })
    self.debug("url: " + url + "?" + parms)

    headers = {'Content-Type': "application/x-www-form-urlencoded"}
    request = getPage(url, method="POST", agent=USERAGENT, headers=headers, postdata=parms)
    request.addCallback(self._okReset, md5Sid)
    request.addErrback(self._errorReset, md5Sid)
def _getInfo(self, callback, md5Sid):
    """
    Start a blacklist read via login, then request the overview data.

    ``self._login`` is called with ``self._readBlacklist`` as its callback.
    ``data.lua`` is then POSTed to with ``md5Sid`` for the ``overview``
    page; the result goes to ``self._okGetInfo(callback, result, md5Sid)``
    and a failure to ``self._errorGetInfo`` together with ``md5Sid``.
    """
    self.debug("verify login")
    self._login(self._readBlacklist)

    url = "http://%s/data.lua" % config.plugins.FritzCall.hostname.value
    parms = urlencode({
        'sid':md5Sid,
        'page':'overview',
        'type':'all'
        })
    self.debug("url: " + url + "?" + parms)

    def gotInfo(x):
        return self._okGetInfo(callback, x, md5Sid)

    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Content-Length': str(len(parms)),
    }
    request = getPage(url, method="POST", agent=USERAGENT, headers=headers, postdata=parms)
    request.addCallback(gotInfo)
    request.addErrback(self._errorGetInfo, md5Sid)
def _readBlacklist(self, md5Sid):
    """
    POST a request for the ``callLock`` page of ``data.lua``.

    The result goes to ``self._okBlacklist`` and a failure to
    ``self._errorBlacklist``, each together with ``md5Sid``.
    """
    # http://fritz.box/cgi-bin/webcm?getpage=../html/de/menus/menu2.html&var:lang=de&var:menu=fon&var:pagename=sperre
    # https://217.245.196.140:699/data.lua?xhr=1&sid=e8fcf4f9a9186070&lang=de&no_sidrenew=&page=callLock
    url = "http://%s/data.lua" % config.plugins.FritzCall.hostname.value
    parms = urlencode({
        'sid':md5Sid,
        'page':'callLock'
        })
    self.debug("url: " + url + "?" + parms)

    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Content-Length': str(len(parms)),
    }
    request = getPage(url, method="POST", agent=USERAGENT, headers=headers, postdata=parms)
    request.addCallback(self._okBlacklist, md5Sid)
    request.addErrback(self._errorBlacklist, md5Sid)
def publicGetPage(*args, **kwargs):
    """Call ``getPage`` with the given arguments and wrap its result in a
    ``PublicDeferred``."""
    return PublicDeferred(getPage(*args, **kwargs))
def getPage(self, data=None, **kwargs):
    """
    Send C{data} to the local test server on C{self.port}.

    The HTTP method defaults to C{POST} unless given in C{kwargs}; any
    C{postdata} keyword is replaced by C{data}.

    @return: the result of C{client.getPage}.
    """
    if 'method' not in kwargs:
        kwargs['method'] = 'POST'
    kwargs['postdata'] = data
    url = "http://127.0.0.1:%d/" % (self.port,)
    return client.getPage(url, **kwargs)
def test_invalid_method(self):
    """
    A classic GET on the xml server should return a NOT_ALLOWED.
    """
    def checkStatus(exc):
        self.assertEqual(int(exc.args[0]), http.NOT_ALLOWED)

    d = self.assertFailure(self.getPage(method='GET'), error.Error)
    d.addCallback(checkStatus)
    return d
def test_bad_content(self):
    """
    Sending a body of C{'spamandeggs'} must fail with a BAD_REQUEST error.
    """
    def checkStatus(exc):
        self.assertEqual(int(exc.args[0]), http.BAD_REQUEST)

    d = self.assertFailure(self.getPage('spamandeggs'), error.Error)
    d.addCallback(checkStatus)
    return d
def callRemote(self, method, *args, **kwargs):
    """
    Build a SOAP message for C{method} and POST it to C{self.url}.

    The positional and keyword arguments become the call arguments; the
    header and namespace come from C{self.header} and C{self.namespace}.

    @return: the C{client.getPage} result with C{self._cbGotResult} added
        as a callback.
    """
    payload = SOAPpy.buildSOAP(args=args, kw=kwargs, method=method,
                               header=self.header, namespace=self.namespace)
    headers = {'content-type': 'text/xml',
               'SOAPAction': method}
    d = client.getPage(self.url, postdata=payload, method="POST",
                       headers=headers)
    return d.addCallback(self._cbGotResult)
def testCGI(self):
    """
    Write C{DUMMY_CGI} to a temporary file, serve it, fetch C{/cgi} and
    hand the response to C{self._testCGI_1}.
    """
    cgiFilename = os.path.abspath(self.mktemp())
    # open() replaces the Python-2-only file() builtin; try/finally closes
    # the handle even if the write fails.
    cgiFile = open(cgiFilename, 'wt')
    try:
        cgiFile.write(DUMMY_CGI)
    finally:
        cgiFile.close()

    portnum = self.startServer(cgiFilename)
    d = client.getPage("http://localhost:%d/cgi" % portnum)
    d.addCallback(self._testCGI_1)
    return d
def testReadEmptyInput(self):
    """
    Write C{READINPUT_CGI} to a temporary file, serve it, fetch C{/cgi}
    without a request body and hand the response to
    C{self._testReadEmptyInput_1}.
    """
    cgiFilename = os.path.abspath(self.mktemp())
    # open() replaces the Python-2-only file() builtin; try/finally closes
    # the handle even if the write fails.
    cgiFile = open(cgiFilename, 'wt')
    try:
        cgiFile.write(READINPUT_CGI)
    finally:
        cgiFile.close()

    portnum = self.startServer(cgiFilename)
    d = client.getPage("http://localhost:%d/cgi" % portnum)
    d.addCallback(self._testReadEmptyInput_1)
    return d
def testReadInput(self):
    """
    Write C{READINPUT_CGI} to a temporary file, serve it, POST a short body
    to C{/cgi} and hand the response to C{self._testReadInput_1}.
    """
    cgiFilename = os.path.abspath(self.mktemp())
    # open() replaces the Python-2-only file() builtin; try/finally closes
    # the handle even if the write fails.
    cgiFile = open(cgiFilename, 'wt')
    try:
        cgiFile.write(READINPUT_CGI)
    finally:
        cgiFile.close()

    portnum = self.startServer(cgiFilename)
    d = client.getPage("http://localhost:%d/cgi" % portnum,
                       method="POST",
                       postdata="Here is your stdin")
    d.addCallback(self._testReadInput_1)
    return d
def testReadAllInput(self):
    """
    Write C{READALLINPUT_CGI} to a temporary file, serve it, POST a short
    body to C{/cgi} and hand the response to C{self._testReadAllInput_1}.
    """
    cgiFilename = os.path.abspath(self.mktemp())
    # open() replaces the Python-2-only file() builtin; try/finally closes
    # the handle even if the write fails.
    cgiFile = open(cgiFilename, 'wt')
    try:
        cgiFile.write(READALLINPUT_CGI)
    finally:
        cgiFile.close()

    portnum = self.startServer(cgiFilename)
    d = client.getPage("http://localhost:%d/cgi" % portnum,
                       method="POST",
                       postdata="Here is your stdin")
    d.addCallback(self._testReadAllInput_1)
    return d
def testPayload(self):
    """
    Send a 100-character body to the C{payload} URL and check that the
    response equals what was sent.
    """
    payload = "0123456789" * 10
    d = client.getPage(self.getURL("payload"), postdata=payload)
    return d.addCallback(self.assertEquals, payload)