def get(self):
    """Serve the 20000 worst scores as a fully-quoted CSV, memcached for a day."""
    cached = memcache.get('worst_handler')
    if cached:
        # Cache hit: emit the stored CSV body verbatim.
        self.response.out.write(cached)
        return
    writer = csv.writer(self.response, delimiter=',', quoting=csv.QUOTE_ALL)
    # Instruct endpoint to cache for 1 day.
    self.response.headers['Cache-control'] = 'public, max-age=86400'
    # Using GQL as a test - will create new index
    query = ndb.gql(
        'select distinct screen_name, twitter_id, score '
        'from Score order by score desc limit 20000')
    for record in query:
        writer.writerow([record.screen_name, record.twitter_id, record.score])
    # Store the rendered body so the next request is served from memcache.
    memcache.set('worst_handler', self.response.text, 86400)
# Example usages of csv.QUOTE_ALL (translated scraped section header)
def get(self):
    """Serve the 100 most common hashtags among the 5000 worst scores as CSV.

    The rendered CSV body is memcached for one day under 'worst_hashtags'.
    """
    cached = memcache.get('worst_hashtags')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        c = Counter()
        # BUG FIX: projection must be a sequence of properties;
        # (Score.hashtags) is just Score.hashtags with redundant parens,
        # so make it a real one-element tuple.
        for s in Score.query().order(-Score.score).iter(
                limit=5000, projection=(Score.hashtags,)):
            if s.hashtags is not None:
                # Count hashtags case-insensitively.
                c.update(h.lower() for h in s.hashtags)
        for tag, tag_count in c.most_common(100):
            response_writer.writerow([tag, tag_count])
        memcache.set('worst_hashtags', self.response.text, 86400)
def get(self):
    """Serve the 200 most common websites among the 5000 worst scores as CSV.

    The rendered CSV body is memcached for one day.
    NOTE(review): the cache key 'worst_websitess' carries a typo, but it is
    used consistently by both get and set here, so it is left unchanged.
    """
    cached = memcache.get('worst_websitess')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        c = Counter()
        # BUG FIX: projection must be a sequence of properties;
        # (Score.websites) is not a tuple — add the trailing comma.
        for s in Score.query().order(-Score.score).iter(
                limit=5000, projection=(Score.websites,)):
            if s.websites is not None:
                # Count websites case-insensitively.
                c.update(h.lower() for h in s.websites)
        for site, site_count in c.most_common(200):
            response_writer.writerow([site, site_count])
        memcache.set('worst_websitess', self.response.text, 86400)
def get(self):
    """Serve the 200 most common *unrecognized* websites among the 5000
    worst scores as CSV, memcached for one day.
    """
    cached = memcache.get('worst_unknown_websites')
    if cached:
        self.response.out.write(cached)
    else:
        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)
        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'
        c = Counter()
        # BUG FIX: projection must be a sequence of properties;
        # (Score.websites) is not a tuple — add the trailing comma.
        for s in Score.query().order(-Score.score).iter(
                limit=5000, projection=(Score.websites,)):
            if s.websites is not None:
                # BUG FIX: the original called the Counter itself —
                # c('KNOWN_SITES') — which raises TypeError on the first
                # non-empty websites list. Filter against the KNOWN_SITES
                # collection instead (assumed module-level constant — TODO
                # confirm against the rest of the file).
                c.update(h.lower() for h in s.websites
                         if h.lower() not in KNOWN_SITES)
        for site, site_count in c.most_common(200):
            response_writer.writerow([site, site_count])
        memcache.set('worst_unknown_websites', self.response.text, 86400)
