python类getcwd()的实例源码

utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 64 收藏 0 点赞 0 评论 0
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    python = os.path.join(venv, 'bin/python')
    cmd = [python, 'update.py', package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
dispycos_client4.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def client_proc(job_id, data_file, rtask, task=None):
    # send input file to rtask.location; this will be saved to dispycos process's
    # working directory
    if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0:
        print('Could not send input data to %s' % rtask.location)
        # terminate remote task
        rtask.send(None)
        raise StopIteration(-1)
    # send info about input
    obj = C(job_id, data_file, random.uniform(5, 8), task)
    if (yield rtask.deliver(obj)) != 1:
        print('Could not send input to %s' % rtask.location)
        raise StopIteration(-1)
    # rtask sends result to this task as message
    result = yield task.receive()
    if not result.result_file:
        print('Processing %s failed' % obj.i)
        raise StopIteration(-1)
    # rtask saves results file at this client, which is saved in pycos's
    # dest_path, not current working directory!
    result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
    # move file to cwd
    target = os.path.join(os.getcwd(), os.path.basename(result_file))
    os.rename(result_file, target)
    print('    job %s output is in %s' % (obj.i, target))
dispycos_client4.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def client_proc(job_id, data_file, rtask, task=None):
    # send input file to rtask.location; this will be saved to dispycos process's
    # working directory
    if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0:
        print('Could not send input data to %s' % rtask.location)
        # terminate remote task
        rtask.send(None)
        raise StopIteration(-1)
    # send info about input
    obj = C(job_id, data_file, random.uniform(5, 8), task)
    if (yield rtask.deliver(obj)) != 1:
        print('Could not send input to %s' % rtask.location)
        raise StopIteration(-1)
    # rtask sends result to this task as message
    result = yield task.receive()
    if not result.result_file:
        print('Processing %s failed' % obj.i)
        raise StopIteration(-1)
    # rtask saves results file at this client, which is saved in pycos's
    # dest_path, not current working directory!
    result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
    # move file to cwd
    target = os.path.join(os.getcwd(), os.path.basename(result_file))
    os.rename(result_file, target)
    print('    job %s output is in %s' % (obj.i, target))
dispycos_client4.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def client_proc(job_id, data_file, rtask, task=None):
    # send input file to rtask.location; this will be saved to dispycos process's
    # working directory
    if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0:
        print('Could not send input data to %s' % rtask.location)
        # terminate remote task
        rtask.send(None)
        raise StopIteration(-1)
    # send info about input
    obj = C(job_id, data_file, random.uniform(5, 8), task)
    if (yield rtask.deliver(obj)) != 1:
        print('Could not send input to %s' % rtask.location)
        raise StopIteration(-1)
    # rtask sends result to this task as message
    result = yield task.receive()
    if not result.result_file:
        print('Processing %s failed' % obj.i)
        raise StopIteration(-1)
    # rtask saves results file at this client, which is saved in pycos's
    # dest_path, not current working directory!
    result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
    # move file to cwd
    target = os.path.join(os.getcwd(), os.path.basename(result_file))
    os.rename(result_file, target)
    print('    job %s output is in %s' % (obj.i, target))
builder_base.py 文件源码 项目:rpwng 作者: MrNbaYoh 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def build(self, file):
        if self.built:
            raise PermissionError("You cannot build multiple times!")

        if not self.loaded:
            self.load(file)

        old = os.getcwd()
        sys.path.append(os.path.dirname(os.path.abspath(file)))  # for module import that aren't "include" call
        try:
            content = open(file, "rb").read()
            os.chdir(os.path.dirname(os.path.abspath(file)))  # set the current working directory, for open() etc.
            exec(compile(content, file, 'exec'), self.user_functions)
        except Exception as err:
            print("An exception occured while building: ", file=sys.stderr)
            lines = traceback.format_exc(None, err).splitlines()
            print("  " + lines[-1], file=sys.stderr)
            for l in lines[3:-1]:
                print(l, file=sys.stderr)
            exit(1)

        os.chdir(old)
        sys.path.remove(os.path.dirname(os.path.abspath(file)))
        self.built = True
builder_base.py 文件源码 项目:rpwng 作者: MrNbaYoh 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def load(self, file):
        if self.loaded:
            return

        sys.path.append(os.path.dirname(os.path.abspath(file)))  # for module import that aren't "include" call
        old = os.getcwd()
        try:
            content = open(file, "rb").read()
            os.chdir(os.path.dirname(os.path.abspath(file)))  # set the current working directory, for open() etc.
            exec(compile(content, file, 'exec'), self.user_functions)
        except Exception as err:
            print("An exception occured while loading: ", file=sys.stderr)
            lines = traceback.format_exc(None, err).splitlines()
            print("  " + lines[-1], file=sys.stderr)
            for l in lines[3:-1]:
                print(l, file=sys.stderr)
            exit(1)

        os.chdir(old)
        sys.path.remove(os.path.dirname(os.path.abspath(file)))
        self.loaded = True
        self.mem_offset = 0
ez_setup.py 文件源码 项目:Adafruit_Python_PureIO 作者: adafruit 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
run.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def StartFileServer(fileServerDir):
    """
    Start file server.
    """
    if not fileServerDir:
        message = \
            "The PYUPDATER_FILESERVER_DIR environment variable is not set."
        if hasattr(sys, "frozen"):
            logger.error(message)
            return None
        else:
            fileServerDir = os.path.join(os.getcwd(), 'pyu-data', 'deploy')
            message += "\n\tSetting fileServerDir to: %s\n" % fileServerDir
            logger.warning(message)
    fileServerPort = GetEphemeralPort()
    thread = threading.Thread(target=RunFileServer,
                              args=(fileServerDir, fileServerPort))
    thread.start()
    WaitForFileServerToStart(fileServerPort)
    return fileServerPort
export_users.py 文件源码 项目:safetyculture-sdk-python 作者: SafetyCulture 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def save_users_and_groups_to_csv(user_data, csv_output_filepath):
    """
    Creates a CSV file with exported user data
    :param user_data: The exported user data
    :param csv_output_filepath: The output file to save
    :return: None
    """
    full_output_path = os.path.join(os.getcwd(), csv_output_filepath)
    with open(full_output_path, 'wb') as f:
        fields = ['email', 'lastname', 'firstname', 'groups']
        w = csv.DictWriter(f, fields)
        w.writeheader()
        for key, val in sorted(user_data.items()):
            val['groups'] = ", ".join(val['groups'][0::2])
            row = {'email': key}
            row.update(val)
            w.writerow(row)
exporter.py 文件源码 项目:safetyculture-sdk-python 作者: SafetyCulture 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def configure(logger, path_to_config_file, export_formats):
    """
    instantiate and configure logger, load config settings from file, instantiate SafetyCulture SDK
    :param logger:              the logger
    :param path_to_config_file: path to config file
    :param export_formats:      desired export formats
    :return:                    instance of SafetyCulture SDK object, config settings
    """

    config_settings = load_config_settings(logger, path_to_config_file)
    config_settings[EXPORT_FORMATS] = export_formats
    sc_client = sp.SafetyCulture(config_settings[API_TOKEN])

    if config_settings[EXPORT_PATH] is not None:
        create_directory_if_not_exists(logger, config_settings[EXPORT_PATH])
    else:
        logger.info('Invalid export path was found in ' + path_to_config_file + ', defaulting to /exports')
        config_settings[EXPORT_PATH] = os.path.join(os.getcwd(), 'exports')
        create_directory_if_not_exists(logger, config_settings[EXPORT_PATH])

    return sc_client, config_settings
ntpath.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def abspath(path):
        """Return the absolute version of a path."""

        if path: # Empty path must return current working directory.
            path = os.fspath(path)
            try:
                path = _getfullpathname(path)
            except OSError:
                pass # Bad path - return unchanged.
        elif isinstance(path, bytes):
            path = os.getcwdb()
        else:
            path = os.getcwd()
        return normpath(path)

# realpath is a no-op on systems without islink support
tempfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _candidate_tempdir_list():
    """Generate a list of candidate temporary directories which
    _get_default_tempdir will try."""

    dirlist = []

    # First, try the environment.
    for envname in 'TMPDIR', 'TEMP', 'TMP':
        dirname = _os.getenv(envname)
        if dirname: dirlist.append(dirname)

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
    else:
        dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])

    # As a last resort, the current directory.
    try:
        dirlist.append(_os.getcwd())
    except (AttributeError, OSError):
        dirlist.append(_os.curdir)

    return dirlist
