def getClientIP(request, useForwardedHeader=False):
    """Determine the IP address of the client that sent **request**.

    When **useForwardedHeader** is false, the address reported by
    ``request.getClientIP()`` is returned unchanged.  When it is true, only
    the ``'X-Forwarded-For'`` header is consulted: the last comma-separated
    entry is taken, stripped of surrounding whitespace, and returned if
    ``isIPAddress`` accepts it.

    :type request: :api:`twisted.web.http.Request`
    :param request: A ``Request`` for a :api:`twisted.web.resource.Resource`.
    :param bool useForwardedHeader: If ``True``, attempt to get the client's
        IP address from the ``'X-Forwarded-For:'`` header.
    :rtype: ``None`` or :any:`str`
    :returns: The client's IP address, if it was obtainable.  ``None`` is
        returned when the forwarded header is requested but is missing,
        empty, or does not end in a valid IP address.
    """
    if not useForwardedHeader:
        return request.getClientIP()

    forwarded = request.getHeader("X-Forwarded-For")
    if not forwarded:
        return None

    # Only the last entry of the header is considered.
    candidate = forwarded.split(",")[-1].strip()
    if isIPAddress(candidate):
        return candidate

    log.msg("Got weird X-Forwarded-For value %r" % forwarded)
    return None
# Example source code using the Python Request() class
def getResourceFor(self, request):
    """Look up the resource that should handle **request**.

    This iterates through the resource hierarchy, calling
    getChildWithDefault on each resource it finds for a path element,
    stopping when it hits an element where isLeaf is true.

    As side effects, the request path and the resolved resource are logged,
    ``request.site`` is set to this site, and ``request.sitepath`` is set to
    a copy of ``request.prepath``.
    """
    log.msg("Request for resource at %s" % request.path)
    request.site = self

    # Sitepath is used to determine cookie names between distributed
    # servers and disconnected sites.
    request.sitepath = copy.copy(request.prepath)

    resolved = resource.getChildForRequest(self.resource, request)
    log.msg(resolved)
    return resolved
def setup(self):
    """Build the fixtures shared by the tests.

    Creates ``self.empty_resp`` (bytes on Python 3, str otherwise), a mocked
    ``VaultRedirector`` in ``self.mock_redir``, the ``VaultRedirectorSite``
    under test in ``self.cls``, and a mocked client ``Request`` in
    ``self.mock_request``.
    """
    self.empty_resp = b'' if sys.version_info[0] >= 3 else ''

    # mock redirector; attributes are set on the mock's type
    self.mock_redir = Mock(spec_set=VaultRedirector)
    redir_type = type(self.mock_redir)
    redir_type.active_node_ip_port = 'mynode:1234'
    redir_type.last_poll_time = 'myPollTime'
    redir_type.log_enabled = True
    redir_type.consul_scheme = 'http'
    redir_type.consul_host_port = 'consul:5678'
    self.cls = VaultRedirectorSite(self.mock_redir)

    # mock client request
    self.mock_request = Mock(spec_set=Request)
    request_type = type(self.mock_request)
    request_type.method = 'GET'
    request_type.client = IPv4Address('TCP', '1.2.3.4', 12345)
    request_type.clientproto = 'HTTP/1.1'

    response_headers = Headers()
    response_headers.setRawHeaders('date', ['Mon, 11 Apr 2016 15:26:42 GMT'])
    response_headers.setRawHeaders('server', ['TwistedWeb/16.1.0'])
    request_type.responseHeaders = response_headers
    request_type.queued = 0
def test_headersAndCode(self):
    """
    L{redirectTo} sets the C{Location} and C{Content-Type} response headers
    on the request it is given and sets the response code to C{FOUND}, so
    that the browser is redirected.
    """
    destination = b"http://target.example.com/4321"
    req = Request(DummyChannel(), True)
    req.method = b'GET'

    redirectTo(destination, req)

    self.assertEqual(req.code, FOUND)
    sent = req.responseHeaders
    self.assertEqual(sent.getRawHeaders(b'location'), [destination])
    self.assertEqual(
        sent.getRawHeaders(b'content-type'),
        [b'text/html; charset=utf-8'])
