python类ErrorReturnCode()的实例源码

commands.py 文件源码 项目:docl 作者: cloudify-cosmo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _ssh_setup(container_id, container_ip):
    """Set up ssh access to the manager container.

    First refreshes the ``known_hosts`` entry for ``container_ip`` on a
    best-effort basis (any ``sh.ErrorReturnCode`` raised along the way is
    ignored). Then copies the public key derived from
    ``configuration.ssh_key_path`` into the container ``container_id`` as
    ``/root/.ssh/authorized_keys``.
    """
    logger.info('Applying ssh configuration to manager container')
    try:
        known_hosts = path('~/.ssh/known_hosts').expanduser()
        # The known hosts file may not exist; a failing ssh-keygen call is
        # swallowed by the except clause below.
        ssh_keygen('-R', container_ip)
        fingerprint = None
        # Keep scanning until ssh-keyscan yields a non-empty first line.
        while not fingerprint:
            scan_output = ssh_keyscan(container_ip).stdout
            fingerprint = scan_output.split('\n')[0].strip()
            time.sleep(0.01)
        if fingerprint and known_hosts.exists():
            # Start the new entry on its own line if the file lacks a
            # trailing newline.
            separator = '' if known_hosts.text().endswith('\n') else '\n'
            known_hosts.write_text(
                '{}{}\n'.format(separator, fingerprint), append=True)
    except sh.ErrorReturnCode:
        pass
    quiet_docker('exec', container_id, 'mkdir', '-p', '/root/.ssh')
    ssh_public_key = ssh_keygen('-y', '-f', configuration.ssh_key_path).strip()
    with tempfile.NamedTemporaryFile() as key_file:
        key_file.write(ssh_public_key)
        key_file.flush()
        destination = '{}:/root/.ssh/authorized_keys'.format(container_id)
        quiet_docker.cp(key_file.name, destination)
    # due to a bug in docker 17.06, the file keeps ownership and is not
    # chowned to the main container user automatically
    quiet_docker('exec', container_id, 'chown', 'root:root',
                 '/root/.ssh/authorized_keys')
connection.py 文件源码 项目:hpc-workload-gen 作者: mikelangelo-project 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_qstat_job_state(self):
        """Return the status column qstat reports for this job.

        Runs ``self.cfg.path_qstat`` with ``self.job_id`` through
        ``self.ssh_host`` and parses the first job row of the output (the
        first two lines are treated as header, the last one as empty).

        Returns:
            The status field of the first listed job, or ``None`` when the
            output is empty or contains no job rows.

        Note:
            ``self.job_id`` is overwritten with the id column of the parsed
            row. When the ssh call raises ``ErrorReturnCode`` the error is
            logged, its stderr is printed and the process exits with
            status 1.
        """
        # Local import: ``exit`` is only injected by the ``site`` module,
        # ``sys.exit`` is always available.
        import sys

        try:
            self.logger.debug('getting qstat infos')
            ssh_output = self.ssh_host(self.cfg.path_qstat, self.job_id)

            self.logger.debug('qstat output:\n{}'.format(ssh_output))

            if ssh_output != '':
                # slice the header and the last line which is empty
                jobs_displayed = ssh_output.split('\n')[2:-1]
                for job in jobs_displayed:
                    # collapse runs of whitespace so the row splits cleanly
                    job_info = ' '.join(job.split())
                    # a job row is expected to have exactly six columns
                    (self.job_id,
                        job_name,
                        job_user,
                        job_time,
                        job_status,
                        job_queue) = job_info.split(' ')
                    self.logger.debug(
                        'job {} has status {}'.format(self.job_id, job_status))

                    # only the first listed job is of interest
                    return job_status
            else:
                return None

        except ErrorReturnCode as e:
            self.logger.error('\nError in ssh call:\n{}'.format(e))
            # print() with a single argument behaves the same on Python 2
            # and Python 3, unlike the print statement.
            print(e.stderr)
            sys.exit(1)
