    def test_write_escape(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self._write_error_test(csv.Error, ['a', 1, 'p,"q"'],
                               escapechar=None, doublequote=False)
        self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
Python csv.QUOTE_NONE usage examples (collected source snippets)
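# As orientation for the snippets collected below, here is a minimal,
# self-contained sketch (not taken from any of the projects quoted here)
# of how csv.QUOTE_NONE interacts with escapechar:
import csv
import io

buf = io.StringIO()
# With QUOTE_NONE the writer never quotes, so a delimiter inside a field
# must be escaped instead.
csv.writer(buf, quoting=csv.QUOTE_NONE, escapechar='\\').writerow(['a', 1, 'p,q'])
print(buf.getvalue())  # a,1,p\,q

# Without an escapechar there is no way to emit the delimiter at all,
# and the writer raises csv.Error.
try:
    csv.writer(io.StringIO(), quoting=csv.QUOTE_NONE).writerow(['p,q'])
except csv.Error as e:
    print('raised:', e)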
    def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting=csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')
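# A quick standalone illustration (assumed, not part of the test suite) of
# the propagation behavior the test above pins down: an exception raised
# while stringifying a field escapes from writer.writerow unchanged.
import csv
import io

class ExplodingItem:
    def __str__(self):
        raise IOError("boom")

try:
    csv.writer(io.StringIO()).writerow([ExplodingItem()])
except IOError as e:
    print('propagated:', e)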
    def test_write_escape(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
    def parse(self):
        import csv
        reader = csv.reader(
            self.stream,
            delimiter=',',
            quotechar=None,
            escapechar=None,
            doublequote=False,
            skipinitialspace=True,
            lineterminator='\r\n',
            quoting=csv.QUOTE_NONE)
        it = iter(reader)
        row = next(it)
        self.parse_header(row)
        for row in it:
            self.parse_row(row)

        # compute derived data
        self.profile.validate()
        self.profile.find_cycles()
        self.profile.ratio(TIME_RATIO, SAMPLES)
        self.profile.call_ratios(SAMPLES2)
        self.profile.integrate(TOTAL_TIME_RATIO, TIME_RATIO)

        return self.profile
def glove2dict(src_filename):
    """GloVe reader.

    Parameters
    ----------
    src_filename : str
        Full path to the GloVe file to be processed.

    Returns
    -------
    dict
        Mapping words to their GloVe vectors.
    """
    with open(src_filename) as f:
        reader = csv.reader(f, delimiter=' ', quoting=csv.QUOTE_NONE)
        return {line[0]: np.array(list(map(float, line[1:])))
                for line in reader}
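# Hypothetical usage of the reader above; the file name and vectors are
# made up for illustration. QUOTE_NONE matters here because GloVe entries
# for quote characters would otherwise confuse the csv parser.
with open('mini.glove.txt', 'w') as f:
    f.write('the 0.1 0.2 0.3\n')
    f.write('cat 0.4 0.5 0.6\n')

vectors = glove2dict('mini.glove.txt')
print(vectors['cat'])  # [0.4 0.5 0.6]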
def buildMap(train_path):
    df_train = pd.read_csv(train_path, delimiter='\t', quoting=csv.QUOTE_NONE,
                           skip_blank_lines=False, header=None,
                           names=['word', 'label'])
    words = list(set(df_train['word'][df_train['word'].notnull()]))
    labels = list(set(df_train['label'][df_train['label'].notnull()]))
    word2id = dict(zip(words, range(1, len(words) + 1)))
    label2id = dict(zip(labels, range(1, len(labels) + 1)))
    id2word = dict(zip(range(1, len(words) + 1), words))
    id2label = dict(zip(range(1, len(labels) + 1), labels))
    # index 0 is reserved for padding, len + 1 for unseen tokens
    id2word[0] = "<PAD>"
    id2label[0] = "<PAD>"
    word2id["<PAD>"] = 0
    label2id["<PAD>"] = 0
    id2word[len(words) + 1] = "<NEW>"
    id2label[len(labels) + 1] = "<NEW>"
    word2id["<NEW>"] = len(words) + 1
    label2id["<NEW>"] = len(labels) + 1
    saveMap(id2word, id2label)
    return word2id, id2word, label2id, id2label
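# A hedged usage sketch for buildMap above; the token/label file is made up,
# and it assumes the project's saveMap helper is importable. Input format:
# one token per line, tab-separated word and label, blank lines between
# sentences (which is why skip_blank_lines=False is needed).
with open('toy_train.txt', 'w') as f:
    f.write('John\tB-PER\n')
    f.write('lives\tO\n')
    f.write('\n')
    f.write('Paris\tB-LOC\n')

word2id, id2word, label2id, id2label = buildMap('toy_train.txt')
print(word2id['<PAD>'], word2id['<NEW>'])  # 0 and len(words) + 1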
def getTestData(test_path, seq_max_len, is_validation=True):
    word2id, id2word = loadMap('data/word2id')
    label2id, id2label = loadMap('data/label2id')
    df_test = pd.read_csv(test_path, delimiter='\t', skip_blank_lines=False,
                          header=None, quoting=csv.QUOTE_NONE,
                          names=['word', 'label'])

    def mapfunc(x):
        # blank lines (sentence separators) come through as NaN
        if pd.isnull(x):
            return -1
        elif x not in word2id:
            return word2id['<NEW>']
        else:
            return word2id[x]

    df_test['word_id'] = df_test.word.map(mapfunc)
    df_test['label_id'] = df_test.label.map(
        lambda x: -1 if pd.isnull(x) else label2id[x])
    if is_validation:
        X_test, y_test = prepare(df_test['word_id'], df_test['label_id'], seq_max_len)
        return X_test, y_test
    else:
        df_test['word'] = df_test.word.map(lambda x: -1 if pd.isnull(x) else x)
        df_test['label'] = df_test.label.map(lambda x: -1 if pd.isnull(x) else x)
        X_test, _ = prepare(df_test['word_id'], df_test['word_id'], seq_max_len)
        X_test_str, X_test_label_str = prepare(df_test['word'], df_test['label'],
                                               seq_max_len, is_padding=False)
        return X_test, X_test_str, X_test_label_str
def newStats2CSV(files, out_file):
    arr = [None] * len(files)  # one list of values per input file
    for j in range(len(files)):
        values = []
        with open(files[j], 'r') as fp:
            for line in fp:
                values += [float(w) for w in line.split()
                           if w[0] in '0123456789']
        arr[j] = values
    with open(out_file, 'w') as fq:
        stats_writer = csv.writer(fq, delimiter=',', quotechar='\'')  # , quoting=csv.QUOTE_NONE
        # stats_tiltes: module-level list of row titles, one per statistic
        for i in range(len(stats_tiltes)):
            row = [stats_tiltes[i]] + [arr[j][i] for j in range(len(files))]
            stats_writer.writerow(row)
def __parse_file(self):
filename = self.options['filename']
if not filename:
raise IOError
sep = self.options['column_separator']
quote = self.options['quote_character']
values = []
with open(filename, 'rU') as infile:
# if sep is not a one character string, csv.reader will raise a TypeError
if not quote:
csvreader = csv.reader(infile, delimiter=str(sep), quoting=csv.QUOTE_NONE)
else:
csvreader = csv.reader(infile, delimiter=str(sep), quotechar=str(quote))
# get each line from the file and separate it into columns based on sep
for row in csvreader:
# append all lines as-is case-wise
# unicode(str, errors='ignore') causes all invalid characters to be stripped out
values.append([unicode(value.strip(), errors='ignore') for value in row])
# ensure the number of columns in each row is the same as the previous row
if len(values) > 1:
assert len(values[-1]) == len(values[-2])
return values
def buildMap(train_path="train.in"):
df_train = pd.read_csv(train_path, delimiter='\t', quoting=csv.QUOTE_NONE, skip_blank_lines=False, header=None, names=["char", "label"])
chars = list(set(df_train["char"][df_train["char"].notnull()]))
labels = list(set(df_train["label"][df_train["label"].notnull()]))
char2id = dict(zip(chars, range(1, len(chars) + 1)))
label2id = dict(zip(labels, range(1, len(labels) + 1)))
id2char = dict(zip(range(1, len(chars) + 1), chars))
id2label = dict(zip(range(1, len(labels) + 1), labels))
id2char[0] = "<PAD>"
id2label[0] = "<PAD>"
char2id["<PAD>"] = 0
label2id["<PAD>"] = 0
id2char[len(chars) + 1] = "<NEW>"
char2id["<NEW>"] = len(chars) + 1
saveMap(id2char, id2label)
return char2id, id2char, label2id, id2label
    def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting=csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10
            def __getitem__(self, i):
                if i > 2:
                    raise OSError
        self.assertRaises(OSError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise OSError
        self.assertRaises(OSError, self._write_test, [BadItem()], '')
def __read_imagenet(path, shuffle=True, save_file = 'imagenet_files.csv'):
if not os.path.exists(save_file):
def class_index(fn):
class_id = re.search(r'(n\d+)', fn).group(1)
return synset_map[class_id]['index']
file_list = glob.glob(path+'/*/*.JPEG')
label_indexes = []
        # binary file modes are the Python 2 idiom for the csv module
        with open(save_file, 'wb') as csv_file:
wr = csv.writer(csv_file, quoting=csv.QUOTE_NONE)
for f in file_list:
idx = class_index(f)
label_indexes.append(idx)
wr.writerow([f, idx])
with open(save_file, 'rb') as f:
reader = csv.reader(f)
file_list = list(reader)
file_tuple, label_tuple = zip(*file_list)
filename, labels = tf.train.slice_input_producer([list(file_tuple), list(label_tuple)], shuffle=shuffle)
images = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
images = tf.div(tf.add(tf.to_float(images), -127), 128)
return images, tf.string_to_number(labels, tf.int32)
def run(self):
print "Starting thread to write results into the file %s..." % self._out_file_path
with open(self._out_file_path, 'wt', buffering=1) as csvfile:
#writer = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
dict_writer = csv.DictWriter(csvfile, self._fieldnames)
dict_writer.writeheader()
while not self._exit.is_set():
while True:
                    queue_item = None
try:
queue_item = self._queue.get(True, 5)
except Queue.Empty:
break
dict_writer.writerow(queue_item)
self._counter += 1
self._queue.task_done()
if self._counter % 10 == 0:
print "Analyzed %d pairs..." % self._counter
print "Finishing file writer thread..."
Source: mgf_search_result_annotator.py, from project spectra-cluster-py (author: spectra-cluster).
def parse_msgfplus(filename, fdr):
"""
Extracts the PSMs from a MSGF+ search result file.
:param filename: Filename of the MSGF+ search result file (only text file supported)
:param fdr: Target FDR as fractional (ie. 0.01 for 1%)
:return: A list of PSM objects
"""
msgfplus_results = list()
with open(filename, newline="") as result_file:
msgfplus_result_reader = csv.DictReader(result_file, delimiter="\t", quoting=csv.QUOTE_NONE)
for msgfplus_psm in msgfplus_result_reader:
# ignore all PSMs below the set FDR
if float(msgfplus_psm['PepQValue']) > fdr:
continue
psm = Psm(int(msgfplus_psm["SpecID"][6:]), msgfplus_psm["Peptide"])
msgfplus_results.append(psm)
return msgfplus_results
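# A hedged usage sketch for parse_msgfplus above. The three-column file is
# fabricated (real MSGF+ TSV exports carry many more columns, but only
# SpecID, Peptide and PepQValue are read), and the Psm class is assumed to
# come from the surrounding module.
with open('toy_msgf.tsv', 'w') as f:
    f.write('SpecID\tPeptide\tPepQValue\n')
    f.write('index=12\tPEPTIDER\t0.001\n')
    f.write('index=13\tBADHIT\t0.2\n')

psms = parse_msgfplus('toy_msgf.tsv', fdr=0.01)
print(len(psms))  # 1 -- the 20% FDR hit is filtered out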
def write_genome_gtf(self, out_gtf_fn):
with open(out_gtf_fn, 'wb') as f:
writer = csv.writer(f, delimiter='\t', quoting=csv.QUOTE_NONE, quotechar='')
for genome_prefix, in_gtf_fn in itertools.izip(self.genome_prefixes, self.in_gtf_fns):
if len(self.genomes) > 1:
prefix_func = lambda s: '%s_%s' % (genome_prefix, s)
else:
prefix_func = lambda s: s
transcript_to_chrom = {}
cross_chrom_transcripts = set()
for row, is_comment, properties in self.gtf_reader_iter(in_gtf_fn):
if is_comment:
writer.writerow(row)
continue
chrom = prefix_func(row[0])
row[0] = chrom
if 'transcript_id' in properties:
properties['transcript_id'] = prefix_func(properties['transcript_id'])
curr_tx = properties['transcript_id']
if curr_tx in transcript_to_chrom and transcript_to_chrom[curr_tx] != chrom:
# ignore recurrences of a transcript on different chromosomes - it will break the STAR index
cross_chrom_transcripts.add(curr_tx)
continue
transcript_to_chrom[curr_tx] = chrom
if 'gene_id' in properties:
properties['gene_id'] = prefix_func(properties['gene_id'])
if 'gene_name' in properties:
properties['gene_name'] = prefix_func(properties['gene_name'])
row[8] = self.format_properties_dict(properties)
writer.writerow(row)
print "WARNING: The following transcripts appear on multiple chromosomes in the GTF:"
print '\n'.join(list(cross_chrom_transcripts)) + '\n'
print "This can indicate a problem with the reference or annotations. Only the first chromosome will be counted."
    def parse(self):
        import csv
        reader = csv.reader(
            self.stream,
            delimiter=',',
            quotechar=None,
            escapechar=None,
            doublequote=False,
            skipinitialspace=True,
            lineterminator='\r\n',
            quoting=csv.QUOTE_NONE)
        header = True
        for row in reader:
            if header:
                self.parse_header(row)
                header = False
            else:
                self.parse_row(row)

        # compute derived data
        self.profile.validate()
        self.profile.find_cycles()
        self.profile.ratio(TIME_RATIO, SAMPLES)
        self.profile.call_ratios(SAMPLES2)
        self.profile.integrate(TOTAL_TIME_RATIO, TIME_RATIO)

        return self.profile
    def load_data(self):
        logging.info('loading the dataset from %s' % self.data_home)
        train_file = os.path.join(self.data_home, 'user_info.train.gz')
        dev_file = os.path.join(self.data_home, 'user_info.dev.gz')
        test_file = os.path.join(self.data_home, 'user_info.test.gz')

        read_kwargs = dict(delimiter='\t', encoding=self.encoding,
                           names=['user', 'lat', 'lon', 'text'],
                           quoting=csv.QUOTE_NONE, error_bad_lines=False)
        df_train = pd.read_csv(train_file, **read_kwargs)
        df_dev = pd.read_csv(dev_file, **read_kwargs)
        df_test = pd.read_csv(test_file, **read_kwargs)

        # normalise user names, drop duplicates, and index each split by user
        for df in (df_train, df_dev, df_test):
            df.dropna(inplace=True)
            df['user'] = df['user'].apply(lambda x: str(x).lower())
            df.drop_duplicates(['user'], inplace=True, keep='last')
            df.set_index(['user'], drop=True, append=False, inplace=True)
            df.sort_index(inplace=True)

        self.df_train = df_train
        self.df_dev = df_dev
        self.df_test = df_test
def loadCSV(csvfile):
    """Load the results.

    Returns: apps (list of app names), labels (list of the five error-measure
    names), and values (dict mapping each case ID to {app: list of values}).
    """
    vcnt = 5  # we expect 5 error measures per app
    values = {}
    with open(csvfile, 'rb') as fp:
        reader = csv.reader(fp, delimiter=',', quoting=csv.QUOTE_NONE)
        cnt = 0
        for row in reader:
            if reader.line_num == 1:
                # header row: the non-empty cells are the app names
                apps = [cell for cell in row if cell != '']
                apps = apps[1:]
            elif reader.line_num == 2:
                labels = row[1:vcnt + 1]
            else:
                d = {}
                for i in range(len(apps)):
                    d[apps[i]] = row[1 + vcnt * i: 1 + vcnt * (i + 1)]
                values[row[0]] = d
                cnt += 1
    return apps, labels, values
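# A hedged sketch of the layout loadCSV above appears to expect (inferred
# from the parsing logic, and Python 2 only, given the binary-mode open):
# row 1 names each app once among otherwise empty cells, row 2 carries the
# five error-measure labels, and each later row holds a case ID followed
# by vcnt values per app.
sample = (
    "id,appA,,,,,appB,,,,\n"        # row 1: app names in non-empty cells
    ",m1,m2,m3,m4,m5,,,,,\n"        # row 2: the five measure labels
    "case1,1,2,3,4,5,6,7,8,9,10\n"  # data: ID, then 5 values per app
)
with open('errors.csv', 'w') as f:
    f.write(sample)

apps, labels, values = loadCSV('errors.csv')
print(values['case1']['appB'])  # ['6', '7', '8', '9', '10']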