python类setegid()的实例源码

pamauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def callIntoPAM(service, user, conv):
    """
    Run a PAM authentication with effective uid/gid 0 (a testing hook).

    @param service: service name handed to C{pam.start}.
    @param user: value stored under C{PAM.PAM_USER}.
    @param conv: value stored under C{PAM.PAM_CONV}.

    @return: C{1} once C{authenticate} and C{acct_mgmt} have both returned
        without raising; their exceptions propagate to the caller.
    """
    handle = PAM.pam()
    handle.start(service)
    handle.set_item(PAM.PAM_USER, user)
    handle.set_item(PAM.PAM_CONV, conv)
    # Remember the current effective ids so they can be put back afterwards.
    savedGid, savedUid = os.getegid(), os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        # Both calls signal failure by raising.
        handle.authenticate()
        handle.acct_mgmt()
    finally:
        os.setegid(savedGid)
        os.seteuid(savedUid)
    return 1
pamauth.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def callIntoPAM(service, user, conv):
    """
    Testing hook: authenticate through PAM while running as effective root.

    The PAM handle is started for C{service}, C{user} and C{conv} are stored
    as the C{PAM_USER} and C{PAM_CONV} items, and the effective gid/uid are
    set to 0 for the duration of the C{authenticate}/C{acct_mgmt} calls.
    The previous effective ids are restored whether or not those calls raise.

    @return: C{1} when neither PAM call raised.
    """
    session = PAM.pam()
    session.start(service)
    session.set_item(PAM.PAM_USER, user)
    session.set_item(PAM.PAM_CONV, conv)
    previousEgid = os.getegid()
    previousEuid = os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        session.authenticate()  # raises on failure
        session.acct_mgmt()     # raises on failure
    finally:
        # Group first, then user, mirroring the escalation above.
        os.setegid(previousEgid)
        os.seteuid(previousEuid)
    return 1
daemon.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def main():
    """
    Daemon entry point.

    Changes into the data directory, sends stdout and stderr to the log
    file, switches the effective group and user ids to 103 and then hands
    control to USERPROG.
    """
    # Work from the data directory.
    os.chdir("/root/data")
    # Both output streams share one Log wrapper around the logfile.
    log_stream = Log(open(LOGFILE, 'a+'))
    sys.stdout = log_stream
    sys.stderr = log_stream
    # Make sure the daemon runs as a normal user ("pydaemon" per the
    # original comments); the group is switched before the user.
    os.setegid(103)
    os.seteuid(103)
    # Start the user program.
    USERPROG()
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def switchUID(uid, gid, euid=False):
    """
    Switch the process to the given user and/or group id.

    @param uid: user id to switch to, or C{None} to leave the user alone.
    @param gid: group id to switch to, or C{None} to leave the group alone.
    @param euid: when true use C{os.seteuid}/C{os.setegid}; otherwise use
        C{os.setuid}/C{os.setgid}.
    """
    # Pick the (user setter, group setter) pair once, up front.
    setuid, setgid = (os.seteuid, os.setegid) if euid else (os.setuid, os.setgid)
    # The group is changed before the user.
    if gid is not None:
        setgid(gid)
    if uid is None:
        return
    initgroups(uid, gid)
    setuid(uid)
checkers.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def requestAvatarId(self, credentials):
        """
        Check a username/password pair against the system password databases.

        If the C{pwd} module is available its entry is tried first: an unknown
        user fails immediately, and a stored hash other than C{'*'} or C{'x'}
        that verifies makes the login succeed.  Otherwise, if the C{shadow}
        module is available, the shadow entry is read with effective uid/gid 0
        and verified.

        @param credentials: object providing C{username} and C{password}.

        @return: the result of C{defer.succeed(credentials.username)} on
            success, or of C{defer.fail(UnauthorizedLogin())} otherwise.
        """
        if pwd:
            try:
                cryptedPass = pwd.getpwnam(credentials.username)[1]
            except KeyError:
                return defer.fail(UnauthorizedLogin())
            else:
                if cryptedPass not in ['*', 'x'] and \
                    verifyCryptedPassword(cryptedPass, credentials.password):
                    return defer.succeed(credentials.username)
        if shadow:
            gid = os.getegid()
            uid = os.geteuid()
            os.setegid(0)
            os.seteuid(0)
            try:
                try:
                    shadowPass = shadow.getspnam(credentials.username)[1]
                finally:
                    # Always give the elevated ids back, whatever getspnam
                    # raised; previously only the success and KeyError paths
                    # restored them.
                    os.setegid(gid)
                    os.seteuid(uid)
            except KeyError:
                return defer.fail(UnauthorizedLogin())
            if verifyCryptedPassword(shadowPass, credentials.password):
                return defer.succeed(credentials.username)
            return defer.fail(UnauthorizedLogin())

        return defer.fail(UnauthorizedLogin())
