def export_supervision_authorities(self, *args, **options):
    """Export every SupervisionAuthority to the CSV file in ``options['filename']``.

    Side effect: before a row is written, the authority's ``fds_url`` is set
    from its slugified name and the record is saved.

    Only name, email, address, contact, jurisdiction slug and classification
    are filled in; the other header columns are not supplied here and are
    left to DictWriter's default fill value.
    """
    fieldnames = (
        'name', 'email', 'address', 'contact', 'jurisdiction__slug',
        'other_names', 'description', 'tags', 'parent__name',
        'classification', 'url', 'website_dump', 'request_note',
    )
    # Binary mode because unicodecsv emits encoded bytes; the context manager
    # guarantees the handle is flushed and closed even if a row fails.
    with open(options['filename'], 'wb') as csv_file:
        writer = unicodecsv.DictWriter(csv_file, fieldnames)
        writer.writeheader()
        for authority in SupervisionAuthority.objects.all():
            slug = slugify(authority.name)
            authority.fds_url = 'https://fragdenstaat.de/behoerde/%s/' % slug
            authority.save()
            writer.writerow({
                'name': authority.name,
                'email': authority.email,
                'address': authority.address,
                'contact': authority.contact,
                'jurisdiction__slug': slugify(authority.state.name),
                'classification': 'Heimaufsicht',
            })
# Example source code using Python's DictWriter() class
def write_routes(self):
    """Dump ``self.routes`` into ``routes.txt`` inside ``self.out_dir``.

    A route's position in ``self.routes`` becomes its ``route_id``.
    """
    columns = [
        'route_id', 'route_short_name', 'route_long_name', 'route_type', 'agency_id']
    target = os.path.join(self.out_dir, 'routes.txt')
    with open(target, 'wb') as out:
        writer = csv.DictWriter(out, fieldnames=columns,
                                delimiter=',', quotechar='"')
        writer.writeheader()
        for index, entry in enumerate(self.routes):
            record = {'route_id': index}
            # Every remaining column is copied straight from the route mapping.
            for column in columns[1:]:
                record[column] = entry[column]
            writer.writerow(record)
def write_stops(self):
    """Dump the stops whose ``'has_trip'`` entry is truthy into ``stops.txt``.

    The file is created inside ``self.out_dir``; stops without a trip are
    skipped.
    """
    columns = ['stop_id', 'stop_name', 'stop_lat', 'stop_lon']
    target = os.path.join(self.out_dir, 'stops.txt')
    with open(target, 'wb') as out:
        writer = csv.DictWriter(out, fieldnames=columns,
                                delimiter=',', quotechar='"')
        writer.writeheader()
        for entry in self.stops.itervalues():
            if entry['has_trip']:
                # Column names are identical to the keys of the stop mapping.
                writer.writerow({column: entry[column] for column in columns})
def write_agencies(self):
    """Dump ``self.agencies`` into ``agency.txt`` inside ``self.out_dir``.

    Each agency value is used as both id and name; URL, timezone and
    language are the same fixed values for every row.
    """
    columns = [
        'agency_id', 'agency_name', 'agency_url', 'agency_timezone', 'agency_lang']
    shared = {
        'agency_url': 'http://www.bahn.de',
        'agency_timezone': 'Europe/Berlin',
        'agency_lang': 'de',
    }
    target = os.path.join(self.out_dir, 'agency.txt')
    with open(target, 'wb') as out:
        writer = csv.DictWriter(out, fieldnames=columns,
                                delimiter=',', quotechar='"')
        writer.writeheader()
        for name in self.agencies:
            writer.writerow(dict(shared, agency_id=name, agency_name=name))
