python类RSAKey()的实例源码

utils.py 文件源码 项目:jumpserver-python-sdk 作者: jumpserver 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def from_string(cls, key_string):
        """Turn private-key text into a paramiko key object.

        An RSA parse is attempted first and a DSS parse second.

        :param key_string: private key material as text
        :return: a ``paramiko.RSAKey`` or ``paramiko.DSSKey`` instance, or
                 ``None`` when both attempts raise ``paramiko.SSHException``
        """
        # First choice: RSA.
        try:
            return paramiko.RSAKey(file_obj=StringIO(key_string))
        except paramiko.SSHException:
            pass

        # Fallback: DSS, reading from its own fresh stream.
        try:
            return paramiko.DSSKey(file_obj=StringIO(key_string))
        except paramiko.SSHException:
            return None
utils.py 文件源码 项目:coco 作者: jumpserver 项目源码 文件源码 阅读 78 收藏 0 点赞 0 评论 0
def ssh_key_gen(length=2048, type='rsa', password=None, username='jumpserver', hostname=None):
    """Generate an SSH private/public key pair with paramiko.

    :param length: key size in bits handed to the paramiko key generator
    :param type: key algorithm, either ``'rsa'`` or ``'dsa'``
    :param password: optional passphrase used when serialising the private key
    :param username: user name forwarded to ``ssh_pubkey_gen``
    :param hostname: host name forwarded to ``ssh_pubkey_gen``; defaults to
        ``os.uname()[1]``
    :return: tuple ``(private_key, public_key)`` -- the serialised private
        key text and whatever ``ssh_pubkey_gen`` returns for the public key
    :raises IOError: when ``type`` is not supported, or when generating or
        serialising the key fails with an ``IOError``
    """
    # Validate before entering the try block: raised inside it, this specific
    # message would be caught by the handler below and replaced by the
    # generic one, hiding the real cause from the caller.
    if type not in ('rsa', 'dsa'):
        raise IOError('SSH private key must be `rsa` or `dsa`')

    if hostname is None:
        hostname = os.uname()[1]

    key_class = paramiko.RSAKey if type == 'rsa' else paramiko.DSSKey
    f = StringIO()

    try:
        private_key_obj = key_class.generate(length)
        private_key_obj.write_private_key(f, password=password)
        private_key = f.getvalue()
        public_key = ssh_pubkey_gen(private_key_obj, username=username, hostname=hostname)
        return private_key, public_key
    except IOError:
        raise IOError('These is error when generate ssh key.')
test_client.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_4_auto_add_policy(self):
        """
        Check that a client configured with AutoAddPolicy ends up with the
        server's host key in its host-key store after connecting.
        """
        server_thread = threading.Thread(target=self._run)
        server_thread.start()
        private_host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        expected_key = paramiko.RSAKey(data=private_host_key.asbytes())

        client = paramiko.SSHClient()
        self.tc = client
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # No host keys are known before the connection is made.
        self.assertEqual(0, len(client.get_host_keys()))
        client.connect(self.addr, self.port, username='slowdive', password='pygmalion')

        # Server side must have come up and authenticated the user.
        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())
        self.assertEqual('slowdive', self.ts.get_username())
        self.assertEqual(True, self.ts.is_authenticated())

        # Exactly one entry was added, keyed by "[addr]:port".
        self.assertEqual(1, len(client.get_host_keys()))
        host_id = '[%s]:%d' % (self.addr, self.port)
        self.assertEqual(expected_key, client.get_host_keys()[host_id]['ssh-rsa'])