checkers.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def checkKey(self, credentials):
        """
        Look for the offered public key in the user's authorized keys files.

        C{~user/.ssh/authorized_keys2} and C{~user/.ssh/authorized_keys} are
        probed and opened with the effective uid/gid of the user, then read
        with the process' original effective ids.

        @param credentials: object providing C{username} and C{blob}.

        @return: C{1} if a line's second field base64-decodes to
            C{credentials.blob}, C{0} otherwise (including when the home
            directory cannot be expanded).
        """
        sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
        if sshDir.startswith('~'): # didn't expand
            return 0
        uid, gid = os.geteuid(), os.getegid()
        ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
        for name in ['authorized_keys2', 'authorized_keys']:
            # Become the key owner for the existence check and the open() of
            # every candidate file, and always switch back afterwards -- even
            # when the file is missing or open() raises.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            try:
                if not os.path.exists(sshDir+name):
                    continue
                keyFile = open(sshDir+name)
            finally:
                os.setegid(0)
                os.seteuid(0)
                os.setegid(gid)
                os.seteuid(uid)
            try:
                # The file is already open, so it can be read lazily with the
                # original ids.
                for l in keyFile:
                    l2 = l.split()
                    if len(l2) < 2:
                        continue
                    try:
                        if base64.decodestring(l2[1]) == credentials.blob:
                            return 1
                    except binascii.Error:
                        continue
            finally:
                keyFile.close()
        return 0
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _runAsUser(self, f, *args, **kw):
        """
        Run one or more callables with this user's effective ids.

        C{f} is either a single callable, invoked with C{*args} and C{**kw},
        or an iterable of C{(func[, args[, kw]])} tuples invoked in order.
        The effective uid/gid come from C{self.getUserGroupId()} and the
        supplementary groups from C{self.getOtherGroups()}; the previous
        values are restored once the calls finish or raise.

        @return: the result of the last call made.
        """
        savedEuid = os.geteuid()
        savedEgid = os.getegid()
        savedGroups = os.getgroups()
        uid, gid = self.getUserGroupId()
        # Go through root so the groups and ids may be changed.
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(self.getOtherGroups())
        os.setegid(gid)
        os.seteuid(uid)
        try:
            calls = iter(f)
        except TypeError:
            # Not iterable: treat f as one callable with the given arguments.
            calls = [(f, args, kw)]
        try:
            for call in calls:
                func = call[0]
                # Missing or empty entries fall back to no arguments.
                callArgs = len(call) > 1 and call[1]
                if not callArgs:
                    callArgs = ()
                callKw = len(call) > 2 and call[2]
                if not callKw:
                    callKw = {}
                r = func(*callArgs, **callKw)
        finally:
            os.setegid(0)
            os.seteuid(0)
            os.setgroups(savedGroups)
            os.setegid(savedEgid)
            os.seteuid(savedEuid)
        return r
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getPtyOwnership(self):
        """
        Give the avatar's user ownership of C{self.ptyTuple[2]}.

        The owning group is left as reported by C{os.stat}; only the owner
        changes.  The chown is performed with effective uid/gid 0 and the
        previous effective ids are restored afterwards.
        """
        currentGroup = os.stat(self.ptyTuple[2])[5]  # index 5 is st_gid
        ownerUid, _ownerGid = self.avatar.getUserGroupId()
        savedEuid = os.geteuid()
        savedEgid = os.getegid()
        os.setegid(0)
        os.seteuid(0)
        try:
            os.chown(self.ptyTuple[2], ownerUid, currentGroup)
        finally:
            os.setegid(savedEgid)
            os.seteuid(savedEuid)
