def test_resampling_value(shapefile, bins, samples):
    """Resample a shapefile by target magnitude and check the written output.

    Args:
        shapefile: fixture value that unpacks to ``(lonlats, filename)``;
            only ``filename`` is used here.
        bins: number of bins passed to ``resample_by_magnitude``.
        samples: requested sample count; rounded down to a multiple of
            ``bins`` before use.
    """
    # Function-scope imports: the module-level import block is not visible
    # from this chunk, so do not rely on these names being imported there.
    import os
    import shutil

    samples = (samples // bins) * bins
    _, filename = shapefile

    # tempfile.mktemp() only hands back a name (racy), and the output is
    # written outside any directory this test owns, so nothing was ever
    # cleaned up.  Write into a private directory and remove it afterwards.
    workdir = tempfile.mkdtemp()
    try:
        random_filename = os.path.join(workdir, "resampled.shp")
        resampling.resample_by_magnitude(filename,
                                         random_filename,
                                         target_field='lat',
                                         bins=bins,
                                         output_samples=samples,
                                         bootstrap=True)

        resampled_sf = shp.Reader(random_filename)
        # fields[0] is skipped, as in the original slice [1:]
        resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]

        new_coords, new_val, new_othervals = \
            geoio.load_shapefile(random_filename, 'lat')

        assert 'lat' in resampled_shapefields
        assert 'lon' not in resampled_shapefields
        assert np.all((samples, 2) == new_coords.shape)
        assert new_othervals == {}  # only the target is retained after resampling
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
# Example source code using Python's tempfile.mktemp()
def test_resampling_spatial(shapefile, rows, cols, samples):
    """Resample a shapefile spatially and check the written output.

    Args:
        shapefile: fixture value that unpacks to ``(lonlats, filename)``;
            only ``filename`` is used here.
        rows: number of tile rows passed to ``resample_spatially``.
        cols: number of tile columns passed to ``resample_spatially``.
        samples: requested sample count; rounded down to a multiple of
            ``rows * cols`` before use.
    """
    # Function-scope imports: the module-level import block is not visible
    # from this chunk, so do not rely on these names being imported there.
    import os
    import shutil

    tiles = rows * cols
    samples = (samples // tiles) * tiles
    _, filename = shapefile

    # tempfile.mktemp() only hands back a name (racy), and the output is
    # written outside any directory this test owns, so nothing was ever
    # cleaned up.  Write into a private directory and remove it afterwards.
    workdir = tempfile.mkdtemp()
    try:
        random_filename = os.path.join(workdir, "resampled.shp")
        resampling.resample_spatially(filename,
                                      random_filename,
                                      target_field='lat',
                                      rows=rows,
                                      cols=cols,
                                      output_samples=samples,
                                      bootstrap=True)

        resampled_sf = shp.Reader(random_filename)
        # fields[0] is skipped, as in the original slice [1:]
        resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]

        new_coords, new_val, new_othervals = \
            geoio.load_shapefile(random_filename, 'lat')

        assert 'lat' in resampled_shapefields
        assert 'lon' not in resampled_shapefields
        # Tuple comparison: (samples, 2) must not sort below the output shape.
        # NOTE(review): presumably an upper bound because fewer points than
        # requested may come back -- confirm against resample_spatially.
        assert np.all((samples, 2) >= new_coords.shape)
        assert new_othervals == {}  # only the target is retained after resampling
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
def test_do_work(self):
    """Run ``crop.do_work`` with JPEG output enabled and check both files exist.

    The output ``.tif`` is removed afterwards even when ``do_work`` or one of
    the assertions fails (the original only cleaned up on success).
    """
    # input_file, mask_file, output_file, resampling, extents, jpeg
    # NOTE(review): mktemp() only reserves a name and is documented as racy;
    # it is kept because the first assertion relies on the file not existing
    # before do_work runs and the JPEG name is derived from this basename.
    output_file = tempfile.mktemp(suffix='.tif')
    try:
        options = Options(resampling='bilinear',
                          extents=self.extents,
                          jpeg=True,
                          reproject=True)
        crop.do_work(input_file=self.std2000_no_mask,
                     output_file=output_file,
                     options=options,
                     mask_file=self.mask)
        # output file was created
        self.assertTrue(exists(output_file))
        # assert jpeg was created
        jpeg_name = basename(output_file).split('.')[0] + '.jpg'
        self.assertTrue(exists(join(crop.TMPDIR, jpeg_name)))
    finally:
        if exists(output_file):
            os.remove(output_file)
