def setUp(self):
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
blocking_patcher.start()
self.addCleanup(blocking_patcher.stop)
fstat_patcher = mock.patch('os.fstat')
m_fstat = fstat_patcher.start()
st = mock.Mock()
st.st_mode = stat.S_IFSOCK
m_fstat.return_value = st
self.addCleanup(fstat_patcher.stop)
python类S_IFSOCK的实例源码
def get_file_type(path):
"""Retrieve the file type of the path
:param path: The path to get the file type for
:return: The file type as a string or None on error
"""
f_types = {
'socket': stat.S_IFSOCK,
'regular': stat.S_IFREG,
'block': stat.S_IFBLK,
'directory': stat.S_IFDIR,
'character_device': stat.S_IFCHR,
'fifo': stat.S_IFIFO,
}
if not path or not os.path.exists(path):
return None
obj = os.stat(path).st_mode
for key,val in f_types.items():
if obj & val == val:
return key
def setUp(self):
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
blocking_patcher.start()
self.addCleanup(blocking_patcher.stop)
fstat_patcher = mock.patch('os.fstat')
m_fstat = fstat_patcher.start()
st = mock.Mock()
st.st_mode = stat.S_IFSOCK
m_fstat.return_value = st
self.addCleanup(fstat_patcher.stop)
def setUp(self):
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
blocking_patcher.start()
self.addCleanup(blocking_patcher.stop)
fstat_patcher = mock.patch('os.fstat')
m_fstat = fstat_patcher.start()
st = mock.Mock()
st.st_mode = stat.S_IFSOCK
m_fstat.return_value = st
self.addCleanup(fstat_patcher.stop)
def _initialize_aci(self, mode, fileType):
valid_types = [
stat.S_IFREG,
stat.S_IFDIR,
stat.S_IFCHR,
stat.S_IFBLK,
stat.S_IFIFO,
stat.S_IFLNK,
stat.S_IFSOCK]
if fileType not in valid_types:
raise RuntimeError("Invalid file type.")
aci = self.aciCollection.new()
uid = os.getuid()
aci.dbobj.id = uid
aci.dbobj.uname = pwd.getpwuid(uid).pw_name
aci.dbobj.gname = grp.getgrgid(os.getgid()).gr_name
aci.dbobj.mode = int(mode, 8) + fileType
aci.save()
return aci.key
def dbpcs(mode):
if mode & stat.S_IFLNK == stat.S_IFLNK: return 's'
if mode & stat.S_IFSOCK == stat.S_IFSOCK: return 's'
if mode & stat.S_IFREG == stat.S_IFREG: return '-'
if mode & stat.S_IFBLK == stat.S_IFBLK: return 'b'
if mode & stat.S_IFDIR == stat.S_IFDIR: return 'd'
if mode & stat.S_IFIFO == stat.S_IFIFO: return 'p'
if mode & stat.S_IFCHR == stat.S_IFCHR: return 'c'
return '?'
def filter_mknod(path, type):
if exists(path):
return None, None
elif (type & S_IFCHR):
label = "create character special file"
elif (type & S_IFBLK):
label = "create block special file"
elif (type & S_IFIFO):
label = "create named pipe"
elif (type & S_IFSOCK):
label = "create socket"
else:
# mknod(2): "Zero file type is equivalent to type S_IFREG"
label = "create file"
return "%s %s" % (T.cyan(label), T.underline(path)), 0
def test_socketfile_failure_false(self):
self.fs.CreateFile('/tmp/socket', contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ('--swarm', '--connection', '/tmp/socket')
result = check_swarm.process_args(args=args)
self.assertFalse(check_swarm.socketfile_permissions_failure(parsed_args=result))
def test_socketfile_failure_unwriteable(self):
self.fs.CreateFile('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--swarm', '--connection', '/tmp/unwritable')
result = check_swarm.process_args(args=args)
self.assertTrue(check_swarm.socketfile_permissions_failure(parsed_args=result))
def test_socketfile_failure_unreadable(self):
self.fs.CreateFile('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--swarm', '--connection', '/tmp/unreadable')
result = check_swarm.process_args(args=args)
self.assertTrue(check_swarm.socketfile_permissions_failure(parsed_args=result))
def test_socketfile_failure_http(self):
self.fs.CreateFile('/tmp/http', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--swarm', '--connection', 'http://127.0.0.1')
result = check_swarm.process_args(args=args)
self.assertFalse(check_swarm.socketfile_permissions_failure(parsed_args=result))
def setUp(self):
self.setUpPyfakefs()
self.fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
self.services = [{'Spec': {"Name": 'FOO'}},
{'Spec': {"Name": 'BAR'}}]
self.service = {'Spec': {"Name": 'FOO'}}
self.http_success_with_empty_payload = ('{}', 200)
check_swarm.rc = -1
def test_socketfile_failure_unwriteable(self):
self.fs.CreateFile('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--status', 'running', '--connection', '/tmp/unwritable')
result = check_docker.process_args(args=args)
self.assertTrue(check_docker.socketfile_permissions_failure(parsed_args=result))
def test_socketfile_failure_unreadable(self):
self.fs.CreateFile('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--status', 'running', '--connection', '/tmp/unreadable')
result = check_docker.process_args(args=args)
self.assertTrue(check_docker.socketfile_permissions_failure(parsed_args=result))
def test_socketfile_failure_http(self):
self.fs.CreateFile('/tmp/http', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--status', 'running', '--connection', 'http://127.0.0.1')
result = check_docker.process_args(args=args)
self.assertFalse(check_docker.socketfile_permissions_failure(parsed_args=result))
def setUp(self):
self.setUpPyfakefs()
self.fs.CreateFile(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
self.containers = [{'Names': ['/thing1']}, ]
self.http_success_with_empty_payload = ('{}', 200)
def __str__(self):
"create a unix-style long description of the file (like ls -l)"
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx((self.st_mode & 0700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx((self.st_mode & 070) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == 0xffffffffL):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
if uid is None:
uid = 0
if gid is None:
gid = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, self.st_size, datestr, filename)
def __str__(self):
"""create a unix-style long description of the file (like ls -l)"""
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == xffffffff):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
if uid is None:
uid = 0
if gid is None:
gid = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, self.st_size, datestr, filename)
def __str__(self):
"""create a unix-style long description of the file (like ls -l)"""
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == xffffffff):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
size = self.st_size
if uid is None:
uid = 0
if gid is None:
gid = 0
if size is None:
size = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
"""create a unix-style long description of the file (like ls -l)"""
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == xffffffff):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
size = self.st_size
if uid is None:
uid = 0
if gid is None:
gid = 0
if size is None:
size = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
"""create a unix-style long description of the file (like ls -l)"""
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx(
(self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx(
(self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(
self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == xffffffff):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime(
'%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime(
'%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
size = self.st_size
if uid is None:
uid = 0
if gid is None:
gid = 0
if size is None:
size = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (
ks, uid, gid, size, datestr, filename)
def __str__(self):
"""create a unix-style long description of the file (like ls -l)"""
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx(
(self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx(
(self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(
self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == xffffffff):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime(
'%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime(
'%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
size = self.st_size
if uid is None:
uid = 0
if gid is None:
gid = 0
if size is None:
size = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (
ks, uid, gid, size, datestr, filename)
def __str__(self):
"""create a unix-style long description of the file (like ls -l)"""
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == xffffffff):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
size = self.st_size
if uid is None:
uid = 0
if gid is None:
gid = 0
if size is None:
size = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
"""create a unix-style long description of the file (like ls -l)"""
if self.st_mode is not None:
kind = stat.S_IFMT(self.st_mode)
if kind == stat.S_IFIFO:
ks = 'p'
elif kind == stat.S_IFCHR:
ks = 'c'
elif kind == stat.S_IFDIR:
ks = 'd'
elif kind == stat.S_IFBLK:
ks = 'b'
elif kind == stat.S_IFREG:
ks = '-'
elif kind == stat.S_IFLNK:
ks = 'l'
elif kind == stat.S_IFSOCK:
ks = 's'
else:
ks = '?'
ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
else:
ks = '?---------'
# compute display date
if (self.st_mtime is None) or (self.st_mtime == xffffffff):
# shouldn't really happen
datestr = '(unknown date)'
else:
if abs(time.time() - self.st_mtime) > 15552000:
# (15552000 = 6 months)
datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
else:
datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
filename = getattr(self, 'filename', '?')
# not all servers support uid/gid
uid = self.st_uid
gid = self.st_gid
size = self.st_size
if uid is None:
uid = 0
if gid is None:
gid = 0
if size is None:
size = 0
return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def chmod(self, ppath, mode):
"""
Change mode for files or directories
:param ppath: path of file/dir
:param mode: string of the mode
Examples:
flistmeta.chmod("/tmp/dir1", "777")
"""
fType, dirObj = self._search_db(ppath)
if dirObj.dbobj.state != "":
raise RuntimeError("%s: No such file or directory" % ppath)
try:
mode = int(mode, 8)
except ValueError:
raise ValueError("Invalid mode.")
else:
if fType == "D":
_mode = mode + stat.S_IFDIR
aclObj = self.aciCollection.get(dirObj.dbobj.aclkey)
aclObj.dbobj.mode = _mode
aclObj.save()
elif fType == "F" or fType == "L":
_mode = mode + stat.S_IFREG if fType == "F" else mode + stat.S_IFLNK
_, propList = self._getPropertyList(dirObj.dbobj, fType)
for file in propList:
if file.name == j.sal.fs.getBaseName(ppath):
aclObj = self.aciCollection.get(file.aclkey)
aclObj.dbobj.mode = _mode
aclObj.save()
else:
for file in dirObj.dbobj.links:
if file.name == j.sal.fs.getBaseName(ppath):
aclObj = self.aciCollection.get(file.aclkey)
if stat.S_ISSOCK(aclObj.dbobj.st_mode):
_mode = mode + stat.S_IFSOCK
elif stat.S_ISBLK(aclObj.dbobj.st_mode):
_mode = mode + stat.S_IFBLK
elif stat.S_ISCHR(aclObj.dbobj.st_mode):
_mode = mode + stat.S_IFCHR
elif stat.S_ISFIFO(aclObj.dbobj.st_mode):
_mode = mode + stat.S_IFIFO
aclObj.dbobj.mode = _mode
aclObj.save()