def test_zipfile_timestamp():
    """Check that SOURCE_DATE_EPOCH fixes the dates stored in the wheel zip.

    Three small UTF-8 text files are written into a scratch directory, the
    directory is packed with ``wheel.archive.make_wheelfile_inner`` while
    ``SOURCE_DATE_EPOCH`` is set, and every entry listed by the archive must
    then be dated 1980-01-01.  See issue #143.
    """
    # The ``temporary_directory`` helper (defined outside this block) supplies
    # the scratch directory as a context manager.
    with temporary_directory() as workdir:
        for stem in ('one', 'two', 'three'):
            member_path = os.path.join(workdir, stem)
            with codecs.open(member_path, 'w', encoding='utf-8') as handle:
                handle.write(stem + '\n')
        archive_base = os.path.join(workdir, 'dummy')
        # 315576060 is 1980-01-01 12:01 UTC; 1980-01-01 is the earliest date
        # that can be stored for an entry inside a zip archive.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            archive_path = wheel.archive.make_wheelfile_inner(
                archive_base, workdir)
        with readable_zipfile(archive_path) as zf:
            for entry in zf.infolist():
                assert entry.date_time[:3] == (1980, 1, 1)
# Example source code using the Python class TemporaryDirectory()
def testPrintCopy(self):
    """Check that _PrintCopy writes 'copy ' plus the path to the output file."""
    with tempfile.TemporaryDirectory() as tmpdir:
        target = os.path.join(tmpdir, 'testfile')
        path_helper_fake = (
            fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
                self.template_path, 'test', 'db'))
        # The output handler writes into ``target``, which is read back below.
        handler = output_handler_file.OutputHandlerFile(
            target, file_handler.FileHandler())
        generator = sqlite_generator.SQLiteGenerator(
            tmpdir, 'test', 'test', ['test'], handler,
            self.plugin_helper, path_helper_fake)
        generator._PrintCopy(target)  # pylint: disable=protected-access
        wanted = "copy " + target
        written = self._ReadFromFile(target)
        self.assertEqual(wanted, written)
def testPrintEdit(self):
    """Check that _PrintEdit writes 'edit ' plus the path to the output file."""
    with tempfile.TemporaryDirectory() as tmpdir:
        target = os.path.join(tmpdir, 'testfile')
        path_helper_fake = (
            fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
                self.template_path, 'test', 'db'))
        # The output handler writes into ``target``, which is read back below.
        handler = output_handler_file.OutputHandlerFile(
            target, file_handler.FileHandler())
        generator = sqlite_generator.SQLiteGenerator(
            tmpdir, 'test', 'test', ['test'], handler,
            self.plugin_helper, path_helper_fake)
        generator._PrintEdit(target)  # pylint: disable=protected-access
        wanted = 'edit ' + target
        written = self._ReadFromFile(target)
        self.assertEqual(wanted, written)
def testPrint(self):
    """Check the combined output of _Print for seven positional arguments."""
    with tempfile.TemporaryDirectory() as tmpdir:
        target = os.path.join(tmpdir, 'testfile')
        path_helper_fake = (
            fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
                self.template_path, 'test', 'db'))
        handler = output_handler_file.OutputHandlerFile(
            target, file_handler.FileHandler())
        generator = sqlite_generator.SQLiteGenerator(
            tmpdir, 'test', 'test', ['test'], handler,
            self.plugin_helper, path_helper_fake)
        names = ('test1', 'test2', 'test3', 'test4', 'test5', 'test6',
                 'test7')
        generator._Print(*names)  # pylint: disable=protected-access
        written = self._ReadFromFile(target)
        # Only the fifth argument is reported as 'copy'; the rest as 'create'.
        wanted = ('create test1create test2create test3create test4copy '
                  'test5create test6create test7')
        self.assertEqual(wanted, written)
