def validate_ssh_key_path(ssh_key_path):
    """Validate that ``ssh_key_path`` points to a usable SSH private key.

    The checks run in this order:

    1. the path refers to an existing regular file;
    2. none of the group/other permission bits are set on it;
    3. its text does not contain the marker ``ENCRYPTED``.

    Args:
        ssh_key_path: filesystem path of the private key file.

    Raises:
        AssertionError: if any check fails. The exception is raised
            explicitly instead of through ``assert`` statements so the
            checks are still enforced when Python runs with ``-O``.
    """
    if not os.path.isfile(ssh_key_path):
        raise AssertionError('could not find ssh private key: {}'.format(ssh_key_path))

    # Any read/write/execute bit for group or other makes the key too open.
    group_and_other = stat.S_IRWXG | stat.S_IRWXO
    if stat.S_IMODE(os.stat(ssh_key_path).st_mode) & group_and_other != 0:
        raise AssertionError(
            'ssh_key_path must be only read / write / executable by the owner. It may not be read / write / executable '
            'by group, or other.')

    with open(ssh_key_path) as fh:
        if 'ENCRYPTED' in fh.read():
            raise AssertionError('Encrypted SSH keys (which contain passphrases) '
                                 'are not allowed. Use a key without a passphrase.')
# Example source code using Python's stat.S_IRWXG
def get_tree(self, path):
    """Build a ``Tree`` describing the entries of one directory.

    ``path`` (bytes) has leading path separators stripped and is joined
    onto ``self.root``. The directory is listed through
    ``self._sftp.listdir_attr_b``. Regular files, directories, symlinks and
    FIFOs are recorded; any other file type is skipped.

    Returns:
        The populated ``Tree``. Each entry carries ``filetype``, ``uid``,
        ``gid``, ``mode``, ``mtime`` and ``size``; regular files and
        directories also carry ``type``, and symlinks carry ``link``.
    """
    # Permission bits plus the sticky bit are all that is kept of st_mode.
    keep_bits = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX

    relative = path.lstrip(os.sep.encode('utf-8'))
    directory = os.path.join(self.root, relative)
    tree = Tree()

    for attrs in self._sftp.listdir_attr_b(directory):
        name = attrs.filename
        raw_mode = attrs.st_mode

        if stat.S_ISREG(raw_mode):
            item = {'type': 'blob', 'filetype': 'regular'}
        elif stat.S_ISDIR(raw_mode):
            item = {'type': 'tree', 'filetype': 'directory'}
        elif stat.S_ISLNK(raw_mode):
            item = {'filetype': 'link'}
            target = self._sftp.readlink(os.path.join(directory, name))
            item['link'] = target.encode('utf8')
        elif stat.S_ISFIFO(raw_mode):
            item = {'filetype': 'fifo'}
        else:
            continue  # FIXME: Warn

        item['uid'] = attrs.st_uid
        item['gid'] = attrs.st_gid
        item['mode'] = raw_mode & keep_bits
        item['mtime'] = int(attrs.st_mtime)
        item['size'] = attrs.st_size
        tree.add(name, item)

    return tree
def get_tree(self, path):
    """Build a ``Tree`` describing the entries of one local directory.

    ``path`` (bytes) has leading path separators stripped and is joined
    onto ``self.root``. Entries are inspected with ``os.lstat`` so symlinks
    are reported as links rather than followed. Regular files, directories,
    symlinks and FIFOs are recorded; any other file type is skipped.

    Returns:
        The populated ``Tree``. Each entry carries ``filetype``, ``uid``,
        ``gid``, ``mode``, ``atime``, ``mtime``, ``ctime`` and ``size``;
        regular files and directories also carry ``type``, and symlinks
        carry ``link``.

    Raises:
        RemoteOperationError: if the directory cannot be listed.
    """
    # Permission bits plus the sticky bit are all that is kept of st_mode.
    keep_bits = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX

    relative = path.lstrip(os.sep.encode('utf-8'))
    directory = os.path.join(self.root, relative)
    tree = Tree()

    try:
        names = os.listdir(directory)
    except OSError as err:
        raise RemoteOperationError(err.strerror)

    for name in names:
        assert isinstance(name, bytes)
        entry_path = os.path.join(directory, name)
        info = os.lstat(entry_path)
        raw_mode = info.st_mode

        if stat.S_ISREG(raw_mode):
            item = {'type': 'blob', 'filetype': 'regular'}
        elif stat.S_ISDIR(raw_mode):
            item = {'type': 'tree', 'filetype': 'directory'}
        elif stat.S_ISLNK(raw_mode):
            item = {'filetype': 'link'}
            item['link'] = os.readlink(entry_path)
        elif stat.S_ISFIFO(raw_mode):
            item = {'filetype': 'fifo'}
        else:
            continue  # FIXME: Warn

        item['uid'] = info.st_uid
        item['gid'] = info.st_gid
        item['mode'] = raw_mode & keep_bits
        item['atime'] = int(info.st_atime)
        item['mtime'] = int(info.st_mtime)
        item['ctime'] = int(info.st_ctime)
        item['size'] = info.st_size
        tree.add(name, item)

    return tree
