python类S_IRWXG的实例源码

validate.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validate_ssh_key_path(ssh_key_path):
    assert os.path.isfile(ssh_key_path), 'could not find ssh private key: {}'.format(ssh_key_path)
    assert stat.S_IMODE(
        os.stat(ssh_key_path).st_mode) & (stat.S_IRWXG + stat.S_IRWXO) == 0, (
            'ssh_key_path must be only read / write / executable by the owner. It may not be read / write / executable '
            'by group, or other.')
    with open(ssh_key_path) as fh:
        assert 'ENCRYPTED' not in fh.read(), ('Encrypted SSH keys (which contain passphrases) '
                                              'are not allowed. Use a key without a passphrase.')
ssh.py 文件源码 项目:Marty 作者: NaPs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_tree(self, path):
        path = path.lstrip(os.sep.encode('utf-8'))
        directory = os.path.join(self.root, path)
        tree = Tree()
        directory_items = self._sftp.listdir_attr_b(directory)

        for fattr in directory_items:
            filename = fattr.filename
            item = {}
            if stat.S_ISREG(fattr.st_mode):
                item['type'] = 'blob'
                item['filetype'] = 'regular'
            elif stat.S_ISDIR(fattr.st_mode):
                item['type'] = 'tree'
                item['filetype'] = 'directory'
            elif stat.S_ISLNK(fattr.st_mode):
                item['filetype'] = 'link'
                item['link'] = self._sftp.readlink(os.path.join(directory, filename)).encode('utf8')
            elif stat.S_ISFIFO(fattr.st_mode):
                item['filetype'] = 'fifo'
            else:
                continue  # FIXME: Warn
            fmode = fattr.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX)

            item['uid'] = fattr.st_uid
            item['gid'] = fattr.st_gid
            item['mode'] = fmode
            item['mtime'] = int(fattr.st_mtime)
            item['size'] = fattr.st_size

            tree.add(filename, item)
        return tree
local.py 文件源码 项目:Marty 作者: NaPs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_tree(self, path):
        path = path.lstrip(os.sep.encode('utf-8'))
        directory = os.path.join(self.root, path)
        tree = Tree()
        try:
            directory_items = os.listdir(directory)
        except OSError as err:
            raise RemoteOperationError(err.strerror)

        for filename in directory_items:
            assert isinstance(filename, bytes)
            item = {}
            fullname = os.path.join(directory, filename)
            fstat = os.lstat(fullname)
            if stat.S_ISREG(fstat.st_mode):
                item['type'] = 'blob'
                item['filetype'] = 'regular'
            elif stat.S_ISDIR(fstat.st_mode):
                item['type'] = 'tree'
                item['filetype'] = 'directory'
            elif stat.S_ISLNK(fstat.st_mode):
                item['filetype'] = 'link'
                item['link'] = os.readlink(os.path.join(directory, filename))
            elif stat.S_ISFIFO(fstat.st_mode):
                item['filetype'] = 'fifo'
            else:
                continue  # FIXME: Warn
            fmode = fstat.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX)

            item['uid'] = fstat.st_uid
            item['gid'] = fstat.st_gid
            item['mode'] = fmode
            item['atime'] = int(fstat.st_atime)
            item['mtime'] = int(fstat.st_mtime)
            item['ctime'] = int(fstat.st_ctime)
            item['size'] = fstat.st_size

            tree.add(filename, item)
        return tree
