python类seteuid()的实例源码

pamauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def callIntoPAM(service, user, conv):
    """
    Authenticate C{user} through PAM while temporarily running as root.

    A PAM handle is started for C{service}, told the user name and the
    conversation callback C{conv}, and then asked to authenticate and to
    check the account.  The effective gid/uid are set to 0 for those two
    calls and put back afterwards.  Kept as a separate function so tests
    can hook it.

    @return: C{1} once both PAM calls finished without raising; any
        exception they raise propagates to the caller.
    """
    handle = PAM.pam()
    handle.start(service)
    handle.set_item(PAM.PAM_USER, user)
    handle.set_item(PAM.PAM_CONV, conv)

    # Remember the current effective ids so the finally clause can restore
    # them whether or not PAM raises.
    savedGid, savedUid = os.getegid(), os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        # Both calls signal failure by raising, so reaching the return
        # statement means success.
        handle.authenticate()
        handle.acct_mgmt()
        return 1
    finally:
        os.setegid(savedGid)
        os.seteuid(savedUid)
postgresql_setup.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_user(no_utf8=False):
    """
    Create the application's PostgreSQL role and database as ``postgres``.

    The effective uid is switched to the ``postgres`` system account while
    two ``psql -c`` commands run: the first creates ``DB_USER`` with an
    md5-hashed password, the second creates ``DB_NAME`` owned by that role.
    The previous effective uid is restored afterwards, including when one
    of the commands fails.

    :param no_utf8: when true, the ``ENCODING 'UTF8'`` clause is left out
        of the ``CREATE DATABASE`` statement.
    :return: the ``pwd`` entry of the ``postgres`` account.
    :raises SystemExit: with the message of any exception raised while
        switching user or running ``psql``.
    """
    curr_user = os.geteuid()
    target = pwd.getpwnam('postgres')
    try:
        os.seteuid(target.pw_uid)
        # The stored value is 'md5' followed by the hex digest of the
        # password concatenated with the user name.
        m = hashlib.md5()
        m.update(DB_PASSWORD)
        m.update(DB_USER)
        command = """CREATE USER %s PASSWORD 'md5%s'""" % (DB_USER, m.hexdigest())
        subprocess.check_call(['psql', '-c', command])
        if no_utf8:
            command = """CREATE DATABASE %s OWNER %s""" % (DB_NAME, DB_USER)
        else:
            command = """CREATE DATABASE %s OWNER %s ENCODING 'UTF8'""" % (DB_NAME, DB_USER)
        subprocess.check_call(['psql', '-c', command])
    except Exception as e:
        raise SystemExit(str(e))
    finally:
        # Runs on failure too: SystemExit can be caught further up, and the
        # process must not keep running as postgres in that case.
        os.seteuid(curr_user)
    return target
test_interrupt.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_interrupted_systemcall(self):
    '''
    Check that the client keeps working after a system call made by its
    connection thread is interrupted by a signal we do not control.
    '''
    on_linux = 'linux' in platform
    if not on_linux:
        raise SkipTest(
            'Unable to reproduce error case on non-linux platforms')

    node, payload = 'interrupt_test', b"1"
    self.client.create(node, payload)

    # Re-applying our own euid changes nothing permission-wise; per the
    # original author's note, glibc sends SIGRT to all children when this
    # happens, which interrupts the system call in flight.
    os.seteuid(os.geteuid())

    # Sanity check: the node written above can still be read back.
    stored = self.client.get(node)[0]
    assert stored == payload
pamauth.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def callIntoPAM(service, user, conv):
    """
    Authenticate C{user} through PAM while temporarily running as root.

    A PAM handle is started for C{service}, told the user name and the
    conversation callback C{conv}, and then asked to authenticate and to
    check the account.  The effective gid/uid are set to 0 for those two
    calls and put back afterwards.  Kept as a separate function so tests
    can hook it.

    @return: C{1} once both PAM calls finished without raising; any
        exception they raise propagates to the caller.
    """
    handle = PAM.pam()
    handle.start(service)
    handle.set_item(PAM.PAM_USER, user)
    handle.set_item(PAM.PAM_CONV, conv)

    # Remember the current effective ids so the finally clause can restore
    # them whether or not PAM raises.
    savedGid, savedUid = os.getegid(), os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        # Both calls signal failure by raising, so reaching the return
        # statement means success.
        handle.authenticate()
        handle.acct_mgmt()
        return 1
    finally:
        os.setegid(savedGid)
        os.seteuid(savedUid)