test_client.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_7_banner_timeout(self):
        """
        Check that SSHClient.connect honours its ``banner_timeout`` argument.
        """
        # The server thread is started with a 1 second delay.
        server_thread = threading.Thread(target=self._run, kwargs={'delay': 1})
        server_thread.start()
        private_host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        expected_key = paramiko.RSAKey(data=private_host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', expected_key)

        # The client only allows half a second for the banner, so the
        # connection attempt is expected to fail.
        connect_options = {
            'username': 'slowdive',
            'password': 'pygmalion',
            'banner_timeout': 0.5,
        }
        self.assertRaises(paramiko.SSHException,
                          self.tc.connect, self.addr, self.port,
                          **connect_options)
test_hostkeys.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_4_dict_set(self):
        """Check dict-style assignment on HostKeys, per host and per key type."""
        hostdict = paramiko.HostKeys('hostfile.temp')
        rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))
        dss_key = paramiko.DSSKey(data=decodebytes(keyblob_dss))

        # Assign a complete entry in one step ...
        hostdict['secure.example.com'] = {'ssh-rsa': rsa_key, 'ssh-dss': dss_key}
        # ... and build a second one key type at a time.
        hostdict['fake.example.com'] = {}
        hostdict['fake.example.com']['ssh-rsa'] = rsa_key

        self.assertEqual(3, len(hostdict))
        # Expected number of key types per entry, in iteration order.
        for position, key_count in enumerate((2, 1, 1)):
            self.assertEqual(key_count, len(list(hostdict.values())[position]))

        expected_fingerprints = (
            ('ssh-rsa', b'7EC91BB336CB6D810B124B1353C32396'),
            ('ssh-dss', b'4478F0B9A23CC5182009FF755BC1D26C'),
        )
        for key_type, expected in expected_fingerprints:
            fp = hexlify(hostdict['secure.example.com'][key_type].get_fingerprint()).upper()
            self.assertEqual(expected, fp)
helpers.py 文件源码 项目:shakedown 作者: dcos 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate_key(key_path):
    """ Load the RSA private key stored at a path

        :param key_path: path to a key to use for authentication; a leading
            ``~`` is expanded
        :type key_path: str

        :return: key object used for authentication, or False when the
            expanded path is not an existing file
        :rtype: paramiko.RSAKey | bool
    """

    expanded_path = os.path.expanduser(key_path)

    if os.path.isfile(expanded_path):
        return paramiko.RSAKey.from_private_key_file(expanded_path)

    return False
crypto.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def generate_key(bits):
    """Create an RSA key with ``RSA.generate`` and wrap it in a paramiko RSAKey.

    :param bits: key size passed to ``RSA.generate``
    :return: ``paramiko.RSAKey`` carrying the generated e, n, d, p and q values
    """
    # NOTE(dims): pycryptodome changed the signature of RSA.generate
    # (progress_func was dropped) while paramiko still uses pycrypto. Some
    # projects, such as recent pysaml2, have moved to pycryptodome because
    # pycrypto appears abandoned; paramiko has started the same transition
    # but has no release with that support yet. Depending on which pysaml2
    # is installed, Nova is therefore likely to break. Calling
    # "RSA.generate(bits)" works with both libraries, so the key is generated
    # that way and then wrapped into a paramiko.RSAKey.
    generated = RSA.generate(bits)
    key = paramiko.RSAKey(vals=(generated.e, generated.n))
    # Copy the private components onto the paramiko key object.
    for component in ('d', 'p', 'q'):
        setattr(key, component, getattr(generated, component))
    return key
cache.py 文件源码 项目:netconf-proxy 作者: fortinet-solutions-cse 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setup_travis():
    """Prepare SSH key material when running as the ``travis`` user.

    Configures debug logging to stderr, then returns immediately unless the
    current user (``$USER``, or ``getpass.getuser()`` when ``$USER`` is not
    set) is ``travis``.  If ``$HOME/.ssh/id_rsa`` already exists nothing more
    is done.  Otherwise the ``.ssh`` directory is created, a 1024-bit RSA key
    is generated and stored in the module-level ``private_key``, the key is
    written to ``id_rsa``, and its public half is appended to
    ``authorized_keys`` in the same directory.

    :raises OSError: if the ``.ssh`` directory cannot be created
    """
    import getpass
    import sys
    global private_key                                      # pylint: disable=W0603

    logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
    print("Setup called.")
    if 'USER' in os.environ:
        if os.environ['USER'] != "travis":
            return
    else:
        if getpass.getuser() != "travis":
            return

    print("Executing under Travis-CI")
    ssh_dir = "{}/.ssh".format(os.environ['HOME'])
    priv_filename = os.path.join(ssh_dir, "id_rsa")
    if os.path.exists(priv_filename):
        logger.error("Found private keyfile")
        print("Found private keyfile")
        return

    logger.error("Creating ssh dir " + ssh_dir)
    print("Creating ssh dir " + ssh_dir)
    # Create the directory without going through a shell: the previous
    # "mkdir -p" command string broke on paths containing spaces or shell
    # metacharacters and silently ignored failures.
    if not os.path.isdir(ssh_dir):
        os.makedirs(ssh_dir)
    priv = ssh.RSAKey.generate(bits=1024)
    private_key = priv

    logger.error("Generating private keyfile " + priv_filename)
    print("Generating private keyfile " + priv_filename)
    priv.write_private_key_file(filename=priv_filename)

    # Re-read the key from disk to obtain the public half for authorized_keys.
    pub = ssh.RSAKey(filename=priv_filename)
    auth_filename = os.path.join(ssh_dir, "authorized_keys")
    logger.error("Adding keys to authorized_keys file " + auth_filename)
    print("Adding keys to authorized_keys file " + auth_filename)
    with open(auth_filename, "a") as authfile:
        authfile.write("{} {}\n".format(pub.get_name(), pub.get_base64()))
    logger.error("Done generating keys")
    print("Done generating keys")