def test_processingFailedNoTraceback(self):
    """
    When the site's C{displayTracebacks} is C{False},
    L{Request.processingFailed} writes a generic error message instead of
    the failure itself.
    """
    channel = DummyChannel()
    request = server.Request(channel, 1)
    site = server.Site(resource.Resource())
    site.displayTracebacks = False
    request.site = site

    request.processingFailed(failure.Failure(Exception("Oh no!")))

    output = request.transport.written.getvalue()
    self.assertNotIn(b"Oh no!", output)
    self.assertIn(b"Processing Failed", output)

    # Since we didn't "handle" the exception, flush it to prevent a test
    # failure
    loggedErrors = self.flushLoggedErrors()
    self.assertEqual(1, len(loggedErrors))
def test_processingFailedDisplayTraceback(self):
    """
    When the site's C{displayTracebacks} is C{True},
    L{Request.processingFailed} writes the failure to the response.
    """
    channel = DummyChannel()
    request = server.Request(channel, 1)
    site = server.Site(resource.Resource())
    site.displayTracebacks = True
    request.site = site

    request.processingFailed(failure.Failure(Exception("Oh no!")))

    output = request.transport.written.getvalue()
    self.assertIn(b"Oh no!", output)

    # Since we didn't "handle" the exception, flush it to prevent a test
    # failure
    loggedErrors = self.flushLoggedErrors()
    self.assertEqual(1, len(loggedErrors))
def test_processingFailedDisplayTracebackHandlesUnicode(self):
    """
    L{Request.processingFailed} when the site has C{displayTracebacks} set
    to C{True} writes out the failure, making UTF-8 items into HTML
    entities.
    """
    d = DummyChannel()
    request = server.Request(d, 1)
    request.site = server.Site(resource.Resource())
    request.site.displayTracebacks = True
    fail = failure.Failure(Exception(u"\u2603"))
    request.processingFailed(fail)
    # The snowman (U+2603) must appear as its decimal HTML entity.  A bytes
    # literal may only contain ASCII characters, so the entity is spelled
    # out rather than embedding the raw character.
    self.assertIn(b"&#9731;", request.transport.written.getvalue())
    # On some platforms, we get a UnicodeError when trying to
    # display the Failure with twisted.python.log because
    # the default encoding cannot display u"\u2603".  Windows for example
    # uses a default encoding of cp437 which does not support u"\u2603".
    self.flushLoggedErrors(UnicodeError)
    # Since we didn't "handle" the exception, flush it to prevent a test
    # failure
    self.assertEqual(1, len(self.flushLoggedErrors()))
def test_sessionDifferentFromSecureSession(self):
    """
    The secure and the non-secure session of a L{Request} are two distinct
    sessions: they have different uids and are stored under different
    cookie names.
    """
    channel = DummyChannel()
    channel.transport = DummyChannel.SSL()
    request = server.Request(channel, 1)
    request.site = server.Site(resource.Resource())
    request.sitepath = []

    # Over the SSL transport the default session is the secure one.
    secure = request.getSession()
    self.assertIsNotNone(secure)
    self.addCleanup(secure.expire)
    secureCookieName = request.cookies[0].split(b"=")[0]
    self.assertEqual(secureCookieName, b"TWISTED_SECURE_SESSION")

    plain = request.getSession(forceNotSecure=True)
    self.assertIsNotNone(plain)
    plainCookieName = request.cookies[1].split(b"=")[0]
    self.assertEqual(plainCookieName, b"TWISTED_SESSION")
    self.addCleanup(plain.expire)

    self.assertNotEqual(plain.uid, secure.uid)
def test_retrieveNonExistentSession(self):
    """
    L{Request.getSession} generates a new session when the session cookie
    received with the request names a session that does not exist.
    """
    request = server.Request(DummyChannel(), 1)
    request.site = server.Site(resource.Resource())
    request.sitepath = []
    request.received_cookies[b'TWISTED_SESSION'] = b"does-not-exist"

    created = request.getSession()
    self.assertIsNotNone(created)
    self.addCleanup(created.expire)

    sessionCookie = request.cookies[0]
    self.assertTrue(sessionCookie.startswith(b'TWISTED_SESSION='))
    # It should be a new session ID.
    self.assertNotIn(b"does-not-exist", sessionCookie)