test_openssh_compat.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def test_getPrivateKeysAsRoot(self):
    """
    When a key file cannot be read by the current user,
    L{OpenSSHFactory.getPrivateKeys} is expected to switch to root to read
    it and then switch back.
    """
    unreadable = self.keysDir.child("ssh_host_two_key")
    # Simulate a permission error by clearing every mode bit; the cleanup
    # makes the file accessible again once the test is over.
    unreadable.chmod(0)
    self.addCleanup(unreadable.chmod, 0o777)

    realSeteuid = os.seteuid

    def restoringSeteuid(euid):
        # Stand-in for "root can read anything": reopen the file the
        # moment seteuid is invoked, then delegate to the saved function.
        unreadable.chmod(0o777)
        return realSeteuid(euid)

    self.patch(os, "seteuid", restoringSeteuid)

    privateKeys = self.factory.getPrivateKeys()
    self.assertEqual(len(privateKeys), 2)
    self.assertEqual(set(privateKeys.keys()), set([b'ssh-rsa', b'ssh-dss']))
    # Both ids must have gone to 0 and then back to the current values.
    self.assertEqual(self.mockos.seteuidCalls, [0, os.geteuid()])
    self.assertEqual(self.mockos.setegidCalls, [0, os.getegid()])
test_interrupt.py 文件源码 项目:deb-kazoo 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_interrupted_systemcall(self):
    '''
    Check that the client keeps working after a system call made by its
    connection thread is interrupted by a signal we do not control.
    '''
    on_linux = 'linux' in platform
    if not on_linux:
        raise SkipTest(
            'Unable to reproduce error case on non-linux platforms')

    node, payload = 'interrupt_test', b"1"
    self.client.create(node, payload)

    # Re-applying our own euid changes nothing permission-wise; per the
    # original author's note, glibc sends SIGRT to all children when this
    # happens, which interrupts the system call in flight.
    os.seteuid(os.geteuid())

    # Sanity check: the node written above can still be read back.
    stored = self.client.get(node)[0]
    assert stored == payload
daemon.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def main():
    """
    Daemon entry point: enter the data directory, send stdout/stderr to
    the log file, lower the effective gid/uid to 103 and run USERPROG.
    """
    # Work from the data directory.
    os.chdir("/root/data")

    # Everything printed from here on is appended to the log file.
    log_stream = Log(open(LOGFILE, 'a+'))
    sys.stdout = log_stream
    sys.stderr = log_stream

    # Drop to the unprivileged "pydaemon" account; the group is changed
    # first, while the effective uid has not been lowered yet.
    daemon_id = 103
    os.setegid(daemon_id)
    os.seteuid(daemon_id)

    # Hand control to the user program.
    USERPROG()
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def switchUID(uid, gid, euid=False):
    """
    Change the group and then the user identity of the current process.

    @param uid: user id to switch to, or C{None} to leave the user id (and
        the group list set up by C{initgroups}) untouched.
    @param gid: group id to switch to, or C{None} to leave it untouched.
    @param euid: when true only the effective ids are changed
        (C{os.seteuid}/C{os.setegid}); otherwise C{os.setuid}/C{os.setgid}
        are used.
    """
    if euid:
        changeUser, changeGroup = os.seteuid, os.setegid
    else:
        changeUser, changeGroup = os.setuid, os.setgid

    # The group goes first -- presumably because the process may lose the
    # right to change it once the user id has been dropped.
    if gid is not None:
        changeGroup(gid)
    if uid is None:
        return
    initgroups(uid, gid)
    changeUser(uid)
