def bind(self, address):
    """Bind the socket to ``address`` via ``zmq_bind``.

    Text addresses are encoded to UTF-8 bytes before the call.

    Raises:
        ZMQError: when the bind fails with ENAMETOOLONG and
            ``IPC_PATH_MAX_LEN`` is truthy; the message names the offending
            ipc path and the length limit.

    Any other failure is handed to ``_check_rc``.
    """
    if isinstance(address, unicode):
        address = address.encode('utf8')
    rc = C.zmq_bind(self._zmq_socket, address)
    if rc >= 0:
        return
    if IPC_PATH_MAX_LEN:
        # Read the error code once, straight after the failed call, so the
        # value tested here is the same one attached to the exception; the
        # string handling below must not be able to change what is reported.
        bind_errno = C.zmq_errno()
        if bind_errno == errno_mod.ENAMETOOLONG:
            # py3compat: address is bytes, but msg wants str
            if str is unicode:
                address = address.decode('utf-8', 'replace')
            path = address.split('://', 1)[-1]
            msg = ('ipc path "{0}" is longer than {1} '
                   'characters (sizeof(sockaddr_un.sun_path)).'
                   .format(path, IPC_PATH_MAX_LEN))
            raise ZMQError(bind_errno, msg=msg)
    _check_rc(rc)
Python 类 ENAMETOOLONG 的实例源码
def bind(self, address):
    """Bind the socket to ``address`` via ``zmq_bind``.

    Text addresses are encoded to UTF-8 bytes before the call.

    Raises:
        ZMQError: when the bind fails with ENAMETOOLONG and
            ``IPC_PATH_MAX_LEN`` is truthy; the message names the offending
            ipc path and the length limit.

    Any other failure is handed to ``_check_rc``.
    """
    if isinstance(address, unicode):
        address = address.encode('utf8')
    rc = C.zmq_bind(self._zmq_socket, address)
    if rc >= 0:
        return
    if IPC_PATH_MAX_LEN:
        # Read the error code once, straight after the failed call, so the
        # value tested here is the same one attached to the exception; the
        # string handling below must not be able to change what is reported.
        bind_errno = C.zmq_errno()
        if bind_errno == errno_mod.ENAMETOOLONG:
            # py3compat: address is bytes, but msg wants str
            if str is unicode:
                address = address.decode('utf-8', 'replace')
            path = address.split('://', 1)[-1]
            msg = ('ipc path "{0}" is longer than {1} '
                   'characters (sizeof(sockaddr_un.sun_path)).'
                   .format(path, IPC_PATH_MAX_LEN))
            raise ZMQError(bind_errno, msg=msg)
    _check_rc(rc)
def bind(self, address):
    """Bind the socket to ``address`` via ``zmq_bind``.

    Text addresses are encoded to UTF-8 bytes before the call.

    Raises:
        ZMQError: when the bind fails with ENAMETOOLONG and
            ``IPC_PATH_MAX_LEN`` is truthy; the message names the offending
            ipc path and the length limit.

    Any other failure is handed to ``_check_rc``.
    """
    if isinstance(address, unicode):
        address = address.encode('utf8')
    rc = C.zmq_bind(self._zmq_socket, address)
    if rc >= 0:
        return
    if IPC_PATH_MAX_LEN:
        # Read the error code once, straight after the failed call, so the
        # value tested here is the same one attached to the exception; the
        # string handling below must not be able to change what is reported.
        bind_errno = C.zmq_errno()
        if bind_errno == errno_mod.ENAMETOOLONG:
            # py3compat: address is bytes, but msg wants str
            if str is unicode:
                address = address.decode('utf-8', 'replace')
            path = address.split('://', 1)[-1]
            msg = ('ipc path "{0}" is longer than {1} '
                   'characters (sizeof(sockaddr_un.sun_path)).'
                   .format(path, IPC_PATH_MAX_LEN))
            raise ZMQError(bind_errno, msg=msg)
    _check_rc(rc)
def bind(self, address):
    """Bind the socket to ``address`` via ``zmq_bind``.

    Text addresses are encoded to UTF-8 bytes before the call.

    Raises:
        ZMQError: when the bind fails with ENAMETOOLONG and
            ``IPC_PATH_MAX_LEN`` is truthy; the message names the offending
            ipc path and the length limit.

    Any other failure is handed to ``_check_rc``.
    """
    if isinstance(address, unicode):
        address = address.encode('utf8')
    rc = C.zmq_bind(self._zmq_socket, address)
    if rc >= 0:
        return
    if IPC_PATH_MAX_LEN:
        # Read the error code once, straight after the failed call, so the
        # value tested here is the same one attached to the exception; the
        # string handling below must not be able to change what is reported.
        bind_errno = C.zmq_errno()
        if bind_errno == errno_mod.ENAMETOOLONG:
            # py3compat: address is bytes, but msg wants str
            if str is unicode:
                address = address.decode('utf-8', 'replace')
            path = address.split('://', 1)[-1]
            msg = ('ipc path "{0}" is longer than {1} '
                   'characters (sizeof(sockaddr_un.sun_path)).'
                   .format(path, IPC_PATH_MAX_LEN))
            raise ZMQError(bind_errno, msg=msg)
    _check_rc(rc)
