def test_delimiters(self):
    """Sniffer should pick the expected delimiter (and quotechar) per sample."""
    sniffer = csv.Sniffer()
    # All three lines in sample3 are equal, so any character in the sample
    # could have been 'guessed' as the delimiter, depending on dictionary
    # order -- only membership can be asserted.
    self.assertIn(sniffer.sniff(self.sample3).delimiter, self.sample3)
    # Restricting the candidate delimiters forces a deterministic choice.
    for candidates, expected in (("?,", "?"), ("/,", "/")):
        dialect = sniffer.sniff(self.sample3, delimiters=candidates)
        self.assertEqual(dialect.delimiter, expected)
    for sample, expected in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
        self.assertEqual(sniffer.sniff(sample).delimiter, expected)
    dialect = sniffer.sniff(self.sample7)
    self.assertEqual(dialect.delimiter, "|")
    self.assertEqual(dialect.quotechar, "'")
    dialect = sniffer.sniff(self.sample8)
    self.assertEqual(dialect.delimiter, '+')
    dialect = sniffer.sniff(self.sample9)
    self.assertEqual(dialect.delimiter, '+')
    self.assertEqual(dialect.quotechar, "'")
# Example usages of Python's csv.Sniffer() class
def load_input_data(points):
    """Creates DictReader from *.csv file.

    :param points (file object):
        *.csv file with
        'lon' (required),
        'lat' (required),
        'name' (optional) columns.
    Returns:
        data (csv.DictReader)
    """
    # Sniff the dialect from the whole file, then rewind so the reader
    # starts again at the first line.
    detected = csv.Sniffer().sniff(points.read())
    points.seek(0)
    return csv.DictReader(points, dialect=detected)
def loadDataset(fileName):
    """Read every row of a CSV file into a list.

    :param fileName: path of the CSV file to load.
    :returns: list of rows, each row a list of column strings.
    """
    # BUG FIX: the 'rU' open mode was removed in Python 3.11; the csv docs
    # say to open files with newline='' instead.
    with open(fileName, 'r', newline='') as trainingInput:
        # detect the "dialect" of this type of csv file
        try:
            dialect = csv.Sniffer().sniff(trainingInput.read(1024))
        except csv.Error:
            # if we fail to detect the dialect, default to Microsoft Excel
            # (narrowed from a bare `except:` so real errors still surface)
            dialect = 'excel'
        trainingInput.seek(0)
        # csv only gives us an iterable, not the data itself; materialize it.
        # (The unused allTweets/allTweetSentiments locals were removed.)
        return list(csv.reader(trainingInput, dialect))
def from_csv(fp, field_names = None, **kwargs):
    """Build a PrettyTable from an open CSV file object.

    :param fp: readable file object containing CSV data.
    :param field_names: optional explicit column names; when omitted the
        first CSV row is used as the header.
    :param kwargs: forwarded to the PrettyTable constructor.
    """
    # Sniff the dialect from a small prefix, then rewind for the reader.
    dialect = csv.Sniffer().sniff(fp.read(1024))
    fp.seek(0)
    reader = csv.reader(fp, dialect)
    table = PrettyTable(**kwargs)
    if field_names:
        table.field_names = field_names
    else:
        # Python 2 reader objects expose .next() instead of supporting next().
        header = next(reader) if py3k else reader.next()
        table.field_names = [cell.strip() for cell in header]
    for record in reader:
        table.add_row([cell.strip() for cell in record])
    return table
def process_coordinates(self, processor_handler, output):
    """Transform the coordinate columns of self.datafile, writing to `output`.

    :param processor_handler: callable ``(x, y) -> (x, y)`` applied to each
        coordinate pair read from columns ``self.in_cols``.
    :param output: writable file object that receives the transformed CSV.
    """
    in_cols = self.in_cols
    out_cols = self.out_cols
    with open(self.datafile.name) as file_in:
        # detect delimiter from a prefix of the file, then rewind
        dialect = csv.Sniffer().sniff(file_in.read(2048), delimiters=',\t')
        file_in.seek(0)
        reader = csv.reader(file_in, dialect)
        writer = csv.writer(output, dialect)
        for row in reader:
            new_row = row[:]  # copy
            try:
                x = float(row[in_cols[0]])
                y = float(row[in_cols[1]])
            except (ValueError, IndexError):
                # BUG FIX: narrowed from a bare `except:`. Rows without
                # numeric coordinates (e.g. a header) pass through unchanged.
                writer.writerow(new_row)
                continue
            # translate and store the result back as integers
            x, y = processor_handler(x, y)
            new_row[out_cols[0]] = int(x)
            new_row[out_cols[1]] = int(y)
            writer.writerow(new_row)
def generate_rows(f):
    """Yield each parsed CSV row from the open file object `f`.

    The dialect is sniffed from the first line only; the file is then
    rewound so no data is skipped.
    """
    detected = csv.Sniffer().sniff(f.readline())
    f.seek(0)
    yield from csv.reader(f, detected)
