def __init__(self, f, dialect=csv.excel, **kwds):
    """Build a csv.reader over *f* after wrapping it in a UTF8Recoder.

    :param f: file-like object yielding encoded text
    :param dialect: csv dialect used for parsing (default: csv.excel)
    :param kwds: extra keyword arguments forwarded to csv.reader
    """
    recoded = UTF8Recoder(f)
    self.reader = csv.reader(recoded, dialect=dialect, **kwds)
Example source code for instances of Python's csv.excel class
def __init__(self, f, dialect=csv.excel, **kwds):
self.queue = io.BytesIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
self.stream = f
def handle(self, *args, **options):
    """Export the latest COMPLETE analysis jobs to CSV and upload to S3.

    Builds the spreadsheet in a temporary directory, uploads it under a
    timestamped key, and always removes the temp directory afterwards.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        queryset = AnalysisJob.objects.all().filter(status=AnalysisJob.Status.COMPLETE)
        filter_set = AnalysisJobFilterSet()
        queryset = filter_set.filter_latest(queryset, 'latest', True)
        tmp_csv_filename = os.path.join(tmpdir, 'results.csv')
        # Bug fix: the csv module requires newline='' on files it writes,
        # otherwise platforms that translate line endings emit extra blank
        # rows (see csv module docs).
        with open(tmp_csv_filename, 'w', newline='') as csv_file:
            writer = None
            fieldnames = []
            for job in queryset:
                row_data = {}
                for export in EXPORTS:
                    columns, values = export(job)
                    if writer is None:
                        # First job only: accumulate the full header list
                        # across all exporters before creating the writer.
                        fieldnames = fieldnames + columns
                    for column, value in zip(columns, values):
                        row_data[column] = value
                if writer is None:
                    writer = csv.DictWriter(csv_file,
                                            fieldnames=fieldnames,
                                            dialect=csv.excel,
                                            quoting=csv.QUOTE_MINIMAL)
                    writer.writeheader()
                writer.writerow(row_data)
        s3_client = boto3.client('s3')
        now = datetime.utcnow()
        s3_key = 'analysis-spreadsheets/results-{}.csv'.format(now.strftime('%Y-%m-%dT%H%M'))
        s3_client.upload_file(tmp_csv_filename, settings.AWS_STORAGE_BUCKET_NAME, s3_key)
        logger.info('File uploaded to: s3://{}/{}'
                    .format(settings.AWS_STORAGE_BUCKET_NAME, s3_key))
    finally:
        # Guarantee cleanup of the scratch directory even on failure.
        shutil.rmtree(tmpdir)
def get_csv(self):
    """GET self.url, strictly parse the body as CSV, and return the rows
    as a list of dicts whose values are decoded to unicode (Python 2)."""
    response = self.client.get(self.url)
    self.assertEqual(200, response.status_code)
    strict_excel = csv.excel()
    strict_excel.strict = True
    rows = csv.DictReader(StringIO(response.content), dialect=strict_excel)
    parsed = []
    for row in rows:
        for key, value in row.iteritems():
            row[key] = value.decode('utf-8')
        parsed.append(row)
    return parsed
def __init__(self, *args, **kwargs):
    """Open a csv reader over self.query_file with an excel dialect whose
    doublequote / QUOTE_MINIMAL / skipinitialspace flags are pinned.

    Bug fix: the original assigned these flags on the csv.excel *class*,
    mutating the shared dialect for every other csv user in the process.
    Configuring an instance keeps the settings local to this reader.
    """
    super(RankerRelevanceFileQueryStream, self).__init__(*args, **kwargs)
    dialect = csv.excel()
    # The following explicit assignments shadow the dialect defaults
    # but are necessary to avoid strange behavior while called by
    # certain unit tests. Please do not delete.
    dialect.doublequote = True
    dialect.quoting = csv.QUOTE_MINIMAL
    dialect.skipinitialspace = True
    self.__reader__ = csv.reader(self.query_file, dialect=dialect)
def __init__(self, f, delimiter=',', dialect=csv.excel, encoding="utf-8"):
self.reader = csv.DictReader(f, delimiter=delimiter, dialect=dialect)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
"""
Instantiates the UnicodeWriter instance
:param f: File like object to write CSV data to
:param dialect: The dialect for the CSV
:param encoding: The CSV encoding
:param kwargs: Keyword args
"""
self.writer = csv.writer(f)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
    """
    Instantiates the UnicodeWriter instance
    :param f: File like object to write CSV data to
    :param dialect: The dialect for the CSV
    :param encoding: The CSV encoding
    :param kwargs: Keyword args
    """
    self.stream = f
    self.queue = cStringIO.StringIO()
    # Incremental encoder re-encodes queued rows before they hit *f*.
    self.encoder = codecs.getincrementalencoder(encoding)()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwargs)
def __init__(self, f, fieldnames, dialect=csv.excel, encoding="utf-8", newfile=True, **kwds):
    """Buffer DictWriter output in an in-memory queue, re-encoding it to
    the target stream *f*. When *newfile* is true a UTF-8 BOM is written
    first so Excel recognizes the encoding.
    """
    self.stream = f
    self.queue = cStringIO.StringIO()
    self.writer = csv.DictWriter(self.queue, fieldnames, dialect=dialect, **kwds)
    self.encoder = codecs.getincrementalencoder(encoding)()
    if newfile:
        self.writebom()
def writebom(self):
    """Emit a UTF-8 byte-order mark so Excel can detect the encoding."""
    bom = u'\ufeff'.encode('utf8')
    self.stream.write(bom)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    """Queue CSV rows in memory, then re-encode them to the stream *f*.

    :param f: file-like object that receives the encoded output
    :param dialect: csv dialect used for formatting (default: csv.excel)
    :param encoding: target character encoding
    :param kwds: extra keyword arguments forwarded to csv.writer
    """
    self.stream = f
    self.queue = cStringIO.StringIO()
    self.encoder = codecs.getincrementalencoder(encoding)()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
def __init__(self, f, dialect=csv.excel, encoding='utf-8', **kwds):
    """Parse CSV from *f* after routing it through a CSVRecoder that
    normalizes the stream to the requested encoding."""
    recoded = CSVRecoder(f, encoding)
    self.reader = csv.reader(recoded, dialect=dialect, **kwds)
def test_registry(self):
class myexceltsv(csv.excel):
delimiter = "\t"
name = "myexceltsv"
expected_dialects = csv.list_dialects() + [name]
expected_dialects.sort()
csv.register_dialect(name, myexceltsv)
self.addCleanup(csv.unregister_dialect, name)
self.assertEqual(csv.get_dialect(name).delimiter, '\t')
got_dialects = sorted(csv.list_dialects())
self.assertEqual(expected_dialects, got_dialects)
def test_space_dialect(self):
class space(csv.excel):
delimiter = " "
quoting = csv.QUOTE_NONE
escapechar = "\\"
with TemporaryFile("w+") as fileobj:
fileobj.write("abc def\nc1ccccc1 benzene\n")
fileobj.seek(0)
reader = csv.reader(fileobj, dialect=space())
self.assertEqual(next(reader), ["abc", "def"])
self.assertEqual(next(reader), ["c1ccccc1", "benzene"])
def test_int_write(self):
import array
contents = [(20-i) for i in range(20)]
a = array.array('i', contents)
with TemporaryFile("w+", newline='') as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
def test_double_write(self):
import array
contents = [(20-i)*0.1 for i in range(20)]
a = array.array('d', contents)
with TemporaryFile("w+", newline='') as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
def test_char_write(self):
import array, string
a = array.array('u', string.ascii_letters)
with TemporaryFile("w+", newline='') as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join(a)+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
def on_epoch_end(self, epoch, logs=None):
    """Append one row (epoch plus the sorted metric values) to the CSV log.

    Iterable metric values are rendered as a quoted "[a, b, ...]" string;
    scalars — including 0-d ndarrays — pass through unchanged. The
    DictWriter and its header are created lazily on the first call.
    """
    logs = logs or {}

    def render(value):
        zero_dim = isinstance(value, np.ndarray) and value.ndim == 0
        if isinstance(value, Iterable) and not zero_dim:
            return '"[%s]"' % (', '.join(map(str, value)))
        return value

    if not self.writer:
        self.keys = sorted(logs.keys())

        class CustomDialect(csv.excel):
            delimiter = self.sep
        self.writer = csv.DictWriter(self.csv_file,
                                     fieldnames=['epoch'] + self.keys,
                                     dialect=CustomDialect)
        if self.append_header:
            self.writer.writeheader()
    row = OrderedDict({'epoch': epoch})
    row.update((key, render(logs[key])) for key in self.keys)
    self.writer.writerow(row)
    self.csv_file.flush()
def __init__(self, f, dialect=csv.excel, encoding="utf-8", errors='replace', **kwds):
    """Queue CSV rows in memory for re-encoding to the stream *f*.

    :param errors: error policy handed to the incremental encoder
                   (default 'replace')
    """
    self.stream = f
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
    self.encoder = codecs.getincrementalencoder(encoding)(errors=errors)
def test_registry(self):
class myexceltsv(csv.excel):
delimiter = "\t"
name = "myexceltsv"
expected_dialects = csv.list_dialects() + [name]
expected_dialects.sort()
csv.register_dialect(name, myexceltsv)
self.addCleanup(csv.unregister_dialect, name)
self.assertEqual(csv.get_dialect(name).delimiter, '\t')
got_dialects = sorted(csv.list_dialects())
self.assertEqual(expected_dialects, got_dialects)