util.py 文件源码 项目:sublimeTextConfig 作者: luoye-fe 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_tempdir():
    """Create a directory within the system temp directory used to create temp files."""
    try:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)

        os.mkdir(tempdir)

        # Make sure the directory can be removed by anyone in case the user
        # runs ST later as another user.
        os.chmod(tempdir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

    except PermissionError:
        if sublime.platform() != 'windows':
            current_user = pwd.getpwuid(os.geteuid())[0]
            temp_uid = os.stat(tempdir).st_uid
            temp_user = pwd.getpwuid(temp_uid)[0]
            message = (
                'The SublimeLinter temp directory:\n\n{0}\n\ncould not be cleared '
                'because it is owned by \'{1}\' and you are logged in as \'{2}\'. '
                'Please use sudo to remove the temp directory from a terminal.'
            ).format(tempdir, temp_user, current_user)
        else:
            message = (
                'The SublimeLinter temp directory ({}) could not be reset '
                'because it belongs to a different user.'
            ).format(tempdir)

        sublime.error_message(message)

    from . import persist
    persist.debug('temp directory:', tempdir)
test_decorators_ensure_key_pair.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_ensure_key_pair_nonexist_local(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    fake_ec2_client.create_key_pair.return_value = {
        'KeyMaterial': 'test key'
    }
    with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')
    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
    with open(fake_ssh_path_function('test'), 'r') as f:
        assert f.read() == 'test key'
    s = os.stat(fake_ssh_path_function('test')).st_mode

    # owner
    assert bool(s & stat.S_IRUSR)
    assert bool(s & stat.S_IWUSR)
    assert not bool(s & stat.S_IXUSR)

    # group
    assert not bool(s & stat.S_IRWXG)

    # others
    assert not bool(s & stat.S_IRWXO)
test_decorators_ensure_key_pair.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_ensure_key_pair_nonexist_local_private(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    fake_ec2_client.create_key_pair.return_value = {
        'KeyMaterial': 'test key'
    }
    type(fake_key_pair_info).key_fingerprint = mock.PropertyMock(side_effect=FakeClientError)
    assert not os.system('touch {}'.format(fake_ssh_path_function('test')))
    with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')
    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
    with open(fake_ssh_path_function('test'), 'r') as f:
        assert f.read() == 'test key'

    s = os.stat(fake_ssh_path_function('test')).st_mode
    # owner
    assert bool(s & stat.S_IRUSR)
    assert bool(s & stat.S_IWUSR)
    assert not bool(s & stat.S_IXUSR)

    # group
    assert not bool(s & stat.S_IRWXG)

    # others
    assert not bool(s & stat.S_IRWXO)
test_decorators_ensure_key_pair.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_ensure_key_pair_nonexist_local_public(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    fake_ec2_client.create_key_pair.return_value = {
        'KeyMaterial': 'test key'
    }
    assert not os.system('touch {}.pub'.format(fake_ssh_path_function('test')))
    with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function):
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')
    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
    with open(fake_ssh_path_function('test'), 'r') as f:
        assert f.read() == 'test key'

    s = os.stat(fake_ssh_path_function('test')).st_mode
    # owner
    assert bool(s & stat.S_IRUSR)
    assert bool(s & stat.S_IWUSR)
    assert not bool(s & stat.S_IXUSR)

    # group
    assert not bool(s & stat.S_IRWXG)

    # others
    assert not bool(s & stat.S_IRWXO)
regrtest.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def cleanup_test_droppings(testname, verbose):
    import stat
    import gc

    # First kill any dangling references to open files etc.
    gc.collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (test_support.TESTFN,
                 "db_home",
                ):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            print "%r left behind %s %r" % (testname, kind, name)
        try:
            # if we have chmod, fix possible permissions problems
            # that might prevent cleanup
            if (hasattr(os, 'chmod')):
                os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception, msg:
            print >> sys.stderr, ("%r left behind %s %r and it couldn't be "
                "removed: %s" % (testname, kind, name, msg))
test_stat.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_mode(self):
        with open(TESTFN, 'w'):
            pass
        if os.name == 'posix':
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXU)

            os.chmod(TESTFN, 0o070)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXG)

            os.chmod(TESTFN, 0o007)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode),
                             stat.S_IRWXO)

            os.chmod(TESTFN, 0o444)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IMODE(st_mode), 0o444)
        else:
            os.chmod(TESTFN, 0o700)
            st_mode = self.get_mode()
            self.assertS_IS("REG", st_mode)
            self.assertEqual(stat.S_IFMT(st_mode),
                             stat.S_IFREG)