ez_setup.py 文件源码 项目:py_find_1st 作者: roebel 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
ez_setup.py 文件源码 项目:Adafruit_Python_PCA9685 作者: adafruit 项目源码 文件源码 阅读 58 收藏 0 点赞 0 评论 0
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
qiushimm.py 文件源码 项目:Python 作者: Guzi219 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def init_work_dir(self):
        retval = os.getcwd()
        print '#current dir is : ' + retval
        # ??????
        store_dir = retval + os.sep + 'tmp'
        print '#all imgs are going to be stored in dir :' + store_dir

        if not os.path.exists(store_dir):
            print '#tmp dir does not exist, attemp to mkdir'
            os.mkdir(store_dir)
            print '#mkdir sucessfully'
        else:
            print '#tmp dir is already exist'

        self.store_dir = store_dir

        # print '#now change current dir to tmp'
        # os.chdir(store_dir) #no neccessary
        # print os.getcwd()
qiubaiadult.py 文件源码 项目:Python 作者: Guzi219 项目源码 文件源码 阅读 59 收藏 0 点赞 0 评论 0
def init_work_dir(self):
        retval = os.getcwd()
        print '#current dir is : ' + retval
        # ??????
        store_dir = retval + os.sep + 'tmp'
        print '#all imgs are going to be stored in dir :' + store_dir

        if not os.path.exists(store_dir):
            print '#tmp dir does not exist, attemp to mkdir'
            os.mkdir(store_dir)
            print '#mkdir sucessfully'
        else:
            print '#tmp dir is already exist'

        self.store_dir = store_dir

        # print '#now change current dir to tmp'
        # os.chdir(store_dir) #no neccessary
        # print os.getcwd()
