def startDB(self):
import kinterbasdb
self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
os.chmod(self.DB_DIR, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO)
sql = 'create database "%s" user "%s" password "%s"'
sql %= (self.DB_NAME, self.DB_USER, self.DB_PASS);
conn = kinterbasdb.create_database(sql)
conn.close()
python类S_IRWXO的实例源码
def test_copymode_follow_symlinks(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
dst = os.path.join(tmp_dir, 'bar')
src_link = os.path.join(tmp_dir, 'baz')
dst_link = os.path.join(tmp_dir, 'quux')
write_file(src, 'foo')
write_file(dst, 'foo')
os.symlink(src, src_link)
os.symlink(dst, dst_link)
os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
# file to file
os.chmod(dst, stat.S_IRWXO)
self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
shutil.copymode(src, dst)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# follow src link
os.chmod(dst, stat.S_IRWXO)
shutil.copymode(src_link, dst)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# follow dst link
os.chmod(dst, stat.S_IRWXO)
shutil.copymode(src, dst_link)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# follow both links
os.chmod(dst, stat.S_IRWXO)
shutil.copymode(src_link, dst)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
dst = os.path.join(tmp_dir, 'bar')
src_link = os.path.join(tmp_dir, 'baz')
dst_link = os.path.join(tmp_dir, 'quux')
write_file(src, 'foo')
write_file(dst, 'foo')
os.symlink(src, src_link)
os.symlink(dst, dst_link)
os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
os.chmod(dst, stat.S_IRWXU)
os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
# link to link
os.lchmod(dst_link, stat.S_IRWXO)
shutil.copymode(src_link, dst_link, follow_symlinks=False)
self.assertEqual(os.lstat(src_link).st_mode,
os.lstat(dst_link).st_mode)
self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# src link - use chmod
os.lchmod(dst_link, stat.S_IRWXO)
shutil.copymode(src_link, dst, follow_symlinks=False)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# dst link - use chmod
os.lchmod(dst_link, stat.S_IRWXO)
shutil.copymode(src, dst_link, follow_symlinks=False)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copystat_symlinks(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
dst = os.path.join(tmp_dir, 'bar')
src_link = os.path.join(tmp_dir, 'baz')
dst_link = os.path.join(tmp_dir, 'qux')
write_file(src, 'foo')
src_stat = os.stat(src)
os.utime(src, (src_stat.st_atime,
src_stat.st_mtime - 42.0)) # ensure different mtimes
write_file(dst, 'bar')
self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
os.symlink(src, src_link)
os.symlink(dst, dst_link)
if hasattr(os, 'lchmod'):
os.lchmod(src_link, stat.S_IRWXO)
if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
os.lchflags(src_link, stat.UF_NODUMP)
src_link_stat = os.lstat(src_link)
# follow
if hasattr(os, 'lchmod'):
shutil.copystat(src_link, dst_link, follow_symlinks=True)
self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
# don't follow
shutil.copystat(src_link, dst_link, follow_symlinks=False)
dst_link_stat = os.lstat(dst_link)
if os.utime in os.supports_follow_symlinks:
for attr in 'st_atime', 'st_mtime':
# The modification times may be truncated in the new file.
self.assertLessEqual(getattr(src_link_stat, attr),
getattr(dst_link_stat, attr) + 1)
if hasattr(os, 'lchmod'):
self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
# tell to follow but dst is not a link
shutil.copystat(src_link, dst, follow_symlinks=False)
self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
00000.1)
def test_copy2_symlinks(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
dst = os.path.join(tmp_dir, 'bar')
src_link = os.path.join(tmp_dir, 'baz')
write_file(src, 'foo')
os.symlink(src, src_link)
if hasattr(os, 'lchmod'):
os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
os.lchflags(src_link, stat.UF_NODUMP)
src_stat = os.stat(src)
src_link_stat = os.lstat(src_link)
# follow
shutil.copy2(src_link, dst, follow_symlinks=True)
self.assertFalse(os.path.islink(dst))
self.assertEqual(read_file(src), read_file(dst))
os.remove(dst)
# don't follow
shutil.copy2(src_link, dst, follow_symlinks=False)
self.assertTrue(os.path.islink(dst))
self.assertEqual(os.readlink(dst), os.readlink(src_link))
dst_stat = os.lstat(dst)
if os.utime in os.supports_follow_symlinks:
for attr in 'st_atime', 'st_mtime':
# The modification times may be truncated in the new file.
self.assertLessEqual(getattr(src_link_stat, attr),
getattr(dst_stat, attr) + 1)
if hasattr(os, 'lchmod'):
self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
def cleanup_test_droppings(testname, verbose):
import shutil
import stat
import gc
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce failures.
gc.collect()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
# for it to arrange. The consequences can be especially nasty on Windows,
# since if a test leaves a file open, it cannot be deleted by name (while
# there's nothing we can do about that here either, we can display the
# name of the offending test, which is a real help).
for name in (support.TESTFN,
"db_home",
):
if not os.path.exists(name):
continue
if os.path.isdir(name):
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
raise SystemError("os.path says %r exists but is neither "
"directory nor file" % name)
if verbose:
print("%r left behind %s %r" % (testname, kind, name))
try:
# if we have chmod, fix possible permissions problems
# that might prevent cleanup
if (hasattr(os, 'chmod')):
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
except Exception as msg:
print(("%r left behind %s %r and it couldn't be "
"removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
def test_mode(self):
with open(TESTFN, 'w'):
pass
if os.name == 'posix':
os.chmod(TESTFN, 0o700)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr, '-rwx------')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXU)
os.chmod(TESTFN, 0o070)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr, '----rwx---')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXG)
os.chmod(TESTFN, 0o007)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr, '-------rwx')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXO)
os.chmod(TESTFN, 0o444)
st_mode, modestr = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(modestr, '-r--r--r--')
self.assertEqual(stat.S_IMODE(st_mode), 0o444)
else:
os.chmod(TESTFN, 0o700)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr[:3], '-rw')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IFMT(st_mode),
stat.S_IFREG)
def check_ssh_private_key_filemode(ssh_private_key):
# type: (pathlib.Path) -> bool
"""Check SSH private key filemode
:param pathlib.Path ssh_private_key: SSH private key
:rtype: bool
:return: private key filemode is ok
"""
def _mode_check(fstat, flag):
return bool(fstat & flag)
if util.on_windows():
return True
fstat = ssh_private_key.stat().st_mode
modes = frozenset((stat.S_IRWXG, stat.S_IRWXO))
return not any([_mode_check(fstat, x) for x in modes])
def validate_ssh_key_path(ssh_key_path):
assert os.path.isfile(ssh_key_path), 'could not find ssh private key: {}'.format(ssh_key_path)
assert stat.S_IMODE(
os.stat(ssh_key_path).st_mode) & (stat.S_IRWXG + stat.S_IRWXO) == 0, (
'ssh_key_path must be only read / write / executable by the owner. It may not be read / write / executable '
'by group, or other.')
with open(ssh_key_path) as fh:
assert 'ENCRYPTED' not in fh.read(), ('Encrypted SSH keys (which contain passphrases) '
'are not allowed. Use a key without a passphrase.')
def check(self):
"""Verify correct permissions and ownership of database files."""
scheme, _, file, _, _ = urlsplit(settings.DB_URI)
if scheme != 'file':
log.warning('DB_URI does not point to a file but %s -- forgot to set "BUILTIN_ZEO = False"?', scheme)
return
try:
st = os.stat(file)
except FileNotFoundError:
# Database not created yet, not an error.
return
error = False
if st.st_uid != os.geteuid():
log.error('Database file %s has wrong owner: %d (file) vs %d (daemon user)', file, st.st_uid, os.geteuid())
error = True
if st.st_gid != os.getegid():
log.error('Database file %s has wrong group: %d (file) vs %d (daemon user)', file, st.st_gid, os.getegid())
# not a critical error, could be, theoretically, on purpose
if error:
sys.exit(1)
# Fix perms, if any
perms = stat.S_IMODE(st.st_mode)
perms_should_be = perms & ~stat.S_IRWXO
if perms != perms_should_be:
try:
os.chmod(file, perms_should_be)
except OSError as ose:
log.debug('Tried to fix permissions on database file, but it didn\'t work: %s', file, ose)
def get_tree(self, path):
path = path.lstrip(os.sep.encode('utf-8'))
directory = os.path.join(self.root, path)
tree = Tree()
directory_items = self._sftp.listdir_attr_b(directory)
for fattr in directory_items:
filename = fattr.filename
item = {}
if stat.S_ISREG(fattr.st_mode):
item['type'] = 'blob'
item['filetype'] = 'regular'
elif stat.S_ISDIR(fattr.st_mode):
item['type'] = 'tree'
item['filetype'] = 'directory'
elif stat.S_ISLNK(fattr.st_mode):
item['filetype'] = 'link'
item['link'] = self._sftp.readlink(os.path.join(directory, filename)).encode('utf8')
elif stat.S_ISFIFO(fattr.st_mode):
item['filetype'] = 'fifo'
else:
continue # FIXME: Warn
fmode = fattr.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX)
item['uid'] = fattr.st_uid
item['gid'] = fattr.st_gid
item['mode'] = fmode
item['mtime'] = int(fattr.st_mtime)
item['size'] = fattr.st_size
tree.add(filename, item)
return tree
def get_tree(self, path):
path = path.lstrip(os.sep.encode('utf-8'))
directory = os.path.join(self.root, path)
tree = Tree()
try:
directory_items = os.listdir(directory)
except OSError as err:
raise RemoteOperationError(err.strerror)
for filename in directory_items:
assert isinstance(filename, bytes)
item = {}
fullname = os.path.join(directory, filename)
fstat = os.lstat(fullname)
if stat.S_ISREG(fstat.st_mode):
item['type'] = 'blob'
item['filetype'] = 'regular'
elif stat.S_ISDIR(fstat.st_mode):
item['type'] = 'tree'
item['filetype'] = 'directory'
elif stat.S_ISLNK(fstat.st_mode):
item['filetype'] = 'link'
item['link'] = os.readlink(os.path.join(directory, filename))
elif stat.S_ISFIFO(fstat.st_mode):
item['filetype'] = 'fifo'
else:
continue # FIXME: Warn
fmode = fstat.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX)
item['uid'] = fstat.st_uid
item['gid'] = fstat.st_gid
item['mode'] = fmode
item['atime'] = int(fstat.st_atime)
item['mtime'] = int(fstat.st_mtime)
item['ctime'] = int(fstat.st_ctime)
item['size'] = fstat.st_size
tree.add(filename, item)
return tree
def create_tempdir():
"""Create a directory within the system temp directory used to create temp files."""
try:
if os.path.isdir(tempdir):
shutil.rmtree(tempdir)
os.mkdir(tempdir)
# Make sure the directory can be removed by anyone in case the user
# runs ST later as another user.
os.chmod(tempdir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
except PermissionError:
if sublime.platform() != 'windows':
current_user = pwd.getpwuid(os.geteuid())[0]
temp_uid = os.stat(tempdir).st_uid
temp_user = pwd.getpwuid(temp_uid)[0]
message = (
'The SublimeLinter temp directory:\n\n{0}\n\ncould not be cleared '
'because it is owned by \'{1}\' and you are logged in as \'{2}\'. '
'Please use sudo to remove the temp directory from a terminal.'
).format(tempdir, temp_user, current_user)
else:
message = (
'The SublimeLinter temp directory ({}) could not be reset '
'because it belongs to a different user.'
).format(tempdir)
sublime.error_message(message)
from . import persist
persist.debug('temp directory:', tempdir)
def test_ensure_key_pair_nonexist_local(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
fake_ec2_client.create_key_pair.return_value = {
'KeyMaterial': 'test key'
}
with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
mocked_resource.return_value = fake_ec2_resource
mocked_client.return_value = fake_ec2_client
result = runner.invoke(dummy, input='\n')
assert result.exit_code == exit_codes.SUCCESS
fake_ec2_resource.KeyPair.assert_called_once_with('test')
fake_key_pair_info.delete.assert_called_once_with()
fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
with open(fake_ssh_path_function('test'), 'r') as f:
assert f.read() == 'test key'
s = os.stat(fake_ssh_path_function('test')).st_mode
# owner
assert bool(s & stat.S_IRUSR)
assert bool(s & stat.S_IWUSR)
assert not bool(s & stat.S_IXUSR)
# group
assert not bool(s & stat.S_IRWXG)
# others
assert not bool(s & stat.S_IRWXO)
def test_ensure_key_pair_nonexist_local_private(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
fake_ec2_client.create_key_pair.return_value = {
'KeyMaterial': 'test key'
}
type(fake_key_pair_info).key_fingerprint = mock.PropertyMock(side_effect=FakeClientError)
assert not os.system('touch {}'.format(fake_ssh_path_function('test')))
with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
mocked_resource.return_value = fake_ec2_resource
mocked_client.return_value = fake_ec2_client
result = runner.invoke(dummy, input='\n')
assert result.exit_code == exit_codes.SUCCESS
fake_ec2_resource.KeyPair.assert_called_once_with('test')
fake_key_pair_info.delete.assert_called_once_with()
fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
with open(fake_ssh_path_function('test'), 'r') as f:
assert f.read() == 'test key'
s = os.stat(fake_ssh_path_function('test')).st_mode
# owner
assert bool(s & stat.S_IRUSR)
assert bool(s & stat.S_IWUSR)
assert not bool(s & stat.S_IXUSR)
# group
assert not bool(s & stat.S_IRWXG)
# others
assert not bool(s & stat.S_IRWXO)
def test_ensure_key_pair_nonexist_local_public(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
fake_ec2_client.create_key_pair.return_value = {
'KeyMaterial': 'test key'
}
assert not os.system('touch {}.pub'.format(fake_ssh_path_function('test')))
with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
mocked_resource.return_value = fake_ec2_resource
mocked_client.return_value = fake_ec2_client
result = runner.invoke(dummy, input='\n')
assert result.exit_code == exit_codes.SUCCESS
fake_ec2_resource.KeyPair.assert_called_once_with('test')
fake_key_pair_info.delete.assert_called_once_with()
fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
with open(fake_ssh_path_function('test'), 'r') as f:
assert f.read() == 'test key'
s = os.stat(fake_ssh_path_function('test')).st_mode
# owner
assert bool(s & stat.S_IRUSR)
assert bool(s & stat.S_IWUSR)
assert not bool(s & stat.S_IXUSR)
# group
assert not bool(s & stat.S_IRWXG)
# others
assert not bool(s & stat.S_IRWXO)
def cleanup_test_droppings(testname, verbose):
import stat
import gc
# First kill any dangling references to open files etc.
gc.collect()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
# for it to arrange. The consequences can be especially nasty on Windows,
# since if a test leaves a file open, it cannot be deleted by name (while
# there's nothing we can do about that here either, we can display the
# name of the offending test, which is a real help).
for name in (test_support.TESTFN,
"db_home",
):
if not os.path.exists(name):
continue
if os.path.isdir(name):
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
raise SystemError("os.path says %r exists but is neither "
"directory nor file" % name)
if verbose:
print "%r left behind %s %r" % (testname, kind, name)
try:
# if we have chmod, fix possible permissions problems
# that might prevent cleanup
if (hasattr(os, 'chmod')):
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
except Exception, msg:
print >> sys.stderr, ("%r left behind %s %r and it couldn't be "
"removed: %s" % (testname, kind, name, msg))
def test_mode(self):
with open(TESTFN, 'w'):
pass
if os.name == 'posix':
os.chmod(TESTFN, 0o700)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXU)
os.chmod(TESTFN, 0o070)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXG)
os.chmod(TESTFN, 0o007)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXO)
os.chmod(TESTFN, 0o444)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode), 0o444)
else:
os.chmod(TESTFN, 0o700)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IFMT(st_mode),
stat.S_IFREG)
def test_copymode_follow_symlinks(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
dst = os.path.join(tmp_dir, 'bar')
src_link = os.path.join(tmp_dir, 'baz')
dst_link = os.path.join(tmp_dir, 'quux')
write_file(src, 'foo')
write_file(dst, 'foo')
os.symlink(src, src_link)
os.symlink(dst, dst_link)
os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
# file to file
os.chmod(dst, stat.S_IRWXO)
self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
shutil.copymode(src, dst)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# On Windows, os.chmod does not follow symlinks (issue #15411)
if os.name != 'nt':
# follow src link
os.chmod(dst, stat.S_IRWXO)
shutil.copymode(src_link, dst)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# follow dst link
os.chmod(dst, stat.S_IRWXO)
shutil.copymode(src, dst_link)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# follow both links
os.chmod(dst, stat.S_IRWXO)
shutil.copymode(src_link, dst_link)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
dst = os.path.join(tmp_dir, 'bar')
src_link = os.path.join(tmp_dir, 'baz')
dst_link = os.path.join(tmp_dir, 'quux')
write_file(src, 'foo')
write_file(dst, 'foo')
os.symlink(src, src_link)
os.symlink(dst, dst_link)
os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
os.chmod(dst, stat.S_IRWXU)
os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
# link to link
os.lchmod(dst_link, stat.S_IRWXO)
shutil.copymode(src_link, dst_link, follow_symlinks=False)
self.assertEqual(os.lstat(src_link).st_mode,
os.lstat(dst_link).st_mode)
self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# src link - use chmod
os.lchmod(dst_link, stat.S_IRWXO)
shutil.copymode(src_link, dst, follow_symlinks=False)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
# dst link - use chmod
os.lchmod(dst_link, stat.S_IRWXO)
shutil.copymode(src, dst_link, follow_symlinks=False)
self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)