def export(self, file_handle=None):
    '''
    Exports the task questions and answers as a CSV.

    Args:
        file_handle: optional writable buffer that supports ``getvalue()``;
            a new ``StringIO`` is created when it is omitted or falsy.

    The CSV is attached to ``self.export_file`` (without saving through the
    field) and ``self.status`` is set to ``SUCCESS``. Any exception is
    logged and turns the status into ``FAILURE`` instead of propagating.
    '''
    try:
        if not file_handle:
            file_handle = StringIO.StringIO()
        data = self.task.answers
        # http://stackoverflow.com/a/11399424
        # Union of all keys in all of the answer rows. Sorted so the column
        # order is stable between exports; iterating a set is arbitrary.
        headers = sorted(set().union(*(i.keys() for i in data)))
        writer = csv.DictWriter(file_handle, fieldnames=headers)
        writer.writeheader()
        for row in data:
            writer.writerow(row)
        export_file = ContentFile(file_handle.getvalue())
        export_filename = "ST_TASK_{task_id}_EXPORT_{date}.csv".format(
            task_id=self.task.id,
            date=str(datetime.date.today()))
        self.export_file.save(name=export_filename, content=export_file, save=False)
        self.status = self.SUCCESS
    except Exception as e:
        LOG.exception(e)
        self.status = self.FAILURE
    # NOTE(review): the save is placed after the try/except so the status is
    # persisted on both paths -- confirm against the original layout.
    self.save()
def create(self, cr, uid, ids, data, context=None):
    """Render labels for the given records with glabels and return the PDF.

    Each record read from ``self.model`` becomes a CSV row that is repeated
    ``self.count`` times; ``glabels-3-batch`` then merges that CSV with the
    label template into a PDF.

    Args:
        cr: database cursor; ``cr.dbname`` selects the registry.
        uid: user id passed to the model's ``read``.
        ids: ids of the ``self.model`` records to print.
        data: mapping that may carry a base64-encoded ``'template'``;
            ``self.template`` is used when it is missing or empty.
        context: not used by this method.

    Returns:
        tuple: ``(pdf, 'pdf')`` where ``pdf`` is the content read back from
        the generated output file.
    """
    opened = []
    try:
        temp = tempfile.NamedTemporaryFile(mode='w+t', suffix='.csv')
        opened.append(temp)
        outfile = tempfile.NamedTemporaryFile(mode='w+b', suffix='.pdf')
        opened.append(outfile)
        glabels = tempfile.NamedTemporaryFile(mode='w+t', suffix='.glabels')
        opened.append(glabels)

        template = data.get('template')
        glabels.write(base64.b64decode(template) if template else self.template)
        glabels.seek(0)

        pool = registry(cr.dbname)
        labelwriter = None
        for p in pool.get(self.model).read(cr, uid, ids):
            if labelwriter is None:
                # Column names come from the fields of the first record.
                labelwriter = csv.DictWriter(temp, p.keys())
                labelwriter.writeheader()
            # The row is identical for every copy, so build it once.
            row = {
                k: v.encode('utf8') if isinstance(v, (str, unicode)) else str(v)
                for k, v in p.items()
            }
            for _ in range(self.count):
                labelwriter.writerow(row)
        temp.seek(0)

        # NOTE(review): the exit status of the command is not checked, so a
        # failed run yields whatever is in the output file (possibly nothing).
        os.system("glabels-3-batch -o %s -l -C -i %s %s" % (outfile.name, temp.name, glabels.name))
        outfile.seek(0)
        pdf = outfile.read()
        return (pdf, 'pdf')
    finally:
        # Close the temp files on every path, including errors, so they
        # are never left open.
        for handle in opened:
            handle.close()