utils.py 文件源码 项目:coco 作者: jumpserver 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ssh_key_string_to_obj(text):
    """Parse private-key text into a paramiko key object.

    :param text: private key material as text
    :return: a ``paramiko.RSAKey`` when the text parses as RSA, otherwise a
        ``paramiko.DSSKey`` when it parses as DSS, otherwise ``None``
    """
    # Each attempt gets its own stream. The RSA attempt reads the stream it
    # is given, so reusing that stream would hand the DSS parser an
    # already-consumed file and DSA keys could never be loaded.
    try:
        return paramiko.RSAKey.from_private_key(StringIO(text))
    except paramiko.SSHException:
        pass

    try:
        return paramiko.DSSKey.from_private_key(StringIO(text))
    except paramiko.SSHException:
        return None
interface.py 文件源码 项目:coco 作者: jumpserver 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_host_key(cls):
        """Return the RSA host key loaded from ``cls.host_key_path``.

        ``cls.host_key_gen()`` is called first when no file exists at that
        path (presumably it creates the key file -- confirm against its
        definition).
        """
        logger.debug("Get ssh server host key")
        key_file_present = os.path.isfile(cls.host_key_path)
        if not key_file_present:
            cls.host_key_gen()
        host_key = paramiko.RSAKey(filename=cls.host_key_path)
        return host_key
server.py 文件源码 项目:mock-ssh-server 作者: carletes 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, server, client_conn):
        """Set up a paramiko server transport for one accepted connection.

        :param server: owning server object, stored on ``self.server``
        :param client_conn: two-item sequence whose first item is the object
            passed to ``paramiko.Transport``
        """
        self.server = server
        self.thread = None
        self.command_queues = {}
        sock, _unused = client_conn
        transport = paramiko.Transport(sock)
        self.transport = transport
        # The server identifies itself with the key at SERVER_KEY_PATH.
        server_key = paramiko.RSAKey(filename=SERVER_KEY_PATH)
        transport.add_server_key(server_key)
        transport.set_subsystem_handler("sftp", sftp.SFTPServer)
server.py 文件源码 项目:mock-ssh-server 作者: carletes 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_user(self, uid, private_key_path):
        """Register ``uid`` with the RSA private key stored at ``private_key_path``.

        The entry saved in ``self._users`` is the tuple
        ``(private_key_path, loaded_key)``.
        """
        loaded_key = paramiko.RSAKey.from_private_key_file(private_key_path)
        entry = (private_key_path, loaded_key)
        self._users[uid] = entry
server.py 文件源码 项目:mock-ssh-server 作者: carletes 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def client(self, uid):
        """Return a connected ``paramiko.SSHClient`` authenticated as ``uid``.

        The server key from SERVER_KEY_PATH is registered as a known host key
        under both ``host`` and ``[host]:port``; unknown host keys are
        rejected. Authentication uses the private key file registered for
        ``uid``, with the SSH agent and key discovery disabled.
        """
        private_key_path, _loaded_key = self._users[uid]
        ssh_client = paramiko.SSHClient()
        known_hosts = ssh_client.get_host_keys()
        server_key = paramiko.RSAKey.from_private_key_file(SERVER_KEY_PATH)
        known_hosts.add(self.host, "ssh-rsa", server_key)
        bracketed_host = "[%s]:%d" % (self.host, self.port)
        known_hosts.add(bracketed_host, "ssh-rsa", server_key)
        ssh_client.set_missing_host_key_policy(paramiko.RejectPolicy())
        connect_options = dict(
            hostname=self.host,
            port=self.port,
            username=uid,
            key_filename=private_key_path,
            allow_agent=False,
            look_for_keys=False,
        )
        ssh_client.connect(**connect_options)
        return ssh_client
