python类call()的实例源码

pg_gw_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def configure_analyst_opsvm():
    '''
    Configures Anaylyst for OPSVM
    '''
    if not service_running('plumgrid'):
        restart_pg()
    opsvm_ip = pg_gw_context._pg_dir_context()['opsvm_ip']
    NS_ENTER = ('/opt/local/bin/nsenter -t $(ps ho pid --ppid $(cat '
                '/var/run/libvirt/lxc/plumgrid.pid)) -m -n -u -i -p ')
    sigmund_stop = NS_ENTER + '/usr/bin/service plumgrid-sigmund stop'
    sigmund_status = NS_ENTER \
        + '/usr/bin/service plumgrid-sigmund status'
    sigmund_autoboot = NS_ENTER \
        + '/usr/bin/sigmund-configure --ip {0} --start --autoboot' \
        .format(opsvm_ip)
    try:
        status = subprocess.check_output(sigmund_status, shell=True)
        if 'start/running' in status:
            if subprocess.call(sigmund_stop, shell=True):
                log('plumgrid-sigmund couldn\'t be stopped!')
                return
        subprocess.check_call(sigmund_autoboot, shell=True)
        status = subprocess.check_output(sigmund_status, shell=True)
    except:
        log('plumgrid-sigmund couldn\'t be started!')
host.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 195 收藏 0 点赞 0 评论 0
def cmp_pkgrevno(package, revno, pkgcache=None):
    """Compare supplied revno with the revno of the installed package

    *  1 => Installed revno is greater than supplied arg
    *  0 => Installed revno is the same as supplied arg
    * -1 => Installed revno is less than supplied arg

    This function imports apt_cache function from charmhelpers.fetch if
    the pkgcache argument is None. Be sure to add charmhelpers.fetch if
    you call this function, or pass an apt_pkg.Cache() instance.
    """
    import apt_pkg
    if not pkgcache:
        from charmhelpers.fetch import apt_cache
        pkgcache = apt_cache()
    pkg = pkgcache[package]
    return apt_pkg.version_compare(pkg.current_ver.ver_str, revno)
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def log(message, level=None):
    """Write a message to the juju log"""
    command = ['juju-log']
    if level:
        command += ['-l', level]
    if not isinstance(message, six.string_types):
        message = repr(message)
    command += [message]
    # Missing juju-log should not cause failures in unit tests
    # Send log output to stderr
    try:
        subprocess.call(command)
    except OSError as e:
        if e.errno == errno.ENOENT:
            if level:
                message = "{}: {}".format(level, message)
            message = "juju-log: {}".format(message)
            print(message, file=sys.stderr)
        else:
            raise
workflow.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
update.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def install_update():
    """If a newer release is available, download and install it.

    :returns: ``True`` if an update is installed, else ``False``

    """
    update_data = wf().cached_data('__workflow_update_status', max_age=0)

    if not update_data or not update_data.get('available'):
        wf().logger.info('No update available')
        return False

    local_file = download_workflow(update_data['download_url'])

    wf().logger.info('Installing updated workflow ...')
    subprocess.call(['open', local_file])

    update_data['available'] = False
    wf().cache_data('__workflow_update_status', update_data)
    return True
audio_converter.py 文件源码 项目:subtitle-synchronization 作者: AlbertoSabater 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def getAudio(freq, audio_files=None):

    files = os.listdir(DATA_DIR)
    p = re.compile('.*\.[mkv|avi]')
    files = [ f for f in files if p.match(f) ]

    if audio_files:
        files = [ f for f in files if os.path.splitext(f)[0] in audio_files]

    audio_dirs = []
    for f in files:
        name, extension = os.path.splitext(f)
        command = "ffmpeg -i {0}{1}{2} -ab 160k -ac 2 -ar {3} -vn {0}{1}_{3}.wav".format(DATA_DIR, name, extension, freq)
        audio_dirs.append(DATA_DIR + name + '_' + str(freq) + '.wav')
        subprocess.call(command, shell=True)

    return audio_dirs