def create(self, cr, uid, ids, data, context=None):
    """Render a label sheet from name/value records and return the PDF.

    All records of ``self.model`` are read (search with an empty domain).
    Each record's ``self.col_name`` value becomes a CSV column and its
    ``self.col_value`` value the cell. That single row is written
    ``self.count`` times and merged with the label template by
    ``glabels-3-batch``.

    Args:
        cr: database cursor; ``cr.dbname`` selects the registry.
        uid: user id passed to the model's ``search`` and ``read``.
        ids: not used to select records here.
        data: mapping that may carry a base64-encoded ``'template'``;
            ``self.template`` is used when it is missing or empty.
        context: passed to the ``read`` that fetches the cell values.

    Returns:
        tuple: ``(pdf, 'pdf')`` where ``pdf`` is the content read back from
        the generated output file.
    """
    opened = []
    try:
        temp = tempfile.NamedTemporaryFile(mode='w+t', suffix='.csv')
        opened.append(temp)
        outfile = tempfile.NamedTemporaryFile(mode='w+b', suffix='.pdf')
        opened.append(outfile)
        glabels = tempfile.NamedTemporaryFile(mode='w+t', suffix='.glabels')
        opened.append(glabels)

        template = data.get('template')
        glabels.write(base64.b64decode(template) if template else self.template)
        glabels.seek(0)

        pool = registry(cr.dbname)
        model = pool.get(self.model)
        header_records = model.read(cr, uid, model.search(cr, uid, []), [self.col_name])
        labelwriter = csv.DictWriter(temp, [h[self.col_name] for h in header_records])
        labelwriter.writeheader()

        # The row does not depend on the copy index: fetch and build it once
        # instead of searching and reading the table for every copy.
        value_records = model.read(
            cr, uid, model.search(cr, uid, []),
            [self.col_name, self.col_value], context=context)
        row = {}
        for p in value_records:
            text = str(p[self.col_value])
            # A value that prints as '0.0' is shown as an empty cell.
            row[p[self.col_name]] = '' if text == '0.0' else text
        for _ in range(self.count):
            labelwriter.writerow(row)
        temp.seek(0)

        # NOTE(review): the exit status of the command is not checked, so a
        # failed run yields whatever is in the output file (possibly nothing).
        os.system("glabels-3-batch -o %s -l -C -i %s %s" % (outfile.name, temp.name, glabels.name))
        outfile.seek(0)
        pdf = outfile.read()
        return (pdf, 'pdf')
    finally:
        # Close the temp files on every path, including errors, so they
        # are never left open.
        for handle in opened:
            handle.close()
def write_trips(self):
    """Dump ``self.trips`` into ``trips.txt`` and ``stop_times.txt``.

    Both files are created inside ``self.out_dir``. A trip's position in
    ``self.trips`` is its ``trip_id``, shared by all of its stop-time rows.
    """
    trip_columns = [
        'route_id', 'service_id', 'trip_id', 'trip_headsign']
    stoptime_columns = [
        'trip_id', 'arrival_time', 'departure_time', 'stop_id', 'stop_sequence']
    trips_path = os.path.join(self.out_dir, 'trips.txt')
    stoptimes_path = os.path.join(self.out_dir, 'stop_times.txt')
    with open(trips_path, 'wb') as trips_out, open(stoptimes_path, 'wb') as stoptimes_out:
        trips = csv.DictWriter(trips_out, fieldnames=trip_columns,
                               delimiter=',', quotechar='"')
        trips.writeheader()
        stoptimes = csv.DictWriter(stoptimes_out, fieldnames=stoptime_columns,
                                   delimiter=',', quotechar='"')
        stoptimes.writeheader()
        for index, journey in enumerate(self.trips):
            trips.writerow({
                'route_id': journey['route_id'],
                'service_id': journey['service_id'],
                'trip_id': index,
                'trip_headsign': journey['headsign'],
            })
            for visit in journey['stoptimes']:
                # Stored times get ':00' appended before they are written.
                stoptimes.writerow({
                    'trip_id': index,
                    'arrival_time': visit['arrival_time'] + ':00',
                    'departure_time': visit['departure_time'] + ':00',
                    'stop_id': visit['stop_id'],
                    'stop_sequence': visit['stop_sequence'],
                })
