def copy_reg_file(rpin, rpout, compress = 0):
"""Copy regular file rpin to rpout, possibly avoiding connection"""
try:
if (rpout.conn is rpin.conn and
rpout.conn is not Globals.local_connection):
v = rpout.conn.rpath.copy_reg_file(rpin.path, rpout.path, compress)
rpout.setdata()
return v
except AttributeError: pass
try:
return rpout.write_from_fileobj(rpin.open("rb"), compress = compress)
except IOError, e:
if (e.errno == errno.ERANGE):
log.Log.FatalError("'IOError - Result too large' while reading %s. "
"If you are using a Mac, this is probably "
"the result of HFS+ filesystem corruption. "
"Please exclude this file from your backup "
"before proceeding." % rpin.path)
else:
raise
python类ERANGE的实例源码
def nltype(self,v):
if v < 0: raise error(errno.ERANGE,"Netlink type {0} is invalid".format(v))
self['type'] = v
def seq(self,v):
if v < 1: raise error(errno.ERANGE,"Invalid seq. number")
self['seq'] = v
def pid(self,v):
if v < 1: raise error(errno.ERANGE,"Invalid port id")
self['pid'] = v
def cmd(self,v):
if v < 0: raise error(errno.ERANGE,"Invalid cmd")
self['cmd'] = v
def nla_put(msg,v,a,d):
"""
append attribute to msg's attribute list
:param msg: GENLMsg
:param v: attribute value
:param a: attribute type
:param d: attribute datatype
"""
if d > nlh.NLA_TYPE_MAX: raise error(errno.ERANGE,"Value type is invalid")
msg['attrs'].append((a,v,d))
# nla_put_* append data of specified datatype
def read_from_rp(self, rp):
"""Set the extended attributes from an rpath"""
try:
attr_list = rp.conn.xattr.listxattr(encode(rp.path),
rp.issym())
except IOError, exc:
if exc[0] in (errno.EOPNOTSUPP, errno.EPERM, errno.ETXTBSY):
return # if not supported, consider empty
if exc[0] in (errno.EACCES, errno.ENOENT, errno.ELOOP):
log.Log("Warning: listattr(%s): %s" % (repr(rp.path), exc), 4)
return
raise
for attr in attr_list:
if attr.startswith('system.'):
# Do not preserve system extended attributes
continue
if not rp.isdir() and attr == 'com.apple.ResourceFork':
# Resource Fork handled elsewhere, except for directories
continue
try:
self.attr_dict[attr] = \
rp.conn.xattr.getxattr(encode(rp.path),
attr, rp.issym())
except IOError, exc:
# File probably modified while reading, just continue
if exc[0] == errno.ENODATA: continue
elif exc[0] == errno.ENOENT: break
# Handle bug in pyxattr < 0.2.2
elif exc[0] == errno.ERANGE: continue
else: raise
def sys_getcwd(self, buf, size):
'''
getcwd - Get the current working directory
:param int buf: Pointer to dest array
:param size: size in bytes of the array pointed to by the buf
:return: buf (Success), or 0
'''
try:
current_dir = os.getcwd()
length = len(current_dir) + 1
if size > 0 and size < length:
logger.info("GETCWD: size is greater than 0, but is smaller than the length"
"of the path + 1. Returning ERANGE")
return -errno.ERANGE
if not self.current.memory.access_ok(slice(buf, buf+length), 'w'):
logger.info("GETCWD: buf within invalid memory. Returning EFAULT")
return -errno.EFAULT
self.current.write_string(buf, current_dir)
logger.debug("getcwd(0x%08x, %u) -> <%s> (Size %d)", buf, size, current_dir, length)
return length
except OSError as e:
return -e.errno
def test_getcwd_long_pathnames(self):
dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
curdir = os.getcwd()
base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
try:
os.mkdir(base_path)
os.chdir(base_path)
except:
self.skipTest("cannot create directory for testing")
try:
def _create_and_do_getcwd(dirname, current_path_length = 0):
try:
os.mkdir(dirname)
except:
self.skipTest("mkdir cannot create directory sufficiently "
"deep for getcwd test")
os.chdir(dirname)
try:
os.getcwd()
if current_path_length < 4099:
_create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
except OSError as e:
expected_errno = errno.ENAMETOOLONG
# The following platforms have quirky getcwd()
# behaviour -- see issue 9185 and 15765 for
# more information.
quirky_platform = (
'sunos' in sys.platform or
'netbsd' in sys.platform or
'openbsd' in sys.platform
)
if quirky_platform:
expected_errno = errno.ERANGE
self.assertEqual(e.errno, expected_errno)
finally:
os.chdir('..')
os.rmdir(dirname)
_create_and_do_getcwd(dirname)
finally:
os.chdir(curdir)
shutil.rmtree(base_path)
def test_getcwd_long_pathnames(self):
dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
curdir = os.getcwd()
base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
try:
os.mkdir(base_path)
os.chdir(base_path)
except:
self.skipTest("cannot create directory for testing")
try:
def _create_and_do_getcwd(dirname, current_path_length = 0):
try:
os.mkdir(dirname)
except:
self.skipTest("mkdir cannot create directory sufficiently "
"deep for getcwd test")
os.chdir(dirname)
try:
os.getcwd()
if current_path_length < 4099:
_create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
except OSError as e:
expected_errno = errno.ENAMETOOLONG
# The following platforms have quirky getcwd()
# behaviour -- see issue 9185 and 15765 for
# more information.
quirky_platform = (
'sunos' in sys.platform or
'netbsd' in sys.platform or
'openbsd' in sys.platform
)
if quirky_platform:
expected_errno = errno.ERANGE
self.assertEqual(e.errno, expected_errno)
finally:
os.chdir('..')
os.rmdir(dirname)
_create_and_do_getcwd(dirname)
finally:
os.chdir(curdir)
shutil.rmtree(base_path)
def test_getcwd_long_pathnames(self):
dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
curdir = os.getcwd()
base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
try:
os.mkdir(base_path)
os.chdir(base_path)
except:
self.skipTest("cannot create directory for testing")
try:
def _create_and_do_getcwd(dirname, current_path_length = 0):
try:
os.mkdir(dirname)
except:
self.skipTest("mkdir cannot create directory sufficiently "
"deep for getcwd test")
os.chdir(dirname)
try:
os.getcwd()
if current_path_length < 4099:
_create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
except OSError as e:
expected_errno = errno.ENAMETOOLONG
# The following platforms have quirky getcwd()
# behaviour -- see issue 9185 and 15765 for
# more information.
quirky_platform = (
'sunos' in sys.platform or
'netbsd' in sys.platform or
'openbsd' in sys.platform
)
if quirky_platform:
expected_errno = errno.ERANGE
self.assertEqual(e.errno, expected_errno)
finally:
os.chdir('..')
os.rmdir(dirname)
_create_and_do_getcwd(dirname)
finally:
os.chdir(curdir)
shutil.rmtree(base_path)
def test_getcwd_long_pathnames(self):
if hasattr(posix, 'getcwd'):
dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
curdir = os.getcwd()
base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
try:
os.mkdir(base_path)
os.chdir(base_path)
except:
# Just returning nothing instead of the SkipTest exception,
# because the test results in Error in that case.
# Is that ok?
# raise unittest.SkipTest, "cannot create directory for testing"
return
try:
def _create_and_do_getcwd(dirname, current_path_length = 0):
try:
os.mkdir(dirname)
except:
raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test"
os.chdir(dirname)
try:
os.getcwd()
if current_path_length < 4099:
_create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
except OSError as e:
expected_errno = errno.ENAMETOOLONG
# The following platforms have quirky getcwd()
# behaviour -- see issue 9185 and 15765 for
# more information.
quirky_platform = (
'sunos' in sys.platform or
'netbsd' in sys.platform or
'openbsd' in sys.platform
)
if quirky_platform:
expected_errno = errno.ERANGE
self.assertEqual(e.errno, expected_errno)
finally:
os.chdir('..')
os.rmdir(dirname)
_create_and_do_getcwd(dirname)
finally:
os.chdir(curdir)
shutil.rmtree(base_path)
def _policy_RequestIG(self, books_needed):
'''Select books from IGs specified in interleave_request attribute.
If interleave_request_pos is present use it as the starting point.'''
db = self.LCEobj.db
ig_req = db.get_xattr(self.shelf, self.XATTR_IG_REQ)
self.LCEobj.errno = errno.ERANGE
assert ig_req is not None, \
'RequestIG policy requires prior %s' % self.XATTR_IG_REQ
assert len(ig_req), \
'RequestIG policy requires prior %s' % self.XATTR_IG_REQ
# Get a starting position for the interleave_request list
self.LCEobj.errno = errno.ENOSPC
pos = db.get_xattr(self.shelf, self.XATTR_IG_REQ_POS)
try:
ig_pos = int(pos)
if ig_pos < 0 or ig_pos > (len(ig_req) - 1):
ig_pos = 0
except TypeError as err: # TSNH, see create_shelf. Legacy paranoia.
ig_pos = 0
resp = db.create_xattr(self.shelf, self.XATTR_IG_REQ_POS, ig_pos)
except ValueError as err:
ig_pos = 0
reqIGs = [ord(ig_req[i:i+1]) for i in range(0, len(ig_req), 1)]
# Determine number of books needed from each IG
igCnt = defaultdict(int)
cur = ig_pos
for cnt in range(0, books_needed):
ig = reqIGs[cur % len(reqIGs)]
igCnt[ig] += 1
cur += 1
# Allocate specified number of books from each selected IG
booksIG = {}
for ig in igCnt.keys():
booksIG[ig] = db.get_books_by_intlv_group(
igCnt[ig], (ig, ), exclude=False)
# Build list of books using request_interleave pattern
self.LCEobj.errno = errno.ENOSPC
bookList = []
cur = ig_pos
for cnt in range(0, books_needed):
ig = reqIGs[cur % len(reqIGs)]
assert len(booksIG[ig]) != 0, 'Not enough books remaining in IG'
bookList.append(booksIG[ig].pop(0))
cur += 1
# Save current position in interleave_request list
db.modify_xattr(self.shelf, self.XATTR_IG_REQ_POS, cur % len(reqIGs))
return bookList