python类check_output()的实例源码

pg_gw_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def configure_analyst_opsvm():
    '''
    Configures Analyst for OPSVM

    Makes sure the plumgrid service is running, then points
    plumgrid-sigmund at the OPSVM IP taken from the director context and
    enables it at boot. If sigmund is already running it is stopped first.
    Failures are logged; nothing is raised for a failed command.
    '''
    if not service_running('plumgrid'):
        restart_pg()
    opsvm_ip = pg_gw_context._pg_dir_context()['opsvm_ip']
    # Commands are executed through nsenter, targeting the child process of
    # the PID stored in plumgrid.pid. The $(...) substitutions are the
    # reason every call below needs shell=True.
    NS_ENTER = ('/opt/local/bin/nsenter -t $(ps ho pid --ppid $(cat '
                '/var/run/libvirt/lxc/plumgrid.pid)) -m -n -u -i -p ')
    sigmund_stop = NS_ENTER + '/usr/bin/service plumgrid-sigmund stop'
    sigmund_status = NS_ENTER \
        + '/usr/bin/service plumgrid-sigmund status'
    sigmund_autoboot = NS_ENTER \
        + '/usr/bin/sigmund-configure --ip {0} --start --autoboot' \
        .format(opsvm_ip)
    try:
        # check_output returns bytes on Python 3; decode before matching
        # against a text marker.
        status = subprocess.check_output(
            sigmund_status, shell=True).decode('UTF-8', 'replace')
        if 'start/running' in status:
            if subprocess.call(sigmund_stop, shell=True):
                log('plumgrid-sigmund couldn\'t be stopped!')
                return
        subprocess.check_call(sigmund_autoboot, shell=True)
        # Re-query the status purely as a health check: a non-zero exit
        # raises CalledProcessError and is reported below.
        subprocess.check_output(sigmund_status, shell=True)
    except (subprocess.CalledProcessError, OSError):
        log('plumgrid-sigmund couldn\'t be started!')
host.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def service_running(service_name):
    """Report whether the named system service is currently running.

    On systemd hosts the answer comes from ``service('is-active', ...)``.
    Otherwise the output of ``service <name> status`` is searched for the
    usual "running" phrases, falling back to the list returned by
    ``systemv_services_running()``. A failing status command means
    "not running".
    """
    if init_is_systemd():
        return service('is-active', service_name)

    status_cmd = ['service', service_name, 'status']
    try:
        report = subprocess.check_output(
            status_cmd, stderr=subprocess.STDOUT).decode('UTF-8')
    except subprocess.CalledProcessError:
        return False

    # Upstart prints 'start/running'; other init scripts use the remaining
    # phrases.
    running_markers = ("start/running", "is running", "up and running")
    if any(marker in report for marker in running_markers):
        return True
    # Last resort: consult the System V view of running services.
    return service_name in systemv_services_running()
hugepage.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def hugepage_support(user, group='hugetlb', nr_hugepages=256,
                     max_map_count=65536, mnt_point='/run/hugepages/kvm',
                     pagesize='2MB', mount=True, set_shmmax=False):
    """Enable hugepages on system.

    Writes the sysctl settings to /etc/sysctl.d/10-hugepage.conf, creates
    the mount point, replaces any existing fstab entry for it and
    optionally mounts it.

    Args:
    user (str)  -- Username to allow access to hugepages to
    group (str) -- Group name to own hugepages
    nr_hugepages (int) -- Number of pages to reserve
    max_map_count (int) -- Number of Virtual Memory Areas a process can own;
                           raised to 2 * nr_hugepages when smaller
    mnt_point (str) -- Directory to mount hugepages on
    pagesize (str) -- Size of hugepages
    mount (bool) -- Whether to Mount hugepages
    set_shmmax (bool) -- Whether to raise kernel.shmmax when it is smaller
                         than pagesize * nr_hugepages
    """
    gid = add_group(group).gr_gid
    add_user_to_group(user, group)

    settings = {
        'vm.nr_hugepages': nr_hugepages,
        'vm.max_map_count': max(max_map_count, 2 * nr_hugepages),
        'vm.hugetlb_shm_group': gid,
    }
    if set_shmmax:
        current_shmmax = int(
            check_output(['sysctl', '-n', 'kernel.shmmax']))
        required_shmmax = bytes_from_string(pagesize) * nr_hugepages
        # Only ever grow kernel.shmmax, never shrink it.
        if required_shmmax > current_shmmax:
            settings['kernel.shmmax'] = required_shmmax
    sysctl.create(yaml.dump(settings), '/etc/sysctl.d/10-hugepage.conf')

    mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)

    table = fstab.Fstab()
    stale = table.get_entry_by_attr('mountpoint', mnt_point)
    if stale:
        table.remove_entry(stale)
    mount_opts = 'mode=1770,gid={},pagesize={}'.format(gid, pagesize)
    table.add_entry(
        table.Entry('nodev', mnt_point, 'hugetlbfs', mount_opts, 0, 0))

    if mount:
        fstab_mount(mnt_point)
