python类DEVNULL的实例源码

java_wrapper.py 文件源码 项目:pbtk 作者: marin-m 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle_file(self, fname):
        """Unpack an archive into ``self.name`` and index its contents.

        If *fname* starts with the DEX magic bytes it is first converted to a
        JAR with dex2jar, and that JAR is processed instead. Every archive
        entry is then dispatched on its file extension:

        * ``.class`` - dotted class name appended to ``self.classes``
        * ``.dex``   - extracted copy is processed recursively
        * ``.proto`` - UTF-8 text stored in ``self.bonus_protos`` under the entry name
        * ``.so``    - result of ``walk_binary`` merged into ``self.bonus_protos``
        """
        with open(fname, 'rb') as fd:
            magic = fd.read(4)
            if magic == b'dex\n':
                new_jar = self.name + '/classes-dex2jar.jar'
                run([dex2jar, fname, '-f', '-o', new_jar], cwd=self.name, stderr=DEVNULL)
                fname = new_jar

        with ZipFile(fname) as jar:
            jar.extractall(self.name)

            for entry in jar.namelist():
                if entry.endswith('.class'):
                    # Strip the 6-character ".class" suffix, then turn the
                    # archive path into a dotted class name.
                    self.classes.append(entry[:-6].replace('/', '.'))
                    continue

                if entry.endswith('.dex'):
                    self.handle_file(self.name + '/' + entry)
                    continue

                if entry.endswith('.proto'):
                    self.bonus_protos[entry] = jar.read(entry).decode('utf8')
                    continue

                if entry.endswith('.so'):
                    self.bonus_protos.update(walk_binary(self.name + '/' + entry))
rtags.py 文件源码 项目:cmake.nvim 作者: phillipbonhomme 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run_cmake(self):
        """Create a ``build`` directory and run the configured CMake command.

        Progress and failures are reported with ``print``; the method always
        returns ``None``. It gives up early when ``mkdir build`` exits
        non-zero, when the configured build directory is missing, or when the
        expected compilation-data file is absent after CMake has run.
        """
        print("Running CMake")
        mkdir_status = subprocess.call(
            ["mkdir", "build"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if mkdir_status != 0:
            print("Can\'t setup CMake build directory.")
            return

        if not self.cmake_build_info["build_dir"].is_dir():
            print("Couldn't setup CMake Project")
            return

        try:
            subprocess.check_output(
                self.cmake_cmd_info["cmake_cmd"],
                cwd=str(self.cmake_build_info["build_dir"]))
        except subprocess.CalledProcessError as err:
            # A failing CMake run is reported but not fatal by itself; the
            # file check below decides whether setup succeeded.
            print(err.output)

        if not self.cmake_build_info["comp_data_cmake"].is_file():
            print("Couldn't setup CMake Project")
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _openDownloadFile(self, buildId, suffix):
        """Download an artifact into a temporary file with the custom command.

        The download command runs under ``/bin/bash -ec`` in ``/tmp`` with an
        environment reduced to the white-listed variables plus
        ``BOB_LOCAL_ARTIFACT`` (temporary file path) and
        ``BOB_REMOTE_ARTIFACT`` (artifact URL).

        Returns a ``CustomDownloader`` wrapping the temporary file. Raises
        ``ArtifactDownloadError`` if the command exits non-zero; in that case
        the temporary file is removed.
        """
        (tmpFd, tmpName) = mkstemp()
        url = self._makeUrl(buildId, suffix)
        try:
            os.close(tmpFd)
            env = {}
            for (key, value) in os.environ.items():
                if key in self.__whiteList:
                    env[key] = value
            env["BOB_LOCAL_ARTIFACT"] = tmpName
            env["BOB_REMOTE_ARTIFACT"] = url
            exitCode = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd],
                stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                cwd="/tmp", env=env)
            if exitCode != 0:
                raise ArtifactDownloadError("failed (exit {})".format(exitCode))
            # Hand ownership of the file to the downloader: clearing tmpName
            # stops the finally clause from deleting it.
            downloaded = tmpName
            tmpName = None
            return CustomDownloader(downloaded)
        finally:
            if tmpName is not None:
                os.unlink(tmpName)
