python类fail()的实例源码

sshpot.py 文件源码 项目:netwatch 作者: recourse 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def requestAvatarId(self, credentials):
        return defer.fail(error.UnauthorizedLogin())
service.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def addUser(self, user):
        if user.name in self.users:
            return defer.fail(failure.Failure(ewords.DuplicateUser()))
        self.users[user.name] = user
        return defer.succeed(user)
service.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def addGroup(self, group):
        if group.name in self.groups:
            return defer.fail(failure.Failure(ewords.DuplicateGroup()))
        self.groups[group.name] = group
        return defer.succeed(group)
service.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def lookupUser(self, name):
        assert isinstance(name, unicode)
        name = name.lower()
        try:
            user = self.users[name]
        except KeyError:
            return defer.fail(failure.Failure(ewords.NoSuchUser(name)))
        else:
            return defer.succeed(user)
service.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def lookupGroup(self, name):
        assert isinstance(name, unicode)
        name = name.lower()
        try:
            group = self.groups[name]
        except KeyError:
            return defer.fail(failure.Failure(ewords.NoSuchGroup(name)))
        else:
            return defer.succeed(group)
sasl.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start(self):
        """
        Start SASL authentication exchange.

        Used the authenticator's C{jid} and C{password} attribute for the
        authentication credentials. If no supported SASL mechanisms are
        advertized by the receiving party, a failing deferred is returned with
        a L{SASLNoAcceptableMechanism} exception.
        """

        jid = self.xmlstream.authenticator.jid
        password = self.xmlstream.authenticator.password

        mechanisms = get_mechanisms(self.xmlstream)
        if 'DIGEST-MD5' in mechanisms:
            self.mechanism = sasl_mechanisms.DigestMD5('xmpp', jid.host, None,
                                                       jid.user, password)
        elif 'PLAIN' in mechanisms:
            self.mechanism = sasl_mechanisms.Plain(None, jid.user, password)
        else:
            return defer.fail(SASLNoAcceptableMechanism)

        self._deferred = defer.Deferred()
        self.xmlstream.addObserver('/challenge', self.onChallenge)
        self.xmlstream.addOnetimeObserver('/success', self.onSuccess)
        self.xmlstream.addOnetimeObserver('/failure', self.onFailure)
        self.sendAuth(self.mechanism.getInitialResponse())
        return self._deferred
tkconch.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def verifyHostKey(self, pubKey, fingerprint):
        #d = defer.Deferred()
        #d.addCallback(lambda x:defer.succeed(1))
        #d.callback(2)
        #return d
        goodKey = isInKnownHosts(options['host'], pubKey, {'known-hosts': None})
        if goodKey == 1: # good key
            return defer.succeed(1)
        elif goodKey == 2: # AAHHHHH changed
            return defer.fail(error.ConchError('bad host key'))
        else:
            if options['host'] == self.transport.getPeer()[1]:
                host = options['host']
                khHost = options['host']
            else:
                host = '%s (%s)' % (options['host'], 
                                    self.transport.getPeer()[1])
                khHost = '%s,%s' % (options['host'], 
                                    self.transport.getPeer()[1])
            keyType = common.getNS(pubKey)[0]
            ques = """The authenticity of host '%s' can't be established.\r
%s key fingerprint is %s.""" % (host, 
                                {'ssh-dss':'DSA', 'ssh-rsa':'RSA'}[keyType], 
                                fingerprint) 
            ques+='\r\nAre you sure you want to continue connecting (yes/no)? '
            return deferredAskFrame(ques, 1).addCallback(self._cbVerifyHostKey, pubKey, khHost, keyType)
transport.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def verifyHostKey(self, hostKey, fingerprint):
        """Returns a Deferred that gets a callback if it is a valid key, or
        an errback if not.

        @type hostKey:      C{str}
        @type fingerprint:  C{str}
        @rtype:             L{Deferred}
        """
        # return  if it's good
        return defer.fail(NotImplementedError)
userauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tryAuth(self, kind, user, data):
        log.msg('%s trying auth %s' % (user, kind))
        if kind not in self.supportedAuthentications:
            return defer.fail(error.ConchError('unsupported authentication, failing'))
        kind = kind.replace('-', '_')
        f = getattr(self,'auth_%s'%kind, None)
        if f:
            ret = f(data)
            if not ret:
                return defer.fail(error.ConchError('%s return None instead of a Deferred' % kind))
            else:
                return ret
        return defer.fail(error.ConchError('bad auth type: %s' % kind))
userauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ssh_USERAUTH_PK_OK(self, packet):
        if self.lastAuth == 'publickey':
            # this is ok
            publicKey = self.lastPublicKey
            keyType =  getNS(publicKey)[0]
            b = NS(self.transport.sessionID) + chr(MSG_USERAUTH_REQUEST) + \
            NS(self.user) + NS(self.instance.name) + NS('publickey') + '\xff' +\
            NS(keyType) + NS(publicKey)
            d  = self.signData(publicKey, b)
            if not d:
                self.askForAuth('none', '')
                # this will fail, we'll move on
                return
            d.addCallback(self._cbSignedData)
            d.addErrback(self._ebAuth)
        elif self.lastAuth == 'password':
            prompt, language, rest = getNS(packet, 2)
            self._oldPass = self._newPass = None
            self.getPassword('Old Password: ').addCallbacks(self._setOldPass, self._ebAuth)
            self.getPassword(prompt).addCallbacks(self._setNewPass, self._ebAuth)
        elif self.lastAuth == 'keyboard-interactive':
            name, instruction, lang, data = getNS(packet, 3)
            numPrompts = struct.unpack('!L', data[:4])[0]
            data = data[4:]
            prompts = []
            for i in range(numPrompts):
                prompt, data = getNS(data)
                echo = bool(ord(data[0]))
                data = data[1:]
                prompts.append((prompt, echo))
            d = self.getGenericAnswers(name, instruction, prompts)
            d.addCallback(self._cbGenericAnswers)
            d.addErrback(self._ebAuth)
userauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getGenericAnswers(self, name, instruction, prompts):
        """
        Returns a L{Deferred} with the responses to the promopts.

        @param name: The name of the authentication currently in progress.
        @param instruction: Describes what the authentication wants. 
        @param prompts: A list of (prompt, echo) pairs, where prompt is a
        string to display and echo is a boolean indicating whether the
        user's response should be echoed as they type it.
        """
        return defer.fail(NotImplementedError())
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def msg_sendRequest(self, lst):
        cn, requestType, data, wantReply = lst
        if not self.haveChannel(cn):
            if wantReply:
                self.returnDeferredWire(defer.fail(ConchError("no channel")))
        channel = self.getChannel(cn)
        d = self.conn.sendRequest(channel, requestType, data, wantReply)
        if wantReply:
            self.returnDeferredWire(d)
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(host, port, options, verifyHostKey, userAuthObject):
    if options['nocache']: 
        return defer.fail(ConchError('not using connection caching'))
    d = defer.Deferred()
    filename = os.path.expanduser("~/.conch-%s-%s-%i" % (userAuthObject.user, host, port))
    factory = SSHUnixClientFactory(d, options, userAuthObject)
    reactor.connectUNIX(filename, factory, timeout=2, checkPID=1)
    return d