regrtest.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def cleanup_test_droppings(testname, verbose):
    import shutil
    import stat
    import gc

    # First kill any dangling references to open files etc.
    # This can also issue some ResourceWarnings which would otherwise get
    # triggered during the following test run, and possibly produce failures.
    gc.collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (support.TESTFN,
                 "db_home",
                ):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            print("%r left behind %s %r" % (testname, kind, name))
        try:
            # if we have chmod, fix possible permissions problems
            # that might prevent cleanup
            if (hasattr(os, 'chmod')):
                os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception as msg:
            print(("%r left behind %s %r and it couldn't be "
                "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
regrtest.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def cleanup_test_droppings(testname, verbose):
    import stat
    import gc

    # First kill any dangling references to open files etc.
    gc.collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (test_support.TESTFN,
                 "db_home",
                ):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            print "%r left behind %s %r" % (testname, kind, name)
        try:
            # if we have chmod, fix possible permissions problems
            # that might prevent cleanup
            if (hasattr(os, 'chmod')):
                os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception, msg:
            print >> sys.stderr, ("%r left behind %s %r and it couldn't be "
                "removed: %s" % (testname, kind, name, msg))
get_syzygy_binaries.py 文件源码 项目:buildroot 作者: flutter 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _RmTreeHandleReadOnly(func, path, exc):
  """An error handling function for use with shutil.rmtree. This will
  detect failures to remove read-only files, and will change their properties
  prior to removing them. This is necessary on Windows as os.remove will return
  an access error for read-only files, and git repos contain read-only
  pack/index files.
  """
  excvalue = exc[1]
  if func in (os.rmdir, os.remove) and excvalue.errno == errno.EACCES:
    _LOGGER.debug('Removing read-only path: %s', path)
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
    func(path)
  else:
    raise
test_adbapi.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def startDB(self):
        import kinterbasdb
        self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
        os.chmod(self.DB_DIR, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO)
        sql = 'create database "%s" user "%s" password "%s"'
        sql %= (self.DB_NAME, self.DB_USER, self.DB_PASS);
        conn = kinterbasdb.create_database(sql)
        conn.close()
regrtest.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def cleanup_test_droppings(testname, verbose):
    import shutil
    import stat
    import gc

    # First kill any dangling references to open files etc.
    # This can also issue some ResourceWarnings which would otherwise get
    # triggered during the following test run, and possibly produce failures.
    gc.collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (support.TESTFN,
                 "db_home",
                ):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            print("%r left behind %s %r" % (testname, kind, name))
        try:
            # if we have chmod, fix possible permissions problems
            # that might prevent cleanup
            if (hasattr(os, 'chmod')):
                os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception as msg:
            print(("%r left behind %s %r and it couldn't be "
                "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
sodium11.py 文件源码 项目:sodium11 作者: trbs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def cli_generate_key(ctx, key_file, passphrase):
    """Generate Public and Private Keys."""
    if os.path.isfile(key_file):
        raise click.UsageError("%s already exists." % key_file)

    key_file_pub = "%s.pub" % key_file
    if os.path.isfile(key_file_pub):
        raise click.UsageError("%s already exists." % key_file_pub)

    if len(passphrase) < 8:
        raise click.UsageError("Passphrase too short")

    key_file_dir = os.path.dirname(key_file)
    if key_file_dir and not os.path.isdir(key_file_dir):
        os.mkdir(key_file_dir, 0o700)

    if not key_file_dir:
        # take currently dir if none was specified
        key_file_dir = "."

    if not os.access(key_file_dir, os.W_OK):
        raise click.UsageError("Directory '%s' not read and writeable" % key_file_dir)

    output_file_dir_stat = os.stat(key_file_dir).st_mode
    if output_file_dir_stat & stat.S_IRWXG:
        raise click.UsageError("Directory '%s' has group permissions and is insecure" % key_file_dir)
    if output_file_dir_stat & stat.S_IRWXO:
        raise click.UsageError("Directory '%s' has other permissions and is insecure" % key_file_dir)

    p = nacl.signing.SigningKey.generate()

    save_public_keyfile(key_file_pub, p.verify_key)
    save_private_keyfile(key_file, p, passphrase)
test_rsa.py 文件源码 项目:humancrypto 作者: iffy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_save(self, tmpdir):
        first = PrivateKey.create()
        fh = tmpdir.join('somefile')
        first.save(fh.strpath)
        second = PrivateKey.load(filename=fh.strpath)
        assert first.dump() == second.dump()
        bits = os.stat(fh.strpath).st_mode
        perms = bits & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        assert perms == (stat.S_IRUSR | stat.S_IWUSR)
driver.py 文件源码 项目:pyomo 作者: Pyomo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handleRemoveReadonly(func, path, exc):
  excvalue = exc[1]
  if func in (os.rmdir, os.remove) and excvalue.errno == errno.EACCES:
      os.chmod(path, stat.S_IRWXU| stat.S_IRWXG| stat.S_IRWXO) # 0777
      func(path)
  else:
      raise
index_tests.py 文件源码 项目:c-bastion 作者: ImmobilienScout24 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_store_pubkey(self, chown_mock):
        store_pubkey('foo', self.tempdir, 'abc')
        chown_mock.assert_called_once_with('-R', 'foo:foo', self.tempdir)
        authorized_keys_file = os.path.join(self.tempdir,
                                            '.ssh/authorized_keys')
        authorized_keys = open(authorized_keys_file).read()
        self.assertEqual(authorized_keys, 'abc\n')
        file_stat = os.stat(authorized_keys_file).st_mode
        access = file_stat & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        self.assertEqual(access, 0o0600)

        ssh_dir = os.path.join(self.tempdir, '.ssh')
        filestat_ssh = os.stat(ssh_dir).st_mode
        access = filestat_ssh & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        self.assertEqual(access, 0o0700)
basic.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _get_octal_mode_from_symbolic_perms(self, path_stat, user, perms):
        prev_mode = stat.S_IMODE(path_stat.st_mode)

        is_directory = stat.S_ISDIR(path_stat.st_mode)
        has_x_permissions = (prev_mode & EXEC_PERM_BITS) > 0
        apply_X_permission = is_directory or has_x_permissions

        # Permission bits constants documented at:
        # http://docs.python.org/2/library/stat.html#stat.S_ISUID
        if apply_X_permission:
            X_perms = {
                'u': {'X': stat.S_IXUSR},
                'g': {'X': stat.S_IXGRP},
                'o': {'X': stat.S_IXOTH}
            }
        else:
            X_perms = {
                'u': {'X': 0},
                'g': {'X': 0},
                'o': {'X': 0}
            }

        user_perms_to_modes = {
            'u': {
                'r': stat.S_IRUSR,
                'w': stat.S_IWUSR,
                'x': stat.S_IXUSR,
                's': stat.S_ISUID,
                't': 0,
                'u': prev_mode & stat.S_IRWXU,
                'g': (prev_mode & stat.S_IRWXG) << 3,
                'o': (prev_mode & stat.S_IRWXO) << 6 },
            'g': {
                'r': stat.S_IRGRP,
                'w': stat.S_IWGRP,
                'x': stat.S_IXGRP,
                's': stat.S_ISGID,
                't': 0,
                'u': (prev_mode & stat.S_IRWXU) >> 3,
                'g': prev_mode & stat.S_IRWXG,
                'o': (prev_mode & stat.S_IRWXO) << 3 },
            'o': {
                'r': stat.S_IROTH,
                'w': stat.S_IWOTH,
                'x': stat.S_IXOTH,
                's': 0,
                't': stat.S_ISVTX,
                'u': (prev_mode & stat.S_IRWXU) >> 6,
                'g': (prev_mode & stat.S_IRWXG) >> 3,
                'o': prev_mode & stat.S_IRWXO }
        }

        # Insert X_perms into user_perms_to_modes
        for key, value in X_perms.items():
            user_perms_to_modes[key].update(value)

        or_reduce = lambda mode, perm: mode | user_perms_to_modes[user][perm]
        return reduce(or_reduce, perms, 0)


问题


面经


文章

微信
公众号

扫码关注公众号