def test_interfaces(self):
    """
    L{server.GzipEncoderFactory} provides
    L{iweb._IRequestEncoderFactory}, and the object returned by its
    C{encoderForRequest} for a request accepting gzip provides
    L{iweb._IRequestEncoder}.
    """
    gzipFactory = server.GzipEncoderFactory()
    self.assertTrue(
        verifyObject(iweb._IRequestEncoderFactory, gzipFactory))

    request = server.Request(self.channel, False)
    request.gotLength(0)
    request.requestHeaders.setRawHeaders(
        b"Accept-Encoding", [b"gzip,deflate"])

    gzipEncoder = gzipFactory.encoderForRequest(request)
    self.assertTrue(verifyObject(iweb._IRequestEncoder, gzipEncoder))
def test_encoding(self):
    """
    When the request carries an I{Accept-Encoding} header that mentions
    gzip, the response body is gzip-compressed, I{Content-Encoding: gzip}
    is sent and no I{Content-Length} is written.
    """
    request = server.Request(self.channel, False)
    request.gotLength(0)
    request.requestHeaders.setRawHeaders(
        b"Accept-Encoding", [b"gzip,deflate"])
    request.requestReceived(b'GET', b'/foo', b'HTTP/1.0')

    written = self.channel.transport.written.getvalue()
    self.assertNotIn(b"Content-Length", written)
    self.assertIn(b"Content-Encoding: gzip\r\n", written)

    # The body starts right after the blank line that ends the headers.
    headerEnd = written.find(b"\r\n\r\n")
    compressed = written[headerEnd + 4:]
    self.assertEqual(
        b"Some data",
        zlib.decompress(compressed, 16 + zlib.MAX_WBITS))
def test_multipleAccept(self):
    """
    When gzip appears in one of several separate I{Accept-Encoding}
    header values, it is still detected and the response is
    gzip-compressed.
    """
    request = server.Request(self.channel, False)
    request.gotLength(0)
    request.requestHeaders.setRawHeaders(
        b"Accept-Encoding", [b"deflate", b"gzip"])
    request.requestReceived(b'GET', b'/foo', b'HTTP/1.0')

    written = self.channel.transport.written.getvalue()
    self.assertNotIn(b"Content-Length", written)
    self.assertIn(b"Content-Encoding: gzip\r\n", written)

    # The body starts right after the blank line that ends the headers.
    headerEnd = written.find(b"\r\n\r\n")
    compressed = written[headerEnd + 4:]
    self.assertEqual(
        b"Some data",
        zlib.decompress(compressed, 16 + zlib.MAX_WBITS))
def test_alreadyEncoded(self):
    """
    When the response already has a I{Content-Encoding} header set, gzip
    is appended to the existing value rather than replacing it.
    """
    request = server.Request(self.channel, False)
    request.gotLength(0)
    request.requestHeaders.setRawHeaders(
        b"Accept-Encoding", [b"deflate", b"gzip"])
    request.responseHeaders.setRawHeaders(
        b"Content-Encoding", [b"deflate"])
    request.requestReceived(b'GET', b'/foo', b'HTTP/1.0')

    written = self.channel.transport.written.getvalue()
    self.assertNotIn(b"Content-Length", written)
    self.assertIn(b"Content-Encoding: deflate,gzip\r\n", written)

    # The body starts right after the blank line that ends the headers.
    headerEnd = written.find(b"\r\n\r\n")
    compressed = written[headerEnd + 4:]
    self.assertEqual(
        b"Some data",
        zlib.decompress(compressed, 16 + zlib.MAX_WBITS))
def test_multipleEncodingLines(self):
    """
    When the response has several I{Content-Encoding} header values, they
    are joined into a single comma-separated field value with gzip
    appended at the end.
    """
    request = server.Request(self.channel, False)
    request.gotLength(0)
    request.requestHeaders.setRawHeaders(
        b"Accept-Encoding", [b"deflate", b"gzip"])
    request.responseHeaders.setRawHeaders(
        b"Content-Encoding", [b"foo", b"bar"])
    request.requestReceived(b'GET', b'/foo', b'HTTP/1.0')

    written = self.channel.transport.written.getvalue()
    self.assertNotIn(b"Content-Length", written)
    self.assertIn(b"Content-Encoding: foo,bar,gzip\r\n", written)

    # The body starts right after the blank line that ends the headers.
    headerEnd = written.find(b"\r\n\r\n")
    compressed = written[headerEnd + 4:]
    self.assertEqual(
        b"Some data",
        zlib.decompress(compressed, 16 + zlib.MAX_WBITS))
