def echo_json_response(handler, code, status=None, results=None):
    """Send a JSON envelope ``{code, status, results}`` through ``handler``.

    Both ``BaseHTTPRequestHandler`` and ``tornado.web.RequestHandler``
    instances are supported.

    Args:
        handler: Request handler used to emit the response.
        code: HTTP status code to send.
        status: Status text placed in the envelope. Defaults to the reason
            phrase registered for ``code``, or ``"Unknown"`` when the code
            has no registered phrase.
        results: JSON-serializable payload. Defaults to an empty dict.

    Returns:
        True when the response was written; False when ``handler`` or
        ``code`` is None or the handler type is not supported.
    """
    if handler is None or code is None:
        return False

    if status is None:
        # Non-standard codes have no registered reason phrase; fall back
        # instead of raising KeyError before anything has been sent.
        status = httplib.responses.get(code, "Unknown")
    if results is None:
        results = {}

    json_response = json.dumps(
        {'code': code, 'status': status, 'results': results})

    if isinstance(handler, BaseHTTPRequestHandler):
        handler.send_response(code)
        handler.send_header('Content-Type', 'application/json')
        handler.end_headers()
        handler.wfile.write(json_response)
        return True

    if isinstance(handler, tornado.web.RequestHandler):
        handler.set_status(code)
        handler.set_header('Content-Type', 'application/json')
        handler.write(json_response)
        return True

    return False
# Example source code that uses Python's httplib `responses` mapping.
def handle_event(self, event):
    """Dispatch one h2 event received for this stream.

    * ``ResponseReceived``: builds the HTTP headers, extracts ``:status``
      and forwards a ``ResponseStartLine`` plus headers to
      ``headers_received``.
    * ``DataReceived``: forwards the payload to ``data_received``.
    * ``StreamEnded``: marks the stream as ended, removes its delegate from
      the context and finishes once the number of pushed responses equals
      the number of pushed streams.
    * ``PushedStreamReceived``: creates a stream with ``from_push_stream``
      and stores it under the pushed stream id.

    Any other event is logged as ignored.
    """
    if isinstance(event, h2.events.ResponseReceived):
        headers = self.build_http_headers(event.headers)
        status_code = int(headers.pop(':status'))
        # The status code is chosen by the remote peer and may have no
        # registered reason phrase; use a placeholder rather than letting
        # a KeyError abort the stream.
        reason = httplib.responses.get(status_code, 'Unknown')
        start_line = ResponseStartLine('HTTP/2.0', status_code, reason)
        self.headers_received(start_line, headers)
    elif isinstance(event, h2.events.DataReceived):
        self.data_received(event.data)
    elif isinstance(event, h2.events.StreamEnded):
        self._stream_ended = True
        self.context.remove_stream_delegate(self.stream_id)
        if len(self._pushed_responses) == len(self._pushed_streams):
            self.finish()
    elif isinstance(event, h2.events.PushedStreamReceived):
        stream = self.from_push_stream(event)
        self._pushed_streams[event.pushed_stream_id] = stream
    else:
        logger.warning('ignored event: %r, %r', event, event.__dict__)
def handle_event(self, event):
    """Route a single h2 event for this stream to the matching callback.

    Response headers are turned into an ``httputil.ResponseStartLine`` and
    passed to ``headers_received``; body data goes to ``data_received``; a
    stream end marks the stream finished, detaches it from the context and
    calls ``finish`` once every pushed stream has a pushed response; a
    pushed stream is registered in ``_pushed_streams``. Unrecognized events
    are only logged.
    """
    if isinstance(event, h2.events.ResponseReceived):
        headers = self.build_http_headers(event.headers)
        status_code = int(headers.pop(':status'))
        # A server may answer with a code that has no registered reason
        # phrase; a plain index lookup would raise KeyError here.
        reason = httplib.responses.get(status_code, 'Unknown')
        start_line = httputil.ResponseStartLine(
            'HTTP/2.0', status_code, reason
        )
        self.headers_received(start_line, headers)
    elif isinstance(event, h2.events.DataReceived):
        self.data_received(event.data)
    elif isinstance(event, h2.events.StreamEnded):
        self._stream_ended = True
        self.context.remove_stream_delegate(self.stream_id)
        if len(self._pushed_responses) == len(self._pushed_streams):
            self.finish()
    elif isinstance(event, h2.events.PushedStreamReceived):
        stream = self.from_push_stream(event)
        self._pushed_streams[event.pushed_stream_id] = stream
    else:
        logger.warning('ignored event: %r, %r', event, event.__dict__)
