def __init__(self):
    """Open the MariaDB connection and create the cursor kept on this object.

    The user name, password and database name are read from ``Globals.DB``.
    The connection is stored as ``self.db_conn`` and its cursor as
    ``self.CURSOR``.
    """
    # NOTE: an earlier variant (left commented out in the original) obtained
    # the connection through ``MySQL().init_app(app)`` instead of mariadb.
    db_settings = Globals.DB
    self.db_conn = mariadb.connect(
        user=db_settings['username'],
        password=db_settings['password'],
        database=db_settings['dbname'],
    )
    self.CURSOR = self.db_conn.cursor()
# Example source code for the Python class app()
def viewLog(mode, data=None):
    """Write one info-level entry to ``app.logger`` describing an event.

    Args:
        mode: Name of the event. Recognised values and the ``data`` each
            one reads:

            * ``"message"``  - ``data["user_key"]``, ``data["type"]`` and
              ``data["content"]`` are logged.
            * ``"keyboard"`` - ``data`` is not used.
            * ``"add"``      - ``data["user_key"]`` is logged.
            * ``"block"`` / ``"exit"`` - ``data`` itself is logged.
            * ``"fail"``     - ``data`` is not used.

            Any other value is ignored and nothing is logged.
        data: Payload for the modes listed above; defaults to ``None``.
    """
    # Compare by value: ``is`` tests object identity, which only happens to
    # work for interned literals and fails for strings built at runtime.
    if mode == "message":
        app.logger.info("[message] user_key : {}, type : {}, content : {}".format(
            data["user_key"],
            data["type"],
            data["content"]))
    elif mode == "keyboard":
        app.logger.info("[keyboard] call home keyboard")
    elif mode == "add":
        app.logger.info("[join] user_key : {}".format(data["user_key"]))
    elif mode == "block":
        app.logger.info("[block] user_key : {}".format(data))
    elif mode == "exit":
        app.logger.info("[exit] user_key : {}".format(data))
    elif mode == "fail":
        app.logger.info("[fail] request process fail")
def token_check(func):
    """Decorator that lets ``func`` run only for requests with a valid token.

    The token is taken from the ``Authorization`` request header and parsed
    with ``util.parser_token``. The wrapped function is called with its
    original arguments when the token passes every check; otherwise the
    wrapper returns a ``base_result(failed, ...)`` response:

    * header missing/empty or parser returned ``None`` -> ``'no token'``
    * parsed payload is otherwise falsy                -> ``'illegal token'``
    * payload older than ``configs.app.exprire``       -> ``'expire token'``
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return base_result(failed, msg='no token',
                               error_code=error_code.token_no_exist_error)

        payload = util.parser_token(token)
        if payload is None:
            return base_result(failed, msg='no token',
                               error_code=error_code.token_no_exist_error)
        # Reject a falsy payload *before* touching its attributes; the
        # original performed this check only after the print below had
        # already dereferenced the payload.
        if not payload:
            return base_result(failed, msg='illegal token',
                               error_code=error_code.token_illegal_error)

        print('[' + payload.identify + ']' + payload.username + ' auth in '
              + str(datetime.fromtimestamp(payload.timestamp)))

        nowtime = int(datetime.now().timestamp())
        if nowtime - payload.timestamp > configs.app.exprire:
            return base_result(failed, msg='expire token',
                               error_code=error_code.token_expire_error)
        return func(*args, **kwargs)
    return wrapper
#???token?????
def list_routes():
    """Print every rule in ``app.url_map``, one per line, sorted as text.

    Each line holds the comma-joined methods, the rule and the endpoint,
    separated by tabs, after URL-unquoting.
    """
    # ``unquote`` lives in ``urllib.parse`` on Python 3 and in ``urllib`` on
    # Python 2; import locally so the function runs on either.
    try:
        from urllib.parse import unquote
    except ImportError:
        from urllib import unquote

    output = [
        unquote("{:20s}\t{}\t{:50s}".format(
            ','.join(rule.methods), rule.rule, rule.endpoint))
        for rule in app.url_map.iter_rules()
    ]
    for line in sorted(output):
        # Function-call form works on both Python 2 and 3 for one argument.
        print(line)
def make_shell_context():
    """Build the namespace handed to the interactive shell.

    Returns:
        dict: a mapping with the single key ``app`` bound to ``app``.
    """
    return {'app': app}
def __init__(self):
    """Initialise a ``MySQL`` object against ``app`` and open a connection.

    The connection is kept as ``self.conn`` and a cursor created from it as
    ``self.cursor``.
    """
    extension = MySQL()
    extension.init_app(app)
    connection = extension.connect()
    self.conn = connection
    self.cursor = connection.cursor()
def make_shell_context():
    """Build the namespace handed to the interactive shell.

    Returns:
        dict: ``app``, ``db`` and the five schedule model classes, each
        keyed by its own name.
    """
    # Imported inside the function, as in the original.
    from app.schedule.models import (
        CrontabSchedule,
        IntervalSchedule,
        ScheduleTask,
        ScheduleMeta,
        ScheduleInfo,
    )

    return {
        'app': app,
        'db': db,
        'CrontabSchedule': CrontabSchedule,
        'IntervalSchedule': IntervalSchedule,
        'ScheduleTask': ScheduleTask,
        'ScheduleMeta': ScheduleMeta,
        'ScheduleInfo': ScheduleInfo,
    }
def make_shell_context():
    """Build the namespace handed to the interactive shell.

    Returns:
        dict: ``db``, ``PasteFile`` and ``app``, each keyed by its own name.
    """
    return dict(db=db, PasteFile=PasteFile, app=app)
def main():
    """Parse the command line and serve ``app`` until interrupted.

    Accepts ``-p`` / ``--port`` (integer, default 9000) and binds a
    ``WSGIServer`` to that port on all interfaces.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('-p', '--port', type=int, default=9000,
                     help='server port')
    options_ns = cli.parse_args()

    bind_address = ('', options_ns.port)
    WSGIServer(bind_address, app).serve_forever()