def load_data(year):
    '''
    Load one year's data file into the memory cache.

    Returns True when the year is (now) cached, False when no data file
    exists for it.
    '''
    year = str(year)
    if year in CACHE:
        return True
    data_file = os.path.join(
        os.path.dirname(__file__), 'data', '{}.csv'.format(year)
    )
    if not os.path.isfile(data_file):
        return False
    CACHE[year] = {}
    with io.open(data_file, encoding='utf-8') as rf:
        # Detect whether the CSV starts with a header line.
        has_header = csv.Sniffer().has_header(rf.read(1024))
        rf.seek(0)
        reader = csv.DictReader(rf, DATA_FIELDS)
        if has_header:
            next(reader)  # skip the header row
        for data_line in reader:
            day = clean_up_dict(data_line)
            # Convert into `int`/`bool` now so we don't need to parse later.
            dt = datetime.strptime(day['date'], '%Y-%m-%d')
            day['year'] = dt.year
            day['month'] = dt.month
            day['day'] = dt.day
            day['isholiday'] = bool(int(day['isholiday']))
            day['isworkday'] = bool(int(day['isworkday']))
            CACHE[year][day.pop('date')] = day
    return True
def _import_phenolist_csv(f, has_header):
# Note: If a csv (1) contains commas in quoted cells and (2) doesn't have any line that starts with a quoted cell,
# then sometimes this makes very bad choices.
# In particular, if all lines have the same number of some other character (even a letter), that character might become the delimeter.
try:
dialect = csv.Sniffer().sniff(f.read(4096))
except Exception as exc:
raise PheWebError("Sniffing csv format failed. Check that your csv file is well-formed. If it is, try delimiting with tabs or semicolons.") from exc
if dialect.delimiter in string.ascii_letters or dialect.delimiter in string.digits:
raise PheWebError("Our csv sniffer decided that {!r} looks like the most likely delimiter in your csv file, but that's crazy.")
f.seek(0)
try:
rows = list(csv.reader(f, dialect))
except ValueError:
return None
num_cols = len(rows[0])
if has_header:
fieldnames, rows = rows[0], rows[1:]
if any(fieldname is None or fieldname == '' for fieldname in fieldnames):
if has_header == 'augment':
fieldnames = [i if fieldname is None else fieldname for i, fieldname in enumerate(fieldnames)]
else:
raise PheWebError('bad csv header')
assert len(set(fieldnames)) == len(fieldnames)
else:
fieldnames = list(range(num_cols))
return [{fieldnames[i]: row[i] for i in range(num_cols)} for row in rows]
def read_key_value_file(csvfile):
    """Reads CSV file, parses content into dict

    Args:
        csvfile (FILE): Readable file
    Returns:
        DICT: Dictionary mapping the first column to the second
    """
    first_line = csvfile.readline()
    # Rewind only when the first line is data rather than a header row.
    if 'key' not in first_line or 'value' not in first_line:
        csvfile.seek(0)
    dialect = csv.Sniffer().sniff(first_line, delimiters=',\t')
    return {record[0]: record[1] for record in csv.reader(csvfile, dialect)}
def csv_col_current(pl, segment_info, display_name='auto', name_format=' ({column_name:.15})'):
    '''Display CSV column number and column name

    Requires filetype to be set to ``csv``.

    :param bool or str display_name:
        May be ``True``, ``False`` or ``"auto"``. In the first case the value
        from the first row is always displayed; in the second case it never
        is. In the last case ``csv.Sniffer().has_header()`` is used to detect
        whether the current file starts with a header row.
    :param str name_format:
        String used to format the column name (when ``display_name`` is set
        to ``True`` or ``"auto"``). Accepts the ``column_name`` keyword
        argument.

    Highlight groups used: ``csv:column_number`` or ``csv``, ``csv:column_name`` or ``csv``.
    '''
    if vim_getbufoption(segment_info, 'filetype') != 'csv':
        return None
    line, col = segment_info['window'].cursor
    column_number, column_name = process_csv_buffer(pl, segment_info['buffer'], line, col, display_name)
    if not column_number:
        return None
    segments = [{
        'contents': column_number,
        'highlight_groups': ['csv:column_number', 'csv'],
    }]
    if column_name:
        segments.append({
            'contents': name_format.format(column_name=column_name),
            'highlight_groups': ['csv:column_name', 'csv'],
        })
    return segments