def __call__(self, environ, start_response):
    """WSGI entry point: run the application and relay its response.

    Args:
        environ: WSGI environment; wrapped in ``HTTPRequest`` before being
            handed to ``web.Application.__call__``.
        start_response: WSGI callable invoked with the status line and the
            list of ``(name, value)`` header tuples.

    Returns:
        The handler's ``_write_buffer``, used as the response body.
    """
    handler = web.Application.__call__(self, HTTPRequest(environ))
    # The response is returned right below, so the handler must already
    # have finished producing it.
    assert handler._finished
    status = str(handler._status_code) + " " + \
        httplib.responses[handler._status_code]
    # items() is a non-appendable view on Python 3; copy it into a list so
    # the Set-Cookie headers can be appended.
    headers = list(handler._headers.items())
    for cookie_dict in getattr(handler, "_new_cookies", []):
        for cookie in cookie_dict.values():
            headers.append(("Set-Cookie", cookie.OutputString(None)))
    start_response(status, headers)
    return handler._write_buffer
def _test_connectivity(self, param, LADS):
    """
    Handle the "test connectivity" action triggered from the Phantom UI.

    Issues a GET for /api/v2/applications through ``LADS`` and reports the
    outcome with ``set_status_save_progress``.
    https://api.a10networks.com/api/v2/applications
    """
    self.debug_print("%s _test_connectivity %s" % (A10_LADS_Connector.BANNER, param))
    msg = "test connectivity to %s status_code: " % (LADS.dashboard)

    if not LADS.genericGET(uri="/api/v2/applications"):
        # A falsy result (None or False) is a failure -- presumably caused by
        # a wrong address or credentials; the raw response is reported as-is.
        failure = "%s %s" % (LADS.status_code, LADS.response)
        return self.set_status_save_progress(phantom.APP_ERROR, msg + failure)

    # A truthy result is success: report the status and the application names.
    success = "%s %s apps: %s" % (
        LADS.status_code,
        httplib.responses[LADS.status_code],
        LADS.get_names(LADS.response),
    )
    return self.set_status_save_progress(phantom.APP_SUCCESS, msg + success)
def _test_connectivity(self, param):
    """
    Handle the "test connectivity" action triggered from the Phantom UI.

    Builds an ``iControl.BIG_IP`` client from the configured device,
    username and password, issues a GET for /mgmt/tm/sys/software/image and
    reports the outcome with ``set_status_save_progress``.
    """
    self.debug_print("%s TEST_CONNECTIVITY %s" % (F5_Connector.BANNER, param))

    config = self.get_config()
    host = config.get("device")
    big_ip = iControl.BIG_IP(
        host=host,
        username=config.get("username"),
        password=config.get("password"),
        uri="/mgmt/tm/sys/software/image",
        method="GET",
    )
    msg = "test connectivity to %s status_code: " % host

    if not big_ip.genericGET():
        # A falsy result (None or False) is a failure -- presumably caused by
        # a wrong address or credentials; the raw response is reported as-is.
        failure = "%s %s" % (big_ip.status_code, big_ip.response)
        return self.set_status_save_progress(phantom.APP_ERROR, msg + failure)

    # A truthy result is success: report the code and its reason phrase.
    success = "%s %s" % (big_ip.status_code, httplib.responses[big_ip.status_code])
    return self.set_status_save_progress(phantom.APP_SUCCESS, msg + success)
def __write_error(self, status_code, error_message=None):
    """Build the HTTP status line and encoded body for an error.

    Args:
      status_code: HTTP status code to be returned.
      error_message: Error message to be returned; when None, the standard
        reason phrase for status_code is used.

    Returns:
      Tuple (http_status, body):
        http_status: HTTP status line, e.g. 200 OK.
        body: The EndpointsErrorMessage encoded with the ProtoJson encoder.
    """
    reason = httplib.responses[status_code]
    if error_message is None:
        error_message = reason

    message = EndpointsErrorMessage(
        state=EndpointsErrorMessage.State.APPLICATION_ERROR,
        error_message=error_message)
    body = self.__PROTOJSON.encode_message(message)
    return '%d %s' % (status_code, reason), body