def atomic_write(path):
    """Yield a writable file whose contents replace ``path`` only on success.

    This is a generator and is meant to be driven as a context manager (the
    decorator, if any, is not visible in this chunk).  Data is written to a
    hidden temporary file in the same directory as ``path``; when the body
    finishes without error the temporary file is renamed onto ``path``.  On
    any exception the temporary file is removed and the exception re-raised.

    Args:
        path: destination file path.

    Yields:
        A text-mode file object opened for writing.
    """
    tmp = tempfile.mktemp(
        prefix='.' + os.path.basename(path),
        dir=os.path.dirname(path),
    )
    try:
        with open(tmp, 'w') as f:
            yield f
    except BaseException:
        # Best-effort cleanup: if open() itself failed there is nothing to
        # remove, and a failing os.remove() must not replace the real error.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
    else:
        os.rename(tmp, path)
# TODO: at some point there will be so many options we'll want to make a config
# object or similar instead of adding more arguments here
def screenshot(self, filename=None):
    """Capture a screenshot through ``idevice`` and return it as an image.

    Args:
        filename: optional path; when truthy the image is also saved there.

    Return:
        PIL.Image

    Raises:
        SystemExit: when the ``idevice`` call raises
            ``subprocess.CalledProcessError``.
            NOTE(review): the previous docstring advertised EnvironmentError,
            but the code has always called ``sys.exit``; confirm which one
            callers expect before changing the exception type.
    """
    tmpfile = tempfile.mktemp(prefix='atx-screencap-', suffix='.tiff')
    try:
        try:
            idevice("screenshot", "--udid", self.udid, tmpfile)
        except subprocess.CalledProcessError as e:
            # CalledProcessError has no ``message`` attribute on Python 3;
            # str(e) works everywhere and carries the command and exit status.
            sys.exit(str(e))
        image = Image.open(tmpfile)
        # Force the pixel data to be read before the temp file is deleted.
        image.load()
        if filename:
            image.save(filename)
        return image
    finally:
        # Also runs on the sys.exit path, so a partial capture is not left behind.
        if os.path.exists(tmpfile):
            os.unlink(tmpfile)
def _adb_screencap(self, scale=1.0):
    """
    capture screen with adb shell screencap

    Args:
        scale: resize factor applied to both image dimensions; ``None`` or
            ``1.0`` leaves the image size unchanged.

    Returns:
        The image returned by ``imutils.open_as_pillow``, resized and
        transposed according to ``self.rotation()`` when applicable.
    """
    # mktemp() is used here only to generate a unique name for the remote path.
    remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='screencap-', suffix='.png')
    local_file = tempfile.mktemp(prefix='atx-screencap-', suffix='.png')
    self.shell('screencap', '-p', remote_file)
    try:
        self.pull(remote_file, local_file)
        image = imutils.open_as_pillow(local_file)
        if scale is not None and scale != 1.0:
            image = image.resize([int(scale * s) for s in image.size], Image.BICUBIC)
        rotation = self.rotation()
        if rotation:
            # rotation is multiplied by 90 to pick Image.ROTATE_<degrees>
            method = getattr(Image, 'ROTATE_{}'.format(rotation*90))
            image = image.transpose(method)
        return image
    finally:
        try:
            self.remove(remote_file)
        finally:
            # The local file does not exist when the pull failed; unlinking
            # unconditionally would replace the real error with an OSError.
            if os.path.exists(local_file):
                os.unlink(local_file)
def _adb_minicap(self, scale=1.0):
    """
    capture screen with minicap

    https://github.com/openstf/minicap

    Args:
        scale: factor applied to the display width and height to build the
            requested output size in the minicap parameter string.

    Returns:
        The image returned by ``imutils.open_as_pillow`` for the pulled file.
    """
    # mktemp() is used here only to generate a unique name for the remote path.
    remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='minicap-', suffix='.jpg')
    local_file = tempfile.mktemp(prefix='atx-minicap-', suffix='.jpg')
    (w, h, r) = self.display
    params = '{x}x{y}@{rx}x{ry}/{r}'.format(x=w, y=h, rx=int(w*scale), ry=int(h*scale), r=r*90)
    try:
        self.shell('LD_LIBRARY_PATH=/data/local/tmp', self.__minicap, '-s', '-P', params, '>', remote_file)
        self.pull(remote_file, local_file)
        image = imutils.open_as_pillow(local_file)
        return image
    finally:
        try:
            self.remove(remote_file)
        finally:
            # The local file does not exist when shell/pull failed; unlinking
            # unconditionally would replace the real error with an OSError.
            if os.path.exists(local_file):
                os.unlink(local_file)
