def append_by_csvs(self, market_situations_path, buy_offer_path, csv_merchant_id=None):
    """Feed the rows of a market-situation CSV and a buy-offer CSV into self.

    Each file is probed with ``csv.Sniffer().has_header`` on its first
    16384 characters.  If a header row is detected the column names are
    read from the file; otherwise they are supplied by
    ``get_market_situation_fieldnames()`` or ``get_buy_offer_fieldnames()``.

    Order of operations:
      1. every market-situation row goes to
         ``self.append_marketplace_situations(row, csv_merchant_id)``
      2. ``self.update_timestamps()``
      3. every buy-offer row goes to ``self.append_sales(row)``
      4. ``self.print_info()``
    """
    sniff_size = 16384

    with open(market_situations_path, 'r') as situations_file:
        header_found = csv.Sniffer().has_header(situations_file.read(sniff_size))
        situations_file.seek(0)  # rewind after sampling
        reader_options = {}
        if not header_found:
            reader_options['fieldnames'] = get_market_situation_fieldnames()
        for situation in csv.DictReader(situations_file, **reader_options):
            self.append_marketplace_situations(situation, csv_merchant_id)
    self.update_timestamps()

    with open(buy_offer_path, 'r') as offers_file:
        header_found = csv.Sniffer().has_header(offers_file.read(sniff_size))
        offers_file.seek(0)  # rewind after sampling
        reader_options = {}
        if not header_found:
            reader_options['fieldnames'] = get_buy_offer_fieldnames()
        for offer in csv.DictReader(offers_file, **reader_options):
            self.append_sales(offer)
    self.print_info()
python类Sniffer()的实例源码
def get_csv_reader(input):
    """Return a ``csv.DictReader`` over the CSV text held in *input*.

    *input* is converted with ``str()`` and split into lines.  The dialect
    is chosen as follows:

    * If the first line contains neither a comma nor a tab, or the text is
      a single line, the built-in ``'excel'`` dialect is used.
    * Otherwise a sample (grown in 5000-character steps, without its last
      and possibly incomplete line) is handed to ``csv.Sniffer().sniff``
      and ``skipinitialspace`` is switched on for the sniffed dialect.
    * If no non-empty sample can be built from the whole text, ``'excel'``
      is used as well.

    ``csv.Error`` raised by the sniffer is propagated unchanged.
    """
    # csv package does not support unicode
    input = str(input)
    lines = input.splitlines()

    # Special case: detect single-column files.
    # This check assumes that our only valid delimiters are commas and tabs.
    firstLine = input.split('\n')[0]
    if not ('\t' in firstLine or ',' in firstLine) or len(lines) == 1:
        dialect = 'excel'
    else:
        # Take a data sample to determine dialect, but
        # don't include incomplete last line
        sample = ''
        sampleSize = 0
        # Stop once the window covers the whole text: growing it further
        # cannot change the sample, so an empty one would loop forever
        # (e.g. for "\r,").
        while not sample and sampleSize < len(input):
            sampleSize += 5000
            sample = '\n'.join(input[:sampleSize].splitlines()[:-1])
        if sample:
            dialect = csv.Sniffer().sniff(sample)
            dialect.skipinitialspace = True
        else:
            dialect = 'excel'
    return csv.DictReader(lines, dialect=dialect)
def import_phenolist(filepath, has_header):
    """Load a phenotype list from *filepath*, trying several formats in turn.

    The formats are attempted in this order; the first that succeeds wins:

    1. ``_import_phenolist_xlsx(filepath, has_header)``
    2. ``json.load`` on the (possibly gzipped) file
    3. ``_import_phenolist_csv(f, has_header)`` after rewinding the file

    Per the original author's note the result is a list-of-dicts keyed by
    the original column names, or by integers if there are none.

    Raises PheWebError if the path does not exist, if a path ending in
    '.json' fails to parse as JSON, or if no format could be read.
    """
    if not os.path.exists(filepath):
        raise PheWebError("ERROR: unable to import {!r} because it doesn't exist".format(filepath))

    # Attempt 1: spreadsheet reader (returns None when it does not apply).
    xlsx_phenos = _import_phenolist_xlsx(filepath, has_header)
    if xlsx_phenos is not None:
        return xlsx_phenos

    with read_maybe_gzip(filepath) as f:
        # Attempt 2: JSON.  A parse failure is only fatal when the file
        # name claims to be JSON.
        try:
            return json.load(f)
        except ValueError:
            if filepath.endswith('.json'):
                raise PheWebError("The filepath {!r} ends with '.json' but reading it as json failed.".format(filepath))

        # Attempt 3: CSV, re-reading the file from the start.
        f.seek(0)
        csv_phenos = _import_phenolist_csv(f, has_header)
        if csv_phenos is not None:
            return csv_phenos

    raise PheWebError("I couldn't figure out how to open the file {!r}, sorry.".format(filepath))