def write_calendar_dates(self):
    """Dump ``self.calendar_dates`` into ``calendar_dates.txt``.

    The file is created inside ``self.out_dir``. Each entry of
    ``self.calendar_dates`` is an iterable of dates; the entry's position
    is the ``service_id`` and every date is written with exception type 1.
    """
    columns = ['service_id', 'date', 'exception_type']
    target = os.path.join(self.out_dir, 'calendar_dates.txt')
    with open(target, 'wb') as out:
        writer = csv.DictWriter(out, fieldnames=columns,
                                delimiter=',', quotechar='"')
        writer.writeheader()
        for service_index, service_days in enumerate(self.calendar_dates):
            for day in service_days:
                record = {
                    'service_id': service_index,
                    'date': day.strftime('%Y%m%d'),
                    'exception_type': 1,
                }
                writer.writerow(record)
def toCSV(self, out=sys.stdout):
    """Write the dataset rows as CSV to ``out``, header line first.

    Column names are taken from ``self.rows.keys()``; each row is converted
    to a plain dict before it is written.
    """
    csv_out = unicodecsv.DictWriter(out, self.rows.keys())
    csv_out.writeheader()
    for record in self.rows:
        csv_out.writerow(dict(record))
def generate_deposit_slip(self, rows):
    """Serialise ``rows`` as UTF-8 CSV in memory and return the buffer.

    ``rows`` must contain at least one mapping: the columns and their order
    are taken from the keys of the first row. The returned ``BytesIO`` is
    not rewound.
    """
    buffer = BytesIO()
    # Order matters: the first row's keys fix the column order.
    columns = rows[0].keys()
    slip = csv.DictWriter(buffer, columns, encoding='utf-8')
    slip.writeheader()
    slip.writerows(rows)
    return buffer
def process_results(self, results):
    '''
    Processes a page full of results.

    Appends one CSV row per article to ``self.csv_file`` (the header is
    written only while ``self.harvested`` is 0). When ``self.pdf`` is set,
    downloads the article's PDF; when ``self.text`` is set, saves its
    tag-stripped, whitespace-collapsed text. After the page is done it
    sleeps briefly and advances ``self.harvested``.

    A ``KeyError`` raised anywhere in the page (for example a response
    without an article list) is swallowed, so the page is skipped.
    '''
    try:
        articles = results[0]['records']['article']
        with open(self.csv_file, 'ab') as csv_file:
            writer = csv.DictWriter(csv_file, FIELDS, encoding='utf-8')
            if self.harvested == 0:
                writer.writeheader()
            for article in articles:
                article_id = article['id']
                row = self.prepare_row(article)
                writer.writerow(row)
                if self.pdf:
                    pdf_url = self.get_pdf_url(article_id)
                    if pdf_url:
                        pdf_filename = self.make_filename(article)
                        pdf_file = os.path.join(self.data_dir, 'pdf', '{}.pdf'.format(pdf_filename))
                        urlretrieve(pdf_url, pdf_file)
                if self.text:
                    text = article.get('articleText')
                    if text:
                        text_filename = self.make_filename(article)
                        # Raw strings: "\s" in a normal literal is an invalid
                        # escape sequence and triggers a warning.
                        text = re.sub(r'<[^<]+?>', '', text)
                        text = re.sub(r'\s\s+', ' ', text)
                        text_file = os.path.join(self.data_dir, 'text', '{}.txt'.format(text_filename))
                        with open(text_file, 'wb') as text_output:
                            text_output.write(text.encode('utf-8'))
        time.sleep(0.5)
        self.harvested += self.get_highest_n(results)
        print('Harvested: {}'.format(self.harvested))
    except KeyError:
        # Best-effort harvest: a page with missing keys is skipped silently.
        # NOTE(review): this also hides KeyErrors raised by the helpers above.
        pass
def csv_transactions(self, year, month, file_name):
    """Write the transactions of one period to ``file_name`` as CSV.

    When ``self.transactions(year, month)`` is empty, a warning is issued
    and no file is created. Otherwise the rows are written under
    ``self.fieldnames`` with a leading BOM.
    """
    records = self.transactions(year, month)
    if len(records) == 0:
        message = 'No transactions for the period ({}-{})'.format(year, month)
        warnings.warn(message)
        return
    with open(file_name, 'wb') as handle:
        # 'utf-8-sig' adds a BOM to the csv.
        out = csv.DictWriter(handle, fieldnames=self.fieldnames,
                             encoding='utf-8-sig')
        out.writeheader()
        out.writerows(records)
