python类SSHException()的实例源码

utils.py 文件源码 项目:jumpserver-python-sdk 作者: jumpserver 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def from_string(cls, key_string):
        try:
            pkey = paramiko.RSAKey(file_obj=StringIO(key_string))
            return pkey
        except paramiko.SSHException:
            try:
                pkey = paramiko.DSSKey(file_obj=StringIO(key_string))
                return pkey
            except paramiko.SSHException:
                return None
paramiko_helper.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def loginandcopy(hostname,uname,pwd,sfile,tfile):
     try:
        log.info("Establishing ssh connection")
        client = getsshClient()
        client.load_system_host_keys()
        client.connect(hostname)#,username=uname)#,password=pwd)
     except paramiko.AuthenticationException:
        print("Authentication failed, please verify your credentials: %s")
     except paramiko.SSHException as sshException:
        print("Unable to establish SSH connection: %s" % sshException)
     except paramiko.BadHostKeyException as badHostKeyException:
        print("Unable to verify server's host key: %s" % badHostKeyException)
     except Exception as e:
        print(e.args)
     try:
        log.info("Getting SCP Client")
        scpclient = scp.SCPClient(client.get_transport())
        log.info(scpclient)
        log.info("Hostname: %s", hostname)
        log.info("source file: %s", sfile)
        log.info("target file: %s", tfile)
        scpclient.put(sfile,tfile)
     except scp.SCPException as e:
        print("Operation error: %s", e)
paramiko_helper.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def loginandrun(hostname,uname,pwd,command):
     try:
        log.info("Establishing ssh connection")
        client = getsshClient()
        client.load_system_host_keys()
        client.connect(hostname)#,username=uname)#,password=pwd)
     except paramiko.AuthenticationException:
        print("Authentication failed, please verify your credentials: %s")
     except paramiko.SSHException as sshException:
        print("Unable to establish SSH connection: %s" % sshException)
     except paramiko.BadHostKeyException as badHostKeyException:
        print("Unable to verify server's host key: %s" % badHostKeyException)
     try:
        stdin, stdout, stderr = client.exec_command(command)
        result = stderr.read()
        if len(result)  > 0:
            print("hit error" + result)

     except Exception as e:
        print("Operation error: %s", e)


# Any new implementation to use this method
paramiko_helper.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def loginandcopydir(hostname,uname,pwd,sfile,tfile,recursive,preserve_times):
     try:
        log.info("Establishing ssh connection")
        client = getsshClient()
        client.load_system_host_keys()
        client.connect(hostname) #,username=uname)#,password=pwd)
     except paramiko.AuthenticationException:
        print("Authentication failed, please verify your credentials: %s")
     except paramiko.SSHException as sshException:
        print("Unable to establish SSH connection: %s" % sshException)
     except paramiko.BadHostKeyException as badHostKeyException:
        print("Unable to verify server's host key: %s" % badHostKeyException)
     except Exception as e:
        print(e.args)
     try:
        scpclient = scp.SCPClient(client.get_transport())
        scpclient.put(sfile,tfile,recursive,preserve_times)
     except scp.SCPException as e:
        print("Operation error: %s", e)

# Deprecated
download.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def process(self):
        """
        Process response files
        """
        try:
            with self.sftp.cd(settings.EXAMS_SFTP_RESULTS_DIR):
                for remote_path, local_path in self.filtered_files():
                    try:
                        if self.process_zip(local_path):
                            self.sftp.remove(remote_path)

                        log.debug("Processed remote file: %s", remote_path)
                    except (EOFError, SSHException,):
                        raise
                    except:  # pylint: disable=bare-except
                        log.exception("Error processing file: %s", remote_path)
                    finally:
                        if os.path.exists(local_path):
                            os.remove(local_path)
        except (EOFError, SSHException,) as exc:
            raise RetryableSFTPException("Exception processing response files") from exc
vm_setup.py 文件源码 项目:setup 作者: mechaphish 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ssh_exec_command(ssh_obj, cmd, prefix=''):
    """
        Execute a command on the ssh connection.
    :param ssh_obj: SSH object.
    :param cmd: command to run.
    :param prefix: Prefix to be used for printing
    :return: stdout and stderr
    """
    try:
        print(prefix+ "[*] Running Command:" + cmd)
        _, stdout_obj, stderr_obj = ssh_obj.exec_command(cmd)
        exit_status = stdout_obj.channel.recv_exit_status()
        if exit_status == 0:
            print(prefix + GREEN_PRINT + "[+] Command:" + cmd + " Completed Successfully" + END_PRINT)
        else:
            print(prefix + RED_PRINT + "[*] Command:" + cmd + " Return Code:" + str(exit_status) + END_PRINT)

    except paramiko.SSHException as e:
        print(prefix + RED_PRINT + "[!] Problem occurred while running command: %s, Error: %s" % (cmd, e) + END_PRINT)
        raise e
    return stdout_obj, stderr_obj