def main():
    """Serve ``app`` through an ``HTTPServer`` on the configured port.

    Defines the ``port`` command-line option (integer, default 9000), parses
    the command line, then starts the IO loop, which blocks.
    """
    define('port', default=9000, type=int, help='Port on which to listen.')
    parse_command_line()

    container = WSGIContainer(app)
    server = HTTPServer(container)
    server.listen(options.port)

    loop = IOLoop.instance()
    loop.start()
def create_app(self):
    """Load the testing configuration into ``app`` and hand it back.

    :return: the module-level ``app`` after ``app.config`` has been loaded
        from ``app.config.TestingConfig``
    """
    testing_config = 'app.config.TestingConfig'
    app.config.from_object(testing_config)
    return app
def main():
    """Announce the configured port and serve ``app`` until interrupted.

    The port is read from ``server_config['port']`` and a ``WSGIServer`` is
    bound to it on all interfaces.
    """
    port = server_config['port']
    banner = 'Starting TRex HTTP proxy on port: '
    # The original was a Python 2 print *statement* with two comma-separated
    # items, which writes them separated by one space. Formatting the text
    # first yields the same output on Python 2 and Python 3.
    print('{} {}'.format(banner, port))

    http_server = WSGIServer(('', port), app)
    http_server.serve_forever()
# Start web server
def handle(event, context):
    """Rewrite two request headers, then pass the event to ``awsgi``.

    Args:
        event: mapping whose ``'headers'`` entry is modified in place.
        context: passed through to ``awsgi.response`` unchanged.

    Returns:
        Whatever ``awsgi.response(tiler, event, context)`` returns.
    """
    # Cloudfront isn't configured to pass Host headers, so the provided Host
    # header is the API Gateway hostname; replace it with the public name.
    server_name = os.environ['SERVER_NAME']
    request_headers = event['headers']
    request_headers['Host'] = server_name

    # Cloudfront drops X-Forwarded-Proto, so the value provided is from API
    # Gateway; overwrite it.
    request_headers['X-Forwarded-Proto'] = 'http'

    return awsgi.response(tiler, event, context)
def __init__(self, url):
    """Prepare (but do not start) the listener thread.

    Args:
        url: string of the form ``"host:port"``; it is split on ``':'`` and
            must yield exactly two parts, the second an integer.
    """
    host, port_text = url.split(':')
    self.host = host
    self.port = int(port_text)

    # Filled in by _start_listening_blocking.
    self.server = None

    # The thread object is only created here; nothing in this method starts it.
    self.thread = threading.Thread(target=self._start_listening_blocking)

    # Wrap the application with socketio's middleware.
    self.app = socketio.Middleware(sio, app)
