def setup_backup():
if not Options['backup_hhmm']:
return
curTimeSec = sliauth.epoch_ms()/1000.0
curDate = sliauth.iso_date(sliauth.create_date(curTimeSec*1000.0))[:10]
backupTimeSec = sliauth.epoch_ms(sliauth.parse_date(curDate+'T'+Options['backup_hhmm']))/1000.0
backupInterval = 86400
if curTimeSec+60 > backupTimeSec:
backupTimeSec += backupInterval
print >> sys.stderr, Options['site_name'] or 'ROOT', 'Scheduled daily backup in dir %s, starting at %s' % (Options['backup_dir'], sliauth.iso_date(sliauth.create_date(backupTimeSec*1000.0)))
def start_backup():
if Options['debug']:
print >> sys.stderr, "Starting periodic backup"
backupSite()
Global.backup = PeriodicCallback(backupSite, backupInterval*1000.0)
Global.backup.start()
IOLoop.current().call_at(backupTimeSec, start_backup)
python类debug()的实例源码
def __init__(self):
handlers = [
(r"/", IndexHandler),
(r"/plugin/(.*)", PluginsHandler),
]
settings = dict(
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
# xsrf_cookies=True,
# cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
# login_url="/auth/login",
ui_modules={'Plugin': PluginModule},
debug=options.debug,
)
tornado.web.Application.__init__(self, handlers, **settings)
def set_id(self, username, role='', sites='', displayName='', email='', altid='', data={}):
if Options['debug']:
print >> sys.stderr, 'sdserver.UserIdMixin.set_id', username, role, sites, displayName, email, altid, data
if ':' in username or ':' in role or ':' in sites or ':' in displayName:
raise Exception('Colon character not allowed in username/role/name')
cookie_data = {'version': COOKIE_VERSION}
cookie_data['name'] = displayName or username
if email:
cookie_data['email'] = email
if altid:
cookie_data['altid'] = altid
cookie_data.update(data)
token = gen_proxy_auth_token(username, role, sites, root=True)
cookieStr = ':'.join( sliauth.safe_quote(x) for x in [username, role, sites, token, base64.b64encode(json.dumps(cookie_data,sort_keys=True))] )
self.set_user_cookie(cookieStr, batch=cookie_data.get('batch'))
def imageUpload(self, sessionName, imageFile, fname, fbody, autonumber=None):
if Options['debug']:
print >> sys.stderr, 'ActionHandler:imageUpload', sessionName, imageFile, fname, autonumber, len(fbody)
if not self.previewActive():
raise tornado.web.HTTPError(404, log_message='CUSTOM:Not previewing session')
if not imageFile:
imgName = re.sub(r'[^\w,.+-]', '', fname.strip().replace(' ','_'))
if imgName and not autonumber:
imageFile = imgName
else:
imageFile = (md2md.IMAGE_FMT % self.previewState['new_image_number']) + os.path.splitext(fname)[1].lower()
self.previewState['new_image_number'] += 1
if not self.previewState['image_zipfile']:
self.previewState['image_zipbytes'] = io.BytesIO()
self.previewState['image_zipfile'] = zipfile.ZipFile(self.previewState['image_zipbytes'], 'a')
imagePath = sessionName+'_images/' + imageFile
self.previewState['image_zipfile'].writestr(imagePath, fbody)
self.previewState['image_paths'][imageFile] = imagePath
self.previewState['modimages'] = 'append'
self.set_header('Content-Type', 'application/json')
self.write( json.dumps( {'result': 'success', 'imageFile': imageFile} ) )
def on_close(self):
if Options['debug']:
print >> sys.stderr, "DEBUG: WSon_close", getattr(self, 'pathUser', 'NOT OPENED')
try:
if self.eventFlusher:
self.eventFlusher.stop()
self.eventFlusher = None
self._connections[self.pathUser[0]][self.pathUser[1]].remove(self)
if not self._connections[self.pathUser[0]][self.pathUser[1]]:
del self._connections[self.pathUser[0]][self.pathUser[1]]
if not self._connections[self.pathUser[0]]:
del self._connections[self.pathUser[0]]
if self._interactiveSession[0] is self:
# Disable interactivity associated with this connection
self._interactiveSession = (None, None, None, None)
except Exception, err:
pass
def post(self, subpath=''):
try:
userRole = self.get_id_from_cookie(role=True, for_site=Options['site_name'])
msg = WSHandler.processMessage(self.get_id_from_cookie(), userRole, self.get_id_from_cookie(name=True), self.get_argument("message", ""), allStatus=True, source='interact', adminBroadcast=True)
if not msg:
msg = 'Previous message accepted'
except Exception, excp:
if Options['debug']:
import traceback
traceback.print_exc()
msg = 'Error in processing message: '+str(excp)
print >> sys.stderr, "AuthMessageHandler.post", msg
else:
msg = 'Error in processing message'
site_prefix = '/'+Options['site_name'] if Options['site_name'] else ''
self.redirect(site_prefix+'/interact/?note='+sliauth.safe_quote(msg))
def restoreSheet(sheetName, filepath, csvfile, overwrite=None):
# Restore sheet from backup CSV file
try:
##dialect = csv.Sniffer().sniff(csvfile.read(1024))
##csvfile.seek(0)
reader = csv.reader(csvfile, delimiter=',') # Ignore dialect for now
rows = [row for row in reader]
if not rows:
raise Exception('No rows in CSV file %s for sheet %s' % (filepath, sheetName))
sdproxy.importSheet(sheetName, rows[0], rows[1:], overwrite=overwrite)
return ''
except Exception, excp:
if Options['debug']:
import traceback
traceback.print_exc()
return 'Error in restoreSheet: '+str(excp)
def shutdown_all(keep_root=False):
if Options['debug']:
print >> sys.stderr, 'sdserver.shutdown_all:'
if not Global.remoteShutdown:
Global.remoteShutdown = True
for j, site in enumerate(Options['site_list']):
# Shutdown child servers
relay_addr = SiteProps.relay_map(j+1)
try:
retval = sendPrivateRequest(relay_addr, path='/'+site+'/_shutdown?root='+Options['server_key'])
except Exception, excp:
print >> sys.stderr, 'sdserver.shutdown_all: Error in shutting down site', site, excp
if not keep_root:
shutdown_root()
def start_multiproxy():
import multiproxy
class ProxyRequestHandler(multiproxy.RequestHandler):
def get_relay_addr_uri(self, pipeline, header_list):
""" Returns relay host, port.
May modify self.request_uri or header list (excluding the first element)
Raises exception if connection not allowed.
"""
comps = self.request_uri.split('/')
if len(comps) > 1 and comps[1] and comps[1] in Options['site_list']:
# Site server
site_number = 1+Options['site_list'].index(comps[1])
retval = SiteProps.relay_map(site_number)
elif Global.relay_forward and not self.request_uri.startswith('/_'):
# Not URL starting with '_'; forward to underlying website
retval = Global.relay_forward
else:
# Root server
retval = SiteProps.relay_map(0)
print >> sys.stderr, 'ABC: get_relay_addr_uri:', sliauth.iso_date(nosubsec=True), self.ip_addr, self.request_uri, retval
return retval
Global.proxy_server = multiproxy.ProxyServer(Options['host'], Options['port'], ProxyRequestHandler, log_interval=0,
io_loop=IOLoop.current(), xheaders=True, masquerade="server/1.2345", ssl_options=Options['ssl_options'], debug=True)
def getSettingsSheet(gsheet_url, site_name='', adminonly_fail=False):
try:
bak_dir = getBakDir(site_name)
if bak_dir:
settingsPath = os.path.join(bak_dir, sdproxy.SETTINGS_SHEET+'.csv')
if not os.path.exists(settingsPath):
raise Exception('Settings sheet %s not found in backup directory' % settingsPath )
with open(settingsPath, 'rb') as f:
rows = [row for row in csv.reader(f, delimiter=',')]
if not rows:
raise Exception('No rows in CSV file %s for settings sheet %s' % (settingsPath, sdproxy.SETTINGS_SHEET))
headers = rows[0]
rows = rows[1:]
else:
rows, headers = sliauth.read_sheet(gsheet_url, Options['root_auth_key'], sdproxy.SETTINGS_SHEET, site=site_name)
return sliauth.get_settings(rows)
except Exception, excp:
##if Options['debug']:
## import traceback
## traceback.print_exc()
print >> sys.stderr, 'Error:site %s: Failed to read Google Sheet settings_slidoc from %s: %s' % (site_name, gsheet_url, excp)
return {'site_access': 'adminonly'} if adminonly_fail else {}
def getSiteRosterMaps(gsheet_url, site_name=''):
try:
rows, headers = sliauth.read_sheet(gsheet_url, Options['root_auth_key'], sdproxy.ROSTER_SHEET, site=site_name)
nameCol = 1 + headers.index('name')
idCol = 1 + headers.index('id')
emailCol = 1 + headers.index('email')
rosterMaps = {}
rosterMaps['id2email'] = dict( (x[idCol-1], x[emailCol-1]) for x in rows[1:] if x[idCol-1] )
rosterMaps['id2name'] = dict( (x[idCol-1], x[nameCol-1]) for x in rows[1:] if x[idCol-1] )
return rosterMaps
except Exception, excp:
##if Options['debug']:
## import traceback
## traceback.print_exc()
print >> sys.stderr, 'Error:site %s: Failed to read Google Sheet roster_slidoc from %s: %s' % (site_name, gsheet_url, excp)
return {}
def option_handle():
"""?????????"""
parse_command_line()
if options.config:
if os.path.exists(options.config):
options.parse_config_file(options.config)
else:
print "can not find %s"%options.config
print "usage:python %s --help"%os.path.basename(__file__)
sys.exit(1)
if options.voicelist:
def _parse_voicelist():
return tornado.escape.json_decode(options.voicelist)
global ALL_VOICE
ALL_VOICE = _parse_voicelist()
logging.debug("conf voicelist: %s", ALL_VOICE)
if options.cache_dir:
mkdir_p(options.cache_dir)
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def main():
tornado.options.parse_command_line()
app = App()
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(options.port)
if options.debug:
# autorelaod for template file
tornado.autoreload.start()
for root, dir, files in os.walk(TEMPLATE_PATH):
for item in files:
if item.endswith('.html'):
tornado.autoreload.watch(os.path.join(root, item))
try:
tornado.ioloop.IOLoop.current().start()
except:
tornado.ioloop.IOLoop.current().stop()
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="153A3FDSIKM56",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def test_tweet_best_posts(self):
old_likes = options.likes_to_tweet
old_magic = options.likes_to_magic
old_debug = options.debug
try:
options.likes_to_tweet = 1
options.likes_to_magic = 1
options.debug = False
add_posts(shake_id=self.shake_a.id, sharedfile_id=self.shared_1.id, sourcefile_id=self.source.id)
self.user_b.add_favorite(self.shared_1)
# this like should trigger a tweet
self.assertEqual(MockTweepy.count, 1)
mf = Magicfile.get("sharedfile_id = %s", self.shared_1.id)
self.assertIsNotNone(mf)
finally:
options.likes_to_tweet = old_likes
options.likes_to_magic = old_magic
options.debug = old_debug
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def main():
parse_command_line()
redis.connect(host=options.redis_host)
app = tornado.web.Application(
[
(r'/', MainHandler),
(r'/oauth', OAuthHandler),
(r'/command', CommandHandler),
(r'/button', ButtonHandler),
],
template_path=os.path.join(os.path.dirname(__file__), 'templates'),
static_path=os.path.join(os.path.dirname(__file__), 'static'),
debug=options.debug,
)
app.listen(options.port)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def main():
"""Creates Tornado Application and starts the IO Loop
"""
# Get the Port and Debug mode from command line options
options.parse_command_line()
# create logger for app
logger = logging.getLogger('app')
logger.setLevel(logging.INFO)
FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=FORMAT)
tic_tac_toe_game_manager = TicTacToeGameManager()
urls = [
(r"/$", IndexHandler),
(r"/tic-tac-toe$", TicTacToeHandler),
(r"/tic-tac-toe/ws$", TicTacToeSocketHandler, dict(game_manager=tic_tac_toe_game_manager))
]
# Create Tornado application
application = tornado.web.Application(
urls,
debug=options.debug,
autoreload=options.debug,
**settings)
# Start Server
logger.info("Starting App on Port: {} with Debug Mode: {}".format(options.port, options.debug))
application.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def run():
define('port', default=8090, type=int, help='')
define('debug', default=False, type=bool, help='')
parse_command_line()
settings['debug'] = options.debug
if settings['debug']:
print 'debug mode'
'''
connect mongodb
'''
try:
client = MotorClient(settings['database']['address'])
settings['connection'] = client[settings['database']['db']]
except:
print 'can not connect MongoDB'
sys.exit(0)
'''
connect redis
'''
try:
client = redis.Redis(host=settings['redis']['host'],
port=settings['redis']['port'],
db=settings['redis']['db'])
settings['redis_conn'] = client
except:
print 'can not connect redis'
sys.exit(0)
application = Application(
handlers=urlpattern,
**settings
)
http_server = HTTPServer(application, xheaders=True)
http_server.listen(options.port)
IOLoop.instance().start()
def __init__(self, debug):
root = os.path.dirname(__file__)
static_path = os.path.join(root, 'static')
template_path = os.path.join(root, 'templates')
settings = {
'debug': debug,
'compress_response': True,
'template_path': template_path,
}
# routes
handlers = [
(r'/submit', FormHandler),
(r'/results/?', ResultsSummaryHandler),
(r'/results/([0-9a-fA-F-]+)/?', ResultsHandler),
(r'/tracking/([0-9a-fA-F-]+)/([A-Za-z0-9._-]+)/?', TrackingHandler),
(r'/unsubscribe/([0-9a-fA-F-]+)/?', BlacklistHandler),
(r'/privacy/?', PrivacyHandler),
(r'/', MainHandler),
(r'/(.*)', StaticHandler, {'path': static_path}),
]
# database instance
self.db = db.MailerDatabase(config.DB_PATH)
# rate limiters
self.global_limiter = ratelimit.Bucket(**config.GLOBAL_RATE_LIMIT)
self.ip_limiters = {}
tornado.web.Application.__init__(self, handlers, **settings)
def main():
tornado.options.parse_command_line()
application = Application(options.debug)
ssl_options = None if not config.SSL_ENABLED else {
'certfile': config.SSL_CERTFILE,
'keyfile': config.SSL_KEYFILE,
}
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options)
http_server.listen(options.port)
print 'Listening on port %d...' % options.port
try:
tornado.ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.instance().stop()
def main():
parse_command_line()
app = application(options.debug)
port = int(os.environ.get("PORT", options.port))
app.listen(port)
tornado.ioloop.IOLoop.current().start()
def default_settings(self):
"""
"""
import gprime.const
return {
"cookie_secret": base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes),
"login_url": self.make_url("/login"),
'template_path': os.path.join(gprime.const.DATA_DIR, "templates"),
'debug': self.options.debug,
"xsrf_cookies": self.options.xsrf,
}
def main():
from tornado.options import options
import traceback
try:
run_app()
except Exception as exc:
if options.debug:
traceback.print_exc()
else:
print(exc)
def write_error(self, status_code, **kwargs):
if 'exc_info' in kwargs:
exc_type, exc, trace = kwargs['exc_info']
if exc.args and isinstance(exc.args[0], dict):
error = exc.args[0]
else:
error = dict()
if tornado_options.debug:
error['traceback'] = format_exception(exc_type, exc, trace)
self.reply(error)
def write_error(self, status_code, **kwargs):
if 'exc_info' in kwargs:
exc_type, exc, trace = kwargs['exc_info']
if exc.args and isinstance(exc.args[0], dict):
error = exc.args[0]
else:
error = dict()
if tornado_options.debug:
error['traceback'] = format_exception(exc_type, exc, trace)
self.reply('error', error)