ffpb.py 文件源码 项目:ffpb 作者: althonos 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main(argv=None):
    """Run ffmpeg with a progress notifier attached to its stderr.

    Args:
        argv: ffmpeg command-line arguments; falls back to
            ``sys.argv[1:]`` when omitted or empty.

    Returns:
        0 after showing help or on success, otherwise the exit code of the
        failed ffmpeg invocation.
    """
    argv = argv or sys.argv[1:]

    if {'-h', '-help', '--help'}.intersection(argv):
        sh.ffmpeg(help=True, _fg=True)
        return 0

    notifier = ProgressNotifier()

    try:
        sh.ffmpeg(
            # Pass the resolved arguments, not sys.argv, so callers that
            # supply ``argv`` explicitly are honoured.
            argv,
            _in=queue.Queue(),
            _err=notifier,
            _out_bufsize=0,
            _err_bufsize=0,
            _no_out=True,
            _no_pipe=True,
            _tty_in=True,
        )

    except sh.ErrorReturnCode as err:
        # ffmpeg may fail before emitting any line; do not let an
        # IndexError hide the real exit code.
        if notifier.lines:
            print(notifier.lines[-1])
        return err.exit_code

    else:
        print()
        return 0
setup.py 文件源码 项目:openshift-ansible-contrib 作者: openshift 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        ''' Run ``molecule test`` in every discovered role directory.

        Prints the selected roles/excludes and the molecule directories
        found, then runs the tests role by role. The stdout of every failed
        run is collected; if any run failed, the collected output is printed
        and the process exits with status 1.
        '''
        if self.roles is not None:
            print("Roles:\n{0}".format(yaml.dump(self.roles, default_flow_style=False)))
        if self.excludes is not None:
            print("Excludes:\n{0}".format(yaml.dump(self.excludes, default_flow_style=False)))

        starting_dir = '.'
        if self.roles is not None:
            starting_dir = 'roles'

        molecule_dirs = find_dirs(starting_dir, self.excludes, self.roles, 'molecule')

        print("Found:\n{0}".format(yaml.dump(molecule_dirs, default_flow_style=False)))
        base_dir = os.getcwd()

        failures = []

        for role in molecule_dirs:
            role = os.path.dirname(role)
            print("Testing: {0}".format(role))
            os.chdir(role)

            try:
                print(sh.molecule.test())
            except ErrorReturnCode as e:
                output = e.stdout
                # On Python 3 the captured stdout is bytes and cannot be
                # concatenated with str; on Python 2 it already is str.
                if not isinstance(output, str):
                    output = output.decode('utf-8', 'replace')
                print(output)
                failures.append(output)
            finally:
                # Always return to the starting directory, even on an
                # unexpected exception, so relative role paths stay valid.
                os.chdir(base_dir)

        errors = "".join(failures)
        if errors:
            print("Errors:\n{0}\n".format(errors))
            sys.exit(1)
setup.py 文件源码 项目:openshift-ansible 作者: nearform 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        ''' Run ``molecule test`` in every discovered role directory.

        Prints the selected roles/excludes and the molecule directories
        found, then runs the tests role by role. The stdout of every failed
        run is collected; if any run failed, the collected output is printed
        and the process exits with status 1.
        '''
        if self.roles is not None:
            print("Roles:\n{0}".format(yaml.dump(self.roles, default_flow_style=False)))
        if self.excludes is not None:
            print("Excludes:\n{0}".format(yaml.dump(self.excludes, default_flow_style=False)))

        starting_dir = '.'
        if self.roles is not None:
            starting_dir = 'roles'

        molecule_dirs = find_dirs(starting_dir, self.excludes, self.roles, 'molecule')

        print("Found:\n{0}".format(yaml.dump(molecule_dirs, default_flow_style=False)))
        base_dir = os.getcwd()

        failures = []

        for role in molecule_dirs:
            role = os.path.dirname(role)
            print("Testing: {0}".format(role))
            os.chdir(role)

            try:
                print(sh.molecule.test())
            except ErrorReturnCode as e:
                output = e.stdout
                # On Python 3 the captured stdout is bytes and cannot be
                # concatenated with str; on Python 2 it already is str.
                if not isinstance(output, str):
                    output = output.decode('utf-8', 'replace')
                print(output)
                failures.append(output)
            finally:
                # Always return to the starting directory, even on an
                # unexpected exception, so relative role paths stay valid.
                os.chdir(base_dir)

        errors = "".join(failures)
        if errors:
            print("Errors:\n{0}\n".format(errors))
            sys.exit(1)