# Convert timestamp to seconds
Bombard.py 文件源码 项目:Pillage 作者: kostrin 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def webEnum(self, args):
        print "INFO: Performing nmap http script scan for {}:{}".format(args[0],args[1])
        nmapSCAN = "nmap -sV -Pn -vv -p {} --script='(http* or ssl*) and not (dos or fuzzer or brute)' -oN {}_http.nmap {}".format(args[1],args[0],args[0])
        subprocess.check_output(nmapSCAN, shell=True)

        print "INFO: Performing nikto scan on {}:{}".format(args[0],args[1])
        script="nikto -host {} -port {} -C all >> {}_nikto_{}.txt".format(args[0],args[1],args[0],args[1])
        subprocess.check_output(script, shell=True)

        '''
        print "INFO: Performing dirb scan on {}:{}".format(args[0],args[1])
        dirbList="/usr/share/wordlists/dirbuster/directory-list-2.3-small.txt"
        script="dirb {}://{}:{} {} -S -w >> {}_dirb_{}.txt".format(args[2],args[0],args[1],dirbList,args[0],args[1])
        subprocess.call(script, shell=True)
        '''
        print "INFO: Finished http module for {}:{}".format(args[0],args[1])
data_preprocessing.py 文件源码 项目:AVSR-Deep-Speech 作者: pandeydivesh15 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def convert_mp4(video_dir, audio_dir):
    '''
    Args: 
        1. video_dir:   Directory for all video files
        2. audio_dir:   Directory where all converted files will be stored.
    '''

    # Get all file names
    video_file_names = sorted(glob.glob(video_dir + "*.mp4"))
    # Extract actual names of file, also remove any extensions
    video_names = map(lambda x : x.split('/')[-1].split(".")[0], video_file_names)

    # Command for converting video to audio
    command = "ffmpeg -i " + video_dir + "{0}.mp4 -ab 96k -ar 44100 -vn " + audio_dir + "{0}.wav"

    for name in video_names:
        subprocess.call(command.format(name), shell=True)
container.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def delete_container(self, lxc_name):
        logger.info ("delete container:%s" % lxc_name)
        if self.imgmgr.deleteFS(lxc_name):
            Container_Collector.billing_increment(lxc_name)
            self.historymgr.log(lxc_name,"Delete")
            logger.info("delete container %s success" % lxc_name)
            return [True, "delete container success"]
        else:
            logger.info("delete container %s failed" % lxc_name)
            return [False, "delete container failed"]
        #status = subprocess.call([self.libpath+"/lxc_control.sh", "delete", lxc_name])
        #if int(status) == 1:
        #    logger.error("delete container %s failed" % lxc_name)
        #    return [False, "delete container failed"]
        #else:
        #    logger.info ("delete container %s success" % lxc_name)
        #    return [True, "delete container success"]

    # start container, if running, restart it
container.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def recover_container(self, lxc_name):
        logger.info ("recover container:%s" % lxc_name)
        #status = subprocess.call([self.libpath+"/lxc_control.sh", "status", lxc_name])
        [success, status] = self.container_status(lxc_name)
        if not success:
            return [False, status]
        self.imgmgr.checkFS(lxc_name)
        if status == 'stopped':
            logger.info("%s stopped, recover it to running" % lxc_name)
            if self.start_container(lxc_name)[0]:
                self.historymgr.log(lxc_name,"Recover")
                if self.start_services(lxc_name)[0]:
                    logger.info("%s recover success" % lxc_name)
                    return [True, "recover success"]
                else:
                    logger.error("%s recover failed with services not start" % lxc_name)
                    return [False, "recover failed for services not start"]
            else:
                logger.error("%s recover failed for container starting failed" % lxc_name)
                return [False, "recover failed for container starting failed"]
        else:
            logger.info("%s recover success" % lxc_name)
            return [True, "recover success"]
host.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def service(action, service_name, **kwargs):
    """Control a system service.

    :param action: the action to take on the service
    :param service_name: the name of the service to perform th action on
    :param **kwargs: additional params to be passed to the service command in
                    the form of key=value.
    """
    if init_is_systemd():
        cmd = ['systemctl', action, service_name]
    else:
        cmd = ['service', service_name, action]
        for key, value in six.iteritems(kwargs):
            parameter = '%s=%s' % (key, value)
            cmd.append(parameter)
    return subprocess.call(cmd) == 0