def test_write_feed(product_in_stock, monkeypatch):
    # write_feed() must emit tab-separated CSV with a header row that
    # contains every required field, followed by exactly one data row.
    feed = StringIO()
    write_feed(feed)
    feed.seek(0)

    expected = csv.excel_tab
    sniffed = csv.Sniffer().sniff(feed.getvalue())
    assert sniffed.delimiter == expected.delimiter
    assert sniffed.quotechar == expected.quotechar
    assert sniffed.escapechar == expected.escapechar
    assert csv.Sniffer().has_header(feed.getvalue())

    rows = list(csv.reader(feed, dialect=csv.excel_tab))
    assert len(rows) == 2

    header_row = rows[0]
    google_required_fields = ['id', 'title', 'link', 'image_link',
                              'availability', 'price', 'condition']
    for required in google_required_fields:
        assert required in header_row
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.

    `fname` = path of the CSV file; only its first five lines are read here,
    to sniff the CSV dialect and to guess whether a header row is present.

    `labels` = are class labels present in the input file? => skip the first column
    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None
    self.labels = labels

    # load the first few lines, to guess the CSV dialect; the `with` block
    # closes the handle right away instead of leaving it open
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    sniffer = csv.Sniffer()
    self.headers = sniffer.has_header(head)
    self.dialect = sniffer.sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
def csvfile_to_wb(csv_filename):
    '''Read a CSV file into a new workbook created with pyxl.Workbook().

    The CSV dialect is sniffed from the complete file contents.  Each
    non-empty cell is copied to the same (1-based) row and column of the
    workbook's active worksheet; empty cells are left unset.

    Returns a ``(workbook, dialect)`` tuple.
    '''
    logger.log(
        DEBUG_DETAILED,
        'Converting CSV file {} into an XLSX workbook.'.format(csv_filename))

    with open(csv_filename) as csv_file:
        dialect = csv.Sniffer().sniff(csv_file.read())
        if USING_PYTHON2:
            # Convert every unicode attribute of the sniffed dialect to bytes.
            for attr_name in dir(dialect):
                attr_value = getattr(dialect, attr_name)
                if type(attr_value) == unicode:
                    setattr(dialect, attr_name, bytes(attr_value))
        csv_file.seek(0)  # rewind after sniffing

        wb = pyxl.Workbook()
        ws = wb.active
        for row_index, row in enumerate(csv.reader(csv_file, dialect), 1):
            for column_index, cell in enumerate(row, 1):
                if cell in ('', None):
                    continue
                ws.cell(row=row_index, column=column_index).value = cell
    return (wb, dialect)
def addfromcsv(self):
    """Copy every row of the bundled spaces CSV into Redis as a hash.

    The Redis URL comes from the REDIS_URL environment variable and falls
    back to "localhost" when that is unset or empty.  Each row is stored
    under its 'name' value joined with the current timestamp (spaces
    removed), with 'archived' set to False and 'verified' set to True.

    Returns ``{'success': 'true'}``.
    """
    redis_url = os.environ.get("REDIS_URL") or "localhost"
    r_server = redis.from_url(redis_url)

    with open('mapofinnovation/public/spaces_ready_for_merge.csv', 'rb') as csv_file:
        # Only a comma is accepted as the delimiter when sniffing.
        dialect = csv.Sniffer().sniff(csv_file.read(), delimiters=',')
        csv_file.seek(0)
        for row in csv.DictReader(csv_file, dialect=dialect):
            key = row['name'] + str(datetime.now())
            row.update({'archived': False})
            row.update({'verified': True})
            r_server.hmset(key.replace(' ', ''), row)
    return {'success': 'true'}
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result
def Open(self):
    """Open self.v_filename for reading according to self.v_extension.

    'csv'  - the file is opened with self.v_encoding, its first 1024
             characters are sniffed for a header and a dialect, and
             self.v_object becomes a csv.DictReader that uses
             self.v_header as field names.  The handle is kept in
             self.v_file.
    'xlsx' - self.v_object becomes a read-only openpyxl workbook.

    self.v_open is set to True on success.  Raises
    Spartacus.Utils.Exception when the path is not a file, the CSV has no
    detectable header, the extension is unsupported, or any other error
    occurs (wrapped with its message).
    """
    try:
        if not os.path.isfile(self.v_filename):
            raise Spartacus.Utils.Exception('File {0} does not exist or is not a file.'.format(self.v_filename))

        if self.v_extension == 'xlsx':
            self.v_object = openpyxl.load_workbook(self.v_filename, read_only=True)
            self.v_open = True
        elif self.v_extension == 'csv':
            self.v_file = open(self.v_filename, encoding=self.v_encoding)
            v_head = self.v_file.read(1024)
            self.v_file.seek(0)  # rewind so the reader sees the whole file
            v_detector = csv.Sniffer()
            if not v_detector.has_header(v_head):
                raise Spartacus.Utils.Exception('CSV file {0} does not have a header.'.format(self.v_filename))
            self.v_object = csv.DictReader(
                self.v_file,
                fieldnames=self.v_header,
                dialect=v_detector.sniff(v_head)
            )
            self.v_open = True
        else:
            raise Spartacus.Utils.Exception('File extension "{0}" not supported.'.format(self.v_extension))
    except Spartacus.Utils.Exception as exc:
        # Already the project's exception type: pass it on as is.
        raise exc
    except Exception as exc:
        raise Spartacus.Utils.Exception(str(exc))