test_cookiecutter_generation.py 文件源码 项目:cookiecutter-django-gulp 作者: valerymelou 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_flake8_compliance(cookies):
    """generated project should pass flake8"""
    result = cookies.bake()

    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        # pytest.fail() expects a string reason; passing the exception
        # object itself is rejected by recent pytest versions.
        pytest.fail(str(e))
repos.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def git_clone(git_repo_url, git_repo_dir):
    """Clone ``git_repo_url`` into ``git_repo_dir``.

    git's stdout and stderr are forwarded to this process' streams.

    Raises:
        CommandError: if git exits with an error return code.
    """
    logger.info('Fetching from %s to %s', git_repo_url, git_repo_dir)
    console = dict(_out=sys.stdout, _err=sys.stderr, _fg=True)
    try:
        git.clone(git_repo_url, git_repo_dir, **console)
    except ErrorReturnCode as e:
        raise CommandError(e)
image_download.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _decompress(path):
    """
    Extract the .tar.xz archive found at ``path``.

    The archive is unpacked into the directory that contains ``path``.
    A failing tar invocation is re-raised as ``CommandError``.
    """
    logger.info('Decompressing %s', path)
    destination = os.path.dirname(path)
    try:
        tar(extract=True, xz=True, verbose=True, file=path,
            directory=destination, _fg=True, _out=sys.stdout,
            _err=sys.stderr)
    except ErrorReturnCode as e:
        raise CommandError(e)
init.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _install_dependencies():
    """
    Install the packages S2E depends on.

    Only apt-get is supported for now. Nothing is installed when
    ``_get_ubuntu_version()`` returns a falsy value. A failing command is
    re-raised as ``CommandError``.
    """
    logger.info('Installing S2E dependencies')

    ubuntu_ver = _get_ubuntu_version()
    if not ubuntu_ver:
        return

    dependencies = CONSTANTS['dependencies']
    install_packages = (dependencies['common'] +
                        dependencies['ubuntu_%d' % ubuntu_ver] +
                        dependencies['ida'])

    try:
        # Enable 32-bit libraries
        sudo.bake('dpkg', add_architecture=True, _fg=True)('i386')

        # Refresh the package index, then install everything in one go
        apt_get = sudo.bake('apt-get', _fg=True)
        apt_get.update()
        apt_get.install(install_packages)
    except ErrorReturnCode as e:
        raise CommandError(e)
init.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_s2e_sources(env_path):
    """
    Fetch the S2E manifest repository and let repo initialize and sync all
    of the S2E repositories under ``<env_path>/source/s2e``.

    If repo fails, the whole ``env_path`` tree is removed and a
    ``CommandError`` is raised. The original working directory is restored
    in every case.
    """
    # Download repo
    repo = _get_repo(env_path)

    # repo has to run from inside the (new) S2E source directory
    s2e_source_path = os.path.join(env_path, 'source', 's2e')
    os.mkdir(s2e_source_path)
    orig_dir = os.getcwd()
    os.chdir(s2e_source_path)

    repos = CONSTANTS['repos']
    git_url = repos['url']
    git_s2e_repo = repos['s2e']

    try:
        # Now use repo to initialize all the repositories
        logger.info('Fetching %s from %s', git_s2e_repo, git_url)
        manifest_url = '%s/%s' % (git_url, git_s2e_repo)
        repo.init(u=manifest_url, _out=sys.stdout, _err=sys.stderr, _fg=True)
        repo.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        # Clean up - remove the half-created S2E environment
        shutil.rmtree(env_path)
        raise CommandError(e)
    finally:
        # Change back to the original directory
        os.chdir(orig_dir)

    # Success!
    logger.success('Fetched %s', git_s2e_repo)
import_project.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _get_project_name(archive):
    """
    Derive the project name from ``archive``.

    The archive is listed with nested entries excluded, and the directory
    part of that listing is returned as the project name.

    Raises:
        CommandError: if tar fails to list the archive.
    """
    try:
        listing = tar(exclude='*/*', list=True, file=archive)
        top_level = str(listing)
        return os.path.dirname(top_level)
    except ErrorReturnCode as e:
        raise CommandError('Failed to list archive - %s' % e)