def write_csv(resource_name, data, write_header=False):
    """Start the csv writing flow.

    The rows are written to a named temporary file, which is closed before
    it is handed to the caller and deleted once the caller is done with it.
    The file is also deleted when writing fails or the caller raises.

    Args:
        resource_name (str): The resource name.
        data (iterable): An iterable of data to be written to csv.
        write_header (bool): If True, write the header in the csv file.

    Yields:
        object: The CSV temporary file pointer (already closed).

    Raises:
        CSVFileError: If there was an error writing the CSV file.
    """
    # NOTE(review): this generator is presumably wrapped by a context-manager
    # decorator that is not visible here -- confirm.
    csv_file = tempfile.NamedTemporaryFile(delete=False)
    try:
        try:
            writer = csv.DictWriter(csv_file, doublequote=False, escapechar='\\',
                                    quoting=csv.QUOTE_NONE,
                                    fieldnames=CSV_FIELDNAME_MAP[resource_name])
            if write_header:
                writer.writeheader()
            for i in data:
                writer.writerow(i)
            # This must be closed before returned for loading.
            csv_file.close()
            yield csv_file
        finally:
            # delete=False means nobody else cleans this file up: close it
            # (a no-op if already closed) and remove it on every path.
            csv_file.close()
            os.remove(csv_file.name)
    except (IOError, OSError, csv.Error) as e:
        raise CSVFileError(resource_name, e)
def csvify(rows):
    '''Build a ``text/csv`` response from a list of dictionaries.

    The columns are the keys of the first row. An empty or missing list
    produces a response with an empty body.
    '''
    if rows:
        buf = BytesIO()
        out = csv.DictWriter(buf, rows[0].keys())
        out.writeheader()
        out.writerows([dict(entry) for entry in rows])
        payload = buf.getvalue()
    else:
        payload = ''
    return Response(payload, mimetype='text/csv')
def _agregado_por_comercio(request, anio, mes, quincena, region_id, funcion, prefijo):
    """Return a CSV download of the period's readings pivoted by store.

    The response has one row per product and one column per store, in order
    of first appearance. Users whose profile authorization is below
    ``PERMISO_COORD_ZONAL`` (or who have no profile) get the message
    template with an error instead.
    """
    autorizado = (hasattr(request.user, "perfil") and
                  request.user.perfil.autorizacion >= PERMISO_COORD_ZONAL)
    if not autorizado:
        messages.error(request, 'Permisos insuficientes.')
        return render(request, 'relevamiento/mensaje.html')

    lecturas = _lecturas_del_periodo(anio, mes, quincena, region_id, funcion)

    # Pivot the results: product -> {store: value}.
    lecturas_por_comercio = OrderedDict()
    encabezado = ["Producto"]
    for lectura in lecturas:
        comercio = lectura['comercio']
        if comercio not in encabezado:
            encabezado.append(comercio)
        valores = lecturas_por_comercio.setdefault(lectura['producto'], {})
        valores[comercio] = lectura['valor']

    # File name: prefix + year + zero-padded month and fortnight (+ region slug).
    partes = [prefijo, anio, mes.zfill(2), quincena.zfill(2)]
    if region_id:
        region = Region.objects.get(pk=region_id)
        partes.append("_%s" % slugify(region.nombre))
    nombre_archivo = "".join(partes) + ".csv"

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename=%s;' % nombre_archivo
    response['Cache-Control'] = 'no-cache'

    writer = csv.DictWriter(response, fieldnames=encabezado)
    writer.writeheader()
    for producto, valores in lecturas_por_comercio.items():
        fila = {"Producto": producto}
        fila.update(valores)
        writer.writerow(fila)
    return response
