def write_data_file(self, output_file_path):
"""Write our course info file information out
output_file_path: path to file file"""
with open(output_file_path, "w") as f_out:
f_out.write("Assignments\n")
for assign in self.assignments:
f_out.write(assign.name+","+assign.internal_name+","+str(assign.id))
f_out.write("\n")
f_out.write("Roster\n")
csvwriter = csv.writer(f_out, quoting=csv.QUOTE_MINIMAL)
for student in self.roster.students_no_errors:
csvwriter.writerow([student.first_name,\
student.last_name,\
student.email,\
student.glid])
python类QUOTE_MINIMAL的实例源码
def callback(data):
print("callback called...")
global i
global vc
rospy.loginfo("I heard %s",data)
with open('dataset.csv', 'a') as csvfile:
spamwriter = csv.writer(csvfile, delimiter=' ',
quotechar='|', quoting=csv.QUOTE_MINIMAL)
spamwriter.writerow([data.angular.z])
if vc.isOpened(): # try to get the first frame
rval, frame = vc.read()
else:
rval = False
rval, frame = vc.read()
cv2.imwrite("images/image_" + str(i) + ".png", frame)
i += 1
def _test_default_attrs(self, ctor, *args):
obj = ctor(*args)
# Check defaults
self.assertEqual(obj.dialect.delimiter, ',')
self.assertEqual(obj.dialect.doublequote, True)
self.assertEqual(obj.dialect.escapechar, None)
self.assertEqual(obj.dialect.lineterminator, "\r\n")
self.assertEqual(obj.dialect.quotechar, '"')
self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
self.assertEqual(obj.dialect.skipinitialspace, False)
self.assertEqual(obj.dialect.strict, False)
# Try deleting or changing attributes (they are read-only)
self.assertRaises(AttributeError, delattr, obj.dialect, 'delimiter')
self.assertRaises(AttributeError, setattr, obj.dialect, 'delimiter', ':')
self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
self.assertRaises(AttributeError, setattr, obj.dialect,
'quoting', None)
def test_write_escape(self):
self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
escapechar='\\')
self.assertRaises(csv.Error,
self._write_test,
['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
escapechar=None, doublequote=False)
self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
escapechar='\\', doublequote = False)
self._write_test(['"'], '""""',
escapechar='\\', quoting = csv.QUOTE_MINIMAL)
self._write_test(['"'], '\\"',
escapechar='\\', quoting = csv.QUOTE_MINIMAL,
doublequote = False)
self._write_test(['"'], '\\"',
escapechar='\\', quoting = csv.QUOTE_NONE)
self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
escapechar='\\', quoting = csv.QUOTE_NONE)
def save_db_objects(db_engine, db_objects):
"""Saves a collection of SQLAlchemy model objects to the database using a COPY command
Args:
db_engine (sqlalchemy.engine)
db_objects (list) SQLAlchemy model objects, corresponding to a valid table
"""
with tempfile.TemporaryFile(mode='w+') as f:
writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
for db_object in db_objects:
writer.writerow([
getattr(db_object, col.name)
for col in db_object.__table__.columns
])
f.seek(0)
postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
def dl_and_parse(s3,headers,keylist,bucket):
for key in keylist:
with TemporaryFile() as fp:
try:
s3.Bucket(bucket).download_fileobj(key,fp)
except ClientError as e:
logger.error('Unable to download s3://{}/{}'.format(bucket, key))
logger.debug('Received error: {}'.format(e))
sys.exit(5)
fp.seek(0)
with TextIOWrapper(GzipFile(fileobj=fp,mode='r')) as f:
try:
reader = csv.DictReader(f,fieldnames=headers,delimiter=',',quoting=csv.QUOTE_MINIMAL)
for row in reader:
yield row
except csv.Error as e:
logger.error("Unable to read CSV '{}'".format(reader.line))
logger.debug('Received error: {}'.format(e))
sys.exit(3)
# main parser
def to_string(table):
''' Return table as a StringIO object for writing via copy() '''
string = StringIO()
writer = csv.writer(string, delimiter=",", quoting=csv.QUOTE_MINIMAL)
dict_encoder = json.JSONEncoder()
jsonb_cols = set([i for i, j in enumerate(table.col_types) if j == 'jsonb'])
datetime_cols = set([i for i, j in enumerate(table.col_types) if j == 'datetime'])
for row in table:
for i in jsonb_cols:
row[i] = dict_encoder.encode(row[i])
for i in datetime_cols:
row[i] = psycopg2.extensions.adapt(i)
writer.writerow(row)
string.seek(0)
return string
def create_csv(csv_filename, header_list, sql_query_string):
"""Transfer records from database query to a new .csv file.
Keyword arguments:
csv_filename -- String with a '.csv' suffix
header_list -- List of strings (as CSV header names)
sql_query_string -- Passed to match header_list format
"""
with open(csv_filename, 'w', newline='\n') as csvfile:
csv_writer = csv.writer(csvfile, quotechar="'", quoting=csv.QUOTE_MINIMAL)
csv_writer.writerow(header_list)
CUR.execute(sql_query_string)
records = CUR.fetchall()
CONN.commit()
for entry in records:
csv_writer.writerow(entry)
def write_csv_rows(rows, path):
"""
Write CSV rows in a standard format.
:type rows: list[list[string]]
:type path: string
"""
with open(path, 'w') as outfile:
writer = csv.writer(outfile, delimiter=',', quoting=csv.QUOTE_MINIMAL)
writer.writerows(rows)
def run_query(src, fn, rows=200):
print('writing data for %s to %s' % (source, fn))
query['query']['function_score']['query']['bool']['must'][0]['term']['analysis.source'] = source
emails = set()
batches = 0
print(json.dumps(query))
total = None
with open(fn, 'w', newline='') as outfile:
writer = csv.writer(outfile, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
writer.writerow(['email', 'name', 'date', 'comment', 'url'])
while len(emails) < rows and batches < 10:
offset = batches * 100
if total and offset > total:
break
resp = es.search(index='fcc-comments', body=query, size=100, from_=offset)
if batches == 0:
total = resp['hits']['total']
print('\t%s matches' % (total))
else:
print('\tbatch %s: have %s' % (batches+1, len(emails)))
batches += 1
for doc in resp['hits']['hits']:
if len(emails) == rows:
break
data = doc['_source']
if data['contact_email'] in emails:
continue
emails.add(data['contact_email'])
writer.writerow([data['contact_email'], data['filers'][0]['name'],
data['date_received'], data['text_data'],
'https://www.fcc.gov/ecfs/filing/%s' % doc['_id']
])
def validate_to_csv(model_name: str, last_epoch: int,
configuration: Configuration = Configuration.german(),
step_count=10, first_epoch: int = 0,
csv_directory: Path = configuration.default_data_directories.test_results_directory) -> List[
Tuple[int, ExpectationsVsPredictionsInGroupedBatches]]:
step_size = (last_epoch - first_epoch) / (step_count - 1)
epochs = distinct(list(int(first_epoch + index * step_size) for index in range(step_count)))
log("Testing model {} on epochs {}.".format(model_name, epochs))
model = configuration.load_model(model_name, last_epoch,
allowed_characters_for_loaded_model=configuration.allowed_characters,
use_kenlm=True,
language_model_name_extension="-incl-trans")
def get_result(epoch: int) -> ExpectationsVsPredictionsInGroupedBatches:
log("Testing epoch {}.".format(epoch))
model.load_weights(
allowed_characters_for_loaded_model=configuration.allowed_characters,
load_model_from_directory=configuration.directories.nets_base_directory / model_name, load_epoch=epoch)
return configuration.test_model_grouped_by_loaded_corpus_name(model)
results_with_epochs = []
csv_file = csv_directory / "{}.csv".format(model_name + "-incl-trans")
import csv
with csv_file.open('w', encoding='utf8') as opened_csv:
writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for epoch in epochs:
result = get_result(epoch)
writer.writerow((epoch, result.average_loss, result.average_letter_error_rate,
result.average_word_error_rate, result.average_letter_error_count,
result.average_word_error_count))
return results_with_epochs
def summarize_to_csv(self, summary_csv_file: Path) -> None:
import csv
with summary_csv_file.open('w', encoding='utf8') as csv_summary_file:
writer = csv.writer(csv_summary_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in self.csv_rows():
writer.writerow(row)
def save(self, corpus_csv_file: Path, use_relative_audio_file_paths: bool = True):
import csv
with corpus_csv_file.open('w', encoding='utf8') as opened_csv:
writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
examples_and_phase = [(e, Phase.training) for e in self.training_examples] + \
[(e, Phase.test) for e in self.test_examples]
for e, phase in examples_and_phase:
writer.writerow(
(e.id, str(e.audio_file.relative_to(
corpus_csv_file.parent) if use_relative_audio_file_paths else e.audio_file),
e.label, phase.value, e.positional_label.serialize() if e.positional_label else ""))
def csv_writer(filename, data, fields=None, delimiter=',', quoting=csv.QUOTE_MINIMAL):
log.info('writing', filename)
rows = 0
with open(filename, 'w') as f:
if not fields:
fields = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fields, delimiter=delimiter, quoting=quoting)
writer.writeheader()
for row in data:
writer.writerow(row)
rows += 1
log.info('wrote %d rows' % rows)
def load_csv_columns(filename, column_names=None, skip=0, delimiter=',', quoting=csv.QUOTE_MINIMAL):
r = []
log.info('opening', filename)
with open(filename, 'r') as f:
data_file = csv_reader_converter(f, delimiter=delimiter, quoting=quoting)
for i in range(skip):
next(data_file)
headers = next(data_file, None) # parse the headers
columns = {}
for (i, h) in enumerate(headers):
h = h.strip()
if (not column_names) or h in column_names:
columns[i] = h
log.info("headers", headers)
log.info("columns", column_names)
for line in data_file:
d = {}
if not line:
continue
for (column, index) in columns.items():
if column_names:
rename = column_names[index]
else:
rename = headers[column]
value = line[column].strip()
d[rename] = value
r.append(d)
log.info('read %d lines' % len(r))
return r
def __init__(self, file, include_default_phase: bool = True):
writer = csv.writer(file, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
writer.writerow(['name', 'phase', 'slope', 'intercept', 'N0', 'SNR', 'rank'])
self._writer = writer
self._file = file
self._include_default_phase = include_default_phase
def run():
note_files = ['promrep/scripts/data/MRR3ShortNotesV2.csv',
'promrep/scripts/data/MRR3LongNotesV2.csv']
source = SecondarySource.objects.get(abbrev_name='Broughton MRR3')
for ifname in note_files:
print 'Will read notes from file', ifname, '\n\n'
ifile = codecs.open(ifname, 'r', encoding='latin1')
log_fname = os.path.splitext(os.path.basename(ifname))[0] + '.log'
with open(log_fname, 'wb') as log_file:
spamwriter = csv.writer(
log_file,
delimiter=',',
quotechar='"',
quoting=csv.QUOTE_MINIMAL)
spamwriter.writerow(('id', 'note'))
for i, line in enumerate(ifile):
note_text = line.encode('utf-8').replace("**", ";").strip('"')
print str(i) + ":", note_text
note = PostAssertionNote.objects.create(
text=note_text, secondary_source=source)
spamwriter.writerow((note.id, note_text[0:20]))
def handle(self, *args, **options):
tmpdir = tempfile.mkdtemp()
try:
queryset = AnalysisJob.objects.all().filter(status=AnalysisJob.Status.COMPLETE)
filter_set = AnalysisJobFilterSet()
queryset = filter_set.filter_latest(queryset, 'latest', True)
tmp_csv_filename = os.path.join(tmpdir, 'results.csv')
with open(tmp_csv_filename, 'w') as csv_file:
writer = None
fieldnames = []
for job in queryset:
row_data = {}
for export in EXPORTS:
columns, values = export(job)
if writer is None:
fieldnames = fieldnames + columns
for column, value in zip(columns, values):
row_data[column] = value
if writer is None:
writer = csv.DictWriter(csv_file,
fieldnames=fieldnames,
dialect=csv.excel,
quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
writer.writerow(row_data)
s3_client = boto3.client('s3')
now = datetime.utcnow()
s3_key = 'analysis-spreadsheets/results-{}.csv'.format(now.strftime('%Y-%m-%dT%H%M'))
s3_client.upload_file(tmp_csv_filename, settings.AWS_STORAGE_BUCKET_NAME, s3_key)
logger.info('File uploaded to: s3://{}/{}'
.format(settings.AWS_STORAGE_BUCKET_NAME, s3_key))
finally:
shutil.rmtree(tmpdir)
def add_row_to_csv(directory, filename, row, columns_to_skip):
row = ['' for i in range(columns_to_skip)] + row
with open(directory + '/' + filename + '.csv', 'a') as csvfile:
spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
spamwriter.writerow(row)
csvfile.close()
def __init__(self, *args, **kwargs):
super(RankerRelevanceFileQueryStream, self).__init__(*args, **kwargs)
dialect = csv.excel
# The following explicit assignments shadow the dialect defaults
# but are necessary to avoid strange behavior while called by
# certain unit tests. Please do not delete.
dialect.doublequote = True
dialect.quoting = csv.QUOTE_MINIMAL
dialect.skipinitialspace = True
self.__reader__ = csv.reader(self.query_file, dialect=dialect)