Example source code for Python's csv.DictReader() class
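csv.DictReader reads each row of a CSV file into a dictionary keyed by the column headers (taken from the first row by default, or supplied explicitly via the fieldnames argument). Below are usage examples collected from open-source projects. As a quick orientation, a minimal sketch (the file people.csv and its columns are hypothetical):

import csv

# people.csv is assumed to have a header row: id,name,email
with open('people.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        # each row is a dict keyed by the header fields
        print(row['id'], row['name'], row['email'])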

data_manager.py (project: 30-Days-of-Python, author: codingforentrepreneurs)
def read_data(user_id=None, email=None):
    filename = file_item_path
    with open(filename, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        unknown_user_id = None
        unknown_email = None
        for row in reader:
            if user_id is not None:
                if int(user_id) == int(row.get("id")):
                    return row
                else:
                    unknown_user_id = user_id
            if email is not None:
                if email == row.get("email"):
                    return row
                else:
                    unknown_email = email
        if unknown_user_id is not None:
            return "User id {user_id} not found".format(user_id=user_id)
        if unknown_email is not None:
            return "Email {email} not found".format(email=email)
    return None
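Note on the design: read_data mixes return types. It returns the matching row as a dict, a human-readable "not found" string when a lookup fails, or None when neither argument is given, so callers have to inspect the type of the result. The class-based variant below follows the same pattern but prints the error instead of returning it.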
data_class.py (project: 30-Days-of-Python, author: codingforentrepreneurs)
def get_user_data(self, user_id=None, email=None):
        filename = file_item_path
        with open(filename, "r") as csvfile:
            reader = csv.DictReader(csvfile)
            unknown_user_id = None
            unknown_email = None
            for row in reader:
                if user_id is not None:
                    if int(user_id) == int(row.get("id")):
                        return row
                    else:
                        unknown_user_id = user_id
                if email is not None:
                    if email == row.get("email"):
                        return row
                    else:
                        unknown_email = email
            if unknown_user_id is not None:
                print("User id {user_id} not found".format(user_id=user_id))
            if unknown_email is not None:
                print("Email {email} not found".format(email=email))
        return None
hungry_data.py (project: 30-Days-of-Python, author: codingforentrepreneurs)
def read_data(user_id=None, email=None):
    filename = "data.csv"
    with open(filename, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        unknown_user_id = None
        unknown_email = None
        for row in reader:
            if user_id is not None:
                if int(user_id) == int(row.get("id")):
                    return row
                else:
                    unknown_user_id = user_id
            if email is not None:
                if email == row.get("email"):
                    return row
                else:
                    unknown_email = email
        if unknown_user_id is not None:
            return "User id {user_id} not found".format(user_id=user_id)
        if unknown_email is not None:
            return "Email {email} not found".format(email=email)
    return None
pipeline.py (project: zika-pipeline, author: zibraproject)
def sample_to_run_data_mapping(samples_dir):
    '''
    return dict
    each key is string "sample_id"
    each value is a list of tuples ("library", "barcode")
    '''
    runs_file = samples_dir + "runs.tsv"
    sr_mapping = {}
    with open(runs_file) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            sample = row["sample_id"]
            rb_pair = (row["run_name"], row["barcode_id"])
            if sample not in sr_mapping:
                sr_mapping[sample] = []
            sr_mapping[sample].append(rb_pair)
    return sr_mapping
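A small illustration of the expected input and output (the runs.tsv content here is hypothetical, not taken from the zika-pipeline repository):

# runs.tsv (hypothetical):
#   sample_id   run_name   barcode_id
#   ZIKA01      run1       BC01
#   ZIKA01      run2       BC03
#
# sample_to_run_data_mapping(samples_dir) would then return:
#   {'ZIKA01': [('run1', 'BC01'), ('run2', 'BC03')]}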
pipeline.py (project: zika-pipeline, author: zibraproject)
def sample_to_metadata_mapping(samples_dir):
    '''
    return dict
    each key is string "sample_id"
    each value is a list of metadata ordered as
    ["strain", "sample_id", "collect_date", "country", "division", "location"]
    '''
    metadata_file = samples_dir + "samples.tsv"
    sm_mapping = {}
    with open(metadata_file) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            sample = row["sample_id"]
            metadata = [row["strain"], row["sample_id"], row["collection_date"],
                row["country"], row["division"], row["location"]]
            sm_mapping[sample] = metadata
    return sm_mapping
tests_accra.py (project: osm2gtfs, author: grote)
def get_gtfs_infos(gtfs):
    gtfs_infos = {}
    gtfs_infos["stop_points_count"] = 0
    gtfs_infos["stop_areas_count"] = 0
    gtfs_infos["routes_count"] = 0
    with zipfile.ZipFile(gtfs) as zf:
        reader = csv.DictReader(zf.open("stops.txt"))
        for r in reader:
            if r["location_type"] == "1":
                gtfs_infos["stop_areas_count"] += 1
            else:
                gtfs_infos["stop_points_count"] += 1
        reader = csv.DictReader(zf.open("routes.txt"))
        for r in reader:
            gtfs_infos["routes_count"] += 1
    return gtfs_infos
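One caveat: on Python 3, zipfile.ZipFile.open() returns a binary stream while csv.DictReader expects text, so the snippet above only works as written on Python 2. A minimal Python 3 sketch, assuming UTF-8 encoded GTFS files:

import csv
import io
import zipfile

with zipfile.ZipFile(gtfs) as zf:
    with zf.open("stops.txt") as raw:
        # wrap the binary stream so DictReader receives text
        reader = csv.DictReader(io.TextIOWrapper(raw, encoding="utf-8"))
        for r in reader:
            pass  # same counting logic as above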
serialize_ena_data_descriptors.py (project: enasearch, author: bebatut)
def get_filters(filepath):
    """Extract the filters from the file with description of filters in ENA as
    a dictionary with the key being the filter id and the value a dictionary
    with related results, type of filter, filter description

    filepath: path with csv with filter description
    """
    filters = {}
    with open(filepath, "r") as f:
        reader = csv.DictReader(f, delimiter=';')
        for row in reader:
            filter_id = row["Filter Column"]
            filters.setdefault(filter_id, {})
            filters[filter_id]["results"] = row["Result"].split(", ")
            filters[filter_id]["type"] = row["Type"]
            filters[filter_id]["description"] = ''.join(row["Description"])
    return filters
run_IGV_snapshot_automator.py (project: reportIT, author: stevekm)
def summary_table_to_bed_long(sample_summary_table, output_file, filename_suffix = 'long', min_frequency = 1):
    '''
    Write out the low frequency variants

    NOTE: See 'check_for_IGV_long_regions_snapshot' function in run_parser.py
    UPDATE: Naima wants long snapshots for ALL variants from now on.
    '''
    import csv
    print('Find low frequency variants...')
    print('input file: {0}'.format(sample_summary_table))
    print('output file: {0}'.format(output_file))
    with open(sample_summary_table, 'r') as tsvin, open(output_file, 'w') as bedout:
        reader = csv.DictReader(tsvin, delimiter='\t')
        writer = csv.writer(bedout, delimiter='\t')
        for row in reader:
            if float(row['Frequency']) < min_frequency:
                print(row['Frequency'])
                filename = make_snapshot_filename(summary_dict = row, filename_suffix = filename_suffix)
                entry = [row['Chrom'], row['Position'], row['Position'], filename]
                print(entry)
                writer.writerow(entry)
coalesce_words.py (project: pdf_bbox_utils, author: jsfenfen)
def process_file(infile, outfile, precision=1, format='csv', pages=None):

    reader = csv.DictReader(infile)
    char_height_dict = get_chars_hashed_by_yoffset(reader, precision, pages=pages)

    # page numbers come back as strings

    words_by_array = coalesce_into_words(char_height_dict)
    word_list = merge_word_arrays(words_by_array)

    if format=='csv':
        to_csv(word_list, outfile)

    elif format=='json':
        to_json(word_list, outfile)

    return 1
readers.py (project: micromasters, author: mitodl)
def read(self, tsv_file):
        """
        Reads the rows from the designated file using the configured fields.

        Arguments:
            tsv_file: a file-like object to read the data from

        Returns:
            tuple(list, list):
                a tuple of (valid_rows, invalid_rows); valid rows are the
                records cast to read_as_cls
        """
        file_reader = csv.DictReader(
            tsv_file,
            **PEARSON_DIALECT_OPTIONS
        )
        valid_rows, invalid_rows = [], []

        for row in file_reader:
            try:
                valid_rows.append(self.map_row(row))
            except InvalidTsvRowException:
                invalid_rows.append(row)

        return (valid_rows, invalid_rows)
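DictReader passes extra keyword arguments through to the underlying csv machinery, so PEARSON_DIALECT_OPTIONS is presumably a dict of dialect settings. A hypothetical example of what such options could look like for a tab-delimited export (not taken from the micromasters source):

PEARSON_DIALECT_OPTIONS = {'delimiter': '\t'}
file_reader = csv.DictReader(tsv_file, **PEARSON_DIALECT_OPTIONS)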
adjust_exam_grades_from_csv.py (project: micromasters, author: mitodl)
def parse_exam_grade_adjustments(self, csv_reader):
        """
        Parses all rows of grade adjustment info from a CSV and yields each ProctoredExamGrade object
        with its associated grade adjustment row from the CSV

        Args:
            csv_reader (csv.DictReader): A DictReader instance

        Yields:
            tuple(ProctoredExamGrade, RowProps):
                A tuple of a ProctoredExamGrade and its associated parsed CSV row
        """
        parsed_row_dict = {}
        for row in csv_reader:
            parsed_row = self.parse_and_validate_row(row)
            parsed_row_dict[parsed_row.exam_grade_id] = parsed_row
        exam_grade_query = ProctoredExamGrade.objects.filter(id__in=parsed_row_dict.keys())
        if exam_grade_query.count() < len(parsed_row_dict):
            bad_exam_grade_ids = set(parsed_row_dict.keys()) - set(exam_grade_query.values_list('id', flat=True))
            raise ParsingError(
                'Some exam grade IDs do not match any ProctoredExamGrade records: {}'.format(bad_exam_grade_ids)
            )
        for exam_grade in exam_grade_query.all():
            yield exam_grade, parsed_row_dict[exam_grade.id]
scorer.py (project: NLP, author: Deamon5550)
def load_dataset(filename):
    data = None
    try:
        with open(filename, encoding=ENCODING) as fh:
            reader = csv.DictReader(fh)
            if reader.fieldnames != FIELDNAMES:
                print(reader.fieldnames)
                print(FIELDNAMES)
                error = 'ERROR: Incorrect headers in: {}'.format(filename)
                raise FNCException(error)
            else:
                data = list(reader)

            if data is None:
                error = 'ERROR: No data found in: {}'.format(filename)
                raise FNCException(error)
    except FileNotFoundError:
        error = "ERROR: Could not find file: {}".format(filename)
        raise FNCException(error)

    return data
call_arp.py (project: infraview, author: a-dekker)
def get_arp_table():
    """
        Get ARP table from /proc/net/arp
    """
    with open('/proc/net/arp') as arpt:
        names = [
            'IP address', 'HW type', 'Flags', 'HW address',
            'Mask', 'Device'
        ]  # arp 1.88, net-tools 1.60

        reader = csv.DictReader(
            arpt, fieldnames=names,
            skipinitialspace=True,
            delimiter=' ')

        next(reader)  # Skip header.

        return list(reader)
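A short usage sketch (hypothetical): each entry returned by get_arp_table() is a dict keyed by the names defined above, so mapping IPs to MAC addresses is one line per row:

for entry in get_arp_table():
    print(entry['IP address'], '->', entry['HW address'])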
csv_reader.py (project: luminoth, author: tryolabs)
def _get_records(self):
        with tf.gfile.Open(self._labels_filename) as label_file:
            csv_reader = csv.DictReader(label_file, fieldnames=self._columns)

            images_gt_boxes = {}

            # consume the reader while the file is still open
            first = True
            for csv_line in csv_reader:
                if first and self._with_header:
                    first = False
                    continue

                csv_line = dict(csv_line)
                label_dict = self._normalize_csv_line(csv_line)

                image_id = label_dict.pop('image_id')
                images_gt_boxes.setdefault(image_id, []).append(label_dict)

        return images_gt_boxes
populate_issue_resolver_patches.py (project: admintools, author: openstates)
def populate():
    with open('sample-user-patches.csv') as csvfile:
        rows = csv.DictReader(csvfile)
        for row in rows:
            person = Person.objects.get(id=row['person_id'])
            # create() already saves the row, so no separate save() call is needed
            IssueResolverPatch.objects.create(
                content_object=person,
                jurisdiction_id=row['jurisdiction_id'],
                status=row['status'],
                old_value=row['old_value'],
                new_value=row['new_value'],
                category=row['category'],
                alert=row['alert'],
                note=row['note'],
                source=row['source'],
                reporter_name=row['reporter_name'],
                reporter_email=row['reporter_email'],
                applied_by=row['applied_by']
            )
companies.py (project: jarbas, author: datasciencebr)
def save_companies(self):
        """
        Receives path to the dataset file and create a Company object for
        each row of each file. It creates the related activity when needed.
        """
        skip = ('main_activity', 'secondary_activity')
        # compare field names (strings) against the skip tuple, not Field objects
        keys = tuple(f.name for f in Company._meta.fields if f.name not in skip)
        with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
            for row in csv.DictReader(file_handler):
                main, secondary = self.save_activities(row)

                filtered = {k: v for k, v in row.items() if k in keys}
                obj = Company.objects.create(**self.serialize(filtered))
                for activity in main:
                    obj.main_activity.add(activity)
                for activity in secondary:
                    obj.secondary_activity.add(activity)
                obj.save()

                self.count += 1
                self.print_count(Company, count=self.count)
conftest.py (project: agile-analytics, author: cmheisel)
def weeks_of_tickets(datetime, tzutc, AnalyzedAgileTicket):
    """A bunch of tickets."""
    from dateutil.parser import parse
    parsed = []
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)

    current_path = path.dirname(path.abspath(__file__))
    csv_file = path.join(current_path, 'data', 'weeks_of_tickets.csv')

    count = 1
    with open(csv_file, 'r') as fh:
        for row in csv.DictReader(fh):
            t = AnalyzedAgileTicket(
                key="FOO-{}".format(count),
                committed=dict(state="committed", entered_at=parse(row['committed'], default=default)),
                started=dict(state="started", entered_at=parse(row['started'], default=default)),
                ended=dict(state="ended", entered_at=parse(row['ended'], default=default))
            )
            parsed.append(t)
            count += 1

    return parsed
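The default argument passed to dateutil's parse() fills in any date components missing from the CSV value, so partial timestamps in weeks_of_tickets.csv still come back as complete, timezone-aware datetimes anchored at 1979-08-15 00:00:00 UTC.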
wrangle_nypd_stop_and_frisk_data.py (project: datasets, author: helloworlddata)
def read_and_wrangle(src, dest):
    wf = dest.open('w')
    wcsv = csv.DictWriter(wf, fieldnames=FINAL_HEADERS)
    wcsv.writeheader()
    # only 2011.csv has windows-1252 instead of ascii encoding,
    # but we open all files as windows-1252 just to be safe
    with src.open("r", encoding='windows-1252') as rf:
        records = csv.DictReader(rf)
        for i, row in enumerate(records, start=1):  # start at 1 so the final count is accurate
            row = strip_record(row)
            newrow = wrangle_record(row)
            wcsv.writerow(newrow)
            # a little status checker
            if i % 10000 == 1:
                print("...wrote row #", i)

    # done writing file
    print("Wrangled", i, "rows and saved to", dest)
    wf.close()