def create_tempdir():
    """Recreate the directory named by the module-level ``tempdir``.

    Any existing directory is removed first, then a fresh one is created
    and opened up to every user. If that fails with ``PermissionError`` an
    explanatory dialog is shown through ``sublime.error_message`` instead
    of propagating the error.
    """
    everyone_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    try:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)

        os.mkdir(tempdir)

        # Leave the directory removable by anyone, in case ST is later run
        # under a different user account.
        os.chmod(tempdir, everyone_rwx)
    except PermissionError:
        if sublime.platform() == 'windows':
            message = (
                'The SublimeLinter temp directory ({}) could not be reset '
                'because it belongs to a different user.'
            ).format(tempdir)
        else:
            # Name both the current user and the directory owner in the message.
            current_user = pwd.getpwuid(os.geteuid())[0]
            owner_uid = os.stat(tempdir).st_uid
            owner_name = pwd.getpwuid(owner_uid)[0]
            message = (
                'The SublimeLinter temp directory:\n\n{0}\n\ncould not be cleared '
                'because it is owned by \'{1}\' and you are logged in as \'{2}\'. '
                'Please use sudo to remove the temp directory from a terminal.'
            ).format(tempdir, owner_name, current_user)

        sublime.error_message(message)

    from . import persist
    persist.debug('temp directory:', tempdir)