def __init__(self, *args, **kwargs):
    """Initialize the response, consuming the HTTP/2 specific keywords.

    ``pushed_responses`` (default: empty list) and ``new_request`` (default:
    None) are removed from ``kwargs`` before the parent constructor runs.
    ``reason`` is read only afterwards, so the parent still receives it; when
    it is missing or empty, the reason phrase is looked up from ``self.code``
    with ``"Unknown"`` as the fallback.
    """
    pushed = kwargs.pop('pushed_responses', [])
    self.pushed_responses = pushed
    follow_up = kwargs.pop('new_request', None)
    self.new_request = follow_up

    super(HTTP2Response, self).__init__(*args, **kwargs)

    explicit_reason = kwargs.get('reason')
    if explicit_reason:
        self.reason = explicit_reason
    else:
        self.reason = httputil.responses.get(self.code, "Unknown")
def write_error(self, status_code, **kwargs):
    """Render the error page for ``status_code``.

    The template receives the reason phrase as ``message`` and the numeric
    code as ``error``. Codes without a registered reason phrase are shown as
    ``"Unknown"`` so that rendering the error page cannot itself fail with a
    KeyError.
    """
    message = httplib.responses.get(status_code, "Unknown")
    self.render("pages/error.html", message=message, error=status_code)
def write_error(self, status_code, **kwargs):
    """Render ``pages/error.html`` for the given HTTP status code.

    ``message`` is the standard reason phrase for the code and ``error`` is
    the code itself. A non-standard code falls back to ``"Unknown"`` instead
    of raising KeyError while the error page is being produced.
    """
    reason = httplib.responses.get(status_code, "Unknown")
    self.render("pages/error.html", message=reason, error=status_code)
def write_error(self, status_code, **kwargs):
    """Show the generic error page for ``status_code``.

    Passes the code (``error``) and its reason phrase (``message``) to the
    ``pages/error.html`` template. Unregistered codes use ``"Unknown"`` as
    the phrase; a plain index lookup would raise KeyError from inside the
    error handler.
    """
    phrase = httplib.responses.get(status_code, "Unknown")
    self.render("pages/error.html", message=phrase, error=status_code)
def write_error(self, status_code, **kwargs):
    """Register the client IP with ``stats`` and render the error page.

    The stats call is best effort: any ordinary failure is reported through
    ``error(...)`` and the page is rendered regardless. Codes without a
    registered reason phrase are rendered with the message ``"Unknown"``.
    """
    try:
        # The return value is not needed; only the call itself matters.
        stats.request(str(get_ip(self.request)))
    except Exception:
        # Narrowed from a bare ``except`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        error("Errored while handling request IP -- still served...")
    message = httplib.responses.get(status_code, "Unknown")
    self.render("pages/error.html", message=message, error=status_code)
def build_response(self, result):
    """Build a ``MockResponse`` whose JSON body wraps ``result``.

    The body is ``{"result": {"status": self.status}}``; ``"value"`` is
    added next to ``"status"`` only when ``self.status`` is truthy. The
    response uses ``self.response_code`` together with its standard reason
    phrase and the ``SUCCESSFUL_HEADERS`` header set.
    """
    outcome = {'status': self.status}
    if self.status:
        outcome['value'] = result
    body = json.dumps({'result': outcome})

    headers = Headers(SUCCESSFUL_HEADERS)
    reason = httplib.responses[self.response_code]
    return MockResponse(b'HTTP/1.1', self.response_code, reason, headers, body)
def test_responses(self):
    """The reason phrase registered for NOT_FOUND must be "Not Found"."""
    not_found_reason = httplib.responses[httplib.NOT_FOUND]
    self.assertEqual(not_found_reason, "Not Found")