checkers.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def requestAvatarId(self, credentials):
    """
    Check a username/password pair against the system password databases.

    The C{pwd} entry is consulted first: if it holds a real hash (anything
    other than the C{'*'} / C{'x'} placeholders) that verifies, the login
    succeeds.  Otherwise, when C{shadow} is available, the shadow entry is
    read with effective gid/uid 0 and verified.

    @param credentials: object providing C{username} and C{password}.
    @return: a Deferred that succeeds with the username, or fails with
        L{UnauthorizedLogin}.
    """
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        else:
            if cryptedPass not in ['*', 'x'] and \
                verifyCryptedPassword(cryptedPass, credentials.password):
                return defer.succeed(credentials.username)
    if shadow:
        gid = os.getegid()
        uid = os.geteuid()
        try:
            os.setegid(0)
            os.seteuid(0)
            shadowPass = shadow.getspnam(credentials.username)[1]
        except KeyError:
            # Unknown user in the shadow database.
            return defer.fail(UnauthorizedLogin())
        finally:
            # Always drop back, whatever getspnam raised: the process must
            # never keep running with effective uid/gid 0 after a lookup.
            os.setegid(gid)
            os.seteuid(uid)
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())

    return defer.fail(UnauthorizedLogin())
checkers.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def checkKey(self, credentials):
    """
    Look for the offered public key in the user's authorized keys files.

    C{~user/.ssh/authorized_keys2} and C{~user/.ssh/authorized_keys} are
    each checked and read with the effective uid/gid of the user being
    authenticated; the original effective ids are restored before the
    lines are compared, and also when a file is missing or cannot be read.

    @param credentials: object providing C{username} and C{blob} (the
        public key blob being offered).
    @return: C{1} if the second field of some line base64-decodes to
        C{credentials.blob}, otherwise C{0}.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'): # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        try:
            # Go through root to become the target user for the file access.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            if not os.path.exists(sshDir+name):
                continue
            keyFile = open(sshDir+name)
            try:
                lines = keyFile.readlines()
            finally:
                keyFile.close()
        finally:
            # Runs for the 'continue' above and for any I/O error as well,
            # so the process never stays switched to the target user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for l in lines:
            l2 = l.split()
            if len(l2) < 2:
                continue
            try:
                if base64.decodestring(l2[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                # Not valid base64: skip the line.
                continue
    return 0
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def _runAsUser(self, f, *args, **kw):
        euid = os.geteuid()
        egid = os.getegid()
        groups = os.getgroups()
        uid, gid = self.getUserGroupId()
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(self.getOtherGroups())
        os.setegid(gid)
        os.seteuid(uid)
        try:
            f = iter(f)
        except TypeError:
            f = [(f, args, kw)]
        try:
            for i in f:
                func = i[0]
                args = len(i)>1 and i[1] or ()
                kw = len(i)>2 and i[2] or {}
                r = func(*args, **kw)
        finally:
            os.setegid(0)
            os.seteuid(0)
            os.setgroups(groups)
            os.setegid(egid)
            os.seteuid(euid)
        return r
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getPtyOwnership(self):
    """
    Make the avatar's user the owner of the path stored in
    C{self.ptyTuple[2]} (presumably the pty device), keeping the group the
    path already has.  The chown is done with effective gid/uid 0, and the
    previous effective ids are restored afterwards.
    """
    ptyPath = self.ptyTuple[2]
    # Index 5 of the stat result is the owning group id.
    ttyGid = os.stat(ptyPath)[5]
    uid, gid = self.avatar.getUserGroupId()
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(ptyPath, uid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
utils.py 文件源码 项目:skilled-hammer 作者: r00m 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def pull(directory):
    """
    Pull the latest changes as the user and group that own ``directory``.

    The effective gid/uid are switched to the directory's owner for the
    ``origin`` pull and put back to their previous values afterwards.

    :param directory: path of the git working copy.
    :return: ``True`` when the pull finished without the ERROR or REJECTED
        flag; ``False`` when the pull failed, was rejected, or the
        uid/gid switch was not permitted.
    """
    # Remember the ids we start with (root in the intended deployment) so
    # that the restore below also works when not started as root.
    original_euid = os.geteuid()
    original_egid = os.getegid()
    try:
        st = os.stat(directory)
        logger.info("Pulling as {0}:{1}...".format(st.st_uid, st.st_gid))

        # order is important: after seteuid() call the effective UID isn't 0 anymore, so setegid() will not be allowed
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        repo = git.Repo(directory)
        info = repo.remotes.origin.pull()[0]

        if info.flags & info.ERROR:
            logger.error("Pull failed: {0}".format(info.note))
            return False
        elif info.flags & info.REJECTED:
            logger.error("Could not merge after pull: {0}".format(info.note))
            return False
        elif info.flags & info.HEAD_UPTODATE:
            logger.info("Head is already up to date")
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
        return False
    finally:
        logger.info("Restoring root permissions")
        # Reverse of the drop: regain the original uid first so that the
        # group change is permitted again.
        os.seteuid(original_euid)
        os.setegid(original_egid)

    return True
utils.py 文件源码 项目:skilled-hammer 作者: r00m 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(project, command, directory, slack_webhook_url):
    """
    Run the specified command as the user that owns the directory

    The command is started on a background thread with ``directory`` as its
    working directory; this function returns without waiting for it.

    :param project: project name, used in log and Slack messages.
    :param command: shell command line to execute.
    :param directory: working directory; its owner determines the uid/gid
        the process switches to.
    :param slack_webhook_url: if truthy, a notification with the command
        output is sent there after the command succeeds.
    """
    # Remember the ids we start with (root in the intended deployment) so
    # that the restore below also works when not started as root.
    original_euid = os.geteuid()
    original_egid = os.getegid()
    try:
        st = os.stat(directory)

        # order is important: after seteuid() call the effective UID isn't 0 anymore, so setegid() will not be allowed
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        logger.info("Changing working directory to '{0}'".format(directory))
        logger.info("Spawning background command '{0}' as {1}:{2} for '{3}'...".format(command, st.st_uid, st.st_gid, project))

        def background():
            """
            I don't care how long it takes to run the command, but Bitbucket gets angry when it takes longer
            than 10 seconds. My npm build takes around 15 secs, so I'd get 3 Webhooks from Bitbucket, because
            it thinks each Webhook timedout.

            Easy way out is to return response immediately and start a background thread that
            does all of the heavy lifting.
            """
            start_time = time.time()
            try:
                output = subprocess.check_output(command, shell=True, cwd=directory, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                # Handled here because the command runs on this thread; a
                # handler in the enclosing function could never see it.
                logger.error("Error: {0}".format(e.output))
                return
            completed_in = time.time() - start_time

            logger.info("'{0}' background command finished in {1:.2f} seconds".format(project, completed_in))

            if slack_webhook_url:
                slack_notification(slack_webhook_url, "Deployed `{0}` in {1:.2f} seconds! :rocket:".format(project, completed_in), output)

        # NOTE(review): the ids are restored in the finally clause right
        # after the thread is started, so the command may be spawned with
        # the restored ids rather than the directory owner's -- confirm
        # whether the child should drop privileges itself.
        Thread(target=background).start()
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
    finally:
        logger.info("Restoring root permissions")
        # Reverse of the drop: regain the original uid first so that the
        # group change is permitted again.
        os.seteuid(original_euid)
        os.setegid(original_egid)
test_os.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_seteuid(self):
    """Check that os.seteuid rejects a forbidden id and an oversized one."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # An unprivileged process must not be able to become root.
        self.assertRaises(os.error, os.seteuid, 0)
    # One past the largest 32-bit value must be refused as an overflow.
    oversized_id = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, oversized_id)
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
    """
    Remove the test file, run TestProcess's own setUp, then lower the
    effective gid and uid to 1000 for the duration of the test.
    """
    safe_rmpath(TESTFN)
    TestProcess.setUp(self)
    unprivileged_id = 1000
    # Group first, then user.
    for change_id in (os.setegid, os.seteuid):
        change_id(unprivileged_id)
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def tearDown(self):
    """
    Put the effective uid/gid back to the values stored in PROCESS_UID and
    PROCESS_GID, then run TestProcess's own tearDown.
    """
    # Each id goes to its matching call (the uid to seteuid, the gid to
    # setegid).  The uid is restored first -- presumably it is the
    # privileged one, which then guarantees the group change is allowed.
    os.seteuid(self.PROCESS_UID)
    os.setegid(self.PROCESS_GID)
    TestProcess.tearDown(self)