def attach_file(self, filename, text=None, tabular=False, syntax='auto', fileinfo=False):
    """Attach a file's contents to this message and return the Attachment.

    :param filename: path of the file to attach.
    :param text: file content; read (as UTF-8) from `filename` when None.
    :param tabular: render as a markdown table; 'sniff' auto-detects the
        CSV dialect, any other truthy value is used as the dialect itself.
    :param syntax: code-fence language; 'auto' detects from title/mime.
    :param fileinfo: when True, add size and mime-type fields.
    """
    attachment = Attachment()
    if tabular:
        syntax = None
    (mime, _) = mimetypes.guess_type(filename)
    attachment.title = os.path.basename(filename)
    if text is None:
        # BUG FIX: 'rUb' is an invalid open mode in Python 3 ('U' is
        # text-only). Plain binary mode is what the utf-8 decode expects.
        with open(filename, 'rb') as f:
            text = f.read().decode('utf-8')
    if tabular:
        csvfile = StringIO(text.strip())
        if tabular == 'sniff':
            dialect = csv.Sniffer().sniff(text)
        else:
            dialect = tabular
        text = md_table(csv.reader(csvfile, dialect))
    elif syntax == 'auto':
        syntax = detect_syntax(attachment.title, mime)
    if syntax is not None:
        text = md_code(text, syntax)
    attachment.text = text
    if fileinfo:
        statinfo = os.stat(filename)
        attachment.add_field('Size', sizeof_fmt(statinfo.st_size), True)
        attachment.add_field('Mime', mime, True)
    self.attachments.append(attachment)
    return attachment
def send(channel, message='', filename=False, url=None, username=None,
         icon=None, syntax='auto', tabular=False, fileinfo=False,
         just_return=False, config_section='DEFAULT',
         config_name='mattersend', config_file=None):
    """Build a Message for `channel` and send it (or return its payload).

    When `filename` is given the file is attached; otherwise `message` is
    rendered either as a markdown table (`tabular`) or as a code block
    (`syntax`). With `just_return=True` the POST payload is returned
    instead of being sent.
    """
    msg = Message(channel, url, username, icon, config_section,
                  config_name, config_file)
    if filename:
        if syntax == 'none':
            syntax = None
        msg.attach_file(filename, None, tabular, syntax, fileinfo)
    else:
        if tabular:
            syntax = None
            csvfile = StringIO(message.strip())
            # 'sniff' means auto-detect the dialect; any other truthy value
            # is itself the dialect to use.
            dialect = csv.Sniffer().sniff(message) if tabular == 'sniff' else tabular
            message = md_table(csv.reader(csvfile, dialect))
        elif syntax in ('auto', 'none'):
            syntax = None
        if syntax is not None:
            message = md_code(message, syntax)
        msg.text = message
    if just_return:
        payload = msg.get_payload()
        return "POST {}\n{}".format(msg.url, payload)
    msg.send()
def loadMatrix(filepath):
    """
    use pandas to load the csv file into the dataframe,
    using a header if appropriate
    """
    # BUG FIX: 'rbU' is an invalid open mode in Python 3, and Sniffer
    # requires str input, not bytes -- open the file in text mode.
    with open(filepath, 'r', newline='') as csvfile:
        snippet = csvfile.read(2048)
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(snippet)
        has_header = sniffer.has_header(snippet)
    if has_header:
        df = pd.read_csv(filepath, dialect=dialect)
    else:
        df = pd.read_csv(filepath, dialect=dialect, header=None)
    return df
def sniff_file(csvfile):
    """Sniff the CSV dialect from the first 50 comment-stripped lines.

    :param csvfile: iterable of lines (passed through strip_comments).
    :returns: the detected csv.Dialect, or None when sniffing fails.
    """
    # Sample at most 50 non-comment lines for the sniffer.
    chunk = "\n".join(line for _, line in zip(range(50), strip_comments(csvfile)))
    try:
        # Candidate delimiters: tab, pipe, comma, space.
        return csv.Sniffer().sniff(chunk, "\t|, ")
    except csv.Error:
        # BUG FIX: narrowed from a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit.
        return None
def test_has_header(self):
    """Prepending a header line flips has_header() from False to True."""
    detect = csv.Sniffer().has_header
    self.assertEqual(detect(self.sample1), False)
    self.assertEqual(detect(self.header + self.sample1), True)
def test_doublequote(self):
    """doublequote is detected off for the header sample, on for sample2."""
    sniffer = csv.Sniffer()
    self.assertFalse(sniffer.sniff(self.header).doublequote)
    self.assertTrue(sniffer.sniff(self.sample2).doublequote)
def get_delimiter(path):
    """Return the delimiter (';' or ',') sniffed from the CSV file at `path`.

    BUG FIX: the file was opened in binary mode ('rb'), but csv.Sniffer
    requires str input in Python 3 -- open in text mode instead.
    """
    with open(path, 'r', newline='') as csvfile:
        return csv.Sniffer().sniff(csvfile.read(), delimiters=';,').delimiter
# Gets the ratio of missing values to existing values in a dataframe. Either operates on rows or columns, depending
# on input.
def test_has_header(self):
    """Prepending header1 flips has_header() from False to True."""
    detect = csv.Sniffer().has_header
    self.assertEqual(detect(self.sample1), False)
    self.assertEqual(detect(self.header1 + self.sample1), True)