def enable_logging(level=None, redirect_loggers=None):
    """
    Turn on log output for the *saltyrtc* logger group.

    Arguments:
        - `level`: A :mod:`logbook` logging level to apply to the
          logger group. ``None`` (the default) selects
          ``logbook.WARNING``.
        - `redirect_loggers`: A dictionary containing :mod:`logging`
          logger names as key and their respective :mod:`logging`
          level as value. It is handed to
          `_redirect_logging_loggers` so the loggers are redirected
          to :mod:`logbook`. ``None`` (the default) skips the
          redirection step entirely.

    Raises :class:`ImportError` in case :mod:`logbook` is not
    installed.
    """
    # A missing level-conversion handler signals that logbook could not
    # be set up; `_logging_error` reports that condition.
    if _logger_convert_level_handler is None:
        _logging_error()

    # From here on logbook is available (otherwise an error was raised)
    effective_level = logbook.WARNING if level is None else level
    logger_group.disabled = False
    logger_group.level = effective_level

    if redirect_loggers is None:
        return
    _redirect_logging_loggers(redirect_loggers, remove=False)
python类WARNING的实例源码
def set_level(self, log_level):
    """
    Set ``self.logger.level`` from a level name.

    `log_level` is lower-cased and compared, in order, against the
    ``LogLevel`` constants ``INFO``, ``WARNING``, ``CRITICAL`` and
    ``NOTSET``. The first match selects the :mod:`logbook` level of the
    same name. Any other value leaves the logger level untouched.
    """
    requested = log_level.lower()
    candidates = (
        (LogLevel.INFO, logbook.INFO),
        (LogLevel.WARNING, logbook.WARNING),
        (LogLevel.CRITICAL, logbook.CRITICAL),
        (LogLevel.NOTSET, logbook.NOTSET),
    )
    for level_name, logbook_level in candidates:
        if requested == level_name:
            self.logger.level = logbook_level
            return
def __get_logbook_logging_level(level_str):
    """
    Translate a level name into the matching :mod:`logbook` level.

    The name is upper-cased and stripped of surrounding whitespace
    before it is matched.

    Raises :class:`ValueError` if the normalized name is not one of the
    eight level names listed below.
    """
    # logbook levels:
    #   CRITICAL = 15
    #   ERROR = 14
    #   WARNING = 13
    #   NOTICE = 12
    #   INFO = 11
    #   DEBUG = 10
    #   TRACE = 9
    #   NOTSET = 0
    known_levels = (
        'CRITICAL',
        'ERROR',
        'WARNING',
        'NOTICE',
        'INFO',
        'DEBUG',
        'TRACE',
        'NOTSET',
    )
    level_str = level_str.upper().strip()
    if level_str not in known_levels:
        raise ValueError("Unknown logbook log level: {}".format(level_str))
    # Every accepted name doubles as the attribute name of the constant
    # on the logbook module, so it can be looked up directly.
    return getattr(logbook, level_str)
def __get_logging_method(log_level):
    """
    Return the ``logger`` method that emits records at `log_level`.

    Only ``DEBUG``, ``INFO``, ``WARNING``, ``ERROR`` and ``CRITICAL``
    are mapped.

    Raises :class:`ValueError` for any other level.
    """
    dispatch = {
        logbook.DEBUG: logger.debug,
        logbook.INFO: logger.info,
        logbook.WARNING: logger.warning,
        logbook.ERROR: logger.error,
        logbook.CRITICAL: logger.critical,
    }
    if log_level not in dispatch:
        raise ValueError("unknown log level: {}".format(log_level))
    return dispatch[log_level]
log_service.py 文件源码
项目:cookiecutter-pyramid-talk-python-starter
作者: mikeckennedy
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def __get_logbook_logging_level(level_str):
    """
    Translate a level name into the matching :mod:`logbook` level.

    The name is upper-cased and stripped of surrounding whitespace
    before it is matched.

    Raises :class:`ValueError` if the normalized name is not one of the
    eight level names listed below.
    """
    # logbook levels:
    #   CRITICAL = 15
    #   ERROR = 14
    #   WARNING = 13
    #   NOTICE = 12
    #   INFO = 11
    #   DEBUG = 10
    #   TRACE = 9
    #   NOTSET = 0
    known_levels = (
        'CRITICAL',
        'ERROR',
        'WARNING',
        'NOTICE',
        'INFO',
        'DEBUG',
        'TRACE',
        'NOTSET',
    )
    level_str = level_str.upper().strip()
    if level_str not in known_levels:
        raise ValueError("Unknown logbook log level: {}".format(level_str))
    # Every accepted name doubles as the attribute name of the constant
    # on the logbook module, so it can be looked up directly.
    return getattr(logbook, level_str)