def bind(self, address):
    """Bind the socket to ``address`` via ``zmq_bind``.

    Text addresses are encoded to UTF-8 bytes before the call.

    Raises:
        ZMQError: when the bind fails with ENAMETOOLONG and
            ``IPC_PATH_MAX_LEN`` is truthy; the message names the offending
            ipc path and the length limit.

    Any other failure is handed to ``_check_rc``.
    """
    if isinstance(address, unicode):
        address = address.encode('utf8')
    rc = C.zmq_bind(self._zmq_socket, address)
    if rc >= 0:
        return
    if IPC_PATH_MAX_LEN:
        # Read the error code once, straight after the failed call, so the
        # value tested here is the same one attached to the exception; the
        # string handling below must not be able to change what is reported.
        bind_errno = C.zmq_errno()
        if bind_errno == errno_mod.ENAMETOOLONG:
            # py3compat: address is bytes, but msg wants str
            if str is unicode:
                address = address.decode('utf-8', 'replace')
            path = address.split('://', 1)[-1]
            msg = ('ipc path "{0}" is longer than {1} '
                   'characters (sizeof(sockaddr_un.sun_path)).'
                   .format(path, IPC_PATH_MAX_LEN))
            raise ZMQError(bind_errno, msg=msg)
    _check_rc(rc)
def bind(self, address):
    """Bind the socket to ``address`` via ``zmq_bind``.

    Text addresses are encoded to UTF-8 bytes before the call.

    Raises:
        ZMQError: when the bind fails with ENAMETOOLONG and
            ``IPC_PATH_MAX_LEN`` is truthy; the message names the offending
            ipc path and the length limit.

    Any other failure is handed to ``_check_rc``.
    """
    if isinstance(address, unicode):
        address = address.encode('utf8')
    rc = C.zmq_bind(self._zmq_socket, address)
    if rc >= 0:
        return
    if IPC_PATH_MAX_LEN:
        # Read the error code once, straight after the failed call, so the
        # value tested here is the same one attached to the exception; the
        # string handling below must not be able to change what is reported.
        bind_errno = C.zmq_errno()
        if bind_errno == errno_mod.ENAMETOOLONG:
            # py3compat: address is bytes, but msg wants str
            if str is unicode:
                address = address.decode('utf-8', 'replace')
            path = address.split('://', 1)[-1]
            msg = ('ipc path "{0}" is longer than {1} '
                   'characters (sizeof(sockaddr_un.sun_path)).'
                   .format(path, IPC_PATH_MAX_LEN))
            raise ZMQError(bind_errno, msg=msg)
    _check_rc(rc)
def main():
    """Recreate metric links for every ``*.hs`` file below a data directory.

    Reads exactly two command-line arguments, ``data_dir`` and ``link_dir``.
    For each ``<data_dir>/<schema>/*.hs`` file the header is read with
    ``kenshin.header`` and ``create_link`` is called once per non-empty
    entry of its ``'tag_list'``.

    Prints a usage message and exits with status 1 unless exactly two
    arguments are supplied. ``OSError`` from ``create_link`` is re-raised
    unless its errno is ENAMETOOLONG, which is skipped.
    """
    # Exactly two arguments are required: a looser "< 3" check would let
    # extra arguments through and crash on the two-name unpack below.
    if len(sys.argv) != 3:
        print('Need data_dir and link_dir.\n'
              'e.g.: kenshin-rebuild-link.py /kenshin/data/a /kenshin/link/a')
        sys.exit(1)

    data_dir, link_dir = sys.argv[1:]
    data_dir = os.path.abspath(data_dir)
    link_dir = os.path.abspath(link_dir)

    for schema_name in os.listdir(data_dir):
        hs_file_pat = os.path.join(data_dir, schema_name, '*.hs')
        for fp in glob.glob(hs_file_pat):
            with open(fp) as f:
                header = kenshin.header(f)
                for metric in header['tag_list']:
                    if metric == '':
                        continue
                    try:
                        create_link(metric, link_dir, fp)
                    except OSError as exc:
                        # Best effort: a link whose path is too long is
                        # skipped; every other OS error is fatal.
                        if exc.errno != errno.ENAMETOOLONG:
                            raise