def testPluginNameIfExisting(self):
    """Check PluginName when the fake helper reports the plugin as existing.

    The 'Plugin exists' prompt must be written to the output file and the
    returned name must equal 'the_plugin'.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(),
            prompt_info='the_plugin', prompt_error='the_plugin')
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            plugin_exists=True,
            change_bool_after_every_call_plugin_exists=True,
            valid_name=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        plugin_name = 'the_plugin'
        controller._path = 'somepath'
        returned_name = controller.PluginName(None, None, plugin_name)
        wanted_prompt = 'Plugin exists. Choose new Name'
        written_prompt = self._ReadFromFile(prompt_file)
        self.assertEqual(wanted_prompt, written_prompt)
        self.assertEqual(plugin_name, returned_name)
def testCreateSQLQueryModelWithUserInputWithError(self):
    """Check that _CreateSQLQueryModelWithUserInput returns None on error.

    The fake query execution is built from query data flagged with
    has_error=True.
    """
    failure_text = "Some Error..."
    failing_data = sql_query_data.SQLQueryData(
        has_error=True, error_message=failure_text)
    fake_execution = fake_sqlite_query_execution.SQLQueryExecution(
        failing_data)
    query = 'SELECT createdDate FROM Users ORDER BY createdDate'
    prompt_name = 'Contact'
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(), prompt_info=prompt_name)
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        model = controller._CreateSQLQueryModelWithUserInput(
            query, False, fake_execution)
        self.assertIsNone(model)
def testSourcePathIfNotExisting(self):
    """Check SourcePath when the fake helper reports the folder as missing.

    The 'Folder does not exists' prompt must be written to the output file
    and the returned path must be the handler's prompt_error value.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(),
            prompt_error='the source path')
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=False,
            change_bool_after_every_call_folder_exists=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        offered_path = 'testpath'
        source_path = controller.SourcePath(None, None, offered_path)
        wanted_prompt = 'Folder does not exists. Enter correct one'
        written_prompt = self._ReadFromFile(prompt_file)
        self.assertEqual(wanted_prompt, written_prompt)
        self.assertEqual(source_path, 'the source path')
def testTestPathIfExisting(self):
    """Check TestPath when the fake helper reports the file as existing.

    Nothing may be written to the output file, the controller must remember
    the path in ``_testfile`` and the same path must be returned.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler())
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            file_exists=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        database_path = os.path.join(
            path_helper.TestDatabasePath(), 'twitter_ios.db')
        returned_path = controller.TestPath(None, None, database_path)
        written_output = self._ReadFromFile(prompt_file)
        self.assertEqual(database_path, controller._testfile)
        self.assertEqual('', written_output)
        self.assertEqual(returned_path, database_path)
def testValidatePluginNameIfNotOk(self):
    """Check _ValidatePluginName when the fake helper rejects the name.

    The format prompt must be written to the output file and the value
    returned must be the handler's prompt_error value.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(),
            prompt_error='valid_name')
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_name=False, change_bool_after_every_call_valid_name=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        returned_name = controller._ValidatePluginName("the_wrong_plugin_")
        wanted_prompt = (
            'Plugin is not in a valid format. Choose new Name ['
            'plugin_name_...]')
        written_prompt = self._ReadFromFile(prompt_file)
        self.assertEqual(wanted_prompt, written_prompt)
        self.assertEqual(returned_name, 'valid_name')
def testValidateRowNameIfNotOk(self):
    """Check _ValidateRowName when the fake helper rejects the row name.

    The format prompt must be written to the output file and the value
    returned must be the handler's prompt_error value.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(),
            prompt_error='TheValidRowName')
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_row_name=False,
            change_bool_after_every_call_valid_row_name=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        returned_name = controller._ValidateRowName("theWrongName")
        wanted_prompt = (
            'Row name is not in a valid format. Choose new Name ['
            'RowName...]')
        written_prompt = self._ReadFromFile(prompt_file)
        self.assertEqual(wanted_prompt, written_prompt)
        self.assertEqual(returned_name, 'TheValidRowName')
def testValidateTimestampStringIfNotOk(self):
    """Check _ValidateTimestampString when the fake helper rejects the input.

    The format prompt must be written to the output file and the value
    returned must be the handler's prompt_error value.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(),
            prompt_error='this,that,bla')
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        returned_value = controller._ValidateTimestampString(
            "this, that,bla")
        wanted_prompt = (
            'Timestamps are not in valid format. Reenter them correctly [name,'
            'name...]')
        written_prompt = self._ReadFromFile(prompt_file)
        self.assertEqual(wanted_prompt, written_prompt)
        self.assertEqual(returned_value, 'this,that,bla')