utils.py 文件源码 项目:tcp-qa 作者: Mirantis 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def generate_keys():
    """Generate a 1024-bit RSA key pair.

    :return: dict with the serialised private key under ``'private'`` and
        the base64 public key under ``'public'``
    """
    buf = StringIO.StringIO()
    rsa_key = paramiko.RSAKey.generate(1024)
    rsa_key.write_private_key(buf)
    public_part = rsa_key.get_base64()
    private_part = buf.getvalue()
    buf.close()
    key_pair = {'private': private_part,
                'public': public_part}
    return key_pair
utils.py 文件源码 项目:tcp-qa 作者: Mirantis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_keyfile(file_path):
    """Read an RSA private key file and derive its public key.

    :param file_path: path of the private key file
    :return: dict with the file's text under ``'private'`` and the base64
        public key under ``'public'``
    """
    with open(file_path, 'r') as handle:
        private_text = handle.read()
    rsa_key = paramiko.RSAKey(file_obj=StringIO.StringIO(private_text))
    public_text = rsa_key.get_base64()
    key_pair = {'private': private_text,
                'public': public_text}
    return key_pair
utils.py 文件源码 项目:tcp-qa 作者: Mirantis 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dump_keyfile(file_path, key):
    """Write the private key held in a key dict to ``file_path``.

    :param file_path: destination path for the private key file
    :param key: dict whose ``'private'`` entry is the private key text
    """
    private_stream = StringIO.StringIO(key['private'])
    rsa_key = paramiko.RSAKey(file_obj=private_stream)
    rsa_key.write_private_key_file(file_path)
    # NOTE(review): mode 0o644 leaves the private key readable by group and
    # others -- confirm this is intended.
    os.chmod(file_path, 0o644)
test_client.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _run(self, allowed_keys=None, delay=0):
        """Server half of the client tests.

        Accepts one connection on ``self.sockl``, wraps it in a paramiko
        Transport using the test RSA host key, optionally sleeps, and starts
        the server.

        :param allowed_keys: passed to ``NullServer``; defaults to
            ``FINGERPRINTS.keys()``
        :param delay: seconds to sleep before starting the server (no sleep
            when falsy)
        """
        accepted_keys = FINGERPRINTS.keys() if allowed_keys is None else allowed_keys
        self.socks, _peer_addr = self.sockl.accept()
        self.ts = paramiko.Transport(self.socks)
        self.ts.add_server_key(
            paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')))
        null_server = NullServer(allowed_keys=accepted_keys)
        if delay:
            time.sleep(delay)
        self.ts.start_server(self.event, null_server)
test_client.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_5_save_host_keys(self):
        """
        verify that SSHClient correctly saves a known_hosts file.

        A host key is added to a fresh client, the host keys are saved to a
        temporary file, and the file is checked for the host identifier.
        """
        warnings.filterwarnings('ignore', 'tempnam.*')

        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())
        fd, localname = mkstemp()
        os.close(fd)

        # try/finally so the temporary file is removed even when an
        # assertion below fails.
        try:
            client = paramiko.SSHClient()
            # assertEqual replaces the deprecated assertEquals alias, which
            # newer unittest versions no longer provide.
            self.assertEqual(0, len(client.get_host_keys()))

            host_id = '[%s]:%d' % (self.addr, self.port)

            client.get_host_keys().add(host_id, 'ssh-rsa', public_host_key)
            self.assertEqual(1, len(client.get_host_keys()))
            self.assertEqual(public_host_key, client.get_host_keys()[host_id]['ssh-rsa'])

            client.save_host_keys(localname)

            # A unittest assertion is used instead of a bare assert so the
            # check is not stripped when running under -O.
            with open(localname) as known_hosts:
                self.assertTrue(host_id in known_hosts.read())
        finally:
            os.unlink(localname)
