def harvest(self, limit=None, offset=None):
"""
    Harvest the data from the file.
    :param limit: integer limit on the number of rows to iterate over
    :param offset: integer row offset to start from (0-based)
    :return: list of dicts with "CAS" and "IUPAC" keys
"""
response = []
for i, row in enumerate(list(self.reader)[offset:]):
if limit:
if i == limit:
break
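        # row[0] is expected to look like '<CAS number> <name> (<IUPAC name>)'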
cas = row[0].split(' ', 1)[0]
cut_start_iupac = str(row[0].split('(', 1)[1])
iupac = cut_start_iupac.rsplit(')', 1)[0]
response.append({
"CAS": cas,
"IUPAC": iupac
})
return response
def add_afsc(dict, fname):
"""
    Add AFSCs from the given filename into the dictionary.
    :param dict: dictionary to populate, keyed by base AFSC
    :param fname: CSV file using '#' as delimiter
"""
with open(CSV_FOLDER + fname, newline='') as f:
reader = csv.reader(f, delimiter='#')
for row in reader:
base_afsc = row[0]
job_title = row[1]
afsc_dict = {"base_afsc": base_afsc,
"job_title": job_title,
"shreds": {},
"link": ""}
dict[base_afsc] = afsc_dict
def add_shreds(afsc_dict, fname):
"""
    Add shreds from the given filename into the dictionary.
    :param afsc_dict: either enlisted_dict or officer_dict
    :param fname: CSV file using ',' as delimiter
"""
with open(CSV_FOLDER + fname, newline='') as f:
reader = csv.reader(f, delimiter=',')
for row in reader:
base_afsc = row[0]
shred_char = row[1]
shred_title = row[2]
            # skip shreds whose base AFSC is not already in the dictionary
try:
afsc_dict[base_afsc]["shreds"][shred_char] = shred_title
except KeyError:
pass
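# A minimal usage sketch for the two helpers above (file names are hypothetical;
# CSV_FOLDER is assumed to point at the folder holding the CSV files):
#     enlisted_dict = {}
#     add_afsc(enlisted_dict, "enlisted_afscs.csv")
#     add_shreds(enlisted_dict, "enlisted_shreds.csv")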
def process_desired_state(server_state, desired_state):
"""
    Process the user-provided input file and determine which actions are needed
    wherever the server state differs from the desired state.
    :param server_state: dict of all users on the server and their associated groups
    :param desired_state: path to the CSV input provided by the user
    :return: None
"""
    group_id = []
with open(desired_state) as csvDataFile:
csvReader = csv.reader(csvDataFile)
next(csvReader, None)
for email, lastname, firstname, groups in csvReader:
group_list = []
if groups != "":
group_list = groups.split(",")
group_list = [group.strip(' ') for group in group_list]
            if email not in server_state:
actions[email] = {'action': 'add', 'groups': group_list, 'user_id':'', 'user_data': {'firstname': firstname, 'lastname': lastname, 'email': email, 'reset_password_required': True} }
else:
group_names_server = server_state[email]['groups'][0::2]
group_names_desired = groups.split(',')
group_names_desired = [group.strip(' ') for group in group_names_desired]
group_diff = [i for i in group_names_desired if i not in group_names_server]
if group_diff != [] and group_diff != ['']:
actions[email] = {'action': 'add to group', 'groups': group_diff, 'user_id': server_state[email]['user_id'] }
def extractRows(fileName):
fileName = 'results/cvpr_db_results/'+fileName+'.csv'
with open(fileName, 'r') as csvfile:
lines = csv.reader(csvfile)
for row in lines:
if row[0] != "name":
retval = [int(x) if x != "" else -100 for x in row[1:-2] ]
else:
nameRow = row[1:-2]
if row[0] == "causalgrammar":
causalRow = retval
elif row[0] == "origdata":
origRow = retval
elif row[0] == "random":
randomRow = retval
return {"nameRow": nameRow, "causalRow": causalRow, "origRow": origRow, "randomRow": randomRow}
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
ct = headers.get('Content-Type')
if not ct.startswith('application/json'):
logger.debug('Unexpected response for JSON request: %s', ct)
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def reader(self, stream, context):
"""
Read lines from a subprocess' output stream and either pass to a progress
callable (if specified) or write progress information to sys.stderr.
"""
progress = self.progress
verbose = self.verbose
while True:
s = stream.readline()
if not s:
break
if progress is not None:
progress(s, context)
else:
if not verbose:
sys.stderr.write('.')
else:
sys.stderr.write(s.decode('utf-8'))
sys.stderr.flush()
stream.close()
def loadData(bfile, extractSim, phenoFile, missingPhenotype='-9', loadSNPs=False, standardize=True):
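    # Bed is presumably pysnptools' snpreader.Bed, which lazily wraps a PLINK .bed/.bim/.fam fileset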
bed = Bed(bfile)
if (extractSim is not None):
f = open(extractSim)
csvReader = csv.reader(f)
extractSnpsSet = set([])
for l in csvReader: extractSnpsSet.add(l[0])
f.close()
keepSnpsInds = [i for i in xrange(bed.sid.shape[0]) if bed.sid[i] in extractSnpsSet]
bed = bed[:, keepSnpsInds]
phe = None
if (phenoFile is not None): bed, phe = loadPheno(bed, phenoFile, missingPhenotype)
if (loadSNPs):
bed = bed.read()
if (standardize): bed = bed.standardize()
return bed, phe
def prepare_data(imagery_path, train_file_path, split_points):
# Read tiff image
image_tuple = read_image(imagery_path)
# Read samples
original_data_list = []
csv_reader = csv.reader(open(train_file_path, encoding='utf-8'))
for row in csv_reader:
original_data_list.append(row)
original_data_array = np.array(original_data_list)
    # Split training data into variables and labels
x_s = original_data_array[:,split_points[0]:split_points[1]]
y_s = original_data_array[:,split_points[1]]
return x_s, y_s, image_tuple
def read_csv_rows(path):
"""
Extract the rows from the CSV at the specified path.
Will throw an error if the file doesn't exist.
:type path: string
:rtype: list[list[string]]
"""
with open(path, 'rU') as infile:
reader = csv.reader(infile, delimiter=',')
rows = [row for row in reader]
# eliminate trailing cols that have no entries (CSI-215)
for idx, row in enumerate(rows):
clipIndex = 0
for col in row[::-1]:
if not col:
clipIndex -= 1
else:
break
if clipIndex < 0:
rows[idx] = rows[idx][:clipIndex]
return rows
def load_solar_data():
with open('solar label.csv', 'r') as csvfile:
reader = csv.reader(csvfile)
rows = [row for row in reader]
labels = np.array(rows, dtype=int)
print(shape(labels))
with open('solar.csv', 'r') as csvfile:
reader = csv.reader(csvfile)
rows = [row for row in reader]
rows = np.array(rows, dtype=float)
rows=rows[:104832,:]
print(shape(rows))
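    # reshape so that each row of trX is a consecutive window of 576 samples taken column-wise from 'rows'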
trX = np.reshape(rows.T,(-1,576))
print(shape(trX))
m = np.ndarray.max(rows)
print("maximum value of solar power", m)
trY=np.tile(labels,(32,1))
trX=trX/m
return trX,trY
def read_from_csv(filename, column_names):
data = []
with open(filename) as csv_file:
reader = csv.reader(csv_file)
is_header_row = True
for row in reader:
if is_header_row:
for col in row:
data.append([])
is_header_row = False
else:
colnum = 0
for col in row:
data[colnum].append(float(col))
colnum += 1
return data
def collect_moves(self, reader, name):
Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
if name.split('-')[-1].isdigit():
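        # names ending in a digit are matched exactly against the first column;
        # any other name falls through to the substring match in the else branch below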
for row in reader:
if name == row[0]:
pokemon = name.split('-')[0].title()
generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
return Moves(pokemon, generation, color, moves, versions)
else:
for row in reader:
if name in row[0]:
pokemon = name.title()
generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
return Moves(pokemon, generation, color, moves, versions)
def _records_protocol_v1(self, ifile):
reader = csv.reader(ifile, dialect=CsvDialect)
try:
fieldnames = reader.next()
except StopIteration:
return
mv_fieldnames = {name: name[len('__mv_'):] for name in fieldnames if name.startswith('__mv_')}
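    # fields arrive in pairs: '<name>' holds the plain value and '__mv_<name>' holds an
    # encoded multi-value form; when the multi-value form is non-empty it takes precedence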
if len(mv_fieldnames) == 0:
for values in reader:
yield OrderedDict(izip(fieldnames, values))
return
for values in reader:
record = OrderedDict()
for fieldname, value in izip(fieldnames, values):
if fieldname.startswith('__mv_'):
if len(value) > 0:
record[mv_fieldnames[fieldname]] = self._decode_list(value)
elif fieldname not in record:
record[fieldname] = value
yield record
def main():
indexfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-index.txt'
chainfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-for-chain-1.txt'
index = readindexfile(indexfilepath)
print(index)
data = []
indexoffset = None
with open(chainfilepath, 'rt') as chainfile:
reader = csv.reader(chainfile, delimiter='\t')
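        # each line of the CODA chain file is expected to hold '<iteration index>\t<sampled value>'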
for row in reader:
index, value = int(row[0]), float(row[1])
if not data: indexoffset = index
print(index, indexoffset)
assert index == len(data) + indexoffset
data.append(value)
print(data)
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
if headers.get('Content-Type') != 'application/json':
logger.debug('Unexpected response for JSON request')
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def loadRecord(line):
"""
    Parse a single CSV-formatted line.
"""
input_line=StringIO.StringIO(line)
#row=unicodecsv.reader(input_line, encoding="utf-8")
#return row.next()
#reader=csv.DictReader(input_line,fieldnames=["id","qid1","qid2","question1","question2","is_duplicate"])
reader=csv.reader(input_line)
return reader.next()
#data=[]
#for row in reader:
# print row
# data.append([unicode(cell,"utf-8") for cell in row])
#return data[0]
#return reader.next()
#raw_data=sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
def __init__(self, bot):
self.bot = bot
# Add commands as random subcommands
for name, command in inspect.getmembers(self):
if isinstance(command, commands.Command) and command.parent is None and name != "random":
self.bot.add_command(command)
self.random.add_command(command)
# Add fact subcommands as subcommands of corresponding commands
for command, parent in ((self.fact_cat, self.cat), (self.fact_date, self.date), (self.fact_number, self.number)):
utilities.add_as_subcommand(self, command, parent, "fact")
# Add random subcommands as subcommands of corresponding commands
self.random_subcommands = ((self.color, "Resources.color"), (self.giphy, "Resources.giphy"), (self.map, "Resources.map"), (self.streetview, "Resources.streetview"), (self.uesp, "Search.uesp"), (self.wikipedia, "Search.wikipedia"), (self.xkcd, "Resources.xkcd"))
for command, parent_name in self.random_subcommands:
utilities.add_as_subcommand(self, command, parent_name, "random")
# Import jokes
self.jokes = []
try:
with open("data/jokes.csv", newline = "") as jokes_file:
jokes_reader = csv.reader(jokes_file)
for row in jokes_reader:
self.jokes.append(row[0])
except FileNotFoundError:
pass
def main():
print()
csvnames = sys.argv[1:]
columns_named=False
scores = []
    for csvname in csvnames:
        print("Reading scores from " + csvname + ".")
with open(csvname, 'r') as csvfile:
score_reader = csv.reader(csvfile)
for row in score_reader:
print(', '.join(row))
if 'Winner' in row[0]:
continue
if 'Index' in row[0] and columns_named:
continue
else:
scores.append(row)
columns_named=True
winner = metarank(scores)
def read_turk_dic_proton():
"""
    Return a dict mapping AbstractId to the list of (Question3, Question4) answers,
    one entry per worker who judged that abstract.
"""
f = open("data/proton-beam-RawTurkResults.csv")
first_line = f.readline()
csv_reader = csv.reader(f)
turk_dic = {}
for row in csv_reader:
(AssignmentId, WorkerId, HITId, AcceptTime, SubmitTime, ApprovalTime, TimeToComplete, AbstractId, Question1, Question2, Question3, Question4, Relevant) = tuple(row)
AbstractId = int(AbstractId)
if AbstractId not in turk_dic: turk_dic[AbstractId] = []
turk_dic[AbstractId].append( (Question3, Question4) )
return turk_dic
def get_pub_dic_csv(dataset):
filename = "data/" + dataset + "-text.csv"
f = open(filename)
f.readline()
csv_reader = csv.reader(f)
    # Create dict of: abstract_id -> text features
pub_dic = {}
for row in csv_reader:
if dataset.startswith("RCT"):
(abstract_id, abstract, title) = tuple(row)[0:3]
else:
(abstract_id, title, publisher, abstract) = tuple(row)[0:4]
abstract_id = int(abstract_id)
text = title + abstract
pub_dic[abstract_id] = text
return pub_dic
def load(corpus_csv_file: Path,
sampled_training_example_count: Optional[int] = None) -> 'Corpus':
import csv
with corpus_csv_file.open(encoding='utf8') as opened_csv:
reader = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
def to_absolute(audio_file_path: Path) -> Path:
return audio_file_path if audio_file_path.is_absolute() else Path(
corpus_csv_file.parent) / audio_file_path
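        # each CSV row: id, audio_file_path, label, phase, positional_label (unpacked in the comprehension below)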
examples = [
(
LabeledExampleFromFile(
audio_file=to_absolute(Path(audio_file_path)), id=id, label=label,
positional_label=None if positional_label == "" else PositionalLabel.deserialize(
positional_label)), Phase[phase])
for id, audio_file_path, label, phase, positional_label in reader]
return Corpus(training_examples=[e for e, phase in examples if phase == Phase.training],
test_examples=[e for e, phase in examples if phase == Phase.test],
sampled_training_example_count=sampled_training_example_count)
def test_UnicodeWriter(self):
"""Test UnicodeWriter class works."""
tmp = tempfile.NamedTemporaryFile()
uw = util.UnicodeWriter(tmp)
fake_csv = ['one, two, three, {"i": 1}']
for row in csv.reader(fake_csv):
# change it for a dict
row[3] = dict(i=1)
uw.writerow(row)
tmp.seek(0)
err_msg = "It should be the same CSV content"
with open(tmp.name, 'rb') as f:
reader = csv.reader(f)
for row in reader:
for item in row:
assert item in fake_csv[0], err_msg