build.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Build S2E with make.

        Reads ``options['components']`` (components to rebuild first) and
        ``options['debug']`` (selects the ``all-debug`` target instead of
        ``install``).

        Returns:
            The string ``'S2E built'``.

        Raises:
            CommandError: if the S2E makefile is missing or make fails.
        """
        # Exit if the makefile doesn't exist
        makefile = self.env_path('source', 's2e', 'Makefile')
        if not os.path.isfile(makefile):
            raise CommandError('No makefile found in %s' %
                               os.path.dirname(makefile))

        # If the build directory doesn't exist, create it
        build_dir = self.env_path('build', 's2e')
        if not os.path.isdir(build_dir):
            os.mkdir(build_dir)

        # make gets a copy of our environment plus the install prefix
        env_vars = os.environ.copy()
        env_vars['S2EPREFIX'] = self.install_path()

        components = options['components']
        make_cmd = sh.Command('make')
        self._make = make_cmd.bake(directory=build_dir, file=makefile, _env=env_vars)

        # If the user has specified any components to rebuild, do this before
        # the build
        if components:
            self._rebuild_components(components)

        try:
            # Pick the log message and make target for the requested flavour
            if options['debug']:
                message, target = 'Building S2E (debug) in %s', 'all-debug'
            else:
                message, target = 'Building S2E (release) in %s', 'install'
            logger.info(message, build_dir)
            self._make(target, _out=sys.stdout, _err=sys.stderr, _fg=True)
        except ErrorReturnCode as e:
            raise CommandError(e)

        return 'S2E built'
image_build.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _invoke_make(self, img_build_dir, rule_names, num_cores, iso_dir=''):
        """Run the image-build makefile for the given rules.

        Args:
            img_build_dir: directory that contains the ``Makefile`` to use.
            rule_names: make rules passed to the make invocation.
            num_cores: value passed to make's ``-j`` option.
            iso_dir: exported as ``ISODIR`` when non-empty.

        Raises:
            CommandError: if make exits with an error return code.
        """
        env = os.environ.copy()
        env['S2E_INSTALL_ROOT'] = self.install_path()
        kernels_repo = CONSTANTS['repos']['images']['linux']
        env['S2E_LINUX_KERNELS_ROOT'] = self.source_path(kernels_repo)
        env['OUTDIR'] = self.image_path()

        if iso_dir:
            env['ISODIR'] = iso_dir

        logger.debug('Invoking makefile with:')
        logger.debug('export S2E_INSTALL_ROOT=%s', env['S2E_INSTALL_ROOT'])
        logger.debug('export S2E_LINUX_KERNELS_ROOT=%s', env['S2E_LINUX_KERNELS_ROOT'])
        logger.debug('export OUTDIR=%s', env['OUTDIR'])
        logger.debug('export ISODIR=%s', env.get('ISODIR', ''))

        if self._headless:
            logger.warn('Image creation will run in headless mode. '
                        'Use --gui to see graphic output for debugging.')
        else:
            # GRAPHICS is set to an empty value only when not headless
            env['GRAPHICS'] = ''

        try:
            make_cmd = sh.Command('make')
            makefile = os.path.join(img_build_dir, 'Makefile')
            make = make_cmd.bake(file=makefile,
                                 directory=self.image_path(),
                                 _out=sys.stdout, _err=sys.stderr,
                                 _env=env, _fg=True)

            make.bake(j=num_cores)(rule_names)
        except ErrorReturnCode as e:
            raise CommandError(e)
test_cookiecutter_generation.py 文件源码 项目:cookiecutter-saas 作者: jayfk 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run_docker_dev_test(path, coverage=False):
    """
    Check that the docker stack described by ``<path>/dev.yml`` works.

    Builds the images, runs the Django test suite (under coverage when
    ``coverage`` is true, copying the coverage files into ``PROJECT_DIR``),
    verifies that the development server answers on localhost:8000 and
    finally tears the stack down. Any failing command fails the test with
    the command's stdout and stderr.
    """
    try:
        # every docker-compose call targets the same compose file
        compose = sh.docker_compose.bake("--file", "{}/dev.yml".format(path))

        # build django, power up the stack and run the test
        compose("build", "django")
        compose("build")
        if not coverage:
            compose("run", "django", "python", "manage.py", "test")
        else:
            compose("run", "django", "coverage", "run", "manage.py", "test")
            compose("run", "django", "coverage", "xml", "-o", "coverage.xml")
            for report in (".coverage", "coverage.xml"):
                shutil.copyfile(os.path.join(str(path), report),
                                os.path.join(PROJECT_DIR, report))

        # test that the development server is running
        compose("up", "-d")
        time.sleep(10)
        curl = sh.curl("-I", "http://localhost:8000/")
        assert "200 OK" in curl
        assert "Server: Werkzeug" in curl

        # since we are running a lot of tests with different configurations,
        # we need to clean up the environment. Stop all running containers,
        # remove them and remove the postgres_data volume.
        compose("stop")
        compose("rm", "-f")
        sh.docker("volume", "rm", "cookiecuttersaastestproject_postgres_data_dev")
    except sh.ErrorReturnCode as e:
        # in case there are errors it's good to have full output of
        # stdout and stderr.
        captured_out = e.stdout.decode("utf-8")
        captured_err = e.stderr.decode("utf-8")
        pytest.fail("STDOUT: {} \n\n\n STDERR: {}".format(
            captured_out, captured_err)
        )
test.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _run_ansible_lint(organization):
    """Run ansible-lint over the environment files and report the findings.

    Args:
        organization: organization whose ``*.yml`` environment files are
            linted; when falsy, the ``*/*.yml`` files below
            ``organization_path`` are linted instead.

    Returns:
        The number of errors reported; 0 when there are no files to lint or
        ansible-lint succeeds.
    """
    al_bin = os.path.join(aeriscloud_path, 'venv/bin/ansible-lint')

    env = ansible_env(os.environ.copy())

    if organization:
        environment_files = glob.glob(get_env_path(organization) + '/*.yml')
    else:
        environment_files = glob.glob(organization_path + '/*/*.yml')

    if not environment_files:
        return 0

    args = environment_files + ['-r', os.path.join(ansible_path, 'rules')]

    click.echo('Running ansible-lint ... ', nl=False)

    errors = 0
    try:
        python(al_bin, *args,
               _env=env, _err_to_out=True, _ok_code=[0])
        click.echo('[%s]' % click.style('OK', fg='green'))
    except ErrorReturnCode as e:
        # The ansible path is literal text inside the pattern; escape it so
        # characters such as '.' or '+' in the path are not treated as
        # regex syntax.
        parser = re.compile(
            r'^\[(?P<error_code>[^\]]+)\] (?P<error_message>[^\n]+)\n'
            r'%s(?P<file_name>[^:]+):(?P<line_number>[0-9]+)\n'
            r'Task/Handler: (?P<task_name>[^\n]+)\n\n'
            % re.escape(ansible_path + '/'),
            re.MULTILINE
        )

        click.echo('[%s]\n' % click.style('FAIL', fg='red'))

        # On Python 3 the captured output is bytes, which cannot be matched
        # against a str pattern; on Python 2 it already is str.
        output = e.stdout
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')

        last_file = None
        pos = 0
        while pos < len(output):
            match = parser.match(output, pos)
            if not match:
                # Unparseable output: show it whole and count it once.
                click.secho("Error: %s" % output)
                errors += 1
                break
            error = match.groupdict()

            # Print a heading whenever the reported file changes.
            if error['file_name'] != last_file:
                click.secho('  Errors in file: %s' % error['file_name'],
                            fg='blue', bold=True)
                last_file = error['file_name']

            click.echo('    line %s task %s: %s %s' % (
                click.style(error['line_number'], fg='green'),
                click.style(error['task_name'], fg='green'),
                click.style(error['error_code'], fg='red'),
                click.style(error['error_message'], fg='red'),
            ))
            errors += 1
            pos = match.end()
    return errors