default.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def verifyHostKey(transport, host, pubKey, fingerprint):
    goodKey = isInKnownHosts(host, pubKey, transport.factory.options)
    if goodKey == 1: # good key
        return defer.succeed(1)
    elif goodKey == 2: # AAHHHHH changed
        return defer.fail(ConchError('changed host key'))
    else:
        oldout, oldin = sys.stdout, sys.stdin
        sys.stdin = sys.stdout = open('/dev/tty','r+')
        if host == transport.transport.getPeer().host:
            khHost = host
        else:
            host = '%s (%s)' % (host,
                                transport.transport.getPeer().host)
            khHost = '%s,%s' % (host,
                                transport.transport.getPeer().host)
        keyType = common.getNS(pubKey)[0]
        print """The authenticity of host '%s' can't be established.
%s key fingerprint is %s.""" % (host,
                            {'ssh-dss':'DSA', 'ssh-rsa':'RSA'}[keyType],
                            fingerprint)
        try:
            ans = raw_input('Are you sure you want to continue connecting (yes/no)? ')
        except KeyboardInterrupt:
            return defer.fail(ConchError("^C"))
        while ans.lower() not in ('yes', 'no'):
            ans = raw_input("Please type 'yes' or 'no': ")
        sys.stdout,sys.stdin=oldout,oldin
        if ans == 'no':
            print 'Host key verification failed.'
            return defer.fail(ConchError('bad host key'))
        print "Warning: Permanently added '%s' (%s) to the list of known hosts." % (khHost, {'ssh-dss':'DSA', 'ssh-rsa':'RSA'}[keyType])
        known_hosts = open(os.path.expanduser('~/.ssh/known_hosts'), 'r+')
        known_hosts.seek(-1, 2)
        if known_hosts.read(1) != '\n':
            known_hosts.write('\n')
        encodedKey = base64.encodestring(pubKey).replace('\n', '')
        known_hosts.write('%s %s %s\n' % (khHost, keyType, encodedKey))
        known_hosts.close()
        return defer.succeed(1)
checkers.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def requestAvatarId(self, credentials):
        if pwd:
            try:
                cryptedPass = pwd.getpwnam(credentials.username)[1]
            except KeyError:
                return defer.fail(UnauthorizedLogin())
            else:
                if cryptedPass not in ['*', 'x'] and \
                    verifyCryptedPassword(cryptedPass, credentials.password):
                    return defer.succeed(credentials.username)
        if shadow:
            gid = os.getegid()
            uid = os.geteuid()
            os.setegid(0)
            os.seteuid(0)
            try:
                shadowPass = shadow.getspnam(credentials.username)[1]
            except KeyError:
                os.setegid(gid)
                os.seteuid(uid)
                return defer.fail(UnauthorizedLogin())
            os.setegid(gid)
            os.seteuid(uid)
            if verifyCryptedPassword(shadowPass, credentials.password):
                return defer.succeed(credentials.username)
            return defer.fail(UnauthorizedLogin())

        return defer.fail(UnauthorizedLogin())
checkers.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def requestAvatarId(self, credentials):
        ifac = interface.providedBy(credentials)
        for i in ifac:
            c = self.checkers.get(i)
            if c is not None:
                return c.requestAvatarId(credentials).addCallback(
                    self._cbGoodAuthentication, credentials)
        return defer.fail(UnhandledCredentials("No checker for %s" % \
            ', '.join(map(reflect.qal, ifac))))
telnet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def disableLocal(option):
        """Disable the given option locally.

        Unlike enableLocal, this method cannot fail.  The option must be
        disabled.
        """
telnet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def do(option):
        """Indicate a desire for the peer to begin performing the given option.

        Returns a Deferred that fires with True when the peer begins performing
        the option, or False when the peer refuses to perform it.  If the peer
        is already performing the given option, the Deferred will fail with
        L{AlreadyEnabled}.  If a negotiation regarding this option is already
        in progress, the Deferred will fail with L{AlreadyNegotiating}.

        Note: It is currently possible that this Deferred will never fire,
        if the peer never responds, or if the peer believes the option to
        already be enabled.
        """
telnet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def will(option):
        """Indicate our willingness to begin performing this option locally.

        Returns a Deferred that fires with True when the peer agrees to allow
        us to begin performing this option, or False if the peer refuses to
        allow us to begin performing it.  If the option is already enabled
        locally, the Deferred will fail with L{AlreadyEnabled}.  If negotiation
        regarding this option is already in progress, the Deferred will fail with
        L{AlreadyNegotiating}.

        Note: It is currently possible that this Deferred will never fire,
        if the peer never responds, or if the peer believes the option to
        already be enabled.
        """


问题


面经


文章

微信
公众号

扫码关注公众号