def _test_arg_valid(self, ctor, arg):
self.assertRaises(TypeError, ctor)
self.assertRaises(TypeError, ctor, None)
self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
self.assertRaises(TypeError, ctor, arg, delimiter = 0)
self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
self.assertRaises(csv.Error, ctor, arg, 'foo')
self.assertRaises(TypeError, ctor, arg, delimiter=None)
self.assertRaises(TypeError, ctor, arg, delimiter=1)
self.assertRaises(TypeError, ctor, arg, quotechar=1)
self.assertRaises(TypeError, ctor, arg, lineterminator=None)
self.assertRaises(TypeError, ctor, arg, lineterminator=1)
self.assertRaises(TypeError, ctor, arg, quoting=None)
self.assertRaises(TypeError, ctor, arg,
quoting=csv.QUOTE_ALL, quotechar='')
self.assertRaises(TypeError, ctor, arg,
quoting=csv.QUOTE_ALL, quotechar=None)
def _test_dialect_attrs(self, ctor, *args):
# Now try with dialect-derived options
class dialect:
delimiter='-'
doublequote=False
escapechar='^'
lineterminator='$'
quotechar='#'
quoting=csv.QUOTE_ALL
skipinitialspace=True
strict=False
args = args + (dialect,)
obj = ctor(*args)
self.assertEqual(obj.dialect.delimiter, '-')
self.assertEqual(obj.dialect.doublequote, False)
self.assertEqual(obj.dialect.escapechar, '^')
self.assertEqual(obj.dialect.lineterminator, "$")
self.assertEqual(obj.dialect.quotechar, '#')
self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
self.assertEqual(obj.dialect.skipinitialspace, True)
self.assertEqual(obj.dialect.strict, False)
def read_table(zip_file, table_name, columns, dtypes=None):
    """Load selected columns of '<table_name>.txt' from a zip archive.

    The first line of the member file is parsed as a comma-separated header
    (cleaned via GET_COLUMN_NAME_REGEX); the remainder is read with pandas.

    Returns:
        A list of numpy arrays, one per requested column, NaN-filtered via
        remove_nans and cast to the dtype from `dtypes` when given.
    """
    dtypes = dtypes or {}
    with zip_file.open(table_name + '.txt', 'r') as csv_stream:
        header_line = csv_stream.readline().strip()
        # Header names arrive as bytes; strip regex matches, then decode.
        names = [GET_COLUMN_NAME_REGEX.sub(b'', raw).decode('ascii')
                 for raw in header_line.split(b',')]
        frame = pandas.read_csv(
            csv_stream, names=names, quotechar='"', quoting=csv.QUOTE_ALL,
            usecols=list(columns))
    return [
        numpy.asarray(remove_nans(frame[column]), dtype=dtypes.get(column))
        for column in columns]
def gen_csv(csvfile, page_list):
    """Write one fully-quoted CSV row per page object to `csvfile`.

    A fixed header row is written first; each page contributes its image and
    painting metadata attributes in the same column order.
    """
    writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
    # Header row: column order is fixed and mirrored by the data rows below.
    writer.writerow(['PageID', 'DescriptionURL', 'ImageURL', 'ImageSHA1',
                     'PixelHeight', 'PixelWidth',
                     'PaintingID', 'Artist', 'RealDimensions'])
    for page in page_list:
        writer.writerow([page.page_id, page.description_url, page.img_url,
                         page.img_sha1, page.img_height, page.img_width,
                         page.paint_id, page.artist, page.dim])
# Main
def get_csv_writer_fmtparams(self):
    """Return the format parameters for csv.writer: excel dialect, quote-all."""
    return dict(dialect='excel', quoting=csv.QUOTE_ALL)
def write_dict_to_csv(list_of_dictionaries, output_file):
    """write a list of dictionaries to a csv file."""
    # Fixed column order; every dict is expected to carry exactly these keys.
    fieldnames = ['centroid_lon', 'centroid_lat', 'feature_type', 'name', 'source']
    with open(output_file, 'w', newline='') as out_fp:
        writer = csv.DictWriter(out_fp, fieldnames, quoting=csv.QUOTE_ALL)
        writer.writeheader()
        for record in list_of_dictionaries:
            writer.writerow(record)