utils.py 文件源码 项目:skilled-hammer 作者: r00m 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def pull(directory):
    """
    Pull the latest changes with the rights of the user that owns the folder.

    The effective gid/uid are switched to the owner of ``directory`` for the
    duration of the pull and set back to 0 afterwards.

    :param directory: path of the git working copy to update.
    :return: ``True`` when the pull did not report an error or a rejected
        merge; ``False`` on those conditions or when the uid/gid switch is
        not permitted.
    """
    try:
        st = os.stat(directory)
        logger.info("Pulling as {0}:{1}...".format(st.st_uid, st.st_gid))

        # Order is important: the group has to be switched first, because
        # after seteuid() the effective UID isn't 0 anymore and setegid()
        # would no longer be allowed.
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        repo = git.Repo(directory)
        info = repo.remotes.origin.pull()[0]

        if info.flags & info.ERROR:
            logger.error("Pull failed: {0}".format(info.note))
            return False
        elif info.flags & info.REJECTED:
            logger.error("Could not merge after pull: {0}".format(info.note))
            return False
        elif info.flags & info.HEAD_UPTODATE:
            logger.info("Head is already up to date")
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
        return False
    finally:
        logger.info("Restoring root permissions")
        # Mirror image of the drop above: regain the effective UID first so
        # that changing the group is permitted again.
        os.seteuid(0)
        os.setegid(0)

    return True