def test_requestWriteAfterFinish(self):
    """
    A route handler that finishes the request itself and then returns a
    body causes a C{RuntimeError} to be logged; no data ends up written to
    the request.
    """
    mockedRequest = requestMock(b"/")

    @self.app.route("/")
    def root(request):
        # Finish first, then return a body for the framework to write.
        request.finish()
        return b'foo'

    rendering = _render(self.kr, mockedRequest)
    self.assertFired(rendering)

    self.assertEqual(mockedRequest.writeCount, 2)
    self.assertEqual(mockedRequest.getWrittenData(), b'')

    (loggedFailure,) = self.flushLoggedErrors(RuntimeError)
    self.assertEqual(
        str(loggedFailure.value),
        ("Request.write called on a request after Request.finish was "
         "called."))
def getClientIP(self, request):
    """Return the IP address of the client which sent **request**.

    Delegates to the module-level ``getClientIP`` function, passing along
    this resource's ``useForwardedHeader`` setting.

    :type request: :api:`twisted.web.http.Request`
    :param request: A ``Request`` for a
        :api:`twisted.web.resource.Resource`.
    :rtype: ``None`` or :any:`str`
    :returns: The client's IP address, if it was obtainable.
    """
    trustForwarded = self.useForwardedHeader
    return getClientIP(request, trustForwarded)
def getCaptchaImage(self, request):
    """Get a random CAPTCHA image from our **captchaDir**.

    Builds a :class:`~farfetchd.captcha.GimpCaptcha` keyed with an HMAC key
    derived from the client's IP address, and calls its
    :meth:`~farfetchd.captcha.GimpCaptcha.get` method.  Errors raised by
    ``get()`` are logged rather than propagated; in that case whatever the
    captcha object holds in ``image`` and ``challenge`` is still returned.

    :type request: :api:`twisted.web.http.Request`
    :param request: A client's initial request for some other resource
        which is protected by this one (i.e. protected by a CAPTCHA).
    :returns: A 2-tuple of ``(image, challenge)``, where::
        - ``image`` is a string holding a binary, JPEG-encoded image.
        - ``challenge`` is a unique string associated with the request.
    """
    # Create a new HMAC key, specific to requests from this client:
    perClientKey = crypto.getHMAC(self.hmacKey, self.getClientIP(request))
    gimp = captcha.GimpCaptcha(self.publicKey, self.secretKey,
                               perClientKey, self.captchaDir)
    try:
        gimp.get()
    except captcha.GimpCaptchaError as captchaError:
        log.msg(captchaError)
    except Exception as unexpected:  # pragma: no cover
        log.err("Unhandled error while retrieving Gimp captcha!")
        log.err(unexpected)

    return (gimp.image, gimp.challenge)
def render_GET(self, request):
    """Fetch a CAPTCHA for the client and serve it as a formatted response.

    :type request: :api:`twisted.web.http.Request`
    :param request: A ``Request`` object for a CAPTCHA.
    :rtype: str
    :returns: The result of :meth:`formatResponse` for a dictionary whose
        ``"data"`` member contains the following fields:
          * "id": Always ``1``.
          * "type": This resource's ``responseType``.
          * "version": The Farfetchd protocol version.
          * "image": A base64-encoded CAPTCHA JPEG image.
          * "challenge": A base64-encoded, encrypted challenge.  The client
            will need to hold on to this and pass it back later, along
            with their challenge response.
        If base64-encoding the image fails, the error is logged and the
        unencoded image is left in place.
    """
    image, challenge = self.getCaptchaImage(request)

    payload = {
        'id': 1,
        'type': self.responseType,
        'version': FARFETCHD_API_VERSION,
        'image': image,
        'challenge': challenge,  # The challenge is already base64-encoded
    }
    response = {'data': payload}

    try:
        payload["image"] = base64.b64encode(image)
    except Exception as err:
        log.err("Could not construct or encode captcha!")
        log.err(err)

    return self.formatResponse(response, request)