git.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def callGit(self, workspacePath, *args):
        """Run ``git`` with *args* in the checkout directory and return stdout.

        The command runs in ``<cwd>/<workspacePath>/<self.__dir>`` with stderr
        discarded and output decoded as text. A non-zero exit status is
        turned into a ``BuildError`` describing directory, command and output.
        """
        cmdLine = ['git'] + list(args)
        workDir = os.path.join(os.getcwd(), workspacePath, self.__dir)
        try:
            return subprocess.check_output(cmdLine, cwd=workDir,
                universal_newlines=True, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            raise BuildError("git error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
                os.path.join(workspacePath, self.__dir), " ".join(cmdLine), e.output.rstrip()))

    # Get GitSCM status. The purpose of this function is to return the status of the given directory.
    #
    # Return values:
    #  - error: The SCM is in an error state. Use this if git returned an error code.
    #  - dirty: The SCM is dirty. Could be: modified files, switched to another branch/tag/commit/repo, unpushed commits.
    #  - clean: Same branch/tag/commit as specified in the recipe and no local changes.
    #  - empty: The directory does not exist.
    #
    # This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
    # while empty and clean directories are not.
svn.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def callSubversion(self, workspacePath, *args):
        """Run ``svn`` with *args* inside *workspacePath* and return stdout.

        stderr is discarded and output is decoded as text. A non-zero exit
        status is turned into a ``BuildError`` describing directory, command
        and output.
        """
        cmdLine = ['svn'] + list(args)

        try:
            return subprocess.check_output(cmdLine, cwd=workspacePath,
                universal_newlines=True, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            raise BuildError("svn error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
                os.path.join(workspacePath, self.__dir), " ".join(cmdLine), e.output.rstrip()))

    # Get SvnSCM status. The purpose of this function is to return the status of the given directory.
    #
    # Return values:
    #  - error: The SCM is in an error state. Use this if the svn call returned an error code.
    #  - dirty: The SCM is dirty. Could be: modified files, switched to another URL or revision.
    #  - clean: Same URL and revision as specified in the recipe and no local changes.
    #  - empty: The directory does not exist.
    #
    # This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
    # while empty and clean directories are not.
config.py 文件源码 项目:runcommands 作者: wylee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def version_getter(config):
    """Return the tag on HEAD, or HEAD's short SHA1 if it is untagged.

    Returns ``None`` when the current directory is not inside a git work
    tree (``git rev-parse --is-inside-work-tree`` fails).

    .. note:: ``git describe --exact-match`` only considers annotated tags.

    TODO: Support non-annotated tags?

    """
    try:
        check_output(['git', 'rev-parse', '--is-inside-work-tree'], stderr=DEVNULL)
    except CalledProcessError:
        return None

    encoding = getpreferredencoding(do_setlocale=False)

    try:
        raw = check_output(['git', 'describe', '--exact-match'], stderr=DEVNULL)
    except CalledProcessError:
        # HEAD carries no (annotated) tag: fall back to the abbreviated hash.
        raw = check_output(['git', 'rev-parse', '--short', 'HEAD'])

    return raw.decode(encoding).strip()
vcs_repo.py 文件源码 项目:punch 作者: lgiordani 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _check_system(self):
        """Verify that the VCS executable can be launched.

        Runs ``self.commands + ["--help"]``. Raises ``RepositorySystemError``
        if the executable cannot be found or if it exits non-zero.
        """
        null_commands = self.commands + ["--help"]

        # Python 2 has neither FileNotFoundError nor subprocess.DEVNULL, so
        # select the exception type and the launcher per interpreter version.
        if six.PY2:
            not_found_exception = IOError
            launcher = subprocess.check_output
            launcher_options = {}
        else:
            not_found_exception = FileNotFoundError
            launcher = subprocess.check_call
            launcher_options = {'stdout': subprocess.DEVNULL}

        try:
            launcher(null_commands, **launcher_options)
        except not_found_exception:
            raise RepositorySystemError("Cannot run {}".format(self.command))
        except subprocess.CalledProcessError:
            raise RepositorySystemError(
                "Error running {}".format(self.command))
consumer.py 文件源码 项目:paperless 作者: lrnt 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _convert(self, command_path, input_path, settings=(),
                 prefix='', fileformat='png'):
        """Run an external converter and return the page files it produced.

        :param command_path: executable to run
        :param input_path: input file handed to the converter
        :param settings: extra command-line arguments placed before the input
        :param prefix: optional file-name prefix; ``'<prefix>-'`` is prepended
        :param fileformat: extension of the generated files
        :return: list of files in ``self.dest_pages`` matching
            ``<prefix>NNNN.<fileformat>`` (four digits)
        :raises ProcessingError: if the converter exits with a non-zero status
        """
        if prefix:
            prefix = '{}-'.format(prefix)

        output_path = path.join(
            self.dest_pages,
            '{}%04d.{}'.format(prefix, fileformat)
        )
        # The converter's stdout is never read. Piping it while only waiting
        # for the exit status would block forever once the pipe buffer fills,
        # so discard it instead.
        exit_code = subprocess.call(
            (command_path, *settings, input_path, output_path),
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )

        if exit_code != 0:
            raise ProcessingError(
                'Was unable to convert document to {}s ({}).'.format(fileformat,
                                                                     exit_code)
            )

        return glob.glob(path.join(
            self.dest_pages, '{}[0-9][0-9][0-9][0-9].{}'.format(prefix,
                                                                fileformat)
        ))