hookenv.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def log(message, level=None):
    """Write a message to the juju log"""
    command = ['juju-log']
    if level:
        command += ['-l', level]
    if not isinstance(message, six.string_types):
        message = repr(message)
    command += [message]
    # Missing juju-log should not cause failures in unit tests
    # Send log output to stderr
    try:
        subprocess.call(command)
    except OSError as e:
        if e.errno == errno.ENOENT:
            if level:
                message = "{}: {}".format(level, message)
            message = "juju-log: {}".format(message)
            print(message, file=sys.stderr)
        else:
            raise
host.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def service(action, service_name, **kwargs):
    """Control a system service.

    :param action: the action to take on the service
    :param service_name: the name of the service to perform th action on
    :param **kwargs: additional params to be passed to the service command in
                    the form of key=value.
    """
    if init_is_systemd():
        cmd = ['systemctl', action, service_name]
    else:
        cmd = ['service', service_name, action]
        for key, value in six.iteritems(kwargs):
            parameter = '%s=%s' % (key, value)
            cmd.append(parameter)
    return subprocess.call(cmd) == 0
hookenv.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def load_previous(self, path=None):
        """Load previous copy of config from disk.

        In normal usage you don't need to call this method directly - it
        is called automatically at object initialization.

        :param path:

            File path from which to load the previous config. If `None`,
            config is loaded from the default location. If `path` is
            specified, subsequent `save()` calls will write to the same
            path.

        """
        self.path = path or self.path
        with open(self.path) as f:
            self._prev_dict = json.load(f)
        for k, v in copy.deepcopy(self._prev_dict).items():
            if k not in self:
                self[k] = v
ubuntu.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _run_apt_command(cmd, fatal=False):
    """Run an apt command with optional retries.

    :param: cmd: str: The apt command to run.
    :param: fatal: bool: Whether the command's output should be checked and
        retried.
    """
    # Provide DEBIAN_FRONTEND=noninteractive if not present in the environment.
    cmd_env = {
        'DEBIAN_FRONTEND': os.environ.get('DEBIAN_FRONTEND', 'noninteractive')}

    if fatal:
        _run_with_retries(
            cmd, cmd_env=cmd_env, retry_exitcodes=(1, APT_NO_LOCK,),
            retry_message="Couldn't acquire DPKG lock")
    else:
        env = os.environ.copy()
        env.update(cmd_env)
        subprocess.call(cmd, env=env)
download.py 文件源码 项目:ICGan-tensorflow 作者: zhangqianhui 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def download_mnist(dirpath):
    data_dir = os.path.join(dirpath, 'mnist')
    if os.path.exists(data_dir):
        print('Found MNIST - skip')
        return
    else:
        os.mkdir(data_dir)
    url_base = 'http://yann.lecun.com/exdb/mnist/'
    file_names = ['train-images-idx3-ubyte.gz','train-labels-idx1-ubyte.gz','t10k-images-idx3-ubyte.gz','t10k-labels-idx1-ubyte.gz']
    for file_name in file_names:
        url = (url_base+file_name).format(**locals())
        print(url)
        out_path = os.path.join(data_dir,file_name)
        cmd = ['curl', url, '-o', out_path]
        print('Downloading ', file_name)
        subprocess.call(cmd)
        cmd = ['gzip', '-d', out_path]
        print('Decompressing ', file_name)
        subprocess.call(cmd)
setup.py 文件源码 项目:rivalcfg 作者: flozz 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run(self):
        _install.run(self)
        print("Installing udev rules...")
        if not os.path.isdir("/etc/udev/rules.d"):
            print("WARNING: udev rules have not been installed (/etc/udev/rules.d is not a directory)")
            return
        try:
            shutil.copy("./rivalcfg/data/99-steelseries-rival.rules", "/etc/udev/rules.d/")
        except IOError:
            print("WARNING: udev rules have not been installed (permission denied)")
            return
        try:
            subprocess.call(["udevadm", "trigger"])
        except OSError:
            print("WARNING: unable to update udev rules, please run the 'udevadm trigger' command")
            return
        print("Done!")