def restoreSheet(sheetName, filepath, csvfile, overwrite=None):
    """Restore sheet from backup CSV file.

    sheetName - name of the sheet to restore
    filepath  - path of the backup file; used only in the error message
    csvfile   - open, iterable CSV source, read as comma-delimited
    overwrite - passed through to sdproxy.importSheet

    The first CSV row and the remaining rows are passed to
    sdproxy.importSheet as separate arguments.

    Returns '' on success, otherwise a string starting with
    'Error in restoreSheet: '.  Exceptions are never propagated; when
    Options['debug'] is set the traceback is printed as well.
    """
    try:
        # NOTE: the dialect is not sniffed for now; comma is assumed.
        rows = list(csv.reader(csvfile, delimiter=','))
        if not rows:
            raise Exception('No rows in CSV file %s for sheet %s' % (filepath, sheetName))
        sdproxy.importSheet(sheetName, rows[0], rows[1:], overwrite=overwrite)
        return ''
    except Exception as excp:  # "as" form is valid on Python 2.6+ and 3
        if Options['debug']:
            import traceback
            traceback.print_exc()
        return 'Error in restoreSheet: '+str(excp)
def test_sniff(self):
    # Each entry: (sample text, expected delimiter, expected quotechar,
    # expected skipinitialspace).
    expectations = (
        (self.sample1, ",", '"', True),
        (self.sample2, ":", "'", False),
    )
    sniffer = csv.Sniffer()
    for sample, delimiter, quotechar, skipinitialspace in expectations:
        dialect = sniffer.sniff(sample)
        self.assertEqual(dialect.delimiter, delimiter)
        self.assertEqual(dialect.quotechar, quotechar)
        self.assertEqual(dialect.skipinitialspace, skipinitialspace)
def test_delimiters(self):
    sniffer = csv.Sniffer()

    # All three lines of sample3 are equal, so without a hint any of its
    # characters may be guessed as the delimiter.
    unhinted = sniffer.sniff(self.sample3)
    self.assertIn(unhinted.delimiter, self.sample3)

    # With an explicit candidate list the guess must come from that list.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        hinted = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(hinted.delimiter, expected)

    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)

    quoted = sniffer.sniff(self.sample7)
    self.assertEqual(quoted.delimiter, "|")
    self.assertEqual(quoted.quotechar, "'")
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.

    `fname` = path of the CSV file; only its first five lines are read here,
    to sniff the CSV dialect and to guess whether a header row is present.

    `labels` = are class labels present in the input file? => skip the first column
    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None
    self.labels = labels

    # load the first few lines, to guess the CSV dialect; the `with` block
    # closes the handle right away instead of leaving it open
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    sniffer = csv.Sniffer()
    self.headers = sniffer.has_header(head)
    self.dialect = sniffer.sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.

    `fname` = path of the CSV file; only its first five lines are read here,
    to sniff the CSV dialect and to guess whether a header row is present.

    `labels` = are class labels present in the input file? => skip the first column
    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None
    self.labels = labels

    # load the first few lines, to guess the CSV dialect; the `with` block
    # closes the handle right away instead of leaving it open
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    sniffer = csv.Sniffer()
    self.headers = sniffer.has_header(head)
    self.dialect = sniffer.sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
