def setup_output_writers(parent_dir, fold_number):
    """
    Create an output directory for the fold under the provided parent dir
    :param str parent_dir: file path to the output dir
    :param int fold_number: fold number to use in directory name
    :return: writer for <outdir.name>/fold<fold_number>/train.csv and
        <outdir.name>/fold<fold_number>/validation.csv
    :rtype: tuple(csv.writer,csv.writer)
    """
    output_dir = path.join(parent_dir, "Fold%d" % fold_number)
    if not path.isdir(output_dir):
        # lazy %-args let logging skip formatting when DEBUG is disabled
        LOGGER.debug("Creating output for fold %d at the location: %s", fold_number, output_dir)
        makedirs(output_dir)
    else:
        # Logger.warn is a deprecated alias of Logger.warning
        LOGGER.warning("Path <<%s>> already exists, files may be overwritten", output_dir)
    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')
    return train_writer, validation_writer
Example sources for Python csv.excel()
Source file: create_cross_validation_splits.py
Project: retrieve-and-rank-tuning — author: rchaks
(reads: 40 · favorites: 0 · likes: 0 · comments: 0)
def open_anything(source, format, ignoreheader, force_unbuffered=False):
    """Open *source* (possibly compressed) and return an iterator over records.

    :param source: path or file-like object to read
    :param str format: 'vw' (raw lines), 'tsv' or 'csv'
    :param bool ignoreheader: skip the first row for tsv/csv input
    :param bool force_unbuffered: read line-by-line, defeating buffering
    :raises ValueError: for an unrecognized format
    """
    source = open_regular_or_compressed(source)
    if force_unbuffered:
        # simply disabling buffering is not enough, see this for details:
        # http://stackoverflow.com/a/6556862
        source = iter(source.readline, '')
    if format == 'vw':
        return source
    if format == 'tsv':
        reader = csv.reader(source, csv.excel_tab)
    elif format == 'csv':
        reader = csv.reader(source, csv.excel)
    else:
        raise ValueError('format not supported: %s' % format)
    if ignoreheader:
        # next() builtin instead of Py2-only reader.next()
        next(reader)
    return reader
def read_csv(handle):
    """ Read CSV file
    :param handle: File-like object of the CSV file
    :return: csv.reader object
    """
    # Python 2's csv module cannot consume unicode directly; the encode/
    # decode round-trip below follows the examples in:
    # https://docs.python.org/2/library/csv.html#examples
    def utf_8_encoder(unicode_csv_data):
        """ Encode with UTF-8."""
        for line in unicode_csv_data:
            yield line.encode('utf-8')

    def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
        """ csv.py doesn't do Unicode; encode temporarily as UTF-8."""
        csv_reader = csv.reader(utf_8_encoder(unicode_csv_data),
                                dialect=dialect, **kwargs)
        for row in csv_reader:
            # decode UTF-8 back to Unicode, cell by cell:
            yield [unicode(cell, 'utf-8') for cell in row]

    # Python 3's csv handles unicode natively, so no shim is needed there.
    return unicode_csv_reader(handle) if PY2 else csv.reader(handle)
def test_space_dialect(self):
class space(csv.excel):
delimiter = " "
quoting = csv.QUOTE_NONE
escapechar = "\\"
fd, name = tempfile.mkstemp()
fileobj = os.fdopen(fd, "w+b")
try:
fileobj.write("abc def\nc1ccccc1 benzene\n")
fileobj.seek(0)
rdr = csv.reader(fileobj, dialect=space())
self.assertEqual(rdr.next(), ["abc", "def"])
self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
finally:
fileobj.close()
os.unlink(name)
def test_int_write(self):
import array
contents = [(20-i) for i in range(20)]
a = array.array('i', contents)
fd, name = tempfile.mkstemp()
fileobj = os.fdopen(fd, "w+b")
try:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
finally:
fileobj.close()
os.unlink(name)
def test_space_dialect(self):
class space(csv.excel):
delimiter = " "
quoting = csv.QUOTE_NONE
escapechar = "\\"
fd, name = tempfile.mkstemp()
fileobj = os.fdopen(fd, "w+b")
try:
fileobj.write("abc def\nc1ccccc1 benzene\n")
fileobj.seek(0)
rdr = csv.reader(fileobj, dialect=space())
self.assertEqual(rdr.next(), ["abc", "def"])
self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
finally:
fileobj.close()
os.unlink(name)
Source file: test_parsers.py
Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda — author: SignalMedia
(reads: 28 · favorites: 0 · likes: 0 · comments: 0)
def test_dialect(self):
    """With QUOTE_NONE, quote characters are kept verbatim in parsed cells."""
    data = """\
label1,label2,label3
index1,"a,c,e
index2,b,d,f
"""
    # excel dialect tweaked to disable quoting entirely
    no_quote = csv.excel()
    no_quote.quoting = csv.QUOTE_NONE
    df = self.read_csv(StringIO(data), dialect=no_quote)
    data = '''\
label1,label2,label3
index1,a,c,e
index2,b,d,f
'''
    exp = self.read_csv(StringIO(data))
    # the unquoted parse keeps the literal `"` in front of `a`
    exp.replace('a', '"a', inplace=True)
    tm.assert_frame_equal(df, exp)