test_os.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_seteuid(self):
    """Check that os.seteuid rejects a forbidden id and an oversized one."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # An unprivileged process must not be able to become root.
        self.assertRaises(os.error, os.seteuid, 0)
    # One past the largest 32-bit value must be refused as an overflow.
    oversized_id = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, oversized_id)
test_os.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_seteuid(self):
    """Check that os.seteuid rejects a forbidden id and an oversized one."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # An unprivileged process must not be able to become root.
        self.assertRaises(os.error, os.seteuid, 0)
    # One past the largest 32-bit value must be refused as an overflow.
    oversized_id = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, oversized_id)
util.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def switchUID(uid, gid, euid=False):
    """
    Change the group and then the user identity of the current process.

    @param uid: user id to switch to, or C{None} to leave the user id (and
        the group list set up by C{initgroups}) untouched.
    @param gid: group id to switch to, or C{None} to leave it untouched.
    @param euid: when true only the effective ids are changed
        (C{os.seteuid}/C{os.setegid}); otherwise C{os.setuid}/C{os.setgid}
        are used.
    """
    if euid:
        changeUser, changeGroup = os.seteuid, os.setegid
    else:
        changeUser, changeGroup = os.setuid, os.setgid

    # The group goes first -- presumably because the process may lose the
    # right to change it once the user id has been dropped.
    if gid is not None:
        changeGroup(gid)
    if uid is None:
        return
    initgroups(uid, gid)
    changeUser(uid)