def http_open(self, req):
    """Record ``req`` and answer with an error once, then with a 200.

    A deep copy of every request is appended to ``self.requests``. While
    ``self._count`` is 0, the counter is incremented and the request is
    routed to ``self.parent.error`` with ``self.code`` and ``self.headers``.
    Afterwards the request is stored in ``self.req`` and a 200 "OK"
    ``MockResponse`` is returned.
    """
    import mimetools
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))

    if self._count != 0:
        self.req = req
        empty_headers = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", empty_headers, "", req.get_full_url())

    self._count += 1
    reason = httplib.responses[self.code]
    error_headers = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, MockFile(), self.code, reason, error_headers)
def test_responses(self):
    """Check that ``httplib.responses`` maps NOT_FOUND to "Not Found"."""
    expected = "Not Found"
    self.assertEqual(httplib.responses[httplib.NOT_FOUND], expected)
def http_open(self, req):
    """Mock opener: fail the first request, succeed on later ones.

    Every request is deep-copied into ``self.requests``. The first call
    (``self._count == 0``) bumps the counter and delegates to
    ``self.parent.error`` using ``self.code`` and ``self.headers``; later
    calls remember the request in ``self.req`` and return a 200 "OK"
    ``MockResponse`` for the request's full URL.
    """
    import mimetools
    import copy
    from StringIO import StringIO

    snapshot = copy.deepcopy(req)
    self.requests.append(snapshot)

    first_call = self._count == 0
    if first_call:
        self._count = self._count + 1
        name = httplib.responses[self.code]
        raw_headers = StringIO(self.headers)
        msg = mimetools.Message(raw_headers)
        return self.parent.error(
            "http", req, MockFile(), self.code, name, msg)

    self.req = req
    blank = StringIO("\r\n\r\n")
    msg = mimetools.Message(blank)
    return MockResponse(200, "OK", msg, "", req.get_full_url())
def wasLastResponseDelayed():
    """
    Returns True if the last web request resulted in a time-delay
    """

    # Rationale kept from the original author: nearly all response times not
    # affected by a time-based SQL injection should fall within a fixed
    # number of standard deviations of the normal response times (the factor
    # is TIME_STDEV_COEFF, whose value is not visible here).
    # Math reference: http://www.answers.com/topic/standard-deviation

    deviation = stdev(kb.responseTimes.get(kb.responseTimeMode, []))
    threadData = getCurrentThreadData()

    if not deviation or conf.direct or conf.disableStats:
        # No usable statistics: compare the duration with the configured delay.
        delta = threadData.lastQueryDuration - conf.timeSec
        if Backend.getIdentifiedDbms() in (DBMS.MYSQL,):
            # MySQL's SLEEP(X) lasts 0.05 seconds shorter on average
            delta += 0.05
        return delta >= 0

    samples = kb.responseTimes[kb.responseTimeMode]
    if len(samples) < MIN_TIME_RESPONSES:
        logger.warn(("time-based standard deviation method used on a model "
                     "with less than %d response times") % MIN_TIME_RESPONSES)

    lowerStdLimit = average(samples) + TIME_STDEV_COEFF * deviation
    threshold = max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit)
    delayed = (threadData.lastQueryDuration >= threshold)

    if not kb.testMode and delayed:
        if kb.adjustTimeDelay is None:
            # Ask once whether the delay value should be tuned automatically.
            prompt = ("do you want sqlmap to try to optimize value(s) "
                      "for DBMS delay responses (option '--time-sec')? [Y/n] ")
            if readInput(prompt, default='Y', boolean=True):
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.YES
            else:
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.DISABLE
        if kb.adjustTimeDelay is ADJUST_TIME_DELAY.YES:
            adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)

    return delayed
def showHttpErrorCodes():
    """
    Shows all HTTP error codes raised till now
    """

    if not kb.httpErrorCodes:
        return

    # One "<code> (<reason>) - <count> times" entry per recorded code; codes
    # without a registered reason phrase are shown with '?'.
    entries = ", ".join(
        "%d (%s) - %d times" % (code, httplib.responses.get(code, '?'), count)
        for code, count in kb.httpErrorCodes.items())
    logger.warn("HTTP error codes detected during run:\n" + entries)

    def _suspicious(code):
        # 4xx/5xx codes other than a plain 500 and the original page's code.
        return (str(code).startswith(('4', '5'))
                and code != httplib.INTERNAL_SERVER_ERROR
                and code != kb.originalCode)

    if any(_suspicious(code) for code in kb.httpErrorCodes):
        logger.debug("too many 4xx and/or 5xx HTTP error codes "
                     "could mean that some kind of protection is involved (e.g. WAF)")
