def getsource(self, astcache=None):
""" return failing source code. """
# we use the passed in astcache to not reparse asttrees
# within exception info printing
from _pytest._code.source import getstatementrange_ast
source = self.frame.code.fullsource
if source is None:
return None
key = astnode = None
if astcache is not None:
key = self.frame.code.path
if key is not None:
astnode = astcache.get(key, None)
start = self.getfirstlinesource()
try:
astnode, _, end = getstatementrange_ast(self.lineno, source,
astnode=astnode)
except SyntaxError:
end = self.lineno + 1
else:
if key is not None:
astcache[key] = astnode
return source[start:end]
python类path()的实例源码
def recursionindex(self):
""" return the index of the frame/TracebackEntry where recursion
originates if appropriate, None if no recursion occurred
"""
cache = {}
for i, entry in enumerate(self):
# id for the code.raw is needed to work around
# the strange metaprogramming in the decorator lib from pypi
# which generates code objects that have hash/value equality
#XXX needs a test
key = entry.frame.code.path, id(entry.frame.code.raw), entry.lineno
#print "checking for recursion at", key
l = cache.setdefault(key, [])
if l:
f = entry.frame
loc = f.f_locals
for otherloc in l:
if f.is_true(f.eval(co_equal,
__recursioncache_locals_1=loc,
__recursioncache_locals_2=otherloc)):
return i
l.append(entry.frame.f_locals)
return None
def get_common_ancestor(args):
# args are what we get after early command line parsing (usually
# strings, but can be py.path.local objects as well)
common_ancestor = None
for arg in args:
if str(arg)[0] == "-":
continue
p = py.path.local(arg)
if common_ancestor is None:
common_ancestor = p
else:
if p.relto(common_ancestor) or p == common_ancestor:
continue
elif common_ancestor.relto(p):
common_ancestor = p
else:
shared = p.common(common_ancestor)
if shared is not None:
common_ancestor = shared
if common_ancestor is None:
common_ancestor = py.path.local()
elif not common_ancestor.isdir():
common_ancestor = common_ancestor.dirpath()
return common_ancestor
def getsource(self, astcache=None):
""" return failing source code. """
# we use the passed in astcache to not reparse asttrees
# within exception info printing
from _pytest._code.source import getstatementrange_ast
source = self.frame.code.fullsource
if source is None:
return None
key = astnode = None
if astcache is not None:
key = self.frame.code.path
if key is not None:
astnode = astcache.get(key, None)
start = self.getfirstlinesource()
try:
astnode, _, end = getstatementrange_ast(self.lineno, source,
astnode=astnode)
except SyntaxError:
end = self.lineno + 1
else:
if key is not None:
astcache[key] = astnode
return source[start:end]
def recursionindex(self):
""" return the index of the frame/TracebackEntry where recursion
originates if appropriate, None if no recursion occurred
"""
cache = {}
for i, entry in enumerate(self):
# id for the code.raw is needed to work around
# the strange metaprogramming in the decorator lib from pypi
# which generates code objects that have hash/value equality
#XXX needs a test
key = entry.frame.code.path, id(entry.frame.code.raw), entry.lineno
#print "checking for recursion at", key
l = cache.setdefault(key, [])
if l:
f = entry.frame
loc = f.f_locals
for otherloc in l:
if f.is_true(f.eval(co_equal,
__recursioncache_locals_1=loc,
__recursioncache_locals_2=otherloc)):
return i
l.append(entry.frame.f_locals)
return None
def getsource(self, astcache=None):
""" return failing source code. """
# we use the passed in astcache to not reparse asttrees
# within exception info printing
from _pytest._code.source import getstatementrange_ast
source = self.frame.code.fullsource
if source is None:
return None
key = astnode = None
if astcache is not None:
key = self.frame.code.path
if key is not None:
astnode = astcache.get(key, None)
start = self.getfirstlinesource()
try:
astnode, _, end = getstatementrange_ast(self.lineno, source,
astnode=astnode)
except SyntaxError:
end = self.lineno + 1
else:
if key is not None:
astcache[key] = astnode
return source[start:end]
def recursionindex(self):
""" return the index of the frame/TracebackEntry where recursion
originates if appropriate, None if no recursion occurred
"""
cache = {}
for i, entry in enumerate(self):
# id for the code.raw is needed to work around
# the strange metaprogramming in the decorator lib from pypi
# which generates code objects that have hash/value equality
#XXX needs a test
key = entry.frame.code.path, id(entry.frame.code.raw), entry.lineno
#print "checking for recursion at", key
l = cache.setdefault(key, [])
if l:
f = entry.frame
loc = f.f_locals
for otherloc in l:
if f.is_true(f.eval(co_equal,
__recursioncache_locals_1=loc,
__recursioncache_locals_2=otherloc)):
return i
l.append(entry.frame.f_locals)
return None
def cut(self, path=None, lineno=None, firstlineno=None, excludepath=None):
""" return a Traceback instance wrapping part of this Traceback
by provding any combination of path, lineno and firstlineno, the
first frame to start the to-be-returned traceback is determined
this allows cutting the first part of a Traceback instance e.g.
for formatting reasons (removing some uninteresting bits that deal
with handling of the exception/traceback)
"""
for x in self:
code = x.frame.code
codepath = code.path
if ((path is None or codepath == path) and
(excludepath is None or not hasattr(codepath, 'relto') or
not codepath.relto(excludepath)) and
(lineno is None or x.lineno == lineno) and
(firstlineno is None or x.frame.code.firstlineno == firstlineno)):
return Traceback(x._rawentry, self._excinfo)
return self
def recursionindex(self):
""" return the index of the frame/TracebackEntry where recursion
originates if appropriate, None if no recursion occurred
"""
cache = {}
for i, entry in enumerate(self):
# id for the code.raw is needed to work around
# the strange metaprogramming in the decorator lib from pypi
# which generates code objects that have hash/value equality
#XXX needs a test
key = entry.frame.code.path, id(entry.frame.code.raw), entry.lineno
#print "checking for recursion at", key
l = cache.setdefault(key, [])
if l:
f = entry.frame
loc = f.f_locals
for otherloc in l:
if f.is_true(f.eval(co_equal,
__recursioncache_locals_1=loc,
__recursioncache_locals_2=otherloc)):
return i
l.append(entry.frame.f_locals)
return None
def cdload(path=None):
"""Load the ISO into the virtual CD-ROM device.
:param path: File path of ISO file to load if not sample.iso.
"""
path = path or 'sample.iso'
try:
run(['cdemu', 'load', '0', path])
except subprocess.CalledProcessError as exc:
if b'AlreadyLoaded' not in exc.stderr:
raise
loaded = cdstatus()
if not loaded or os.path.realpath(loaded) != os.path.realpath(path):
cdunload()
return cdload(path)
for _ in range(50):
if os.path.exists('/dev/cdrom'):
return
time.sleep(0.1)
if not os.path.exists('/dev/cdrom'):
raise IOError('Failed to load cdemu device!')
def test_broken_py_path_local_join_workaround_on_Windows(
self, tmpdir, initproj, monkeypatch):
# construct an absolute folder path for our src_root folder without the
# Windows drive indicator
src_root = tmpdir.join('spam')
src_root = _path_parts(src_root)
src_root[0] = ''
src_root = '/'.join(src_root)
# make sure tmpdir drive is the current one so the constructed src_root
# folder path gets interpreted correctly on Windows
monkeypatch.chdir(tmpdir)
# will throw an assertion error if the bug is not worked around
initproj('spam-666', src_root=src_root)
init_file = tmpdir.join('spam', 'spam', '__init__.py')
assert init_file.read_binary() == b"__version__ = '666'"
def test_make_repo(path1, tmpdir):
repo = tmpdir.join("repo")
py.process.cmdexec('svnadmin create %s' % repo)
if sys.platform == 'win32':
repo = '/' + str(repo).replace('\\', '/')
repo = py.path.svnurl("file://%s" % repo)
wc = py.path.svnwc(tmpdir.join("wc"))
wc.checkout(repo)
assert wc.rev == 0
assert len(wc.listdir()) == 0
p = wc.join("a_file")
p.write("test file")
p.add()
rev = wc.commit("some test")
assert p.info().rev == 1
assert rev == 1
rev = wc.commit()
assert rev is None
def test_exists_svn_root(self, path1):
assert path1.check()
#def test_not_exists_rev(self, path1):
# url = path1.__class__(path1url, rev=500)
# assert url.check(exists=0)
#def test_nonexisting_listdir_rev(self, path1):
# url = path1.__class__(path1url, rev=500)
# raises(py.error.ENOENT, url.listdir)
#def test_newrev(self, path1):
# url = path1.new(rev=None)
# assert url.rev == None
# assert url.strpath == path1.strpath
# url = path1.new(rev=10)
# assert url.rev == 10
#def test_info_rev(self, path1):
# url = path1.__class__(path1url, rev=1155)
# url = url.join("samplefile")
# res = url.info()
# assert res.size > len("samplefile") and res.created_rev == 1155
# the following tests are easier if we have a path class
def test_setmtime(self):
import tempfile
import time
try:
fd, name = tempfile.mkstemp()
py.std.os.close(fd)
except AttributeError:
name = tempfile.mktemp()
open(name, 'w').close()
try:
mtime = int(time.time())-100
path = local(name)
assert path.mtime() != mtime
path.setmtime(mtime)
assert path.mtime() == mtime
path.setmtime()
assert path.mtime() != mtime
finally:
py.std.os.remove(name)
def test_log(self):
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
u.popen_output = py.io.TextIO(py.builtin._totext('''\
<?xml version="1.0"?>
<log>
<logentry revision="51381">
<author>guido</author>
<date>2008-02-11T12:12:18.476481Z</date>
<msg>Creating branch to work on auth support for py.path.svn*.
</msg>
</logentry>
</log>
''', 'ascii'))
u.check = lambda *args, **kwargs: True
ret = u.log(10, 20, verbose=True)
assert '--username="foo" --password="bar"' in u.commands[0]
assert len(ret) == 1
assert int(ret[0].rev) == 51381
assert ret[0].author == 'guido'
def __init__(self, request):
if not svnbin:
py.test.skip("svn binary required")
if not request.config.option.runslowtests:
py.test.skip('use --runslowtests to run these tests')
tmpdir = request.getfuncargvalue("tmpdir")
repodir = tmpdir.join("repo")
py.process.cmdexec('svnadmin create %s' % repodir)
if sys.platform == 'win32':
repodir = '/' + str(repodir).replace('\\', '/')
self.repo = py.path.svnurl("file://%s" % repodir)
if py.std.sys.platform == 'win32':
# remove trailing slash...
repodir = repodir[1:]
self.repopath = py.path.local(repodir)
self.temppath = tmpdir.mkdir("temppath")
self.auth = SvnAuth('johnny', 'foo', cache_auth=False,
interactive=False)
make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
self.port, self.pid = serve_bg(self.repopath.dirpath())
# XXX caching is too global
py.path.svnurl._lsnorevcache._dict.clear()
request.addfinalizer(lambda: py.process.kill(self.pid))
def test_update(self, setup):
wc1 = py.path.svnwc(setup.temppath.ensure('wc1', dir=True),
auth=setup.auth)
wc2 = py.path.svnwc(setup.temppath.ensure('wc2', dir=True),
auth=setup.auth)
wc1.checkout(
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
wc2.checkout(
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
wc1.ensure('foo', dir=True)
wc1.commit('added foo dir')
wc2.update()
assert wc2.join('foo').check()
auth = SvnAuth('unknown', 'unknown', interactive=False)
wc2.auth = auth
py.test.raises(Exception, 'wc2.update()')
def test_lock_unlock_status(self, setup):
port = setup.port
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, setup.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
foo = wc.join('foo')
foo.lock()
status = foo.status()
assert status.locked
foo.unlock()
status = foo.status()
assert not status.locked
auth = SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.lock()')
py.test.raises(Exception, 'foo.unlock()')
def test_copy(self, setup):
port = setup.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=setup.auth)
foo = u.mkdir('foo')
assert foo.check()
bar = u.join('bar')
foo.copy(bar)
assert bar.check()
assert bar.auth is setup.auth
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=auth)
foo = u.join('foo')
bar = u.join('bar')
py.test.raises(Exception, 'foo.copy(bar)')
def test_write_read(self, setup):
port = setup.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=setup.auth)
foo = u.ensure('foo')
fp = foo.open()
try:
data = fp.read()
finally:
fp.close()
assert data == ''
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
auth=auth)
foo = u.join('foo')
py.test.raises(Exception, 'foo.open()')
# XXX rinse, repeat... :|
def test_make_repo(path1, tmpdir):
repo = tmpdir.join("repo")
py.process.cmdexec('svnadmin create %s' % repo)
if sys.platform == 'win32':
repo = '/' + str(repo).replace('\\', '/')
repo = py.path.svnurl("file://%s" % repo)
wc = py.path.svnwc(tmpdir.join("wc"))
wc.checkout(repo)
assert wc.rev == 0
assert len(wc.listdir()) == 0
p = wc.join("a_file")
p.write("test file")
p.add()
rev = wc.commit("some test")
assert p.info().rev == 1
assert rev == 1
rev = wc.commit()
assert rev is None
def test_exists_svn_root(self, path1):
assert path1.check()
#def test_not_exists_rev(self, path1):
# url = path1.__class__(path1url, rev=500)
# assert url.check(exists=0)
#def test_nonexisting_listdir_rev(self, path1):
# url = path1.__class__(path1url, rev=500)
# raises(py.error.ENOENT, url.listdir)
#def test_newrev(self, path1):
# url = path1.new(rev=None)
# assert url.rev == None
# assert url.strpath == path1.strpath
# url = path1.new(rev=10)
# assert url.rev == 10
#def test_info_rev(self, path1):
# url = path1.__class__(path1url, rev=1155)
# url = url.join("samplefile")
# res = url.info()
# assert res.size > len("samplefile") and res.created_rev == 1155
# the following tests are easier if we have a path class
def test_setmtime(self):
import tempfile
import time
try:
fd, name = tempfile.mkstemp()
os.close(fd)
except AttributeError:
name = tempfile.mktemp()
open(name, 'w').close()
try:
mtime = int(time.time())-100
path = local(name)
assert path.mtime() != mtime
path.setmtime(mtime)
assert path.mtime() == mtime
path.setmtime()
assert path.mtime() != mtime
finally:
os.remove(name)
def make_repo_auth(repo, userdata):
""" write config to repo
user information in userdata is used for auth
userdata has user names as keys, and a tuple (password, readwrite) as
values, where 'readwrite' is either 'r' or 'rw'
"""
confdir = py.path.local(repo).join('conf')
confdir.join('svnserve.conf').write('''\
[general]
anon-access = none
password-db = passwd
authz-db = authz
realm = TestRepo
''')
authzdata = '[/]\n'
passwddata = '[users]\n'
for user in userdata:
authzdata += '%s = %s\n' % (user, userdata[user][1])
passwddata += '%s = %s\n' % (user, userdata[user][0])
confdir.join('authz').write(authzdata)
confdir.join('passwd').write(passwddata)
def test_log(self):
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
u.popen_output = py.io.TextIO(py.builtin._totext('''\
<?xml version="1.0"?>
<log>
<logentry revision="51381">
<author>guido</author>
<date>2008-02-11T12:12:18.476481Z</date>
<msg>Creating branch to work on auth support for py.path.svn*.
</msg>
</logentry>
</log>
''', 'ascii'))
u.check = lambda *args, **kwargs: True
ret = u.log(10, 20, verbose=True)
assert '--username="foo" --password="bar"' in u.commands[0]
assert len(ret) == 1
assert int(ret[0].rev) == 51381
assert ret[0].author == 'guido'
def __init__(self, request):
if not svnbin:
py.test.skip("svn binary required")
if not request.config.option.runslowtests:
py.test.skip('use --runslowtests to run these tests')
tmpdir = request.getfuncargvalue("tmpdir")
repodir = tmpdir.join("repo")
py.process.cmdexec('svnadmin create %s' % repodir)
if sys.platform == 'win32':
repodir = '/' + str(repodir).replace('\\', '/')
self.repo = py.path.svnurl("file://%s" % repodir)
if sys.platform == 'win32':
# remove trailing slash...
repodir = repodir[1:]
self.repopath = py.path.local(repodir)
self.temppath = tmpdir.mkdir("temppath")
self.auth = SvnAuth('johnny', 'foo', cache_auth=False,
interactive=False)
make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
self.port, self.pid = serve_bg(self.repopath.dirpath())
# XXX caching is too global
py.path.svnurl._lsnorevcache._dict.clear()
request.addfinalizer(lambda: py.process.kill(self.pid))
def test_switch(self, setup):
import pytest
try:
import xdist
pytest.skip('#160: fails under xdist')
except ImportError:
pass
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
svnurl = 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)
wc.checkout(svnurl)
wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
wc.commit('added foo dir with foo.txt file')
wc.ensure('bar', dir=True)
wc.commit('added bar dir')
bar = wc.join('bar')
bar.switch(svnurl + '/foo')
assert bar.join('foo.txt')
def test_update(self, setup):
wc1 = py.path.svnwc(setup.temppath.ensure('wc1', dir=True),
auth=setup.auth)
wc2 = py.path.svnwc(setup.temppath.ensure('wc2', dir=True),
auth=setup.auth)
wc1.checkout(
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
wc2.checkout(
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
wc1.ensure('foo', dir=True)
wc1.commit('added foo dir')
wc2.update()
assert wc2.join('foo').check()
auth = SvnAuth('unknown', 'unknown', interactive=False)
wc2.auth = auth
py.test.raises(Exception, 'wc2.update()')
def test_lock_unlock_status(self, setup):
port = setup.port
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, setup.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
foo = wc.join('foo')
foo.lock()
status = foo.status()
assert status.locked
foo.unlock()
status = foo.status()
assert not status.locked
auth = SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.lock()')
py.test.raises(Exception, 'foo.unlock()')
def test_diff(self, setup):
port = setup.port
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, setup.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
wc.update()
rev = int(wc.status().rev)
foo = wc.join('foo')
foo.write('bar')
diff = foo.diff()
assert '\n+bar\n' in diff
foo.commit('added some content')
diff = foo.diff()
assert not diff
diff = foo.diff(rev=rev)
assert '\n+bar\n' in diff
auth = SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.diff(rev=rev)')