def test_zipfile_timestamp():
    # An environment variable (SOURCE_DATE_EPOCH) can be used to influence the
    # timestamp on the ZipInfo entries inside the wheel archive; see issue #143.
    # The temporary_directory() helper is used because tempfile.TemporaryDirectory
    # is not available as a context manager on every supported Python version.
with temporary_directory() as tempdir:
for filename in ('one', 'two', 'three'):
path = os.path.join(tempdir, filename)
with codecs.open(path, 'w', encoding='utf-8') as fp:
fp.write(filename + '\n')
zip_base_name = os.path.join(tempdir, 'dummy')
        # 1980-01-01 is the earliest date a ZipInfo entry can represent
with environ('SOURCE_DATE_EPOCH', '315576060'):
zip_filename = wheel.archive.make_wheelfile_inner(
zip_base_name, tempdir)
with readable_zipfile(zip_filename) as zf:
for info in zf.infolist():
assert info.date_time[:3] == (1980, 1, 1)
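The test relies on a small environ() context manager that temporarily sets an environment variable and restores it afterwards; a minimal sketch of such a helper (the actual helper in the wheel test suite may differ):

import os
from contextlib import contextmanager

@contextmanager
def environ(name, value):
    # Temporarily set os.environ[name] to value, restoring the old value
    # (or removing the variable) when the block exits.
    saved = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        if saved is None:
            del os.environ[name]
        else:
            os.environ[name] = saved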
def _warn_unsafe_extraction_path(path):
"""
If the default extraction path is overridden and set to an insecure
location, such as /tmp, it opens up an opportunity for an attacker to
replace an extracted file with an unauthorized payload. Warn the user
if a known insecure location is used.
See Distribute #375 for more details.
"""
if os.name == 'nt' and not path.startswith(os.environ['windir']):
# On Windows, permissions are generally restrictive by default
# and temp directories are not writable by other users, so
# bypass the warning.
return
mode = os.stat(path).st_mode
if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
msg = ("%s is writable by group/others and vulnerable to attack "
"when "
"used with get_resource_filename. Consider a more secure "
"location (set with .set_extraction_path or the "
"PYTHON_EGG_CACHE environment variable)." % path)
warnings.warn(msg, UserWarning)
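To see the warning fire, the check above can be pointed at a directory that is deliberately made group/world-writable (a quick sketch, assuming _warn_unsafe_extraction_path is defined as above; POSIX only):

import os
import stat
import tempfile
import warnings

probe_dir = tempfile.mkdtemp()
os.chmod(probe_dir, os.stat(probe_dir).st_mode | stat.S_IWGRP | stat.S_IWOTH)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    _warn_unsafe_extraction_path(probe_dir)
assert any(issubclass(w.category, UserWarning) for w in caught)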
def _pythonpath():
items = os.environ.get('PYTHONPATH', '').split(os.pathsep)
return filter(None, items)
def initialize (self, sadFile):
self.app_cnt = 0
if self.__timeout is not None:
self.domMgr.configure([CF.DataType('COMPONENT_BINDING_TIMEOUT', to_any(self.__timeout))])
try:
self.domMgr.installApplication(sadFile)
except CF.DomainManager.ApplicationAlreadyInstalled:
pass
domRoot = os.path.join(os.environ["SDRROOT"], "dom")
sad = ossie.parsers.sad.parse(domRoot + sadFile)
app_id = sad.get_id()
for appFact in self.domMgr._get_applicationFactories():
if appFact._get_identifier() == app_id:
self.appFact = appFact
return
    raise KeyError("Couldn't find app factory")
Source: test_run_no_updates_available.py, from project pyupdater-wx-demo (author: wettenhj)
def setUp(self):
tempFile = tempfile.NamedTemporaryFile()
self.fileServerDir = tempFile.name
tempFile.close()
os.mkdir(self.fileServerDir)
os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
encoding='base64')
signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
encoding='base64').decode()
VERSIONS['signature'] = signature
keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
with gzip.open(keysFilePath, 'wb') as keysFile:
keysFile.write(json.dumps(KEYS, sort_keys=True))
versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
with gzip.open(versionsFilePath, 'wb') as versionsFile:
versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
os.environ['WXUPDATEDEMO_TESTING'] = 'True'
from wxupdatedemo.config import CLIENT_CONFIG
self.clientConfig = CLIENT_CONFIG
self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self):
tempFile = tempfile.NamedTemporaryFile()
self.fileServerDir = tempFile.name
tempFile.close()
os.mkdir(self.fileServerDir)
os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
encoding='base64')
signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
encoding='base64').decode()
VERSIONS['signature'] = signature
keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
with gzip.open(keysFilePath, 'wb') as keysFile:
keysFile.write(json.dumps(KEYS, sort_keys=True))
versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
with gzip.open(versionsFilePath, 'wb') as versionsFile:
versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
os.environ['WXUPDATEDEMO_TESTING'] = 'True'
from wxupdatedemo.config import CLIENT_CONFIG
self.clientConfig = CLIENT_CONFIG
self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
self.clientConfig.APP_NAME = APP_NAME
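For completeness, the verification side of the signing step above looks roughly like this with the same python-ed25519 API (illustrative only; PyUpdater performs its own verification internally):

import json
import ed25519
import six

verifying_key = ed25519.VerifyingKey(PUBLIC_KEY.encode('utf-8'), encoding='base64')
unsigned = dict(VERSIONS)
signature = unsigned.pop('signature')
payload = six.b(json.dumps(unsigned, sort_keys=True))
# Raises ed25519.BadSignatureError if the payload or signature was tampered with.
verifying_key.verify(signature.encode('utf-8'), payload, encoding='base64')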
@contextlib.contextmanager  # needs: import contextlib
def paths_on_pythonpath(paths):
"""
Add the indicated paths to the head of the PYTHONPATH environment
variable so that subprocesses will also see the packages at
these paths.
Do this in a context that restores the value on exit.
"""
nothing = object()
orig_pythonpath = os.environ.get('PYTHONPATH', nothing)
current_pythonpath = os.environ.get('PYTHONPATH', '')
try:
prefix = os.pathsep.join(paths)
to_join = filter(None, [prefix, current_pythonpath])
new_path = os.pathsep.join(to_join)
if new_path:
os.environ['PYTHONPATH'] = new_path
yield
finally:
if orig_pythonpath is nothing:
os.environ.pop('PYTHONPATH', None)
else:
os.environ['PYTHONPATH'] = orig_pythonpath
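Typical use wraps a subprocess launch so that the child interpreter can import packages from the added paths (illustrative; the paths are placeholders):

import subprocess
import sys

with paths_on_pythonpath(['build/lib', 'vendor']):
    # The child process inherits the temporarily extended PYTHONPATH.
    subprocess.check_call([sys.executable, '-c', 'import sys; print(sys.path[:3])'])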
def to_config(config_cls, environ=os.environ):
if config_cls._prefix:
app_prefix = (config_cls._prefix,)
else:
app_prefix = ()
def default_get(environ, metadata, prefix, name):
ce = metadata[CNF_KEY]
if ce.name is not None:
var = ce.name
else:
var = ("_".join(app_prefix + prefix + (name,))).upper()
log.debug("looking for env var '%s'." % (var,))
val = environ.get(var, ce.default)
if val is RAISE:
raise MissingEnvValueError(var)
return val
return _to_config(config_cls, default_get, environ, ())
def _to_config(config_cls, default_get, environ, prefix):
vals = {}
for a in attr.fields(config_cls):
try:
ce = a.metadata[CNF_KEY]
except KeyError:
continue
if ce.sub_cls is None:
get = ce.callback or default_get
val = get(environ, a.metadata, prefix, a.name)
else:
val = _to_config(
ce.sub_cls, default_get, environ,
                prefix + (a.name,)
)
vals[a.name] = val
return config_cls(**vals)
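The net effect of to_config/_to_config is that the app prefix, the nesting path, and the attribute name are joined with underscores and upper-cased to form the environment variable name. A standalone illustration of that naming rule (independent of the library code above):

import os

def env_var_name(app_prefix, path, name):
    # Same rule as default_get above: join prefix, nested path and attribute
    # name with "_" and upper-case the result.
    return "_".join(app_prefix + path + (name,)).upper()

os.environ["APP_DB_HOST"] = "localhost"
assert env_var_name(("app",), ("db",), "host") == "APP_DB_HOST"
print(os.environ[env_var_name(("app",), ("db",), "host")])  # -> localhost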
def get_by_cluster_id(self, cluster_id):
instance = db().query(self.model).\
filter(self.model.env_id == cluster_id).first()
if instance is not None:
try:
instance.repo = Repo(os.path.join(const.REPOS_DIR,
instance.repo_name))
except exc.NoSuchPathError:
logger.debug("Repo folder does not exist. Cloning repo")
self._create_key_file(instance.repo_name, instance.user_key)
if instance.user_key:
os.environ['GIT_SSH'] = \
self._get_ssh_cmd(instance.repo_name)
repo_path = os.path.join(const.REPOS_DIR, instance.repo_name)
repo = Repo.clone_from(instance.git_url, repo_path)
instance.repo = repo
return instance
def create(self, data):
if not os.path.exists(const.REPOS_DIR):
os.mkdir(const.REPOS_DIR)
repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
if os.path.exists(repo_path):
logger.debug('Repo directory exists. Removing...')
shutil.rmtree(repo_path)
user_key = data.get('user_key', '')
if user_key:
self._create_key_file(data['repo_name'], user_key)
os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
repo = Repo.clone_from(data['git_url'], repo_path)
instance = super(GitRepo, self).create(data)
instance.repo = repo
return instance
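Both GitRepo methods above delegate to _create_key_file and _get_ssh_cmd helpers that are not part of this excerpt. A plausible standalone sketch of _get_ssh_cmd (hypothetical; the directory and file naming are assumptions) writes a wrapper script and returns its path for use as GIT_SSH:

import os
import stat

def _get_ssh_cmd(repo_name, keys_dir='/var/lib/fuel_repo_keys'):
    # Hypothetical helper: build a wrapper script so git uses the per-repo
    # private key written by _create_key_file().
    key_path = os.path.join(keys_dir, repo_name + '.key')
    wrapper_path = os.path.join(keys_dir, repo_name + '.sh')
    with open(wrapper_path, 'w') as wrapper:
        wrapper.write('#!/bin/sh\n')
        wrapper.write('exec ssh -i %s -o StrictHostKeyChecking=no "$@"\n' % key_path)
    os.chmod(wrapper_path, os.stat(wrapper_path).st_mode | stat.S_IEXEC)
    return wrapper_path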
def post(self):
if (request.form['username']):
data = {"user": request.form['username'], "key": request.form['password']}
result = dockletRequest.unauthorizedpost('/login/', data)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(request.form['username'], app_key))
# set session for docklet
session['username'] = request.form['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
else:
return redirect('/login/')
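cookie_tool.generate_cookie is an external helper; a minimal stand-in in the same spirit (hypothetical, the real docklet implementation may differ) binds the username to an HMAC signature derived from APP_KEY:

import hashlib
import hmac

def generate_cookie(username, app_key):
    # Hypothetical stand-in for cookie_tool.generate_cookie: sign the username
    # with HMAC-SHA256 keyed by the shared APP_KEY.
    signature = hmac.new(app_key.encode('utf-8'),
                         username.encode('utf-8'),
                         hashlib.sha256).hexdigest()
    return '%s|%s' % (username, signature)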
def get(self):
form = external_generate.external_auth_generate_request()
result = dockletRequest.unauthorizedpost('/external_login/', form)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
# set session for docklet
session['username'] = result['data']['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
def post(self):
form = external_generate.external_auth_generate_request()
result = dockletRequest.unauthorizedpost('/external_login/', form)
ok = result and result.get('success', None)
if (ok and (ok == "true")):
# set cookie:docklet-jupyter-cookie for jupyter notebook
resp = make_response(redirect(request.args.get('next',None) or '/dashboard/'))
app_key = os.environ['APP_KEY']
resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
# set session for docklet
session['username'] = result['data']['username']
session['nickname'] = result['data']['nickname']
session['description'] = result['data']['description']
session['avatar'] = '/static/avatar/'+ result['data']['avatar']
session['usergroup'] = result['data']['group']
session['status'] = result['data']['status']
session['token'] = result['data']['token']
return resp
else:
return redirect('/login/')
Source: pytest_selenium_pdiff.py, from project pytest-selenium-pdiff (author: rentlytics)
def pytest_configure(config):
settings['SCREENSHOTS_PATH'] = config.getoption('screenshots_path')
settings['PDIFF_PATH'] = config.getoption('pdiff_path')
settings['ALLOW_SCREENSHOT_CAPTURE'] = config.getoption('allow_screenshot_capture')
if 'ALLOW_SCREENSHOT_CAPTURE' in os.environ:
settings['ALLOW_SCREENSHOT_CAPTURE'] = True
try:
from sh import compare
settings['USE_IMAGEMAGICK'] = True
except ImportError:
pass
try:
from sh import perceptualdiff
settings['USE_PERCEPTUALDIFF'] = True
except ImportError:
pass
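The try/except imports above double as availability checks for the external compare and perceptualdiff binaries; an equivalent check could use shutil.which (sketch, with a stand-in for the plugin's settings dict):

import shutil

settings = {}  # stand-in for the plugin's settings dict
settings['USE_IMAGEMAGICK'] = shutil.which('compare') is not None
settings['USE_PERCEPTUALDIFF'] = shutil.which('perceptualdiff') is not None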
@contextlib.contextmanager  # needs: import contextlib
def set_environ(env_name, value):
"""Set the environment variable 'env_name' to 'value'
Save previous value, yield, and then restore the previous value stored in
the environment variable 'env_name'.
If 'value' is None, do nothing"""
value_changed = value is not None
if value_changed:
old_value = os.environ.get(env_name)
os.environ[env_name] = value
try:
yield
finally:
if value_changed:
if old_value is None:
del os.environ[env_name]
else:
os.environ[env_name] = old_value
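With the context-manager decorator in place, set_environ is used like this (illustrative; the variable name and value are placeholders):

import os

os.environ.pop('HTTPS_PROXY', None)
with set_environ('HTTPS_PROXY', 'http://proxy.example:3128'):
    assert os.environ['HTTPS_PROXY'] == 'http://proxy.example:3128'
assert 'HTTPS_PROXY' not in os.environ  # previous (absent) value restored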
def _test_Valgrind(self, valgrind):
# Clear the device cache to prevent false positives
deviceCacheDir = os.path.join(scatest.getSdrCache(), ".ExecutableDevice_node", "ExecutableDevice1")
shutil.rmtree(deviceCacheDir, ignore_errors=True)
os.environ['VALGRIND'] = valgrind
try:
# Checking that the node and device launch as expected
nb, devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
finally:
del os.environ['VALGRIND']
self.assertFalse(devMgr is None)
self.assertEquals(len(devMgr._get_registeredDevices()), 1, msg='device failed to launch with valgrind')
children = getChildren(nb.pid)
self.assertEqual(len(children), 1)
devMgr.shutdown()
# Check that a valgrind logfile exists
logfile = os.path.join(deviceCacheDir, 'valgrind.%s.log' % children[0])
self.assertTrue(os.path.exists(logfile))
def test_setSDRROOT(self):
# None type
self.assertRaises(TypeError, sb.setSDRROOT, None)
# Bad dir should not change root
sdrroot = sb.getSDRROOT()
self.assertRaises(AssertionError, sb.setSDRROOT, 'TEMP_PATH')
self.assertEquals(sdrroot, sb.getSDRROOT())
# Good dir with no dev/dom should not change root
self.assertRaises(AssertionError, sb.setSDRROOT, 'jackhammer')
self.assertEquals(sdrroot, sb.getSDRROOT())
# New root
sb.setSDRROOT('sdr')
self.assertEquals(sb.getSDRROOT(), 'sdr')
# Restore sdrroot
sb.setSDRROOT(os.environ['SDRROOT'])
def _prependToEnvVar(self, newVal, envVar):
path = self._getEnvVarAsList(envVar)
foundValue = False
for entry in path:
# Search to determine if the new value is already in the path
try:
if os.path.samefile(entry, newVal):
# The value is already in the path
foundValue = True
break
except OSError:
# If we can't find concrete files to compare, fall back to string compare
if entry == newVal:
# The value is already in the path
foundValue = True
break
if not foundValue:
# The value does not already exist
        if envVar in os.environ:
            newpath = newVal + os.path.pathsep + os.getenv(envVar) + os.path.pathsep
        else:
            newpath = newVal + os.path.pathsep
        os.putenv(envVar, newpath)
        os.environ[envVar] = newpath
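_getEnvVarAsList is not included in this excerpt; a plausible sketch of that method (hypothetical) that matches how it is used above:

import os

def _getEnvVarAsList(self, envVar):
    # Hypothetical method sketch: split a path-like environment variable into
    # its entries, dropping empty strings (e.g. from a trailing separator).
    value = os.environ.get(envVar, '')
    return [entry for entry in value.split(os.path.pathsep) if entry]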
def principal_unit():
"""Returns the principal unit of this unit, otherwise None"""
# Juju 2.2 and above provides JUJU_PRINCIPAL_UNIT
principal_unit = os.environ.get('JUJU_PRINCIPAL_UNIT', None)
# If it's empty, then this unit is the principal
if principal_unit == '':
return os.environ['JUJU_UNIT_NAME']
elif principal_unit is not None:
return principal_unit
    # For Juju 2.1 and below, let's try to work out the principal unit by
    # inspecting the various charms' metadata.yaml.
for reltype in relation_types():
for rid in relation_ids(reltype):
for unit in related_units(rid):
md = _metadata_unit(unit)
if not md:
continue
subordinate = md.pop('subordinate', None)
if not subordinate:
return unit
return None
def _get_user_provided_overrides(modules):
"""Load user-provided config overrides.
:param modules: stack modules to lookup in user overrides yaml file.
:returns: overrides dictionary.
"""
overrides = os.path.join(os.environ['JUJU_CHARM_DIR'],
'hardening.yaml')
if os.path.exists(overrides):
log("Found user-provided config overrides file '%s'" %
(overrides), level=DEBUG)
settings = yaml.safe_load(open(overrides))
if settings and settings.get(modules):
log("Applying '%s' overrides" % (modules), level=DEBUG)
return settings.get(modules)
log("No overrides found for '%s'" % (modules), level=DEBUG)
else:
log("No hardening config overrides file '%s' found in charm "
"root dir" % (overrides), level=DEBUG)
return {}
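For reference, the overrides file is expected to map module names (the `modules` argument) to their settings; an illustrative parse (the exact schema depends on the hardening templates in use):

import yaml

example_overrides = yaml.safe_load("""
ssh:
  some_setting: some_value
""")
print(example_overrides.get('ssh'))  # what _get_user_provided_overrides('ssh') would return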
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--weights', default="YOLO_small.ckpt", type=str)
parser.add_argument('--data_dir', default="data", type=str)
parser.add_argument('--threshold', default=0.2, type=float)
parser.add_argument('--iou_threshold', default=0.5, type=float)
parser.add_argument('--gpu', default='', type=str)
args = parser.parse_args()
if args.gpu is not None:
cfg.GPU = args.gpu
if args.data_dir != cfg.DATA_PATH:
update_config_paths(args.data_dir, args.weights)
os.environ['CUDA_VISIBLE_DEVICES'] = cfg.GPU
yolo = YOLONet()
pascal = pascal_voc('train')
solver = Solver(yolo, pascal)
print('Start training ...')
solver.train()
print('Done training.')
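Assigning CUDA_VISIBLE_DEVICES before the network is built is what restricts TensorFlow to the requested GPUs; a brief illustration of the same pattern (it must run before any CUDA context is created):

import os

# The process will only see the listed devices, renumbered from 0.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'      # use only the first GPU
# os.environ['CUDA_VISIBLE_DEVICES'] = ''     # hide all GPUs, force CPU
# os.environ['CUDA_VISIBLE_DEVICES'] = '1,2'  # expose GPUs 1 and 2 as devices 0 and 1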
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--weights', default="YOLO_small.ckpt", type=str)
parser.add_argument('--weight_dir', default='weights', type=str)
parser.add_argument('--data_dir', default="data", type=str)
parser.add_argument('--gpu', default='', type=str)
args = parser.parse_args()
os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu
yolo = YOLONet(False)
weight_file = os.path.join(args.data_dir, args.weight_dir, args.weights)
detector = Detector(yolo, weight_file)
# detect from camera
# cap = cv2.VideoCapture(-1)
# detector.camera_detector(cap)
# detect from image file
imname = 'test/person.jpg'
detector.image_detector(imname)
def get_bcl2fastq_v2(hostname):
try:
subprocess.check_call(["which", "bcl2fastq"])
# Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x.
# Required for some installations of bcl2fastq.
new_environ = dict(os.environ)
new_environ['LD_LIBRARY_PATH'] = os.environ.get('_TENX_LD_LIBRARY_PATH', '')
output = subprocess.check_output(["bcl2fastq", "--version"], env=new_environ, stderr=subprocess.STDOUT)
match = None
for l in output.split("\n"):
match = re.match("bcl2fastq v([0-9.]+)", l)
if match is not None:
return (match.groups()[0], None)
return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version")
except subprocess.CalledProcessError:
msg = "On machine: %s, bcl2fastq not found on PATH." % hostname
return (None, msg)
def __init__(self, additional_compose_file=None, additional_services=None):
# To resolve docker client server version mismatch issue.
os.environ["COMPOSE_API_VERSION"] = "auto"
dir_name = os.path.split(os.getcwd())[-1]
self.project = "{}{}".format(
re.sub(r'[^a-z0-9]', '', dir_name.lower()),
getpass.getuser()
)
self.additional_compose_file = additional_compose_file
self.services = ["zookeeper", "schematizer", "kafka"]
if additional_services is not None:
self.services.extend(additional_services)
# This variable is meant to capture the running/not-running state of
# the dependent testing containers when tests start running. The idea
# is, we'll only start and stop containers if they aren't already
# running. If they are running, we'll just use the ones that exist.
# It takes a while to start all the containers, so when running lots of
# tests, it's best to start them out-of-band and leave them up for the
# duration of the session.
self.containers_already_running = self._are_containers_already_running()
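The _are_containers_already_running helper referenced above is not part of this excerpt; a hypothetical sketch that checks for the expected service containers via the docker CLI:

import subprocess

def _are_containers_already_running(self):
    # Hypothetical method sketch: the dependencies count as "already running"
    # if every expected service name appears among the running container names.
    output = subprocess.check_output(
        ['docker', 'ps', '--format', '{{.Names}}']).decode('utf-8')
    running = output.splitlines()
    return all(
        any(service in name for name in running)
        for service in self.services
    )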