test_client.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_6_cleanup(self):
        """
        verify that when an SSHClient is collected, its transport (and the
        transport's packetizer) is closed.

        The check is done through a weak reference to the transport's
        packetizer: after the client is closed and deleted and a garbage
        collection is forced, the weak reference must be dead.
        """
        # Unclear why this is borked on Py3, but it is, and does not seem worth
        # pursuing at the moment.
        # XXX: It's the release of the references to e.g packetizer that fails
        # in py3...
        # The whole test is therefore a no-op unless running on Python 2.
        if not PY2:
            return
        threading.Thread(target=self._run).start()
        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        # public_host_key is built here but not used again in this test.
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.assertEqual(0, len(self.tc.get_host_keys()))
        self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')

        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())

        # Keep only a weak reference so this test does not itself keep the
        # packetizer alive.
        p = weakref.ref(self.tc._transport.packetizer)
        self.assertTrue(p() is not None)
        self.tc.close()
        del self.tc

        # hrm, sometimes p isn't cleared right away.  why is that?
        #st = time.time()
        #while (time.time() - st < 5.0) and (p() is not None):
        #    time.sleep(0.1)

        # instead of dumbly waiting for the GC to collect, force a collection
        # to see whether the SSHClient object is deallocated correctly
        import gc
        gc.collect()

        # The weak reference must now be dead.
        self.assertTrue(p() is None)
test_ssh_gss.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _run(self):
        """Server half of the GSS tests.

        Accepts one connection on ``self.sockl``, wraps it in a paramiko
        Transport with the test RSA host key, and starts a ``NullServer``.
        """
        connection, _peer_addr = self.sockl.accept()
        self.socks = connection
        transport = paramiko.Transport(connection)
        self.ts = transport
        transport.add_server_key(
            paramiko.RSAKey.from_private_key_file('tests/test_rsa.key'))
        null_server = NullServer()
        transport.start_server(self.event, null_server)
test_ssh_gss.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_1_gss_auth(self):
        """
        Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
        (gssapi-with-mic) in client and server mode.

        After connecting, a command is executed and the data the server
        writes to the channel's stdout and stderr is read back on the client.
        """
        host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                    'ssh-rsa', public_host_key)
        self.tc.connect(self.hostname, self.port, username=self.username,
                        gss_auth=True)

        # assertTrue/assertEqual replace the deprecated assert_/assertEquals
        # aliases, which newer unittest versions no longer provide.
        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())
        self.assertEqual(self.username, self.ts.get_username())
        self.assertEqual(True, self.ts.is_authenticated())

        stdin, stdout, stderr = self.tc.exec_command('yes')
        # Server-side end of the channel opened by exec_command.
        schan = self.ts.accept(1.0)

        schan.send('Hello there.\n')
        schan.send_stderr('This is on stderr.\n')
        schan.close()

        # Each stream yields its single line and then an empty read.
        self.assertEqual('Hello there.\n', stdout.readline())
        self.assertEqual('', stdout.readline())
        self.assertEqual('This is on stderr.\n', stderr.readline())
        self.assertEqual('', stderr.readline())

        stdin.close()
        stdout.close()
        stderr.close()
test_hostkeys.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_2_add(self):
        """Check that a key added under a hashed host name is found by plain name."""
        hostdict = paramiko.HostKeys('hostfile.temp')
        hashed_host = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c='
        rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))
        hostdict.add(hashed_host, 'ssh-rsa', rsa_key)
        self.assertEqual(3, len(list(hostdict)))

        # The hashed entry is expected to be reachable as foo.example.com.
        entry = hostdict['foo.example.com']
        fingerprint = hexlify(entry['ssh-rsa'].get_fingerprint()).upper()
        self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fingerprint)
        self.assertTrue(hostdict.check('foo.example.com', rsa_key))