def write_ipa_all(ipa_bases, ipa_all, all_segments, sort_order):
    """Write every segment's features to the ``ipa_all`` CSV file.

    The column names are copied from the first row of ``ipa_bases``. The
    segments are ordered by ``sort_all_segments`` and each row gets the
    segment's form in its ``'ipa'`` column.
    """
    with open(ipa_bases, 'rb') as source:
        fieldnames = next(csv.reader(source, encoding='utf-8'))
    with open(ipa_all, 'wb') as target:
        writer = csv.DictWriter(target, encoding='utf-8', fieldnames=fieldnames)
        # Header row written by hand: each column name maps to itself.
        writer.writerow(dict(zip(fieldnames, fieldnames)))
        for segment in sort_all_segments(sort_order, all_segments):
            # Shallow copy so the segment's own feature mapping is untouched.
            record = copy.copy(segment.features)
            record['ipa'] = segment.form
            writer.writerow(record)
def write_csv(self, filename='output.csv', make_strings=False):
    """Save the processed rows as CSV under ``filename``.

    The columns are the keys of ``self.key_map``. With ``make_strings`` set,
    the rows returned by ``self.make_strings()`` are written instead of
    ``self.rows``.

    Raises:
        AttributeError: if ``self.rows`` is empty.
    """
    if len(self.rows) <= 0:
        raise AttributeError('No rows were loaded')
    out = self.make_strings() if make_strings else self.rows
    with open(filename, 'wb+') as handle:
        csv_out = csv.DictWriter(handle, self.key_map.keys())
        csv_out.writeheader()
        csv_out.writerows(out)
def write_csv(self, filename='output.csv', make_strings=False):
    """Save the processed rows as CSV under ``filename``.

    The columns are the keys of ``self.key_map``. With ``make_strings`` set,
    the rows returned by ``self.make_strings()`` are written instead of
    ``self.rows``.

    Raises:
        AttributeError: if ``self.rows`` is empty.
    """
    if len(self.rows) <= 0:
        raise AttributeError('No rows were loaded')
    out = self.make_strings() if make_strings else self.rows
    with open(filename, 'wb+') as handle:
        csv_out = csv.DictWriter(handle, self.key_map.keys())
        csv_out.writeheader()
        csv_out.writerows(out)
def toCSV(self, delimiter='\t'):
    """Serialise ``self.data`` as delimited text and return the buffer content.

    Rows that are dicts are flattened, stripped of the excluded fields and
    written with a header; the columns are ``self.params.fields`` when set,
    otherwise the sorted union of all flattened keys. Rows that are lists
    are written as they are, without a header.

    Args:
        delimiter: field separator (tab by default).
    """
    excluded = ['evidence.evidence_chain', 'search_metadata', 'search_metadata.sort']
    buf = BytesIO()
    if not self.data:
        self.flatten(self.toDict())  # populate data if empty
    # Formatting options shared by both writers below.
    dialect = dict(
        delimiter=delimiter,
        quotechar='"',
        quoting=csv.QUOTE_MINIMAL,
        doublequote=False,
        escapechar='\\',
    )
    if self.data and isinstance(self.data[0], dict):
        columns = set()
        flat_rows = []
        for record in self.data:
            flat = self.flatten(
                record,
                simplify=self.params.datastructure == SourceDataStructureOptions.SIMPLE)
            for name in excluded:
                flat.pop(name, None)
            flat_rows.append(flat)
            columns.update(flat.keys())
        header = [unicode(name) for name in (self.params.fields or sorted(columns))]
        # extrasaction is left at its default (the original keeps
        # extrasaction='ignore' commented out).
        table = csv.DictWriter(buf, header, restval='', **dialect)
        table.writeheader()
        for flat in flat_rows:
            table.writerow(flat)
    if self.data and isinstance(self.data[0], list):
        plain = csv.writer(buf, **dialect)
        for record in self.data:
            plain.writerow(record)
    return buf.getvalue()