def _start_listening_blocking(self):
    """Listen on ``(self.host, self.port)`` and run the WSGI server on it.

    The result of ``wsgi.server`` is stored in ``self.server``.
    """
    address = (self.host, self.port)
    sock = eventlet.listen(address)
    # Deploy as an eventlet WSGI server with its own logging switched off.
    self.server = wsgi.server(sock, self.app, log_output=False, debug=False)
def setLogger(app, level):
    """Attach the module-level ``handler`` to ``app.logger`` and set its level.

    Args:
        app: object exposing a ``logger`` attribute.
        level: value forwarded to ``logger.setLevel``.
    """
    log = app.logger
    log.addHandler(handler)
    log.setLevel(level)
def customLog(msg):
    """Log ``msg`` at info level through ``app.logger``."""
    log = app.logger
    log.info(msg)
def managerLog(mode, user_key):
    """Log at info level that ``mode`` finished processing for ``user_key``.

    ``mode`` appears twice in the message: once in brackets as a tag and
    once in the sentence itself.
    """
    template = "[{}] {} {} processing completed"
    message = template.format(mode, user_key, mode)
    app.logger.info(message)
def lambda_handler(event, context=None):
    """Dispatch an incoming event to the matching handler.

    Routing, in order:

    * ``detail-type == 'Scheduled Event'`` -> ``debug`` then ``app.on_timer``
    * non-empty ``Records`` whose first record has
      ``EventSource == 'aws:sns'``         -> ``app.on_config_message``
    * any other non-empty ``Records``      -> ``debug``
    * no ``path`` key                      -> ``debug``
    * otherwise                            -> ``wsgigw.invoke`` on the WSGI
      app, wrapped with Sentry when ``app.config`` has ``'sentry-dsn'``

    Args:
        event: mapping describing the event.
        context: passed through to ``debug``; defaults to ``None``.

    Returns:
        The result of whichever handler was selected.
    """
    # Periodic
    if event.get('detail-type') == 'Scheduled Event':
        debug(event, context)
        return app.on_timer(event)

    # SNS / Dynamodb / Kinesis
    records = event.get('Records')
    if records:
        # SNS records use 'EventSource'; Kinesis and DynamoDB stream records
        # use lowercase 'eventSource'. Use .get() so non-SNS records fall
        # through to debug() instead of raising KeyError.
        if records[0].get('EventSource') == 'aws:sns':
            return app.on_config_message(records)
        return debug(event, context)

    if not event.get('path'):
        return debug(event, context)

    # API Gateway
    if app.config.get('sentry-dsn'):
        from raven import Client
        from raven.contrib.bottle import Sentry
        client = Client(app.config['sentry-dsn'])
        app.app.catchall = False
        wrapped_app = Sentry(app.app, client)
    else:
        wrapped_app = app.app
    return wsgigw.invoke(wrapped_app, event)
def local(reload, port):
    """Run the app on a local bottle server.

    Loads the c7n resource definitions, configures logging (DEBUG overall,
    WARNING for ``botocore``), calls ``controller.db.provision()`` and then
    starts the server.

    Args:
        reload: forwarded to bottle's ``run`` as ``reloader``.
        port: forwarded to bottle's ``run`` as ``port``.
    """
    import logging
    from bottle import run
    from app import controller, app
    from c7n.resources import load_resources

    load_resources()
    print("Loaded resources definitions")

    logging.basicConfig(level=logging.DEBUG)
    botocore_log = logging.getLogger('botocore')
    botocore_log.setLevel(logging.WARNING)

    # A truthy result from provision() is reported as a newly created table.
    table_created = controller.db.provision()
    if table_created:
        print("Table Created")

    run(app, reloader=reload, port=port)