connection.py 文件源码 项目:hpc-workload-gen 作者: mikelangelo-project 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def submit_job(self):
        """Submit the job through qsub and store its id in ``self.job_id``.

        Runs ``self.cfg.path_qsub`` with the built qsub arguments plus the
        job script path through ``self.ssh_host``. The first output line
        containing ``hlrs.de`` is stored (unstripped) as ``self.job_id``.
        ``self.time_stamp_jobstart`` is set to the submission time in
        milliseconds, at whole-second resolution.

        Returns:
            None. The process exits with status 1 when no job id is found
            in the output or the ssh call raises ``ErrorReturnCode``.
        """
        # Local import: ``exit`` is only injected by the ``site`` module,
        # ``sys.exit`` is always available.
        import sys

        self.logger.info('Submitting job ...')

        job_script_path = self._get_job_script_path()

        arg_list = self._build_qsub_args()
        arg_list.append(job_script_path)
        self.logger.debug('arg_liste: {}'.format(arg_list))

        try:
            self.logger.debug(
                '{} {}'.format(self.cfg.path_qsub, arg_list)
            )

            # get starting time and convert it to milliseconds
            self.time_stamp_jobstart = int(time.time()) * 1000
            self.logger.debug(
                'stat time stamp: {}'.format(self.time_stamp_jobstart)
            )
            # Job submit
            ssh_output = self.ssh_host(self.cfg.path_qsub, *arg_list)

            # searching job id
            for line in ssh_output:
                self.logger.debug('searching for job id in \n{}'.format(line))

                if "hlrs.de" in line:
                    self.logger.debug('possible job id found: {}'.format(line))
                    self.job_id = str(line)
                    if self.cfg.grafana:
                        self.logger.info(
                            'Job performance data at:\n'
                            '{}var-JobId=snapTask-{}-{}&'
                            'from={}&'
                            'to=now'.format(
                                self.cfg.grafana_base_string,
                                self.cfg.user_name,
                                self.job_id.rstrip(),
                                self.time_stamp_jobstart
                            )
                        )
                    return

            self.logger.error(
                'no job id found in \n{}\nexiting!!!'.format(ssh_output)
            )
            sys.exit(1)

        except ErrorReturnCode as e:
            self.logger.error('\nError in ssh call:\n{}'.format(e))
            # print() with a single argument behaves the same on Python 2
            # and Python 3, unlike the print statement.
            print(e.stderr)
            sys.exit(1)