def write_to_csv(received_data=None, *args):
    """Append one row of scraped market data to datafile.csv.

    Writes the header row first when the file does not exist yet. The
    caller's list is cleared in place after it has been written, so it can
    be re-filled for the next row.

    Args:
        received_data (list): one row of values; cleared in place on return.
        *args: unused, kept for caller compatibility.
    """
    # BUG FIX: the original used a mutable default argument ([]), which is
    # shared across calls; use None as the sentinel instead.
    if received_data is None:
        received_data = []
    first_data = ["Company", "Last Traded Price", "Change", "Total Listed Shares", "Paid Up Value", "Total Paid Up Value", "Closing Market Price", "Market Capitalization"]
    if os.path.isfile("datafile.csv"):
        with open("datafile.csv", "a") as output:
            writer = csv.writer(output, quoting=csv.QUOTE_ALL)
            writer.writerow(received_data)
    else:
        with open("datafile.csv", "w") as output:
            writer = csv.writer(output, quoting=csv.QUOTE_ALL)
            writer.writerow(first_data)
            writer.writerow(received_data)
    # Empty the caller's list in place (slice assignment, not rebinding).
    received_data[:] = []
def module_run(self):
    """Export every non-empty row of the configured table to a CSV file.

    Reads 'filename' and 'table' from self.options, queries all rows, and
    reports the record count via self.output.
    """
    filename = self.options['filename']
    # codecs module not used because the csv module converts to ascii
    with open(filename, 'w') as outfile:
        # build a list of table names
        table = self.options['table']
        # NOTE(review): table name is interpolated into SQL; it comes from
        # module options (operator-controlled), not end-user input — confirm.
        rows = self.query('SELECT * FROM "%s" ORDER BY 1' % (table))
        # BUG FIX: the writer was re-created on every row; create it once.
        csvwriter = csv.writer(outfile, quoting=csv.QUOTE_ALL)
        cnt = 0
        for row in rows:
            # Normalize NULL/falsy cells to empty strings.
            row = [x if x else '' for x in row]
            if any(row):
                cnt += 1
                # NOTE(review): encode() is a Python 2 idiom; under Python 3
                # this writes bytes reprs — confirm target runtime.
                csvwriter.writerow([s.encode("utf-8") for s in row])
        self.output('%d records added to \'%s\'.' % (cnt, filename))
def write_csv_file(filename, result, path):
    """Writes the result to csv with the given filename.

    Each element of `result` becomes its own single-column, quoted row.

    Args:
        filename (str): Filename to write to (without the '.csv' suffix).
        result (iterable): Values to write, one per row.
        path (str): Directory path to use (must end with a separator).
    """
    # BUG FIX: the file handle was never closed; use a context manager so
    # the file is flushed and closed even if writerow raises.
    # NOTE(review): 'wb' is the Python 2 csv convention; under Python 3 this
    # must become open(..., 'w', newline='') — confirm target runtime.
    with open(path + filename + '.csv', 'wb') as output:
        writer = csv.writer(output, quoting=csv.QUOTE_ALL, lineterminator='\n')
        for val in result:
            writer.writerow([val])
    # Print one a single row
    # writer.writerow(result)