def testValidateColumnStringIfNotOk(self):
    """Check _ValidateColumnString when the fake helper rejects the input.

    The format prompt must be written to the output file and the value
    returned must be the handler's prompt_error value.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()
        handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(),
            prompt_error='this,that,bla')
        helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(handler, helper)
        returned_value = controller._ValidateColumnString("this, that,bla")
        wanted_prompt = (
            'Column names are not in valid format. Reenter them correctly [name,'
            'name...]')
        written_prompt = self._ReadFromFile(prompt_file)
        self.assertEqual(wanted_prompt, written_prompt)
        self.assertEqual(returned_value, 'this,that,bla')
def testGenerateIfNotConfirmed(self):
    """Check that Generate exits when the output handler does not confirm.

    The output handler is built with confirm=False and the call to
    ``controller.Generate`` is expected to raise SystemExit.
    """
    template_path = path_helper.TemplatePath()
    with self.assertRaises(SystemExit):
        with tempfile.TemporaryDirectory() as tmpdir:
            file = os.path.join(tmpdir, 'testfile')
            # The output file must exist before the handler is created.
            pathlib.Path(file).touch()
            output_handler = output_handler_file.OutputHandlerFile(
                file, file_handler.FileHandler(), confirm=False)
            plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
                valid_name=False,
                change_bool_after_every_call_valid_name=True)
            controller = sqlite_controller.SQLiteController(output_handler,
                                                            plugin_helper)
            controller.Generate('not used', 'not used')
            # NOTE(review): if Generate raises SystemExit as expected, this
            # assertion is never reached -- confirm whether it is intended.
            self.assertFalse(template_path)
def testCopyFile(self):
    """Check that CopyFile creates a destination that did not exist before.

    The destination lives in a 'copy' sub-folder of the scratch directory
    and must compare equal to the source afterwards.
    """
    payload = "this is test content."
    with tempfile.TemporaryDirectory() as tmpdir:
        original = os.path.join(tmpdir, self.file)
        duplicate = os.path.join(tmpdir, "copy", self.file)
        with open(original, "a") as stream:
            stream.write(payload)
        copier = file_handler.FileHandler()
        self.assertFalse(os.path.exists(duplicate))
        copier.CopyFile(original, duplicate)
        self.assertTrue(os.path.exists(duplicate))
        self.assertTrue(filecmp.cmp(duplicate, original))
def testAddContentIfFileExists(self):
    """Check that AddContent appends to a file that already exists."""
    chunk = "this is test content. "
    wanted = chunk + chunk
    with tempfile.TemporaryDirectory() as tmpdir:
        target = os.path.join(tmpdir, self.file)
        # Seed the file with one copy of the chunk before calling AddContent.
        with open(target, "a") as stream:
            stream.write(chunk)
        editor = file_handler.FileHandler()
        self.assertTrue(os.path.exists(target))
        editor.AddContent(target, chunk)
        self.assertTrue(os.path.exists(target))
        with open(target, "r") as stream:
            found = stream.read()
        self.assertEqual(wanted, found)
def testCreateOrModifyFileWithContentIfFileExists(self):
    """Check CreateOrModifyFileWithContent on a file that already exists.

    The file is seeded with one copy of the content; afterwards it must
    hold the content twice.
    """
    chunk = "this is test content. "
    wanted = chunk + chunk
    with tempfile.TemporaryDirectory() as tmpdir:
        target = os.path.join(tmpdir, self.file)
        with open(target, "a") as stream:
            stream.write(chunk)
        editor = file_handler.FileHandler()
        self.assertTrue(os.path.exists(target))
        editor.CreateOrModifyFileWithContent(target, chunk)
        self.assertTrue(os.path.exists(target))
        with open(target, "r") as stream:
            found = stream.read()
        self.assertEqual(wanted, found)
def testAddContentIfFileAndFolderDoesNotExist(self):
    """Check CreateOrModifyFileWithContent when file and folder are missing.

    The target sits in a 'newfolder' directory that has not been created;
    afterwards the file must exist and hold exactly the content.
    """
    chunk = "this is test content. "
    wanted = chunk
    with tempfile.TemporaryDirectory() as tmpdir:
        target = os.path.join(os.path.join(tmpdir, "newfolder"), self.file)
        editor = file_handler.FileHandler()
        self.assertFalse(os.path.exists(target))
        editor.CreateOrModifyFileWithContent(target, chunk)
        self.assertTrue(os.path.exists(target))
        with open(target, "r") as stream:
            found = stream.read()
        self.assertEqual(wanted, found)
def testUserConfigOverrides(self):
    """Check that a user config wins over default.yaml and its includes.

    default.yaml, included.yaml and user.yaml each set FOO to a different
    value; after parsing with the "user" config file the default
    environment must contain the value from user.yaml.
    """
    with TemporaryDirectory() as tmp:
        # The recipe set is parsed relative to the current directory.
        os.chdir(tmp)
        os.mkdir("recipes")
        with open("default.yaml", "w") as out:
            out.writelines([
                "include:\n",
                " - included\n",
                "environment:\n",
                " FOO: BAR\n",
            ])
        with open("included.yaml", "w") as out:
            out.writelines(["environment:\n", " FOO: BAZ\n"])
        with open("user.yaml", "w") as out:
            out.writelines(["environment:\n", " FOO: USER\n"])
        recipes = RecipeSet()
        recipes.setConfigFiles(["user"])
        recipes.parse()
        assert recipes.defaultEnv() == {"FOO": "USER"}
def testChangeRemote(self):
    """Check that a changed remote URL in the recipe reaches the working copy.

    Two SCM objects differing only in the URL of 'remote-bar' are applied to
    the same workspace one after the other.
    """
    first_url = 'http://bar.test/baz.git'
    second_url = 'http://bar.test/foo.git'
    scm_before = self.createGitScm({'remote-bar': first_url})
    scm_after = self.createGitScm({'remote-bar': second_url})
    with tempfile.TemporaryDirectory() as workspace:
        found = self.callAndGetRemotes(workspace, scm_before)
        self.assertEqual(found, {"origin": self.repodir, 'bar': first_url})
        found = self.callAndGetRemotes(workspace, scm_after)
        self.assertEqual(found, {"origin": self.repodir, 'bar': second_url})
def testDirAndFile(self):
    """Hash a directory that holds a single file inside one sub-directory.

    The digest is pinned to a fixed value because it should stay stable in
    the long run; it might be used for binary artifact matching in future.
    """
    with TemporaryDirectory() as tmp:
        subdir = os.path.join(tmp, "dir")
        os.mkdir(subdir)
        with open(os.path.join(subdir, "file"), 'wb') as out:
            out.write(b'abc')
        first = hashDirectory(tmp)
        assert len(first) == 20
        pinned = binascii.unhexlify(
            "640f516de78fba0b6d2ddde4451000f142d06b0d")
        assert first == pinned
        # Hashing the unchanged tree again must give the same digest.
        second = hashDirectory(tmp)
        assert first == second
def testRewriteFile(self):
    """Check that rewriting a file's content changes the directory hash.

    Both hash runs share one index file, which must start with the bytes
    ``BOB1`` after each run.
    """
    with NamedTemporaryFile() as index, TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "foo")

        with open(target, 'wb') as out:
            out.write(b'abc')
        before = hashDirectory(tmp, index.name)
        with open(index.name, "rb") as idx:
            assert idx.read(4) == b'BOB1'

        with open(target, 'wb') as out:
            out.write(b'qwer')
        after = hashDirectory(tmp, index.name)
        with open(index.name, "rb") as idx:
            assert idx.read(4) == b'BOB1'

        assert before != after
def testUploadPackageNoFail(self):
    """Check that the nofail flag keeps failed uploads from being fatal.

    The test passes when none of the upload calls raises.
    """
    flags = ["upload", "download", "nofail"]
    archive = self.__getArchiveInstance({"flags" : flags})
    archive.wantUpload(True)
    with TemporaryDirectory() as tmp:
        # Minimal workspace: an audit file plus a content directory.
        audit = os.path.join(tmp, "audit.json.gz")
        with open(audit, "wb") as out:
            out.write(b"AUDIT")
        content = os.path.join(tmp, "workspace")
        os.mkdir(content)
        with open(os.path.join(content, "data"), "wb") as out:
            out.write(b"DATA")

        # Package uploads must not throw.
        for verbosity in (0, 1):
            archive.uploadPackage(
                ERROR_UPLOAD_ARTIFACT, audit, content, verbosity)

        # Live-build-id upload errors must not throw with nofail either.
        for verbosity in (0, 1):
            archive.uploadLocalLiveBuildId(
                ERROR_UPLOAD_ARTIFACT, b'\x00', verbosity)
def testUploadJenkinsNoFail(self):
    """Check that nofail keeps failing Jenkins upload scripts from being fatal.

    The generated scripts are run in a scratch directory; the test passes
    when running them does not raise.
    """
    flags = ["upload", "download", "nofail"]
    archive = self.__getArchiveInstance({"flags" : flags})
    archive.wantUpload(True)
    with TemporaryDirectory() as tmp:
        buildid_file = os.path.join(tmp, "error.buildid")
        with open(buildid_file, "wb") as out:
            out.write(ERROR_UPLOAD_ARTIFACT)
        self.__createArtifactByName(os.path.join(tmp, "result.tgz"))

        # These uploads must not fail even though they do not succeed.
        upload_script = archive.upload(None, "error.buildid", "result.tgz")
        callJenkinsScript(upload_script, tmp)
        live_script = archive.uploadJenkinsLiveBuildId(
            None, "error.buildid", "test.buildid")
        callJenkinsScript(live_script, tmp)
def testInvalidServer(self):
    """Check downloads against a server address that does not answer.

    Local downloads are called directly, and the live-build-id lookup must
    return None.  The Jenkins download script is run in a scratch workspace.
    """
    build_id = b'\x00'*20
    archive = SimpleHttpArchive({ 'url' : "https://127.1.2.3:7257" })
    archive.wantDownload(True)
    archive.wantUpload(True)

    # Local
    for verbosity in (0, 1):
        archive.downloadPackage(build_id, "unused", "unused", verbosity)
    self.assertEqual(archive.downloadLocalLiveBuildId(build_id, 0), None)

    # Jenkins
    with TemporaryDirectory() as workspace:
        with open(os.path.join(workspace, "test.buildid"), "wb") as out:
            out.write(build_id)
        script = archive.download(None, "test.buildid", "result.tgz")
        callJenkinsScript(script, workspace)
def set_mapper_number(manifest_file):
    """Return the number of lines in a manifest file.

    Args:
        manifest_file: Path of the manifest.  A value starting with
            ``s3://`` is treated as ``s3://<bucket>/<key>`` and downloaded
            into a temporary directory first; anything else is opened as a
            local file.

    Returns:
        int: The number of lines in the manifest (0 for an empty file).
    """
    def _count_lines(path):
        # Use a context manager so the handle is closed deterministically
        # instead of being left for the garbage collector.
        with open(path) as handle:
            return sum(1 for _ in handle)

    if not manifest_file.startswith("s3://"):
        return _count_lines(manifest_file)

    s3 = boto3.resource("s3")
    # Drop surrounding whitespace/slashes and the 5-character "s3://" prefix,
    # then split the remainder into bucket name and object key.
    bucket_name, key_prefix = manifest_file.strip().strip("/")[5:].split("/", 1)
    with tempfile.TemporaryDirectory() as tmpdirname:
        local_copy = tmpdirname + "/manifest"
        s3.meta.client.download_file(bucket_name, key_prefix, local_copy)
        # Count before leaving the block: the directory is removed on exit.
        return _count_lines(local_copy)
def set_mapper_number(manifest_file):
    """Return the number of lines in a manifest file.

    Args:
        manifest_file: Path of the manifest.  A value starting with
            ``s3://`` is treated as ``s3://<bucket>/<key>`` and downloaded
            into a temporary directory first; anything else is opened as a
            local file.

    Returns:
        int: The number of lines in the manifest (0 for an empty file).
    """
    def _line_count(path):
        # Use a context manager so the handle is closed deterministically
        # instead of being left for the garbage collector.
        with open(path) as handle:
            return sum(1 for _ in handle)

    if not manifest_file.startswith("s3://"):
        return _line_count(manifest_file)

    s3_client = boto3.resource("s3")
    # Drop surrounding whitespace/slashes and the 5-character "s3://" prefix,
    # then split the remainder into bucket name and object key.
    bucket_name, key_prefix = manifest_file.strip().strip("/")[5:].split("/", 1)
    with tempfile.TemporaryDirectory() as tmpdirname:
        local_copy = tmpdirname + "/manifest"
        s3_client.Object(bucket_name, key_prefix).download_file(local_copy)
        # Count before leaving the block: the directory is removed on exit.
        return _line_count(local_copy)
def copy(contents, config=None, destination_dir=False, **kwargs):
    """Write ``contents`` to a temp file, copy it, and yield the paths.

    This is a generator that yields exactly once; it is presumably wrapped
    by a context-manager or fixture decorator outside this view -- confirm.

    Args:
        contents: Text written to the temporary source file.
        config: Configuration passed to ``copy_file``; when None a
            ``Config(xyz='123')`` is used.
        destination_dir: When true, the destination is a fresh temporary
            directory; otherwise it is the source path plus ``'.copy'``.
        **kwargs: Forwarded unchanged to ``copy_file``.

    Yields:
        tuple: ``(source, destination, path)`` where ``path`` is whatever
        ``copy_file`` returned.
    """
    if config is None:
        config = Config(xyz='123')
    # delete=False: the file must outlive this ``with`` so it can be copied.
    with NamedTemporaryFile('w', delete=False) as tp:
        tp.write(contents)
        source = tp.name
    # try/finally keeps the temporary files from leaking when the consumer
    # raises at the yield point or ``copy_file`` itself fails.
    try:
        if destination_dir:
            with TemporaryDirectory() as destination:
                path = copy_file(config, source, destination, **kwargs)
                yield source, destination, path
        else:
            destination = source + '.copy'
            path = copy_file(config, source, destination, **kwargs)
            try:
                yield source, destination, path
            finally:
                os.remove(path)
    finally:
        os.remove(source)
def test_zipfile_timestamp():
    """Verify that SOURCE_DATE_EPOCH controls the dates inside the wheel zip.

    Files named 'one', 'two' and 'three' are created in a scratch directory
    and packed via ``wheel.archive.make_wheelfile_inner`` with
    ``SOURCE_DATE_EPOCH`` set; each archive entry must report the date
    1980-01-01.  See issue #143.
    """
    # ``temporary_directory`` is a helper defined outside this block.
    with temporary_directory() as scratch:
        for name in ('one', 'two', 'three'):
            target = os.path.join(scratch, name)
            with codecs.open(target, 'w', encoding='utf-8') as out:
                out.write(name + '\n')
        base_name = os.path.join(scratch, 'dummy')
        # 315576060 is 1980-01-01 12:01 UTC; 1980-01-01 is the earliest date
        # that can be stored for an entry inside a zip archive.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            built_zip = wheel.archive.make_wheelfile_inner(
                base_name, scratch)
        with readable_zipfile(built_zip) as zf:
            for member in zf.infolist():
                assert member.date_time[:3] == (1980, 1, 1)
def test_zipfile_timestamp():
    """Ensure SOURCE_DATE_EPOCH determines the zip entry dates of a wheel.

    A scratch directory is filled with three one-line text files and packed
    by ``wheel.archive.make_wheelfile_inner`` while ``SOURCE_DATE_EPOCH`` is
    set; all entries of the archive must carry the date 1980-01-01.  See
    issue #143.
    """
    # ``temporary_directory`` is a helper defined outside this block.
    with temporary_directory() as root:
        for label in ('one', 'two', 'three'):
            file_path = os.path.join(root, label)
            with codecs.open(file_path, 'w', encoding='utf-8') as writer:
                writer.write(label + '\n')
        wheel_base = os.path.join(root, 'dummy')
        # 315576060 is 1980-01-01 12:01 UTC; 1980-01-01 is the earliest date
        # that can be stored for an entry inside a zip archive.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            wheel_path = wheel.archive.make_wheelfile_inner(
                wheel_base, root)
        with readable_zipfile(wheel_path) as zf:
            for item in zf.infolist():
                assert item.date_time[:3] == (1980, 1, 1)
def setUp(self):
    """Run one search and reserve scratch file and directory names.

    Reads the API key and search-engine id from the packaged
    ``search_google/config.json``, stores the search results in
    ``self.results`` and stores two currently unused filesystem paths in
    ``self.tempfile`` and ``self.tempdir``.
    """
    # Local import: NamedTemporaryFile may not be imported at file level.
    from tempfile import NamedTemporaryFile

    file_path = resource_filename(
        Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)
    buildargs = {
        'serviceName': 'customsearch',
        'version': 'v1',
        'developerKey': defaults['build_developerKey']
    }
    cseargs = {
        'q': 'google',
        'num': 1,
        'fileType': 'png',
        'cx': defaults['cx']
    }
    self.results = search_google.api.results(buildargs, cseargs)

    # TemporaryFile().name is an integer file descriptor on POSIX, so its
    # str() is not a usable path; NamedTemporaryFile always exposes a real
    # path.  The file is removed on close, leaving only the reserved name.
    with NamedTemporaryFile() as scratch_file:
        self.tempfile = str(scratch_file.name)

    # Clean up explicitly rather than relying on the finalizer of a dropped
    # TemporaryDirectory object; only the (now unused) name is kept.
    scratch_dir = TemporaryDirectory()
    self.tempdir = str(scratch_dir.name)
    scratch_dir.cleanup()