def rebuildLink(instance_data_dir, instance_link_dir):
    """Recreate the metric links for one instance.

    Walks ``<instance_data_dir>/<schema>/*.hs``, reads each file's header
    with ``kenshin.header`` and, for every non-empty entry of its
    ``'tag_list'``, links the path returned by
    ``getMetricPathByInstanceDir`` to the data file.

    ``OSError`` with errno ENAMETOOLONG is ignored; any other ``OSError``
    propagates.
    """
    for schema in os.listdir(instance_data_dir):
        pattern = os.path.join(instance_data_dir, schema, '*.hs')
        for hs_path in glob.glob(pattern):
            with open(hs_path) as hs_file:
                tags = kenshin.header(hs_file)['tag_list']
                for tag in tags:
                    if tag == '':
                        continue
                    target = getMetricPathByInstanceDir(instance_link_dir, tag)
                    try:
                        _createLinkHelper(target, hs_path)
                    except OSError as err:
                        # Over-long link paths are skipped on purpose.
                        if err.errno != errno.ENAMETOOLONG:
                            raise
def createLink(metric, file_path):
    """Link the path ``getMetricPath`` returns for ``metric`` to ``file_path``.

    ``OSError`` with errno ENAMETOOLONG is ignored; any other ``OSError``
    propagates to the caller.
    """
    link_path = getMetricPath(metric)
    try:
        _createLinkHelper(link_path, file_path)
    except OSError as err:
        # Over-long link paths are skipped on purpose.
        if err.errno != errno.ENAMETOOLONG:
            raise
def test_getcwd_long_pathnames(self):
    """Exercise os.getcwd() inside a very deeply nested directory tree.

    Directories are created and entered recursively until the tracked path
    length reaches 4099. At every level os.getcwd() must either succeed or
    fail with ENAMETOOLONG (ERANGE on SunOS, NetBSD and OpenBSD). The test
    is skipped when a directory cannot be created.
    """
    dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
    curdir = os.getcwd()
    base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'

    try:
        os.mkdir(base_path)
        os.chdir(base_path)
    except OSError:
        # Only filesystem failures justify a skip; a bare ``except`` would
        # also turn KeyboardInterrupt/SystemExit into a skipped test.
        self.skipTest("cannot create directory for testing")

    try:
        def _create_and_do_getcwd(dirname, current_path_length=0):
            try:
                os.mkdir(dirname)
            except OSError:
                self.skipTest("mkdir cannot create directory sufficiently "
                              "deep for getcwd test")

            os.chdir(dirname)
            try:
                os.getcwd()
                if current_path_length < 4099:
                    _create_and_do_getcwd(
                        dirname, current_path_length + len(dirname) + 1)
            except OSError as e:
                # The following platforms have quirky getcwd()
                # behaviour -- see issue 9185 and 15765 for
                # more information.
                quirky_platform = any(
                    name in sys.platform
                    for name in ('sunos', 'netbsd', 'openbsd')
                )
                if quirky_platform:
                    expected_errno = errno.ERANGE
                else:
                    expected_errno = errno.ENAMETOOLONG
                self.assertEqual(e.errno, expected_errno)
            finally:
                # Step back out and remove this level so the tree is
                # dismantled from the deepest directory upward.
                os.chdir('..')
                os.rmdir(dirname)

        _create_and_do_getcwd(dirname)
    finally:
        os.chdir(curdir)
        shutil.rmtree(base_path)
def test_getcwd_long_pathnames(self):
    """Exercise os.getcwd() inside a very deeply nested directory tree.

    Directories are created and entered recursively until the tracked path
    length reaches 4099. At every level os.getcwd() must either succeed or
    fail with ENAMETOOLONG (ERANGE on SunOS, NetBSD and OpenBSD). The test
    is skipped when a directory cannot be created.
    """
    dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
    curdir = os.getcwd()
    base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'

    try:
        os.mkdir(base_path)
        os.chdir(base_path)
    except OSError:
        # Only filesystem failures justify a skip; a bare ``except`` would
        # also turn KeyboardInterrupt/SystemExit into a skipped test.
        self.skipTest("cannot create directory for testing")

    try:
        def _create_and_do_getcwd(dirname, current_path_length=0):
            try:
                os.mkdir(dirname)
            except OSError:
                self.skipTest("mkdir cannot create directory sufficiently "
                              "deep for getcwd test")

            os.chdir(dirname)
            try:
                os.getcwd()
                if current_path_length < 4099:
                    _create_and_do_getcwd(
                        dirname, current_path_length + len(dirname) + 1)
            except OSError as e:
                # The following platforms have quirky getcwd()
                # behaviour -- see issue 9185 and 15765 for
                # more information.
                quirky_platform = any(
                    name in sys.platform
                    for name in ('sunos', 'netbsd', 'openbsd')
                )
                if quirky_platform:
                    expected_errno = errno.ERANGE
                else:
                    expected_errno = errno.ENAMETOOLONG
                self.assertEqual(e.errno, expected_errno)
            finally:
                # Step back out and remove this level so the tree is
                # dismantled from the deepest directory upward.
                os.chdir('..')
                os.rmdir(dirname)

        _create_and_do_getcwd(dirname)
    finally:
        os.chdir(curdir)
        shutil.rmtree(base_path)
