python类getenv()的实例源码

workflow3.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def session_id(self):
        """Return the session ID for the current use of the workflow.

        .. versionadded:: 1.25

        The ID is cached on the instance. On first access it is read from
        the ``_WF_SESSION_ID`` environment variable; when that is unset or
        empty a new UUID4 hex string is generated and handed to
        ``self.setvar`` under the same name.

        The session ID persists while the user is using this workflow and
        expires when the user runs a different workflow or closes Alfred.

        """
        # Already resolved during an earlier call.
        if self._session_id:
            return self._session_id

        current = os.getenv('_WF_SESSION_ID')
        if not current:
            from uuid import uuid4
            current = uuid4().hex
            self.setvar('_WF_SESSION_ID', current)

        self._session_id = current
        return self._session_id
appdirs.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def site_config_dirs(appname):
    r"""Return the candidate shared (site-wide) config dirs for an application.

        "appname" is the name of application.

    Typical directories are:
        macOS:      /Library/Application Support/<AppName>/
        Unix:       /etc or $XDG_CONFIG_DIRS[i]/<AppName>/ for each value in
                    $XDG_CONFIG_DIRS
        Win XP:     C:\Documents and Settings\All Users\Application ...
                    ...Data\<AppName>\
        Vista:      (Fail! "C:\ProgramData" is a hidden *system* directory
                    on Vista.)
        Win 7:      Hidden, but writeable on Win 7:
                    C:\ProgramData\<AppName>\

    On Windows and macOS the list has exactly one entry. Elsewhere it has
    one entry per element of $XDG_CONFIG_DIRS (default '/etc/xdg'),
    followed by '/etc'.
    """
    if WINDOWS:
        base = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
        return [os.path.join(base, appname)]

    if sys.platform == 'darwin':
        return [os.path.join('/Library/Application Support', appname)]

    # Everything else: consult $XDG_CONFIG_DIRS first.
    candidates = []
    xdg_value = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
    if xdg_value:
        for entry in xdg_value.split(os.pathsep):
            candidates.append(os.path.join(expanduser(entry), appname))

    # /etc itself is always checked as well, after the XDG entries.
    candidates.append('/etc')
    return candidates


# -- Windows support functions --
tempfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _candidate_tempdir_list():
    """Build the ordered list of directories _get_default_tempdir will try.

    The order is: non-empty values of TMPDIR, TEMP and TMP, then the
    conventional temp locations for the platform, then the current
    working directory.
    """
    # Environment overrides come first, in priority order.
    from_env = [_os.getenv(name) for name in ('TMPDIR', 'TEMP', 'TMP')]
    candidates = [value for value in from_env if value]

    # Then the well-known OS-specific locations.
    if _os.name == 'nt':
        candidates += [r'c:\temp', r'c:\tmp', r'\temp', r'\tmp']
    else:
        candidates += ['/tmp', '/var/tmp', '/usr/tmp']

    # Last resort: the current directory, or its symbolic name when the
    # real path cannot be obtained.
    try:
        here = _os.getcwd()
    except (AttributeError, OSError):
        here = _os.curdir
    candidates.append(here)

    return candidates
engine.py 文件源码 项目:ibus-replace-with-kanji 作者: esrille 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __load_layout(self, config):
        """Load the keyboard layout and select the matching kana handler.

        The layout path comes from the 'layout' value in the
        'engine/replace-with-kanji-python' config section when that value
        reports type string 's'. Otherwise the path is
        'layouts/roomazi.json' below $IBUS_REPLACE_WITH_KANJI_LOCATION, and
        a truthy config value of another type is unset.

        When the file cannot be opened or parsed, or no path can be
        determined, the error is logged and ``roomazi.layout`` is used.

        Args:
            config: object providing ``get_value(section, name)`` and
                ``unset(section, name)``.

        Returns:
            The loaded layout, or ``roomazi.layout`` on failure.
        """
        section = 'engine/replace-with-kanji-python'
        var = config.get_value(section, 'layout')
        if var is None or var.get_type_string() != 's':
            location = os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION')
            # os.path.join(None, ...) raises TypeError, so an unset variable
            # means there is no file to load rather than a crash.
            if location:
                path = os.path.join(location, 'layouts', 'roomazi.json')
            else:
                path = None
            if var:
                config.unset(section, 'layout')
        else:
            path = var.get_string()
        logger.info("layout: %s", path)
        layout = roomazi.layout     # Use 'roomazi' as default
        if path is None:
            logger.error("Error: %s", "IBUS_REPLACE_WITH_KANJI_LOCATION is not set")
        else:
            try:
                with open(path) as f:
                    layout = json.load(f)
            except ValueError as error:
                logger.error("JSON error: %s", error)
            except OSError as error:
                logger.error("Error: %s", error)
            except Exception:
                # Still best-effort, but KeyboardInterrupt and SystemExit
                # are no longer swallowed as a bare ``except:`` would do.
                logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
        self.__to_kana = self.__handle_roomazi_layout
        if 'Type' in layout:
            if layout['Type'] == 'Kana':
                self.__to_kana = self.__handle_kana_layout
        return layout