ui_ons_cli.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_device_state(self):
        """Attempts to connect to the shell retries number of times.

        Raises:
            Exception:  device is not ready

        """

        if not (self.switch.cli.conn.check_client() and self.switch.cli.conn.check_shell()):
            try:
                self.switch.ui.connect()
            except (CLISSHException, SSHException, SocketError):
                self.switch.ui.disconnect()
                raise Exception("Device is not ready.")

        # Add cli application check

# Platform
agent_rpc.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def construct_ssh_auth_args(self, root_pw=None, pkey=None, pkey_pw=None):

        args = {}
        if root_pw:
            args.update({"password": root_pw})

        if pkey:
            try:
                #  private key to match a public key on the server
                #  optionally encrypted
                pkey_filelike = StringIO(pkey)
                if pkey_pw:
                    pkey_paramiko = paramiko.RSAKey.from_private_key(
                        pkey_filelike,
                        pkey_pw)
                else:
                    pkey_paramiko = paramiko.RSAKey.from_private_key(pkey_filelike)
                args.update({"pkey": pkey_paramiko})
            except SSHException:
                #  Invalid key, or wrong passphase to enc key
                #  pass on form of auth
                pass

        return args
demo.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def agent_auth(transport, username):
    """
    Attempt to authenticate to the given transport using any of the private
    keys available from an SSH agent.
    """

    agent = paramiko.Agent()
    agent_keys = agent.get_keys()
    if len(agent_keys) == 0:
        return

    for key in agent_keys:
        print('Trying ssh-agent key %s' % hexlify(key.get_fingerprint()))
        try:
            transport.auth_publickey(username, key)
            print('... success!')
            return
        except paramiko.SSHException:
            print('... nope.')
monitor.py 文件源码 项目:awslambdamonitor 作者: gene1wood 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def ssh(config, host):
    """
    Check that a host is running SSH

    :param config: Unused
    :param host: The host to check
    :return: 3-tuple of (success, name, message)
        success: Boolean value indicating if there is a problem or not
        name: DNS name
        message: String describing the status
    """
    del config
    name = host
    try:
        ssh_conn = paramiko.SSHClient()
        ssh_conn.set_missing_host_key_policy(
            paramiko.client.MissingHostKeyPolicy())
        ssh_conn.connect(host)
        return True
    except (paramiko.BadHostKeyException, paramiko.AuthenticationException,
            paramiko.SSHException, socket.error) as e:
        return (False, name, "Unable to SSH to %s %s %s"
                % (host, e.__class__, e))
test_client.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_7_banner_timeout(self):
        """
        verify that the SSHClient has a configurable banner timeout.
        """
        # Start the thread with a 1 second wait.
        threading.Thread(target=self._run, kwargs={'delay': 1}).start()
        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
        # Connect with a half second banner timeout.
        self.assertRaises(
            paramiko.SSHException,
            self.tc.connect,
            self.addr,
            self.port,
            username='slowdive',
            password='pygmalion',
            banner_timeout=0.5
        )
sshclient.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def exec_command(self, cmd, indata=None, timeout=30):
        logger.info("Executing command: {0}".format(cmd))
        if self.transport is None or self.transport.is_active() is False:
            self.reconnect()
            if self.transport is None or self.transport.is_active() is False:
                logger.error("connection failed.")
                return -1, None

        input_data = self.__fix_indata(indata)

        try:
            session = self.transport.open_session()
            session.set_combine_stderr(True)
            session.get_pty()
            session.exec_command(cmd)
        except paramiko.SSHException as ex:
            logger.error("Exception for command '{0}: {1}'".format(cmd, ex))
            session.close()
            return -1, None
        output = self.poll(session, timeout, input_data)
        status = session.recv_exit_status()
        logger.info("Returned status {0}".format(status))
        session.close()
        return status, output