test_kex_gss.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_1_gsskex_and_auth(self):
        """
        Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
        Diffie-Hellman Key Exchange and user authentication with the GSS-API
        context created during key exchange.

        After connecting, a command is executed and the data the server
        writes to the channel's stdout and stderr is read back on the client.
        """
        host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                    'ssh-rsa', public_host_key)
        self.tc.connect(self.hostname, self.port, username=self.username,
                        gss_auth=True, gss_kex=True)

        # assertTrue/assertEqual replace the deprecated assert_/assertEquals
        # aliases, which newer unittest versions no longer provide.
        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())
        self.assertEqual(self.username, self.ts.get_username())
        self.assertEqual(True, self.ts.is_authenticated())

        stdin, stdout, stderr = self.tc.exec_command('yes')
        # Server-side end of the channel opened by exec_command.
        schan = self.ts.accept(1.0)

        schan.send('Hello there.\n')
        schan.send_stderr('This is on stderr.\n')
        schan.close()

        # Each stream yields its single line and then an empty read.
        self.assertEqual('Hello there.\n', stdout.readline())
        self.assertEqual('', stdout.readline())
        self.assertEqual('This is on stderr.\n', stderr.readline())
        self.assertEqual('', stderr.readline())

        stdin.close()
        stdout.close()
        stderr.close()
tcpServer.py 文件源码 项目:wetland 作者: ohmyadd 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handle(self):
        """Serve one incoming connection as an SSH server.

        Builds a paramiko Transport on ``self.request`` with the RSA and DSA
        host keys and banner named in the server configuration, creates a
        ``network.network`` object for the client address, starts the SSH
        server and then keeps accepting channels until none are left.
        """
        transport = paramiko.Transport(self.request)

        rsa_path = self.server.cfg.get("ssh", "private_rsa")
        dsa_path = self.server.cfg.get("ssh", "private_dsa")
        host_rsa = paramiko.RSAKey(filename=rsa_path)
        host_dsa = paramiko.DSSKey(filename=dsa_path)
        for host_key in (host_rsa, host_dsa):
            transport.add_server_key(host_key)

        transport.local_version = self.server.cfg.get("ssh", "banner")

        transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                        sftpServer.sftp_server)
        nw = network.network(self.client_address[0],
                             self.server.cfg.get("wetland", "docker_addr"))
        nw.create()

        sServer = sshServer.ssh_server(transport=transport, network=nw)

        def release():
            # Tear down the per-connection network and docker transport.
            nw.delete()
            sServer.docker_trans.close()

        try:
            transport.start_server(server=sServer)
        except paramiko.SSHException:
            # NOTE(review): this path returns without the cleanup done for
            # other exceptions -- confirm that is intended.
            return
        except Exception as e:
            print(e)
            release()
            return

        try:
            # Wait (up to 60 seconds per call) for channels; stop once the
            # transport has no channel left.
            transport.accept(60)
            while transport._channels.values():
                transport.accept(60)
        except Exception as e:
            print(e)
        finally:
            release()
helpers.py 文件源码 项目:shakedown 作者: dcos 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_transport(host, username, key):
    """ Create a transport object

        When ``host`` is the master, the transport connects to it directly.
        Otherwise a transport to the master is started and authenticated,
        and the returned transport runs over a ``direct-tcpip`` channel
        opened through the master to port 22 of ``host``.

        :param host: the hostname to connect to
        :type host: str
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: a transport object, or False when authenticating against
            the master or opening the channel to ``host`` fails
        :rtype: paramiko.Transport | bool
    """

    master_ip = shakedown.master_ip()

    if host == master_ip:
        return paramiko.Transport(host)

    transport_master = paramiko.Transport(master_ip)
    transport_master = start_transport(transport_master, username, key)

    if not transport_master.is_authenticated():
        # Report the key that was tried; the name `key_path` used here
        # before is not defined in this function and raised NameError.
        print("error: unable to authenticate {}@{} with key {}".format(username, master_ip, key))
        return False

    try:
        channel = transport_master.open_channel('direct-tcpip', (host, 22), ('127.0.0.1', 0))
    except paramiko.SSHException:
        print("error: unable to connect to {}".format(host))
        return False

    return paramiko.Transport(channel)
