def test_node_csv_download(self, node, testapp):
import unicodecsv as csv
node.enrolled_on = dt.datetime.utcnow()
node.last_checkin = dt.datetime.utcnow()
node.last_ip = '1.1.1.1'
node.node_info = {'hardware_vendor': "Honest Achmed's Computer Supply"}
node.save()
resp = testapp.get(url_for('manage.nodes_csv'))
assert resp.headers['Content-Type'] == 'text/csv; charset=utf-8'
assert resp.headers['Content-Disposition'] == 'attachment; filename=nodes.csv'
reader = csv.DictReader(io.BytesIO(resp.body))
row = next(reader)
assert row['Display Name'] == node.display_name
assert row['Host Identifier'] == node.host_identifier
assert row['Enrolled On'] == str(node.enrolled_on)
assert row['Last Check-In'] == str(node.last_checkin)
assert row['Last Ip Address'] == node.last_ip
assert row['Is Active'] == 'True'
assert row['Make'] == node.node_info['hardware_vendor']
def upload_recipients(self, request):
if not request.method == 'POST':
raise PermissionDenied
if not self.has_change_permission(request):
raise PermissionDenied
reader = unicodecsv.DictReader(request.FILES['file'])
for lineno, line in enumerate(reader, 1):
group = line.pop('group', None)
if 'slug' not in line:
line['slug'] = slugify(line['name'])
recipient = Recipient.objects.create(
**line
)
if group is not None:
rg = RecipientGroup.objects.get(slug=group)
recipient.groups.add(rg)
return redirect('admin:confrontiv_recipient_changelist')
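A hypothetical upload file for upload_recipients (column names inferred from the code above; slug is derived from name when that column is absent, and group must match an existing RecipientGroup slug):

name,group
Press Office,ministries
City Council,municipalities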
def dump_grammar():
with open(OPTS.filename) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
if OPTS.worker and row['WorkerId'] != OPTS.worker: continue
if row['AssignmentStatus'] == 'Rejected': continue
print 'HIT %s' % row['HITId']
print 'WorkerId: %s' % row['WorkerId']
print 'Time: %s s' % row['WorkTimeInSeconds']
input_qids = row['Input.qids'].split('\t')
input_sents = row['Input.sents'].split('\t')
ans_is_good = row['Answer.is-good'].split('\t')
ans_responses = row['Answer.responses'].split('\t')
for qid, s, is_good, response in zip(input_qids, input_sents,
ans_is_good, ans_responses):
print (' Example %s' % qid)
print (' Sentence: %s' % s).encode('utf-8')
print (' Is good? %s' % is_good)
print (' Response: %s' % colored(response, 'cyan')).encode('utf-8')
def dump_verify():
with open(OPTS.filename) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
if OPTS.worker and row['WorkerId'] != OPTS.worker: continue
if row['AssignmentStatus'] == 'Rejected': continue
print 'HIT %s' % row['HITId']
print 'WorkerId: %s' % row['WorkerId']
print 'Time: %s s' % row['WorkTimeInSeconds']
qids = row['Input.qids'].split('\t')
questions = row['Input.questions'].split('\t')
sents = row['Answer.sents'].split('\t')
responses = row['Answer.responses'].split('\t')
for qid, q, s_str, response_str in zip(
qids, questions, sents, responses):
print (' Example %s' % qid)
print (' Question %s' % q)
s_list = s_str.split('|')
a_list = response_str.split('|')
for s, a in zip(s_list, a_list):
print (' Sentence: %s' % sent_format(s)).encode('utf-8')
print (' Is good? %s' % colored(a, 'cyan'))
def pred_human_eval():
all_preds = collections.defaultdict(list)
with open(OPTS.filename) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
all_preds[row['Input.qid']].append(row['Answer.response'])
preds = {}
for qid in all_preds:
if OPTS.ensemble:
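            # Pick an answer that at least two workers agree on;
            # fall back to a random answer when there is no agreement.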
for a in all_preds[qid]:
count = sum(1 for pred in all_preds[qid] if a == pred)
if count > 1:
preds[qid] = a
break
else:
preds[qid] = random.sample(all_preds[qid], 1)[0]
else:
preds[qid] = random.sample(all_preds[qid], 1)[0]
print json.dumps(preds)
def fetch():
out_path = os.path.dirname(__file__)
out_path = os.path.join(out_path, 'fingerprints', 'data', 'types.yml')
fh = urllib.urlopen(CSV_URL)
types = {}
for row in unicodecsv.DictReader(fh):
name = stringify(row.get('Name'))
abbr = stringify(row.get('Abbreviation'))
if name is None or abbr is None:
continue
if name in types and types[name] != abbr:
print name, types[name], abbr
types[name] = abbr
# print abbr, name
with open(out_path, 'w') as fh:
yaml.safe_dump({'types': types}, fh,
indent=2,
allow_unicode=True,
canonical=False,
default_flow_style=False)
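For illustration, the generated types.yml maps long-form names to their abbreviations; with invented entries it would look roughly like:

types:
  Aktiengesellschaft: AG
  Limited Liability Company: LLC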
def parse_csv(file_stream, expected_columns=None):
"""
    Parse a CSV file and yield a stream of dictionaries, one per row.
    The first line of the CSV file must contain the column headers.
Arguments:
file_stream: input file
expected_columns (set[unicode]): columns that are expected to be present
Yields:
dict: CSV line parsed into a dictionary.
"""
reader = unicodecsv.DictReader(file_stream, encoding="utf-8")
if expected_columns and set(expected_columns) - set(reader.fieldnames):
raise ValidationError(ValidationMessages.MISSING_EXPECTED_COLUMNS.format(
expected_columns=", ".join(expected_columns), actual_columns=", ".join(reader.fieldnames)
))
# "yield from reader" would be nicer, but we're on python2.7 yet.
for row in reader:
yield row
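A minimal usage sketch for parse_csv (the stream contents and column names are invented for illustration; unicodecsv expects a byte stream):

import io
stream = io.BytesIO(b'email,name\nalice@example.com,Alice\n')
for row in parse_csv(stream, expected_columns={u'email', u'name'}):
    print row  # -> {u'email': u'alice@example.com', u'name': u'Alice'}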
def _read_input(self, in_file):
"""Dummy file of inputs
:param in_file: path to input file of 2 cols (tab-delim); accession_number, sequence
:type string
:return dictionary of accession_number to sequence tags
"""
result = {}
with open(in_file, 'r') as f:
reader = csv.DictReader(f, delimiter=str('\t'))
for row in reader:
result[row['accession']] = {
'transcript_sequence': row['transcript_sequence'],
'cds_start_i': int(row['cds_start_i']),
'cds_end_i': int(row['cds_end_i'])
}
return result
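A minimal sketch of the input file _read_input expects, tab-delimited with a header row (the accession and coordinates are invented for illustration):

# write a hypothetical example.tsv that _read_input can parse
with open('example.tsv', 'w') as f:
    f.write('accession\ttranscript_sequence\tcds_start_i\tcds_end_i\n')
    f.write('NM_0001.1\tATGCCCGGGTGA\t0\t12\n')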
def process(self, activate=False):
if not self.is_questions_created:
data = list(csv.DictReader(self.csv.file))
for d in data:
Question.objects.create(task=self,
question=d)
self.is_questions_created = True
if activate:
self.is_active = True
self.save()
def read_test_labels(filename):
labels = []
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
labels.append(j['id'])
return labels
def read_depths(filename):
depths = {}
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
depths[j['key']] = int(j['depth'])
return depths
def read_labels(filename):
labels = {}
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
#if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
labels[j['id']] = int(j['cancer'])
return labels
Source: dsb_create_voxel_model_predictions.py (project: data-science-bowl-2017, author: tondonia)
def read_test_labels(filename):
labels = []
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
#if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
labels.append(j['id'])
return labels
Source: dsb_create_voxel_model_predictions.py (project: data-science-bowl-2017, author: tondonia)
def read_depths(filename):
depths = {}
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
depths[j['key']] = int(j['depth'])
return depths
Source: dsb_voxel_predict_stage2.py (project: data-science-bowl-2017, author: tondonia)
def read_labels(filename):
labels = {}
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
#if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
labels[j['id']] = int(j['cancer'])
return labels
Source: dsb_voxel_predict_stage2.py (project: data-science-bowl-2017, author: tondonia)
def read_test_labels(filename):
labels = []
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
#if not j['id'] == 'b8bb02d229361a623a4dc57aa0e5c485':
labels.append(j['id'])
return labels
Source: dsb_voxel_predict_stage2.py (project: data-science-bowl-2017, author: tondonia)
def read_depths(filename):
depths = {}
csvFile = open(filename)
reader = unicodecsv.DictReader(csvFile,encoding='utf-8')
for j in reader:
depths[j['key']] = int(j['depth'])
return depths
def make_inquiry_requests_from_file(inquiry, file):
reader = unicodecsv.DictReader(file)
for lineno, line in enumerate(reader, 1):
try:
recipient = Recipient.objects.get(slug=line['recipient'])
except Recipient.DoesNotExist:
raise ValueError('Recipient on line %s not found' % lineno)
if not recipient.groups.filter(id=inquiry.group_id).exists():
raise ValueError('Recipient %s not in inquiry group' % recipient)
data = json.loads(line['data'])
InquiryRequest.objects.create_from_inquiry(inquiry, recipient, data)
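A hypothetical input file for make_inquiry_requests_from_file (recipient must be the slug of an existing Recipient in the inquiry's group; the data column carries a JSON object, with inner quotes doubled per CSV escaping):

recipient,data
press-office,"{""topic"": ""budget""}"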
def read_sentences():
id_to_sents = collections.defaultdict(list)
with open(OPTS.batch_file) as f:
reader = csv.DictReader(f)
for row in reader:
input_qids = row['Input.qids'].split('\t')
input_sents = row['Input.sents'].split('\t')
ans_is_good = row['Answer.is-good'].split('\t')
ans_responses = row['Answer.responses'].split('\t')
for qid, s, is_good, response in zip(input_qids, input_sents, ans_is_good, ans_responses):
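                # 'yes' means the original sentence was judged good as-is,
                # so keep it in place of the worker's rewritten response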
if is_good == 'yes':
response = s
if response not in id_to_sents[qid]:
id_to_sents[qid].append(response)
return id_to_sents
def stats_grammar():
# Read data
worker_to_is_good = collections.defaultdict(list)
worker_to_times = collections.defaultdict(list)
with open(OPTS.filename) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
if row['AssignmentStatus'] == 'Rejected': continue
worker_id = row['WorkerId']
ans_is_good = row['Answer.is-good'].split('\t')
time = float(row['WorkTimeInSeconds'])
worker_to_is_good[worker_id].extend(ans_is_good)
worker_to_times[worker_id].append(time)
# Aggregate by worker
print '%d total workers' % len(worker_to_times)
worker_stats = {}
for worker_id in worker_to_times:
times = sorted(worker_to_times[worker_id])
t_median = times[len(times)/2]
t_mean = sum(times) / float(len(times))
is_good_list = worker_to_is_good[worker_id]
num_qs = len(is_good_list)
frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)
# Print
sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
for worker_id in sorted_ids:
t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)
def stats_verify():
# Read data
worker_to_is_good = collections.defaultdict(list)
worker_to_times = collections.defaultdict(list)
with open(OPTS.filename) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
if row['AssignmentStatus'] == 'Rejected': continue
worker_id = row['WorkerId']
ans_is_good = [x for s in row['Answer.responses'].split('\t')
for x in s.split('|')]
time = float(row['WorkTimeInSeconds'])
worker_to_is_good[worker_id].extend(ans_is_good)
worker_to_times[worker_id].append(time)
# Aggregate by worker
print '%d total workers' % len(worker_to_times)
worker_stats = {}
for worker_id in worker_to_times:
times = sorted(worker_to_times[worker_id])
t_median = times[len(times)/2]
t_mean = sum(times) / float(len(times))
is_good_list = worker_to_is_good[worker_id]
num_qs = len(is_good_list)
frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)
# Print
sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
for worker_id in sorted_ids:
t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)
def get_csv(csv_file):
    reader = unicodecsv.DictReader(open(csv_file, 'rb'))
return list(reader)
def handle(self, *args, **options):
translation.activate(settings.LANGUAGE_CODE)
filename = options['filename']
reader = unicodecsv.DictReader(open(filename))
importer = CSVImporter()
importer.run(reader)
def upload_information_objects(self, request):
if not request.method == 'POST':
raise PermissionDenied
if not self.has_change_permission(request):
raise PermissionDenied
reader = unicodecsv.DictReader(request.FILES['file'])
importer = CSVImporter()
importer.run(reader)
return redirect('admin:froide_campaign_informationobject_changelist')
def read_ipa_bases(ipa_bases):
segments = []
with open(ipa_bases, 'rb') as f:
        dictreader = csv.DictReader(f, encoding='utf-8')
for record in dictreader:
form = record['ipa']
features = {k: v for k, v in record.items() if k != 'ipa'}
segments.append(Segment(form, features))
return segments
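A hypothetical slice of the ipa_bases file read above (an 'ipa' column plus one column per phonological feature; the feature names and values are invented for illustration):

ipa,syl,son,cons
p,-,-,+
a,+,+,-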
def gcp_file_reader(fn):
rdr = csv.DictReader(open(fn, "r"), delimiter=str("\t"))
for rec in rdr:
if rec["id"].startswith("#"):
continue
yield rec
def test_parser_test_completeness(self):
"""ensure that all rules in grammar have tests"""
    grammar_rule_re = re.compile(r"^(\w+)")
grammar_fn = pkg_resources.resource_filename(__name__, "../hgvs/_data/hgvs.pymeta")
with open(grammar_fn, "r") as f:
grammar_rules = set(r.group(1) for r in filter(None, map(grammar_rule_re.match, f)))
with open(self._test_fn, "r") as f:
reader = csv.DictReader(f, delimiter=str("\t"))
test_rules = set(row["Func"] for row in reader)
untested_rules = grammar_rules - test_rules
self.assertTrue(len(untested_rules) == 0, "untested rules: {}".format(untested_rules))
def test_parser_grammar(self):
with open(self._test_fn, "r") as f:
reader = csv.DictReader(f, delimiter=str("\t"))
fail_cases = []
for row in reader:
if row["Func"].startswith("#"):
continue
# setup input
inputs = self._split_inputs(row["Test"], row["InType"])
expected_results = self._split_inputs(row["Expected"], row["InType"]) if row["Expected"] else inputs
expected_map = dict(zip(inputs, expected_results))
# step through each item and check
is_valid = True if row["Valid"].lower() == "true" else False
for key in expected_map:
expected_result = six.text_type(expected_map[key]).replace("u'", "'")
function_to_test = getattr(self.p._grammar(key), row["Func"])
row_str = u"{}\t{}\t{}\t{}\t{}".format(row["Func"], key, row["Valid"], "one", expected_result)
try:
actual_result = six.text_type(function_to_test()).replace("u'", "'")
if not is_valid or (expected_result != actual_result):
print("expected: {} actual:{}".format(expected_result, actual_result))
fail_cases.append(row_str)
except Exception as e:
if is_valid:
print("expected: {} Exception: {}".format(expected_result, e))
fail_cases.append(row_str)
# everything should have passed - report whatever failed
self.assertTrue(len(fail_cases) == 0, pprint.pprint(fail_cases))