def load_data(self):
# work in the parent of the pages directory, because we
# want the filenames to begin "pages/...".
chdir(dirname(self.setup.pages_dir))
rel = relpath(self.setup.pages_dir)
for root, dirs, files in walk(rel):
for filename in files:
start, ext = splitext(filename)
if ext in self.setup.data_extensions:
#yield root, dirs, filename
loader = self.setup.data_loaders.get(ext)
path = join(root,filename)
if not loader:
raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext))
data_key = join(root, start)
loaded_dict = loader.loadf(path)
self.data[data_key] = loaded_dict
#self.setup.log.debug("data key [%s] ->" % (data_key, ), root, filename, ); pprint.pprint(loaded_dict, sys.stdout)
#pprint.pprint(self.data, sys.stdout)
#print("XXXXX data:", self.data)
python类pprint()的实例源码
def loadData(self):
# Load the toilet collection data to pandas
collects = pd.read_sql('SELECT * FROM premodeling.toiletcollection', self.conn, coerce_float=True, params=None)
pprint.pprint(collects.keys())
collects = collects[['ToiletID','ToiletExID','Collection_Date','Area','Feces_kg_day','year','month']]
pprint.pprint(collects.keys())
# Load the density data to pandas
density = pd.read_sql('SELECT * FROM premodeling.toiletdensity', self.conn, coerce_float=True, params=None)
pprint.pprint(density.keys())
# Return the data
self.collects = collects
self.density = density
return(collects, density)
def constructGCSFilePath(fileUuid, tokenFile):
filters = {
"op": "=",
"content": {
"field": "file_id",
"value": [fileUuid]
}
}
params = {
"filters": json.dumps(filters)
}
query = "?expand=cases.project"
fileInfo = GDCDataUtils.query(tokenFile, "files", query=query, params=params).json()
pprint.pprint(fileInfo)
return "{project}/{strategy}/{platform}/{uuid}/{filename}".format(
project=fileInfo["data"]["hits"][0]["cases"][0]["project"]["project_id"],
strategy=str(fileInfo["data"]["hits"][0]["experimental_strategy"]),
platform=str(fileInfo["data"]["hits"][0]["platform"]),
uuid=str(fileUuid),
filename=str(fileInfo["data"]["hits"][0]["file_name"])
)
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
start = 0
pagesize=100
while True:
results = cb.alert_search(opts.query, rows=int(pagesize), start=start)
if len(results['results']) == 0: break
for result in results['results']:
pprint.pprint(result)
start = start + int(pagesize)
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
start = 0
pagesize=100
while True:
results = cb.alert_search(opts.query, rows=int(pagesize), start=start)
if len(results['results']) == 0: break
for result in results['results']:
pprint.pprint(result)
start = start + int(pagesize)
def pprint(to_be_printed):
"""nicely formated print"""
try:
import pprint as pp
# generate an instance PrettyPrinter
# pp.PrettyPrinter().pprint(to_be_printed)
pp.pprint(to_be_printed)
except ImportError:
if isinstance(to_be_printed, dict):
print('{')
for k, v in to_be_printed.items():
print("'" + k + "'" if str(k) == k else k,
': ',
"'" + v + "'" if str(v) == v else v,
sep="")
print('}')
else:
print('could not import pprint module, appling regular print')
print(to_be_printed)
# todo: this should rather be a class instance
def test(**kwargs):
"""
Runtime test case
"""
args = _parse_args(kwargs)
local_client = salt.client.LocalClient()
proposals = local_client.cmd(args['target'], 'proposal.test',
expr_form='compound', kwarg=args)
# determine which proposal to choose
for node, proposal in proposals.items():
_proposal = _choose_proposal(node, proposal, args)
if _proposal:
pprint.pprint(_proposal)
def peek(**kwargs):
"""
Display the output to the user
"""
args = _parse_args(kwargs)
local_client = salt.client.LocalClient()
proposals = local_client.cmd(args['target'], 'proposal.generate',
expr_form='compound', kwarg=args)
# determine which proposal to choose
for node, proposal in proposals.items():
_proposal = _choose_proposal(node, proposal, args)
if _proposal:
pprint.pprint(_proposal)
def _record_filter(args, base_dir):
"""
Save the filter provided
"""
filter_file = '{}/.filter'.format(base_dir)
if not isfile(filter_file):
# do a touch filter_file
open(filter_file, 'a').close()
current_filter = {}
with open(filter_file) as filehandle:
current_filter = yaml.load(filehandle)
if current_filter is None:
current_filter = {}
pprint.pprint(current_filter)
# filter a bunch of salt content and the target key before writing
rec_args = {k: v for k, v in args.items() if k is not 'target' and not
k.startswith('__')}
current_filter[args['target']] = rec_args
with open(filter_file, 'w') as filehandle:
yaml.dump(current_filter, filehandle, default_flow_style=False)
def systemInfo():
print("sys.platform: %s" % sys.platform)
print("sys.version: %s" % sys.version)
from .Qt import VERSION_INFO
print("qt bindings: %s" % VERSION_INFO)
global __version__
rev = None
if __version__ is None: ## this code was probably checked out from bzr; look up the last-revision file
lastRevFile = os.path.join(os.path.dirname(__file__), '..', '.bzr', 'branch', 'last-revision')
if os.path.exists(lastRevFile):
rev = open(lastRevFile, 'r').read().strip()
print("pyqtgraph: %s; %s" % (__version__, rev))
print("config:")
import pprint
pprint.pprint(CONFIG_OPTIONS)
## Rename orphaned .pyc files. This is *probably* safe :)
## We only do this if __version__ is None, indicating the code was probably pulled
## from the repository.
def systemInfo():
print("sys.platform: %s" % sys.platform)
print("sys.version: %s" % sys.version)
from .Qt import VERSION_INFO
print("qt bindings: %s" % VERSION_INFO)
global __version__
rev = None
if __version__ is None: ## this code was probably checked out from bzr; look up the last-revision file
lastRevFile = os.path.join(os.path.dirname(__file__), '..', '.bzr', 'branch', 'last-revision')
if os.path.exists(lastRevFile):
rev = open(lastRevFile, 'r').read().strip()
print("pyqtgraph: %s; %s" % (__version__, rev))
print("config:")
import pprint
pprint.pprint(CONFIG_OPTIONS)
## Rename orphaned .pyc files. This is *probably* safe :)
## We only do this if __version__ is None, indicating the code was probably pulled
## from the repository.
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def query_api(term, location):
"""Queries the API by the input values from the user.
Args:
term (str): The search term to query.
location (str): The location of the business to query.
"""
response = search(API_KEY, term, location)
businesses = response.get('businesses')
if not businesses:
print(u'No businesses for {0} in {1} found.'.format(term, location))
return
business_id = businesses[0]['id']
print(u'{0} businesses found, querying business info ' \
'for the top result "{1}" ...'.format(
len(businesses), business_id))
response = get_business(API_KEY, business_id)
print(u'Result for business "{0}" found:'.format(business_id))
pprint.pprint(response, indent=2)
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def store_serp_result(self, serp, config):
"""Store the parsed SERP page.
When called from SearchEngineScrape, then
a parser object is passed.
When called from caching, a list of serp object are given.
"""
if self.outfile:
data = self.row2dict(serp)
data['results'] = []
for link in serp.links:
data['results'].append(self.row2dict(link))
if self.output_format == 'json':
self.outfile.write(data)
elif self.output_format == 'csv':
serp = self.row2dict(serp)
self.outfile.write(data, serp)
elif self.output_format == 'stdout':
if config.get('print_results') == 'summarize':
print(serp)
elif config.get('print_results') == 'all':
pprint.pprint(data)
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def pprint(self, *args, **kwargs):
"""
Pretty-printer for parsed results as a list, using the C{pprint} module.
Accepts additional positional or keyword args as defined for the
C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})
Example::
ident = Word(alphas, alphanums)
num = Word(nums)
func = Forward()
term = ident | num | Group('(' + func + ')')
func <<= ident + Group(Optional(delimitedList(term)))
result = func.parseString("fna a,b,(fnb c,d,200),100")
result.pprint(width=40)
prints::
['fna',
['a',
'b',
['(', 'fnb', ['c', 'd', '200'], ')'],
'100']]
"""
pprint.pprint(self.asList(), *args, **kwargs)
# add support for pickle protocol
def write_statement(vardict):
"""
A function to write a conditional statement based on the conditions in a variable
Args
DICT[dict] VARDICT A dictionary of variable names, where the values are conditions
Returns
LIST[str] Conditions A list of condition statements
"""
conditions = []
for feat in vardict:
if bool(vardict[feat])==True:
# Dictionary is not empty, parse the split values into a statement
for split in vardict[feat]:
if ((split=='and')|(split=="or")):
statement = split.join(['("%s"%s%s)' %(feat,sp[0],sp[1])
for sp in vardict[feat][split]])
elif (split=='not'):
statement = split + ' "%s"%s%s' %(feat,
vardict[feat][split][0],
vardict[feat][split][1])
elif (split=='list'):
statement = '"%s"' %(feat) + "=any('{%s}')" %(','.join(vardict[feat][split]))
conditions.append('('+statement+')')
pprint.pprint(conditions)
return(conditions)
def run(parsed_args, job_type):
start = time()
worker_run(**vars(parsed_args))
logger.info(u"finished update in {} seconds".format(elapsed(start)))
# resp = None
# if job_type in ["normal"]:
# my_location = PageNew.query.get(parsed_args.id)
# resp = my_location.__dict__
# pprint(resp)
print "done"
return
# python doi_queue.py --hybrid --filename=data/dois_juan_accuracy.csv --dynos=40 --soup
def default(self, line, *args, **kwargs):
async def query():
data = await self.request.query(line)
if self.print_full_response:
print('*' * 80)
print('Full Response: \n')
pprint.pprint(data)
print('*' * 80)
print('\n')
table = Tabulate(data)
print(table.draw())
asyncio.get_event_loop().run_until_complete(query())
def get_replay():
response = requests.get(
"https://www.rocketleaguereplays.com/api/replays?owned",
params={
# "owned": ""
}
)
print(response)
print(response.url)
pprint(response.json())
# _id = response.json()['id']
#
# response = requests.post(
# "https://www.rocketleaguereplays.com/api/replays/{}".format(_id)
# )
# print(response)
# pprint(response.json())
def process_ping(filename, ip=None, check_ssh_connectivity_only=False):
if not os.path.isfile(filename):
return False
status_update('Trying to read ' + filename)
with open(filename) as f:
lines = f.readlines()
pprint.pprint(lines)
info = load_json(filename)
if not check_ssh_connectivity_only:
return info.get('pass', False)
cmd_list = info['command_list']
for cmd in cmd_list:
m = re.search(
'ssh (\S+) with provided username and passwd', cmd['cmd'])
if m:
if ip == m.group(1):
return cmd['pass']
return False
def submit_requests(queue, logger, key=ARGS.key):
"""Submit queued requests to each Dashboard org contained within."""
operations = {"add": meraki_admins.DashboardAdmins.add_admin,
"modify": meraki_admins.DashboardAdmins.update_admin,
"delete": meraki_admins.DashboardAdmins.del_admin}
for oid, user_list in queue.items():
submitter = meraki_admins.DashboardAdmins(oid, key)
for user in user_list:
operation = user.pop('operation')
request_id = user.pop('request_id')
if operation not in operations.keys():
error = "Unknown operation %s" % operation
# add new column to fail_tracker tied to original row
logger.fail_tracker[request_id]['error'] = error
print request_id
print logger.fail_tracker[request_id]['error']
print logger.fail_tracker[request_id]
continue
else:
user['orgAccess'] = user.pop('orgaccess') # requires camelcase
# pprint.pprint(user)
result = operations[operation](submitter, **user)
def start_containers(self, containers):
"""
Starts all given containers.
"""
started_containers = []
for container in containers:
if self.verbose:
print "Going to start the following container:"
pprint.pprint(container)
try:
self.log("Starting container '%s'..." % container['MyName'], no_nl=True)
self.client.start(container=container['Id'])
started_containers.append(container)
self.log("successful")
except docker.errors.APIError as e:
raise DockerError("Failed to start container: %s" % e)
return started_containers
#}}}
#{{{ Stop containers
def test( strng ):
global testnum
print(strng)
try:
bnf = CORBA_IDL_BNF()
tokens = bnf.parseString( strng )
print("tokens = ")
pprint.pprint( tokens.asList() )
imgname = "idlParse%02d.bmp" % testnum
testnum += 1
#~ tree2image.str2image( str(tokens.asList()), imgname )
except ParseException as err:
print(err.line)
print(" "*(err.column-1) + "^")
print(err)
print()
def runTest(self):
from examples.jsonParser import jsonObject
from test.jsonParserTests import test1,test2,test3,test4,test5
from test.jsonParserTests import test1,test2,test3,test4,test5
expected = [
[],
[],
[],
[],
[],
]
import pprint
for t,exp in zip((test1,test2,test3,test4,test5),expected):
result = jsonObject.parseString(t)
## print result.dump()
result.pprint()
print()
## if result.asList() != exp:
## print "Expected %s, parsed results as %s" % (exp, result.asList())