def test_getcwd_long_pathnames(self):
    """Exercise os.getcwd() inside a very deeply nested directory tree.

    Directories are created and entered recursively until the tracked path
    length reaches 4099. At every level os.getcwd() must either succeed or
    fail with ENAMETOOLONG (ERANGE on SunOS, NetBSD and OpenBSD). The test
    is skipped when a directory cannot be created.
    """
    dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
    curdir = os.getcwd()
    base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'

    try:
        os.mkdir(base_path)
        os.chdir(base_path)
    except OSError:
        # Only filesystem failures justify a skip; a bare ``except`` would
        # also turn KeyboardInterrupt/SystemExit into a skipped test.
        self.skipTest("cannot create directory for testing")

    try:
        def _create_and_do_getcwd(dirname, current_path_length=0):
            try:
                os.mkdir(dirname)
            except OSError:
                self.skipTest("mkdir cannot create directory sufficiently "
                              "deep for getcwd test")

            os.chdir(dirname)
            try:
                os.getcwd()
                if current_path_length < 4099:
                    _create_and_do_getcwd(
                        dirname, current_path_length + len(dirname) + 1)
            except OSError as e:
                # The following platforms have quirky getcwd()
                # behaviour -- see issue 9185 and 15765 for
                # more information.
                quirky_platform = any(
                    name in sys.platform
                    for name in ('sunos', 'netbsd', 'openbsd')
                )
                if quirky_platform:
                    expected_errno = errno.ERANGE
                else:
                    expected_errno = errno.ENAMETOOLONG
                self.assertEqual(e.errno, expected_errno)
            finally:
                # Step back out and remove this level so the tree is
                # dismantled from the deepest directory upward.
                os.chdir('..')
                os.rmdir(dirname)

        _create_and_do_getcwd(dirname)
    finally:
        os.chdir(curdir)
        shutil.rmtree(base_path)
def test_getcwd_long_pathnames(self):
    """Exercise os.getcwd() inside a very deeply nested directory tree.

    Does nothing when ``posix`` has no ``getcwd``. Otherwise directories
    are created and entered recursively until the tracked path length
    reaches 4099. At every level os.getcwd() must either succeed or fail
    with ENAMETOOLONG (ERANGE on SunOS, NetBSD and OpenBSD).

    Raises:
        unittest.SkipTest: when a nested directory cannot be created.
    """
    if not hasattr(posix, 'getcwd'):
        return

    dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
    curdir = os.getcwd()
    base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'

    try:
        os.mkdir(base_path)
        os.chdir(base_path)
    except OSError:
        # Return quietly rather than raising SkipTest: per the original
        # author's note, raising SkipTest at this point made the test
        # report an Error.
        return

    try:
        def _create_and_do_getcwd(dirname, current_path_length=0):
            try:
                os.mkdir(dirname)
            except OSError:
                # Call form of ``raise`` is valid on Python 2 and 3 alike;
                # the ``raise Class, "message"`` spelling is Python 2 only.
                raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")

            os.chdir(dirname)
            try:
                os.getcwd()
                if current_path_length < 4099:
                    _create_and_do_getcwd(
                        dirname, current_path_length + len(dirname) + 1)
            except OSError as e:
                # The following platforms have quirky getcwd()
                # behaviour -- see issue 9185 and 15765 for
                # more information.
                quirky_platform = any(
                    name in sys.platform
                    for name in ('sunos', 'netbsd', 'openbsd')
                )
                if quirky_platform:
                    expected_errno = errno.ERANGE
                else:
                    expected_errno = errno.ENAMETOOLONG
                self.assertEqual(e.errno, expected_errno)
            finally:
                # Step back out and remove this level so the tree is
                # dismantled from the deepest directory upward.
                os.chdir('..')
                os.rmdir(dirname)

        _create_and_do_getcwd(dirname)
    finally:
        os.chdir(curdir)
        shutil.rmtree(base_path)