def render_POST(self, request):
    """Process a client's CAPTCHA solution.

    If :meth:`checkSolution` returns ``True`` for the request, the response
    built by :meth:`formatResponse` (with ``"result"`` set to ``True``) is
    returned.  Otherwise the result of :meth:`failureResponse` is returned.

    :type request: :api:`twisted.web.http.Request`
    :param request: A ``Request`` object, including POST arguments which
        should include two key/value pairs: one key being
        ``'captcha_challenge_field'``, and the other,
        ``'captcha_response_field'``.  These POST arguments should be
        obtained from :meth:`render_GET`.
    :returns: The formatted success or failure response.  If any exception
        is raised while checking or formatting, it is logged and ``None``
        is returned implicitly.
    """
    verdict = {
        "id": 3,
        "type": self.responseType,
        "version": FARFETCHD_API_VERSION,
        "result": False,
    }
    data = {"data": verdict}

    try:
        solved = self.checkSolution(request) is True
        if not solved:
            return self.failureResponse(request)
        verdict["result"] = True
        return self.formatResponse(data, request)
    except Exception as err:
        # NOTE(review): nothing is returned on this path, so the caller
        # receives None -- confirm that this is intended.
        log.err(err)
#class FarfetchdRequestHandler(http.Request):
def __init__(self, *args, **kwargs):
    """Initialise the request by delegating to ``http.Request.__init__``.

    All positional and keyword arguments are passed through unchanged.
    """
    http.Request.__init__(self, *args, **kwargs)
def process(self):
    """Process an incoming request to the Farfetchd server.

    Sets the default response headers, resets ``prepath``/``postpath`` from
    the request path, looks up the resource for the request and renders it.
    Any exception raised inside the ``try`` block is handed to
    ``processingFailed``.  The request is finished at the end if it has not
    been finished already.
    """
    # Get site from channel
    self.site = self.channel.site
    # Set various default headers
    self.setHeader(b"Content-Type", b"application/vnd.api+json")
    self.setHeader(b"Server", "Farfetchd v%s" % FARFETCHD_API_VERSION)
    self.setHeader(b"Date", http.datetimeToString())
    # Resource Identification
    self.prepath = []
    # Drop the leading "/" and unquote each "/"-separated path segment.
    self.postpath = list(map(http.unquote, self.path[1:].split(b'/')))
    log.msg("postpath is %s" % self.postpath)
    log.msg("self.resource is %s" % self.resource)
    #requested_resource = self.resource.getChildForRequest(self)
    # NOTE(review): this result is overwritten by the lookup inside the
    # ``try`` below, and this call sits outside the ``try`` so its
    # exceptions are not routed to processingFailed.  It may still matter
    # if getChildForRequest mutates prepath/postpath -- confirm before
    # removing.
    requested_resource = resource.getChildForRequest(self.resource, self)
    try:
        requested_resource = self.site.getResourceFor(self)
        #self.render(requested_resource)
        log.msg("Requested resource is %s" % requested_resource)
        # NOTE(review): listEntities() is called before the truthiness
        # check below; a falsy/None resource would raise here and end up
        # in processingFailed instead of the NOT_FOUND branch.
        log.msg("Requested resource entities are %s" % requested_resource.listEntities())
        if requested_resource:
            # Only resources with a truthy responseType are rendered;
            # otherwise nothing is written before the request is finished.
            if requested_resource.responseType:
                log.msg("Request will be handled by %r" % requested_resource.__class__.__name__)
                self.checkRequestHeaders()
                #requested_resource.render(self)
                self.render(requested_resource)
        # NOTE(review): this ``else`` is assumed to pair with
        # ``if requested_resource:`` -- confirm against the original
        # indentation.
        else:
            self.setResponseCode(http.NOT_FOUND)
            self.write(b"No such resource")
    except:
        # Wrap whatever exception is currently being handled in a Failure.
        self.processingFailed(failure.Failure())
    if not self.finished:
        self.finish()
def testChildLink(self):
    """
    C{childLink('baz')} yields C{'bar/baz'} for a request to C{/foo/bar}
    and C{'baz'} for a request to C{/foo/bar/}.
    """
    expectations = [
        ('/foo/bar', 'bar/baz'),
        ('/foo/bar/', 'baz'),
    ]
    for path, expectedLink in expectations:
        request = server.Request(DummyChannel(), 1)
        request.gotLength(0)
        request.requestReceived('GET', path, 'HTTP/1.0')
        self.assertEqual(request.childLink('baz'), expectedLink)