bio_io.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def set_data_field(record, field_name, field_val):
    assert(len(record.samples) == 1)
    new_format = record.FORMAT
    new_fields = new_format.split(':')
    if not(field_name in new_fields):
        new_fields = new_fields + [field_name]
        new_format = ':'.join(new_fields)

    sample_call = get_record_sample_call(record)
    data = sample_call.data
    data_dict = data._asdict()
    data_dict[field_name] = field_val

    new_sample_vals = []
    for field in new_fields:
        new_sample_vals.append(data_dict[field])

    # Note - the old way of passing the fields to pyVCF is memory intensive
    # because a fresh type is allocated for each call to make_calldata_tuple
    #data_instantiator = vcf.model.make_calldata_tuple(new_fields)
    #data = data_instantiator(*new_sample_vals)
    data = FakeNamedTuple(new_fields, new_sample_vals)
    sample_call.data = data
    record.samples[0] = sample_call
    record.FORMAT = new_format
bio_io.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def combine_vcfs(output_filename, input_vcf_filenames):
    tmp_filename = output_filename + ".tmp"

    for (i,fn) in enumerate(input_vcf_filenames):
        if i == 0:
            args = 'cat ' + fn
            subprocess.check_call(args + " > " + tmp_filename, shell=True)
        else:
            args = 'grep -v "^#" ' + fn
            ret = subprocess.call(args + " >> " + tmp_filename, shell=True)
            if ret == 2:
                raise Exception("grep call failed: " + args)

    # Sort and index the files
    tk_tabix.sort_vcf(tmp_filename, output_filename)
    tk_tabix.index_vcf(output_filename)

    os.remove(tmp_filename)
