def create_destinations(self):
    # type: () -> List[AbstractDestination]
    """Build the list of flush destinations enabled in the configuration.

    Reads these optional keys from ``self._config``:

    * ``flush_stdout`` — truthy enables a ``Stdout`` destination.
    * ``flush_graphite`` — comma-separated ``host[:port]`` entries; the
      port defaults to 2003 when missing or empty.
    * ``flush_file`` / ``flush_file_csv`` — ``|``-separated file paths.

    :return: destinations in configuration order.
    """
    destinations = []  # type: List[AbstractDestination]
    if self._config.get('flush_stdout'):
        destinations.append(Stdout())
    if self._config.get('flush_graphite'):
        for graphite_address in self._config['flush_graphite'].split(','):
            parts = graphite_address.strip().split(':')
            graphite_host = parts.pop(0).strip()
            # BUG FIX: the old ``parts and int(parts.pop()) or 2003`` idiom
            # replaced an explicit port of 0 with 2003 and crashed on a
            # trailing colon ("host:"); an explicit conditional avoids both.
            graphite_port = int(parts.pop()) if parts and parts[-1] else 2003
            destinations.append(Graphite(graphite_host, graphite_port))
    if self._config.get('flush_file'):
        for file_path in self._config['flush_file'].split('|'):
            destinations.append(TextFile(file_path))
    if self._config.get('flush_file_csv'):
        for file_path in self._config['flush_file_csv'].split('|'):
            destinations.append(CsvFile(file_path))
    return destinations
Python `append()` example source code
def get_addresses_with_unique_ports(addresses):
    # type: (str) -> List[Tuple[str, int]]
    """Parse a comma-separated ``host[:port]`` list into ``(host, port)`` pairs.

    A missing or empty port falls back to ``DEFAULT_PORT``.  Explicit ports
    must lie within 1-65535 and must not repeat across entries.

    :param addresses: e.g. ``"localhost:8125, 10.0.0.1"``.
    :raises ValueError: for an out-of-range or duplicated explicit port.
    """
    parsed = []  # type: List[Tuple[str, int]]
    seen_ports = set()  # type: Set[int]
    for entry in addresses.split(','):
        pieces = tuple(entry.strip().split(':'))
        host = pieces[0]
        if len(pieces) > 1 and pieces[1]:
            port_str = pieces[1]
            port = int(port_str)
            if not 1 <= port <= 65535:
                raise ValueError(
                    "Port {} is out of range".format(port_str))
            if port in seen_ports:
                raise ValueError(
                    "Port {} is already specified before".format(port_str))
        else:
            # No explicit port given for this host.
            port = DEFAULT_PORT
        parsed.append((host, port))
        seen_ports.add(port)
    return parsed
def init():
    """Configure root logging with file, console and optional system handlers."""
    date_format = config.logging.date_format
    level = log_level(config.logging.level)
    # Each process gets its own unique log file to prevent write conflicts.
    # TODO implement global log server with unique uuid per process
    log_handlers = [
        file_handler(date_format),
        console_handler(date_format),
    ]
    if config.logging.system.active:
        log_handlers.append(system_file_handler(date_format))
    logging.basicConfig(handlers=log_handlers, level=level, datefmt=date_format)
    logging.info('Initialized logging')
def init_logger(self) -> None:
    """Set up rotating-file and in-app console logging for the application.

    Falls back to a per-platform config directory when the Qt binding does
    not expose ``QStandardPaths.AppConfigLocation``.  A plain stream
    handler is added when the debug option or verbose logging is active.
    """
    try:
        log_path = QStandardPaths.writableLocation(QStandardPaths.AppConfigLocation)
    except AttributeError:
        # Older Qt binding: derive the config directory manually per platform.
        app_name = qApp.applicationName().lower()
        home = QDir.homePath()
        if sys.platform == 'win32':
            log_path = os.path.join(home, 'AppData', 'Local', app_name)
        elif sys.platform == 'darwin':
            log_path = os.path.join(home, 'Library', 'Preferences', app_name)
        else:
            log_path = os.path.join(home, '.config', app_name)
    os.makedirs(log_path, exist_ok=True)
    self.console = ConsoleWidget(self)
    self.consoleLogger = ConsoleHandler(self.console)
    log_file = os.path.join(log_path, '%s.log' % qApp.applicationName().lower())
    log_handlers = [
        logging.handlers.RotatingFileHandler(log_file, maxBytes=1000000, backupCount=1),
        self.consoleLogger,
    ]
    if self.parser.isSet(self.debug_option) or self.verboseLogs:
        # noinspection PyTypeChecker
        log_handlers.append(logging.StreamHandler())
    logging.setLoggerClass(VideoLogger)
    logging.basicConfig(handlers=log_handlers,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M',
                        level=logging.INFO)
    logging.captureWarnings(capture=True)
    sys.excepthook = MainWindow.log_uncaught_exceptions