def list_routes():
    """Print every rule in ``app.url_map``, one per line, sorted as text.

    Each line holds the endpoint, the comma-joined methods and the rule,
    separated by single spaces, after URL-unquoting.
    """
    # ``unquote`` lives in ``urllib.parse`` on Python 3 and in ``urllib`` on
    # Python 2; import locally so the function runs on either.
    try:
        from urllib.parse import unquote
    except ImportError:
        from urllib import unquote

    output = []
    for rule in app.url_map.iter_rules():
        methods = ','.join(rule.methods)
        line = '{:50s} {:20s} {}'.format(rule.endpoint, methods, rule.rule)
        output.append(unquote(line))

    for line in sorted(output):
        # Function-call form works on both Python 2 and 3 for one argument.
        print(line)
def _make_context():
    """Return a dict exposing ``app``, ``db`` and ``models`` under their own names."""
    return {'app': app, 'db': db, 'models': models}
def __init__(self):
    """Initialise logging for ``app`` under a name derived from today's date.

    The log name is today's date as ``YYYYMMDD``, an underscore, and
    ``self.logPrefix``; it is passed to ``logs.init_app`` along with ``app``.
    """
    date_stamp = datetime.datetime.today().strftime("%Y%m%d")
    log_name = '_'.join((date_stamp, self.logPrefix))
    logs.init_app(app, log_name)
def info(self, msg):
    """Log ``msg`` at info level via ``app.logger`` and also print it."""
    log = app.logger
    log.info(msg)
    print(msg)
def error(self, msg):
    """Log ``msg`` at error level, print it, then terminate the process.

    Raises:
        SystemExit: always, via ``sys.exit()``.
    """
    log = app.logger
    log.error(msg)
    print(msg)
    # NOTE(review): sys.exit() without an argument exits with status 0 even
    # though this is the error path - confirm whether that is intended.
    sys.exit()
def app():
    """Import the ``app`` object from the ``app`` package and return it.

    The import is performed on each call, inside the function body.
    """
    from app import app as application
    return application
def list_routes():
    """Print every rule in ``app.url_map``, one per line, sorted as text.

    Each line holds the comma-joined methods, the rule and the endpoint,
    separated by tabs, after URL-unquoting.
    """
    # ``unquote`` lives in ``urllib.parse`` on Python 3 and in ``urllib`` on
    # Python 2; import locally so the function runs on either.
    try:
        from urllib.parse import unquote
    except ImportError:
        from urllib import unquote

    output = []
    for rule in app.url_map.iter_rules():
        methods = ','.join(rule.methods)
        raw = "{:20s}\t{}\t{:50s}".format(methods, rule.rule, rule.endpoint)
        output.append(unquote(raw))

    for line in sorted(output):
        # Function-call form works on both Python 2 and 3 for one argument.
        print(line)
def get_apps(app_type, page):
    """Fetch one page of apps through the test client and sanity-check it.

    Args:
        app_type: value placed in the ``/apps/<app_type>/...`` path segment.
        page: value placed in the ``/page/<page>`` path segment.

    Returns:
        The response object from the test client.

    Raises:
        AssertionError: if the decoded body has no ``'datas'`` entry (or it
            is ``None``).
    """
    # The current time is appended as the ``t`` query parameter.
    url = '/apps/{}/page/{}?t={}'.format(app_type, page, time.time())
    _request, response = app.test_client.get(url)
    response_normal_check(response)

    payload = ujson.loads(response.text)
    assert payload.get('datas') is not None
    return response
def upload():
    """Post the sample APK to ``/upload/app`` through the test client.

    The form carries the file ``tests/QXmokuai_3.apk`` in the ``package``
    field and the text ``'test upload'`` in the ``msg`` field.

    Returns:
        The response object from the test client, after it has passed
        ``response_normal_check``.
    """
    # Keep the file open for the duration of the request and close it
    # afterwards; the original never closed the handle.
    with open('tests/QXmokuai_3.apk', 'rb') as package:
        data = FormData()
        data.add_field('package', package, filename='QXmokuai_3.apk')
        data.add_field('msg', 'test upload')
        _, response = app.test_client.post('/upload/app', data=data)

    response_normal_check(response)
    return response
def get_app_detail(app_id):
    """Request ``/apps/<app_id>`` through the test client and check the reply.

    Args:
        app_id: value placed in the URL path.

    Returns:
        The response object, after it has passed ``response_normal_check``.
    """
    url = '/apps/{}'.format(app_id)
    _request, response = app.test_client.get(url)
    response_normal_check(response)
    return response