def test_showwarning(self):
    """Check that showwarning() writes exactly what formatwarning() returns.

    The comparison is made twice: first without a ``line`` argument, then
    with an explicit ``line`` passed to both functions.
    """
    source_path = os.path.splitext(warning_tests.__file__)[0] + '.py'
    lineno = 3
    source_line = linecache.getline(source_path, lineno).strip()
    text, kind = 'msg', Warning

    # Without the optional 'line' argument.
    stream = StringIO.StringIO()
    wanted = self.module.formatwarning(text, kind, source_path, lineno)
    self.module.showwarning(text, kind, source_path, lineno, stream)
    self.assertEqual(stream.getvalue(), wanted)

    # Test 'line' argument.
    source_line = source_line + "for the win!"
    wanted = self.module.formatwarning(text, kind, source_path, lineno,
                                       source_line)
    stream = StringIO.StringIO()
    self.module.showwarning(text, kind, source_path, lineno, stream,
                            source_line)
    self.assertEqual(wanted, stream.getvalue())
python类getline()的实例源码
def GetText(self):
    """Return a one-line description of the stack entry in ``self.info``.

    ``self.info`` is unpacked as ``(frame, lineno)``.  The result looks like
    ``"<module>.<function>(...), line <n>: <source line>"``; the function
    part is left out when the code object's name is ``"?"``, empty or None.
    The module name falls back to ``"?"`` when it cannot be read from the
    frame's globals.
    """
    frame, lineno = self.info
    try:
        modname = frame.f_globals["__name__"]
    except Exception:
        # Best-effort lookup.  Catch Exception rather than using a bare
        # ``except`` so KeyboardInterrupt and SystemExit still propagate.
        modname = "?"
    code = frame.f_code
    funcname = code.co_name
    sourceline = linecache.getline(code.co_filename, lineno).strip()
    if funcname in ("?", "", None):
        return "%s, line %d: %s" % (modname, lineno, sourceline)
    return "%s.%s(...), line %d: %s" % (modname, funcname,
                                        lineno, sourceline)
def __iter__(self):
    """Yield successive batches of ``self.key`` values.

    Each batch covers up to ``self.chunk_size`` entries of ``self.indices``.
    Every index selects one line of the file ``self.source``; that line is
    parsed as JSON and the value stored under ``self.key`` is collected.
    """
    position = 0
    while position < self.num_entries:
        stop = min(self.num_entries, position + self.chunk_size)
        values = []
        for index in self.indices[position:stop]:
            # indices are zero-based, linecache line numbering starts at 1
            raw = linecache.getline(self.source, index + 1)
            record = json.loads(raw.strip())
            values.append(record[self.key])
        yield values
        position = stop
###########################################################################
def __iter__(self):
    """Yield batches of the values stored under ``self.key``.

    Lines of ``self.source`` are selected through ``self.indices``, parsed
    as JSON, and grouped into lists of at most ``self.chunk_size`` values
    (the original docstring describes each value as a tensor).
    """
    def value_at(index):
        # linecache line numbering starts at 1, the indices start at 0
        text = linecache.getline(self.source, index + 1)
        return json.loads(text.strip())[self.key]

    begin = 0
    while begin < self.num_entries:
        finish = min(self.num_entries, begin + self.chunk_size)
        yield [value_at(index) for index in self.indices[begin:finish]]
        begin = finish
###########################################################################
def checkline(self, filename, lineno):
    """Check whether specified line seems to be executable.

    Return `lineno` if it is, 0 if not (e.g. a docstring, comment, blank
    line or EOF). Warning: testing is not comprehensive.

    A short explanation is written to ``self.stdout`` whenever 0 is
    returned.
    """
    # This method should be callable before starting debugging, so default
    # to "no globals" if there is no current frame.  The attribute can exist
    # but hold None, so test its value rather than its mere presence.
    frame = getattr(self, 'curframe', None)
    globs = frame.f_globals if frame is not None else None
    line = linecache.getline(filename, lineno, globs)
    if not line:
        # write() instead of the Python-2-only ``print >>`` statement; the
        # emitted text is the same.
        self.stdout.write('End of file\n')
        return 0
    line = line.strip()
    # Don't allow setting breakpoint at a blank line
    if not line or line.startswith(('#', '"""', "'''")):
        self.stdout.write('*** Blank or comment\n')
        return 0
    return lineno