Source file: create_cross_validation_splits.py
Project: retrieve-and-rank-tuning — author: rchaks
(reads: 26 · favorites: 0 · likes: 0 · comments: 0)
def setup_train_and_test_writer(output_dir):
    """
    Create an output directory for the fold under the provided parent dir
    :param str output_dir: file path to the output dir
    :return: writer for <outdir.name>/train.csv and <outdir.name>/validation.csv
    :rtype: tuple(csv.writer,csv.writer)
    """
    if not path.isdir(output_dir):
        makedirs(output_dir)
    else:
        # Logger.warn is a deprecated alias of Logger.warning;
        # lazy %-args avoid formatting when the level is disabled
        LOGGER.warning("Path <<%s>> already exists, files may be overwritten", output_dir)
    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')
    return train_writer, validation_writer
def __init__(self, f, fieldnames=None, encoding=UTF8, **kwds):
    """Build a csv reader over *f*, recoding to UTF-8 when necessary,
    and resolve the field names (taken from the first row if not given)."""
    self.encoding = encoding
    try:
        stream = f if self.encoding == UTF8 else UTF8Recoder(f, encoding)
        self.reader = csv.reader(stream, dialect=csv.excel, **kwds)
        if fieldnames:
            self.fieldnames = fieldnames
        else:
            self.fieldnames = self.reader.next()
            # strip a UTF-8 byte-order mark from the first header cell
            if len(self.fieldnames) > 0 and self.fieldnames[0].startswith(codecs.BOM_UTF8):
                self.fieldnames[0] = self.fieldnames[0].replace(codecs.BOM_UTF8, u'', 1)
    except (csv.Error, StopIteration):
        # unreadable or empty input: behave as if there were no columns
        self.fieldnames = []
    except LookupError as e:
        # unknown encoding name
        Cmd.Backup()
        usageErrorExit(e)
    self.numfields = len(self.fieldnames)
def export_csv(modeladmin, request, queryset):
    """Django admin action: stream the selected sightings as a UTF-8 CSV download."""
    import csv
    from django.utils.encoding import smart_str
    response = HttpResponse(content_type='text/csv')
    # NOTE(review): "elehphant" is a typo ("elephant"); kept as-is since
    # downstream consumers may match on the exact filename — confirm before fixing.
    response['Content-Disposition'] = 'attachment; filename=elehphant_sightings.csv'
    writer = csv.writer(response, csv.excel)
    response.write(u'\ufeff'.encode('utf8'))  # BOM (optional...Excel needs it to open UTF-8 file properly)
    writer.writerow([
        smart_str(u"ID"),
        smart_str(u"Reported At"),
        smart_str(u"Latitude"),
        smart_str(u"Longitude"),
        smart_str(u"Message"),
        smart_str(u"Informer"),
    ])
    for obj in queryset:
        # NOTE(review): the header declares 6 columns (Latitude and Longitude
        # split) but each data row writes only 5 values with a single
        # `obj.location` field, so every column after Latitude is shifted
        # left by one — confirm the intended schema and fix accordingly.
        writer.writerow([
            smart_str(obj.pk),
            smart_str(str(obj.created_at)),
            smart_str(obj.location),
            smart_str(obj.message),
            smart_str(obj.informer.name),
        ])
    return response