def __get_logbook_logging_level(level_str):
    """
    Translate a level name into the matching :mod:`logbook` level.

    The name is upper-cased and stripped of surrounding whitespace
    before it is matched.

    Raises :class:`ValueError` if the normalized name is not one of the
    eight level names listed below.
    """
    # logbook levels:
    #   CRITICAL = 15
    #   ERROR = 14
    #   WARNING = 13
    #   NOTICE = 12
    #   INFO = 11
    #   DEBUG = 10
    #   TRACE = 9
    #   NOTSET = 0
    known_levels = (
        'CRITICAL',
        'ERROR',
        'WARNING',
        'NOTICE',
        'INFO',
        'DEBUG',
        'TRACE',
        'NOTSET',
    )
    level_str = level_str.upper().strip()
    if level_str not in known_levels:
        raise ValueError("Unknown logbook log level: {}".format(level_str))
    # Every accepted name doubles as the attribute name of the constant
    # on the logbook module, so it can be looked up directly.
    return getattr(logbook, level_str)
def _get_logging_level(verbosity):
    """
    Map a verbosity number to a :mod:`logbook` level.

    Verbosity ``1`` selects ``CRITICAL`` and each step up selects the
    next more verbose level, ending with ``7`` for ``TRACE``.

    Raises :class:`KeyError` for any verbosity outside 1-7.
    """
    # noinspection PyPackageRequirements
    import logbook

    # Ordered from least to most verbose; position + 1 is the verbosity.
    ordered_levels = (
        logbook.CRITICAL,
        logbook.ERROR,
        logbook.WARNING,
        logbook.NOTICE,
        logbook.INFO,
        logbook.DEBUG,
        logbook.TRACE,
    )
    by_verbosity = dict(enumerate(ordered_levels, start=1))
    return by_verbosity[verbosity]
def test_eod_order_cancel_minute(self, direction, minute_emission):
    """
    Check that end-of-day order cancellation works in minute mode, for
    shorts as well as longs and for daily as well as minute emission.
    """
    # Order 1000 shares of asset1. The volume is only 1 share per bar,
    # so the order should be cancelled at the end of the day.
    algo = self.prep_algo(
        "set_cancel_policy(cancel_policy.EODCancel())",
        amount=np.copysign(1000, direction),
        minute_emission=minute_emission
    )

    # Signed share count expected to be filled before the cancellation.
    filled_amount = np.copysign(389, direction)

    # Expected warning text, keyed by order direction.
    expected_messages = {
        1: (
            "Your order for 1000 shares of ASSET1 has been partially "
            "filled. 389 shares were successfully purchased. "
            "611 shares were not filled by the end of day and "
            "were canceled."
        ),
        -1: (
            "Your order for -1000 shares of ASSET1 has been partially "
            "filled. 389 shares were successfully sold. "
            "611 shares were not filled by the end of day and "
            "were canceled."
        ),
    }

    handler = TestHandler()
    with handler:
        results = algo.run(self.data_portal)

        for positions_of_day in results.positions:
            self.assertEqual(1, len(positions_of_day))
            self.assertEqual(filled_amount, positions_of_day[0]["amount"])
            self.assertEqual(1, results.positions[0][0]["sid"])

        # should be an order on day1, but no more orders afterwards
        orders_per_day = [len(day) for day in results.orders]
        np.testing.assert_array_equal([1, 0, 0], orders_per_day)

        # should be 389 txns on day 1, but no more afterwards
        txns_per_day = [len(day) for day in results.transactions]
        np.testing.assert_array_equal([389, 0, 0], txns_per_day)

        first_order = results.orders[0][0]
        self.assertEqual(ORDER_STATUS.CANCELLED, first_order["status"])
        self.assertEqual(filled_amount, first_order["filled"])

        warning_records = [
            entry for entry in handler.records if entry.level == WARNING
        ]
        self.assertEqual(1, len(warning_records))

        # Only the two known directions carry a message expectation.
        if direction in expected_messages:
            self.assertEqual(
                expected_messages[direction],
                str(warning_records[0].message)
            )