helpers.py 文件源码 项目:shakedown 作者: dcos 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start_transport(transport, username, key):
    """ Start a transport in client mode and authenticate it

        The supplied key (when truthy) is tried first, followed by every key
        offered by the SSH agent; the first one accepted wins.

        :param transport: the transport object to start
        :type transport: paramiko.Transport
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: the transport object passed
        :rtype: paramiko.Transport
        :raises ValueError: when no candidate key authenticates
    """

    transport.start_client()

    agent = paramiko.agent.Agent()
    own_key = (key,) if key else ()
    candidates = itertools.chain(own_key, agent.get_keys())

    authenticated = False
    for candidate in candidates:
        try:
            transport.auth_publickey(username, candidate)
        except paramiko.AuthenticationException:
            # This key was rejected; move on to the next candidate.
            continue
        authenticated = True
        break

    if not authenticated:
        raise ValueError('No valid key supplied')

    return transport


# SSH connection will be auto-terminated at the conclusion of this operation, causing
# a race condition; the try/except block attempts to close the channel and/or transport
# but does not issue a failure if it has already been closed.
__init__.py 文件源码 项目:BigBrotherBot-For-UrT43 作者: ptitbigorneau 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_private_key(self, password=None):
        """Load the private key from ``self.private_key_file``.

        The key type is chosen from the first line of the file: ``DSA``
        selects ``paramiko.DSSKey``, otherwise ``RSA`` selects
        ``paramiko.RSAKey``.

        :param password: passphrase forwarded to ``from_private_key``
        :return: the loaded key, or ``None`` when no key file is configured
        :raises ValueError: when the first line names neither DSA nor RSA
        """
        key_file = self.private_key_file
        if key_file is None:
            return None

        # Peek at the header line to decide which key class to use.
        with open(key_file, "r") as header_stream:
            header_line = header_stream.readline()

        if 'DSA' in header_line:
            key_class = paramiko.DSSKey
        elif 'RSA' in header_line:
            key_class = paramiko.RSAKey
        else:
            raise ValueError("can't identify private key type")

        # Re-open so the parser reads the file from its beginning.
        with open(key_file, "r") as key_stream:
            return key_class.from_private_key(key_stream, password=password)
sshutils.py 文件源码 项目:niceman 作者: ReproNim 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def load_remote_rsa_key(self, remote_filename):
        """
        Returns paramiko.RSAKey object for an RSA key located on the remote
        machine

        :param remote_filename: path of the key file, opened for reading
            through ``self.remote_file``
        :return: the key produced by ``get_rsa_key`` from that file
        """
        rfile = self.remote_file(remote_filename, 'r')
        # Close the remote handle even when parsing the key raises.
        try:
            return get_rsa_key(key_file_obj=rfile)
        finally:
            rfile.close()
sshutils.py 文件源码 项目:niceman 作者: ReproNim 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_rsa_key(key_location=None, key_file_obj=None, passphrase=None,
                use_pycrypto=False):
    """Load an RSA private key from a path or an open file object.

    :param key_location: path of the key file; opened only when
        ``key_file_obj`` is not given, and used in the error message
    :param key_file_obj: already-open file object to read the key from;
        takes precedence over ``key_location`` and is left open
    :param passphrase: passphrase for an encrypted key
    :param use_pycrypto: when true the key is imported with
        ``RSA.importKey``; otherwise with ``paramiko.RSAKey.from_private_key``
    :return: the loaded key object
    :raises exception.SSHError: when the key cannot be parsed
        (``paramiko.SSHException`` or ``ValueError`` from the loader)
    """
    # Remember whether the file is opened here, so that only this function's
    # own handle is closed and a caller-supplied object is left untouched.
    opened_here = not key_file_obj
    key_fobj = key_file_obj or open(key_location)
    try:
        if use_pycrypto:
            key = RSA.importKey(key_fobj, passphrase=passphrase)
        else:
            key = paramiko.RSAKey.from_private_key(key_fobj,
                                                   password=passphrase)
        return key
    except (paramiko.SSHException, ValueError):
        raise exception.SSHError(
            "Invalid RSA private key file or missing passphrase: %s" %
            key_location)
    finally:
        if opened_here:
            key_fobj.close()
sshutils.py 文件源码 项目:niceman 作者: ReproNim 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def generate_rsa_key():
    """Return a newly generated 2048-bit paramiko RSA key."""
    key_bits = 2048
    new_key = paramiko.RSAKey.generate(key_bits)
    return new_key


问题


面经


文章

微信
公众号

扫码关注公众号