device.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self):
        """Record the current search-path environment variables.

        Each attribute receives the value of its variable, or '' when the
        variable is unset or empty.
        """
        self.ld_lib_path = os.getenv("LD_LIBRARY_PATH") or ''
        self.pythonpath = os.getenv("PYTHONPATH") or ''
        self.classpath = os.getenv("CLASSPATH") or ''
        self.octave_path = os.getenv("OCTAVE_PATH") or ''
device.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Record the current search-path environment variables on entry.

        Each attribute receives the value of its variable, or '' when the
        variable is unset or empty. Nothing is returned.
        """
        tracked = (
            ('ld_lib_path', "LD_LIBRARY_PATH"),
            ('pythonpath', "PYTHONPATH"),
            ('classpath', "CLASSPATH"),
            ('octave_path', "OCTAVE_PATH"),
        )
        for attribute, variable in tracked:
            setattr(self, attribute, os.getenv(variable) or '')
device.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _prependToEnvVar(self, newVal, envVar):
        """Prepend ``newVal`` to the path-style environment variable ``envVar``.

        The variable is left untouched when one of its current entries (as
        returned by ``self._getEnvVarAsList``) already refers to ``newVal``.
        Otherwise the new value is written as
        ``newVal + pathsep [+ old value + pathsep]``.

        Args:
            newVal: path to add at the front.
            envVar: name of the environment variable to update.
        """
        for entry in self._getEnvVarAsList(envVar):
            # Search to determine if the new value is already in the path
            try:
                if os.path.samefile(entry, newVal):
                    return
            except OSError:
                # If we can't find concrete files to compare, fall back to
                # string compare
                if entry == newVal:
                    return

        # The value does not already exist.
        # ``in`` replaces dict.has_key(), which was removed in Python 3.
        if envVar in os.environ:
            newpath = newVal + os.path.pathsep + os.getenv(envVar) + os.path.pathsep
        else:
            newpath = newVal + os.path.pathsep
        # Assigning through os.environ also calls putenv(), so the separate
        # os.putenv() call is not needed.
        os.environ[envVar] = newpath
cli.py 文件源码 项目:kubernaut 作者: datawire 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def create_kubeconfig_var_message(path):
    """Build the hint telling the user how to point KUBECONFIG at ``path``.

    The command shown depends on $SHELL: an ``export`` line for bash and
    zsh, a ``set -g -x`` line for fish, and an ``export`` line preceded by
    a notice when the shell is not recognised.
    """
    export_hint = "\n\n        export KUBECONFIG={0}\n        "
    shell = os.getenv("SHELL", "").lower()

    if "/bash" in shell or "/zsh" in shell:
        suffix = export_hint
    elif "/fish" in shell:
        suffix = " \n\n        set -g -x KUBECONFIG {0}\n        "
    else:
        suffix = (". Unable to detect shell therefore assuming a Bash-compatible shell"
                  "    " + export_hint)

    template = "Set your KUBECONFIG environment variable to use kubectl" + suffix
    return template.format(path).lstrip()
pydotenv.py 文件源码 项目:libbuild 作者: appscode 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def resolve_nested_variables(values):
    """Expand variable references inside the dotenv ``values``, in place.

    Every match of the module-level ``__posix_variable`` pattern is
    replaced by the value of the referenced name. The process environment
    is consulted first, then ``values`` itself; an empty string is used
    when neither defines the name.

    Returns the same mapping that was passed in.
    """
    def _lookup(match):
        # The name is the matched text minus its first two and last
        # characters.
        name = match.group()[2:-1]
        fallback = values.get(name, "")
        return os.getenv(name, fallback)

    for key in list(values):
        values[key] = __posix_variable.sub(_lookup, values[key])

    return values
goglib_get_games_list.py 文件源码 项目:games_nebula 作者: yancharkin 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def goglib_get_games_list():
    """Refresh the cached games list from ``lgogdownloader``.

    Runs ``lgogdownloader --exclude 1,2,4,8,16,32 --list-details`` and, if
    it exits with status 0, writes every output line that does not contain
    'Getting game info' to ``$HOME/.games_nebula/config/games_list``.

    Returns:
        0 when the list was written, 1 when the command exited non-zero.
    """
    proc = subprocess.Popen(
        ['lgogdownloader', '--exclude', '1,2,4,8,16,32', '--list-details'],
        stdout=subprocess.PIPE)
    games_detailed_list = proc.stdout.readlines()
    # The output has been read in full; this waits for the process to exit
    # and closes the pipe.
    proc.communicate()

    if proc.returncode != 0:
        return 1

    file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'

    # ``with`` closes (and flushes) the file; it used to be left open.
    with open(file_path, 'w') as games_list_file:
        for line in games_detailed_list:
            if 'Getting game info' not in line:
                games_list_file.write(line)

    return 0
runner.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self):
        """Setup the Runner for all Foremast modules.

        Settings are read from environment variables; the names used for
        the app, the trigger job and the short git path come from the
        ``gogoutils`` generator.
        """
        debug_flag()

        # attribute name -> environment variable (None when unset)
        env_settings = (
            ('email', "EMAIL"),
            ('env', "ENV"),
            ('group', "PROJECT"),
            ('region', "REGION"),
            ('repo', "GIT_REPO"),
            ('runway_dir', "RUNWAY_DIR"),
            ('artifact_path', "ARTIFACT_PATH"),
            ('artifact_version', "ARTIFACT_VERSION"),
        )
        for attribute, variable in env_settings:
            setattr(self, attribute, os.getenv(variable))
        self.promote_stage = os.getenv("PROMOTE_STAGE", "latest")

        self.git_project = "{}/{}".format(self.group, self.repo)
        url_parts = gogoutils.Parser(self.git_project).parse_url()
        names = gogoutils.Generator(*url_parts, formats=consts.APP_FORMATS)

        self.app = names.app_name()
        self.trigger_job = names.jenkins()['name']
        self.git_short = names.gitlab()['main']

        self.raw_path = "./raw.properties"
        self.json_path = self.raw_path + ".json"
        self.configs = None
config.py 文件源码 项目:mechanic 作者: server-mechanic 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __init__(self, mode):
    """Set the default paths for the given mode.

    Mode "USER" uses paths below ``${HOME}/.mechanic`` and logs to
    "stderr"; any other mode uses paths below ``${MECHANIC_ROOT_DIR}`` and
    an empty log file setting. The ``${...}`` placeholders are stored
    verbatim, not expanded here.
    """
    self.mode = mode
    self.mechanicRootDir = getenv("MECHANIC_ROOT_DIR", "")
    if mode == "USER":
      home = "${HOME}/.mechanic"
      self.configFile = home + "/mechanic.conf"
      self.logFile = "stderr"
      self.migrationDirs = [home + "/migration.d"]
      self.preMigrationDirs = [home + "/pre-migration.d"]
      self.postMigrationDirs = [home + "/post-migration.d"]
      self.stateDir = home + "/state"
      self.runDir = home + "/tmp"
    else:
      root = "${MECHANIC_ROOT_DIR}"
      etc = root + "/etc/mechanic"
      lib = root + "/var/lib/mechanic"
      self.configFile = root + "/etc/mechanic.conf"
      self.logFile = ""
      self.migrationDirs = [etc + "/migration.d", lib + "/migration.d"]
      self.preMigrationDirs = [etc + "/pre-migration.d", lib + "/pre-migration.d"]
      self.postMigrationDirs = [etc + "/post-migration.d", lib + "/post-migration.d"]
      self.stateDir = lib + "/state"
      self.runDir = lib + "/tmp"
client.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def _get_well_known_file():
    """Get the well known file produced by command 'gcloud auth login'.

    The config directory comes from the environment variable named by
    ``_CLOUDSDK_CONFIG_ENV_VAR`` when it is set. Otherwise it is a
    per-platform default: below %APPDATA% on Windows (or the root of the
    system drive if APPDATA is missing), below ``~/.config`` elsewhere.
    """
    # TODO(orestica): Revisit this method once gcloud provides a better way
    # of pinpointing the exact location of the file.
    config_dir = os.getenv(_CLOUDSDK_CONFIG_ENV_VAR)
    if config_dir is not None:
        return os.path.join(config_dir, _WELL_KNOWN_CREDENTIALS_FILE)

    if os.name != 'nt':
        config_dir = os.path.join(os.path.expanduser('~'),
                                  '.config',
                                  _CLOUDSDK_CONFIG_DIRECTORY)
    elif 'APPDATA' in os.environ:
        config_dir = os.path.join(os.environ['APPDATA'],
                                  _CLOUDSDK_CONFIG_DIRECTORY)
    else:
        # This should never happen unless someone is really
        # messing with things.
        drive = os.environ.get('SystemDrive', 'C:')
        config_dir = os.path.join(drive, '\\',
                                  _CLOUDSDK_CONFIG_DIRECTORY)

    return os.path.join(config_dir, _WELL_KNOWN_CREDENTIALS_FILE)
devshell.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _SendRecv():
    """Communicate with the Developer Shell server socket.

    Sends ``CREDENTIAL_INFO_REQUEST_JSON`` as a length-prefixed message
    ("<length>\\n<payload>") to the port named by the ``DEVSHELL_ENV``
    environment variable on localhost and reads a reply in the same
    framing.

    Returns:
        A ``CredentialInfoResponse`` built from the JSON payload.

    Raises:
        NoDevshellServer: the environment variable is unset or 0.
        CommunicationError: the first 6 bytes of the reply contain no
            newline.
    """
    port = int(os.getenv(DEVSHELL_ENV, 0))
    if port == 0:
        raise NoDevshellServer()

    sock = socket.socket()
    # The socket is used for this single exchange only, so always close it,
    # including when connecting or parsing fails.
    try:
        sock.connect(('localhost', port))

        data = CREDENTIAL_INFO_REQUEST_JSON
        msg = '%s\n%s' % (len(data), data)
        sock.sendall(_to_bytes(msg, encoding='utf-8'))

        header = sock.recv(6).decode()
        if '\n' not in header:
            raise CommunicationError('saw no newline in the first 6 bytes')
        len_str, json_str = header.split('\n', 1)
        # Part of the payload may already have arrived with the header.
        to_read = int(len_str) - len(json_str)
        if to_read > 0:
            json_str += sock.recv(to_read, socket.MSG_WAITALL).decode()
    finally:
        sock.close()

    return CredentialInfoResponse(json_str)
configure.py 文件源码 项目:anonymine 作者: oskar-skog 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def find_EXECUTABLES(Makefile, flags):
    '''
    See the doc-string for find_prefix as well.

    Set Makefile['EXECUTABLES'] if needed to.
    Depends (directly) on $(gamesdir) and $(bindir).
    Depends (indirectly) on $(prefix).

    'EXECUTABLES' is set to the first of $(gamesdir) and $(bindir) whose
    expansion is one of the ':'-separated entries of $PATH.  Returns True
    only when it was not already set and neither directory matched.
    '''
    if 'EXECUTABLES' in Makefile:
        return False

    path_dirs = os.getenv('PATH').split(':')
    for variable in ('gamesdir', 'bindir'):
        if expand(variable, Makefile) in path_dirs:
            Makefile['EXECUTABLES'] = '$(' + variable + ')'
            return False

    return True
settings.py 文件源码 项目:metadataproxy 作者: lyft 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def bool_env(var_name, default=False):
    """
    Read an environment variable and coerce it to a boolean.
    Example:
        Bash:
            $ export SOME_VAL=True
        settings.py:
            SOME_VAL = bool_env('SOME_VAL', False)
    Arguments:
        var_name: The name of the environment variable.
        default: The value to use if `var_name` is not set in the
                 environment.
    Returns: the variable's value, or `default`, coerced as follows:
            "False", "false", "0" or "" => False
            Any other non-empty string => True
    """
    # These spellings must be checked explicitly because any non-empty
    # string is truthy.
    falsy_spellings = ('False', 'false', '0')
    value = getenv(var_name, default)
    return value not in falsy_spellings and bool(value)
test_multi_inserts.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_in_flight_is_one(self):
        """
        Verify that in_flight never exceeds one while doing multiple inserts.
        The number of inserts can be set through the INSERT_ITERATIONS
        environment variable. Default value is 1000000.
        """
        statement = self.session.prepare("INSERT INTO race (x) VALUES (?)")
        total = int(os.getenv("INSERT_ITERATIONS", 1000000))
        completed = 0
        leak_found = False
        while completed < total and not leak_found:
            self.session.execute(statement.bind((completed,)))
            # Stop at the first pool holding a connection with more than
            # one request in flight.
            for pool in self.session._pools.values():
                if any(conn.in_flight > 1 for conn in pool.get_connections()):
                    print(self.session.get_pool_state())
                    leak_found = True
                    break
            completed += 1

        self.assertFalse(leak_found, 'Detected leaking connection after %s iterations' % completed)
test_main.py 文件源码 项目:open-nti 作者: Juniper 项目源码 文件源码 阅读 89 收藏 0 点赞 0 评论 0
def teardown_module(module):
    """Stop and remove the test containers, unless running on Travis.

    Nothing is done when the TRAVIS environment variable is set to a
    non-empty value.
    """
    if os.getenv('TRAVIS'):
        return

    containers = (OPENNTI_C, OPENNTI_IN_JTI_C, OPENNTI_IN_LOG_C, TCP_REPLAY_C)
    for container in containers:
        c.stop(container=container)
        c.remove_container(container=container)
logmanagement.py 文件源码 项目:PowerMeter-Reader 作者: lucab85 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup_logging(self, default_path=PATH_LOGGING, default_level=logging.INFO, env_key='LOG_CFG'):
        """Configure logging from a JSON file, or fall back to basicConfig.

        The file path is the value of the ``env_key`` environment variable
        when it is set and non-empty, else ``default_path``. If that path
        does not exist, the same path with every "../" removed is tried;
        in that case the "handlers" section is passed to
        ``self._changePath`` before the configuration is applied. If
        neither exists, a message is printed and
        ``logging.basicConfig(level=default_level)`` is used.
        """
        self.logconf = None
        path = os.getenv(env_key, None) or default_path
        fallback_path = path.replace("../", "")

        if os.path.exists(path):
            with open(path, 'rt') as f:
                config = json.load(f)
        elif os.path.exists(fallback_path):
            with open(fallback_path, 'rt') as f:
                config = json.load(f)
                self._changePath(config["handlers"])
        else:
            print("Configurazione log non trovata (\"%s\"): applico le impostazioni predefinite" % path)
            self.logconf = logging.basicConfig(level=default_level)
            return

        self.logconf = logging.config.dictConfig(config)
nvidia.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def add_nvidia_docker_to_config(container_config):
    """Add the NVIDIA volume driver, volumes and devices to a container config.

    ``container_config['HostConfig']`` is created if missing or empty and
    then updated in place: 'VolumeDriver' is set unless already present,
    the NVIDIA volumes are appended to 'Binds', and the NVIDIA devices are
    appended to 'Devices'. When $NV_GPU lists card numbers, devices whose
    path ends in ``/nvidia<N>`` are only added if N is listed.
    """
    if not container_config.get('HostConfig', None):
        container_config['HostConfig'] = {}
    host_config = container_config['HostConfig']

    nvidia = get_nvidia_configuration()

    # Setup the Volumes
    host_config.setdefault('VolumeDriver', nvidia['VolumeDriver'])
    host_config.setdefault('Binds', []).extend(nvidia['Volumes'])

    # Get nvidia control devices
    devices = host_config.get('Devices', [])
    # support both '0 1' and '0, 1' formats, just like nvidia-docker
    requested = os.getenv('NV_GPU', '').replace(',', ' ').split()
    card_re = re.compile(r'/nvidia([0-9]+)$')
    for device in nvidia['Devices']:
        if requested:
            found = card_re.search(device)
            if found and found.group(1) not in requested:
                continue
        devices.extend(parse_devices([device]))

    host_config['Devices'] = devices
process_controller.py 文件源码 项目:SublimeTerm 作者: percevalw 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def keep_reading(self):
        """Output thread method for the process

        Sends the process output to the ViewController (through
        OutputTranscoder). Loops until ``self.stop`` is set; once the
        process has exited the flag is set so the current pass is the last.
        """
        while not self.stop:
            if self.process.poll() is not None:
                self.stop = True

            # Wait up to 5 seconds for data on the master descriptor.
            ready = select.select([self.master], [], [], 5)[0]
            if not ready:
                continue

            # We read the new content
            chunk = os.read(self.master, 1024)
            decoded = chunk.decode('UTF-8', errors='replace')
            log_debug("RAW", repr(decoded))
            log_debug("PID", os.getenv('BASHPID'))
            self.output_transcoder.decode(decoded)
nrpe.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    Every regular file matching ``check_*`` in the charm's
    hooks/charmhelpers/contrib/openstack/files directory (below
    $CHARM_DIR) is copied to the Nagios plugin directory, which is created
    if it does not exist.
    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    source_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)

    for check in glob.glob(os.path.join(source_dir, "check_*")):
        if not os.path.isfile(check):
            continue
        destination = os.path.join(NAGIOS_PLUGINS, os.path.basename(check))
        shutil.copy2(check, destination)
nrpe.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    Every regular file matching ``check_*`` in the charm's
    hooks/charmhelpers/contrib/openstack/files directory (below
    $CHARM_DIR) is copied to the Nagios plugin directory, which is created
    if it does not exist.
    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    source_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)

    for check in glob.glob(os.path.join(source_dir, "check_*")):
        if not os.path.isfile(check):
            continue
        destination = os.path.join(NAGIOS_PLUGINS, os.path.basename(check))
        shutil.copy2(check, destination)
nrpe.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    Every regular file matching ``check_*`` in the charm's
    hooks/charmhelpers/contrib/openstack/files directory (below
    $CHARM_DIR) is copied to the Nagios plugin directory, which is created
    if it does not exist.
    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    source_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)

    for check in glob.glob(os.path.join(source_dir, "check_*")):
        if not os.path.isfile(check):
            continue
        destination = os.path.join(NAGIOS_PLUGINS, os.path.basename(check))
        shutil.copy2(check, destination)
grid.py 文件源码 项目:bob.bio.base 作者: bioidiap 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def indices(list_to_split, number_of_parallel_jobs, task_id=None):
  """Return the (start, end) index range of the list for the current job.

  The 1-based job number is ``task_id`` when given, otherwise the value of
  the SGE_TASK_ID environment variable. If neither is available (e.g.,
  because a sub-job is executed locally) the range covers the whole list.

  Returns None when ``number_of_parallel_jobs`` is None or 1."""

  if number_of_parallel_jobs is None or number_of_parallel_jobs == 1:
    return None

  total = len(list_to_split)

  # an explicit task_id wins over the 'SGE_TASK_ID' environment variable
  raw_id = task_id if task_id is not None else os.getenv('SGE_TASK_ID')
  if raw_id is None:
    # not called from a grid job, hence we process the whole list
    return (0, total)

  job_index = int(raw_id) - 1
  # number of objects handled by each job, rounded up
  chunk = int(math.ceil(total / float(number_of_parallel_jobs)))
  start = job_index * chunk
  end = min(start + chunk, total)
  return (start, end)
nrpe.py 文件源码 项目:charm-nova-cloud-controller 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place

    Every regular file matching ``check_*`` in the charm's
    hooks/charmhelpers/contrib/openstack/files directory (below
    $CHARM_DIR) is copied to the Nagios plugin directory, which is created
    if it does not exist.
    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    source_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)

    for check in glob.glob(os.path.join(source_dir, "check_*")):
        if not os.path.isfile(check):
            continue
        destination = os.path.join(NAGIOS_PLUGINS, os.path.basename(check))
        shutil.copy2(check, destination)
lambda_function.py 文件源码 项目:PTE 作者: pwn2winctf 项目源码 文件源码 阅读 55 收藏 0 点赞 0 评论 0
def setup_environment():
    """Prepare PATH, git and ssh settings for running git over ssh.

    * Appends ``$LAMBDA_TASK_ROOT/bin`` to PATH and uses it as
      GIT_EXEC_PATH.
    * Creates a temporary directory holding the ssh identity (decoded from
      the base64 value of $SSH_IDENTITY, created with mode 0600) and an
      ssh config file that references it and
      ``$LAMBDA_TASK_ROOT/known_hosts``.
    * Points GIT_SSH_COMMAND at that config file.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    # b64decode() returns bytes, so the file must be opened in binary mode;
    # text mode raises TypeError on Python 3.
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600),
                   'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