def get_code(self, max_lines=3, tabbed=False):
    '''Gets lines of code from the file that generated this issue.

    :param max_lines: Max lines of context to return
    :param tabbed: Use tabbing in the output
    :return: strings of code
    '''
    context = max(max_lines, 1)
    first = max(1, self.lineno - context // 2)
    stop = first + len(self.linerange) + context - 1

    template = "%i %s"
    if tabbed:
        template = "%i\t%s"

    pieces = []
    current = first
    while current < stop:
        text = linecache.getline(self.fname, current)
        if isinstance(text, bytes):
            text = text.decode('utf-8', 'ignore')
        # an empty result means the line does not exist; stop collecting
        if len(text) == 0:
            break
        pieces.append(template % (current, text))
        current += 1
    return ''.join(pieces)
def __iter__(self):
    """Yield ``Batch`` objects of user/item/rate columns read from ``self.fname``.

    Each batch covers line numbers ``index_start`` up to (not including)
    ``index_start + batch_size``.  Lines are split on ``'::'`` and must have
    exactly four fields (user, item, rate and one ignored field); other
    lines are skipped.  Iteration ends as soon as the line numbered
    ``index_start + batch_size`` cannot be read, so a batch is only yielded
    when the line following it exists.
    """
    while True:
        first = self.index_start
        stop = first + self.batch_size
        if not linecache.getline(self.fname, stop):
            return
        users, items, ratings = [], [], []
        for lineno in range(first, stop):
            fields = linecache.getline(self.fname, lineno).strip().split('::')
            if len(fields) == 4:
                user, item, rating, _ = fields
                users.append(user)
                items.append(item)
                ratings.append(rating)
        data_all = [mx.nd.array(users), mx.nd.array(items)]
        label_all = [mx.nd.array(ratings)]
        # advance only after the arrays were built, as before
        self.index_start += self.batch_size
        yield Batch(['user', 'item'], data_all, ['rate'], label_all)
def __iter__(self):
    """Yield ``Batch`` objects of label/data rows read from ``self.fname``.

    Each batch covers line numbers ``index_start`` up to (not including)
    ``index_start + batch_size``.  Every line is split at its first tab into
    a label and a tab-separated data part.  Iteration ends as soon as the
    line numbered ``index_start + batch_size`` cannot be read, so a batch is
    only yielded when the line following it exists.
    """
    while True:
        first = self.index_start
        stop = first + self.batch_size
        if not linecache.getline(self.fname, stop):
            return
        labels, rows = [], []
        for lineno in range(first, stop):
            text = linecache.getline(self.fname, lineno)
            label, features = text.strip().split('\t', 1)
            labels.append(label)
            rows.append(np.array(features.split('\t')))
        data_all = [mx.nd.array(rows)]
        label_all = [mx.nd.array(labels)]
        # advance only after the arrays were built, as before
        self.index_start += self.batch_size
        yield Batch(['data'], data_all, ['softmax_label'], label_all)
def checkline(self, filename, lineno):
    """Check whether specified line seems to be executable.

    Return `lineno` if it is, 0 if not (e.g. a docstring, comment, blank
    line or EOF). Warning: testing is not comprehensive.

    ``self.message`` is called for end of file and ``self.error`` for a
    blank or comment line.
    """
    # This method should be callable before starting debugging, so default
    # to "no globals" if there is no current frame.  The attribute can exist
    # but hold None, so test its value rather than its mere presence.
    frame = getattr(self, 'curframe', None)
    globs = frame.f_globals if frame is not None else None
    line = linecache.getline(filename, lineno, globs)
    if not line:
        self.message('End of file')
        return 0
    line = line.strip()
    # Don't allow setting breakpoint at a blank line
    if not line or line.startswith(('#', '"""', "'''")):
        self.error('Blank or comment')
        return 0
    return lineno
def print_tb(tb, limit=None, file=None):
    """Print up to 'limit' stack trace entries from the traceback 'tb'.

    If 'limit' is omitted or None, ``sys.tracebacklimit`` is used when it is
    set; otherwise all entries are printed.  If 'file' is omitted or None,
    the output goes to sys.stderr; otherwise 'file' should be an open file
    or file-like object with a write() method.
    """
    out = sys.stderr if file is None else file
    if limit is None and hasattr(sys, 'tracebacklimit'):
        limit = sys.tracebacklimit
    printed = 0
    entry = tb
    while entry is not None and (limit is None or printed < limit):
        frame = entry.tb_frame
        code = frame.f_code
        filename = code.co_filename
        lineno = entry.tb_lineno
        _print(out,
               ' File "%s", line %d, in %s' % (filename, lineno, code.co_name))
        # refresh the cache entry before reading the source line
        linecache.checkcache(filename)
        text = linecache.getline(filename, lineno, frame.f_globals)
        if text:
            _print(out, ' ' + text.strip())
        entry = entry.tb_next
        printed += 1
def extract_tb(tb, limit = None):
    """Return list of up to limit pre-processed entries from traceback.

    This is useful for alternate formatting of stack traces.  If 'limit'
    is omitted or None, ``sys.tracebacklimit`` is used when it is set;
    otherwise all entries are extracted.  A pre-processed stack trace entry
    is a quadruple (filename, line number, function name, text).  The text
    is the source line with leading and trailing whitespace stripped; if
    the source is not available it is None.
    """
    if limit is None and hasattr(sys, 'tracebacklimit'):
        limit = sys.tracebacklimit
    entries = []
    node = tb
    while node is not None and (limit is None or len(entries) < limit):
        frame = node.tb_frame
        code = frame.f_code
        filename = code.co_filename
        lineno = node.tb_lineno
        # refresh the cache entry before reading the source line
        linecache.checkcache(filename)
        text = linecache.getline(filename, lineno, frame.f_globals)
        text = text.strip() if text else None
        entries.append((filename, lineno, code.co_name, text))
        node = node.tb_next
    return entries
def extract_stack(f=None, limit = None):
    """Extract the raw traceback from the current stack frame.

    Starting at frame 'f' (the caller's frame when 'f' is None), walk
    outwards through ``f_back`` for at most 'limit' frames; when 'limit' is
    None, ``sys.tracebacklimit`` is used if it is set.  Each item in the
    returned list is a quadruple (filename, line number, function name,
    text), where text is the stripped source line or None, and the entries
    are in order from oldest to newest stack frame.
    """
    frame = f
    if frame is None:
        # Raise and catch an exception to get hold of this function's own
        # frame; its f_back is the caller.
        try:
            raise ZeroDivisionError
        except ZeroDivisionError:
            frame = sys.exc_info()[2].tb_frame.f_back
    if limit is None and hasattr(sys, 'tracebacklimit'):
        limit = sys.tracebacklimit
    entries = []
    while frame is not None and (limit is None or len(entries) < limit):
        code = frame.f_code
        filename = code.co_filename
        lineno = frame.f_lineno
        linecache.checkcache(filename)
        text = linecache.getline(filename, lineno, frame.f_globals)
        text = text.strip() if text else None
        entries.append((filename, lineno, code.co_name, text))
        frame = frame.f_back
    # frames were collected newest first
    entries.reverse()
    return entries
def set_break(self, filename, lineno, temporary=0, cond = None,
              funcname=None):
    """Record a breakpoint at ``filename:lineno``.

    The filename is passed through ``self.canonic`` first.  Returns an error
    string when the line cannot be read from the file; otherwise records the
    line number in ``self.breaks``, creates a ``Breakpoint`` and returns
    None.
    """
    filename = self.canonic(filename)
    import linecache  # Import as late as possible
    if not linecache.getline(filename, lineno):
        return 'Line %s:%d does not exist' % (filename, lineno)
    linenos = self.breaks.setdefault(filename, [])
    if lineno not in linenos:
        linenos.append(lineno)
    # The instance is not kept here.
    # NOTE(review): presumably Breakpoint registers itself on construction;
    # confirm against its definition.
    Breakpoint(filename, lineno, temporary, cond, funcname)
def format_stack_entry(self, frame_lineno, lprefix=': '):
    """Return a one-line description of a ``(frame, lineno)`` pair.

    The result is ``<filename>(<lineno>)<function>`` followed by the
    abbreviated repr of the ``__args__`` local (or ``()`` when it is missing
    or empty), then ``-><repr>`` if a ``__return__`` local exists, then
    ``lprefix`` and the stripped source line when the line can be read.
    A frame whose code object has an empty name is shown as ``<lambda>``.
    """
    import linecache
    # The abbreviating repr module is ``repr`` on Python 2 and ``reprlib``
    # on Python 3; accept either.
    try:
        import reprlib
    except ImportError:
        import repr as reprlib
    frame, lineno = frame_lineno
    filename = self.canonic(frame.f_code.co_filename)
    s = '%s(%r)' % (filename, lineno)
    s += frame.f_code.co_name or "<lambda>"
    if '__args__' in frame.f_locals:
        args = frame.f_locals['__args__']
    else:
        args = None
    s += reprlib.repr(args) if args else '()'
    if '__return__' in frame.f_locals:
        s += '->' + reprlib.repr(frame.f_locals['__return__'])
    line = linecache.getline(filename, lineno, frame.f_globals)
    if line:
        s += lprefix + line.strip()
    return s
# The following two methods can be called by clients to use
# a debugger to debug a statement, given as a string.
def user_line(self, frame):
    """Print a ``+++`` trace line describing *frame*'s current line.

    The output is ``+++ <file> <lineno> <function> : <source line>``, where
    the file name goes through ``self.canonic`` and an empty function name
    is shown as ``???``.
    """
    import linecache
    name = frame.f_code.co_name or '???'
    fn = self.canonic(frame.f_code.co_filename)
    line = linecache.getline(fn, frame.f_lineno, frame.f_globals)
    # A single pre-formatted argument prints identically whether ``print``
    # is the Python 2 statement or the Python 3 function.
    print('+++ %s %s %s : %s' % (fn, frame.f_lineno, name, line.strip()))
def localtrace_trace_and_count(self, frame, why, arg):
    """Local trace function that counts and prints every "line" event.

    For a "line" event the (filename, lineno) hit count in ``self.counts``
    is incremented and ``<basename>(<lineno>): <source line>`` is written to
    standard output, preceded by the elapsed time since ``self.start_time``
    when that is set.  Other events are ignored.  ``arg`` is unused.

    :return: ``self.localtrace``
    """
    import sys
    if why == "line":
        # record the file name and line number of every trace
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno
        key = filename, lineno
        self.counts[key] = self.counts.get(key, 0) + 1
        # sys.stdout.write replaces the Python-2-only trailing-comma print
        # statements; the emitted text is the same.
        if self.start_time:
            sys.stdout.write('%.2f ' % (time.time() - self.start_time))
        bname = os.path.basename(filename)
        # the source line carries its own newline, so none is added here
        sys.stdout.write("%s(%d): %s" % (bname, lineno,
                                         linecache.getline(filename, lineno)))
    return self.localtrace
def localtrace_trace(self, frame, why, arg):
    """Local trace function that prints every "line" event.

    For a "line" event ``<basename>(<lineno>): <source line>`` is written to
    standard output, preceded by the elapsed time since ``self.start_time``
    when that is set.  Other events are ignored.  ``arg`` is unused.

    :return: ``self.localtrace``
    """
    import sys
    if why == "line":
        # record the file name and line number of every trace
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno
        # sys.stdout.write replaces the Python-2-only trailing-comma print
        # statements; the emitted text is the same.
        if self.start_time:
            sys.stdout.write('%.2f ' % (time.time() - self.start_time))
        bname = os.path.basename(filename)
        # the source line carries its own newline, so none is added here
        sys.stdout.write("%s(%d): %s" % (bname, lineno,
                                         linecache.getline(filename, lineno)))
    return self.localtrace
def __generateMapper(self):
    """Fill the IP map from the partitions and the list file.

    For every partition returned by ``__readPartitions`` the entry is read
    as ``(first_line, last_line, value)``.  Each line of ``self.list_file``
    in that inclusive range is stripped of whitespace and commas and mapped
    to ``value`` in ``__IpMapDict``.

    Raises IOError (after printing a message) if ``self.list_file`` does not
    exist when a partition is processed.
    """
    partitions = self.__readPartitions()
    for name in partitions.keys():
        bounds = partitions[name]
        if not os.path.exists(self.list_file):
            print("*** The file '" + self.list_file + "' could not be read\n")
            raise IOError("The file '" + self.list_file + "' could not be read\n")
        first = int(bounds[0])
        last = int(bounds[1])
        for lineno in range(first, last + 1):
            entry = linecache.getline(self.list_file, lineno)
            ip = entry.strip().strip(',')
            self.__IpMapDict[ip] = bounds[2]
def formatwarning(message, category, filename, lineno, line=None):
    """Function to format a warning the standard way.

    Returns ``<filename>:<lineno>: <category name>: <message>`` followed by
    a newline and, when a source line is available, that line stripped and
    indented on a second row.  When ``line`` is None the source line is
    looked up with linecache.
    """
    # ``unicode`` only exists on Python 2; elsewhere the empty tuple makes
    # the isinstance() checks below always false.
    try:
        text_type = unicode
    except NameError:
        text_type = ()
    try:
        message = str(message)
    except UnicodeEncodeError:
        # keep the message object as it is and let %-formatting handle it
        pass
    result = "%s: %s: %s\n" % (lineno, category.__name__, message)
    if line is None:
        line = linecache.getline(filename, lineno)
    if line:
        line = line.strip()
        if isinstance(result, text_type) and isinstance(line, str):
            line = unicode(line, 'latin1')
        result += " %s\n" % line
    if isinstance(result, text_type) and isinstance(filename, str):
        enc = sys.getfilesystemencoding()
        if enc:
            try:
                filename = unicode(filename, enc)
            except UnicodeDecodeError:
                pass
    return "%s:%s" % (filename, result)
def _disable_linecache():
    """Replace ``linecache.getline`` with a stub that always returns ''.

    The real function is kept as ``linecache.orig_getline``.  It is saved
    only on the first call, so calling this function again does not
    overwrite the saved original with the stub.
    """
    import linecache

    def fake_getline(*args, **kwargs):
        return ''

    if not hasattr(linecache, 'orig_getline'):
        linecache.orig_getline = linecache.getline
    linecache.getline = fake_getline