def session_id(self):
"""A unique session ID every time the user uses the workflow.
.. versionadded:: 1.25
The session ID persists while the user is using this workflow.
It expires when the user runs a different workflow or closes
Alfred.
"""
if not self._session_id:
sid = os.getenv('_WF_SESSION_ID')
if not sid:
from uuid import uuid4
sid = uuid4().hex
self.setvar('_WF_SESSION_ID', sid)
self._session_id = sid
return self._session_id
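# A minimal standalone sketch of the pattern above, using only the standard
# library: the ID is generated once, cached on the instance, and mirrored into
# an environment variable so child processes inherit it. `DemoSession` and the
# direct os.environ write (standing in for self.setvar) are illustrative only.
import os
from uuid import uuid4

class DemoSession(object):
    def __init__(self):
        self._session_id = None

    @property
    def session_id(self):
        if not self._session_id:
            sid = os.getenv('_WF_SESSION_ID') or uuid4().hex
            os.environ['_WF_SESSION_ID'] = sid  # stand-in for self.setvar(...)
            self._session_id = sid
        return self._session_id

# Two reads return the same ID within one process/session:
s = DemoSession()
assert s.session_id == s.session_id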
def site_config_dirs(appname):
"""Return a list of potential user-shared config dirs for this application.
"appname" is the name of application.
Typical user config directories are:
macOS: /Library/Application Support/<AppName>/
Unix: /etc or $XDG_CONFIG_DIRS[i]/<AppName>/ for each value in
$XDG_CONFIG_DIRS
    Win XP: C:\Documents and Settings\All Users\Application Data\<AppName>\
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory
on Vista.)
Win 7: Hidden, but writeable on Win 7:
C:\ProgramData\<AppName>\
"""
if WINDOWS:
path = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
pathlist = [os.path.join(path, appname)]
elif sys.platform == 'darwin':
pathlist = [os.path.join('/Library/Application Support', appname)]
else:
# try looking in $XDG_CONFIG_DIRS
xdg_config_dirs = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
if xdg_config_dirs:
pathlist = [
os.path.join(expanduser(x), appname)
for x in xdg_config_dirs.split(os.pathsep)
]
else:
pathlist = []
# always look in /etc directly as well
pathlist.append('/etc')
return pathlist
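# Hedged usage sketch: on a Linux box with a default environment, the XDG
# branch above yields one path per entry in $XDG_CONFIG_DIRS plus '/etc'.
# This standalone demo reproduces only that branch; 'MyApp' is illustrative.
import os
from os.path import expanduser

def demo_unix_config_dirs(appname):
    dirs = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
    paths = [os.path.join(expanduser(d), appname) for d in dirs.split(os.pathsep)]
    paths.append('/etc')
    return paths

print(demo_unix_config_dirs('MyApp'))  # e.g. ['/etc/xdg/MyApp', '/etc']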
def _candidate_tempdir_list():
"""Generate a list of candidate temporary directories which
_get_default_tempdir will try."""
dirlist = []
# First, try the environment.
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = _os.getenv(envname)
if dirname: dirlist.append(dirname)
# Failing that, try OS-specific locations.
if _os.name == 'nt':
dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
else:
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
# As a last resort, the current directory.
try:
dirlist.append(_os.getcwd())
except (AttributeError, OSError):
dirlist.append(_os.curdir)
return dirlist
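# Usage sketch: a default-tempdir chooser would settle on the first existing,
# writable candidate from the list above. This probe is only an illustration,
# not the stdlib's actual _get_default_tempdir logic.
import os

def first_usable_tempdir(candidates):
    for d in candidates:
        if os.path.isdir(d) and os.access(d, os.W_OK):
            return d
    return None

env_candidates = [os.getenv(v) for v in ('TMPDIR', 'TEMP', 'TMP') if os.getenv(v)]
print(first_usable_tempdir(env_candidates + ['/tmp', '/var/tmp', '/usr/tmp']))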
def __load_layout(self, config):
var = config.get_value('engine/replace-with-kanji-python', 'layout')
if var is None or var.get_type_string() != 's':
path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
path = os.path.join(path, 'roomazi.json')
if var:
config.unset('engine/replace-with-kanji-python', 'layout')
else:
path = var.get_string()
logger.info("layout: %s", path)
layout = roomazi.layout # Use 'roomazi' as default
try:
with open(path) as f:
layout = json.load(f)
except ValueError as error:
logger.error("JSON error: %s", error)
except OSError as error:
logger.error("Error: %s", error)
        except Exception:
            logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
self.__to_kana = self.__handle_roomazi_layout
if 'Type' in layout:
if layout['Type'] == 'Kana':
self.__to_kana = self.__handle_kana_layout
return layout
def __init__(self):
if (os.getenv("LD_LIBRARY_PATH")):
self.ld_lib_path = os.getenv("LD_LIBRARY_PATH")
else:
self.ld_lib_path = ''
if (os.getenv("PYTHONPATH")):
self.pythonpath = os.getenv("PYTHONPATH")
else:
self.pythonpath = ''
if (os.getenv("CLASSPATH")):
self.classpath = os.getenv("CLASSPATH")
else:
self.classpath = ''
if (os.getenv("OCTAVE_PATH")):
self.octave_path = os.getenv("OCTAVE_PATH")
else:
self.octave_path = ''
def __enter__(self):
if (os.getenv("LD_LIBRARY_PATH")):
self.ld_lib_path = os.getenv("LD_LIBRARY_PATH")
else:
self.ld_lib_path = ''
if (os.getenv("PYTHONPATH")):
self.pythonpath = os.getenv("PYTHONPATH")
else:
self.pythonpath = ''
if (os.getenv("CLASSPATH")):
self.classpath = os.getenv("CLASSPATH")
else:
self.classpath = ''
if (os.getenv("OCTAVE_PATH")):
self.octave_path = os.getenv("OCTAVE_PATH")
else:
self.octave_path = ''
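# The __init__ and __enter__ bodies above are identical; a hedged refactor
# could hoist the snapshot into one helper, since os.getenv's default argument
# collapses each if/else pair. Names here are illustrative, not from the
# original project.
import os

_ENV_VARS = ("LD_LIBRARY_PATH", "PYTHONPATH", "CLASSPATH", "OCTAVE_PATH")

def snapshot_env(names=_ENV_VARS):
    """Return {name: value}, using '' for unset variables."""
    return dict((name, os.getenv(name, '')) for name in names)

print(snapshot_env())  # e.g. {'LD_LIBRARY_PATH': '', 'PYTHONPATH': '', ...}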
def _prependToEnvVar(self, newVal, envVar):
path = self._getEnvVarAsList(envVar)
foundValue = False
for entry in path:
# Search to determine if the new value is already in the path
try:
if os.path.samefile(entry, newVal):
# The value is already in the path
foundValue = True
break
except OSError:
# If we can't find concrete files to compare, fall back to string compare
if entry == newVal:
# The value is already in the path
foundValue = True
break
if not foundValue:
# The value does not already exist
            if envVar in os.environ:
                newpath = newVal + os.path.pathsep + os.getenv(envVar) + os.path.pathsep
else:
newpath = newVal+os.path.pathsep
os.putenv(envVar, newpath)
os.environ[envVar] = newpath
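# Standalone sketch of the prepend-if-absent idea above, minus the class
# plumbing; os.path.samefile lets two spellings of the same directory (say,
# a path and a symlink to it) compare equal. 'DEMO_PATH' is illustrative.
import os

def prepend_if_absent(new_val, env_var):
    current = os.environ.get(env_var, '')
    entries = current.split(os.pathsep) if current else []
    for entry in entries:
        try:
            if os.path.samefile(entry, new_val):   # handles symlinked spellings
                return
        except OSError:
            if entry == new_val:                   # fall back to string compare
                return
    os.environ[env_var] = os.pathsep.join([new_val] + entries)

prepend_if_absent('/opt/demo/bin', 'DEMO_PATH')
print(os.environ['DEMO_PATH'])                     # /opt/demo/bin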
def create_kubeconfig_var_message(path):
msg = """Set your KUBECONFIG environment variable to use kubectl"""
shell = os.getenv("SHELL", "").lower()
if "/bash" in shell or "/zsh" in shell:
msg += """
export KUBECONFIG={0}
"""
elif "/fish" in shell:
msg += """
set -g -x KUBECONFIG {0}
"""
else:
msg += ". Unable to detect shell therefore assuming a Bash-compatible shell"
msg += """
export KUBECONFIG={0}
"""
return msg.format(path).lstrip()
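# Usage sketch: with SHELL=/bin/bash, the function above picks the bash/zsh
# variant and substitutes the path. The output below is approximate, since
# the indentation inside the triple-quoted strings carries through.
import os
os.environ['SHELL'] = '/bin/bash'
print(create_kubeconfig_var_message('/home/user/.kube/config'))
# Set your KUBECONFIG environment variable to use kubectl
#     export KUBECONFIG=/home/user/.kube/config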
def resolve_nested_variables(values):
def _replacement(name):
"""
get appropiate value for a variable name.
first search in environ, if not found,
then look into the dotenv variables
"""
ret = os.getenv(name, values.get(name, ""))
return ret
def _re_sub_callback(match_object):
"""
From a match object gets the variable name and returns
the correct replacement
"""
return _replacement(match_object.group()[2:-1])
for k, v in values.items():
values[k] = __posix_variable.sub(_re_sub_callback, v)
return values
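# The module-level __posix_variable pattern is not shown above; a plausible
# definition matches ${VAR} references. The demo below also exercises the
# resolution order: the real environment wins over the dotenv values.
import os
import re

__posix_variable = re.compile(r'\$\{[^}]*\}')

values = {'HOST': 'db.local', 'URL': 'postgres://${HOST}/app'}
os.environ.pop('HOST', None)                      # unset, so the dotenv value is used
print(resolve_nested_variables(values)['URL'])    # postgres://db.local/app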
def goglib_get_games_list():
    proc = subprocess.Popen(['lgogdownloader', '--exclude',
                             '1,2,4,8,16,32', '--list-details'],
                            stdout=subprocess.PIPE, universal_newlines=True)
    games_detailed_list = proc.stdout.readlines()
    stdoutdata, stderrdata = proc.communicate()
    if proc.returncode == 0:
        file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'
        with open(file_path, 'w') as games_list_file:
            for line in games_detailed_list:
                if 'Getting game info' not in line:
                    games_list_file.write(line)
        return 0
else:
return 1
def __init__(self):
"""Setup the Runner for all Foremast modules."""
debug_flag()
self.email = os.getenv("EMAIL")
self.env = os.getenv("ENV")
self.group = os.getenv("PROJECT")
self.region = os.getenv("REGION")
self.repo = os.getenv("GIT_REPO")
self.runway_dir = os.getenv("RUNWAY_DIR")
self.artifact_path = os.getenv("ARTIFACT_PATH")
self.artifact_version = os.getenv("ARTIFACT_VERSION")
self.promote_stage = os.getenv("PROMOTE_STAGE", "latest")
self.git_project = "{}/{}".format(self.group, self.repo)
parsed = gogoutils.Parser(self.git_project)
generated = gogoutils.Generator(*parsed.parse_url(), formats=consts.APP_FORMATS)
self.app = generated.app_name()
self.trigger_job = generated.jenkins()['name']
self.git_short = generated.gitlab()['main']
self.raw_path = "./raw.properties"
self.json_path = self.raw_path + ".json"
self.configs = None
def __init__(self, mode):
self.mode = mode
self.mechanicRootDir = getenv("MECHANIC_ROOT_DIR", "")
if mode != "USER":
self.configFile = "${MECHANIC_ROOT_DIR}/etc/mechanic.conf"
self.logFile = ""
self.migrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/migration.d", "${MECHANIC_ROOT_DIR}/var/lib/mechanic/migration.d"]
self.preMigrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/pre-migration.d", "${MECHANIC_ROOT_DIR}/var/lib/mechanic/pre-migration.d"]
self.postMigrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/post-migration.d", "${MECHANIC_ROOT_DIR}/var/lib/mechanic/post-migration.d"]
self.stateDir = "${MECHANIC_ROOT_DIR}/var/lib/mechanic/state"
self.runDir = "${MECHANIC_ROOT_DIR}/var/lib/mechanic/tmp"
else:
self.configFile = "${HOME}/.mechanic/mechanic.conf"
self.logFile = "stderr"
self.migrationDirs = ["${HOME}/.mechanic/migration.d"]
self.preMigrationDirs = ["${HOME}/.mechanic/pre-migration.d"]
self.postMigrationDirs = ["${HOME}/.mechanic/post-migration.d"]
self.stateDir = "${HOME}/.mechanic/state"
self.runDir = "${HOME}/.mechanic/tmp"
def _get_well_known_file():
"""Get the well known file produced by command 'gcloud auth login'."""
# TODO(orestica): Revisit this method once gcloud provides a better way
# of pinpointing the exact location of the file.
default_config_dir = os.getenv(_CLOUDSDK_CONFIG_ENV_VAR)
if default_config_dir is None:
if os.name == 'nt':
try:
default_config_dir = os.path.join(os.environ['APPDATA'],
_CLOUDSDK_CONFIG_DIRECTORY)
except KeyError:
# This should never happen unless someone is really
# messing with things.
drive = os.environ.get('SystemDrive', 'C:')
default_config_dir = os.path.join(drive, '\\',
_CLOUDSDK_CONFIG_DIRECTORY)
else:
default_config_dir = os.path.join(os.path.expanduser('~'),
'.config',
_CLOUDSDK_CONFIG_DIRECTORY)
return os.path.join(default_config_dir, _WELL_KNOWN_CREDENTIALS_FILE)
def _SendRecv():
"""Communicate with the Developer Shell server socket."""
port = int(os.getenv(DEVSHELL_ENV, 0))
if port == 0:
raise NoDevshellServer()
sock = socket.socket()
sock.connect(('localhost', port))
data = CREDENTIAL_INFO_REQUEST_JSON
msg = '%s\n%s' % (len(data), data)
sock.sendall(_to_bytes(msg, encoding='utf-8'))
header = sock.recv(6).decode()
if '\n' not in header:
raise CommunicationError('saw no newline in the first 6 bytes')
len_str, json_str = header.split('\n', 1)
to_read = int(len_str) - len(json_str)
if to_read > 0:
json_str += sock.recv(to_read, socket.MSG_WAITALL).decode()
return CredentialInfoResponse(json_str)
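# Hedged sketch of the wire format used above: each message is the payload
# length in ASCII, a newline, then the payload. Parsing the first six bytes
# of a hypothetical response:
header = '59\n["u'                       # what sock.recv(6) might return
len_str, json_str = header.split('\n', 1)
to_read = int(len_str) - len(json_str)   # 59 total - 3 already read
print(to_read)                           # 56 more bytes of JSON to fetch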
def find_EXECUTABLES(Makefile, flags):
'''
See the doc-string for find_prefix as well.
Set Makefile['EXECUTABLES'] if needed to.
Depends (directly) on $(gamesdir) and $(bindir).
Depends (indirectly) on $(prefix).
'''
if 'EXECUTABLES' not in Makefile:
        acceptable = os.getenv('PATH', '').split(':')
for exec_dir in ('gamesdir', 'bindir'):
if expand(exec_dir, Makefile) in acceptable:
Makefile['EXECUTABLES'] = '$('+exec_dir+')'
return False
else:
return True
else:
return False
def bool_env(var_name, default=False):
"""
Get an environment variable coerced to a boolean value.
Example:
Bash:
$ export SOME_VAL=True
settings.py:
SOME_VAL = bool_env('SOME_VAL', False)
Arguments:
var_name: The name of the environment variable.
default: The default to use if `var_name` is not specified in the
environment.
Returns: `var_name` or `default` coerced to a boolean using the following
rules:
"False", "false" or "" => False
Any other non-empty string => True
"""
test_val = getenv(var_name, default)
    # Explicitly check for 'False', 'false', and '0', since all non-empty
    # strings are normally coerced to True.
if test_val in ('False', 'false', '0'):
return False
return bool(test_val)
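# Usage sketch for bool_env; it calls a bare `getenv`, so the snippet above
# presumably does `from os import getenv` at module level. 'SOME_VAL' and
# 'MISSING_VAL' are illustrative names, with 'MISSING_VAL' assumed unset.
import os
from os import getenv

os.environ['SOME_VAL'] = 'false'
print(bool_env('SOME_VAL'))            # False
os.environ['SOME_VAL'] = 'yes'
print(bool_env('SOME_VAL'))            # True
print(bool_env('MISSING_VAL', True))   # True (default is passed through bool())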
# -- test_multi_inserts.py (project: deb-python-cassandra-driver, author: openstack) --
def test_in_flight_is_one(self):
"""
Verify that in_flight value stays equal to one while doing multiple inserts.
        The number of inserts can be set through the INSERT_ITERATIONS environment variable.
        Default value is 1000000.
"""
prepared = self.session.prepare("INSERT INTO race (x) VALUES (?)")
iterations = int(os.getenv("INSERT_ITERATIONS", 1000000))
i = 0
leaking_connections = False
while i < iterations and not leaking_connections:
bound = prepared.bind((i,))
self.session.execute(bound)
for pool in self.session._pools.values():
if leaking_connections:
break
for conn in pool.get_connections():
if conn.in_flight > 1:
print(self.session.get_pool_state())
leaking_connections = True
break
i = i + 1
self.assertFalse(leaking_connections, 'Detected leaking connection after %s iterations' % i)
def teardown_module(module):
global c
global OPENNTI_C
global OPENNTI_IN_JTI_C
global OPENNTI_IN_LOG_C
    # Stop and remove the containers (skipped on Travis CI)
if not os.getenv('TRAVIS'):
c.stop(container=OPENNTI_C)
c.remove_container(container=OPENNTI_C)
c.stop(container=OPENNTI_IN_JTI_C)
c.remove_container(container=OPENNTI_IN_JTI_C)
c.stop(container=OPENNTI_IN_LOG_C)
c.remove_container(container=OPENNTI_IN_LOG_C)
c.stop(container=TCP_REPLAY_C)
c.remove_container(container=TCP_REPLAY_C)
def setup_logging(self, default_path=PATH_LOGGING, default_level=logging.INFO, env_key='LOG_CFG'):
path = default_path
self.logconf = None
value = os.getenv(env_key, None)
if value:
path = value
if os.path.exists(path):
with open(path, 'rt') as f:
config = json.load(f)
self.logconf = logging.config.dictConfig(config)
elif os.path.exists(path.replace("../", "")):
with open(path.replace("../", ""), 'rt') as f:
config = json.load(f)
self._changePath(config["handlers"])
self.logconf = logging.config.dictConfig(config)
else:
print("Configurazione log non trovata (\"%s\"): applico le impostazioni predefinite" % path)
self.logconf = logging.basicConfig(level=default_level)
def add_nvidia_docker_to_config(container_config):
if not container_config.get('HostConfig', None):
container_config['HostConfig'] = {}
nvidia_config = get_nvidia_configuration()
# Setup the Volumes
container_config['HostConfig'].setdefault('VolumeDriver', nvidia_config['VolumeDriver'])
container_config['HostConfig'].setdefault('Binds', [])
container_config['HostConfig']['Binds'].extend(nvidia_config['Volumes'])
# Get nvidia control devices
devices = container_config['HostConfig'].get('Devices', [])
    # support both '0 1' and '0, 1' formats, just like nvidia-docker
gpu_isolation = os.getenv('NV_GPU', '').replace(',', ' ').split()
pattern = re.compile(r'/nvidia([0-9]+)$')
for device in nvidia_config['Devices']:
if gpu_isolation:
card_number = pattern.search(device)
if card_number and card_number.group(1) not in gpu_isolation:
continue
devices.extend(parse_devices([device]))
container_config['HostConfig']['Devices'] = devices
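# The NV_GPU handling above accepts both space- and comma-separated lists,
# and the regex extracts the card number from a device path; quick checks:
import re
print('0, 1'.replace(',', ' ').split())            # ['0', '1']
pattern = re.compile(r'/nvidia([0-9]+)$')
print(pattern.search('/dev/nvidia1').group(1))     # '1'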
def keep_reading(self):
"""Output thread method for the process
Sends the process output to the ViewController (through OutputTranscoder)
"""
while True:
if self.stop:
break
ret = self.process.poll()
if ret is not None:
self.stop = True
readable, writable, executable = select.select([self.master], [], [], 5)
if readable:
""" We read the new content """
data = os.read(self.master, 1024)
text = data.decode('UTF-8', errors='replace')
log_debug("RAW", repr(text))
log_debug("PID", os.getenv('BASHPID'))
self.output_transcoder.decode(text)
# log_debug("{} >> {}".format(int(time.time()), repr(text)))
def copy_nrpe_checks():
"""
Copy the nrpe checks into place
"""
NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
nrpe_files_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
'charmhelpers', 'contrib', 'openstack',
'files')
if not os.path.exists(NAGIOS_PLUGINS):
os.makedirs(NAGIOS_PLUGINS)
for fname in glob.glob(os.path.join(nrpe_files_dir, "check_*")):
if os.path.isfile(fname):
shutil.copy2(fname,
os.path.join(NAGIOS_PLUGINS, os.path.basename(fname)))
def indices(list_to_split, number_of_parallel_jobs, task_id=None):
"""This function returns the first and last index for the files for the current job ID.
If no job id is set (e.g., because a sub-job is executed locally), it simply returns all indices."""
if number_of_parallel_jobs is None or number_of_parallel_jobs == 1:
return None
    # test if the 'SGE_TASK_ID' environment variable is set
sge_task_id = os.getenv('SGE_TASK_ID') if task_id is None else task_id
if sge_task_id is None:
# task id is not set, so this function is not called from a grid job
# hence, we process the whole list
return (0,len(list_to_split))
else:
job_id = int(sge_task_id) - 1
# compute number of files to be executed
number_of_objects_per_job = int(math.ceil(float(len(list_to_split) / float(number_of_parallel_jobs))))
start = job_id * number_of_objects_per_job
end = min((job_id + 1) * number_of_objects_per_job, len(list_to_split))
return (start, end)
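# Worked example: splitting 10 items across 3 jobs gives ceil(10/3) = 4 per
# job, so SGE task ids 1..3 map to the slices (0, 4), (4, 8), (8, 10).
items = list(range(10))
for task_id in (1, 2, 3):
    print(task_id, indices(items, 3, task_id))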
def setup_environment():
root = os.getenv('LAMBDA_TASK_ROOT')
bin_dir = os.path.join(root, 'bin')
os.environ['PATH'] += ':' + bin_dir
os.environ['GIT_EXEC_PATH'] = bin_dir
ssh_dir = tempfile.mkdtemp()
ssh_identity = os.path.join(ssh_dir, 'identity')
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600),
                   'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))
ssh_config = os.path.join(ssh_dir, 'config')
with open(ssh_config, 'w') as f:
f.write('CheckHostIP no\n'
'StrictHostKeyChecking yes\n'
'IdentityFile %s\n'
'UserKnownHostsFile %s\n' %
(ssh_identity, os.path.join(root, 'known_hosts')))
os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def run():
get_log_states()
db_name = os.getenv('DB_NAME') or \
raiser(ValueError('DB_NAME is required'))
bucket = os.getenv('S3_BUCKET') or \
raiser(ValueError('S3_BUCKET is required'))
region = os.getenv('REGION', 'us-west-2')
key = os.getenv('S3_KEY', 'pgbadger/')
try:
files = download_log_files(db_name)
sync_s3(bucket, key)
run_pgbadger(files)
sync_s3(bucket, key, upload=True)
# upload_to_s3(bucket, key, region)
except Exception as e:
traceback.print_exc()
finally:
save_log_states()
def list(self):
        self.parser.add_argument('--unit-type', required=True,
                                 help='Type of unit, valid value is docker')
self.parser.add_argument('--search-type', help='search type', required=False)
self.parser.add_argument('--search-string', help='search string', required=False)
args = self.parser.parse_args()
unit_type = vars(args)['unit_type']
search_type = vars(args)['search_type']
search_string = vars(args)['search_string']
data = {'unit_type': unit_type, 'search_type': search_type, 'search_string': search_string}
galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
target_url = client.concatenate_url(galaxia_api_endpoint,
self.catalogue_uri)
resp = client.http_request('GET', target_url, self.headers, data)
if unit_type == 'container':
format_print.format_dict(resp.json(), "keys")
if unit_type == 'dashboard':
format_print.format_dict(resp.json(), "keys")
if unit_type == 'exporter':
header = ["EXPORTER_NAME", "EXPORTER_ID"]
format_print.format_dict(resp.json(), header)
if unit_type == 'node':
header = ["Instance_Name", "Host_Name"]
format_print.format_dict(resp.json(), header)
def create(self):
        self.parser.add_argument('--source-system', help='Source system',
                                 required=True)
        self.parser.add_argument('--target-system', help='Target system',
                                 required=True)
        self.parser.add_argument('--metrics-list', required=True,
                                 help='List of metrics to export')
        self.parser.add_argument('--time-interval', required=True,
                                 help='Time interval in which to push metrics to target system')
        self.parser.add_argument('--unit-type', required=True,
                                 help='Type of unit, valid value is docker')
        self.parser.add_argument('--exporter-name', required=True,
                                 help='Unique name for exporter')
args = self.parser.parse_args()
json_data = client.create_request_data(**vars(args))
galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
target_url = client.concatenate_url(galaxia_api_endpoint,
self.exporter_uri)
resp = client.http_request('POST', target_url, self.headers,
json_data)
        print(resp.text)