cmdshell.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self, command):
        """
        Execute a command on the remote host.  Return a tuple containing
        an integer status and a two strings, the first containing stdout
        and the second containing stderr from the command.
        """
        boto.log.debug('running:%s on %s' % (command, self.server.instance_id))
        status = 0
        try:
            t = self._ssh_client.exec_command(command)
        except paramiko.SSHException:
            status = 1
        std_out = t[1].read()
        std_err = t[2].read()
        t[0].close()
        t[1].close()
        t[2].close()
        boto.log.debug('stdout: %s' % std_out)
        boto.log.debug('stderr: %s' % std_err)
        return (status, std_out, std_err)
ssh.py 文件源码 项目:deb-python-hplefthandclient 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _connect(self, ssh):
        if self.san_password:
            ssh.connect(self.san_ip,
                        port=self.san_ssh_port,
                        username=self.san_login,
                        password=self.san_password,
                        timeout=self.ssh_conn_timeout)
        elif self.san_privatekey:
            pkfile = os.path.expanduser(self.san_privatekey)
            privatekey = paramiko.RSAKey.from_private_key_file(pkfile)
            ssh.connect(self.san_ip,
                        port=self.san_ssh_port,
                        username=self.san_login,
                        pkey=privatekey,
                        timeout=self.ssh_conn_timeout)
        else:
            msg = "Specify a password or private_key"
            raise exceptions.SSHException(msg)
ssh.py 文件源码 项目:deb-python-hplefthandclient 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def open(self):
        """Opens a new SSH connection if the transport layer is missing.

        This can be called if an active SSH connection is open already.

        """
        # Create a new SSH connection if the transport layer is missing.
        if self.ssh:
            transport_active = False
            if self.ssh.get_transport():
                transport_active = self.ssh.get_transport().is_active()
            if not transport_active:
                try:
                    self._connect(self.ssh)
                except Exception as e:
                    msg = "Error connecting via ssh: %s" % e
                    self._logger.error(msg)
                    raise paramiko.SSHException(msg)
ssh.py 文件源码 项目:deb-python-hplefthandclient 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def was_command_successful(self, output):
        """
        Given the entire output of an SSH command, this will check to see if
        the result returned is 0, aka it was successful. If result is not 0,
        the command failed and we return False.

        :param output: The output string of an SSH command
        :type output: str

        :returns: True if the command was successful
        """
        output_string = ''.join(output)
        match = re.match('.*result\s+0', output_string)
        if not match:
            raise exceptions.SSHException(
                "The command did not execute successfully.")

        return True
test_HPELeftHandClient_MockSSH.py 文件源码 项目:deb-python-hplefthandclient 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_create_ssh_except(self):
        """Make sure that SSH exceptions are not quietly eaten."""

        self.cl.setSSHOptions(ip,
                              user,
                              password,
                              known_hosts_file=None,
                              missing_key_policy=paramiko.AutoAddPolicy)

        self.cl.ssh.ssh = mock.Mock()
        self.cl.ssh.ssh.invoke_shell.side_effect = Exception('boom')

        cmd = ['fake']
        self.assertRaises(exceptions.SSHException, self.cl.ssh._run_ssh, cmd)

        self.cl.ssh.ssh.assert_has_calls(
            [
                mock.call.get_transport(),
                mock.call.get_transport().is_alive(),
                mock.call.invoke_shell(),
                mock.call.get_transport(),
                mock.call.get_transport().is_alive(),
            ]
        )
test_HPELeftHandClient_MockSSH.py 文件源码 项目:deb-python-hplefthandclient 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_was_command_successful_false(self):
        # Create our fake SSH client
        ssh_client = ssh.HPELeftHandSSHClient('foo', 'bar', 'biz')

        # Test invalid command output
        cmd_out = ['invalid']
        self.assertRaises(exceptions.SSHException,
                          ssh_client.was_command_successful,
                          cmd_out)

        # Test valid command output, but command failed
        cmd_out = ['',
                   'HP StoreVirtual LeftHand OS Command Line Interface',
                   '(C) Copyright 2007-2013',
                   '',
                   'RESPONSE',
                   ' result         8080878378']
        self.assertRaises(exceptions.SSHException,
                          ssh_client.was_command_successful,
                          cmd_out)
test_HPELeftHandClient_MockSSH.py 文件源码 项目:deb-python-hplefthandclient 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_connect_without_password_and_private_key(self):
        ssh_client = ssh.HPELeftHandSSHClient(ip, user, password,
                                              known_hosts_file=None,
                                              missing_key_policy=paramiko.
                                              AutoAddPolicy)
        mock_expand_user = mock.Mock()
        mock_expand_user.return_value = 'my_user'

        mock_from_private_key_file = mock.Mock()
        mock_from_private_key_file.return_value = 'my_private_key'

        ssh_client.san_password = None
        ssh_client.san_privatekey = None

        ssh_client.ssh = mock.Mock()
        ssh_client.ssh.get_transport.return_value = False
        self.assertRaises(paramiko.SSHException, ssh_client.open)