checkers.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def requestAvatarId(self, credentials):
    """
    Check a username/password pair against the system password databases.

    The C{pwd} entry is consulted first: if it holds a real hash (anything
    other than the C{'*'} / C{'x'} placeholders) that verifies, the login
    succeeds.  Otherwise, when C{shadow} is available, the shadow entry is
    read with effective gid/uid 0 and verified.

    @param credentials: object providing C{username} and C{password}.
    @return: a Deferred that succeeds with the username, or fails with
        L{UnauthorizedLogin}.
    """
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        else:
            if cryptedPass not in ['*', 'x'] and \
                verifyCryptedPassword(cryptedPass, credentials.password):
                return defer.succeed(credentials.username)
    if shadow:
        gid = os.getegid()
        uid = os.geteuid()
        try:
            os.setegid(0)
            os.seteuid(0)
            shadowPass = shadow.getspnam(credentials.username)[1]
        except KeyError:
            # Unknown user in the shadow database.
            return defer.fail(UnauthorizedLogin())
        finally:
            # Always drop back, whatever getspnam raised: the process must
            # never keep running with effective uid/gid 0 after a lookup.
            os.setegid(gid)
            os.seteuid(uid)
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())

    return defer.fail(UnauthorizedLogin())
checkers.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def checkKey(self, credentials):
    """
    Look for the offered public key in the user's authorized keys files.

    C{~user/.ssh/authorized_keys2} and C{~user/.ssh/authorized_keys} are
    each checked and read with the effective uid/gid of the user being
    authenticated; the original effective ids are restored before the
    lines are compared, and also when a file is missing or cannot be read.

    @param credentials: object providing C{username} and C{blob} (the
        public key blob being offered).
    @return: C{1} if the second field of some line base64-decodes to
        C{credentials.blob}, otherwise C{0}.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'): # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        try:
            # Go through root to become the target user for the file access.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            if not os.path.exists(sshDir+name):
                continue
            keyFile = open(sshDir+name)
            try:
                lines = keyFile.readlines()
            finally:
                keyFile.close()
        finally:
            # Runs for the 'continue' above and for any I/O error as well,
            # so the process never stays switched to the target user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for l in lines:
            l2 = l.split()
            if len(l2) < 2:
                continue
            try:
                if base64.decodestring(l2[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                # Not valid base64: skip the line.
                continue
    return 0
unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _runAsUser(self, f, *args, **kw):
        euid = os.geteuid()
        egid = os.getegid()
        groups = os.getgroups()
        uid, gid = self.getUserGroupId()
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(self.getOtherGroups())
        os.setegid(gid)
        os.seteuid(uid)
        try:
            f = iter(f)
        except TypeError:
            f = [(f, args, kw)]
        try:
            for i in f:
                func = i[0]
                args = len(i)>1 and i[1] or ()
                kw = len(i)>2 and i[2] or {}
                r = func(*args, **kw)
        finally:
            os.setegid(0)
            os.seteuid(0)
            os.setgroups(groups)
            os.setegid(egid)
            os.seteuid(euid)
        return r
unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getPtyOwnership(self):
    """
    Make the avatar's user the owner of the path stored in
    C{self.ptyTuple[2]} (presumably the pty device), keeping the group the
    path already has.  The chown is done with effective gid/uid 0, and the
    previous effective ids are restored afterwards.
    """
    ptyPath = self.ptyTuple[2]
    # Index 5 of the stat result is the owning group id.
    ttyGid = os.stat(ptyPath)[5]
    uid, gid = self.avatar.getUserGroupId()
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(ptyPath, uid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
test_os.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_seteuid(self):
    """Check that os.seteuid rejects a forbidden id and an oversized one."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # An unprivileged process must not be able to become root.
        self.assertRaises(os.error, os.seteuid, 0)
    # One past the largest 32-bit value must be refused as an overflow.
    oversized_id = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, oversized_id)