def test_dialect_apply(self):
class testA(csv.excel):
delimiter = "\t"
class testB(csv.excel):
delimiter = ":"
class testC(csv.excel):
delimiter = "|"
class testUni(csv.excel):
delimiter = "\u039B"
csv.register_dialect('testC', testC)
try:
self.compare_dialect_123("1,2,3\r\n")
self.compare_dialect_123("1\t2\t3\r\n", testA)
self.compare_dialect_123("1:2:3\r\n", dialect=testB())
self.compare_dialect_123("1|2|3\r\n", dialect='testC')
self.compare_dialect_123("1;2;3\r\n", dialect=testA,
delimiter=';')
self.compare_dialect_123("1\u039B2\u039B3\r\n",
dialect=testUni)
finally:
csv.unregister_dialect('testC')
def test_space_dialect(self):
class space(csv.excel):
delimiter = " "
quoting = csv.QUOTE_NONE
escapechar = "\\"
fd, name = tempfile.mkstemp()
fileobj = os.fdopen(fd, "w+b")
try:
fileobj.write("abc def\nc1ccccc1 benzene\n")
fileobj.seek(0)
rdr = csv.reader(fileobj, dialect=space())
self.assertEqual(rdr.next(), ["abc", "def"])
self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
finally:
fileobj.close()
os.unlink(name)
def test_int_write(self):
import array
contents = [(20-i) for i in range(20)]
a = array.array('i', contents)
fd, name = tempfile.mkstemp()
fileobj = os.fdopen(fd, "w+b")
try:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
finally:
fileobj.close()
os.unlink(name)
def test_dialect_apply(self):
class testA(csv.excel):
delimiter = "\t"
class testB(csv.excel):
delimiter = ":"
class testC(csv.excel):
delimiter = "|"
class testUni(csv.excel):
delimiter = "\u039B"
csv.register_dialect('testC', testC)
try:
self.compare_dialect_123("1,2,3\r\n")
self.compare_dialect_123("1\t2\t3\r\n", testA)
self.compare_dialect_123("1:2:3\r\n", dialect=testB())
self.compare_dialect_123("1|2|3\r\n", dialect='testC')
self.compare_dialect_123("1;2;3\r\n", dialect=testA,
delimiter=';')
self.compare_dialect_123("1\u039B2\u039B3\r\n",
dialect=testUni)
finally:
csv.unregister_dialect('testC')
def test_space_dialect(self):
class space(csv.excel):
delimiter = " "
quoting = csv.QUOTE_NONE
escapechar = "\\"
fd, name = tempfile.mkstemp()
fileobj = os.fdopen(fd, "w+b")
try:
fileobj.write("abc def\nc1ccccc1 benzene\n")
fileobj.seek(0)
rdr = csv.reader(fileobj, dialect=space())
self.assertEqual(rdr.next(), ["abc", "def"])
self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
finally:
fileobj.close()
os.unlink(name)
def test_int_write(self):
import array
contents = [(20-i) for i in range(20)]
a = array.array('i', contents)
fd, name = tempfile.mkstemp()
fileobj = os.fdopen(fd, "w+b")
try:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
finally:
fileobj.close()
os.unlink(name)
def on_train_end(self, logs=None):
    """Rewrite this model's row in the CSV log with its final metrics.

    Streams the existing file at self.file through a temp file, replacing
    the row whose 'model' column equals self.row_dict['model'], then moves
    the temp file back over the original.
    """
    REJECT_KEYS = {'has_validation_data'}
    row_dict = self.row_dict

    class CustomDialect(csv.excel):
        delimiter = self.sep

    # compute the column list once instead of duplicating the expression
    fieldnames = ['model'] + [k for k in self.keys if k not in REJECT_KEYS]
    temp_file = NamedTemporaryFile(delete=False, mode='w')
    with open(self.file, 'r') as csv_file, temp_file:
        reader = csv.DictReader(csv_file, fieldnames=fieldnames,
                                dialect=CustomDialect)
        writer = csv.DictWriter(temp_file, fieldnames=fieldnames,
                                dialect=CustomDialect)
        for row_idx, row in enumerate(reader):
            if row_idx == 0:
                # TODO(review): intended to re-write the header with
                # on_train_end's metrics but is currently a no-op; note that
                # because fieldnames= is passed explicitly, DictReader treats
                # the file's header line as an ordinary data row.
                pass
            if row['model'] == self.row_dict['model']:
                writer.writerow(row_dict)
            else:
                writer.writerow(row)
    shutil.move(temp_file.name, self.file)