test_HPELeftHandClient_MockSSH.py 文件源码 项目:deb-python-hplefthandclient 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_run_ssh_with_exception(self):
        ssh_client = ssh.HPELeftHandSSHClient(ip, user, password,
                                              known_hosts_file=None,
                                              missing_key_policy=paramiko.
                                              AutoAddPolicy)

        ssh_client.check_ssh_injection = mock.Mock()
        ssh_client.check_ssh_injection.return_value = True
        ssh_client._ssh_execute = mock.Mock()
        ssh_client._ssh_execute.side_effect = Exception('End this here')
        ssh_client._create_ssh = mock.Mock()
        ssh_client._create_ssh.return_value = True
        ssh_client.ssh = mock.Mock()
        ssh_client.ssh.get_transport.is_alive.return_value = True

        command = ['fake']
        self.assertRaises(exceptions.SSHException, ssh_client._run_ssh,
                          command, attempts=1)
        ssh_client.check_ssh_injection.assert_called_once()
        ssh_client._ssh_execute.assert_called_once()
        ssh_client._create_ssh.assert_not_called()
cmdshell.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, command):
        """
        Execute a command on the remote host.  Return a tuple containing
        an integer status and a two strings, the first containing stdout
        and the second containing stderr from the command.
        """
        boto.log.debug('running:%s on %s' % (command, self.server.instance_id))
        status = 0
        try:
            t = self._ssh_client.exec_command(command)
        except paramiko.SSHException:
            status = 1
        std_out = t[1].read()
        std_err = t[2].read()
        t[0].close()
        t[1].close()
        t[2].close()
        boto.log.debug('stdout: %s' % std_out)
        boto.log.debug('stderr: %s' % std_err)
        return (status, std_out, std_err)
Perf.py 文件源码 项目:ARNPerf 作者: zhaoqige 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def SSHConnect(host, user, passwd, port):
    ssh = paramiko.SSHClient()
    try:
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # any remote (~/.ssh/known_hosts)    
        ssh.connect(host, port = int(port), 
                    username = user, password = passwd, 
                    allow_agent=False, look_for_keys=False)

    except:
    #except paramiko.SSHException:
        ssh.close()
        ssh = None
        print('error> failed to connect', host, 
                '(please check your input: ip, port, user, password)')

    return ssh
cmdshell.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self, command):
        """
        Execute a command on the remote host.  Return a tuple containing
        an integer status and a two strings, the first containing stdout
        and the second containing stderr from the command.
        """
        boto.log.debug('running:%s on %s' % (command, self.server.instance_id))
        status = 0
        try:
            t = self._ssh_client.exec_command(command)
        except paramiko.SSHException:
            status = 1
        std_out = t[1].read()
        std_err = t[2].read()
        t[0].close()
        t[1].close()
        t[2].close()
        boto.log.debug('stdout: %s' % std_out)
        boto.log.debug('stderr: %s' % std_err)
        return (status, std_out, std_err)
ssh.py 文件源码 项目:dask-ec2 作者: dask 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def connect(self):
        """Connect to host
        """
        try:
            self.client.connect(self.host,
                                username=self.username,
                                password=self.password,
                                port=self.port,
                                pkey=self.pkey,
                                timeout=self.timeout)
        except paramiko.AuthenticationException as e:
            raise DaskEc2Exception("Authentication Error to host '%s'" % self.host)
        except sock_gaierror as e:
            raise DaskEc2Exception("Unknown host '%s'" % self.host)
        except sock_error as e:
            raise DaskEc2Exception("Error connecting to host '%s:%s'\n%s" % (self.host, self.port, e))
        except paramiko.SSHException as e:
            raise DaskEc2Exception("General SSH error - %s" % e)
__init__.py 文件源码 项目:worker 作者: mechaphish 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _initialize_ssh_connection(self):
        LOG.debug("Connecting to the VM via SSH")
        self.ssh = paramiko.client.SSHClient()

        self.ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
        try:
            self.ssh.connect("127.0.0.1", port=self._ssh_port, username=self._ssh_username,
                             key_filename=self._ssh_keyfile, timeout=self._ssh_timeout)
            # Set TCP Keep-Alive to 5 seconds, so that the connection does not die
            transport = self.ssh.get_transport()
            transport.set_keepalive(5)
            # also raises BadHostKeyException, should be taken care of via AutoAddPolicy()
            # also raises AuthenticationException, should never occur because keys are provisioned
        except socket.error as e:
            LOG.error("TCP error connecting to SSH on VM.")
            raise e
        except paramiko.SSHException as e:
            LOG.error("SSH error trying to connect to VM.")
            raise e