def testPrePathURLSimple(self):
    """
    With the host set to C{example.com} on port 80, C{prePathURL} omits
    the port from the URL.
    """
    expectedURL = 'http://example.com/foo/bar'
    request = server.Request(DummyChannel(), 1)
    request.gotLength(0)
    request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
    request.setHost('example.com', 80)
    self.assertEqual(request.prePathURL(), expectedURL)
def testPrePathURLNonDefault(self):
    """
    Over a TCP transport with the host set to port 81, C{prePathURL}
    includes the port in an C{http} URL.
    """
    expectedURL = 'http://example.com:81/foo/bar'
    channel = DummyChannel()
    channel.transport = DummyChannel.TCP()
    channel.transport.port = 81

    request = server.Request(channel, 1)
    request.setHost('example.com', 81)
    request.gotLength(0)
    request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
    self.assertEqual(request.prePathURL(), expectedURL)
def testPrePathURLSSLPort(self):
    """
    Over a TCP transport with the host set to port 443, C{prePathURL}
    yields an C{http} URL that includes the port.
    """
    expectedURL = 'http://example.com:443/foo/bar'
    channel = DummyChannel()
    channel.transport = DummyChannel.TCP()
    channel.transport.port = 443

    request = server.Request(channel, 1)
    request.setHost('example.com', 443)
    request.gotLength(0)
    request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
    self.assertEqual(request.prePathURL(), expectedURL)
def testPrePathURLSSLPortAndSSL(self):
    """
    Over an SSL transport with the host set to port 443, C{prePathURL}
    yields an C{https} URL without the port.
    """
    expectedURL = 'https://example.com/foo/bar'
    channel = DummyChannel()
    channel.transport = DummyChannel.SSL()
    channel.transport.port = 443

    request = server.Request(channel, 1)
    request.setHost('example.com', 443)
    request.gotLength(0)
    request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
    self.assertEqual(request.prePathURL(), expectedURL)
def testPrePathURLSSLNonDefault(self):
    """
    Over an SSL transport with the host set to port 81, C{prePathURL}
    yields an C{https} URL that includes the port.
    """
    expectedURL = 'https://example.com:81/foo/bar'
    channel = DummyChannel()
    channel.transport = DummyChannel.SSL()
    channel.transport.port = 81

    request = server.Request(channel, 1)
    request.setHost('example.com', 81)
    request.gotLength(0)
    request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
    self.assertEqual(request.prePathURL(), expectedURL)
def testPrePathURLSetSSLHost(self):
    """
    Over a TCP transport, passing C{1} as the third argument to
    C{setHost} makes C{prePathURL} yield an C{https} URL.
    """
    expectedURL = 'https://foo.com:81/foo/bar'
    channel = DummyChannel()
    channel.transport = DummyChannel.TCP()
    channel.transport.port = 81

    request = server.Request(channel, 1)
    request.setHost('foo.com', 81, 1)
    request.gotLength(0)
    request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
    self.assertEqual(request.prePathURL(), expectedURL)
def testNotifyFinishConnectionLost(self):
    """
    The Deferred returned by C{notifyFinish} fails with
    L{error.ConnectionDone} when the request's connection is lost with
    that reason.
    """
    channel = DummyChannel()
    channel.transport = DummyChannel.TCP()
    request = server.Request(channel, 1)

    whenFinished = request.notifyFinish()
    reason = error.ConnectionDone("Connection done")
    request.connectionLost(reason)
    return self.assertFailure(whenFinished, error.ConnectionDone)
def testSimple(self):
    """
    For every URL below C{/foo}, C{getRootURL} reports
    C{http://example.com/foo}.
    """
    top = resource.Resource()
    top.isLeaf = 0
    root = RootResource()
    top.putChild('foo', root)
    root.putChild('', root)
    root.putChild('bar', resource.Resource())
    channel = self.createServer(top)

    expectedRoot = "http://example.com/foo"
    paths = ['/foo/', '/foo/bar', '/foo/bar/baz', '/foo/bar/']
    for path in paths:
        request = server.Request(channel, 1)
        request.setHost('example.com', 81)
        request.gotLength(0)
        request.requestReceived('GET', path, 'HTTP/1.0')
        self.assertEqual(request.getRootURL(), expectedRoot)