run.py 文件源码 项目:pgbadger-rds-cron 作者: CanopyTax 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run():
    """Download the database logs, run pgbadger on them and sync with S3.

    Configuration comes from the environment: DB_NAME and S3_BUCKET must be
    set to non-empty values, S3_KEY defaults to 'pgbadger/' and REGION to
    'us-west-2'. Errors during processing are printed, not propagated, and
    the log states are saved in every case.
    """
    def required(name):
        # raiser() presumably raises the exception it is given -- TODO confirm
        return os.getenv(name) or raiser(ValueError('%s is required' % name))

    get_log_states()
    db_name = required('DB_NAME')
    bucket = required('S3_BUCKET')
    # Only needed by the upload_to_s3(bucket, key, region) step, which is
    # currently disabled.
    region = os.getenv('REGION', 'us-west-2')
    key = os.getenv('S3_KEY', 'pgbadger/')
    try:
        files = download_log_files(db_name)
        sync_s3(bucket, key)
        run_pgbadger(files)
        sync_s3(bucket, key, upload=True)
    except Exception:
        traceback.print_exc()
    finally:
        save_log_states()
galaxia.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def list(self):
        """Query the catalogue endpoint and print the result for a unit type.

        Parses --unit-type (required), --search-type and --search-string
        from the command line, sends them in a GET request to the URL
        built from the ``galaxia_api_endpoint`` environment variable and
        ``self.catalogue_uri``, and prints the JSON reply with
        ``format_print.format_dict``. Nothing is printed for unit types
        other than container, dashboard, exporter and node.
        """
        self.parser.add_argument('--unit-type', help='Type of unit valid value\
                                                     is docker', required=True)
        self.parser.add_argument('--search-type', help='search type', required=False)
        self.parser.add_argument('--search-string', help='search string', required=False)
        options = vars(self.parser.parse_args())
        data = {
            'unit_type': options['unit_type'],
            'search_type': options['search_type'],
            'search_string': options['search_string'],
        }
        galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
        target_url = client.concatenate_url(galaxia_api_endpoint,
                                            self.catalogue_uri)
        resp = client.http_request('GET', target_url, self.headers, data)

        # unit type -> second argument for format_dict
        headers_by_unit = {
            'container': "keys",
            'dashboard': "keys",
            'exporter': ["EXPORTER_NAME", "EXPORTER_ID"],
            'node': ["Instance_Name", "Host_Name"],
        }
        unit_type = data['unit_type']
        if unit_type in headers_by_unit:
            format_print.format_dict(resp.json(), headers_by_unit[unit_type])