def setup_logging(self, job, daemon=False, verbose=False):
    """Configure the root logger for *job*.

    Daemon mode attaches a midnight-rotated file handler; otherwise a
    plain, date-suffixed log file is used.  A stream handler is always
    attached, and all handlers log at DEBUG when *verbose* is set, INFO
    otherwise.

    :param job: job name, used as the log file base name.
    :param daemon: rotate a single log file at midnight instead of using
        one date-suffixed file per day.
    :param verbose: enable DEBUG-level logging on logger and handlers.
    """
    # exist_ok avoids the check-then-create race of the previous
    # ``if os.path.exists(...) is False`` guard.
    os.makedirs(JOB_LOGS_DIRECTORY, exist_ok=True)
    log_filename = '%s/%s.log' % (JOB_LOGS_DIRECTORY, job)
    # Compute the level once instead of branching on verbose four times.
    level = logging.DEBUG if verbose else logging.INFO
    logger = logging.getLogger()
    logger.setLevel(level)
    handlers = []
    if daemon:
        handlers.append(logging.handlers.TimedRotatingFileHandler(
            filename=log_filename, when='midnight'))
    else:
        handlers.append(logging.FileHandler(
            filename='%s.%s' % (log_filename, time.strftime('%Y-%m-%d'))))
    handlers.append(logging.StreamHandler())
    for handler in handlers:
        handler.setLevel(level)
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
def _create_socket_servers(self, addresses, socket_type):
    # type: (str, int) -> List[SocketServer]
    """Create one configured ``SocketServer`` per parsed address.

    :param addresses: comma-separated ``host[:port]`` list.
    :param socket_type: socket type assigned to every created server.
    :return: the configured socket servers, in address order.
    """
    servers = []
    for host, port in self.get_addresses_with_unique_ports(addresses):
        server = self._configure_socket_server(
            SocketServer(),
            host=host,
            port=port
        )
        server.socket_type = socket_type
        servers.append(server)
    return servers
def _close_logger(self):
    # type: () -> None
    """Close and detach this instance's logger handlers, then drop the logger.

    No-op when ``self.logger`` is already unset.
    """
    if self.logger:
        closed_handlers = []
        for handler in self.logger.handlers:
            if hasattr(handler, 'close') and callable(handler.close):
                handler.close()
                closed_handlers.append(handler)
        # BUG FIX: the previous ``map(self.logger.removeHandler, handlers)``
        # is lazy in Python 3 and was never consumed, so the handlers were
        # never actually removed.  Use an explicit loop instead.
        for handler in closed_handlers:
            self.logger.removeHandler(handler)
        self.logger = None
def _extract_handlers(handlers_dict):
    """Build logging handlers from a config mapping of filename -> options.

    The keys ``'&1'`` and ``'&2'`` create stream handlers on stdout and
    stderr respectively; any other key creates a ``RotatingFileHandler``
    for that path.  Per-handler options: ``level`` (default ``'debug'``),
    ``format``, ``datefmt``, ``append`` (selects the file open mode), and
    ``timestamp`` (a strftime pattern inserted into the file name).

    :raises ConfigurationError: when the mapping is empty/missing or an
        entry is not itself a mapping.
    """
    handlers = []
    if not handlers_dict:
        raise ConfigurationError('no handlers are defined for logger')
    for filename, handler_config in handlers_dict.items():
        if not isinstance(handler_config, collections.abc.Mapping):
            raise ConfigurationError(
                'handler %s is not a dictionary' % filename
            )
        # BUG FIX: logging only accepts upper-case level names ('DEBUG',
        # 'INFO', ...); the previous ``.lower()`` made ``setLevel`` raise
        # ValueError for every string level, so normalize with ``.upper()``.
        level = handler_config.get('level', 'debug').upper()
        fmt = handler_config.get('format', '%(message)s')
        datefmt = handler_config.get('datefmt', '%FT%T')
        append = handler_config.get('append', False)
        timestamp = handler_config.get('timestamp', None)
        if filename == '&1':
            hdlr = StreamHandler(stream=sys.stdout)
        elif filename == '&2':
            hdlr = StreamHandler(stream=sys.stderr)
        else:
            if timestamp:
                # Insert a formatted timestamp before the extension so each
                # run gets its own log file.
                basename, ext = os.path.splitext(filename)
                filename = '%s_%s%s' % (
                    basename, datetime.now().strftime(timestamp), ext
                )
            hdlr = RotatingFileHandler(filename, mode='a+' if append else 'w+')
        hdlr.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
        hdlr.setLevel(level)
        handlers.append(hdlr)
    return handlers