def test_blocking_lock_file(self):
    """Check that a second BlockingLockFile gives up with IOError in time.

    A first lock is obtained on a temporary path; a second lock on the same
    path, constructed with ``0.05`` and ``wait_time`` as extra arguments, must
    raise IOError, and the whole attempt must take less than ``wait_time``
    plus a small allowance.
    """
    my_file = tempfile.mktemp()
    lock_file = BlockingLockFile(my_file)
    lock_file._obtain_lock()

    # next one waits for the lock
    start = time.time()
    wait_time = 0.1
    wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
    # failUnlessRaises is a deprecated alias that was removed in Python 3.12.
    self.assertRaises(IOError, wait_lock._obtain_lock)
    elapsed = time.time() - start

    extra_time = 0.02
    if is_win:
        # for Appveyor
        extra_time *= 6  # NOTE: Indeterministic failures here...
    self.assertLess(elapsed, wait_time + extra_time)
def setUp(self):
    """Pick a temporary file name and remember it as ``self._dbfile``.

    Only the name is generated; no file is created here.
    """
    db_path = tempfile.mktemp()
    self._dbfile = db_path
def test_SPDParser(self):
    """Parse the CommandWrapper SPD, check its fields, and re-export it.

    The input file is first validated with ``self._xmllint``; after parsing,
    the id, name, author and property-file entries are checked; finally the
    parsed object is exported to a temporary file which must validate too.
    """
    # Verify the input is valid
    spdPath = os.path.abspath("sdr/dom/components/CommandWrapper/CommandWrapper.spd.xml")
    status = self._xmllint(spdPath, "SPD")
    self.assertEqual(status, 0, "Input XML isn't DTD valid")

    spd = parsers.SPDParser.parse(spdPath)
    self.assertEqual(spd.get_id(), "DCE:458872f6-a316-4082-b1eb-ce5704f5c49d")
    self.assertEqual(spd.get_name(), "CommandWrapper")
    self.assertEqual(str(spd.get_author()[0].get_name()[0]), "REDHAWK test author")
    propertyfile = spd.get_propertyfile()
    self.assertEqual(propertyfile.get_type(), "PRF")
    self.assertEqual(propertyfile.get_localfile().get_name(), "CommandWrapper.prf.xml")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            spd.export(tmp, 0)
        status = self._xmllint(tmpfile, "SPD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_PRFParser(self):
    """Parse the CommandWrapper PRF, check two properties, and re-export it.

    Simple and simple-sequence properties are indexed by id; the ``command``
    and ``args`` entries are checked field by field; finally the parsed object
    is exported to a temporary file which must validate with ``self._xmllint``.
    """
    prf = parsers.PRFParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.prf.xml")
    props = {}
    # ``prop`` rather than ``property`` to avoid shadowing the builtin
    for prop in prf.get_simple():
        props[prop.get_id()] = prop
    for prop in prf.get_simplesequence():
        props[prop.get_id()] = prop

    command = props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"]
    self.assertEqual(command.get_mode(), "readwrite")
    self.assertEqual(command.get_name(), "command")
    self.assertEqual(command.get_type(), "string")
    self.assertEqual(command.get_value(), "/bin/echo")
    self.assertEqual(command.get_kind()[0].get_kindtype(), "configure")

    args = props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"]
    self.assertEqual(args.get_mode(), "readwrite")
    self.assertEqual(args.get_name(), "args")
    self.assertEqual(args.get_type(), "string")
    self.assertEqual(args.get_values().get_value()[0], "Hello World")
    self.assertEqual(args.get_kind()[0].get_kindtype(), "configure")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            prf.export(tmp, 0)
        status = self._xmllint(tmpfile, "PRF")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SCDParser(self):
    """Parse the CommandWrapper SCD, check its fields, and re-export it.

    Checks the CORBA version, component repid/type, the first supported
    interface and the first declared interface; finally the parsed object is
    exported to a temporary file which must validate with ``self._xmllint``.
    """
    scd = parsers.SCDParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.scd.xml")
    self.assertEqual(scd.get_corbaversion(), "2.2")
    self.assertEqual(scd.get_componentrepid().get_repid(), "IDL:CF/Resource:1.0")
    self.assertEqual(scd.get_componenttype(), "resource")

    supported = scd.get_componentfeatures().get_supportsinterface()[0]
    self.assertEqual(supported.get_repid(), "IDL:CF/Resource:1.0")
    self.assertEqual(supported.get_supportsname(), "Resource")

    interface = scd.get_interfaces().get_interface()[0]
    self.assertEqual(interface.get_name(), "Resource")
    self.assertEqual(interface.get_inheritsinterface()[0].get_repid(), "IDL:CF/LifeCycle:1.0")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            scd.export(tmp, 0)
        status = self._xmllint(tmpfile, "SCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_usesdeviceref(self):
    """Parse a SAD with a usesdeviceref inside a host collocation and re-export it.

    Checks the SAD id/name, the single component file, the single host
    collocation with its component placement/instantiation and its
    usesdeviceref; finally the parsed object is exported to a temporary file
    which must validate with ``self._xmllint``.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/usesdeviceref.sad.xml")
    self.assertEqual(sad.get_id(), "colloc_usesdev_1")
    self.assertEqual(sad.get_name(), "colloc_usesdev")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_hostcollocation()), 1)

    colloc = sad.partitioning.get_hostcollocation()[0]
    self.assertEqual(len(colloc.get_componentplacement()), 1)
    comp_place = colloc.get_componentplacement()[0]
    self.assertEqual(len(comp_place.get_componentinstantiation()), 1)
    comp_ci = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_ci.id_, "P1_1")
    self.assertEqual(comp_ci.get_usagename(), "P1_1")

    self.assertEqual(len(colloc.get_usesdeviceref()), 1)
    udev_ref = colloc.get_usesdeviceref()[0]
    self.assertEqual(udev_ref.refid, "FrontEndTuner_1")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_devicerequires(self):
    """Parse a SAD with devicerequires entries and re-export it.

    Checks the SAD id/name, the component file count, and for each of the two
    component placements the instantiation id, usage name and its
    devicerequires id/value pairs; finally the parsed object is exported to a
    temporary file which must validate with ``self._xmllint``.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/devicerequires.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)

    # First placement: the "Red" instance with two requirements
    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(len(comp_in.devicerequires.get_requires()), 2)
    self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
    self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "RED")
    self.assertEqual(comp_in.devicerequires.get_requires()[1].id, "rank")
    self.assertEqual(comp_in.devicerequires.get_requires()[1].value, "15")

    # Second placement: the "Green" instance with one requirement
    comp_place = sad.partitioning.get_componentplacement()[1]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Green")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
    self.assertEqual(len(comp_in.devicerequires.get_requires()), 1)
    self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
    self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "GREEN")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_loggingconfig(self):
    """Parse a SAD with loggingconfig entries and re-export it.

    Checks the SAD id/name, the component file count, and for each of the two
    component placements the instantiation id, usage name and loggingconfig
    attributes; finally the parsed object is exported to a temporary file
    which must validate with ``self._xmllint``.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/loggingconfig.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)

    # First placement: level and value are both checked
    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(comp_in.loggingconfig.level, "ERROR")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")

    # Second placement: only the value is checked
    comp_place = sad.partitioning.get_componentplacement()[1]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Green")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file2")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_affinityconfig(self):
    """Parse a SAD with affinity settings and re-export it.

    Checks the SAD id/name, the single component file and placement, the
    instantiation's id, usage name and loggingconfig, and the two affinity
    simplerefs; finally the parsed object is exported to a temporary file
    which must validate with ``self._xmllint``.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/affinity.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)

    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(comp_in.loggingconfig.level, "ERROR")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")

    self.assertEqual(len(comp_in.affinity.get_simpleref()), 2)
    self.assertEqual(comp_in.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
    self.assertEqual(comp_in.affinity.get_simpleref()[0].value, "socket")
    self.assertEqual(comp_in.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
    self.assertEqual(comp_in.affinity.get_simpleref()[1].value, "0")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_DCDParser_loggingconfig(self):
    """Parse a DCD with a loggingconfig entry and re-export it.

    Checks the DCD id/name, the single component file and placement, and the
    instantiation's id, usage name and loggingconfig attributes; finally the
    parsed object is exported to a temporary file which must validate with
    ``self._xmllint``.
    """
    dcd = parsers.DCDParser.parse("sdr/parser_tests/loggingconfig.dcd.xml")
    self.assertEqual(dcd.get_id(), "test_GPP_green")
    self.assertEqual(dcd.get_name(), "test_GPP_green")
    self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)

    gpp = dcd.partitioning.get_componentplacement()[0]
    gpp_ci = gpp.get_componentinstantiation()[0]
    self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP1_file_1")
    self.assertEqual(gpp_ci.get_id(), "test_GPP_green::GPP_1")
    self.assertEqual(gpp_ci.get_usagename(), "test_GPP_green::GPP_1")
    self.assertEqual(gpp_ci.loggingconfig.level, "ERROR")
    self.assertEqual(gpp_ci.loggingconfig.value, "path/to/my/log/file")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            dcd.export(tmp, 0)
        status = self._xmllint(tmpfile, "DCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_DCDParser_affinity(self):
    """Parse a DCD with affinity settings and re-export it.

    Checks the DCD id/name, the single component file and placement, the
    instantiation's id and usage name, and the two affinity simplerefs;
    finally the parsed object is exported to a temporary file which must
    validate with ``self._xmllint``.
    """
    dcd = parsers.DCDParser.parse("sdr/parser_tests/affinity.dcd.xml")
    self.assertEqual(dcd.get_id(), "affinity_parse_1")
    self.assertEqual(dcd.get_name(), "test_affinity_node_socket")
    self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)

    gpp = dcd.partitioning.get_componentplacement()[0]
    gpp_ci = gpp.get_componentinstantiation()[0]
    self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP_File_1")
    self.assertEqual(gpp_ci.get_id(), "test_affinity_node:GPP_1")
    self.assertEqual(gpp_ci.get_usagename(), "GPP_1")

    self.assertEqual(len(gpp_ci.affinity.get_simpleref()), 2)
    self.assertEqual(gpp_ci.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[0].value, "socket")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[1].value, "0")

    # Verify that we can write the output and still be DTD valid
    # mkstemp() creates the file atomically (mktemp() only returns a name).
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even if export() raises.
        with os.fdopen(fd, "w") as tmp:
            dcd.export(tmp, 0)
        status = self._xmllint(tmpfile, "DCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def setUp(self):
    """Pick a temporary file name and launch two domains.

    The first domain is started through ``self.launchDomainManager`` on a
    fixed endpoint with the temporary name as ``dbURI``; the second through
    ``launchDomain(2, self._root)``.  Each call's two results are stored as
    the matching ``_domainBooter_N`` / ``_domainManager_N`` attributes.
    """
    self._dbfile = tempfile.mktemp()

    first_domain = self.launchDomainManager(endpoint="giop:tcp::5679",
                                            dbURI=self._dbfile)
    self._domainBooter_1, self._domainManager_1 = first_domain

    second_domain = launchDomain(2, self._root)
    self._domainBooter_2, self._domainManager_2 = second_domain
def load_logging_config_uri(orb, uri, binding=None):
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
if scheme == "file":
ossie.utils.log4py.config.fileConfig(path, binding)
elif scheme == "sca":
q = dict([x.split("=") for x in query.split("&")])
try:
fileSys = orb.string_to_object(q["fs"])
except KeyError:
logging.warning("sca URI missing fs query parameter")
else:
if fileSys == None:
logging.warning("Failed to lookup file system")
else:
try:
t = tempfile.mktemp()
tf = open(t, "w+")
scaFile = fileSys.open(path, True)
fileSize = scaFile.sizeOf()
buf = scaFile.read(fileSize)
tf.write(buf)
tf.close()
scaFile.close()
ossie.utils.log4py.config.fileConfig(t)
finally:
os.remove(t)
else:
# Invalid scheme
logging.warning("Invalid logging config URI scheme")