def test_ensure_key_pair_nonexist_local(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    """Invoke ``dummy`` and check that the key pair is recreated and saved.

    Expects a successful exit, the remote key pair to be deleted and
    created again, and the local key file to contain the returned key
    material with owner read/write permissions only.
    """
    fake_ec2_client.create_key_pair.return_value = {'KeyMaterial': 'test key'}

    with mock.patch('boto3.resource') as mocked_resource, \
            mock.patch('boto3.client') as mocked_client, \
            mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')

    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')

    with open(fake_ssh_path_function('test'), 'r') as key_file:
        assert key_file.read() == 'test key'

    mode = os.stat(fake_ssh_path_function('test')).st_mode
    # Owner may read and write, but not execute.
    assert mode & stat.S_IRUSR
    assert mode & stat.S_IWUSR
    assert not mode & stat.S_IXUSR
    # Group and others get nothing at all.
    assert not mode & stat.S_IRWXG
    assert not mode & stat.S_IRWXO
def test_ensure_key_pair_nonexist_local_private(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    """Invoke ``dummy`` with a local private key file already present.

    Reading ``key_fingerprint`` on the fake key pair raises
    ``FakeClientError``, and an empty private key file is created
    beforehand. Expects a successful exit, the remote key pair to be
    deleted and created again, and the local key file to contain the
    returned key material with owner read/write permissions only.
    """
    fake_ec2_client.create_key_pair.return_value = {'KeyMaterial': 'test key'}
    type(fake_key_pair_info).key_fingerprint = mock.PropertyMock(side_effect=FakeClientError)

    # Pre-create an empty private key file; os.system returns 0 on success.
    assert not os.system('touch {}'.format(fake_ssh_path_function('test')))

    with mock.patch('boto3.resource') as mocked_resource, \
            mock.patch('boto3.client') as mocked_client, \
            mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')

    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')

    with open(fake_ssh_path_function('test'), 'r') as key_file:
        assert key_file.read() == 'test key'

    mode = os.stat(fake_ssh_path_function('test')).st_mode
    # Owner may read and write, but not execute.
    assert mode & stat.S_IRUSR
    assert mode & stat.S_IWUSR
    assert not mode & stat.S_IXUSR
    # Group and others get nothing at all.
    assert not mode & stat.S_IRWXG
    assert not mode & stat.S_IRWXO
def test_ensure_key_pair_nonexist_local_public(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    """Invoke ``dummy`` with a local ``.pub`` file already present.

    An empty ``<key path>.pub`` file is created beforehand. Expects a
    successful exit, the remote key pair to be deleted and created again,
    and the local key file to contain the returned key material with owner
    read/write permissions only.
    """
    fake_ec2_client.create_key_pair.return_value = {'KeyMaterial': 'test key'}

    # Pre-create an empty public key file; os.system returns 0 on success.
    assert not os.system('touch {}.pub'.format(fake_ssh_path_function('test')))

    with mock.patch('boto3.resource') as mocked_resource, \
            mock.patch('boto3.client') as mocked_client, \
            mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')

    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')

    with open(fake_ssh_path_function('test'), 'r') as key_file:
        assert key_file.read() == 'test key'

    mode = os.stat(fake_ssh_path_function('test')).st_mode
    # Owner may read and write, but not execute.
    assert mode & stat.S_IRUSR
    assert mode & stat.S_IWUSR
    assert not mode & stat.S_IXUSR
    # Group and others get nothing at all.
    assert not mode & stat.S_IRWXG
    assert not mode & stat.S_IRWXO
def cleanup_test_droppings(testname, verbose):
    """Remove files or directories that test ``testname`` left behind.

    Checks ``test_support.TESTFN`` and ``"db_home"``. Each one that exists
    is removed. When ``verbose`` is true the leftover is reported on
    stdout. A removal failure is reported on stderr rather than raised.

    Raises:
        SystemError: if a name exists but is neither a directory nor a file.
    """
    import stat
    import gc

    # Collect garbage first so dangling references to open files are dropped.
    gc.collect()

    full_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    # Tests should not leave anything behind, but a failing test often
    # cannot tidy up. This matters most on Windows, where a file that is
    # still open cannot be deleted by name; naming the offending test is
    # the most that can be done about that here.
    for name in (test_support.TESTFN, "db_home"):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind = "directory"
            remover = shutil.rmtree
        elif os.path.isfile(name):
            kind = "file"
            remover = os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            # Single parenthesised argument: parses under Python 2 and 3.
            print("%r left behind %s %r" % (testname, kind, name))

        try:
            # Where chmod exists, loosen permissions that could block removal.
            if hasattr(os, 'chmod'):
                os.chmod(name, full_access)
            remover(name)
        except Exception as err:
            report = ("%r left behind %s %r and it couldn't be "
                      "removed: %s" % (testname, kind, name, err))
            sys.stderr.write(report + "\n")
def test_mode(self):
    """Check the mode reported for a regular file after ``os.chmod``.

    On POSIX, each chmod value must be reflected exactly by
    ``stat.S_IMODE``. Elsewhere only the file-type bits are checked.
    """
    with open(TESTFN, 'w'):
        pass

    if os.name != 'posix':
        os.chmod(TESTFN, 0o700)
        st_mode = self.get_mode()
        self.assertS_IS("REG", st_mode)
        self.assertEqual(stat.S_IFMT(st_mode),
                         stat.S_IFREG)
        return

    # (mode passed to chmod, permission bits expected back)
    expectations = (
        (0o700, stat.S_IRWXU),
        (0o070, stat.S_IRWXG),
        (0o007, stat.S_IRWXO),
        (0o444, 0o444),
    )
    for requested, expected in expectations:
        os.chmod(TESTFN, requested)
        st_mode = self.get_mode()
        self.assertS_IS("REG", st_mode)
        self.assertEqual(stat.S_IMODE(st_mode), expected)
def cleanup_test_droppings(testname, verbose):
    """Remove files or directories that test ``testname`` left behind.

    Checks ``support.TESTFN`` and ``"db_home"``. Each one that exists is
    removed. When ``verbose`` is true the leftover is reported on stdout.
    A removal failure is reported on stderr rather than raised.

    Raises:
        SystemError: if a name exists but is neither a directory nor a file.
    """
    import shutil
    import stat
    import gc

    # Collect garbage first so dangling references to open files are
    # dropped. This can also emit ResourceWarnings now instead of during
    # the next test run, where they could cause failures.
    gc.collect()

    full_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    # Tests should not leave anything behind, but a failing test often
    # cannot tidy up. This matters most on Windows, where a file that is
    # still open cannot be deleted by name; naming the offending test is
    # the most that can be done about that here.
    for name in (support.TESTFN, "db_home"):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind = "directory"
            remover = shutil.rmtree
        elif os.path.isfile(name):
            kind = "file"
            remover = os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            print("%r left behind %s %r" % (testname, kind, name))

        try:
            # Where chmod exists, loosen permissions that could block removal.
            if hasattr(os, 'chmod'):
                os.chmod(name, full_access)
            remover(name)
        except Exception as err:
            report = ("%r left behind %s %r and it couldn't be "
                      "removed: %s" % (testname, kind, name, err))
            print(report, file=sys.stderr)
def cleanup_test_droppings(testname, verbose):
    """Delete leftovers from test ``testname`` and report what was found.

    The candidates are ``test_support.TESTFN`` and ``"db_home"``. A
    candidate that exists is removed; with ``verbose`` set it is also
    announced on stdout. If removal fails, the failure is written to
    stderr and the function carries on.

    Raises:
        SystemError: if a candidate exists but is neither a directory nor
            a file.
    """
    import stat
    import gc

    # Drop dangling references to open files before touching the disk.
    gc.collect()

    rwx_for_all = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    candidates = (test_support.TESTFN, "db_home")

    # A failing test may be unable to clean up after itself. On Windows a
    # file that is still open cannot be deleted by name, so the best this
    # can do is name the test that left it behind.
    for name in candidates:
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind, delete = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, delete = "file", os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            # Single parenthesised argument: parses under Python 2 and 3.
            print("%r left behind %s %r" % (testname, kind, name))

        try:
            # Fix permission problems that might prevent cleanup, if chmod exists.
            if hasattr(os, 'chmod'):
                os.chmod(name, rwx_for_all)
            delete(name)
        except Exception as problem:
            text = ("%r left behind %s %r and it couldn't be "
                    "removed: %s" % (testname, kind, name, problem))
            sys.stderr.write(text + "\n")
def _RmTreeHandleReadOnly(func, path, exc):
    """Error handler for ``shutil.rmtree`` that retries read-only paths.

    When a removal call fails with ``EACCES``, the path is made fully
    accessible and the same call is retried. This is needed on Windows,
    where removing a read-only file is an access error, and git repos
    contain read-only pack/index files.

    Args:
        func: the function that failed, as reported by ``shutil.rmtree``.
        path: the path ``func`` was called with.
        exc: the ``sys.exc_info()`` triple describing the failure.

    Raises:
        The original exception when it is not an ``EACCES`` failure from a
        removal call. The bare ``raise`` relies on the handler being
        invoked while that exception is still being handled, as
        ``shutil.rmtree`` does.
    """
    excvalue = exc[1]
    # Python 2's rmtree reports file failures with os.remove, Python 3's
    # with os.unlink. These are distinct function objects, so accept both.
    removal_calls = (os.rmdir, os.remove, os.unlink)
    if func in removal_calls and excvalue.errno == errno.EACCES:
        _LOGGER.debug('Removing read-only path: %s', path)
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        func(path)
    else:
        raise
def startDB(self):
    """Create the test database file inside ``self.DB_DIR``.

    Sets ``self.DB_NAME`` to the database path, opens the directory to
    all users, issues a ``create database`` statement through
    ``kinterbasdb`` and closes the resulting connection.
    """
    import kinterbasdb

    self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)

    everyone_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    os.chmod(self.DB_DIR, everyone_rwx)

    statement = 'create database "%s" user "%s" password "%s"' % (
        self.DB_NAME, self.DB_USER, self.DB_PASS)
    connection = kinterbasdb.create_database(statement)
    connection.close()
def cleanup_test_droppings(testname, verbose):
    """Delete leftovers from test ``testname`` and report what was found.

    The candidates are ``support.TESTFN`` and ``"db_home"``. A candidate
    that exists is removed; with ``verbose`` set it is also announced on
    stdout. If removal fails, the failure is written to stderr and the
    function carries on.

    Raises:
        SystemError: if a candidate exists but is neither a directory nor
            a file.
    """
    import shutil
    import stat
    import gc

    # Drop dangling references to open files before touching the disk.
    # Doing so may also surface ResourceWarnings here rather than in the
    # following test run, where they could produce failures.
    gc.collect()

    rwx_for_all = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    candidates = (support.TESTFN, "db_home")

    # A failing test may be unable to clean up after itself. On Windows a
    # file that is still open cannot be deleted by name, so the best this
    # can do is name the test that left it behind.
    for name in candidates:
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind, delete = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, delete = "file", os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            print("%r left behind %s %r" % (testname, kind, name))

        try:
            # Fix permission problems that might prevent cleanup, if chmod exists.
            if hasattr(os, 'chmod'):
                os.chmod(name, rwx_for_all)
            delete(name)
        except Exception as problem:
            text = ("%r left behind %s %r and it couldn't be "
                    "removed: %s" % (testname, kind, name, problem))
            print(text, file=sys.stderr)
def cli_generate_key(ctx, key_file, passphrase):
    """Generate Public and Private Keys."""
    # Refuse to overwrite either output file.
    if os.path.isfile(key_file):
        raise click.UsageError("%s already exists." % key_file)
    key_file_pub = "%s.pub" % key_file
    if os.path.isfile(key_file_pub):
        raise click.UsageError("%s already exists." % key_file_pub)

    if len(passphrase) < 8:
        raise click.UsageError("Passphrase too short")

    key_file_dir = os.path.dirname(key_file)
    if not key_file_dir:
        # No directory component given: use the current directory.
        key_file_dir = "."
    elif not os.path.isdir(key_file_dir):
        os.mkdir(key_file_dir, 0o700)

    if not os.access(key_file_dir, os.W_OK):
        raise click.UsageError("Directory '%s' not read and writeable" % key_file_dir)

    # The target directory must grant nothing to group or other.
    dir_mode = os.stat(key_file_dir).st_mode
    if dir_mode & stat.S_IRWXG:
        raise click.UsageError("Directory '%s' has group permissions and is insecure" % key_file_dir)
    if dir_mode & stat.S_IRWXO:
        raise click.UsageError("Directory '%s' has other permissions and is insecure" % key_file_dir)

    signing_key = nacl.signing.SigningKey.generate()
    save_public_keyfile(key_file_pub, signing_key.verify_key)
    save_private_keyfile(key_file, signing_key, passphrase)
def test_save(self, tmpdir):
    """A saved ``PrivateKey`` reloads identically and is owner read/write only."""
    target = tmpdir.join('somefile')

    original = PrivateKey.create()
    original.save(target.strpath)
    reloaded = PrivateKey.load(filename=target.strpath)
    assert original.dump() == reloaded.dump()

    # Compare only the permission bits of the saved file.
    permission_mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    file_perms = os.stat(target.strpath).st_mode & permission_mask
    assert file_perms == (stat.S_IRUSR | stat.S_IWUSR)
def handleRemoveReadonly(func, path, exc):
    """Error handler for ``shutil.rmtree`` that retries read-only paths.

    When a removal call fails with ``EACCES``, the path is made fully
    accessible and the same call is retried.

    Args:
        func: the function that failed, as reported by ``shutil.rmtree``.
        path: the path ``func`` was called with.
        exc: the ``sys.exc_info()`` triple describing the failure.

    Raises:
        The original exception when it is not an ``EACCES`` failure from a
        removal call. The bare ``raise`` relies on the handler being
        invoked while that exception is still being handled, as
        ``shutil.rmtree`` does.
    """
    excvalue = exc[1]
    # Python 2's rmtree reports file failures with os.remove, Python 3's
    # with os.unlink. These are distinct function objects, so accept both.
    removal_calls = (os.rmdir, os.remove, os.unlink)
    if func in removal_calls and excvalue.errno == errno.EACCES:
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
        func(path)
    else:
        raise
def test_store_pubkey(self, chown_mock):
    """``store_pubkey`` writes the key, chowns the home and sets tight modes.

    Expects a single recursive chown of ``self.tempdir`` to ``foo:foo``,
    ``.ssh/authorized_keys`` to contain the key followed by a newline with
    mode 0600, and the ``.ssh`` directory to have mode 0700.
    """
    store_pubkey('foo', self.tempdir, 'abc')
    chown_mock.assert_called_once_with('-R', 'foo:foo', self.tempdir)

    authorized_keys_file = os.path.join(self.tempdir,
                                        '.ssh/authorized_keys')
    # Read through a context manager so the handle is closed promptly
    # instead of being left to the garbage collector.
    with open(authorized_keys_file) as handle:
        authorized_keys = handle.read()
    self.assertEqual(authorized_keys, 'abc\n')

    permission_mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    file_stat = os.stat(authorized_keys_file).st_mode
    access = file_stat & permission_mask
    self.assertEqual(access, 0o0600)

    ssh_dir = os.path.join(self.tempdir, '.ssh')
    filestat_ssh = os.stat(ssh_dir).st_mode
    access = filestat_ssh & permission_mask
    self.assertEqual(access, 0o0700)
def _get_octal_mode_from_symbolic_perms(self, path_stat, user, perms):
    """Translate symbolic permission letters into mode bits for one user class.

    Args:
        path_stat: stat result for the path; only ``st_mode`` is read.
        user: ``'u'``, ``'g'`` or ``'o'``, the class the bits are for.
        perms: iterable of permission letters. ``r``, ``w``, ``x``, ``X``,
            ``s``, ``t``, ``u``, ``g`` and ``o`` are recognised.

    Returns:
        The bitwise OR of the bits selected by each letter in ``perms``
        (0 when ``perms`` is empty).

    Raises:
        KeyError: if a letter, or ``user``, is missing from the lookup
            table while ``perms`` is being processed.
    """
    prev_mode = stat.S_IMODE(path_stat.st_mode)
    already_executable = (prev_mode & EXEC_PERM_BITS) > 0
    # 'X' grants execute only for directories, or for paths that already
    # have an execute bit covered by EXEC_PERM_BITS.
    grant_X = stat.S_ISDIR(path_stat.st_mode) or already_executable

    # Current rwx bits of each class, used by the 'u'/'g'/'o' copy letters.
    owner_bits = prev_mode & stat.S_IRWXU
    group_bits = prev_mode & stat.S_IRWXG
    other_bits = prev_mode & stat.S_IRWXO

    # Permission bits constants documented at:
    # http://docs.python.org/2/library/stat.html#stat.S_ISUID
    letter_bits = {
        'u': {
            'r': stat.S_IRUSR,
            'w': stat.S_IWUSR,
            'x': stat.S_IXUSR,
            'X': stat.S_IXUSR if grant_X else 0,
            's': stat.S_ISUID,
            't': 0,
            'u': owner_bits,
            # Shift the other classes' bits up into the owner position.
            'g': group_bits << 3,
            'o': other_bits << 6,
        },
        'g': {
            'r': stat.S_IRGRP,
            'w': stat.S_IWGRP,
            'x': stat.S_IXGRP,
            'X': stat.S_IXGRP if grant_X else 0,
            's': stat.S_ISGID,
            't': 0,
            'u': owner_bits >> 3,
            'g': group_bits,
            'o': other_bits << 3,
        },
        'o': {
            'r': stat.S_IROTH,
            'w': stat.S_IWOTH,
            'x': stat.S_IXOTH,
            'X': stat.S_IXOTH if grant_X else 0,
            's': 0,
            't': stat.S_ISVTX,
            'u': owner_bits >> 6,
            'g': group_bits >> 3,
            'o': other_bits,
        },
    }

    mode = 0
    for letter in perms:
        mode |= letter_bits[user][letter]
    return mode