def execute(self, statement):
    """Run `statement` on Athena and render the result in self.format.

    Submits the query, polls every 200ms until it reaches a terminal state,
    then prints the rows as CSV, TSV, VERTICAL, or ALIGNED output. On
    failure, prints Athena's state-change reason instead.
    """
    execution_id = self.athena.start_query_execution(self.dbname, statement)
    if not execution_id:
        # Submission failed; nothing to poll or print.
        return
    # Poll until the query reaches a terminal state.
    while True:
        stats = self.athena.get_query_execution(execution_id)
        status = stats['QueryExecution']['Status']['State']
        if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(0.2) # 200ms
    if status == 'SUCCEEDED':
        results = self.athena.get_query_results(execution_id)
        # Column names encoded to UTF-8 bytes (Python 2-era idiom); the
        # encoded values are reused as headers and passed to yield_rows.
        headers = [h['Name'].encode("utf-8") for h in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
        if self.format in ['CSV', 'CSV_HEADER']:
            # Fully-quoted CSV straight to stdout; header row only for
            # CSV_HEADER.
            csv_writer = csv.writer(sys.stdout, quoting=csv.QUOTE_ALL)
            if self.format == 'CSV_HEADER':
                csv_writer.writerow(headers)
            csv_writer.writerows([[text.encode("utf-8") for text in row] for row in self.athena.yield_rows(results, headers)])
        elif self.format == 'TSV':
            print(tabulate([row for row in self.athena.yield_rows(results, headers)], tablefmt='tsv'))
        elif self.format == 'TSV_HEADER':
            print(tabulate([row for row in self.athena.yield_rows(results, headers)], headers=headers, tablefmt='tsv'))
        elif self.format == 'VERTICAL':
            # One labelled header/value block per record.
            for num, row in enumerate(self.athena.yield_rows(results, headers)):
                print('--[RECORD {}]--'.format(num+1))
                print(tabulate(zip(*[headers, row]), tablefmt='presto'))
        else: # ALIGNED
            print(tabulate([x for x in self.athena.yield_rows(results, headers)], headers=headers, tablefmt='presto'))
    if status == 'FAILED':
        print(stats['QueryExecution']['Status']['StateChangeReason'])
def Y_Output():
    """Write the hard-coded label vector to sandeep_Y.csv, then echo each
    row of prakhar_Y.csv to stdout.
    """
    mylist = [1,1,0,0,1,1,1,1,0,1,1,0,1,1,1,0,0,1,0,0,1,1,1,1,0]
    # BUG FIX: the output file handle was never closed; use context managers
    # so both files are closed deterministically.
    # NOTE(review): 'wb'/'rb' are Python 2 csv conventions; under Python 3
    # use 'w'/'r' with newline='' — confirm target runtime.
    with open("sandeep_Y.csv", 'wb') as myfile:
        wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
        wr.writerow(mylist)
    with open('prakhar_Y.csv', 'rb') as csvfile:
        spamreader = csv.reader(csvfile)
        for row in spamreader:
            # BUG FIX: 'print row' is Python 2-only syntax; the parenthesized
            # form prints the same single object there and compiles on 3.
            print(row)
def test_write_quoting(self):
    """Writer output for each quoting mode, plus the QUOTE_NONE error case."""
    row = ['a', 1, 'p,q']
    # Default dialect: only the delimiter-bearing field gets quoted.
    self._write_test(row, 'a,1,"p,q"')
    # QUOTE_NONE cannot escape the embedded delimiter, so writing fails.
    self.assertRaises(csv.Error,
                      self._write_test,
                      row, 'a,1,p,q',
                      quoting=csv.QUOTE_NONE)
    self._write_test(row, 'a,1,"p,q"', quoting=csv.QUOTE_MINIMAL)
    self._write_test(row, '"a",1,"p,q"', quoting=csv.QUOTE_NONNUMERIC)
    self._write_test(row, '"a","1","p,q"', quoting=csv.QUOTE_ALL)
    self._write_test(['a\nb', 1], '"a\nb","1"', quoting=csv.QUOTE_ALL)
def test_write_quoting(self):
    """Verify writer output under each quoting mode."""
    fields = ['a', 1, 'p,q']
    # Default (QUOTE_MINIMAL) quotes only the field containing the delimiter.
    self._write_test(fields, 'a,1,"p,q"')
    # QUOTE_NONE cannot represent an embedded delimiter -> csv.Error.
    self._write_error_test(csv.Error, fields, quoting=csv.QUOTE_NONE)
    self._write_test(fields, 'a,1,"p,q"', quoting=csv.QUOTE_MINIMAL)
    self._write_test(fields, '"a",1,"p,q"', quoting=csv.QUOTE_NONNUMERIC)
    self._write_test(fields, '"a","1","p,q"', quoting=csv.QUOTE_ALL)
    # Embedded newline forces quoting even for the numeric field.
    self._write_test(['a\nb', 1], '"a\nb","1"', quoting=csv.QUOTE_ALL)
def test_write_quoting(self):
    """Table-driven check of writer quoting behaviour."""
    fields = ['a', 1, 'p,q']
    # Default dialect quotes only the delimiter-bearing field.
    self._write_test(fields, 'a,1,"p,q"')
    # QUOTE_NONE cannot represent the embedded delimiter at all.
    self._write_error_test(csv.Error, fields, quoting=csv.QUOTE_NONE)
    cases = (
        (fields, 'a,1,"p,q"', csv.QUOTE_MINIMAL),
        (fields, '"a",1,"p,q"', csv.QUOTE_NONNUMERIC),
        (fields, '"a","1","p,q"', csv.QUOTE_ALL),
        (['a\nb', 1], '"a\nb","1"', csv.QUOTE_ALL),
    )
    for data, expected, mode in cases:
        self._write_test(data, expected, quoting=mode)
def test_quoting(self):
    """Dialect-level validation of quoting-related attributes."""
    class mydialect(csv.Dialect):
        delimiter = ";"
        escapechar = '\\'
        doublequote = False
        skipinitialspace = True
        lineterminator = '\r\n'
        quoting = csv.QUOTE_NONE
    d = mydialect()
    self.assertEqual(d.quoting, csv.QUOTE_NONE)

    # quoting=None is rejected outright.
    mydialect.quoting = None
    self.assertRaises(csv.Error, mydialect)

    # A valid QUOTE_ALL configuration round-trips through the dialect.
    mydialect.doublequote = True
    mydialect.quoting = csv.QUOTE_ALL
    mydialect.quotechar = '"'
    d = mydialect()
    self.assertEqual(d.quoting, csv.QUOTE_ALL)
    self.assertEqual(d.quotechar, '"')
    self.assertTrue(d.doublequote)

    # Multi-character and non-string quotechars produce specific messages.
    for bad_quotechar, message in (
            ("''", '"quotechar" must be an 1-character string'),
            (4, '"quotechar" must be string, not int')):
        mydialect.quotechar = bad_quotechar
        with self.assertRaises(csv.Error) as cm:
            mydialect()
        self.assertEqual(str(cm.exception), message)
def __create_stocklist(self):
    """Create one synthetic stock-list CSV per market for testing.

    (Original docstring was mojibake; reconstructed in English.)

    Returns:
        list: the Symbol value of every generated row, across all markets,
        in generation order.
    """
    def make_row(i):
        """Build one synthetic CSV row for stock index i."""
        return ('TEST{}'.format(i),
                'Test {}'.format(i),
                'Sector {}'.format(i % 10),)
    header = ('Symbol', 'Name', 'Sector',)
    num_markets = len(const.MARKET_DATA.keys())
    # Round-robin the stock indices across markets: market k receives every
    # index i with i % num_markets == k.
    per_market = [
        [make_row(i) for i in range(self.num_stocks) if i % num_markets == k]
        for k in range(num_markets)
    ]
    for idx, (market_id, market_name) in enumerate(const.MARKET_DATA.items()):
        out_path = os.path.join(self.dest_dir_stocklist, market_name + '.csv')
        with open(out_path, 'w', encoding=const.DEFAULT_FILE_ENCODING,
                  newline='') as fp:
            writer = csv.writer(fp, quoting=csv.QUOTE_ALL)
            writer.writerow(header)
            writer.writerows(per_market[idx])
    # Return every generated Symbol.
    return [row[0] for row in chain(*per_market)]