def configure_logging(path_to_log_directory):
"""
Configure logger
:param path_to_log_directory: path to directory to write log file in
:return:
"""
log_filename = datetime.datetime.now().strftime('%Y-%m-%d') + '.log'
importer_logger = logging.getLogger('importer_logger')
importer_logger.setLevel(LOG_LEVEL)
formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
fh = logging.FileHandler(filename=os.path.join(path_to_log_directory, log_filename))
fh.setLevel(LOG_LEVEL)
fh.setFormatter(formatter)
importer_logger.addHandler(fh)
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(LOG_LEVEL)
sh.setFormatter(formatter)
importer_logger.addHandler(sh)
python类stdout()的实例源码
def __init__(self, addr="127.0.0.1", port=4444):
"""Initialize the socket and initialize pdb."""
# Backup stdin and stdout before replacing them by the socket handle
self.old_stdout = sys.stdout
self.old_stdin = sys.stdin
# Open a 'reusable' socket to let the webapp reload on the same port
self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.skt.bind((addr, port))
self.skt.listen(1)
(clientsocket, address) = self.skt.accept()
handle = clientsocket.makefile('rw')
pdb.Pdb.__init__(self, completekey='tab', stdin=handle, stdout=handle)
sys.stdout = sys.stdin = handle
def load_data(self):
# work in the parent of the pages directory, because we
# want the filenames to begin "pages/...".
chdir(dirname(self.setup.pages_dir))
rel = relpath(self.setup.pages_dir)
for root, dirs, files in walk(rel):
for filename in files:
start, ext = splitext(filename)
if ext in self.setup.data_extensions:
#yield root, dirs, filename
loader = self.setup.data_loaders.get(ext)
path = join(root,filename)
if not loader:
raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext))
data_key = join(root, start)
loaded_dict = loader.loadf(path)
self.data[data_key] = loaded_dict
#self.setup.log.debug("data key [%s] ->" % (data_key, ), root, filename, ); pprint.pprint(loaded_dict, sys.stdout)
#pprint.pprint(self.data, sys.stdout)
#print("XXXXX data:", self.data)
def increment(self, inc_track=1024, inc_prog=1024, file_inc=True):
self.tracker += inc_track
self.progress += inc_prog
if file_inc:
while self.progress >= self.percent and self.tracker < self.size:
self.progress = self.progress - self.percent
self.tick += 1
space = self.bar_size - self.tick
total_percentage = 2 * self.tick
p_output = "[{}{}] %{}".format(p_bar * self.tick, ' ' * space, total_percentage)
sys.stdout.write(p_output)
sys.stdout.flush()
sys.stdout.write("\b" * (len(p_output)))
else:
self.tick = int((float(self.progress)/float(self.size)) * float(self.bar_size))
space = self.bar_size - self.tick
total_percentage = 2 * self.tick
p_output = "[{}{}] %{}".format(p_bar * self.tick, ' ' * space, total_percentage)
sys.stdout.write(p_output)
sys.stdout.flush()
sys.stdout.write("\b" * (len(p_output)))
def __init__(self, *args):
self.ip = args[0]
self.port = args[1]
application = service.Application("kademlia")
application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit)
if os.path.isfile('cache.pickle'):
kserver = Server.loadState('cache.pickle')
else:
kserver = Server()
kserver.bootstrap([(self.ip, self.port)])
kserver.saveStateRegularly('cache.pickle', 10)
server = internet.UDPServer(self.port, kserver.protocol)
server.setServiceParent(application)
self.log = logging.getLogger(resource_filename(__name__, __file__))
def report(self, full_report, ensure_ascii=False):
"""Returns a json dict via stdout.
:param full_report: Analyzer results as dict.
:param ensure_ascii: Force ascii output. Default: False"""
summary = {}
try:
summary = self.summary(full_report)
except:
pass
report = {
'success': True,
'summary': summary,
'artifacts': self.artifacts(full_report),
'full': full_report
}
json.dump(report, self.fpoutput, ensure_ascii=ensure_ascii)
def print_progress(bi, batch_total, is_training, output_string):
if (((bi % 10) == 0) and cf.verbose) or (bi == batch_total-1):
if is_training:
sys.stdout.write("TRAIN ")
else:
sys.stdout.write("VALID ")
sys.stdout.write(output_string)
if bi == (batch_total-1):
sys.stdout.write("\n")
else:
sys.stdout.write("\r")
sys.stdout.flush()
######
#main#
######
#set the seeds
def configure_logging(self):
"""
Configure logging to log to std output as well as to log file
"""
log_level = logging.DEBUG
log_filename = datetime.now().strftime('%Y-%m-%d') + '.log'
sp_logger = logging.getLogger('sp_logger')
sp_logger.setLevel(log_level)
formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
fh = logging.FileHandler(filename=self.log_dir + log_filename)
fh.setLevel(log_level)
fh.setFormatter(formatter)
sp_logger.addHandler(fh)
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(log_level)
sh.setFormatter(formatter)
sp_logger.addHandler(sh)
query_handler.py 文件源码
项目:almond-nnparser
作者: Stanford-Mobisocial-IoT-Lab
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def get(self):
query = self.get_query_argument("q")
locale = self.get_query_argument("locale", default="en-US")
language = self.application.get_language(locale)
limit = int(self.get_query_argument("limit", default=5))
print('GET /query', query)
tokenized = yield language.tokenizer.tokenize(query)
result = yield self._do_run_query(language, tokenized, limit)
if len(result) > 0 and self.application.database:
self.application.database.execute("insert into example_utterances (is_base, language, type, utterance, target_json, click_count) " +
"values (0, %(language)s, 'log', %(utterance)s, %(target_json)s, -1)",
language=language.tag,
utterance=query,
target_json=result[0]['answer'])
if language.exact:
exact = language.exact.get(query)
if exact:
result.insert(0, dict(answer=exact, prob=1, score='Infinity'))
sys.stdout.flush()
self.write(dict(candidates=result, sessionId='X'))
def main():
import argparse
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
parser = argparse.ArgumentParser(description="Linux distro info tool")
parser.add_argument(
'--json',
'-j',
help="Output in machine readable format",
action="store_true")
args = parser.parse_args()
if args.json:
logger.info(json.dumps(info(), indent=4, sort_keys=True))
else:
logger.info('Name: %s', name(pretty=True))
distribution_version = version(pretty=True)
if distribution_version:
logger.info('Version: %s', distribution_version)
distribution_codename = codename()
if distribution_codename:
logger.info('Codename: %s', distribution_codename)
def open_spinner(message):
# Interactive spinner goes directly to sys.stdout rather than being routed
# through the logging system, but it acts like it has level INFO,
# i.e. it's only displayed if we're at level INFO or better.
# Non-interactive spinner goes through the logging system, so it is always
# in sync with logging configuration.
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
spinner = InteractiveSpinner(message)
else:
spinner = NonInteractiveSpinner(message)
try:
with hidden_cursor(sys.stdout):
yield spinner
except KeyboardInterrupt:
spinner.finish("canceled")
raise
except Exception:
spinner.finish("error")
raise
else:
spinner.finish("done")
def main():
"""Small main program"""
import sys, getopt
try:
opts, args = getopt.getopt(sys.argv[1:], 'deut')
except getopt.error as msg:
sys.stdout = sys.stderr
print(msg)
print("""usage: %s [-d|-e|-u|-t] [file|-]
-d, -u: decode
-e: encode (default)
-t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0])
sys.exit(2)
func = encode
for o, a in opts:
if o == '-e': func = encode
if o == '-d': func = decode
if o == '-u': func = decode
if o == '-t': test(); return
if args and args[0] != '-':
with open(args[0], 'rb') as f:
func(f, sys.stdout.buffer)
else:
func(sys.stdin.buffer, sys.stdout.buffer)
def seek(self, newpos, whence=0):
if whence==0:
if newpos < self.pos:
print("WARNING: can't seek stdout backwards")
return -1
if newpos > self.pos:
self.seekforward(newpos - self.pos)
self.pos = newpos
elif whence==1:
if newpos < 0:
print("WARNING: can't seek stdout backwards")
return -1
if newpos > 0:
self.seekforward(newpos)
self.pos += newpos
else:
print("WARNING: can't seek stdout from EOF")
return -1
def run_in_real_python(self, code):
real_stdout = sys.stdout
py_stdout = io.StringIO()
sys.stdout = py_stdout
py_value = py_exc = None
globs = {
'__builtins__': __builtins__,
'__name__': '__main__',
'__doc__': None,
'__package__': None,
}
try:
py_value = eval(code, globs, globs)
except AssertionError: # pragma: no cover
raise
except Exception as e:
py_exc = e
finally:
sys.stdout = real_stdout
return py_value, py_exc, py_stdout
def open_spinner(message):
# Interactive spinner goes directly to sys.stdout rather than being routed
# through the logging system, but it acts like it has level INFO,
# i.e. it's only displayed if we're at level INFO or better.
# Non-interactive spinner goes through the logging system, so it is always
# in sync with logging configuration.
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
spinner = InteractiveSpinner(message)
else:
spinner = NonInteractiveSpinner(message)
try:
with hidden_cursor(sys.stdout):
yield spinner
except KeyboardInterrupt:
spinner.finish("canceled")
raise
except Exception:
spinner.finish("error")
raise
else:
spinner.finish("done")
def handle_display_options(self, option_order):
"""If there were any non-global "display-only" options
(--help-commands or the metadata display options) on the command
line, display the requested info and return true; else return
false.
"""
import sys
if six.PY2 or self.help_commands:
return _Distribution.handle_display_options(self, option_order)
# Stdout may be StringIO (e.g. in tests)
import io
if not isinstance(sys.stdout, io.TextIOWrapper):
return _Distribution.handle_display_options(self, option_order)
# Don't wrap stdout if utf-8 is already the encoding. Provides
# workaround for #334.
if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
return _Distribution.handle_display_options(self, option_order)
# Print metadata in UTF-8 no matter the platform
encoding = sys.stdout.encoding
errors = sys.stdout.errors
newline = sys.platform != 'win32' and '\n' or None
line_buffering = sys.stdout.line_buffering
sys.stdout = io.TextIOWrapper(
sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
try:
return _Distribution.handle_display_options(self, option_order)
finally:
sys.stdout = io.TextIOWrapper(
sys.stdout.detach(), encoding, errors, newline, line_buffering)
def print_attachments(stream, test, all_channels=False):
"""Print out subunit attachments.
Print out subunit attachments that contain content. This
runs in 2 modes, one for successes where we print out just stdout
and stderr, and an override that dumps all the attachments.
"""
channels = ('stdout', 'stderr')
for name, detail in test['details'].items():
# NOTE(sdague): the subunit names are a little crazy, and actually
# are in the form pythonlogging:'' (with the colon and quotes)
name = name.split(':')[0]
if detail.content_type.type == 'test':
detail.content_type.type = 'text'
if (all_channels or name in channels) and detail.as_text():
title = "Captured %s:" % name
stream.write("\n%s\n%s\n" % (title, ('~' * len(title))))
# indent attachment lines 4 spaces to make them visually
# offset
for line in detail.as_text().split('\n'):
stream.write(" %s\n" % line)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly
))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def test_loadSADFile_startorder(self):
maxpid=32768
try:
out=subprocess.Popen(['cat', '/proc/sys/kernel/pid_max'], stdout=subprocess.PIPE)
res=out.communicate()
maxpid=int(res[0].strip())
except:
pass
retval = sb.loadSADFile('sdr/dom/waveforms/ticket_462_w/ticket_462_w.sad.xml')
self.assertEquals(retval, True)
comp_ac = sb.getComponent('ticket_462_ac_1')
self.assertNotEquals(comp_ac, None)
comp = sb.getComponent('ticket_462_1')
self.assertNotEquals(comp, None)
if comp_ac._pid <= maxpid-1:
isless= comp_ac._pid < comp._pid
else:
isless=comp._pid < comp_ac._pid
self.assertTrue(isless)
def redirectSTDOUT(filename):
if _DEBUG == True:
print "redirectSTDOUT(): redirecting stdout/stderr to filename " + str(filename)
if type(filename) == str:
dirname = os.path.dirname(filename)
if len(dirname) == 0 or \
(len(dirname) > 0 and os.path.isdir(dirname)):
try:
f = open(filename,'w')
# Send stdout and stderr to provided filename
sys.stdout = f
sys.stderr = f
except Exception, e:
print "redirectSTDOUT(): ERROR - Unable to open file " + str(filename) + " for writing stdout and stderr " + str(e)
elif type(filename) == cStringIO.OutputType:
sys.stdout = filename
sys.stderr = filename
else:
print 'redirectSTDOUT(): failed to redirect stdout/stderr to ' + str(filename)
print 'redirectSTDOUT(): argument must be: string filename, cStringIO.StringIO object'
def plot(self):
if _domainless._DEBUG == True:
print "Plot:plot()"
# Error checking before launching plot
if self.usesPortIORString == None:
raise AssertionError, "Plot:plot() ERROR - usesPortIORString not set ... must call connect() on this object from another component"
if self._usesPortName == None:
raise AssertionError, "Plot:plot() ERROR - usesPortName not set ... must call connect() on this object from another component"
if self._dataType == None:
raise AssertionError, "Plot:plot() ERROR - dataType not set ... must call connect() on this object from another component"
plotCommand = str(self._eclipsePath) + "/bin/plotter.sh -portname " + str(self._usesPortName) + " -repid " + str(self._dataType) + " -ior " + str(self.usesPortIORString)
if _domainless._DEBUG == True:
print "Plot:plotCommand " + str(plotCommand)
args = _shlex.split(plotCommand)
if _domainless._DEBUG == True:
print "Plot:args " + str(args)
try:
dev_null = open('/dev/null','w')
sub_process = _subprocess.Popen(args,stdout=dev_null,preexec_fn=_os.setpgrp)
pid = sub_process.pid
self._processes[pid] = sub_process
except Exception, e:
raise AssertionError, "Plot:plot() Failed to launch plotting due to %s" % ( e)
def parse(inFileName):
doc = parsexml_(inFileName)
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'softPkg'
rootClass = softPkg
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('<?xml version="1.0" ?>\n')
## rootObj.export(sys.stdout, 0, name_=rootTag,
## namespacedef_='',
## pretty_print=True)
return rootObj
def parseString(inString):
from StringIO import StringIO
doc = parsexml_(StringIO(inString))
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'softPkg'
rootClass = softPkg
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('<?xml version="1.0" ?>\n')
## rootObj.export(sys.stdout, 0, name_="softPkg",
## namespacedef_='')
return rootObj
def parseLiteral(inFileName):
doc = parsexml_(inFileName)
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'softPkg'
rootClass = softPkg
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('#from spd import *\n\n')
## sys.stdout.write('import spd as model_\n\n')
## sys.stdout.write('rootObj = model_.rootTag(\n')
## rootObj.exportLiteral(sys.stdout, 0, name_=rootTag)
## sys.stdout.write(')\n')
return rootObj
def parseString(inString):
from StringIO import StringIO
doc = parsexml_(StringIO(inString))
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'devicepkg'
rootClass = devicepkg
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('<?xml version="1.0" ?>\n')
## rootObj.export(sys.stdout, 0, name_="devicepkg",
## namespacedef_='')
return rootObj
def parseLiteral(inFileName):
doc = parsexml_(inFileName)
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'devicepkg'
rootClass = devicepkg
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('#from dpd import *\n\n')
## sys.stdout.write('import dpd as model_\n\n')
## sys.stdout.write('rootObj = model_.rootTag(\n')
## rootObj.exportLiteral(sys.stdout, 0, name_=rootTag)
## sys.stdout.write(')\n')
return rootObj
def parse(inFileName):
doc = parsexml_(inFileName)
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'deviceconfiguration'
rootClass = deviceconfiguration
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('<?xml version="1.0" ?>\n')
## rootObj.export(sys.stdout, 0, name_=rootTag,
## namespacedef_='',
## pretty_print=True)
return rootObj
def parseString(inString):
from StringIO import StringIO
doc = parsexml_(StringIO(inString))
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'deviceconfiguration'
rootClass = deviceconfiguration
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('<?xml version="1.0" ?>\n')
## rootObj.export(sys.stdout, 0, name_="deviceconfiguration",
## namespacedef_='')
return rootObj
def parseLiteral(inFileName):
doc = parsexml_(inFileName)
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'deviceconfiguration'
rootClass = deviceconfiguration
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('#from dcd import *\n\n')
## sys.stdout.write('import dcd as model_\n\n')
## sys.stdout.write('rootObj = model_.rootTag(\n')
## rootObj.exportLiteral(sys.stdout, 0, name_=rootTag)
## sys.stdout.write(')\n')
return rootObj
def parseString(inString):
from StringIO import StringIO
doc = parsexml_(StringIO(inString))
rootNode = doc.getroot()
rootTag, rootClass = get_root_tag(rootNode)
if rootClass is None:
rootTag = 'domainmanagerconfiguration'
rootClass = domainmanagerconfiguration
rootObj = rootClass.factory()
rootObj.build(rootNode)
# Enable Python to collect the space used by the DOM.
doc = None
## sys.stdout.write('<?xml version="1.0" ?>\n')
## rootObj.export(sys.stdout, 0, name_="domainmanagerconfiguration",
## namespacedef_='')
return rootObj