Example source code for the Python DictReader() class
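
csv.DictReader (and its drop-in replacement unicodecsv.DictReader) reads a CSV stream and yields one dictionary per data row, keyed by the header row. A minimal sketch using the standard library (Python 3; the file name and column names are illustrative):

import csv

# people.csv is a hypothetical file whose header line is "name,age".
with open('people.csv', newline='') as f:
    for row in csv.DictReader(f):
        print(row['name'], row['age'])

The snippets below are collected from real projects and show the same pattern applied to uploaded files, HTTP responses, in-memory buffers, and tab-delimited data.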

test_functional.py (project: doorman, author: mwielgoszewski)
def test_node_csv_download(self, node, testapp):
        import unicodecsv as csv

        node.enrolled_on = dt.datetime.utcnow()
        node.last_checkin = dt.datetime.utcnow()
        node.last_ip = '1.1.1.1'
        node.node_info = {'hardware_vendor': "Honest Achmed's Computer Supply"}
        node.save()

        resp = testapp.get(url_for('manage.nodes_csv'))

        assert resp.headers['Content-Type'] == 'text/csv; charset=utf-8'
        assert resp.headers['Content-Disposition'] == 'attachment; filename=nodes.csv'

        reader = csv.DictReader(io.BytesIO(resp.body))
        row = next(reader)

        assert row['Display Name'] == node.display_name
        assert row['Host Identifier'] == node.host_identifier
        assert row['Enrolled On'] == str(node.enrolled_on)
        assert row['Last Check-In'] == str(node.last_checkin)
        assert row['Last Ip Address'] == node.last_ip
        assert row['Is Active'] == 'True'
        assert row['Make'] == node.node_info['hardware_vendor']
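
The test above wraps resp.body in io.BytesIO because unicodecsv works on byte streams (on Python 2) and does the decoding itself. A minimal sketch of the same pattern outside a test harness, assuming a UTF-8 encoded CSV payload held in memory:

import io
import unicodecsv as csv

# Hypothetical in-memory CSV payload; column names mirror the test above.
payload = b'Display Name,Last Ip Address\r\nnode-1,1.1.1.1\r\n'
reader = csv.DictReader(io.BytesIO(payload), encoding='utf-8')
assert next(reader)['Last Ip Address'] == u'1.1.1.1'
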
admin.py (project: confrontiv, author: correctiv)
def upload_recipients(self, request):
        if not request.method == 'POST':
            raise PermissionDenied
        if not self.has_change_permission(request):
            raise PermissionDenied

        reader = unicodecsv.DictReader(request.FILES['file'])
        for lineno, line in enumerate(reader, 1):
            group = line.pop('group', None)
            if 'slug' not in line:
                line['slug'] = slugify(line['name'])
            recipient = Recipient.objects.create(
                **line
            )
            if group is not None:
                rg = RecipientGroup.objects.get(slug=group)
                recipient.groups.add(rg)

        return redirect('admin:confrontiv_recipient_changelist')
print_batch.py (project: adversarial-squad, author: robinjia)
def dump_grammar():
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      if OPTS.worker and row['WorkerId'] != OPTS.worker: continue
      if row['AssignmentStatus'] == 'Rejected': continue
      print 'HIT %s' % row['HITId']
      print 'WorkerId: %s' % row['WorkerId']
      print 'Time: %s s' % row['WorkTimeInSeconds']
      input_qids = row['Input.qids'].split('\t')
      input_sents = row['Input.sents'].split('\t')
      ans_is_good = row['Answer.is-good'].split('\t')
      ans_responses = row['Answer.responses'].split('\t')
      for qid, s, is_good, response in zip(input_qids, input_sents, 
                                           ans_is_good, ans_responses):
        print ('  Example %s' % qid)
        print ('    Sentence: %s' % s).encode('utf-8')
        print ('    Is good?  %s' % is_good)
        print ('    Response: %s' % colored(response, 'cyan')).encode('utf-8')
print_batch.py (project: adversarial-squad, author: robinjia)
def dump_verify():
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      if OPTS.worker and row['WorkerId'] != OPTS.worker: continue
      if row['AssignmentStatus'] == 'Rejected': continue
      print 'HIT %s' % row['HITId']
      print 'WorkerId: %s' % row['WorkerId']
      print 'Time: %s s' % row['WorkTimeInSeconds']
      qids = row['Input.qids'].split('\t')
      questions = row['Input.questions'].split('\t')
      sents = row['Answer.sents'].split('\t')
      responses = row['Answer.responses'].split('\t')
      for qid, q, s_str, response_str in zip(
          qids, questions, sents, responses):
        print ('  Example %s' % qid)
        print ('  Question %s' % q)
        s_list = s_str.split('|')
        a_list = response_str.split('|')
        for s, a in zip(s_list, a_list):
          print ('    Sentence: %s' % sent_format(s)).encode('utf-8')
          print ('    Is good?  %s' % colored(a, 'cyan'))
print_batch.py (project: adversarial-squad, author: robinjia)
def pred_human_eval():
  all_preds = collections.defaultdict(list)
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      all_preds[row['Input.qid']].append(row['Answer.response'])
  preds = {}
  for qid in all_preds:
    if OPTS.ensemble:
      for a in all_preds[qid]:
        count = sum(1 for pred in all_preds[qid] if a == pred)
        if count > 1:
          preds[qid] = a
          break
      else:
        preds[qid] = random.sample(all_preds[qid], 1)[0]
    else:
      preds[qid] = random.sample(all_preds[qid], 1)[0]
  print json.dumps(preds)
fetch.py (project: fingerprints, author: alephdata)
def fetch():
    out_path = os.path.dirname(__file__)
    out_path = os.path.join(out_path, 'fingerprints', 'data', 'types.yml')
    fh = urllib.urlopen(CSV_URL)
    types = {}
    for row in unicodecsv.DictReader(fh):
        name = stringify(row.get('Name'))
        abbr = stringify(row.get('Abbreviation'))
        if name is None or abbr is None:
            continue
        if name in types and types[name] != abbr:
            print name, types[name], abbr
        types[name] = abbr
        # print abbr, name

    with open(out_path, 'w') as fh:
        yaml.safe_dump({'types': types}, fh,
                       indent=2,
                       allow_unicode=True,
                       canonical=False,
                       default_flow_style=False)
utils.py (project: edx-enterprise, author: edx)
def parse_csv(file_stream, expected_columns=None):
    """
    Parse csv file and return a stream of dictionaries representing each row.

    First line of CSV file must contain column headers.

    Arguments:
         file_stream: input file
         expected_columns (set[unicode]): columns that are expected to be present

    Yields:
        dict: CSV line parsed into a dictionary.
    """
    reader = unicodecsv.DictReader(file_stream, encoding="utf-8")

    if expected_columns and set(expected_columns) - set(reader.fieldnames):
        raise ValidationError(ValidationMessages.MISSING_EXPECTED_COLUMNS.format(
            expected_columns=", ".join(expected_columns), actual_columns=", ".join(reader.fieldnames)
        ))

    # "yield from reader" would be nicer, but we're on python2.7 yet.
    for row in reader:
        yield row
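
A hypothetical caller of parse_csv; the file name and column names are illustrative only. Because parse_csv is a generator, the ValidationError for missing columns is raised when iteration starts, not at call time:

# Assumed file and columns, shown only to illustrate the calling convention.
with open('learners.csv', 'rb') as stream:
    for row in parse_csv(stream, expected_columns={'email', 'course_id'}):
        print(row)
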
mock_input_source.py (project: hgvs, author: biocommons)
def _read_input(self, in_file):
        """Dummy file of inputs

        :param in_file: path to input file of 2 cols (tab-delim); accession_number, sequence
        :type string
        :return dictionary of accession_number to sequence tags
        """
        result = {}
        with open(in_file, 'r') as f:
            reader = csv.DictReader(f, delimiter=str('\t'))
            for row in reader:
                result[row['accession']] = {
                    'transcript_sequence': row['transcript_sequence'],
                    'cds_start_i': int(row['cds_start_i']),
                    'cds_end_i': int(row['cds_end_i'])
                }

        return result
models.py (project: stormtrooper, author: CompileInc)
def process(self, activate=False):
        if not self.is_questions_created:
            data = list(csv.DictReader(self.csv.file))
            for d in data:
                Question.objects.create(task=self,
                                        question=d)
            self.is_questions_created = True

            if activate:
                self.is_active = True

            self.save()
dsb_stage2_submission.py (project: data-science-bowl-2017, author: tondonia)
def read_test_labels(filename):
    labels = []
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        labels.append(j['id'])
    return labels
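
This helper and the read_depths/read_labels functions that follow all use the same open-and-iterate pattern and never close csvFile. An equivalent sketch using a context manager, with the 'id' column name taken from the snippet above:

import unicodecsv

def read_test_labels(filename):
    # Close the file automatically once the rows have been consumed.
    with open(filename, 'rb') as csv_file:
        reader = unicodecsv.DictReader(csv_file, encoding='utf-8')
        return [row['id'] for row in reader]
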
dsb_stage2_submission.py (project: data-science-bowl-2017, author: tondonia)
def read_depths(filename):
    depths = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        depths[j['key']] = int(j['depth'])
    return depths
dsb_voxel_predict.py (project: data-science-bowl-2017, author: tondonia)
def read_labels(filename):
    labels = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        #if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
        labels[j['id']] = int(j['cancer'])
    return labels
dsb_voxel_predict.py (project: data-science-bowl-2017, author: tondonia)
def read_test_labels(filename):
    labels = []
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        #if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
        labels.append(j['id'])
    return labels
dsb_voxel_predict.py (project: data-science-bowl-2017, author: tondonia)
def read_depths(filename):
    depths = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        depths[j['key']] = int(j['depth'])
    return depths
dsb_create_voxel_model_predictions.py (project: data-science-bowl-2017, author: tondonia)
def read_test_labels(filename):
    labels = []
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        #if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
        labels.append(j['id'])
    return labels
dsb_create_voxel_model_predictions.py (project: data-science-bowl-2017, author: tondonia)
def read_depths(filename):
    depths = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        depths[j['key']] = int(j['depth'])
    return depths
dsb_voxel_predict_stage2.py (project: data-science-bowl-2017, author: tondonia)
def read_labels(filename):
    labels = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        #if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
        labels[j['id']] = int(j['cancer'])
    return labels
dsb_voxel_predict_stage2.py (project: data-science-bowl-2017, author: tondonia)
def read_test_labels(filename):
    labels = []
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        #if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
        labels.append(j['id'])
    return labels
dsb_voxel_predict_stage2.py (project: data-science-bowl-2017, author: tondonia)
def read_depths(filename):
    depths = {}
    csvFile = open(filename)
    reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
    for j in reader:
        depths[j['key']] = int(j['depth'])
    return depths
core.py (project: confrontiv, author: correctiv)
def make_inquiry_requests_from_file(inquiry, file):
    reader = unicodecsv.DictReader(file)
    for lineno, line in enumerate(reader, 1):
        try:
            recipient = Recipient.objects.get(slug=line['recipient'])
        except Recipient.DoesNotExist:
            raise ValueError('Recipient on line %s not found' % lineno)
        if not recipient.groups.filter(id=inquiry.group_id).exists():
            raise ValueError('Recipient %s not in inquiry group' % recipient)
        data = json.loads(line['data'])
        InquiryRequest.objects.create_from_inquiry(inquiry, recipient, data)
gen_csv_verify.py (project: adversarial-squad, author: robinjia)
def read_sentences():
  id_to_sents = collections.defaultdict(list)
  with open(OPTS.batch_file) as f:
    reader = csv.DictReader(f)
    for row in reader:
      input_qids = row['Input.qids'].split('\t')
      input_sents = row['Input.sents'].split('\t')
      ans_is_good = row['Answer.is-good'].split('\t')
      ans_responses = row['Answer.responses'].split('\t')
      for qid, s, is_good, response in zip(input_qids, input_sents, ans_is_good, ans_responses):
        if is_good == 'yes':
          response = s
        if response not in id_to_sents[qid]:
          id_to_sents[qid].append(response)
  return id_to_sents
print_batch.py (project: adversarial-squad, author: robinjia)
def stats_grammar():
  # Read data
  worker_to_is_good = collections.defaultdict(list)
  worker_to_times = collections.defaultdict(list)
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      if row['AssignmentStatus'] == 'Rejected': continue
      worker_id = row['WorkerId']
      ans_is_good = row['Answer.is-good'].split('\t')
      time = float(row['WorkTimeInSeconds'])
      worker_to_is_good[worker_id].extend(ans_is_good)
      worker_to_times[worker_id].append(time)

  # Aggregate by worker
  print '%d total workers' % len(worker_to_times)
  worker_stats = {}
  for worker_id in worker_to_times:
    times = sorted(worker_to_times[worker_id])
    t_median = times[len(times)/2]
    t_mean = sum(times) / float(len(times))
    is_good_list = worker_to_is_good[worker_id]
    num_qs = len(is_good_list)
    frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
    worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)

  # Print
  sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
  for worker_id in sorted_ids:
    t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
    print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
        worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)
print_batch.py (project: adversarial-squad, author: robinjia)
def stats_verify():
  # Read data
  worker_to_is_good = collections.defaultdict(list)
  worker_to_times = collections.defaultdict(list)
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      if row['AssignmentStatus'] == 'Rejected': continue
      worker_id = row['WorkerId']
      ans_is_good = [x for s in row['Answer.responses'].split('\t')
                     for x in s.split('|')]
      time = float(row['WorkTimeInSeconds'])
      worker_to_is_good[worker_id].extend(ans_is_good)
      worker_to_times[worker_id].append(time)

  # Aggregate by worker
  print '%d total workers' % len(worker_to_times)
  worker_stats = {}
  for worker_id in worker_to_times:
    times = sorted(worker_to_times[worker_id])
    t_median = times[len(times)/2]
    t_mean = sum(times) / float(len(times))
    is_good_list = worker_to_is_good[worker_id]
    num_qs = len(is_good_list)
    frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
    worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)

  # Print
  sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
  for worker_id in sorted_ids:
    t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
    print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
        worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)
core.py (project: handcart, author: hatnote)
def get_csv(csv_file):
    reader = unicodecsv.DictReader(open(csv_file, 'rb'))
    return list(reader)
load_campaign.py (project: froide-campaign, author: okfde)
def handle(self, *args, **options):
        translation.activate(settings.LANGUAGE_CODE)

        filename = options['filename']

        reader = unicodecsv.DictReader(open(filename))
        importer = CSVImporter()
        importer.run(reader)
admin.py (project: froide-campaign, author: okfde)
def upload_information_objects(self, request):
        if not request.method == 'POST':
            raise PermissionDenied
        if not self.has_change_permission(request):
            raise PermissionDenied
        reader = unicodecsv.DictReader(request.FILES['file'])
        importer = CSVImporter()
        importer.run(reader)
        return redirect('admin:froide_campaign_informationobject_changelist')
generate_ipa_all.py (project: panphon, author: dmort27)
def read_ipa_bases(ipa_bases):
    segments = []
    with open(ipa_bases, 'rb') as f:
        dictreader = csv.DictReader(f, encoding='utf-8')
        for record in dictreader:
            form = record['ipa']
            features = {k: v for k, v in record.items() if k != 'ipa'}
            segments.append(Segment(form, features))
    return segments
test_hgvs_variantmapper_cp_real.py (project: hgvs, author: biocommons)
def gcp_file_reader(fn):
    rdr = csv.DictReader(open(fn, "r"), delimiter=str("\t"))
    for rec in rdr:
        if rec["id"].startswith("#"):
            continue
        yield rec
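
A hypothetical caller, assuming a tab-delimited file whose header row includes an id column; the path is illustrative:

# Iterate the records, with '#'-prefixed ids already filtered out by the reader.
for rec in gcp_file_reader('tests/data/gcp/example.tsv'):
    print(rec['id'])
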
test_hgvs_grammar_full.py (project: hgvs, author: biocommons)
def test_parser_test_completeness(self):
        """ensure that all rules in grammar have tests"""

        grammar_rule_re = re.compile(r"^(\w+)")
        grammar_fn = pkg_resources.resource_filename(__name__, "../hgvs/_data/hgvs.pymeta")
        with open(grammar_fn, "r") as f:
            grammar_rules = set(r.group(1) for r in filter(None, map(grammar_rule_re.match, f)))

        with open(self._test_fn, "r") as f:
            reader = csv.DictReader(f, delimiter=str("\t"))
            test_rules = set(row["Func"] for row in reader)

        untested_rules = grammar_rules - test_rules

        self.assertTrue(len(untested_rules) == 0, "untested rules: {}".format(untested_rules))
test_hgvs_grammar_full.py (project: hgvs, author: biocommons)
def test_parser_grammar(self):
        with open(self._test_fn, "r") as f:
            reader = csv.DictReader(f, delimiter=str("\t"))

            fail_cases = []

            for row in reader:
                if row["Func"].startswith("#"):
                    continue

                # setup input
                inputs = self._split_inputs(row["Test"], row["InType"])
                expected_results = self._split_inputs(row["Expected"], row["InType"]) if row["Expected"] else inputs
                expected_map = dict(zip(inputs, expected_results))
                # step through each item and check
                is_valid = True if row["Valid"].lower() == "true" else False

                for key in expected_map:
                    expected_result = six.text_type(expected_map[key]).replace("u'", "'")
                    function_to_test = getattr(self.p._grammar(key), row["Func"])
                    row_str = u"{}\t{}\t{}\t{}\t{}".format(row["Func"], key, row["Valid"], "one", expected_result)
                    try:
                        actual_result = six.text_type(function_to_test()).replace("u'", "'")
                        if not is_valid or (expected_result != actual_result):
                            print("expected: {} actual:{}".format(expected_result, actual_result))
                            fail_cases.append(row_str)
                    except Exception as e:
                        if is_valid:
                            print("expected: {} Exception: {}".format(expected_result, e))
                            fail_cases.append(row_str)

        # everything should have passed - report whatever failed
        self.assertTrue(len(fail_cases) == 0, pprint.pprint(fail_cases))