invoke.py 文件源码 项目:ardy 作者: avara1986 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def _run_local_lambda(self, lambda_config):
        prev_folder = os.getcwd()
        os.chdir(self.config.get_projectdir())
        sys.path.append(self.config.get_projectdir())
        lambda_name = lambda_config["FunctionName"]
        lambda_handler = self.import_function(lambda_config["Handler"])

        # Run and set a counter
        start = time.time()
        results = lambda_handler({}, MockContext(lambda_name))
        end = time.time()

        # restore folder
        os.chdir(prev_folder)

        # Print results
        logger.info("{0}".format(results))
        logger.info("\nexecution time: {:.8f}s\nfunction execution "
                    "timeout: {:2}s".format(end - start, lambda_config["Timeout"]))
pydotenv.py 文件源码 项目:Dockerfiles 作者: appscode 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
    """
    Search in increasingly higher folders for the given file

    Returns path to the file if found, or an empty string otherwise
    """
    if usecwd or '__file__' not in globals():
        # should work without __file__, e.g. in REPL or IPython notebook
        path = os.getcwd()
    else:
        # will work for .py files
        frame_filename = sys._getframe().f_back.f_code.co_filename
        path = os.path.dirname(os.path.abspath(frame_filename))

    for dirname in _walk_to_root(path):
        check_path = os.path.join(dirname, filename)
        if os.path.exists(check_path):
            return check_path

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
ex_actions.py 文件源码 项目:NeoVintageous 作者: NeoVintageous 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _changing_cd(f, *args, **kwargs):
    def inner(*args, **kwargs):
        try:
            state = State(args[0].view)
        except AttributeError:
            state = State(args[0].window.active_view())

        old = os.getcwd()
        try:
            # FIXME: Under some circumstances, like when switching projects to
            # a file whose _cmdline_cd has not been set, _cmdline_cd might
            # return 'None'. In such cases, change to the actual current
            # directory as a last measure. (We should probably fix this anyway).
            os.chdir(state.settings.vi['_cmdline_cd'] or old)
            f(*args, **kwargs)
        finally:
            os.chdir(old)

    return inner