utils.py 文件源码 项目:skilled-hammer 作者: r00m 项目源码 文件源码 阅读 60 收藏 0 点赞 0 评论 0
def run(project, command, directory, slack_webhook_url):
    """
    Run the specified command as the user that owns the directory.

    The command is started in a background thread so this function returns
    immediately; its failure is logged from that thread.

    :param project: project name, used in log and Slack messages.
    :param command: shell command line to execute.
    :param directory: working directory for the command; its owner supplies
        the uid/gid to switch to.
    :param slack_webhook_url: when truthy, a notification with the command
        output is sent through ``slack_notification``.
    """
    try:
        st = os.stat(directory)

        # Order is important: the group has to be switched first, because
        # after seteuid() the effective UID isn't 0 anymore and setegid()
        # would no longer be allowed.
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        logger.info("Changing working directory to '{0}'".format(directory))
        logger.info("Spawning background command '{0}' as {1}:{2} for '{3}'...".format(command, st.st_uid, st.st_gid, project))

        def background():
            """
            I don't care how long it takes to run the command, but Bitbucket gets angry when it takes longer
            than 10 seconds. My npm build takes around 15 secs, so I'd get 3 Webhooks from Bitbucket, because
            it thinks each Webhook timed out.

            Easy way out is to return response immediately and start a background thread that
            does all of the heavy lifting.
            """
            start_time = time.time()
            try:
                output = subprocess.check_output(command, shell=True, cwd=directory, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                # check_output() runs in this thread, so its failure has to
                # be handled here; a handler in the spawning function can
                # never see it.
                logger.error("Error: {0}".format(e.output))
                return
            completed_in = time.time() - start_time

            logger.info("'{0}' background command finished in {1:.2f} seconds".format(project, completed_in))

            if slack_webhook_url:
                slack_notification(slack_webhook_url, "Deployed `{0}` in {1:.2f} seconds! :rocket:".format(project, completed_in), output)

        # NOTE(review): effective ids are process-wide and the finally clause
        # below restores root right after the thread is started, so the
        # command may actually be spawned with root ids -- confirm and
        # consider dropping privileges in the child process instead.
        Thread(target=background).start()
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
    finally:
        logger.info("Restoring root permissions")
        # Regain the effective UID first so that changing the group is
        # permitted again.
        os.seteuid(0)
        os.setegid(0)
test_os.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_setegid(self):
                # A process that is not root is expected to be refused gid 0.
                runningAsRoot = os.getuid() == 0
                if not runningAsRoot:
                    self.assertRaises(os.error, os.setegid, 0)
                # A gid of 1<<32 is expected to be rejected with OverflowError.
                oversizedGid = 1 << 32
                self.assertRaises(OverflowError, os.setegid, oversizedGid)
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUp(self):
            # Start from a clean slate, then run the base fixture setup.
            safe_rmpath(TESTFN)
            TestProcess.setUp(self)
            # Switch the effective ids to 1000 for the tests: group first,
            # then user.
            limitedId = 1000
            os.setegid(limitedId)
            os.seteuid(limitedId)
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def tearDown(self):
            # Restore the effective ids recorded on the test case before
            # running the base teardown.  The group id goes to setegid() and
            # the user id to seteuid(); the two were previously swapped.
            os.setegid(self.PROCESS_GID)
            os.seteuid(self.PROCESS_UID)
            TestProcess.tearDown(self)
test_os.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_setegid(self):
        # Switching to group 0 must fail unless the tests run as root.
        if not os.getuid() == 0:
            self.assertRaises(os.error, os.setegid, 0)
        # An id of 2**32 must raise OverflowError.
        self.assertRaises(OverflowError, os.setegid, 2 ** 32)
test_os.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_setegid(self):
        check = self.assertRaises
        # Only meaningful for unprivileged runs: gid 0 must be refused.
        if os.getuid() != 0:
            check(os.error, os.setegid, 0)
        # 1 << 32 is expected to overflow.
        check(OverflowError, os.setegid, 1 << 32)
util.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def switchUID(uid, gid, euid=False):
    """
    Change the process' user and group ids.

    The group is switched first.  When a user id is given, C{initgroups} is
    called with C{uid} and C{gid} before the user id itself is switched.

    @param uid: new user id, or C{None} to keep the current one.
    @param gid: new group id, or C{None} to keep the current one.
    @param euid: if true, change the effective ids (C{os.seteuid} /
        C{os.setegid}) instead of using C{os.setuid} / C{os.setgid}.
    """
    if euid:
        userSetter, groupSetter = os.seteuid, os.setegid
    else:
        userSetter, groupSetter = os.setuid, os.setgid
    if gid is not None:
        groupSetter(gid)
    if uid is not None:
        initgroups(uid, gid)
        userSetter(uid)
checkers.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def requestAvatarId(self, credentials):
        """
        Authenticate C{credentials} against the C{pwd} and C{shadow} databases.

        With C{pwd} available, an unknown user fails at once and a stored
        hash other than C{'*'} or C{'x'} that verifies succeeds.  With
        C{shadow} available, the shadow hash is fetched with effective
        uid/gid 0 and then verified.

        @param credentials: object providing C{username} and C{password}.

        @return: the result of C{defer.succeed(credentials.username)} when a
            password verifies, else of C{defer.fail(UnauthorizedLogin())}.
        """
        if pwd:
            try:
                cryptedPass = pwd.getpwnam(credentials.username)[1]
            except KeyError:
                return defer.fail(UnauthorizedLogin())
            else:
                if cryptedPass not in ['*', 'x'] and \
                    verifyCryptedPassword(cryptedPass, credentials.password):
                    return defer.succeed(credentials.username)
        if shadow:
            gid = os.getegid()
            uid = os.geteuid()
            os.setegid(0)
            os.seteuid(0)
            try:
                try:
                    shadowPass = shadow.getspnam(credentials.username)[1]
                finally:
                    # Drop the elevated ids on every exit path, not only on
                    # success or KeyError, so an unexpected error cannot
                    # leave the process running with euid/egid 0.
                    os.setegid(gid)
                    os.seteuid(uid)
            except KeyError:
                return defer.fail(UnauthorizedLogin())
            if verifyCryptedPassword(shadowPass, credentials.password):
                return defer.succeed(credentials.username)
            return defer.fail(UnauthorizedLogin())

        return defer.fail(UnauthorizedLogin())
checkers.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def checkKey(self, credentials):
        """
        Check whether the offered key is listed in the user's authorized keys.

        Both C{authorized_keys2} and C{authorized_keys} under C{~user/.ssh/}
        are considered.  Each file is probed and opened with the user's
        effective uid/gid and read after the original ids are restored.

        @param credentials: object providing C{username} and C{blob}.

        @return: C{1} when the second field of some line base64-decodes to
            C{credentials.blob}; C{0} otherwise, or when C{~user} does not
            expand.
        """
        sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
        if sshDir.startswith('~'): # didn't expand
            return 0
        uid, gid = os.geteuid(), os.getegid()
        ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
        for name in ['authorized_keys2', 'authorized_keys']:
            # Take the user's identity per file and restore the original ids
            # in a finally clause: before, a missing file or a failing open()
            # left the process running as the user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            try:
                if not os.path.exists(sshDir+name):
                    continue
                keyFile = open(sshDir+name)
            finally:
                os.setegid(0)
                os.seteuid(0)
                os.setegid(gid)
                os.seteuid(uid)
            try:
                for l in keyFile:
                    l2 = l.split()
                    if len(l2) < 2:
                        continue
                    try:
                        if base64.decodestring(l2[1]) == credentials.blob:
                            return 1
                    except binascii.Error:
                        continue
            finally:
                # Close the handle instead of leaving it to the collector.
                keyFile.close()
        return 0
unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _runAsUser(self, f, *args, **kw):
        """
        Call C{f} while the process carries this user's effective ids.

        @param f: a callable, or an iterable of C{(func[, args[, kw]])}
            tuples that are called one after another.
        @param args: positional arguments used when C{f} is a single callable.
        @param kw: keyword arguments used when C{f} is a single callable.

        @return: whatever the last call returned.
        """
        originalEuid, originalEgid = os.geteuid(), os.getegid()
        originalGroups = os.getgroups()
        uid, gid = self.getUserGroupId()
        # Become root first, install the user's other groups, then take on
        # the user's gid and uid.
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(self.getOtherGroups())
        os.setegid(gid)
        os.seteuid(uid)
        try:
            pending = iter(f)
        except TypeError:
            pending = [(f, args, kw)]
        try:
            for entry in pending:
                func = entry[0]
                positional = ()
                if len(entry) > 1:
                    # An empty/false entry still means "no arguments".
                    positional = entry[1] or ()
                keywords = {}
                if len(entry) > 2:
                    keywords = entry[2] or {}
                r = func(*positional, **keywords)
        finally:
            # Undo the switch in the same root-first fashion.
            os.setegid(0)
            os.seteuid(0)
            os.setgroups(originalGroups)
            os.setegid(originalEgid)
            os.seteuid(originalEuid)
        return r
unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def getPtyOwnership(self):
        """
        Chown C{self.ptyTuple[2]} to the avatar's uid, keeping its group.

        Runs the C{os.chown} with effective uid/gid 0 and restores the
        previous effective ids in all cases.
        """
        # Field 5 of the stat result is the current owning group.
        ttyGid = os.stat(self.ptyTuple[2])[5]
        avatarUid, _avatarGid = self.avatar.getUserGroupId()
        previousEuid, previousEgid = os.geteuid(), os.getegid()
        os.setegid(0)
        os.seteuid(0)
        try:
            os.chown(self.ptyTuple[2], avatarUid, ttyGid)
        finally:
            os.setegid(previousEgid)
            os.seteuid(previousEuid)
test_os.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_setegid(self):
                # gid 0 must be refused, unless the run is root or the
                # HAVE_WHEEL_GROUP flag is set.
                if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
                    self.assertRaises(os.error, os.setegid, 0)
                # A gid of 1<<32 is expected to overflow.
                self.assertRaises(OverflowError, os.setegid, 1 << 32)
test_os.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_setegid(self):
        # Unprivileged runs only: asking for group 0 has to raise os.error.
        unprivileged = os.getuid() != 0
        if unprivileged:
            self.assertRaises(os.error, os.setegid, 0)
        # Independently of privileges, 1<<32 has to raise OverflowError.
        self.assertRaises(OverflowError, os.setegid, 1 << 32)
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_setegid(self):
        # Skip the permission check for root and when HAVE_WHEEL_GROUP is set.
        skipPermissionCheck = os.getuid() == 0 or HAVE_WHEEL_GROUP
        if not skipPermissionCheck:
            self.assertRaises(OSError, os.setegid, 0)
        # An id of 1<<32 is expected to raise OverflowError.
        self.assertRaises(OverflowError, os.setegid, 1 << 32)
test_os.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_setegid(self):
                expectFailure = self.assertRaises
                # Without root, switching to gid 0 is expected to fail.
                if os.getuid() != 0:
                    expectFailure(os.error, os.setegid, 0)
                # 1<<32 is expected to be rejected with OverflowError.
                overflowingGid = 1 << 32
                expectFailure(OverflowError, os.setegid, overflowingGid)
authorizers.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def impersonate_user(self, username, password):
            """Switch the process' effective group and user ids to those of
            the logged in user.

            Raises AuthorizerError (with self.msg_no_such_user) when the
            password database has no entry for *username*.  *password* is
            not used here.
            """
            try:
                account = pwd.getpwnam(username)
            except KeyError:
                raise AuthorizerError(self.msg_no_such_user)
            # Group first, then user.
            os.setegid(account.pw_gid)
            os.seteuid(account.pw_uid)
authorizers.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def terminate_impersonation(self, username):
            """Set the effective group and user ids back to PROCESS_GID and
            PROCESS_UID.  *username* is not used here.
            """
            original_gid, original_uid = PROCESS_GID, PROCESS_UID
            # Group first, then user.
            os.setegid(original_gid)
            os.seteuid(original_uid)
collector.py 文件源码 项目:navdoon 作者: farzadghanei 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def _change_process_user_group(self):
        # type: () -> None
        """Switch the effective group and user of the process.

        ``self.group`` and ``self.user`` are applied only when truthy.  The
        group is changed before the user: once the effective user id has
        been dropped, the process is generally no longer permitted to change
        its group.
        """
        if self.group:
            self._log("changing process group to {}".format(self.group))
            os.setegid(self.group)
        if self.user:
            self._log("changing process user to {}".format(self.user))
            os.seteuid(self.user)
util.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def runAsEffectiveUser(euid, egid, function, *args, **kwargs):
    """
    Call C{function} with the given effective uid and gid in place.

    Only the seteuid/setegid calls that are actually needed are made: when
    the current effective ids already match, C{function} is called directly.
    Otherwise the ids are switched (going through uid 0 where required) and
    switched back once C{function} returns or raises.

    @param euid: effective UID used to call the function.
    @type euid: C{int}

    @param egid: effective GID used to call the function.
    @type egid: C{int}

    @param function: the function run with the specific permission.
    @type function: any callable

    @param *args: arguments passed to C{function}
    @param **kwargs: keyword arguments passed to C{function}

    @return: whatever C{function} returns.
    """
    currentUid, currentGid = os.geteuid(), os.getegid()
    if currentUid == euid and currentGid == egid:
        # Nothing to switch.
        return function(*args, **kwargs)

    # From here on at least one of the two ids differs from the wanted one.
    startsAsRoot = currentUid == 0
    runsAsRoot = euid == 0
    groupChanges = currentGid != egid

    if not startsAsRoot:
        os.seteuid(0)
    if groupChanges:
        os.setegid(egid)
    if not runsAsRoot:
        os.seteuid(euid)
    try:
        return function(*args, **kwargs)
    finally:
        # Undo the switch in the same uid-0-first order.
        if not runsAsRoot:
            os.seteuid(0)
        if groupChanges:
            os.setegid(currentGid)
        if not startsAsRoot:
            os.seteuid(currentUid)
unix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _runAsUser(self, f, *args, **kw):
        """
        Execute C{f} under the effective uid, gid and groups of this user.

        A non-iterable C{f} is called once as C{f(*args, **kw)}.  An iterable
        C{f} yields C{(func[, args[, kw]])} tuples that are called in
        sequence.  The ids and groups in force on entry are reinstated
        afterwards, including when a call raises.

        @return: the value returned by the final call.
        """
        entryEuid = os.geteuid()
        entryEgid = os.getegid()
        entryGroups = os.getgroups()
        uid, gid = self.getUserGroupId()
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(self.getOtherGroups())
        os.setegid(gid)
        os.seteuid(uid)
        try:
            work = iter(f)
        except TypeError:
            # A plain callable: wrap it into the tuple form used below.
            work = [(f, args, kw)]
        try:
            for item in work:
                func = item[0]
                itemArgs, itemKw = (), {}
                # Optional second/third fields; empty ones keep the defaults.
                if len(item) > 1 and item[1]:
                    itemArgs = item[1]
                if len(item) > 2 and item[2]:
                    itemKw = item[2]
                r = func(*itemArgs, **itemKw)
        finally:
            os.setegid(0)
            os.seteuid(0)
            os.setgroups(entryGroups)
            os.setegid(entryEgid)
            os.seteuid(entryEuid)
        return r
unix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def getPtyOwnership(self):
        """
        Make the avatar's user the owner of C{self.ptyTuple[2]}.

        The group recorded by C{os.stat} is passed back unchanged to
        C{os.chown}.  The call is made with effective uid/gid 0; the ids in
        force beforehand are restored afterwards.
        """
        statResult = os.stat(self.ptyTuple[2])
        ttyGid = statResult[5]  # st_gid
        newOwner, _unusedGid = self.avatar.getUserGroupId()
        oldEuid = os.geteuid()
        oldEgid = os.getegid()
        os.setegid(0)
        os.seteuid(0)
        try:
            os.chown(self.ptyTuple[2], newOwner, ttyGid)
        finally:
            os.setegid(oldEgid)
            os.seteuid(oldEuid)


问题


面经


文章

微信
公众号

扫码关注公众号