def __init__(self, f, fieldnames=None, encoding=UTF8, **kwds):
    """Create a csv reader over *f* (recoded to UTF-8 if *encoding* differs)
    and determine self.fieldnames, reading the header row when needed."""
    self.encoding = encoding
    try:
        source = UTF8Recoder(f, encoding) if self.encoding != UTF8 else f
        self.reader = csv.reader(source, dialect=csv.excel, **kwds)
        if not fieldnames:
            header = self.reader.next()
            # drop a leading UTF-8 BOM from the first column name
            if len(header) > 0 and header[0].startswith(codecs.BOM_UTF8):
                header[0] = header[0].replace(codecs.BOM_UTF8, u'', 1)
            self.fieldnames = header
        else:
            self.fieldnames = fieldnames
    except (csv.Error, StopIteration):
        # bad or empty CSV input: no columns
        self.fieldnames = []
    except LookupError as e:
        # the encoding name was not recognized
        Cmd.Backup()
        usageErrorExit(e)
    self.numfields = len(self.fieldnames)
def on_epoch_end(self, epoch, logs=None):
    """Append one row of this epoch's metrics to the CSV log."""
    logs = logs or {}

    def handle_value(k):
        # 0-d ndarrays print fine as scalars; any other iterable is
        # rendered as a quoted "[a, b, ...]" string
        is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
        if isinstance(k, Iterable) and not is_zero_dim_ndarray:
            return '"[%s]"' % (', '.join(map(str, k)))
        return k

    if not self.writer:
        # lazily build the writer on first call, once metric names are known
        self.keys = sorted(logs.keys())

        class CustomDialect(csv.excel):
            delimiter = self.sep

        self.writer = csv.DictWriter(self.csv_file,
                                     fieldnames=['epoch'] + self.keys,
                                     dialect=CustomDialect)
        if self.append_header:
            self.writer.writeheader()

    row_dict = OrderedDict({'epoch': epoch})
    row_dict.update((key, handle_value(logs[key])) for key in self.keys)
    self.writer.writerow(row_dict)
    self.csv_file.flush()
def write_csv(output_filename, dict_list, delimiter, verbose=False):
    """Write a CSV file.

    :param str output_filename: destination path
    :param list dict_list: rows as dicts; keys of the first dict become the header
    :param str delimiter: field separator character
    :param bool verbose: print progress messages
    """
    if not dict_list:
        if verbose:
            print('Not writing %s; no lines to write' % output_filename)
        return
    # Pass delimiter= directly instead of mutating csv.excel.delimiter:
    # the original assigned to the shared dialect CLASS, silently changing
    # the delimiter for every other csv.excel user in the process.
    with open(output_filename, 'w', newline='') as f:
        dict_writer = csv.DictWriter(f, fieldnames=list(dict_list[0].keys()),
                                     dialect=csv.excel, delimiter=delimiter)
        dict_writer.writeheader()
        dict_writer.writerows(dict_list)
    if verbose:
        # print() function (the original mixed a Py2 print statement in here)
        print('Wrote %s' % output_filename)
def on_epoch_end(self, epoch, logs=None):
    """Write the metrics for *epoch* as a new row in the CSV log."""
    logs = logs or {}

    def handle_value(k):
        # scalar 0-d ndarrays pass through; other iterables become a
        # quoted "[...]" list representation
        is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
        if isinstance(k, Iterable) and not is_zero_dim_ndarray:
            return '"[%s]"' % (', '.join(map(str, k)))
        return k

    if not self.writer:
        # first epoch: fix the column order and create the DictWriter
        self.keys = sorted(logs.keys())

        class CustomDialect(csv.excel):
            delimiter = self.sep

        fieldnames = ['epoch'] + self.keys
        self.writer = csv.DictWriter(self.csv_file, fieldnames=fieldnames,
                                     dialect=CustomDialect)
        if self.append_header:
            self.writer.writeheader()

    row_dict = OrderedDict({'epoch': epoch})
    row_dict.update((key, handle_value(logs[key])) for key in self.keys)
    self.writer.writerow(row_dict)
    self.csv_file.flush()