test_os.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_seteuid(self):
    """Check that os.seteuid rejects a forbidden id and an oversized one."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # An unprivileged process must not be able to become root.
        self.assertRaises(os.error, os.seteuid, 0)
    # One past the largest 32-bit value must be refused as an overflow.
    oversized_id = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, oversized_id)
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_seteuid(self):
    """Check that os.seteuid rejects a forbidden id and an oversized one."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # An unprivileged process must not be able to become root.
        self.assertRaises(OSError, os.seteuid, 0)
    # One past the largest 32-bit value must be refused as an overflow.
    oversized_id = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, oversized_id)
test_os.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_seteuid(self):
    """Check that os.seteuid rejects a forbidden id and an oversized one."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # An unprivileged process must not be able to become root.
        self.assertRaises(os.error, os.seteuid, 0)
    # One past the largest 32-bit value must be refused as an overflow.
    oversized_id = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, oversized_id)
start.py 文件源码 项目:mamonsu 作者: postgrespro 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _try_run_as_postgres(self):
    """
    Try to switch the effective uid to the 'postgres' account.

    The switch is attempted only when platform.LINUX is true and the
    effective gid is 0.  Returns True if the switch succeeded, False if it
    was not attempted or failed (a failure is logged as an error).
    """
    # NOTE(review): the guard looks at the effective *gid*, although it is
    # the effective uid that gets changed -- confirm this is intended.
    if not (platform.LINUX and os.getegid() == 0):
        return False
    try:
        os.seteuid(pwd.getpwnam('postgres').pw_uid)
    except Exception as e:
        logging.error('Failed run as postgres: {0}'.format(e))
        return False
    return True
start.py 文件源码 项目:mamonsu 作者: postgrespro 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _try_run_as_postgres(self):
    """
    Try to switch the effective uid to the 'postgres' account.

    The switch is attempted only when platform.UNIX is true and the
    effective gid is 0.  Returns True if the switch succeeded, False if it
    was not attempted or failed (a failure is reported on stderr).
    """
    # NOTE(review): the guard looks at the effective *gid*, although it is
    # the effective uid that gets changed -- confirm this is intended.
    if not (platform.UNIX and os.getegid() == 0):
        return False
    try:
        # Imported here, inside the try block, so that a failing import is
        # reported like any other failure.
        import pwd
        os.seteuid(pwd.getpwnam('postgres').pw_uid)
    except Exception as e:
        sys.stderr.write("Failed run as postgres: {0}\n".format(e))
        return False
    return True


问题


面经


文章

微信
公众号

扫码关注公众号