test_15_LoggingConfig.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_comp_macro_directories_config_python(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="python", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
test_15_LoggingConfig.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 64 收藏 0 点赞 0 评论 0
def test_comp_macro_directories_config_cpp(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="cpp", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
test_15_LoggingConfig.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_comp_macro_directories_config_java(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="java", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
test_03_DeviceLifeCycle.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setUp(self):

        cfg = "log4j.rootLogger=TRACE,CONSOLE,FILE\n" + \
            "log4j.debug=false\n" + \
            "# Direct log messages to FILE\n" + \
            "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + \
            "log4j.appender.CONSOLE.File=stdout\n" + \
            "log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \
            "log4j.appender.FILE.File="+os.getcwd()+"/tmp_logfile.log\n" + \
            "log4j.appender.CONSOLE.threshold=TRACE\n" + \
            "log4j.appender.FILE.threshold=TRACE\n" + \
            "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \
            "log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n"

        fp = open('tmp_logfile.config','w')
        fp.write(cfg)
        fp.close()

        nodebooter, self._domMgr = self.launchDomainManager()
        self._domBooter = nodebooter
test_04_ApplicationFactory.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def setUp(self):
        cfg = "log4j.rootLogger=DEBUG,STDOUT,FILE\n " + \
            "# Direct log messages to FILE\n" + \
            "log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender\n" + \
            "log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \
            "log4j.appender.FILE.File=tmp_logfile.log\n" + \
            "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \
            "log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n"

        fp = open('tmp_logfile.config','w')
        fp.write(cfg)
        fp.close()
        self.domBooter, self._domMgr = self.launchDomainManager(loggingURI=os.getcwd()+'/tmp_logfile.config')
        self.devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
        self._app = None
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    python = os.path.join(venv, 'bin/python')
    cmd = [python, 'update.py', package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    python = os.path.join(venv, 'bin/python')
    cmd = [python, 'update.py', package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
get_nodes.py 文件源码 项目:stalkerGKSU 作者: zense 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def get_json(org):
    d = {"nodes":[],"links":[]}
    for i in graph:
        dt = {}
        dt["id"]=i
        dt["group"]=1
        d["nodes"].append(dt)
    for i in graph:
        for j in graph[i]:
            for k in j:
                dt={}
                dt["source"]=i
                dt["target"]=k[1::]
                dt["value"]=10
                d["links"].append(dt)
    string_json = json.dumps(d)
    filename = org + ".json"
    f = open(os.path.join(os.getcwd(), "static", filename), "w")
    f.write(string_json)
    f.close()
pydotenv.py 文件源码 项目:libbuild 作者: appscode 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
    """
    Search in increasingly higher folders for the given file

    Returns path to the file if found, or an empty string otherwise
    """
    if usecwd or '__file__' not in globals():
        # should work without __file__, e.g. in REPL or IPython notebook
        path = os.getcwd()
    else:
        # will work for .py files
        frame_filename = sys._getframe().f_back.f_code.co_filename
        path = os.path.dirname(os.path.abspath(frame_filename))

    for dirname in _walk_to_root(path):
        check_path = os.path.join(dirname, filename)
        if os.path.exists(check_path):
            return check_path

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
containers.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def __init__(self, additional_compose_file=None, additional_services=None):
        # To resolve docker client server version mismatch issue.
        os.environ["COMPOSE_API_VERSION"] = "auto"
        dir_name = os.path.split(os.getcwd())[-1]
        self.project = "{}{}".format(
            re.sub(r'[^a-z0-9]', '', dir_name.lower()),
            getpass.getuser()
        )
        self.additional_compose_file = additional_compose_file

        self.services = ["zookeeper", "schematizer", "kafka"]

        if additional_services is not None:
            self.services.extend(additional_services)

        # This variable is meant to capture the running/not-running state of
        # the dependent testing containers when tests start running.  The idea
        # is, we'll only start and stop containers if they aren't already
        # running.  If they are running, we'll just use the ones that exist.
        # It takes a while to start all the containers, so when running lots of
        # tests, it's best to start them out-of-band and leave them up for the
        # duration of the session.
        self.containers_already_running = self._are_containers_already_running()
ez_setup.py 文件源码 项目:BackManager 作者: linuxyan 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Unzip filename to a temporary directory, set to the cwd.

    The unzipped target is cleaned up after.
    """
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with ContextualZipFile(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)


问题


面经


文章

微信
公众号

扫码关注公众号