def test_dialect_apply(self):
class testA(csv.excel):
delimiter = "\t"
class testB(csv.excel):
delimiter = ":"
class testC(csv.excel):
delimiter = "|"
class testUni(csv.excel):
delimiter = "\u039B"
csv.register_dialect('testC', testC)
try:
self.compare_dialect_123("1,2,3\r\n")
self.compare_dialect_123("1\t2\t3\r\n", testA)
self.compare_dialect_123("1:2:3\r\n", dialect=testB())
self.compare_dialect_123("1|2|3\r\n", dialect='testC')
self.compare_dialect_123("1;2;3\r\n", dialect=testA,
delimiter=';')
self.compare_dialect_123("1\u039B2\u039B3\r\n",
dialect=testUni)
finally:
csv.unregister_dialect('testC')
def write_csv(output_filename, dict_list, delimiter, verbose=False):
    """Write *dict_list* to *output_filename* as CSV.

    :param str output_filename: destination path
    :param list dict_list: rows as dicts; the first dict's keys form the header
    :param str delimiter: field separator character
    :param bool verbose: print progress messages
    """
    if not dict_list:
        if verbose:
            print('Not writing %s; no lines to write' % output_filename)
        return
    # Do NOT mutate csv.excel.delimiter (as the original did): that edits a
    # shared class attribute and changes the delimiter for all csv.excel
    # users process-wide. Pass delimiter= as a per-writer override instead.
    with open(output_filename, 'w', newline='') as f:
        dict_writer = csv.DictWriter(f, fieldnames=list(dict_list[0].keys()),
                                     dialect=csv.excel, delimiter=delimiter)
        dict_writer.writeheader()
        dict_writer.writerows(dict_list)
    if verbose:
        # consistent print() call; the original used a Py2 print statement here
        print('Wrote %s' % output_filename)
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
    """Wrap *f* in a UTF-8 recoder, then build a csv reader over it."""
    recoded = UTF8Recoder(f, encoding)
    self.reader = csv.reader(recoded, dialect=dialect, **kwds)
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
    """Buffer rows in memory, then encode them incrementally to *f*."""
    self.stream = f
    self.encoder = codecs.getincrementalencoder(encoding)()
    # rows are first written to this queue, then re-encoded onto the stream
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
def __init__(self, filename, dialect=csv.excel,
             encoding="utf-8", **kw):
    """Store the target filename and CSV options for later writing."""
    self.filename = filename
    self.encoding = encoding
    self.dialect = dialect
    # extra keyword arguments are forwarded to csv.writer later on
    self.kw = kw
def process(options, args):
    '''
    Save air quality data in a csv_file.

    Fetches the SEREMI air-quality page, prepends the freshly scraped rows
    (tagged with DATE/TIME columns) to the existing CSV file, creating it
    when missing. Writes to "<csv_file>~" then renames over the original.
    '''
    url = "http://www.seremisaludrm.cl/sitio/pag/aire/indexjs3aireindices-prueba.asp"
    sock = urllib.urlopen(url)
    htmlSource = sock.read()
    sock.close()
    csv_file = options.csv_file
    csv_exists = False
    # Append sensors data
    air_data = extract_data(htmlSource)
    # next() builtin instead of Py2-only generator .next()
    encabezado = next(air_data)  # header row
    encabezado.append('DATE')
    encabezado.append('TIME')
    date = extract_date(htmlSource)
    time = extract_time(htmlSource)
    if os.path.exists(csv_file):
        f_in = open(csv_file, "rb")
        Reader = csv.reader(f_in, dialect=csv.excel)
        # consume the existing header so only data rows get copied below
        encabezado_old = next(Reader)
        csv_exists = True
    f_out = open(csv_file + '~', 'wb')
    Writer = csv.writer(f_out, dialect=csv.excel)
    # TODO check encabezado == encabezado_old
    Writer.writerow(encabezado)
    for row in air_data:
        row.append(date)
        row.append(time)
        Writer.writerow(row)
    if csv_exists:
        # copy the previously stored rows beneath the fresh ones
        for row in Reader:
            Writer.writerow(row)
        f_in.close()
        os.remove(csv_file)
    f_out.close()
    # replace the old file with the newly assembled one
    os.rename(csv_file + '~', csv_file)
def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
    """Unicode CSV reader."""
    # This code is taken from http://docs.python.org/library/csv.html#examples
    # csv.py doesn't do Unicode; encode temporarily as UTF-8:
    for row in csv.reader(utf_8_encoder(unicode_csv_data),
                          dialect=dialect, **kwargs):
        # decode UTF-8 back to Unicode, cell by cell:
        yield [unicode(cell, 'utf-8') for cell in row]
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
    """Init method."""
    self.stream = f
    self.encoder = codecs.getincrementalencoder(encoding)()
    # Redirect output to a queue; rows are encoded from it onto the stream
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
def csv_reader_converter(utf8_data, dialect=csv.excel, **kwargs):
    """Yield rows from *utf8_data*, decoding every cell as latin-1."""
    for row in csv.reader(utf8_data, dialect=dialect, **kwargs):
        yield [unicode(cell, 'latin-1') for cell in row]