def __init__(self, code, content=""):
    """Create the error for a bad server response.

    Args:
        code: HTTP status code of the response (an int or a numeric string).
        content: Response content kept on the exception; defaults to "".

    The exception message is ``"Bad server response: <code> <reason>"``.
    Codes that are not numeric or have no registered reason phrase use
    ``"Unknown"``, so that constructing the exception never raises a
    KeyError/ValueError that would hide the real failure.
    """
    try:
        reason = responses.get(int(code), "Unknown")
    except (TypeError, ValueError):
        reason = "Unknown"
    Exception.__init__(self, "Bad server response: %s %s" % (code, reason))
    self.code = code
    self.content = content
def get_error_html(self, status_code, **kwargs):
    """Return the HTML body for an error response.

    For 404, 500, 503 and 403 the file ``<BASE_DIR>/templates/<code>.html``
    is returned when it exists. Otherwise a minimal page with the code and
    its reason phrase is generated.

    Args:
        status_code: Integer HTTP status code.
        **kwargs: Accepted for interface compatibility; unused here.

    Returns:
        The HTML document as a string.
    """
    if status_code in (404, 500, 503, 403):
        filename = os.path.join(
            getsettings('BASE_DIR'), 'templates', '%d.html' % status_code)
        if os.path.exists(filename):
            with io.open(filename, 'r') as f:
                return f.read()

    try:
        import httplib
    except ImportError:
        # The module was renamed to http.client in Python 3.
        import http.client as httplib

    # Unregistered codes get a placeholder instead of raising KeyError while
    # the error page is being produced.
    message = httplib.responses.get(status_code, "Unknown")
    return "<html><title>%(code)d: %(message)s</title>" \
           "<body class='bodyErrorPage'>%(code)d: %(message)s</body></html>" % {
               "code": status_code,
               "message": message,
           }
def __call__(self, environ, start_response):
    """WSGI entry point: run the application and relay its response.

    Args:
        environ: WSGI environment; wrapped in ``HTTPRequest`` before being
            handed to ``web.Application.__call__``.
        start_response: WSGI callable invoked with the status line and the
            headers, each name and value converted with ``native_str``.

    Returns:
        The handler's ``_write_buffer``, used as the response body.
    """
    handler = web.Application.__call__(self, HTTPRequest(environ))
    # The response is returned right below, so the handler must already
    # have finished producing it.
    assert handler._finished
    status = str(handler._status_code) + " " + \
        httplib.responses[handler._status_code]
    # items() is a non-appendable view on Python 3; copy it into a list so
    # the Set-Cookie headers can be appended.
    headers = list(handler._headers.items())
    for cookie_dict in getattr(handler, "_new_cookies", []):
        for cookie in cookie_dict.values():
            headers.append(("Set-Cookie", cookie.OutputString(None)))
    start_response(status,
                   [(native_str(k), native_str(v)) for (k, v) in headers])
    return handler._write_buffer
def set_status(self, status_code):
    """Sets the status code for our response.

    The code must be a key of ``httplib.responses``. The check is an
    ``assert``, so it is skipped when Python runs with ``-O``.
    """
    is_known_code = status_code in httplib.responses
    assert is_known_code
    self._status_code = status_code
def _generate_headers(self):
    """Serialize the status line, headers and cookies into a header block.

    The result is the CRLF-joined status line, every entry of ``_headers``
    and ``_list_headers`` as ``name: value``, and one ``Set-Cookie`` line per
    cookie in ``_new_cookies`` (if that attribute exists), terminated by a
    blank line.
    """
    status_line = " ".join((
        self.request.version,
        str(self._status_code),
        httplib.responses[self._status_code],
    ))
    lines = [utf8(status_line)]

    all_headers = itertools.chain(self._headers.iteritems(), self._list_headers)
    for name, value in all_headers:
        lines.append(utf8(name) + b(": ") + utf8(value))

    lines.extend(
        utf8("Set-Cookie: " + cookie.OutputString(None))
        for cookie_dict in getattr(self, "_new_cookies", [])
        for cookie in cookie_dict.values())

    return b("\r\n").join(lines) + b("\r\n\r\n")