rtags.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def run_cmake(self):
        print("Running CMake")
        build_dir_cmd_out = subprocess.call(
            ["mkdir", "build"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if build_dir_cmd_out != 0:
            print("Can\'t setup CMake build directory.")
            return

        if self.cmake_build_info["build_dir"].is_dir():
            try:
                subprocess.check_output(
                    self.cmake_cmd_info["cmake_cmd"],
                    cwd=str(self.cmake_build_info["build_dir"]))
            except subprocess.CalledProcessError as e:
                print(e.output)
            if not self.cmake_build_info["comp_data_cmake"].is_file():
                print("Couldn't setup CMake Project")
                return
        else:
            print("Couldn't setup CMake Project")
            return
kubectl_util.py 文件源码 项目:benchmarks 作者: tensorflow 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def DeletePods(pod_name, yaml_file):
  """Deletes pods based on the given kubernetes config.

  Args:
    pod_name: 'name-prefix' selector for the pods.
    yaml_file: kubernetes yaml config.

  Raises:
    TimeoutError: if jobs didn't terminate for a long time.
  """
  command = [_KUBECTL, 'delete', '--filename=%s' % yaml_file]
  logging.info('Deleting pods: %s', ' '.join(command))
  subprocess.call(command)

  def CheckPodsAreTerminated():
    return not _GetPodNames(pod_name)
  if not _WaitUntil(100, CheckPodsAreTerminated):
    raise TimeoutError(
        'Timed out waiting for %s pod to terminate.' % pod_name)
evaluate_models.py 文件源码 项目:dl4mt-multi 作者: nyu-dl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def call_script(model_path, config, proto, eval_script,
                num_process=10, normalize=True):
    '''
    open pipe and call the script
    '''
    try:
        subprocess.call(
            " python {} {} --config={} "
            " --proto={} "
            " -p {} {}".format(
                eval_script, model_path, config,
                proto, num_process,
                ' -n ' if normalize else ''),
            shell=True)
    except:
        traceback.print_exc(file=sys.stdout)
        print 'error in call_bleu_script()'
    print model_path
_debug.py 文件源码 项目:silverchain 作者: tomokinakamaru 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def draw(self, graph, highlight=None):
        gv = ''
        gv += 'digraph G{} {{\n'.format(self._n)
        gv += '  layout=neato;\n'
        gv += self._ranks
        for s, d in graph.edges_iter():
            gv += '  "{}" -> "{}"'.format(s, d)
            if (s, d) == highlight:
                gv += '[color=red,penwidth=3]'
            gv += ';\n'
        gv += '}'

        fname = self.WSDIR + '/{0:04d}'.format(self._n)
        with open(fname + '.dot', 'w') as f:
            print(gv, file=f)

        cmd = ''
        cmd += '/usr/local/bin/neato'
        cmd += ' -Tpng {f}.dot -o{f}.png'.format(f=fname)
        subprocess.call(cmd, shell=True)

        self._n += 1
util.py 文件源码 项目:ipwb 作者: oduwsdl 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # OSError if ipfs not installed, redundant of below
        # subprocess.call(['ipfs', '--version'], stdout=open(devnull, 'wb'))

        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except:
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
ImageGrab.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def grab(bbox=None):
    if sys.platform == "darwin":
        f, file = tempfile.mkstemp('.png')
        os.close(f)
        subprocess.call(['screencapture', '-x', file])
        im = Image.open(file)
        im.load()
        os.unlink(file)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
auto_content.py 文件源码 项目:sat6_scripts 作者: RedHatSatellite 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def publish_cv(dryrun):
    print "Running Content View Publish..."

    # Set the initial state
    good_publish = False

    if not dryrun:
        rc = subprocess.call(['/usr/local/bin/publish_content_views', '-q', '-a'])
    else:
        msg = "Dry run - not actually performing publish"
        helpers.log_msg(msg, 'WARNING')
        rc = subprocess.call(['/usr/local/bin/publish_content_views', '-q', '-a', '-d'])

    if rc == 0:
        good_publish = True

    return good_publish
auto_content.py 文件源码 项目:sat6_scripts 作者: RedHatSatellite 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def promote_cv(dryrun, lifecycle):
    print "Running Content View Promotion to " + lifecycle + "..."

    # Set the initial state
    good_promote = False

    if not dryrun:
        rc = subprocess.call(['/usr/local/bin/promote_content_views', '-q', '-e', lifecycle])
    else:
        msg = "Dry run - not actually performing promotion"
        helpers.log_msg(msg, 'WARNING')
        rc = subprocess.call(['/usr/local/bin/promote_content_views', '-q', '-d', '-e', lifecycle])

    if rc == 0:
        good_promote = True

    return good_promote
push_puppetforge.py 文件源码 项目:sat6_scripts 作者: RedHatSatellite 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def postModule(moduleTar, moduleInputDir, pfserver, pfmodpath, pfuser, pftoken):
    """
    Function to push puppet modules using curl to Artifiactory repository
    """
    # Remove module's extension (.tar.gz)
    puppetModuleNameNoExt = splitext(moduleTar)[0]

    # Remove the path from the module
    puppetModuleName = puppetModuleNameNoExt.split('/')[-1]

    # Split the module name into the required parts
    puppetModuleNameList = puppetModuleName.split('-')
    author = puppetModuleNameList[0]
    moduleName = puppetModuleNameList[1]
    version = puppetModuleNameList[2]

    url = "http://" + pfserver + pfmodpath + "/" + author + "/" + moduleName + "/" + moduleTar
    fileName = moduleInputDir + "/" + moduleTar

    # Put the files using curl (need to clean this up)
    authtoken = pfuser + ":" + pftoken
    subprocess.call(['curl', '-u', authtoken, '-XPUT', url, '-T', fileName])
run.py 文件源码 项目:bap-ida-python 作者: BinaryAnalysisPlatform 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def _setup_headers(self, bap):
        "pass type information from IDA to BAP"
        # this is very fragile, and may break in case
        # if we have several BAP instances, especially
        # when they are running on different binaries.
        # Will leave it as it is until issue #588 is
        # resolved in the upstream
        with self.tmpfile("h") as out:
            ida.output_types(out)
            subprocess.call(bap, [
                '--api-add', 'c:"{0}"'.format(out.name),
            ])

        def cleanup():
            subprocess.call(bap, [
                "--api-remove", "c:{0}".
                format(os.path.basename(out.name))
            ])
        self.on_cleanup(cleanup)
powstream_utils.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def graceful_exit(tmpdir, keep_data_files=False, proc=None, pkill_cmd=None):
    #kill process if any, but keep in on try so doesn't prevent directory clean-up
    try:
        if proc: 
            proc.terminate()
            log.debug("Sent terminate to powstream process %s" % proc.pid)
    except: 
        pass

    #if they are still not down, force them down
    try:
        if pkill_cmd:
            time.sleep("2")
            call([pkill_cmd, '-9', '-f', tmpdir ], shell=False)
    except:
        pass

    #clean directory
    try:
        cleanup_dir(tmpdir, keep_data_files=keep_data_files)
    except:
        pass

##
# Removes a data file


问题


面经


文章

微信
公众号

扫码关注公众号