__init__.py 文件源码 项目:worker 作者: mechaphish 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def execute(self, command):
        assert self.ssh is not None

        environment = " ".join("{}='{}'".format(k, v) for k, v in os.environ.items()
                               if k.startswith("POSTGRES"))
        env_command = "{} {}".format(environment, command)
        LOG.debug("Executing command: %s", env_command)
        stdout_content = None
        stderr_content = None
        try:
            _, stdout, stderr = self.ssh.exec_command(env_command)
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                raise paramiko.SSHException("'%s' failed with exit status %d", command, exit_status)
            stdout_content = stdout.read()
            stderr_content = stderr.read()
        except paramiko.SSHException as e:
            LOG.error("Unable to excute command '%s' on host: %s", command, e)
            LOG.debug("stdout: %s", stdout.read())
            LOG.debug("stderr: %s", stderr.read())
            raise e
        return stdout_content, stderr_content
control_gpio_GUI.py 文件源码 项目:Remote_raspberrypi_GPIO_Control 作者: Rahul14singh 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def trySSHConnect(self,host, portNum):
        paramiko.util.log_to_file ('paramiko.log') 
        try:
            self.ssh = paramiko.SSHClient()
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh.connect(host,port=portNum,username="pi", password="raspberry")
            self.ssh.get_transport().window_size = 3 * 1024 * 1024
            command='python /home/pi/Desktop/control_gpio_pi/initial_check.py'
            stdin,stdout,stderr = self.ssh.exec_command(command)
            print('\nstout:',stdout.read())
        except paramiko.AuthenticationException:
            print ("Authentication failed!")
            return -1
        except paramiko.BadHostKeyException:
            print ("BadHostKey Exception!")
            return -1
        except paramiko.SSHException:
            print ("SSH Exception!")
            self.ssh.close()
            return -1
        except socket.error as e:
            print ("Socket error ", e)
            return -1
        except:
            print ("Could not SSH to %s, unhandled exception" % host)
            return -1
        print ("Made connection to " + host + ":" + str(portNum))
        return 0
sftp.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_connection():
    """
    Creates a new SFTP connection

    Returns:
        connection(pysftp.Connection):
            the configured connection
    """
    missing_settings = []
    for key in PEARSON_UPLOAD_REQUIRED_SETTINGS:
        if getattr(settings, key) is None:
            missing_settings.append(key)

    if missing_settings:
        raise ImproperlyConfigured(
            "The setting(s) {} are required".format(', '.join(missing_settings))
        )

    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None  # ignore knownhosts

    try:
        return pysftp.Connection(
            host=str(settings.EXAMS_SFTP_HOST),
            port=int(settings.EXAMS_SFTP_PORT),
            username=str(settings.EXAMS_SFTP_USERNAME),
            password=str(settings.EXAMS_SFTP_PASSWORD),
            cnopts=cnopts,
        )
    except (ConnectionException, SSHException) as ex:
        raise RetryableSFTPException() from ex
upload.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def upload_tsv(file_path):
    """
    Upload the given TSV files to the remote

    Args:
        file_path (str): absolute path to the file to be uploaded
    """
    try:
        with get_connection() as sftp:
            with sftp.cd(settings.EXAMS_SFTP_UPLOAD_DIR):
                sftp.put(file_path)
    except (EOFError, SSHException,) as exc:
        raise RetryableSFTPException() from exc
download_test.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_process_ssh_exception_cd(
            self, process_zip_mock, filtered_files_mock, os_path_exists_mock, os_remove_mock):
        """Test that SSH exceptions bubble up"""
        self.sftp.cd.side_effect = SSHException('exception')

        processor = download.ArchivedResponseProcessor(self.sftp)
        with self.assertRaises(RetryableSFTPException):
            processor.process()

        filtered_files_mock.assert_not_called()
        self.sftp.remove.assert_not_called()
        process_zip_mock.assert_not_called()
        os_path_exists_mock.assert_not_called()
        os_remove_mock.assert_not_called()


问题


面经


文章

微信
公众号

扫码关注公众号