def _handle_request_exception(self, e):
    """Log an exception raised while handling a request and send an error.

    For an ``HTTPError`` the optional ``log_message`` is logged as a warning
    and the error's status code is sent, unless that code is not a key of
    ``httplib.responses``, in which case it is logged and 500 is sent. Any
    other exception is logged with its traceback and answered with a 500.
    """
    if not isinstance(e, HTTPError):
        logging.error("Uncaught exception %s\n%r", self._request_summary(),
                      self.request, exc_info=True)
        self.send_error(500, exc_info=sys.exc_info())
        return

    if e.log_message:
        # Prefix the caller-supplied message with the status and request.
        log_format = "%d %s: " + e.log_message
        log_args = [e.status_code, self._request_summary()]
        log_args.extend(e.args)
        logging.warning(log_format, *log_args)

    status = e.status_code
    if status not in httplib.responses:
        logging.error("Bad HTTP status code: %d", status)
        status = 500
    self.send_error(status, exc_info=sys.exc_info())
def assert_meta(self, meta):
    """
    Check the metadata object of a successful response.

    The user xid must match the test user's, the message and code must be
    those of HTTP OK, and the time is checked with
    ``assert_datetime_within`` using a window of 11.

    :param meta: the response meta object
    """
    expected_code = httplib.OK
    expected_message = httplib.responses[expected_code]

    self.assert_equal(meta.user_xid, self.test_user['xid'])
    self.assert_equal(meta.message, expected_message)
    self.assert_equal(meta.code, expected_code)
    self.assert_datetime_within(meta.time, window=11)
def __init__(self):
    """Start with an empty operation description."""
    # Nothing is known about the route yet.
    self.method = self.uri_template = None
    # Free-text fields start out empty.
    self.summary = self.description = ''
    # Per-instance containers for parameters and per-status responses.
    self.parameters = list()
    self.responses = dict()
    # Optional schema/headers; both unset by default.
    self.default_response_schema = self.response_headers = None
def add_response_codes(self, status_dict):
    """Add a response description for every status code in ``status_dict``.

    Each value must provide ``'reason'`` and ``'description'``. An empty
    reason is replaced *in the given dict* by the standard reason phrase for
    the code, or ``'Unknown'`` when the code is not an integer or has no
    registered phrase. The stored description is the reason, a blank line
    and the description with its first word title-cased.
    """
    for status, details in status_dict.items():
        entry = self.responses.setdefault(status, {})

        if not details['reason']:
            try:
                details['reason'] = http_client.responses[int(status)]
            except (KeyError, TypeError, ValueError):
                details['reason'] = 'Unknown'

        words = details['description'].split(maxsplit=2)
        if words:
            words[0] = words[0].title()
        text = ' '.join(words)

        entry['description'] = '{}\n\n{}'.format(details['reason'], text).strip()
def generate_swagger(self):
    """Build the swagger dictionary describing this operation.

    ``parameters`` is included only when non-empty. When no responses are
    recorded, a single empty ``'default'`` response is emitted because
    swagger requires at least one. The default response schema and the
    response headers are attached to the lowest numeric status code in
    [200, 400), or to ``'default'`` if there is no such code, provided that
    key exists among the responses.
    """
    responses = self.responses or {'default': {'description': ''}}

    swagger = {'summary': self.summary, 'description': self.description}
    if self.parameters:
        swagger['parameters'] = self.parameters
    swagger['responses'] = responses

    # Pick the response that carries the schema and header details.
    numeric_codes = sorted(int(key) for key in responses if key.isdigit())
    default_code = next(
        (str(number) for number in numeric_codes if 200 <= number < 400),
        'default')

    if default_code in responses:
        target = responses[default_code]
        if self.default_response_schema:
            target['schema'] = self.default_response_schema
        if self.response_headers:
            target['headers'] = self.response_headers

    return swagger
def set_status(self, status_code):
    """Sets the status code for our response."""
    # Only codes registered in httplib.responses are accepted; the check is
    # an assert, so it is not enforced when Python runs with -O.
    known_codes = httplib.responses
    assert status_code in known_codes
    self._status_code = status_code
# ?? HTTP???
# ?? value ??, ? ??????