def __init__(self, fname, labels):
    """
    Initialize the corpus from a file.

    `fname` = path of the CSV file; only its first five lines are read here,
    to sniff the CSV dialect and to guess whether a header row is present.

    `labels` = are class labels present in the input file? => skip the first column
    """
    logger.info("loading corpus from %s" % fname)
    self.fname = fname
    self.length = None
    self.labels = labels

    # load the first few lines, to guess the CSV dialect; the `with` block
    # closes the handle right away instead of leaving it open
    with open(self.fname) as fin:
        head = ''.join(itertools.islice(fin, 5))
    sniffer = csv.Sniffer()
    self.headers = sniffer.has_header(head)
    self.dialect = sniffer.sniff(head)
    logger.info("sniffed CSV delimiter=%r, headers=%s" % (self.dialect.delimiter, self.headers))
def csv2sos(path, keys=None, encoding=None, dialect=None):
    """Convert the CSV file at *path* into ``path + '.sos'``.

    Each CSV row becomes one output line of the form
    ``<zero-based index>\\t<row as JSON object>``; the output is written as
    UTF-8 with non-ASCII characters left unescaped.

    path     - path of the input CSV file
    keys     - accepted for interface compatibility; not used here
    encoding - input encoding; when falsy it is obtained from
               ``detectEncoding(path)`` and printed
    dialect  - csv dialect; when falsy it is sniffed from the first
               1 MiB of the file (candidate delimiters: ';', tab, ',')
               and printed

    A progress line is printed after every 100000 converted rows.
    """
    if not encoding:
        encoding = detectEncoding(path)
        print('Detected encoding: %s' % encoding)

    # Context managers guarantee both files are closed even if sniffing,
    # parsing or writing raises.
    with open(path, 'rt', encoding=encoding) as csvfile, \
            open(path + '.sos', 'wt', encoding='utf8') as sosfile:
        if not dialect:
            dialect = csv.Sniffer().sniff(csvfile.read(1024*1024), delimiters=[';','\t',','])
            print('Detected csv dialect: %s' % dialect)
            csvfile.seek(0)  # rewind after sampling

        reader = csv.DictReader(csvfile, dialect=dialect)
        for i, row in enumerate(reader):
            sosfile.write(str(i) + '\t' + json.dumps(row, ensure_ascii=False) + '\n')
            converted = i + 1
            if converted % 100000 == 0:
                print("%10d items converted" % converted)
def test_delimiters(self):
    sniffer = csv.Sniffer()

    # All three lines of sample3 are equal, so without a hint any of its
    # characters may be guessed as the delimiter.
    unhinted = sniffer.sniff(self.sample3)
    self.assertIn(unhinted.delimiter, self.sample3)

    # With an explicit candidate list the guess must come from that list.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        hinted = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(hinted.delimiter, expected)

    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)

    quoted_pipe = sniffer.sniff(self.sample7)
    self.assertEqual(quoted_pipe.delimiter, "|")
    self.assertEqual(quoted_pipe.quotechar, "'")

    self.assertEqual(sniffer.sniff(self.sample8).delimiter, '+')

    quoted_plus = sniffer.sniff(self.sample9)
    self.assertEqual(quoted_plus.delimiter, '+')
    self.assertEqual(quoted_plus.quotechar, "'")
def test_delimiters(self):
    sniffer = csv.Sniffer()

    # All three lines of sample3 are equal, so without a hint any of its
    # characters may be guessed as the delimiter.
    unhinted = sniffer.sniff(self.sample3)
    self.assertIn(unhinted.delimiter, self.sample3)

    # With an explicit candidate list the guess must come from that list.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        hinted = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(hinted.delimiter, expected)

    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)

    quoted_pipe = sniffer.sniff(self.sample7)
    self.assertEqual(quoted_pipe.delimiter, "|")
    self.assertEqual(quoted_pipe.quotechar, "'")

    self.assertEqual(sniffer.sniff(self.sample8).delimiter, '+')

    quoted_plus = sniffer.sniff(self.sample9)
    self.assertEqual(quoted_plus.delimiter, '+')
    self.assertEqual(quoted_plus.quotechar, "'")
