def import_key(keyid):
key = keyid.strip()
if (key.startswith('-----BEGIN PGP PUBLIC KEY BLOCK-----') and
key.endswith('-----END PGP PUBLIC KEY BLOCK-----')):
juju_log("PGP key found (looks like ASCII Armor format)", level=DEBUG)
juju_log("Importing ASCII Armor PGP key", level=DEBUG)
with tempfile.NamedTemporaryFile() as keyfile:
with open(keyfile.name, 'w') as fd:
fd.write(key)
fd.write("\n")
cmd = ['apt-key', 'add', keyfile.name]
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
error_out("Error importing PGP key '%s'" % key)
else:
juju_log("PGP key found (looks like Radix64 format)", level=DEBUG)
juju_log("Importing PGP key from keyserver", level=DEBUG)
cmd = ['apt-key', 'adv', '--keyserver',
'hkp://keyserver.ubuntu.com:80', '--recv-keys', key]
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
error_out("Error importing PGP key '%s'" % key)
python类NamedTemporaryFile()的实例源码
def dot2graph(self, dot, format='svg'):
# windows ???????????????????
# ?????? NamedTemporaryFile ?????
with NamedTemporaryFile(delete=False) as dotfile:
dotfile.write(dot)
outfile = NamedTemporaryFile(delete=False)
os.system('dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % (
dotfile.name, outfile.name, format))
result = outfile.read()
outfile.close()
os.unlink(dotfile.name)
os.unlink(outfile.name)
return result
def test_rs3topng():
"""rs3 file is converted to PNG"""
png_str = rstviewer.rs3topng(RS3_FILEPATH)
temp = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
temp.close()
rstviewer.rs3topng(RS3_FILEPATH, temp.name)
with open(temp.name, 'r') as png_file:
assert png_str == png_file.read()
os.unlink(temp.name)
# generated images might not be 100% identical, probably
# because of the font used
with open(EXPECTED_PNG1, 'r') as expected_png_file:
ident1 = png_str == expected_png_file.read()
with open(EXPECTED_PNG2, 'r') as expected_png_file:
ident2 = png_str == expected_png_file.read()
assert ident1 or ident2
def test_cli_rs3topng():
"""conversion to PNG on the commandline"""
temp_png = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
temp_png.close()
# calling `rstviewer -f png input.rs3 output.png` will end the program
# with sys.exit(0), so we'll have to catch this here.
with pytest.raises(SystemExit) as serr:
cli(['-f', 'png', RS3_FILEPATH, temp_png.name])
out, err = pytest.capsys.readouterr()
assert err == 0
with open(temp_png.name, 'r') as png_file:
png_str = png_file.read()
os.unlink(temp_png.name)
# generated images might not be 100% identical, probably
# because of the font used
with open(EXPECTED_PNG1, 'r') as expected_png_file:
ident1 = png_str == expected_png_file.read()
with open(EXPECTED_PNG2, 'r') as expected_png_file:
ident2 = png_str == expected_png_file.read()
assert ident1 or ident2
test_run_no_updates_available.py 文件源码
项目:pyupdater-wx-demo
作者: wettenhj
项目源码
文件源码
阅读 41
收藏 0
点赞 0
评论 0
def setUp(self):
tempFile = tempfile.NamedTemporaryFile()
self.fileServerDir = tempFile.name
tempFile.close()
os.mkdir(self.fileServerDir)
os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
encoding='base64')
signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
encoding='base64').decode()
VERSIONS['signature'] = signature
keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
with gzip.open(keysFilePath, 'wb') as keysFile:
keysFile.write(json.dumps(KEYS, sort_keys=True))
versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
with gzip.open(versionsFilePath, 'wb') as versionsFile:
versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
os.environ['WXUPDATEDEMO_TESTING'] = 'True'
from wxupdatedemo.config import CLIENT_CONFIG
self.clientConfig = CLIENT_CONFIG
self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self):
tempFile = tempfile.NamedTemporaryFile()
self.fileServerDir = tempFile.name
tempFile.close()
os.mkdir(self.fileServerDir)
os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
encoding='base64')
signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
encoding='base64').decode()
VERSIONS['signature'] = signature
keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
with gzip.open(keysFilePath, 'wb') as keysFile:
keysFile.write(json.dumps(KEYS, sort_keys=True))
versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
with gzip.open(versionsFilePath, 'wb') as versionsFile:
versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
os.environ['WXUPDATEDEMO_TESTING'] = 'True'
from wxupdatedemo.config import CLIENT_CONFIG
self.clientConfig = CLIENT_CONFIG
self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
self.clientConfig.APP_NAME = APP_NAME
def _atomic_write(filename):
path = os.path.dirname(filename)
try:
file = tempfile.NamedTemporaryFile(delete=False, dir=path, mode="w+")
yield file
file.flush()
os.fsync(file.fileno())
os.rename(file.name, filename)
finally:
try:
os.remove(file.name)
except OSError as e:
if e.errno == 2:
pass
else:
raise e
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
"""Upload copies files to non-native store correctly with no
progress"""
import shutil
import tempfile
temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
temp_src_dir = os.path.dirname(temp_file.name)
temp_dst_dir = tempfile.mkdtemp()
shutil_mock = MagicMock()
shutil_mock.copyfile.return_value = None
with patch('sfctl.custom_app.shutil', new=shutil_mock):
sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
shutil_mock.copyfile.assert_called_once()
temp_file.close()
shutil.rmtree(os.path.dirname(temp_file.name))
shutil.rmtree(temp_dst_dir)
def _create_pdf_pdftk(self):
with NamedTemporaryFile(
mode='wb+',
prefix='geo-pyprint_',
delete=True
) as map_image_file:
self._map_image.save(map_image_file, 'PNG')
map_image_file.flush()
# TODO: use the configuration to select the template
# TODO: use the configuration to select the name of the key in the template
pdfjinja = PdfJinja('pdfjinja-template.pdf')
pdfout = pdfjinja(dict(map=map_image_file.name))
with NamedTemporaryFile(
mode='wb+',
prefix='geo-pyprint_',
suffix='.pdf',
delete=False
) as output_file:
pdfout.write(output_file)
output_file.flush()
return output_file.name
def filter_region(view, txt, command):
try:
contents = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
contents.write(txt.encode('utf-8'))
contents.close()
script = tempfile.NamedTemporaryFile(suffix='.bat', delete=False)
script.write(('@echo off\ntype %s | %s' % (contents.name, command)).encode('utf-8'))
script.close()
p = subprocess.Popen([script.name],
stdout=PIPE,
stderr=PIPE,
startupinfo=get_startup_info())
out, err = p.communicate()
return (out or err).decode(get_oem_cp()).replace('\r\n', '\n')[:-1].strip()
finally:
os.remove(script.name)
os.remove(contents.name)
def test_scp(event_loop):
async with base.CleanModel() as model:
await model.add_machine()
await asyncio.wait_for(
model.block_until(lambda: model.machines),
timeout=240)
machine = model.machines['0']
await asyncio.wait_for(
model.block_until(lambda: (machine.status == 'running' and
machine.agent_status == 'started')),
timeout=480)
with NamedTemporaryFile() as f:
f.write(b'testcontents')
f.flush()
await machine.scp_to(f.name, 'testfile')
with NamedTemporaryFile() as f:
await machine.scp_from('testfile', f.name)
assert f.read() == b'testcontents'
def test_scp(event_loop):
async with base.CleanModel() as model:
app = await model.deploy('ubuntu')
await asyncio.wait_for(
model.block_until(lambda: app.units),
timeout=60)
unit = app.units[0]
await asyncio.wait_for(
model.block_until(lambda: unit.machine),
timeout=60)
machine = unit.machine
await asyncio.wait_for(
model.block_until(lambda: (machine.status == 'running' and
machine.agent_status == 'started')),
timeout=480)
with NamedTemporaryFile() as f:
f.write(b'testcontents')
f.flush()
await unit.scp_to(f.name, 'testfile')
with NamedTemporaryFile() as f:
await unit.scp_from('testfile', f.name)
assert f.read() == b'testcontents'
def add_local_charm_dir(self, charm_dir, series):
"""Upload a local charm to the model.
This will automatically generate an archive from
the charm dir.
:param charm_dir: Path to the charm directory
:param series: Charm series
"""
fh = tempfile.NamedTemporaryFile()
CharmArchiveGenerator(charm_dir).make_archive(fh.name)
with fh:
func = partial(
self.add_local_charm, fh, series, os.stat(fh.name).st_size)
charm_url = await self._connector.loop.run_in_executor(None, func)
log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
return charm_url
def test_load_config_file(self):
with NamedTemporaryFile(mode='w+t') as f:
f.write("user_a:password_a:role_a,role_b\n")
f.write("user_b:password_b:role_b,role_c\n")
f.write("user_c:password_c:role_c,role_c\n")
f.flush()
self.auth.load_from_file(f.name)
# Assert user equality.
self.assertEqual(self.auth.users,
{
'user_a': 'password_a',
'user_b': 'password_b',
'user_c': 'password_c'
})
# Assert role equality.
self.assertEqual(self.auth.roles,
{
'user_a': set(('role_a', 'role_b')),
'user_b': set(('role_b', 'role_c')),
'user_c': set(('role_c', 'role_c'))
})
def test_save_config_file(self):
self.auth.add_user('user_a', 'password_a')
self.auth.add_roles('user_a', ('role_a', 'role_b'))
self.auth.add_user('user_b', 'password_b')
self.auth.add_roles('user_b', ('role_b', 'role_c'))
self.auth.add_user('user_c', 'password_c')
self.auth.add_roles('user_c', ('role_c', 'role_c'))
with NamedTemporaryFile(mode='w+t') as f:
self.auth.save_to_file(f.name)
expected = [
"user_a:password_a:role_a,role_b",
"user_b:password_b:role_b,role_c",
"user_c:password_c:role_c"
]
for a, e in zip(f.readlines(), expected):
self.assertEqual(a.strip(), e)
def create_temp_parallel_data(sources, targets):
"""
Creates a temporary TFRecords file.
Args:
source: List of source sentences
target: List of target sentences
Returns:
A tuple (sources_file, targets_file).
"""
file_source = tempfile.NamedTemporaryFile()
file_target = tempfile.NamedTemporaryFile()
file_source.write("\n".join(sources).encode("utf-8"))
file_source.flush()
file_target.write("\n".join(targets).encode("utf-8"))
file_target.flush()
return file_source, file_target
def create_temp_tfrecords(sources, targets):
"""
Creates a temporary TFRecords file.
Args:
source: List of source sentences
target: List of target sentences
Returns:
A tuple (sources_file, targets_file).
"""
output_file = tempfile.NamedTemporaryFile()
writer = tf.python_io.TFRecordWriter(output_file.name)
for source, target in zip(sources, targets):
ex = tf.train.Example()
#pylint: disable=E1101
ex.features.feature["source"].bytes_list.value.extend(
[source.encode("utf-8")])
ex.features.feature["target"].bytes_list.value.extend(
[target.encode("utf-8")])
writer.write(ex.SerializeToString())
writer.close()
return output_file
def create_temporary_vocab_file(words, counts=None):
"""
Creates a temporary vocabulary file.
Args:
words: List of words in the vocabulary
Returns:
A temporary file object with one word per line
"""
vocab_file = tempfile.NamedTemporaryFile()
if counts is None:
for token in words:
vocab_file.write((token + "\n").encode("utf-8"))
else:
for token, count in zip(words, counts):
vocab_file.write("{}\t{}\n".format(token, count).encode("utf-8"))
vocab_file.flush()
return vocab_file
def test_save_svgz_filename():
import gzip
qr = segno.make_qr('test')
f = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
f.close()
qr.save(f.name)
f = open(f.name, mode='rb')
expected = b'\x1f\x8b\x08' # gzip magic number
val = f.read(len(expected))
f.close()
f = gzip.open(f.name)
try:
content = f.read(6)
finally:
f.close()
os.unlink(f.name)
assert expected == val
assert b'<?xml ' == content
def test_write_unicode_filename():
qr = segno.make_qr('test')
f = tempfile.NamedTemporaryFile('wt', suffix='.svg', delete=False)
f.close()
title = 'mürrische Mädchen'
desc = '?'
qr.save(f.name, title=title, desc=desc)
f = open(f.name, mode='rb')
root = _parse_xml(f)
f.seek(0)
val = f.read(6)
f.close()
os.unlink(f.name)
assert b'<?xml ' == val
assert title == _get_title(root).text
assert desc == _get_desc(root).text
def test_infile_outfile(self):
with tempfile.NamedTemporaryFile() as infile:
infile.write(self.data.encode())
infile.flush()
# outfile will get overwritten by tool, so the delete
# may not work on some platforms. Do it manually.
outfile = tempfile.NamedTemporaryFile()
try:
self.assertEqual(
self.runTool(args=[infile.name, outfile.name]),
''.encode())
with open(outfile.name, 'rb') as f:
self.assertEqual(f.read(), self.expect.encode())
finally:
outfile.close()
if os.path.exists(outfile.name):
os.unlink(outfile.name)
def testRewriteFile(self):
"""Changing the file content should change the hash sum"""
with NamedTemporaryFile() as index:
with TemporaryDirectory() as tmp:
with open(os.path.join(tmp, "foo"), 'wb') as f:
f.write(b'abc')
sum1 = hashDirectory(tmp, index.name)
with open(index.name, "rb") as f:
assert f.read(4) == b'BOB1'
with open(os.path.join(tmp, "foo"), 'wb') as f:
f.write(b'qwer')
sum2 = hashDirectory(tmp, index.name)
with open(index.name, "rb") as f:
assert f.read(4) == b'BOB1'
assert sum1 != sum2
def __init__(self, *args, **kwds):
## Create shared memory for rendered image
#pg.dbg(namespace={'r': self})
if sys.platform.startswith('win'):
self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
else:
self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
fd = self.shmFile.fileno()
self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
atexit.register(self.close)
GraphicsView.__init__(self, *args, **kwds)
self.scene().changed.connect(self.update)
self.img = None
self.renderTimer = QtCore.QTimer()
self.renderTimer.timeout.connect(self.renderView)
self.renderTimer.start(16)
def test_plotscene():
tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name
print("using %s as a temporary file" % tempfilename)
pg.setConfigOption('foreground', (0,0,0))
w = pg.GraphicsWindow()
w.show()
p1 = w.addPlot()
p2 = w.addPlot()
p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
p1.setXRange(0,5)
p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
app.processEvents()
app.processEvents()
ex = pg.exporters.SVGExporter(w.scene())
ex.export(fileName=tempfilename)
# clean up after the test is done
os.unlink(tempfilename)
def test_plotscene():
tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name
print("using %s as a temporary file" % tempfilename)
pg.setConfigOption('foreground', (0,0,0))
w = pg.GraphicsWindow()
w.show()
p1 = w.addPlot()
p2 = w.addPlot()
p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
p1.setXRange(0,5)
p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
app.processEvents()
app.processEvents()
ex = pg.exporters.SVGExporter(w.scene())
ex.export(fileName=tempfilename)
# clean up after the test is done
os.unlink(tempfilename)
def test_css(Chart):
"""Test css file option"""
css = "{{ id }}text { fill: #bedead; }\n"
with NamedTemporaryFile('w') as f:
f.write(css)
f.flush()
config = Config()
config.css.append('file://' + f.name)
chart = Chart(config)
chart.add('/', [10, 1, 5])
svg = chart.render().decode('utf-8')
assert '#bedead' in svg
chart = Chart(css=(_ellipsis, 'file://' + f.name))
chart.add('/', [10, 1, 5])
svg = chart.render().decode('utf-8')
assert '#bedead' in svg
def copy(contents, config=None, destination_dir=False, **kwargs):
if config is None:
config = Config(xyz='123')
with NamedTemporaryFile('w', delete=False) as tp:
tp.write(contents)
source = tp.name
if destination_dir:
with TemporaryDirectory() as destination:
path = copy_file(config, source, destination, **kwargs)
yield source, destination, path
os.remove(source)
else:
destination = source + '.copy'
path = copy_file(config, source, destination, **kwargs)
yield source, destination, path
os.remove(source)
os.remove(path)
def edit(filename=None, contents=None):
editor = get_editor()
args = get_editor_args(os.path.basename(os.path.realpath(editor)))
args = [editor] + args.split(' ')
if filename is None:
tmp = tempfile.NamedTemporaryFile()
filename = tmp.name
if contents is not None:
with open(filename, mode='wb') as f:
f.write(contents)
args += [filename]
proc = subprocess.Popen(args, close_fds=True)
proc.communicate()
with open(filename, mode='rb') as f:
return f.read()
def test_simple_case(self):
ticket_url = "http://ticket.com"
data_url = "http://data.url.com"
headers = {"a": "a", "xyz": "ghj"}
ticket = {"htsget": {
"urls": [{"url": data_url, "headers": headers}]}}
data = b"0" * 1024
returned_response = MockedResponse(json.dumps(ticket).encode(), data)
with mock.patch("requests.get", return_value=returned_response) as mocked_get:
with tempfile.NamedTemporaryFile("wb+") as f:
htsget.get(ticket_url, f)
f.seek(0)
self.assertEqual(f.read(), data)
self.assertEqual(mocked_get.call_count, 2)
# Note that we only get the arguments for the last call using this method.
args, kwargs = mocked_get.call_args
self.assertEqual(args[0], data_url)
self.assertEqual(kwargs["headers"], headers)
self.assertEqual(kwargs["stream"], True)
def test_bearer_token(self):
ticket_url = "http://ticket.com"
ticket = {"htsget": {"urls": []}}
bearer_token = "x" * 1024
returned_response = MockedTicketResponse(json.dumps(ticket).encode())
with mock.patch("requests.get", return_value=returned_response) as mocked_get:
with tempfile.NamedTemporaryFile("wb+") as f:
htsget.get(ticket_url, f, bearer_token=bearer_token)
f.seek(0)
self.assertEqual(f.read(), b"")
# Because we have no URLs in the returned ticked, it should be called
# only once.
self.assertEqual(mocked_get.call_count, 1)
# Note that we only get the arguments for the last call using this method.
args, kwargs = mocked_get.call_args
self.assertEqual(args[0], ticket_url)
headers = {"Authorization": "Bearer {}".format(bearer_token)}
self.assertEqual(kwargs["headers"], headers)
self.assertEqual(kwargs["stream"], True)