galaxia.py 文件源码 项目:galaxia 作者: WiproOpenSourcePractice 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create(self):
        """Create an exporter through the API and print the response text.

        All of --source-system, --target-system, --metrics-list,
        --time-interval, --unit-type and --exporter-name are required. The
        parsed arguments are turned into request data with
        ``client.create_request_data`` and POSTed to the URL built from the
        ``galaxia_api_endpoint`` environment variable and
        ``self.exporter_uri``.
        """
        self.parser.add_argument('--source-system', help='Source system',
                                 required=True)
        self.parser.add_argument('--target-system', help='Target system',
                                 required=True)
        self.parser.add_argument('--metrics-list', help='List of metrics to\
                                                        export', required=True)
        self.parser.add_argument('--time-interval', help='Time interval\
                                        in which to push metrics to target\
                                                         system', required=True)
        self.parser.add_argument('--unit-type', help='Type of unit valid value\
                                                     is docker', required=True)
        self.parser.add_argument('--exporter-name', help='Unique Name for\
                                                    exporter', required=True)

        args = self.parser.parse_args()
        json_data = client.create_request_data(**vars(args))
        galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
        target_url = client.concatenate_url(galaxia_api_endpoint,
                                            self.exporter_uri)
        resp = client.http_request('POST', target_url, self.headers,
                                   json_data)
        # Call form of print: same output on Python 2 for a single argument,
        # and valid syntax on Python 3, unlike the print statement.
        print(resp.text)


问题


面经


文章

微信
公众号

扫码关注公众号