ida_basic_block.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _get_basic_blocks(self):
        """
        Use S2E's IDA Pro script to extract basic block information from the
        target binary.

        The work happens inside a temporary directory so that the idb and
        bblist files IDA Pro produces do not end up next to the real target.

        Returns a list of ``BasicBlock`` objects, one per line of the
        generated bblist file. Raises ``CommandError`` if IDA Pro fails or
        no bblist file was generated.
        """
        logger.info('Generating basic block information from IDA Pro')

        try:
            with TemporaryDirectory() as work_dir:
                target_path = self._project_desc['target_path']

                # Copy the binary to the temporary directory. Because projects
                # are created with a symlink to the target program, then IDA
                # Pro will generate the idb and bblist files in the symlinked
                # target's directory. Which is not what we want
                target_name = os.path.basename(target_path)
                target_copy = os.path.join(work_dir, target_name)
                shutil.copyfile(target_path, target_copy)

                # Environment for the IDA Pro extractBasicBlocks script
                env_vars = os.environ.copy()
                env_vars['TVHEADLESS'] = '1'
                # This is required if s2e-env runs inside screen
                env_vars['TERM'] = 'xterm'

                ida = sh.Command(self._ida_path)
                script_arg = '-S%s' % self.install_path('bin',
                                                        'extractBasicBlocks.py')
                ida('-A', '-B', script_arg, target_copy,
                    _out=os.devnull, _tty_out=False,
                    _cwd=work_dir, _env=env_vars)

                # Check that the basic block list file was correctly generated
                bblist_file = os.path.join(work_dir, '%s.bblist' % target_name)
                if not os.path.isfile(bblist_file):
                    raise CommandError('Failed to generate bblist file for '
                                       '%s' % target_name)

                # Each bblist line holds three space-separated fields: two
                # hexadecimal numbers followed by a third value that is
                # passed through unchanged.
                basic_blocks = []
                with open(bblist_file, 'r') as f:
                    for line in f:
                        fields = line.rstrip().split(' ')
                        basic_blocks.append(BasicBlock(int(fields[0], 16),
                                                       int(fields[1], 16),
                                                       fields[2]))
                return basic_blocks
        except ErrorReturnCode as e:
            raise CommandError(e)


问题


面经


文章

微信
公众号

扫码关注公众号