def test_order_in_quiet_period(self, name, sid):
    """
    Check that every order placed for a de-listed asset is rejected
    with a warning.

    `name` is not used in the body; `sid` identifies the asset under
    test and is substituted into the algorithm script.
    """
    asset = self.asset_finder.retrieve_asset(sid)

    # The script must keep its own indentation: `dedent` only strips the
    # common leading whitespace, and the function bodies below would not
    # compile if they sat flush with their `def` lines.
    algo_code = dedent("""
    from catalyst.api import (
        sid,
        order,
        order_value,
        order_percent,
        order_target,
        order_target_percent,
        order_target_value
    )

    def initialize(context):
        pass

    def handle_data(context, data):
        order(sid({sid}), 1)
        order_value(sid({sid}), 100)
        order_percent(sid({sid}), 0.5)
        order_target(sid({sid}), 50)
        order_target_percent(sid({sid}), 0.5)
        order_target_value(sid({sid}), 50)
    """).format(sid=sid)

    # run algo from 1/6 to 1/7
    algo = TradingAlgorithm(
        script=algo_code,
        env=self.env,
        sim_params=SimulationParameters(
            start_session=pd.Timestamp("2016-01-06", tz='UTC'),
            end_session=pd.Timestamp("2016-01-07", tz='UTC'),
            trading_calendar=self.trading_calendar,
            data_frequency="minute"
        )
    )

    # Every warning is expected to carry this exact text, so build it
    # once instead of once per record.
    expected_message = (
        'Cannot place order for ASSET{sid}, as it has de-listed. '
        'Any existing positions for this asset will be liquidated '
        'on {date}.'.format(sid=sid, date=asset.auto_close_date)
    )

    with make_test_handler(self) as log_catcher:
        algo.run(self.data_portal)

        warnings = [r for r in log_catcher.records
                    if r.level == logbook.WARNING]

        # one warning per order on the second day
        self.assertEqual(6 * 390, len(warnings))

        for w in warnings:
            self.assertEqual(expected_message, w.message)
def server_factory(request, event_loop, server_permanent_keys):
    """
    Return a factory to create :class:`saltyrtc.Server` instances.

    The returned callable accepts an optional `permanent_keys`
    argument (falling back to `server_permanent_keys`), starts a server
    on an unused TCP port and registers a finalizer on `request` that
    closes the server again.
    """
    # Enable asyncio debug logging
    os.environ['PYTHONASYNCIODEBUG'] = '1'

    # Enable logging
    util.enable_logging(level=logbook.NOTICE, redirect_loggers={
        'asyncio': logbook.WARNING,
        'websockets': logbook.WARNING,
    })

    # Push handler
    logging_handler = logbook.StderrHandler()
    logging_handler.push_application()
    # Pop the handler at teardown no matter how many servers were
    # created (including none). Finalizers run in reverse registration
    # order, so this one, registered first, runs after every server
    # finalizer added below.
    request.addfinalizer(logging_handler.pop_application)

    def _server_factory(permanent_keys=None):
        if permanent_keys is None:
            permanent_keys = server_permanent_keys

        # Setup server
        port = unused_tcp_port()
        coroutine = serve(
            util.create_ssl_context(
                pytest.saltyrtc.cert, dh_params_file=pytest.saltyrtc.dh_params),
            permanent_keys,
            host=pytest.saltyrtc.ip,
            port=port,
            loop=event_loop,
            server_class=TestServer,
        )
        server_ = event_loop.run_until_complete(coroutine)
        # Inject timeout and address (little bit of a hack but meh...)
        server_.timeout = _get_timeout(request=request)
        server_.address = (pytest.saltyrtc.ip, port)

        def fin():
            # Shut the server down and wait until it is fully closed.
            server_.close()
            event_loop.run_until_complete(server_.wait_closed())

        request.addfinalizer(fin)
        return server_

    return _server_factory