test_execution.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_vis(self, config):
        """Run the visibility sender emulator and wait for the master's output.

        Waits for the master to report ``'Waiting to receive'``, runs
        ``sip.emulators.csp_visibility_sender`` with *config* to completion,
        prints its output when ``DEBUG`` is set, then waits for the master to
        report ``'Received heap 9'``.
        """
        self.poll_master_stdout(5, 'Waiting to receive',
                                'run_vis initialisation')

        pid = subprocess.Popen(['/usr/bin/python', '-m',
                                'sip.emulators.csp_visibility_sender',
                                config],
                               stdin=subprocess.DEVNULL,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting for the child to
        # exit. Calling wait() first can deadlock once the child fills a
        # pipe buffer that nobody is reading.
        output = pid.communicate()
        if DEBUG:
            print('--------------------------')
            print('nvis_sender, stdout:')
            print('--------------------------')
            print(output[0].decode("utf-8"))
            print('--------------------------')
            print('nvis_sender, stderr:')
            print('--------------------------')
            print(output[1].decode("utf-8"))

        self.poll_master_stdout(5, 'Received heap 9', 'run_vis termination')
optimizeManifest.py 文件源码 项目:solace-messaging-cf-dev 作者: SolaceLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getDeployedIps(deploymentName):
    """Return the static IPs of an existing bosh-lite deployment.

    Returns ``None`` when ``bosh -e lite deployments`` lists no line matching
    *deploymentName*. Otherwise returns a dict with a ``"global"`` entry
    (the ``"static"`` value of the test subnet) and one entry per job whose
    name is in ``commonUtils.POOL_TYPES``, mapping to that job's
    ``static_ips`` on its first network.
    """
    # NOTE(review): deploymentName is interpolated into a shell string
    # unquoted; callers must pass a trusted value.
    countCommand = "bosh -e lite deployments | grep {} | wc -l".format(deploymentName)
    deploymentCount = subprocess.check_output(countCommand,
        shell=True, stderr=subprocess.DEVNULL)

    if int(deploymentCount) == 0:
        return None

    manifestCommand = "bosh -e lite -d {} manifest".format(deploymentName)
    manifestText = subprocess.check_output(manifestCommand,
        shell=True, stderr=subprocess.DEVNULL)
    # NOTE(review): yaml.load without an explicit Loader can construct
    # arbitrary objects; consider yaml.safe_load if the manifest is untrusted.
    deployedManifest = yaml.load(manifestText)

    deployedIpConfig = {"global": getTestSubnet(deployedManifest)["static"]}

    for job in deployedManifest["jobs"]:
        if job["name"] in commonUtils.POOL_TYPES:
            deployedIpConfig[job["name"]] = job["networks"][0]["static_ips"]

    return deployedIpConfig
java.py 文件源码 项目:camisole 作者: prologin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def find_class_having_main(self, classes):
        """Return the stem of the first class file declaring a Java main method.

        Each file in *classes* is disassembled with the configured
        disassembler (``-s`` prints type signatures). Files for which the
        disassembler fails are skipped. Returns ``None`` when no file
        qualifies.
        """
        for file in classes:
            # run javap(1) with type signatures
            try:
                stdout = subprocess.check_output(
                    [self.extra_binaries['disassembler'].cmd, '-s', str(file)],
                    stderr=subprocess.DEVNULL, env=self.compiler.env)
            except subprocess.SubprocessError:  # noqa
                continue
            # iterate on lines to find p s v main() signature and then
            # its descriptor on the line below; we don't rely on the type
            # from the signature, because it could be String[], String... or
            # some other syntax I'm not even aware of
            lines = iter(stdout.decode().split('\n'))
            for line in lines:
                line = line.lstrip()
                if line.startswith('public static') and 'void main(' in line:
                    descriptor = next(lines, None)
                    if descriptor is None:
                        # Output ended right after the signature: there is no
                        # descriptor to compare, so move on to the next file
                        # instead of leaking StopIteration to the caller.
                        break
                    if descriptor.lstrip() == PSVMAIN_DESCRIPTOR:
                        return file.stem
        return None
tarsnap.py 文件源码 项目:snappy 作者: ricci 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def runTarsnap(args, timeout = None):
    """Run tarsnap with *args* and return its captured result.

    The returned ``CheapoCompletedProcess`` carries ``stdout``, ``stderr``
    and ``returncode``. If *timeout* seconds elapse, tarsnap is sent SIGQUIT
    and then awaited without a time limit. A non-zero exit status terminates
    the program through ``sys.exit`` with a diagnostic message.
    """
    command = [config.tarsnap_bin] + config.tarsnap_extra_args + args
    proc = subprocess.Popen(command,
                stdin = subprocess.DEVNULL,
                stdout = subprocess.PIPE, stderr = subprocess.PIPE,
                universal_newlines = True)
    result = CheapoCompletedProcess()
    timedOut = False
    try:
        result.stdout, result.stderr = proc.communicate(timeout = timeout)
    except subprocess.TimeoutExpired:
        timedOut = True
        print("Tarsnap timed out, sending SIGQUIT...")
        proc.send_signal(signal.SIGQUIT)
        result.stdout, result.stderr = proc.communicate()

    result.returncode = proc.wait()
    if timedOut:
        print("Tarsnap finished")

    # Both the normal and the timed-out path share this failure handling.
    if result.returncode:
        sys.exit("Error running tarsnap:\nCommand: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n".format(" ".join(command),result.stdout,result.stderr))
    return result
graph.py 文件源码 项目:skastic 作者: mypalmike 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def parse_text(self, img):
    """OCR this object's bounding box with tesseract into ``self.text``.

    The bounding box is saved to ``obj_<id>.png`` and tesseract is run on it
    in single-line mode (``--psm 7``) with a restricted character whitelist.
    The recognised text is stripped and lower-cased before being stored.
    """
    filename = "obj_{}.png".format(id(self))
    self.save_bounding_box(img, filename)

    whitelist = 'tessedit_char_whitelist=abcdefghijklmnopqrstuvwxyz0123456789=+-*/'
    cmd = ['tesseract', filename, 'stdout']
    cmd += ['--psm', '7']
    cmd += ['-l', 'eng+equ']
    cmd += ['-c', whitelist]

    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    recognised = completed.stdout.decode('ascii')
    self.text = recognised.strip().lower()
cli_helpers.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def running_old_sell(manager, installer):
    """Report whether the VirtualBox VM "21" has a third bridged adapter.

    Returns ``False`` when any dependency reported by
    ``installer.check_dependencies()`` is not installed, when the machine is
    not a ``Two1MachineVirtual``, or when ``VBoxManage showvminfo`` fails.
    Otherwise returns whether ``"bridgeadapter3"`` appears in the VM info.
    """
    for _package, installed in installer.check_dependencies():
        if installed is False:
            return False

    if not isinstance(manager.machine, Two1MachineVirtual):
        return False

    try:
        vbox_conf = subprocess.check_output(["VBoxManage", "showvminfo", "21", "--machinereadable"],
                                            stderr=subprocess.DEVNULL)
        if isinstance(vbox_conf, bytes):
            vbox_conf = vbox_conf.decode()
    except subprocess.CalledProcessError:
        return False

    return "bridgeadapter3" in vbox_conf
composer.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _start_sell_service(self, service_name, failed_to_start_hook, started_hook, failed_to_up_hook, up_hook,
                            timeout=Two1Composer.SERVICE_START_TIMEOUT):
        """Start one docker-compose service and report progress through hooks.

        Args:
            service_name: compose service to bring up.
            failed_to_start_hook: called with the name if ``docker-compose up`` fails.
            started_hook: called with the name once ``docker-compose up`` succeeded.
            failed_to_up_hook: called with the name if the probe is still
                running when *timeout* expires.
            up_hook: called with the name when the probe finished in time.
            timeout: seconds (wall clock) to wait for the probe.

        For ``'router'`` the method only sleeps five seconds; ``'base'`` is
        not probed. Every other service is probed by running
        ``curl <service>:5000`` inside the ``sell_router`` container.
        """
        try:
            subprocess.check_output(["docker-compose", "-f", Two1Composer.COMPOSE_FILE, "up", "-d", service_name],
                                    stderr=subprocess.DEVNULL, env=self.machine_env)
        except subprocess.CalledProcessError:
            failed_to_start_hook(service_name)
        else:
            started_hook(service_name)
            if service_name == 'router':
                time.sleep(5)
            elif service_name != 'base':
                # time.clock() was removed in Python 3.8 and measured CPU
                # time on Unix, so the timeout did not track elapsed time.
                # time.monotonic() gives a wall-clock deadline.
                deadline = time.monotonic() + timeout

                exec_id = self.docker_client.exec_create('sell_router', "curl %s:5000" % service_name)['Id']
                self.docker_client.exec_start(exec_id)
                running = True

                while running is True and time.monotonic() < deadline:
                    running = self.docker_client.exec_inspect(exec_id)['Running']

                if running is True:
                    failed_to_up_hook(service_name)
                else:
                    up_hook(service_name)
machine.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def start_networking(self):
        """ Start ZeroTier daemon.

        Returns:
            int: 0 if the service is (or becomes) running, 1 if the start
                command fails or the service is not up within 60 seconds.
        """
        if self.status_networking():
            return 0

        try:
            # check_call waits for the command and raises CalledProcessError
            # on a non-zero exit. Popen never raises that exception, which
            # left the failure branch unreachable and the child unreaped.
            subprocess.check_call(['sudo', 'service', 'zerotier-one', 'start'], stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError:
            return 1

        now = time.time()

        while time.time() <= now + 60:
            if self.status_networking():
                return 0

        return 1
machine.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def status_networking(self):
        """ Get status of ZeroTier One service.

        Copies the ``ps_zerotier.sh`` helper to the machine if it is not
        listed there, makes it executable, runs it and returns ``True`` when
        any output line contains ``zerotier-one -d``. Returns ``False`` if
        no line matches or if any command fails with ``CalledProcessError``.
        """
        try:
            remote_files = self.docker_ssh("ls", stderr=subprocess.DEVNULL).decode().split("\n")
            if "ps_zerotier.sh" not in remote_files:
                local_script = os.path.join(
                    os.path.dirname(os.path.abspath(__file__)),
                    "util",
                    "scripts",
                    "ps_zerotier.sh")
                subprocess.check_output(["docker-machine", "scp", local_script, "21:~/ps_zerotier.sh"])
            self.docker_ssh("chmod a+x ~/ps_zerotier.sh", stderr=subprocess.DEVNULL)
            ps_output = self.docker_ssh("./ps_zerotier.sh", stderr=subprocess.DEVNULL).decode()
            return any("zerotier-one -d" in entry for entry in ps_output.split("\n"))
        except subprocess.CalledProcessError:
            return False
machine.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect_market(self, client, market):
        """Join the ZeroTier network of *market* and wait for the interface.

        Reads this machine's ZeroTier address, asks *client* to join the
        market with it and, on an OK response, joins the returned network id
        and waits for confirmation. A 400 server error and failing commands
        are logged; other server errors are re-raised. Unless an exception
        propagates, the method sleeps ten seconds before returning ``None``.
        """
        try:
            info_json = self.docker_ssh("sudo ./zerotier-cli info -j",
                                        stderr=subprocess.DEVNULL).decode()
            zt_device_address = json.loads(info_json)["address"]
            response = client.join(market, zt_device_address)
            if response.ok:
                network_id = response.json().get("networkid")
                self.docker_ssh("sudo ./zerotier-cli join %s" % network_id, stderr=subprocess.DEVNULL)
                # The confirmation result is not used; the call is kept for
                # its waiting side effect.
                self.wait_for_zt_confirmation()
        except exceptions.ServerRequestError as e:
            if e.status_code != 400:
                raise e
            logger.info(uxstring.UxString.invalid_network)
        except subprocess.CalledProcessError as e:
            logger.info(str(e))
        time.sleep(10)  # wait for interface to come up
machine.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_market_address(self):
        """ Get status of 21mkt network connection.

        Only the first network named ``21mkt`` in the ZeroTier listing is
        considered.

        Returns:
            zt_ip (str): ZeroTier IP address in square brackets, or an empty
                string if the network is missing, not ``OK``, has no address
                matching the IPv6 pattern, or any error occurs.
        """
        try:
            zt_conf = self.docker_ssh("sudo ./zerotier-cli listnetworks -j", stderr=subprocess.DEVNULL)
            if isinstance(zt_conf, bytes):
                zt_conf = zt_conf.decode()

            for net in json.loads(zt_conf):
                if net["name"] != "21mkt":
                    continue
                if net["status"] != "OK":
                    return ""
                for addr in net["assignedAddresses"]:
                    potential_match = Two1Machine.ZEROTIER_CLI_IPV6_PATTERN.search(addr)
                    if potential_match is not None:
                        return "[%s]" % potential_match.group(0)
                return ""
            return ""
        except Exception:
            return ""
machine.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def status_machine(self):
        """ Get status of virtual machine.

        Returns:
            VmState: ``RUNNING`` or ``STOPPED`` according to
                ``docker-machine status <name>``, ``UNKNOWN`` for any other
                reported status, and ``NOEXIST`` if the query fails.
        """
        try:
            status = subprocess.check_output(["docker-machine", "status",
                                             self.name],
                                             stderr=subprocess.DEVNULL).decode().rstrip()
            if status.lower() == "running":
                return VmState.RUNNING
            elif status.lower() == "stopped":
                return VmState.STOPPED
            else:
                return VmState.UNKNOWN
        except Exception:
            # A bare ``except:`` would also swallow KeyboardInterrupt and
            # SystemExit; only ordinary errors mean "machine does not exist".
            return VmState.NOEXIST

    # private methods
test_redisrwlock_connection.py 文件源码 项目:redisrwlock 作者: veshboo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def runRedisServer(port=6379):
    """Start redis-server on *port* and wait until it reports readiness.

    Startup output is read line by line until the ready message (followed
    by the port) appears; all later output is drained into DEVNULL by a
    ``cat`` process.

    Returns:
        (server, dumper): the redis-server and ``cat`` ``Popen`` objects.

    Raises:
        RuntimeError: if redis-server closes its stdout before the ready
            message is seen.
    """
    port = str(port)
    server = subprocess.Popen(['redis-server', '--port', port],
                              stdout=subprocess.PIPE,
                              universal_newlines=True)
    message = _REDIS_READY_MESSAGE + port
    while True:
        line = server.stdout.readline()
        if message in line:
            break
        if not line:
            # readline() returns '' at EOF: the server exited (e.g. port in
            # use). Without this check the loop would spin forever.
            server.wait()
            raise RuntimeError(
                'redis-server exited before reporting: {}'.format(message))
    dumper = subprocess.Popen(['cat'],
                              stdin=server.stdout,
                              stdout=subprocess.DEVNULL)
    return server, dumper
tests.py 文件源码 项目:RSVPBot 作者: recursecenter 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def devserver(port):
    """Run the dev server on *port* for the duration of the generator.

    Points ``config.rc_root`` and ``config.rc_api_root`` at the local
    server, starts ``devserver/__init__.py`` with ``PORT`` set, waits one
    second for it to come up, yields once, and finally kills and reaps the
    server process.
    """
    config.rc_root = 'http://localhost:{}'.format(port)
    config.rc_api_root = config.rc_root + '/api/v1'

    child_env = dict(os.environ)
    child_env['PORT'] = str(port)

    proc = subprocess.Popen(
        ['python', 'devserver/__init__.py'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=child_env,
    )

    # wait for the dev server to come up
    time.sleep(1)

    try:
        yield
    finally:
        proc.kill()
        proc.wait()
git.py 文件源码 项目:bridge-test 作者: Half-Shot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def clone(self, repo, branch="master"):
        """Make *repo* at *branch* available in the temp directory.

        Returns ``None`` when the repo/branch pair is already registered in
        ``self.repos``. Otherwise an existing directory is reused and pulled,
        or the repository is cloned; the location is registered in
        ``self.repos``, an empty list in ``self.copies``, and the location is
        returned. A failing ``git clone`` raises ``CalledProcessError``.
        """
        location_index = repo+'#'+branch
        if self.repos.get(location_index) is not None:
            logger.debug("Repo %s already exists, reusing old." % location_index)
            return

        location = os.path.join(tempfile.gettempdir(), os.path.basename(location_index))
        if not os.path.exists(location):
            logger.debug("Cloning %s with branch %s" % (repo, branch))
            clone_cmd = ["git", "clone", "-b", branch, repo, location]
            subprocess.run(
                clone_cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            ).check_returncode()
        else:
            logger.debug("Repo %s with branch %s already exists on disk, reusing and pulling." % (repo, branch))
            self.pull(location)

        self.repos[location_index] = location
        self.copies[location_index] = []
        return location
run_driver.py 文件源码 项目:temci 作者: parttimenerd 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _instancecheck_impl(self, value, info: Info = NoInfo()):
        """Check that *value* is a list of property names that perf accepts.

        Non-``List(Str())`` values are rejected. When perf is unavailable the
        list is accepted unchecked. Otherwise the first ``"wall-clock"``
        entry is dropped and ``perf stat`` is run on ``/bin/echo`` with the
        remaining properties; a positive exit status rejects the value with
        the first line of perf's stderr.
        """
        if not isinstance(value, List(Str())):
            return info.errormsg(self)
        if not is_perf_available():
            return info.wrap(True)
        assert isinstance(value, list)

        props = value
        if "wall-clock" in props:
            # Work on a copy so the caller's list is left untouched.
            props = props.copy()
            props.remove("wall-clock")

        cmd = "perf stat -x ';' -e {props} -- /bin/echo".format(props=",".join(props))
        proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=subprocess.DEVNULL,
                                stderr=subprocess.PIPE, universal_newlines=True)
        _, err = proc.communicate()
        if proc.poll() > 0:
            first_line = str(err).split("\n")[0].strip()
            return info.errormsg(self, "Not a valid properties list: " + first_line)
        return info.wrap(True)
action.py 文件源码 项目:temci 作者: parttimenerd 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _exec(self, cmd: str, fail_on_error: bool = False,
              error_message: str = "Failed executing {cmd!r}: out={out!r}, err={err!r}",
              timeout: int = 10) -> bool:
        """
        Execute the passed command with ``/bin/sh -c``.

        :param cmd: shell command line to run
        :param fail_on_error: call ``self._fail`` with the formatted error message if the command fails
        :param error_message: error message format (fields: ``cmd``, ``out``, ``err``)
        :param timeout: time in seconds after which the command is aborted
        :return: Was the command executed successfully?
        :raises subprocess.TimeoutExpired: if the command exceeds the timeout (the shell is killed first)
        """
        # Output is only captured when it is needed for the error message.
        out_mode = subprocess.PIPE if fail_on_error else subprocess.DEVNULL
        proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=out_mode, stderr=out_mode,
                                universal_newlines=True)
        try:
            out, err = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            # communicate() does not kill the child on timeout; do it here so
            # the process is not left running, then re-raise for the caller.
            proc.kill()
            proc.wait()
            raise
        # A negative return code means the child was killed by a signal, so
        # compare against zero rather than testing for "> 0".
        if proc.returncode != 0:
            if fail_on_error:
                self._fail(error_message.format(cmd=cmd, out=out, err=err))
            return False
        return True
verify.py 文件源码 项目:rp2017-codegolf 作者: ReflectionsProjections 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cpp_verify(data, user_file):
    """Compile the user's C++ answer against the task header and run it.

    The answer is copied to ``answer.cpp`` and ``task/test.h`` to ``test.h``
    in the current directory, both are compiled with g++ into ``test``, and
    the binary is run with *data* on stdin.

    :param data: bytes fed to the compiled program's stdin
    :param user_file: path of the user's C++ source file
    :return: the program's stdout as bytes
    :raises SystemExit: with code 1 if ``task/test.h`` cannot be read or the
        answer fails to compile
    """
    # Context managers close every file promptly instead of relying on
    # garbage collection.
    with open(user_file) as source:
        answer = source.read()
    # write to temp file
    with open('answer.cpp', 'w') as temp:
        temp.write(answer)
    try:
        with open('task/test.h') as header:
            test = header.read()
        with open('test.h', 'w') as temp:
            temp.write(test)
    except IOError: # user has not requested task yet
        print("A task folder could not be found. Request a task\nusing python client.py request <task_id>")
        sys.exit(1) # terminate program
    proc = sp.Popen(['g++', 'answer.cpp', 'test.h', '-o', 'test'], stderr=sp.DEVNULL, stdout=sp.DEVNULL)
    proc.wait()
    if proc.returncode:
        print("Answer failed to compile.")
        sys.exit(1)
    proc = sp.Popen(['./test'], stdin=sp.PIPE, stdout=sp.PIPE)
    test_result, _ = proc.communicate(input=data)
    # dispose of temporary files
    os.remove('answer.cpp')
    os.remove('test.h')
    os.remove('test')

    return test_result
pyhttps.py 文件源码 项目:pyhttps 作者: talhasch 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_ssl_cert():
    """Create a self-signed certificate at ``ssl_cert_path`` with openssl.

    Key and certificate are written to the same file, valid for 365 days,
    without a passphrase. openssl's output is discarded. If
    ``OpenSslExecutableNotFoundError`` is raised, an error is logged and the
    program exits with status 1.
    """
    # subprocess.DEVNULL only exists on Python 3; open os.devnull otherwise.
    if PY3:
        from subprocess import DEVNULL
    else:
        DEVNULL = open(os.devnull, 'wb')

    subject = '/CN=www.talhasch.com/O=Talhasch Inc./C=TR'
    ssl_exec_list = ['openssl', 'req', '-new', '-x509']
    ssl_exec_list += ['-keyout', ssl_cert_path, '-out', ssl_cert_path]
    ssl_exec_list += ['-days', '365', '-nodes', '-subj', subject]

    try:
        call(ssl_exec_list, stdout=DEVNULL, stderr=DEVNULL)
    except OpenSslExecutableNotFoundError:
        logging.error('openssl executable not found!')
        exit(1)

    logging.info('Self signed ssl certificate created at {}'.format(ssl_cert_path))
shell.py 文件源码 项目:ml-utils 作者: LinxiFan 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _DEPREC_bash_run(cmd, suppress_err=0):
    """Run *cmd* in an interactive bash so that aliases are available.

    Warns on stderr when ``~/.bashrc`` is missing. The command's stderr is
    discarded when *suppress_err* is truthy and merged into stdout
    otherwise. The decoded output is not returned; the function always
    returns ``None``.
    """
    if not U.f_exists('~/.bashrc'):
        print('WARNING: ~/.bashrc not found. '
              'bash_output() will not be able to read the aliases.', file=sys.stderr)
    # hack: if you don't echo something first, your alias such as `ll` will
    # kill the script after it's run.
    # https://stackoverflow.com/questions/45558993/subprocess-interactive-bash-behavior
    cmd = 'printf ""; ' + cmd
    if suppress_err:
        err = pc.DEVNULL
    else:
        err = pc.STDOUT
    raw_output = pc.check_output(['/bin/bash', '-i', '-c', cmd], stderr=err)
    raw_output.decode()
    # try:
    #     err = pc.DEVNULL if suppress_err else None
    #     return pc.check_output(['/bin/bash', '-i', '-c', cmd],
    #                            stderr=err,
    #                            ).decode()
    # except pc.CalledProcessError as exc:
    #     if suppress_err:
    #         print('Failed bash call:\n', exc.output.decode(), file=sys.stderr)
    #     raise
Aireplay.py 文件源码 项目:WiFree 作者: Ablablab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __start(self, args):
        """Launch aireplay with *args* plus the mandatory flags.

        The BSSID flag (with ``self._bssid``) and ``--ignore-negative-one``
        are appended when missing, the interface ``self._miff`` is added
        last, and the command line is printed. The process is started with
        only ``PATH`` in its environment and all standard streams discarded,
        and is stored in ``self._proc``.

        :param args: list of aireplay flags; it is not modified
        :return: always ``True``
        """
        # build aireplay flags on a copy so the caller's list is not mutated
        flags = list(args)

        if self.DEAUTH_BSSID_FLAG not in flags:
            flags.extend([self.DEAUTH_BSSID_FLAG, self._bssid])
        if '--ignore-negative-one' not in flags:
            flags.append("--ignore-negative-one")

        # build Popen command as list
        cmd = [self.AIREPLAY] + flags + [self._miff]

        print("\t" + " ".join(cmd))

        self._proc = Popen(cmd, env={'PATH': os.environ['PATH']},
                           stderr=DEVNULL, stdin=DEVNULL, stdout=DEVNULL)

        return True
SplitVideo.py 文件源码 项目:BiliLive 作者: hr3lxphr6j 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cut_video(input_file: str, output_file: str, start_time: str, end_time: str, is_concat: bool = True):
    """
    Cut a section out of a video with ffmpeg (stream copy, no re-encoding).

    :param input_file: input passed to ffmpeg; when ``is_concat`` is true it
        is written to ffmpeg's stdin and read with the concat demuxer,
        otherwise it is used as the ``-i`` argument
    :param output_file: path of the file to write (overwritten if present)
    :param start_time: value passed to ffmpeg's ``-ss`` option
    :param end_time: value passed to ffmpeg's ``-to`` option
    :param is_concat: whether ``input_file`` is a Virtual Concatenation Script
    :return: None
    """
    ffmpeg_command = ['ffmpeg', '-y']
    if is_concat:
        ffmpeg_command.extend(['-protocol_whitelist', 'file,pipe',
                               '-safe', '0',
                               '-f', 'concat',
                               '-i', '-'])
    else:
        ffmpeg_command.extend(['-i', input_file])
    ffmpeg_command.extend(['-ss', start_time,
                           '-to', end_time,
                           '-c', 'copy',
                           output_file])
    child = subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)
    stdin_payload = None
    if is_concat:
        # stdin is a binary pipe, so a str script must be encoded first;
        # communicate() raises TypeError when given str here.
        stdin_payload = input_file.encode() if isinstance(input_file, str) else input_file
    child.communicate(stdin_payload)


问题


面经


文章

微信
公众号

扫码关注公众号