cluster.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def is_crm_leader(resource, retry=False):
    """
    Tell whether this unit is the corosync leader for ``resource``, based
    on the output of the external "crm" command.

    For the DC resource name the decision is delegated to is_crm_dc().
    Raises CRMResourceNotFound when crm reports the resource as not
    running. The ``retry`` argument is not read in this function body.

    We allow this operation to be retried to avoid the possibility of getting a
    false negative. See LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()

    try:
        output = subprocess.check_output(
            ['crm', 'resource', 'show', resource],
            stderr=subprocess.STDOUT)
        if not isinstance(output, six.text_type):
            output = six.text_type(output, "utf-8")
    except subprocess.CalledProcessError:
        # A failing crm call is treated as "not the leader".
        return False

    if not output:
        return False
    if get_unit_hostname() in output:
        return True
    if "resource %s is NOT running" % (resource) in output:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))
    return False
hookenv.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def config(scope=None):
    """Juju charm configuration

    Runs ``config-get`` for a single key (``scope``) or, when no scope is
    given, for everything (``--all``). A single key yields the decoded JSON
    value, the full query yields a ``Config`` wrapper, and a ValueError
    (e.g. undecodable JSON) yields None.
    """
    selector = '--all' if scope is None else scope
    command = ['config-get', selector, '--format=json']
    try:
        raw = subprocess.check_output(command)
        parsed = json.loads(raw.decode('UTF-8'))
        return parsed if scope is not None else Config(parsed)
    except ValueError:
        return None
containers_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_get_container_ip(containers):
    """
    Asserts that when an existing container is queried for its IP it returns
    the specified projects and services IP address.

    The expected value is obtained independently by asking ``docker
    inspect`` for the container's NetworkSettings.IPAddress.
    """
    actual_ip = Containers.get_container_ip_address(
        project=containers.project,
        service=ZOOKEEPER,
        timeout_seconds=5
    )
    container_id = Containers.get_container_info(
        project=containers.project,
        service=ZOOKEEPER
    )['Id']
    # Pass an argv list instead of a shell string: no quoting of the Go
    # template is needed and the container id is never parsed by a shell.
    expected_ip = subprocess.check_output([
        'docker', 'inspect',
        '--format', '{{ .NetworkSettings.IPAddress }}',
        container_id,
    ])

    assert expected_ip.rstrip() == actual_ip
host.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def fstab_mount(mountpoint):
    """Mount filesystem using fstab

    Runs ``mount <mountpoint>`` so the options come from the fstab entry.
    Returns True on success; logs the command output and returns False
    when mount exits with a non-zero status.
    """
    cmd_args = ['mount', mountpoint]
    try:
        subprocess.check_output(cmd_args)
    except subprocess.CalledProcessError as e:
        # This is the mount path; the message previously said "unmounting".
        log('Error mounting {}\n{}'.format(mountpoint, e.output))
        return False
    return True
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def relation_get(attribute=None, unit=None, rid=None):
    """Get relation information

    Builds a ``relation-get --format=json`` command line from the optional
    relation id, attribute ('-' when omitted) and unit, and returns the
    decoded JSON. Returns None when the output is not valid JSON or the
    command exits with status 2; any other failure is re-raised.
    """
    command = ['relation-get', '--format=json']
    if rid:
        command.extend(['-r', rid])
    command.append(attribute or '-')
    if unit:
        command.append(unit)

    try:
        raw = subprocess.check_output(command)
        return json.loads(raw.decode('UTF-8'))
    except ValueError:
        return None
    except CalledProcessError as err:
        if err.returncode != 2:
            raise
        return None
ufw.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def service(name, action):
    """
    Open/close access to a service

    :param name: could be a service name defined in `/etc/services` or a port
                 number.
    :param action: `open` or `close`
    :raises UFWError: when ``action`` is neither `open` nor `close`
    """
    if action == 'open':
        subprocess.check_output(['ufw', 'allow', str(name)],
                                universal_newlines=True)
    elif action == 'close':
        subprocess.check_output(['ufw', 'delete', 'allow', str(name)],
                                universal_newlines=True)
    else:
        # Name the values this function actually accepts (the old text
        # listed the underlying ufw verbs instead).
        raise UFWError(("'{}' not supported, use 'open' "
                        "or 'close'").format(action))
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run_action(self, unit_sentry, action,
                   _check_output=subprocess.check_output,
                   params=None):
        """Run the named action on a given unit sentry.

        params a dict of parameters to use; each pair is appended to the
        juju command line as ``key=value``.
        _check_output parameter is used for dependency injection.

        @return action_id.
        """
        unit_id = unit_sentry.info["unit_name"]
        command = ["juju", "action", "do", "--format=json", unit_id, action]
        if params is not None:
            # items() exists on both Python 2 and 3; iteritems() is
            # Python 2 only.
            command.extend(
                "{}={}".format(key, value) for key, value in params.items())
        self.log.info("Running command: %s\n" % " ".join(command))
        output = _check_output(command, universal_newlines=True)
        data = json.loads(output)
        action_id = data[u'Action queued with id']
        return action_id
gogen.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def generate2():
    """
    Call an external Python 2 program to retrieve the AST symbols of that
    language version.

    The generator source (``generate_str + WRITESYMS_CODE``) is written to
    a temporary directory and executed with ``python2``; the directory is
    removed afterwards even if the run fails.

    :return: set with one entry per line printed by the Python 2 program
    """
    import subprocess as sp
    import shutil
    import sys
    import tempfile
    import traceback

    tempdir = tempfile.mkdtemp()
    # Named script_path so the local does not shadow the tempfile module.
    script_path = os.path.join(tempdir, "py2_ast_code.py")

    py2_proc_out = ""
    try:
        with open(script_path, 'w') as py2code:
            py2code.write(generate_str + WRITESYMS_CODE)

        py2_proc_out = sp.check_output(["python2", script_path]).decode()
    finally:
        try:
            shutil.rmtree(tempdir)
        except OSError:
            # Cleanup is best effort; report the problem but keep the
            # result (or the original exception) intact.
            print("Warning: error trying to delete the temporal directory:", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
    return set(py2_proc_out.splitlines())
msSqlRecon.py 文件源码 项目:Pillage 作者: kostrin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def bruteforce(self, ip_address, port, userList, passList):
        """Run a hydra mssql credential scan and record any hit.

        hydra is invoked with the given user and password list files and
        writes its own report to pillageResults/<ip>_msSqlhydra.txt. For
        every output line containing "login:" the login (5th field) and,
        when present, the password (7th field) are stored on
        ``self.username`` / ``self.password``; a missing password is
        stored as ''. A failing hydra call is reported as "no credentials".
        """
        print("INFO: Performing hydra msSql scan against " + ip_address)
        hydraCmd = "hydra -L %s -P %s -f -e n -o pillageResults/%s_msSqlhydra.txt -u %s -s %s mssql" % (userList, passList, ip_address, ip_address, port)
        try:
            # universal_newlines gives text on both Python 2 and 3.
            results = subprocess.check_output(
                hydraCmd, shell=True, universal_newlines=True)
        except (subprocess.CalledProcessError, OSError):
            print("INFO: No valid msSql credentials found")
            return

        for result in results.split("\n"):
            if "login:" not in result:
                continue
            fields = result.split()
            if len(fields) < 5:
                # Not a credential line in the expected layout.
                continue
            print("[*] Valid msSql credentials found: " + result)
            self.username = fields[4]
            # An empty password leaves no 7th field; indexing it used to
            # raise and be misreported as "no credentials found".
            self.password = fields[6] if len(fields) > 6 else ''
Bombard.py 文件源码 项目:Pillage 作者: kostrin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def webEnum(self, args):
        """Run the HTTP enumeration tools against one host/port.

        ``args[0]`` is the host and ``args[1]`` the port. Runs an nmap
        http/ssl script scan (report in <host>_http.nmap) followed by a
        nikto scan (output appended to <host>_nikto_<port>.txt). A failing
        command raises subprocess.CalledProcessError.
        """
        host, port = args[0], args[1]

        # print(...) with a single argument behaves the same on Python 2
        # and Python 3.
        print("INFO: Performing nmap http script scan for {}:{}".format(host, port))
        nmapSCAN = "nmap -sV -Pn -vv -p {} --script='(http* or ssl*) and not (dos or fuzzer or brute)' -oN {}_http.nmap {}".format(port, host, host)
        subprocess.check_output(nmapSCAN, shell=True)

        print("INFO: Performing nikto scan on {}:{}".format(host, port))
        script = "nikto -host {} -port {} -C all >> {}_nikto_{}.txt".format(host, port, host, port)
        subprocess.check_output(script, shell=True)

        # NOTE: a dirb directory scan was previously sketched here but is
        # disabled.
        print("INFO: Finished http module for {}:{}".format(host, port))
Bombard.py 文件源码 项目:Pillage 作者: kostrin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def smbEnum(self, args):
        """Run the SMB enumeration tools against one host/port.

        ``args[0]`` is the host and ``args[1]`` the port. Runs an nmap smb
        script scan (report in <host>_smb.nmap), nbtscan (appended to
        <host>_smbNbt.txt) and enum4linux (appended to <host>_smbEnum.txt).
        A failing nmap or nbtscan call raises; a failing enum4linux call is
        only reported.
        """
        host, port = args[0], args[1]

        # print(...) with a single argument behaves the same on Python 2
        # and Python 3.
        print("INFO: Performing nmap smb script scan for {}:{}".format(host, port))
        nmapSCAN = "nmap -sV -Pn -vv -p {} --script='(smb*) and not (brute or broadcast or dos or external or fuzzer)' --script-args=unsafe=1 -oN {}_smb.nmap {}".format(port, host, host)
        subprocess.check_output(nmapSCAN, shell=True)

        print("INFO: Performing ntbscan for {}:{}".format(host, port))
        nbtSCAN = "nbtscan -r -v -h {} >> {}_smbNbt.txt".format(host, host)
        subprocess.check_output(nbtSCAN, shell=True)

        print("INFO: Performing enum4Linux scan for {}:{}".format(host, port))
        enumSCAN = "enum4linux -a -M -v {} >> {}_smbEnum.txt".format(host, host)
        try:
            subprocess.check_output(enumSCAN, shell=True)
        except (subprocess.CalledProcessError, OSError):
            # Best effort: only command failures are swallowed, so Ctrl-C
            # and programming errors still surface.
            print("ERROR: enum4Linux scan FAILED for {}:{}".format(host, port))

        print("INFO: Finished smb module for {}:{}".format(host, port))
setup.py 文件源码 项目:py_find_1st 作者: roebel 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def compiler_is_clang(comp) :
    """Return True when ``<comp> --version`` mentions clang.

    ``comp`` is the compiler command as a list of arguments. Progress is
    printed as "check for clang compiler ... yes/no". If the command cannot
    be launched the answer is False. If it launches but exits with a
    non-zero status, whatever it printed is still inspected.
    """
    print("check for clang compiler ...", end=' ')
    try:
        cc_output = subprocess.check_output(comp+['--version'],
                                            stderr = subprocess.STDOUT, shell=False)
    except OSError as ex:
        # Plain {0} instead of {0:d}: errno may be None for some OSErrors.
        print("compiler test call failed with error {0} msg: {1}".format(ex.errno, ex.strerror))
        print("no")
        return False
    except subprocess.CalledProcessError as ex:
        # Some compilers reject --version with a non-zero exit code; that
        # should not abort the build, so inspect the captured output.
        cc_output = ex.output or b''

    ret = re.search(b'clang', cc_output) is not None
    print("yes" if ret else "no")
    return ret
handlers.py 文件源码 项目:chitin 作者: SamStudio8 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_integrity(self):
        """Run basic sanity checks on the BAM file at ``self.path``.

        Returns a dict keyed by ``(check_name, failure_message)`` tuples:
        whether ``samtools view -c`` reports more than zero reads, whether
        a ``.bai`` file exists next to the BAM, and whether that index is
        at least as new as the BAM (None when there is no index). If the
        read count cannot be obtained the error is printed and the count is
        treated as 0.
        """
        from subprocess import CalledProcessError, check_output
        reads = 0
        try:
            # An argv list keeps paths with spaces or shell metacharacters
            # intact; universal_newlines yields text on Python 2 and 3.
            counted = check_output(["samtools", "view", "-c", self.path],
                                   universal_newlines=True)
            reads = int(counted.split("\n")[0].strip())
        except (CalledProcessError, OSError, ValueError) as e:
            print(e)

        index_path = self.path + ".bai"
        has_index = os.path.exists(index_path)
        has_indate_index = None
        if has_index:
            has_indate_index = (
                os.path.getmtime(self.path) <= os.path.getmtime(index_path))

        return {
            ("has_reads", "has 0 reads"): reads > 0,
            ("has_index", "has no BAI"): has_index,
            ("has_indate_index", "has a BAI older than itself"): has_indate_index,
        }
hookenv.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def status_get():
    """Retrieve the previously set juju workload state and message

    Returns a ``(status, message)`` tuple decoded from the JSON printed by
    ``status-get``.

    If the status-get command is not found then assume this is juju < 1.23 and
    return 'unknown', ""

    """
    try:
        payload = subprocess.check_output(
            ['status-get', "--format=json", "--include-data"])
    except OSError as err:
        # Only a missing executable is tolerated; anything else propagates.
        if err.errno != errno.ENOENT:
            raise
        return ('unknown', "")

    decoded = json.loads(payload.decode("UTF-8"))
    return (decoded["status"], decoded["message"])
hookenv.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def config(scope=None):
    """Juju charm configuration

    Queries ``config-get`` as JSON. With a ``scope`` the decoded value of
    that key is returned as-is; without one, all settings are requested
    (``--all``) and wrapped in a ``Config`` object. None is returned when a
    ValueError occurs, such as output that is not valid JSON.
    """
    want_everything = scope is None
    argv = ['config-get']
    argv.append('--all' if want_everything else scope)
    argv.append('--format=json')
    try:
        text = subprocess.check_output(argv).decode('UTF-8')
        settings = json.loads(text)
        if want_everything:
            return Config(settings)
        return settings
    except ValueError:
        return None
hookenv.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def relation_get(attribute=None, unit=None, rid=None):
    """Get relation information

    Calls ``relation-get --format=json [-r RID] ATTRIBUTE [UNIT]`` (the
    attribute defaults to '-') and returns the decoded JSON document.
    Undecodable output and an exit status of 2 both produce None; other
    command failures are propagated.
    """
    rid_args = ['-r', rid] if rid else []
    unit_args = [unit] if unit else []
    argv = (['relation-get', '--format=json'] + rid_args +
            [attribute or '-'] + unit_args)
    try:
        return json.loads(subprocess.check_output(argv).decode('UTF-8'))
    except ValueError:
        return None
    except CalledProcessError as failure:
        if failure.returncode == 2:
            return None
        raise
cluster.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller

    Parses the "Current DC" line of ``crm status`` and compares the node
    name with this unit's hostname. Raises CRMDCNotFound when the crm
    command fails or reports the DC as NONE.
    """
    try:
        output = subprocess.check_output(
            ['crm', 'status'], stderr=subprocess.STDOUT)
        if not isinstance(output, six.text_type):
            output = six.text_type(output, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    dc_lines = [entry for entry in output.split('\n')
                if entry.startswith('Current DC')]
    current_dc = ''
    for entry in dc_lines:
        # Example line:
        # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
        # If several lines match, the last one wins.
        current_dc = entry.split(':')[1].split()[0]

    if current_dc == get_unit_hostname():
        return True
    if current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')
    return False
utils.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_unmapped_read_count_from_indexed_bam(bam_file_name):
    """
    Get number of unmapped reads from an indexed BAM file.

    The count is read from the last whitespace-separated field of the last
    line printed by ``samtools idxstats``.

    Args:
        bam_file_name (str): Name of indexed BAM file.

    Returns:
        int: number of unmapped reads in the BAM

    Raises:
        subprocess.CalledProcessError: if samtools exits with an error.

    Note:
        BAM must be indexed for lookup using samtools.

    """
    # Pass an argv list so file names containing spaces or shell
    # metacharacters reach samtools unchanged; universal_newlines returns
    # text on both Python 2 and 3.
    index_output = subprocess.check_output(
        ['samtools', 'idxstats', bam_file_name], universal_newlines=True)
    return int(index_output.strip().split('\n')[-1].split()[-1])
bcl.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_bcl2fastq_v2(hostname):
    """Look up the installed bcl2fastq version.

    Returns a ``(version, error_message)`` pair in which exactly one
    element is None: the version string parsed from ``bcl2fastq
    --version``, or a message when either command exits with an error or
    the version line is not recognized. ``hostname`` is only used in the
    error message.
    """
    try:
        subprocess.check_call(["which", "bcl2fastq"])
        # Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x.
        # Required for some installations of bcl2fastq.
        child_env = dict(os.environ)
        child_env['LD_LIBRARY_PATH'] = os.environ.get(
            '_TENX_LD_LIBRARY_PATH', '')
        version_text = subprocess.check_output(
            ["bcl2fastq", "--version"],
            env=child_env, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        return (None,
                "On machine: %s, bcl2fastq not found on PATH." % hostname)

    for line in version_text.split("\n"):
        found = re.match("bcl2fastq v([0-9.]+)", line)
        if found is not None:
            return (found.groups()[0], None)

    return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version")
acbs_parser.py 文件源码 项目:acbs 作者: AOSC-Dev 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def parse_ab3_defines(defines_file):  # , pkg_name):
    """Evaluate an autobuild ``defines`` file and return selected variables.

    The file content is run through the shell, prefixed with an ``ARCH=``
    assignment and followed by the snippet from ``gen_laundry_list`` for
    PKGNAME, PKGDEP and BUILDDEP. The script output is then parsed as
    ``key=value`` lines.

    Returns a dict mapping upper-cased variable names to their values, or
    False when the file cannot be read or the script fails.
    """
    try:
        # The with-block closes the file even when read() fails.
        with open(defines_file, 'rt') as fp:
            abd_cont = fp.read()
    except (OSError, UnicodeDecodeError):
        print('[E] Failed to load autobuild defines file! Do you have read permission?')
        return False
    script = "ARCH={}\n".format(
        get_arch_name()) + abd_cont + gen_laundry_list(['PKGNAME', 'PKGDEP', 'BUILDDEP'])
    try:
        # Better to be replaced by subprocess.Popen
        abd_out = subprocess.check_output(script, shell=True)
    except (subprocess.CalledProcessError, OSError):
        print('[E] Malformed Autobuild defines file found! Couldn\'t continue!')
        return False
    # Prepend a dummy section header so the plain key=value output can be
    # parsed by RawConfigParser.
    abd_fp = io.StringIO('[wrap]\n' + abd_out.decode('utf-8'))
    abd_config = RawConfigParser()
    abd_config.read_file(abd_fp)
    wrapped = abd_config['wrap']
    return {key.upper(): wrapped[key] for key in wrapped}
test_cmake.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_RTagsDaemonStartClean(self):
        """Start the rtags daemon from the 'clean' fixture directory.

        Expects no build directory to exist beforehand, and expects the
        daemon status output to be at least as long as the reference
        status dump embedded below.
        """
        try:
            os.chdir("clean")
        except OSError:
            print("Test Error: Couldn't cd into 'clean' test directory.")
            raise
        self.assertFalse(self.cmake_build_info["build_dir"].is_dir())
        self.plugin.setup_rtags_daemon()
        try:
            rtags_daemon_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_status"])
        except subprocess.CalledProcessError as e:
            print(e.output)
            # Fail explicitly: otherwise the assertion below would die with
            # a NameError on the unbound status variable.
            self.fail("rtags status command exited with code {}".format(
                e.returncode))
        self.assertTrue(
            len("*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
                ) <= len(str(rtags_daemon_status)))
test_cmake.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_RTagsDaemonStartDirty(self):
        """Start the rtags daemon from the 'dirty' fixture directory.

        Expects the build directory to exist already, and expects the
        daemon status output to be at least as long as the reference
        status dump embedded below.
        """
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn't cd into 'dirty' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.plugin.setup_rtags_daemon()
        try:
            rtags_daemon_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_status"])
        except subprocess.CalledProcessError as e:
            print(e.output)
            # Fail explicitly: otherwise the assertion below would die with
            # a NameError on the unbound status variable.
            self.fail("rtags status command exited with code {}".format(
                e.returncode))
        self.assertTrue(
            len("*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
                ) <= len(str(rtags_daemon_status)))
test_cmake.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_RTagsClientStartDirty(self):
        """Connect the rtags client from the 'dirty' fixture directory.

        After starting the daemon and connecting the client, the file
        status output for both the ``cpp`` and the ``test_cpp`` source must
        contain the word "managed".
        """
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn't cd into 'dirty' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
        self.plugin.setup_rtags_daemon()
        self.plugin.connect_rtags_client()
        for source_key in ("cpp", "test_cpp"):
            try:
                rtags_client_status = subprocess.check_output(
                    self.cmake_cmd_info["rtags_file_status"] +
                    [str(src_info[source_key])])
            except subprocess.CalledProcessError as e:
                print(e.output)
                # Without this the next line would raise NameError (first
                # file) or silently re-check the previous output (second).
                self.fail("rtags file status command exited with code "
                          "{}".format(e.returncode))
            # str.find() returns -1 (truthy) when the text is missing and
            # 0 (falsy) when it is at the start, so assertTrue(find(...))
            # checked the wrong thing; assertIn tests actual containment.
            self.assertIn("managed", str(rtags_client_status))
test_cmake.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_RTagsClientSetFile(self):
        """Set the current rtags file from the 'dirty' fixture directory.

        After starting the daemon, connecting the client and selecting the
        ``cpp`` source, its file status output must contain "managed".
        """
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn't cd into 'dirty' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
        self.plugin.setup_rtags_daemon()
        self.plugin.connect_rtags_client()
        self.plugin.rtags_set_file([str(src_info["cpp"])])
        try:
            rtags_client_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_file_status"] +
                [str(src_info["cpp"])])
        except subprocess.CalledProcessError as e:
            print(e.output)
            # Fail explicitly instead of hitting a NameError below.
            self.fail("rtags file status command exited with code "
                      "{}".format(e.returncode))
        # str.find() returns -1 (truthy) when the text is missing, so
        # assertTrue(find(...)) could not detect a failure; assertIn can.
        self.assertIn("managed", str(rtags_client_status))
test_cmake.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_RTagsClientUpdateBuffers(self):
        """Update the rtags buffer list from the 'dirty' fixture directory.

        After registering the ``test_cpp`` and ``cpp`` sources as buffers,
        the buffer listing must contain the current working directory
        concatenated with the ``test_cpp`` path.
        """
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn't cd into 'dirty' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
        self.plugin.setup_rtags_daemon()
        self.plugin.connect_rtags_client()
        self.plugin.update_rtags_buffers(
            [str(src_info["test_cpp"]),
             str(src_info["cpp"])])
        try:
            rtags_client_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_buffers"])
        except subprocess.CalledProcessError as e:
            print(e.output)
            # Fail explicitly instead of hitting a NameError below.
            self.fail("rtags buffers command exited with code {}".format(
                e.returncode))
        filepath = os.getcwd() + str(src_info["test_cpp"])
        # str.find() returns -1 (truthy) when the text is missing, so
        # assertTrue(find(...)) could not detect a failure; assertIn can.
        self.assertIn(filepath, str(rtags_client_status))
rtags.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run_cmake(self):
        """Create ./build and run the configured CMake command inside it.

        Gives up with a printed message when ``mkdir build`` fails, when
        the build directory is missing afterwards, or when the expected
        CMake output file does not exist once the command has run. The
        output of a failing CMake command is printed rather than raised.
        """
        print("Running CMake")
        mkdir_status = subprocess.call(
            ["mkdir", "build"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if mkdir_status != 0:
            print("Can\'t setup CMake build directory.")
            return

        build_dir = self.cmake_build_info["build_dir"]
        if not build_dir.is_dir():
            print("Couldn't setup CMake Project")
            return

        try:
            subprocess.check_output(
                self.cmake_cmd_info["cmake_cmd"], cwd=str(build_dir))
        except subprocess.CalledProcessError as err:
            print(err.output)

        # Whether or not the command reported success, the produced file is
        # what decides the outcome.
        if not self.cmake_build_info["comp_data_cmake"].is_file():
            print("Couldn't setup CMake Project")
            return
3DChromatin_ReplicateQC.py 文件源码 项目:genomedisco 作者: kundajelab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run_script(script_name,running_mode):
    """Make ``script_name`` executable and run or submit it.

    running_mode selects what happens after the chmod:
      'NA'           -- run the script locally through bash and print any
                        output it produced
      'write_script' -- do nothing further (the script is only prepared)
      'sge'          -- submit with qsub (3G h_vmem, logs in <script>.o/.e)
      'slurm'        -- submit with sbatch (50G, partition 'akundaje')
    Any other value only performs the chmod. A failing command raises
    subprocess.CalledProcessError.
    """
    subp.check_output(['bash','-c','chmod 755 '+script_name])
    if running_mode=='NA':
        # universal_newlines yields text on Python 2 and 3. The truthiness
        # test replaces `!= ''`, which is always true for bytes.
        output=subp.check_output(['bash','-c',script_name],
                                 universal_newlines=True)
        if output:
            print(output)
    elif running_mode=='write_script':
        pass
    elif running_mode=='sge':
        memo='3G'
        subp.check_output(['bash','-c','qsub -l h_vmem='+memo+' -o '+script_name+'.o -e '+script_name+'.e '+script_name])
    #TODO: if you choose slurm, then you need to change the settings and provide a file with settings
    elif running_mode=='slurm':
        memo='50G'
        partition='akundaje'
        subp.check_output(['bash','-c','sbatch --mem '+memo+' -o '+script_name+'.o -e '+script_name+'.e'+' -p '+partition+' '+script_name])


问题


面经


文章

微信
公众号

扫码关注公众号