def setup_output_writers(parent_dir, fold_number):
"""
Create an output directory for the fold under the provided parent dir
:param str parent_dir: file path to the parent output dir
:param int fold_number: fold number to use in the directory name
:return: writers for <parent_dir>/Fold<fold_number>/train.csv and <parent_dir>/Fold<fold_number>/validation.csv
:rtype: tuple(csv.writer, csv.writer)
"""
output_dir = path.join(parent_dir, "Fold%d" % fold_number)
if not path.isdir(output_dir):
LOGGER.debug("Creating output for fold %d at the location: %s" % (fold_number, output_dir))
makedirs(output_dir)
else:
LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)
train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
dialect=csv.excel, delimiter=',')
validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
dialect=csv.excel, delimiter=',')
return train_writer, validation_writer
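A minimal usage sketch, assuming the project's smart_file_open, LOGGER and the TRAIN/VALIDATION filename constants are available; the paths and row values below are made up. The returned objects are plain csv.writer instances:
# Hypothetical: prepare writers for fold 0 and emit one relevance row each.
train_writer, validation_writer = setup_output_writers('/tmp/cv_output', 0)
train_writer.writerow(['question_id_1', 'answer_id_7', 1])
validation_writer.writerow(['question_id_2', 'answer_id_3', 0])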
Source: create_cross_validation_splits.py (project: retrieve-and-rank-tuning, author: rchaks)
def csv_safe_unicode(row, ignoreErrors=True):
"""
Given an array of values, make sure all strings are unicode in Python 3 and
str in Python 2. The csv.writer in Python 2 does not handle unicode
strings and in Python 3 it does not handle byte strings.
:param row: an array which could contain mixed types.
:param ignoreErrors: if True, convert what is possible and ignore errors.
If false, conversion errors will raise an exception.
:returns: either the original array if safe, or an array with all byte
strings converted to unicode in Python 3 or str in Python 2.
"""
# This is unicode in Python 2 and bytes in Python 3
wrong_type = six.text_type if str == six.binary_type else six.binary_type
# If all of the data is not the wrong type, just return it as is. This
# allows non-string types, such as integers and floats.
if not any(isinstance(value, wrong_type) for value in row):
return row
# Convert the wrong type of string to the type needed for this version of
# Python. For Python 2 unicode gets encoded to str (bytes). For Python 3
# bytes get decoded to str (unicode).
func = 'encode' if str == six.binary_type else 'decode'
row = [getattr(value, func)('utf8', 'ignore' if ignoreErrors else 'strict')
if isinstance(value, wrong_type) else value for value in row]
return row
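A short illustration of the behaviour on Python 3 (with six installed): byte strings are decoded to str, everything else passes through untouched.
row = [1, b'caf\xc3\xa9', 'plain', 2.5]
print(csv_safe_unicode(row))            # [1, 'café', 'plain', 2.5]
print(csv_safe_unicode([1, 'abc']))     # [1, 'abc'] -- already safe, returned as-is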
def save_gem_class_csv(self, base_dir):
csv_file_path = os.path.join(base_dir, 'gem_classification.csv')
cr_utils.makedirs(os.path.dirname(csv_file_path), allow_existing=True)
with open(csv_file_path, 'wb') as f:
writer = csv.writer(f)
writer.writerow(['barcode',
self.result['genome0'],
self.result['genome1'],
'call'
])
for i in xrange(len(self.result['barcode'])):
call = self.result['call'][i]
call = call.replace(cr_constants.GEM_CLASS_GENOME0, self.result['genome0'])
call = call.replace(cr_constants.GEM_CLASS_GENOME1, self.result['genome1'])
writer.writerow([
self.result['barcode'][i],
self.result['count0'][i],
self.result['count1'][i],
call,
])
def build_gtf(self):
print "Writing new genes GTF file (may take 10 minutes for a 1GB input GTF file)..."
with open(self.out_gtf_fn, 'wb') as f:
writer = csv.writer(f, delimiter='\t', quoting=csv.QUOTE_NONE, quotechar='')
for row, is_comment, properties in self.gtf_reader_iter(self.in_gtf_fn):
if is_comment:
writer.writerow(row)
continue
remove = False
for key, value in properties.iteritems():
if key in self.attributes and value not in self.attributes[key]:
remove = True
if not remove:
writer.writerow(row)
print "...done\n"
def build_metrics_summary_csv(filename, sample_properties, sample_data, pipeline):
metrics, alarms, charts, all_prefixes = get_constants_for_pipeline(pipeline)
tables, _ = build_tables(sample_properties, metrics, alarms, sample_data, all_prefixes=all_prefixes)
if not tables:
sys.stderr.write("No metrics tables were generated, skipping CSV generation.\n")
return
csv_metrics = collections.OrderedDict()
for table in tables:
if not table:
continue
for metric, _, value in table['rows']:
if type(metric) == dict:
metric = metric['v']
if type(value) == dict:
value = value['v']
if metric not in csv_metrics:
csv_metrics[metric] = value
with open(filename, 'wb') as f:
writer = csv.writer(f)
writer.writerow(csv_metrics.keys())
writer.writerow(csv_metrics.values())
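The two writerow calls above produce a two-row CSV: one header row of metric names, one row of values. A stand-alone sketch of that layout, with invented metric names:
import collections
import csv
import io

metrics = collections.OrderedDict([('Estimated Number of Cells', 5000),
                                   ('Mean Reads per Cell', 25000)])
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(metrics.keys())    # header row: metric names
writer.writerow(metrics.values())  # single data row: metric values
print(buf.getvalue())
# Estimated Number of Cells,Mean Reads per Cell
# 5000,25000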
def dump_to_csv(self, output_csv, input_fields, write_header=True, top_level=False, mode='a', encoding='utf-8', compression=None):
if compression == 'bz2':
mode = binary_mode(mode)
filehandle = bz2.open(output_csv, mode)
elif compression == 'gzip':
mode = binary_mode(mode)
filehandle = gzip.open(output_csv, mode)
else:
filehandle = open(output_csv, mode)
writer = csv.writer(filehandle)
if write_header:
writer.writerow(input_fields)
tweet_parser = TweetParser()
for tweet in self.get_iterator():
if top_level:
ret = list(zip(input_fields, [tweet.get(field) for field in input_fields]))
else:
ret = tweet_parser.parse_columns_from_tweet(tweet,input_fields)
ret_values = [col_val[1] for col_val in ret]
writer.writerow(ret_values)
filehandle.close()
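A hedged usage sketch, assuming this method lives on a tweet-collection object from the surrounding project (get_iterator, TweetParser and binary_mode are its helpers) and that the field names exist in the tweets; everything below is illustrative:
# Hypothetical: append three top-level tweet fields to a gzip-compressed CSV.
collection.dump_to_csv('tweets.csv.gz',
                       ['id_str', 'created_at', 'text'],
                       top_level=True,
                       compression='gzip')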
def __init__(self, ofile, maxresultrows=None):
self._maxresultrows = 50000 if maxresultrows is None else maxresultrows
self._ofile = ofile
self._fieldnames = None
self._buffer = StringIO()
self._writer = csv.writer(self._buffer, dialect=CsvDialect)
self._writerow = self._writer.writerow
self._finished = False
self._flushed = False
self._inspector = OrderedDict()
self._chunk_count = 0
self._record_count = 0
self._total_record_count = 0L
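This constructor only stages rows in an in-memory StringIO via csv.writer; the flushing logic is not shown in the snippet, and CsvDialect plus the chunked-output protocol are project specifics. A stand-alone sketch of the same buffer-then-flush idea, written for Python 3 and purely illustrative:
import csv
import io

class BufferedCsvWriter(object):
    """Illustrative only: accumulate CSV rows in memory, flush them in one write."""

    def __init__(self, ofile, maxresultrows=50000):
        self._ofile = ofile
        self._maxresultrows = maxresultrows
        self._buffer = io.StringIO()
        self._writer = csv.writer(self._buffer)
        self._record_count = 0

    def writerow(self, row):
        self._writer.writerow(row)
        self._record_count += 1
        if self._record_count >= self._maxresultrows:
            self.flush()

    def flush(self):
        # Copy the buffered CSV text to the output file and reset the buffer.
        self._ofile.write(self._buffer.getvalue())
        self._buffer.seek(0)
        self._buffer.truncate()
        self._record_count = 0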
def QA_SU_save_account_to_csv(message,path=os.getcwd()):
__file_name_1 = '{}backtest-ca&history-{}.csv'.format(path,str(message['header']['cookie']))
with open(__file_name_1, 'w', newline='') as C:
csvwriter = csv.writer(C)
csvwriter.writerow(['date', 'code', 'price', 'towards', 'amount',
'order_id', 'trade_id', 'commission_fee', 'cash', 'assets'])
for i in range(0, max(len(message['body']['account']['cash']), len(message['body']['account']['assets']))):
try:
message['body']['account']['history'][i].append(
message['body']['account']['cash'][i])
message['body']['account']['history'][i].append(
message['body']['account']['assets'][i])
csvwriter.writerow(message['body']['account']['history'][i])
except:
pass
def QA_SU_save_pnl_to_csv(detail, cookie):
__file_name_2 = 'backtest-pnl--' + \
str(cookie) + '.csv'
with open(__file_name_2, 'w', newline='') as E:
csvwriter_1 = csv.writer(E)
csvwriter_1.writerow(detail.columns)
for item in detail:
csvwriter_1.writerow(item)
"""
'cash': message['body']['account']['cash'],
'hold': message['body']['account']['hold'],
'history': message['body']['account']['history'],
'assets': message['body']['account']['assets'],
'detail': message['body']['account']['detail']
"""
def convert_hypnograms(datadir):
"""
This function is quite a hack to read the edf hypnogram as a byte array.
I found no working reader for the hypnogram edfs.
"""
print('Converting hypnograms')
files = [x for x in os.listdir(datadir) if x.endswith('.hyp')]
for file in files:
file = os.path.join(datadir,file)
hypnogram = []
with open(file, mode='rb') as f: # b is important -> binary
raw_hypno = [x for x in str(f.read()).split('Sleep_stage_')][1:]
for h in raw_hypno:
stage = h[0]
repeat = int(h.split('\\')[0][12:])//30 # no idea if this also works on linux
hypnogram.extend(stage*repeat)
with open(file[:-4] + '.csv', "w") as f:
writer = csv.writer(f, lineterminator='\r')
writer.writerows(hypnogram)
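A usage sketch with a hypothetical data directory; each <name>.hyp produces a <name>.csv holding one sleep-stage character per row (e.g. W, 1, 2, R), one row per 30-second epoch:
convert_hypnograms('/data/sleep-edf')   # converts every *.hyp file found in the directory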
def export_to_csv(request, variants):
#export to csv
export = request.GET.get('export', '')
if export != '':
if export == 'csv':
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment; filename=export.csv'
writer = csv.writer(response)
elif export == 'txt':
response = HttpResponse(content_type='text/plain')
response['Content-Disposition'] = 'attachment; filename=export.txt'
writer = csv.writer(response, delimiter='\t', quoting=csv.QUOTE_NONE)
writer.writerow(['Individual', 'Index', 'Pos_index', 'Chr', 'Pos', 'Variant_id', 'Ref', 'Alt', 'Qual', 'Filter', 'Info', 'Format', 'Genotype_col', 'Genotype', 'Read_depth', 'Gene', 'Mutation_type', 'Vartype', 'Genomes1k_maf', 'Dbsnp_maf', 'Esp_maf', 'Dbsnp_build', 'Sift', 'Sift_pred', 'Polyphen2', 'Polyphen2_pred', 'Condel', 'Condel_pred', 'DANN', 'CADD', 'Is_at_omim', 'Is_at_hgmd', 'Hgmd_entries', 'Effect', 'Impact', 'Func_class', 'Codon_change', 'Aa_change', 'Aa_len', 'Gene_name', 'Biotype', 'Gene_coding', 'Transcript_id', 'Exon_rank', 'Genotype_number', 'Allele', 'Gene', 'Feature', 'Feature_type', 'Consequence', 'Cdna_position', 'Cds_position', 'Protein_position', 'Amino_acids', 'Codons', 'Existing_variation', 'Distance', 'Strand', 'Symbol', 'Symbol_source', 'Sift', 'Polyphen', 'Condel'])
for variant in variants:
# print 'variant', variant.index
writer.writerow([variant.individual, variant.index, variant.pos_index, variant.chr, variant.pos, variant.variant_id, variant.ref, variant.alt, variant.qual, variant.filter, pickle.loads(variant.info), variant.format, variant.genotype_col, variant.genotype, variant.read_depth, variant.gene, variant.mutation_type, variant.vartype, variant.genomes1k_maf, variant.dbsnp_maf, variant.esp_maf, variant.dbsnp_build, variant.sift, variant.sift_pred, variant.polyphen2, variant.polyphen2_pred, variant.condel, variant.condel_pred, variant.dann, variant.cadd, variant.is_at_omim, variant.is_at_hgmd, variant.hgmd_entries])
return response
def writerow(self, row):
"""Write row."""
line = []
for s in row:
if (type(s) == dict):
line.append(json.dumps(s))
else:
line.append(unicode(s).encode("utf-8"))
self.writer.writerow(line)
# Fetch UTF-8 output from the queue ...
data = self.queue.getvalue()
data = data.decode("utf-8")
# ... and reencode it into the target encoding
data = self.encoder.encode(data)
# write to the target stream
self.stream.write(data)
# empty queue
self.queue.truncate(0)
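This writerow method only makes sense with its surrounding wrapper class in place; below is a sketch of that wrapper, assuming Python 2 and following the UnicodeWriter recipe from the csv module documentation (the dict-to-JSON handling above is the snippet's own addition):
import cStringIO
import codecs
import csv

class UnicodeWriter(object):
    """Python 2 sketch: write unicode rows to `stream` in any encoding via a UTF-8 queue."""

    def __init__(self, stream, dialect=csv.excel, encoding="utf-8", **kwds):
        self.queue = cStringIO.StringIO()          # UTF-8 staging buffer
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = stream
        self.encoder = codecs.getincrementalencoder(encoding)()

    # writerow() as defined above: encode cells to UTF-8, write them to the queue,
    # then re-encode the queued bytes into the target encoding and emit them.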
def get(self, request, *args, **kwargs):
queryset = self.get_queryset()
field_names = self.get_fields(queryset)
response = HttpResponse(content_type='text/csv')
filename = self.get_filename(queryset)
response['Content-Disposition'] = 'attachment; filename="{}.csv"'.format(filename)
writer = csv.writer(response, **self.get_csv_writer_fmtparams())
if self.specify_separator:
response.write('sep={}{}'.format(writer.dialect.delimiter, writer.dialect.lineterminator))
if self.header:
writer.writerow([self.get_header_name(queryset.model, field_name) for field_name in list(field_names)])
for obj in queryset:
writer.writerow([self.get_field_value(obj, field) for field in field_names])
return response
def _write_csv_output(note_phrase_matches, output_filename):
"""Write one CSV row for each phrase_match where the row contains all of
the RPDR note keys along with the extracted numerical value at the end of
the row."""
rpdr_rows_with_regex_value = []
for phrase_matches in note_phrase_matches:
row = phrase_matches.rpdr_note.get_keys()
if not phrase_matches.phrase_matches:
extracted_value = None
else:
extracted_value = phrase_matches.phrase_matches[0].extracted_value
row.append(extracted_value)
rpdr_rows_with_regex_value.append(row)
with open(output_filename, 'wb') as output_file:
csv_writer = csv.writer(output_file)
csv_writer.writerows(rpdr_rows_with_regex_value)
def write_data_to_csv(csv_name, people, countries):
""" Loop through the list of people and write them to a csv.
Args:
csv_name (str): Name of the file to write results to.
people (list): List of instantiated ``Person`` objects.
countries (list): List of strings that represent countries.
"""
with open(csv_name, 'w') as outfile:
writer = csv.writer(outfile)
columns = ['name'] + countries
writer.writerow(columns)
for person in people:
person_row = [person.name] + [
getattr(person, country, 0) for country in countries
]
writer.writerow(person_row)
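A small usage sketch, assuming a Person object whose per-country counts live in attributes named after each country; the class, names and numbers below are made up:
import collections

# Stand-in for the project's Person class, purely for illustration.
Person = collections.namedtuple('Person', ['name', 'Canada', 'France'])

people = [Person('Alice', 3, 0), Person('Bob', 1, 2)]
write_data_to_csv('people.csv', people, ['Canada', 'France'])
# people.csv:
# name,Canada,France
# Alice,3,0
# Bob,1,2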
def sensor_live(self):
x = []
y1 = []
y2 = []
for i in range(0,330,30): # change time interval here, if required
self.sensor_wake()
time.sleep(10)
pm = self.sensor_read()
if pm is not None:
x.append(i)
y1.append(pm[0])
y2.append(pm[1])
with open('/home/pi/data.csv', 'ab') as csvfile:
file = csv.writer(csvfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
file.writerow([datetime.datetime.now().replace(microsecond=0).isoformat().replace('T', ' '), pm[0], pm[1]])
csvfile.close()
line1, = self.ax.plot(x,y1,'r-x')
line2, = self.ax.plot(x,y2,'b-x')
self.canvas.draw()
self.sensor_sleep()
time.sleep(20)
def list_to_csv(directory_and_filename, list):
if directory_and_filename[-4:] == '.csv':
directory_and_filename = directory_and_filename[:-4]
with open(directory_and_filename + '.csv', 'wb') as csvfile:
spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
for row in list:
try:
spamwriter.writerow(row)
except UnicodeEncodeError:
new_row = []
for element in row:
if type(element) is unicode:
new_row.append(element.encode('utf-8'))
else:
new_row.append(element)
# Write the re-encoded row; the with-block closes the file on exit.
spamwriter.writerow(new_row)
def generate_csv(domains, file_name):
output = open(file_name, 'w')
writer = csv.writer(output)
# First row should always be the headers
writer.writerow(CSV_HEADERS)
for domain in domains:
row = []
# Grab the dictionary for each row.
# Keys for the dict are the column headers.
results = domain.generate_results()
for column in CSV_HEADERS:
row.append(results[column])
writer.writerow(row)
output.close()
def result_write_to_csv(devices_info_list):
""" Write network devices information to csv format.
Args:
devices_info_list (list[NetworkDeviceDTO]): Lists of network device info instances.
"""
f = csv.writer(open("network_devices.csv", "w+"))
f.writerow(["Device Name", "IP Address", "MAC Address", "IOS/Firmware", "Platform",
"Serial Number", "Devcie Role", "Device Family"])
for device_info in devices_info_list:
f.writerow([device_info.hostname,
device_info.managementIpAddress,
device_info.macAddress,
device_info.softwareVersion,
device_info.platformId,
device_info.serialNumber,
device_info.role,
device_info.family])
def write_snp_summary(self, file="snp_summary.csv", summary_parameters=None, sort=False):
if summary_parameters is None:
summary_parameters = ["maf", "hwe", "rep", "call_rate"]
out_file = os.path.join(self.out_path, self.attributes["project"] + "_" + file)
out_data = [["id"] + summary_parameters]
snps = [[snp] + [data[parameter] for parameter in summary_parameters] for snp, data in self.data.items()]
if sort:
snps = sorted(snps, key=operator.itemgetter(*[i for i in range(1, len(summary_parameters)+1)]),
reverse=True)
out_data += snps
with open(out_file, "w") as snp_summary:
writer = csv.writer(snp_summary)
writer.writerows(out_data)
def _writeToCSV(self):
'''
INFO
----
Writes a 2-dimensional list to a CSV text file
Comma-delimits values. If there is no data, then there is no attempt to
create a file.
RETURNS
-------
None
'''
if self._dataAsList:
with open(self._filePathAndName,'w') as csvFile:
writer = csv.writer(csvFile, lineterminator='\n', quoting=csv.QUOTE_NONNUMERIC )
writer.writerows(self._dataAsList)
csvFile.close()
def export_data(self):
self.running = False
self.pupil_thread.join(5)
self.lsl_thread.join(5)
print('Joined threads, now outputting pupil data.')
i = 0
while os.path.exists("data/pupil/data-%s.csv" % i):
i += 1
# csv writer with stim_type, msg, and timestamp, then data
with open('data/pupil/data-%s.csv' % i, 'w+') as f:
writer = csv.writer(f)
writer.writerow(('Signal Type', 'Msg', 'Time', 'Channel 1', 'Channel 2', 'Channel 3', 'Channel 4', 'Channel 5', 'Channel 6', 'Channel 7', 'Channel 8' ))
for sample in self.samples:
signal_type, timestamp, datas = sample
out = (signal_type, 'msg', timestamp)
for data in datas:
out = out + (data,)
writer.writerow(out)
def export_annotations(self,export_range,export_dir):
if not self.annotations:
logger.warning('No annotations in this recording nothing to export')
return
annotations_in_section = chain(*self.annotations_by_frame[export_range])
annotations_in_section = list({a['index']: a for a in annotations_in_section}.values()) # remove duplicates
annotations_in_section.sort(key=lambda a:a['index'])
with open(os.path.join(export_dir,'annotations.csv'),'w',encoding='utf-8',newline='') as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(self.csv_representation_keys())
for a in annotations_in_section:
csv_writer.writerow(self.csv_representation_for_annotations(a))
logger.info("Created 'annotations.csv' file.")
def write_key_value_file(csvfile,dictionary,append=False):
"""Writes a dictionary to a writable file in a CSV format
Args:
csvfile (FILE): Writable file
dictionary (dict): Dictionary containing key-value pairs
append (bool, optional): Writes `key,value` as fieldnames if False
Returns:
None: No return
"""
writer = csv.writer(csvfile, delimiter=',')
if not append:
writer.writerow(['key','value'])
for key,val in dictionary.items():
writer.writerow([key,val])
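A usage sketch; the file name and dictionary contents are illustrative:
with open('recording_info.csv', 'w', newline='') as f:
    write_key_value_file(f, {'capture_version': '1.8', 'world_camera': 'Pupil Cam1'})
# recording_info.csv:
# key,value
# capture_version,1.8
# world_camera,Pupil Cam1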
def _drop_answer_id_col_from_feature_file(self, train_file_location):
file_without_aid = insert_modifier_in_filename(train_file_location, 'no_aid')
if path.isfile(file_without_aid):
self.logger.info('Found a previously generated version of the training file without answer id column, '
're-using it: %s' % file_without_aid)
else:
self.logger.info('Generating a version of the feature file without answer id (which is what ranker'
' training expects)')
temp_file = get_temp_file(file_without_aid)
with smart_file_open(temp_file, 'w') as outfile:
writer = csv.writer(outfile)
with smart_file_open(train_file_location) as infile:
reader = csv.reader(infile)
for row in reader:
writer.writerow(row[:1] + row[2:])
move(temp_file, file_without_aid)
self.logger.info('Done generating file: %s' % file_without_aid)
return file_without_aid
Source: create_cross_validation_splits.py (project: retrieve-and-rank-tuning, author: rchaks)
def setup_train_and_test_writer(output_dir):
"""
Create the output directory if needed and return train/validation CSV writers under it
:param str output_dir: file path to the output dir
:return: writers for <output_dir>/train.csv and <output_dir>/validation.csv
:rtype: tuple(csv.writer, csv.writer)
"""
if not path.isdir(output_dir):
makedirs(output_dir)
else:
LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)
train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
dialect=csv.excel, delimiter=',')
validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
dialect=csv.excel, delimiter=',')
return train_writer, validation_writer
Source: generate_rnr_feature_file.py (project: retrieve-and-rank-tuning, author: rchaks)
def _print_feature_vectors_and_check_for_correct_answers(writer, rnr_search_results, qid, correct_ans_lookup):
"""
write the search results to file as a feature vector with the qid and gt labels from the query.
:param csv.writer writer:
:param list(list(str)) rnr_search_results:
:param str qid: the qid to print at the start of each feature vector
:param dict(str,int) correct_ans_lookup: label lookup for correct answer ids
:return: num_possible_correct, num_correct_answers_in_search_results
:rtype: tuple(int,int)
"""
num_possible_correct = len(correct_ans_lookup)
num_correct_answers_in_search_results = 0
for row in rnr_search_results:
gt_label = 0
doc_id = row[_ANS_ID_COL].strip()
if doc_id in correct_ans_lookup:
gt_label = correct_ans_lookup[doc_id]
num_correct_answers_in_search_results += 1
writer.writerow([qid] + row + [gt_label])
return num_possible_correct, num_correct_answers_in_search_results
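An illustrative call, assuming _ANS_ID_COL indexes the first column of each result row and that `writer` is an existing csv.writer; all identifiers and values below are made up:
# Hypothetical inputs for one query.
rnr_search_results = [['doc_17', '0.83', '12.1'],   # answer id followed by feature values
                      ['doc_42', '0.51', '9.7']]
correct_ans_lookup = {'doc_17': 3}                   # doc_17 is relevant, ground-truth label 3

possible, found = _print_feature_vectors_and_check_for_correct_answers(
    writer, rnr_search_results, 'q001', correct_ans_lookup)
# Rows written: ['q001', 'doc_17', '0.83', '12.1', 3] and ['q001', 'doc_42', '0.51', '9.7', 0]
# possible == 1, found == 1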
def scaneventresultexport(self, id, type, dialect="excel"):
dbh = SpiderFootDb(self.config)
data = dbh.scanResultEvent(id, type)
fileobj = StringIO()
parser = csv.writer(fileobj, dialect=dialect)
parser.writerow(["Updated", "Type", "Module", "Source", "F/P", "Data"])
for row in data:
if row[4] == "ROOT":
continue
lastseen = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(row[0]))
datafield = str(row[1]).replace("<SFURL>", "").replace("</SFURL>", "")
parser.writerow([lastseen, str(row[4]), str(row[3]), str(row[2]), row[13], datafield])
cherrypy.response.headers['Content-Disposition'] = "attachment; filename=SpiderFoot.csv"
cherrypy.response.headers['Content-Type'] = "application/csv"
cherrypy.response.headers['Pragma'] = "no-cache"
return fileobj.getvalue()
def scaneventresultexportmulti(self, ids, dialect="excel"):
dbh = SpiderFootDb(self.config)
scaninfo = dict()
data = list()
for id in ids.split(','):
scaninfo[id] = dbh.scanInstanceGet(id)
data = data + dbh.scanResultEvent(id)
fileobj = StringIO()
parser = csv.writer(fileobj, dialect=dialect)
parser.writerow(["Scan Name", "Updated", "Type", "Module", "Source", "F/P", "Data"])
for row in data:
if row[4] == "ROOT":
continue
lastseen = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(row[0]))
datafield = str(row[1]).replace("<SFURL>", "").replace("</SFURL>", "")
parser.writerow([scaninfo[row[12]][0], lastseen, str(row[4]), str(row[3]),
str(row[2]), row[13], datafield])
cherrypy.response.headers['Content-Disposition'] = "attachment; filename=SpiderFoot.csv"
cherrypy.response.headers['Content-Type'] = "application/csv"
cherrypy.response.headers['Pragma'] = "no-cache"
return fileobj.getvalue()
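Both export handlers rely on the same in-memory pattern: build the CSV in a StringIO and return its contents as the HTTP response body, with the attachment headers set as above. A stand-alone sketch of that pattern (Python 3 io.StringIO shown; the helper name and row values are illustrative):
import csv
from io import StringIO

def rows_to_csv_attachment(rows, header, dialect="excel"):
    """Illustrative helper: render rows to a CSV string suitable for an HTTP response body."""
    fileobj = StringIO()
    parser = csv.writer(fileobj, dialect=dialect)
    parser.writerow(header)
    for row in rows:
        parser.writerow(row)
    return fileobj.getvalue()

body = rows_to_csv_attachment(
    [["2017-01-01 00:00:00", "IP_ADDRESS", "sfp_dns", "example.com", "0", "93.184.216.34"]],
    ["Updated", "Type", "Module", "Source", "F/P", "Data"])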