import csv


def read_data(user_id=None, email=None):
    # file_item_path is expected to be defined elsewhere in the module
    filename = file_item_path
    with open(filename, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        unknown_user_id = None
        unknown_email = None
        for row in reader:
            if user_id is not None:
                if int(user_id) == int(row.get("id")):
                    return row
                else:
                    unknown_user_id = user_id
            if email is not None:
                if email == row.get("email"):
                    return row
                else:
                    unknown_email = email
        if unknown_user_id is not None:
            return "User id {user_id} not found".format(user_id=user_id)
        if unknown_email is not None:
            return "Email {email} not found".format(email=email)
    return None
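A minimal usage sketch; the file name and CSV contents below are invented purely for illustration, and in the real module file_item_path is set elsewhere:

# hypothetical lookup file with "id" and "email" columns
file_item_path = "users.csv"

print(read_data(user_id=1))                      # the matching row dict, if the id exists
print(read_data(email="missing@example.com"))    # "Email missing@example.com not found"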
Example source code for Python's csv.DictReader() class
def get_user_data(self, user_id=None, email=None):
    filename = file_item_path
    with open(filename, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        unknown_user_id = None
        unknown_email = None
        for row in reader:
            if user_id is not None:
                if int(user_id) == int(row.get("id")):
                    return row
                else:
                    unknown_user_id = user_id
            if email is not None:
                if email == row.get("email"):
                    return row
                else:
                    unknown_email = email
        if unknown_user_id is not None:
            print("User id {user_id} not found".format(user_id=user_id))
        if unknown_email is not None:
            print("Email {email} not found".format(email=email))
    return None
def read_data(user_id=None, email=None):
    filename = "data.csv"
    with open(filename, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        unknown_user_id = None
        unknown_email = None
        for row in reader:
            if user_id is not None:
                if int(user_id) == int(row.get("id")):
                    return row
                else:
                    unknown_user_id = user_id
            if email is not None:
                if email == row.get("email"):
                    return row
                else:
                    unknown_email = email
        if unknown_user_id is not None:
            return "User id {user_id} not found".format(user_id=user_id)
        if unknown_email is not None:
            return "Email {email} not found".format(email=email)
    return None
def sample_to_run_data_mapping(samples_dir):
    '''
    return dict
    each key is string "sample_id"
    each value is a list of tuples ("run_name", "barcode_id")
    '''
    # samples_dir is expected to end with a path separator
    runs_file = samples_dir + "runs.tsv"
    sr_mapping = {}
    with open(runs_file) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            sample = row["sample_id"]
            rb_pair = (row["run_name"], row["barcode_id"])
            if sample not in sr_mapping:
                sr_mapping[sample] = []
            sr_mapping[sample].append(rb_pair)
    return sr_mapping
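For illustration, a hypothetical tab-separated runs.tsv (with at least sample_id, run_name and barcode_id columns) and the mapping it would produce; the concrete values are assumptions:

# runs.tsv, hypothetical contents:
# sample_id  run_name    barcode_id
# S1         run_2017a   BC01
# S1         run_2017b   BC07
# S2         run_2017a   BC02

mapping = sample_to_run_data_mapping("samples/")   # note the trailing slash
# mapping == {'S1': [('run_2017a', 'BC01'), ('run_2017b', 'BC07')],
#             'S2': [('run_2017a', 'BC02')]}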
def sample_to_metadata_mapping(samples_dir):
    '''
    return dict
    each key is string "sample_id"
    each value is a list of metadata ordered as
    ["strain", "sample_id", "collection_date", "country", "division", "location"]
    '''
    metadata_file = samples_dir + "samples.tsv"
    sm_mapping = {}
    with open(metadata_file) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            sample = row["sample_id"]
            metadata = [row["strain"], row["sample_id"], row["collection_date"],
                        row["country"], row["division"], row["location"]]
            sm_mapping[sample] = metadata
    return sm_mapping
def get_gtfs_infos(gtfs):
    gtfs_infos = {}
    gtfs_infos["stop_points_count"] = 0
    gtfs_infos["stop_areas_count"] = 0
    gtfs_infos["routes_count"] = 0
    with zipfile.ZipFile(gtfs) as zf:
        # zf.open() returns a binary stream; wrap it so csv gets text
        # (assumes Python 3 and UTF-8 encoded GTFS files)
        reader = csv.DictReader(io.TextIOWrapper(zf.open("stops.txt"), encoding="utf-8"))
        for r in reader:
            if r["location_type"] == "1":
                gtfs_infos["stop_areas_count"] += 1
            else:
                gtfs_infos["stop_points_count"] += 1
        reader = csv.DictReader(io.TextIOWrapper(zf.open("routes.txt"), encoding="utf-8"))
        for r in reader:
            gtfs_infos["routes_count"] += 1
    return gtfs_infos
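A quick call sketch; the archive path is a placeholder, and the function relies on the csv, io and zipfile modules being imported at module level:

infos = get_gtfs_infos("gtfs.zip")   # hypothetical GTFS feed with stops.txt and routes.txt
print(infos)   # e.g. {'stop_points_count': ..., 'stop_areas_count': ..., 'routes_count': ...}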
def get_filters(filepath):
    """Extract the filters from the file describing the filters in ENA, as a
    dictionary keyed by filter id; each value is a dictionary with the related
    results, the filter type and the filter description.

    filepath: path to the semicolon-delimited CSV with the filter descriptions
    """
    filters = {}
    with open(filepath, "r") as f:
        reader = csv.DictReader(f, delimiter=';')
        for row in reader:
            filter_id = row["Filter Column"]
            filters.setdefault(filter_id, {})
            filters[filter_id]["results"] = row["Result"].split(", ")
            filters[filter_id]["type"] = row["Type"]
            filters[filter_id]["description"] = row["Description"]
    return filters
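A sketch of the expected input, assuming a semicolon-delimited file with "Filter Column", "Result", "Type" and "Description" headers; the file name and values here are invented for illustration:

# filters.csv, hypothetical contents:
# Filter Column;Result;Type;Description
# tax_id;read_run, analysis;Number;Taxonomic identifier

filters = get_filters("filters.csv")
# filters["tax_id"] == {'results': ['read_run', 'analysis'],
#                       'type': 'Number',
#                       'description': 'Taxonomic identifier'}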
def summary_table_to_bed_long(sample_summary_table, output_file, filename_suffix='long', min_frequency=1):
    '''
    Write out the low frequency variants
    NOTE: See 'check_for_IGV_long_regions_snapshot' function in run_parser.py
    UPDATE: Naima wants long snapshots for ALL variants from now on.
    '''
    import csv
    print('Find low frequency variants...')
    print('input file: {0}'.format(sample_summary_table))
    print('output file: {0}'.format(output_file))
    with open(sample_summary_table, 'r') as tsvin, open(output_file, 'w') as bedout:
        reader = csv.DictReader(tsvin, delimiter='\t')
        writer = csv.writer(bedout, delimiter='\t')
        for row in reader:
            if float(row['Frequency']) < min_frequency:
                print(row['Frequency'])
                filename = make_snapshot_filename(summary_dict=row, filename_suffix=filename_suffix)
                entry = [row['Chrom'], row['Position'], row['Position'], filename]
                print(entry)
                writer.writerow(entry)
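A minimal call sketch; the file names are placeholders, and make_snapshot_filename must be available from the surrounding project:

# writes one BED line per variant whose Frequency falls below min_frequency
summary_table_to_bed_long("sample_summary.tsv", "low_freq_variants.bed",
                          filename_suffix="long", min_frequency=5)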
def process_file(infile, outfile, precision=1, format='csv', pages=None):
    reader = csv.DictReader(infile)
    char_height_dict = get_chars_hashed_by_yoffset(reader, precision, pages=pages)
    # page numbers come back as strings
    # pages_to_read = ['1']
    words_by_array = coalesce_into_words(char_height_dict)
    word_list = merge_word_arrays(words_by_array)
    if format == 'csv':
        to_csv(word_list, outfile)
    elif format == 'json':
        to_json(word_list, outfile)
    return 1
def read(self, tsv_file):
    """
    Reads the rows from the designated file using the configured fields.

    Arguments:
        tsv_file: a file-like object to read the data from

    Returns:
        records(list):
            a list of the records cast to read_as_cls
    """
    file_reader = csv.DictReader(
        tsv_file,
        **PEARSON_DIALECT_OPTIONS
    )
    valid_rows, invalid_rows = [], []
    for row in file_reader:
        try:
            valid_rows.append(self.map_row(row))
        except InvalidTsvRowException:
            invalid_rows.append(row)
    return (valid_rows, invalid_rows)
def parse_exam_grade_adjustments(self, csv_reader):
    """
    Parses all rows of grade adjustment info from a CSV and yields each ProctoredExamGrade object
    with its associated grade adjustment row from the CSV

    Args:
        csv_reader (csv.DictReader): A DictReader instance

    Yields:
        tuple(ProctoredExamGrade, RowProps):
            A tuple containing a ProctoredExamGrade and its associated parsed CSV row
    """
    parsed_row_dict = {}
    for row in csv_reader:
        parsed_row = self.parse_and_validate_row(row)
        parsed_row_dict[parsed_row.exam_grade_id] = parsed_row
    exam_grade_query = ProctoredExamGrade.objects.filter(id__in=parsed_row_dict.keys())
    if exam_grade_query.count() < len(parsed_row_dict):
        bad_exam_grade_ids = set(parsed_row_dict.keys()) - set(exam_grade_query.values_list('id', flat=True))
        raise ParsingError(
            'Some exam grade IDs do not match any ProctoredExamGrade records: {}'.format(bad_exam_grade_ids)
        )
    for exam_grade in exam_grade_query.all():
        yield exam_grade, parsed_row_dict[exam_grade.id]
def load_dataset(filename):
    data = None
    try:
        with open(filename, encoding=ENCODING) as fh:
            reader = csv.DictReader(fh)
            if reader.fieldnames != FIELDNAMES:
                print(reader.fieldnames)
                print(FIELDNAMES)
                error = 'ERROR: Incorrect headers in: {}'.format(filename)
                raise FNCException(error)
            else:
                data = list(reader)
            if data is None:
                error = 'ERROR: No data found in: {}'.format(filename)
                raise FNCException(error)
    except FileNotFoundError:
        error = "ERROR: Could not find file: {}".format(filename)
        raise FNCException(error)
    return data
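A hedged usage sketch; ENCODING, FIELDNAMES and FNCException are module-level names in the original project, and the values shown below are only illustrative assumptions:

# illustrative constants (the real project defines its own)
ENCODING = 'utf-8'
FIELDNAMES = ['Headline', 'Body ID', 'Stance']

try:
    data = load_dataset('train_stances.csv')   # hypothetical file name
    print(len(data), 'rows loaded')
except FNCException as err:
    print(err)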
def get_arp_table():
    """
    Get ARP table from /proc/net/arp
    """
    with open('/proc/net/arp') as arpt:
        names = [
            'IP address', 'HW type', 'Flags', 'HW address',
            'Mask', 'Device'
        ]  # arp 1.88, net-tools 1.60
        reader = csv.DictReader(
            arpt, fieldnames=names,
            skipinitialspace=True,
            delimiter=' ')
        next(reader)  # Skip header.
        return [block for block in reader]
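A quick usage sketch (Linux only, since it reads /proc/net/arp); the printed fields follow the fieldnames defined above:

for entry in get_arp_table():
    print(entry['IP address'], entry['HW address'], entry['Device'])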
def _get_records(self):
    with tf.gfile.Open(self._labels_filename) as label_file:
        csv_reader = csv.DictReader(label_file, fieldnames=self._columns)
        images_gt_boxes = {}
        first = True
        for csv_line in csv_reader:
            if first and self._with_header:
                first = False
                continue
            csv_line = dict(csv_line)
            label_dict = self._normalize_csv_line(csv_line)
            image_id = label_dict.pop('image_id')
            images_gt_boxes.setdefault(image_id, []).append(label_dict)
        return images_gt_boxes
def populate():
    with open('sample-user-patches.csv') as csvfile:
        rows = csv.DictReader(csvfile)
        for row in rows:
            person = Person.objects.get(id=row['person_id'])
            patch = IssueResolverPatch.objects.create(
                content_object=person,
                jurisdiction_id=row['jurisdiction_id'],
                status=row['status'],
                old_value=row['old_value'],
                new_value=row['new_value'],
                category=row['category'],
                alert=row['alert'],
                note=row['note'],
                source=row['source'],
                reporter_name=row['reporter_name'],
                reporter_email=row['reporter_email'],
                applied_by=row['applied_by']
            )
            patch.save()
def save_companies(self):
    """
    Receives the path to the dataset file and creates a Company object for
    each row of each file. It creates the related activities when needed.
    """
    skip = ('main_activity', 'secondary_activity')
    # compare field names (strings) against the skip tuple, not the field objects
    keys = tuple(f.name for f in Company._meta.fields if f.name not in skip)
    with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
        for row in csv.DictReader(file_handler):
            main, secondary = self.save_activities(row)
            filtered = {k: v for k, v in row.items() if k in keys}
            obj = Company.objects.create(**self.serialize(filtered))
            for activity in main:
                obj.main_activity.add(activity)
            for activity in secondary:
                obj.secondary_activity.add(activity)
            obj.save()
            self.count += 1
            self.print_count(Company, count=self.count)
def weeks_of_tickets(datetime, tzutc, AnalyzedAgileTicket):
    """A bunch of tickets."""
    from dateutil.parser import parse
    parsed = []
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)
    current_path = path.dirname(path.abspath(__file__))
    csv_file = path.join(current_path, 'data', 'weeks_of_tickets.csv')
    count = 1
    for row in csv.DictReader(open(csv_file, 'r')):
        t = AnalyzedAgileTicket(
            key="FOO-{}".format(count),
            committed=dict(state="committed", entered_at=parse(row['committed'], default=default)),
            started=dict(state="started", entered_at=parse(row['started'], default=default)),
            ended=dict(state="ended", entered_at=parse(row['ended'], default=default))
        )
        parsed.append(t)
        count += 1
    return parsed
Source: wrangle_nypd_stop_and_frisk_data.py (project: datasets, author: helloworlddata)
def read_and_wrangle(src, dest):
    wf = dest.open('w')
    wcsv = csv.DictWriter(wf, fieldnames=FINAL_HEADERS)
    wcsv.writeheader()
    # only 2011.csv has windows-1252 instead of ascii encoding,
    # but we open all files as windows-1252 just to be safe
    with src.open("r", encoding='windows-1252') as rf:
        records = csv.DictReader(rf)
        for i, row in enumerate(records):
            row = strip_record(row)
            newrow = wrangle_record(row)
            wcsv.writerow(newrow)
            # a little status checker
            if i % 10000 == 1:
                print("...wrote row #", i)
    # done writing file
    print("Wrangled", i, "rows and saved to", dest)
    wf.close()
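A call sketch, assuming src and dest are pathlib.Path objects (they are opened with .open() above) and that FINAL_HEADERS, strip_record and wrangle_record come from the surrounding module; the paths are placeholders:

from pathlib import Path

read_and_wrangle(Path("raw/2011.csv"), Path("wrangled/2011.csv"))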