prettytable.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result
def test_delimiters(self):
    sniffer = csv.Sniffer()

    # All three lines of sample3 are equal, so without a hint any of its
    # characters may be guessed as the delimiter.
    unhinted = sniffer.sniff(self.sample3)
    self.assertIn(unhinted.delimiter, self.sample3)

    # With an explicit candidate list the guess must come from that list.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        hinted = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(hinted.delimiter, expected)

    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)

    quoted_pipe = sniffer.sniff(self.sample7)
    self.assertEqual(quoted_pipe.delimiter, "|")
    self.assertEqual(quoted_pipe.quotechar, "'")

    self.assertEqual(sniffer.sniff(self.sample8).delimiter, '+')

    quoted_plus = sniffer.sniff(self.sample9)
    self.assertEqual(quoted_plus.delimiter, '+')
    self.assertEqual(quoted_plus.quotechar, "'")
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result
def parseDelimiter(self,f):
    """Return the delimiter sniffed from the first line of the file *f*.

    f - path of the text file to inspect

    Only the first line is read.  Raises IndexError if the file is empty
    and csv.Error if the sniffer cannot determine a delimiter.
    """
    # The context manager closes the file even if reading fails; reading a
    # single line avoids loading the whole file just to inspect its start.
    with open(f) as infile:
        first_line = infile.readline()
    if not first_line:
        # Same exception type an empty file has always produced here.
        raise IndexError('cannot sniff delimiter: file {!r} is empty'.format(f))
    return csv.Sniffer().sniff(first_line).delimiter
###############################################################################
# The following are GUI shortcut tools
###############################################################################
## Create an action for GUIs
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result
def test_delimiters(self):
    sniffer = csv.Sniffer()

    # All three lines of sample3 are equal, so without a hint any of its
    # characters may be guessed as the delimiter.
    unhinted = sniffer.sniff(self.sample3)
    self.assertIn(unhinted.delimiter, self.sample3)

    # With an explicit candidate list the guess must come from that list.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        hinted = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(hinted.delimiter, expected)

    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)

    quoted_pipe = sniffer.sniff(self.sample7)
    self.assertEqual(quoted_pipe.delimiter, "|")
    self.assertEqual(quoted_pipe.quotechar, "'")

    self.assertEqual(sniffer.sniff(self.sample8).delimiter, '+')

    quoted_plus = sniffer.sniff(self.sample9)
    self.assertEqual(quoted_plus.delimiter, '+')
    self.assertEqual(quoted_plus.quotechar, "'")
def test_delimiters(self):
    sniffer = csv.Sniffer()

    # All three lines of sample3 are equal, so without a hint any of its
    # characters may be guessed as the delimiter.
    unhinted = sniffer.sniff(self.sample3)
    self.assertIn(unhinted.delimiter, self.sample3)

    # With an explicit candidate list the guess must come from that list.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        hinted = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(hinted.delimiter, expected)

    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)

    quoted_pipe = sniffer.sniff(self.sample7)
    self.assertEqual(quoted_pipe.delimiter, "|")
    self.assertEqual(quoted_pipe.quotechar, "'")

    self.assertEqual(sniffer.sniff(self.sample8).delimiter, '+')

    quoted_plus = sniffer.sniff(self.sample9)
    self.assertEqual(quoted_plus.delimiter, '+')
    self.assertEqual(quoted_plus.quotechar, "'")
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result
def test_delimiters(self):
    sniffer = csv.Sniffer()

    # All three lines of sample3 are equal, so without a hint any of its
    # characters may be guessed as the delimiter.
    unhinted = sniffer.sniff(self.sample3)
    self.assertIn(unhinted.delimiter, self.sample3)

    # With an explicit candidate list the guess must come from that list.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        hinted = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(hinted.delimiter, expected)

    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)

    quoted = sniffer.sniff(self.sample7)
    self.assertEqual(quoted.delimiter, "|")
    self.assertEqual(quoted.quotechar, "'")
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from the CSV data readable from *fp*.

    fp          - seekable file-like object; its first 1024 characters are
                  sampled to sniff the CSV dialect, then it is rewound
    field_names - column names to use; when falsy, the first CSV row is
                  consumed and used (whitespace-stripped) as the names
    kwargs      - passed unchanged to the PrettyTable constructor

    Every remaining row is added to the table with each cell stripped.
    """
    sniffed = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    rows = csv.reader(fp, sniffed)

    result = PrettyTable(**kwargs)
    if not field_names:
        # Use the builtin next() when py3k is set, the reader's own
        # .next() method otherwise.
        header = next(rows) if py3k else rows.next()
        field_names = [cell.strip() for cell in header]
    result.field_names = field_names

    for record in rows:
        result.add_row([cell.strip() for cell in record])
    return result