def write_csv(resource_name, data, write_header=False):
    """Write rows to a temporary CSV file and yield it for loading.

    The rows are written to a named temporary file, the file is closed, and
    the closed file object is yielded so the caller can load it via
    ``csv_file.name``. The temporary file is removed when the caller resumes
    the generator, and also when writing or the caller's loading step fails,
    so no temporary file is left behind on error paths.

    Args:
        resource_name (str): The resource name; used as the key into
            CSV_FIELDNAME_MAP to select the CSV field names.
        data (iterable): An iterable of dict rows to be written to csv.
        write_header (bool): If True, write the header in the csv file.

    Yields:
        object: The CSV temporary file pointer (already closed).

    Raises:
        CSVFileError: If there was an error writing the CSV file.
    """
    # delete=False because the file has to outlive close(): the consumer
    # loads it by name after it has been closed.
    # NOTE(review): NamedTemporaryFile opens in binary mode by default, which
    # the Python 3 csv writer cannot write str rows to -- confirm the target
    # interpreter before relying on this under Python 3.
    csv_file = tempfile.NamedTemporaryFile(delete=False)
    try:
        try:
            writer = csv.DictWriter(
                csv_file, doublequote=False, escapechar='\\',
                quoting=csv.QUOTE_NONE,
                fieldnames=CSV_FIELDNAME_MAP[resource_name])
            if write_header:
                writer.writeheader()
            for row in data:
                writer.writerow(row)
            # This must be closed before returned for loading.
            csv_file.close()
            yield csv_file
        finally:
            # Runs on success, on a write failure, and when the consumer
            # raises or closes the generator at the yield. close() is a
            # no-op if the file was already closed above.
            csv_file.close()
            # Remove the csv file after loading (or after a failure).
            os.remove(csv_file.name)
    except (IOError, OSError, csv.Error) as e:
        raise CSVFileError(resource_name, e)
csv_writer.py 文件源码
python
阅读 17
收藏 0
点赞 0
评论 0
评论列表
文章目录