def sqlldr(self, login, finalscript):
    """Run Oracle SQL*Loader with *finalscript* as the control file.

    Args:
        login: connect string passed to sqlldr (e.g. user/password@db).
        finalscript: full text of the SQL*Loader control file.

    Raises:
        Exception: if sqlldr exits with a nonzero return code.

    The temporary control and log files are removed in all cases,
    including when sqlldr fails (the original only cleaned up on success).
    """
    self._setenv()
    debug("SQLLDR execution starts")
    # Write the control file; `with` guarantees the handle is closed even
    # if the write fails.
    ctl_fd, ctl_path = mkstemp(suffix=".ctl")
    with os.fdopen(ctl_fd, "w") as ctl:
        ctl.write(finalscript)
    # Pre-create the log file path; sqlldr writes into it by name.
    log_fd, log_path = mkstemp(suffix=".log")
    os.close(log_fd)
    try:
        with TemporaryFile() as f:
            p = Popen([os.path.join(self.oraclehome, 'bin', 'sqlldr'), login,
                       "control=%s" % ctl_path, "log=%s" % log_path,
                       "errors=0", "silent=all"],
                      stdout=f, stderr=None, stdin=None)
            p.communicate()
            if p.returncode != 0:
                error("SQLLDR exited with code %d" % p.returncode)
                raise Exception('sqlldr', "sqlldr exited with code %d" % p.returncode)
            else:
                debug("SQLLDR execution successful")
    finally:
        # Clean up on both the success and the failure path.
        os.unlink(ctl_path)
        os.unlink(log_path)
Example usages of the Python class tempfile.TemporaryFile()
def setUp(self):
    """Build an ArrayTree fixture, round-trip it through a temp file."""
    # Max value of 10000, each block holds 10 numbers.
    tree = ArrayTree(10000, 10)
    for value in range(5000):
        tree[value] = value
    # Insert extra copies to test frequency.
    for value in range(3000):
        tree[value] = value
    tree.set_range(5000, 9001, 100)
    tree.root.build_summary()
    f = tempfile.TemporaryFile()
    FileArrayTreeDict.dict_to_file({'test': tree}, f)
    f.seek(0)
    self.filearraytreedict = FileArrayTreeDict(f)
    self.filearraytree = self.filearraytreedict['test']
def test_has_no_name(self):
# TemporaryFile creates files with no names (on this system)
dir = tempfile.mkdtemp()
f = tempfile.TemporaryFile(dir=dir)
f.write(b'blat')
# Sneaky: because this file has no name, it should not prevent
# us from removing the directory it was created in.
try:
os.rmdir(dir)
except:
ei = sys.exc_info()
# cleanup
f.close()
os.rmdir(dir)
self.failOnException("rmdir", ei)
def download_glove(glove):
    """Fetch and unpack the GloVe 42B.300d vectors if *glove* is missing.

    The zip archive is streamed into an anonymous temp file, then the
    embedding text file is extracted next to the requested path.
    """
    if os.path.exists(glove):
        return
    print('Downloading glove...')
    target_dir = os.path.dirname(glove)
    with tempfile.TemporaryFile() as tmp:
        response = urllib.request.urlopen('http://nlp.stanford.edu/data/glove.42B.300d.zip')
        with response as res:
            shutil.copyfileobj(res, tmp)
        with zipfile.ZipFile(tmp, 'r') as glove_zip:
            glove_zip.extract('glove.42B.300d.txt', path=target_dir)
    print('Done')
Source file: test_sqlite_plugin_helper.py
Project: PlasoScaffolder
Author: ClaudiaSaxer
Project source / file source
Views: 26
Favorites: 0
Likes: 0
Comments: 0
def test_FileExistsIfTrue(self):
    """ test the method that checks if the file exists """
    # On POSIX, tempfile.TemporaryFile().name is the integer file
    # descriptor, not a filesystem path, so it cannot exercise a
    # path-based existence check. NamedTemporaryFile guarantees a real,
    # visible name for the lifetime of the `with` block.
    with tempfile.TemporaryDirectory() as tmpdir:
        with tempfile.NamedTemporaryFile(dir=tmpdir) as fp:
            actual = self.helper.FileExists(fp.name)
            self.assertTrue(actual)
def make_file(self, binary=None):
    """Overridable: return a readable & writable file.

    The file will be used as follows:
    - data is written to it
    - seek(0)
    - data is read from it

    The 'binary' argument is unused -- the file is always opened
    in binary mode.

    This implementation returns an anonymous temporary file. The trick
    (on Unix!) is that it is unlinked immediately, so it cannot be
    opened by another process and disappears automatically when closed
    or when the current process terminates. Derive a class that
    overrides this method if you want a more permanent file, or define
    a __del__ in a subclass that unlinks a visible temporary file you
    create instead.
    """
    import tempfile
    return tempfile.TemporaryFile("w+b")
# Backwards Compatibility Classes
# ===============================
def __radd__(self, other):
    """Return a new FileString: *other* followed by this file's contents."""
    combined = TemporaryFile()
    combined.write(other)
    self.file.seek(0)
    for chunk in self.file:
        combined.write(chunk)
    combined.seek(0)
    return FileString(combined)
def safe_size(source):
    """
    READ THE source UP TO SOME LIMIT, THEN COPY TO A FILE IF TOO BIG
    RETURN A str() OR A FileString()
    """
    if source is None:
        return None
    total_bytes = 0
    # NOTE(review): `bytes` shadows the builtin of the same name inside
    # this function; the final b"".join(bytes) assumes source.read()
    # yields bytes objects -- confirm against callers.
    bytes = []
    b = source.read(MIN_READ_SIZE)
    while b:
        total_bytes += len(b)
        bytes.append(b)
        if total_bytes > MAX_STRING_SIZE:
            # Too large to keep in memory: spill everything buffered so
            # far, plus the remainder of the stream, into an anonymous
            # temporary file and return it wrapped as a FileString.
            try:
                data = FileString(TemporaryFile())
                for bb in bytes:
                    data.write(bb)
                # Free the in-memory buffer before streaming the rest.
                del bytes
                del bb
                b = source.read(MIN_READ_SIZE)
                while b:
                    total_bytes += len(b)
                    data.write(b)
                    b = source.read(MIN_READ_SIZE)
                data.seek(0)
                Log.note("Using file of size {{length}} instead of str()", length= total_bytes)
                return data
            except Exception as e:
                Log.error("Could not write file > {{num}} bytes", num= total_bytes, cause=e)
        b = source.read(MIN_READ_SIZE)
    # Under the size limit: return the whole content as one bytes object.
    data = b"".join(bytes)
    del bytes
    return data
def __radd__(self, other):
    """Concatenate *other* in front of this file's contents as a FileString."""
    out = TemporaryFile()
    out.write(other)
    self.file.seek(0)
    for line in self.file:
        out.write(line)
    out.seek(0)
    return FileString(out)
def __init__(self, archiver, url):
    """Bind the archiver and target URL, backed by an anonymous temp file."""
    self.url = url
    self.archiver = archiver
    self.tmp = TemporaryFile()
def test_write_read_string(self):
    """Round-trip a short string through write_string/read_string."""
    expected = u'test'
    with tempfile.TemporaryFile() as buf:
        write_string(buf, expected)
        buf.seek(0)
        self.assertEqual(read_string(buf), expected)
def test_write_read_longstring(self):
    """Round-trip a string through write_longstring/read_longstring."""
    expected = u'test'
    with tempfile.TemporaryFile() as buf:
        write_longstring(buf, expected)
        buf.seek(0)
        self.assertEqual(read_longstring(buf), expected)
def test_write_read_stringmap(self):
    """Round-trip a dict through write_stringmap/read_stringmap."""
    expected = {'key': 'value'}
    with tempfile.TemporaryFile() as buf:
        write_stringmap(buf, expected)
        buf.seek(0)
        self.assertEqual(read_stringmap(buf), expected)
def test_write_read_inet(self):
    """Round-trip IPv4 and IPv6 address pairs through write_inet/read_inet."""
    cases = [
        ('192.168.1.1', 9042),
        ('2001:db8:0:f101::1', 9042),
    ]
    for value in cases:
        with tempfile.TemporaryFile() as f:
            write_inet(f, value)
            f.seek(0)
            self.assertEqual(read_inet(f), value)
def default_stream_factory(total_content_length, filename, content_type,
                           content_length=None):
    """The stream factory that is used per default.

    Small bodies (<= 500 KiB) stay in memory; larger ones spill to an
    anonymous temporary file.
    """
    threshold = 1024 * 500
    if total_content_length <= threshold:
        return BytesIO()
    return TemporaryFile('wb+')
def test_cli_error(self):
    """A bad ticket URL makes the CLI exit 1 with output only on stderr."""
    cmd = [
        sys.executable, "htsget_dev.py",
        TestRequestHandler.ticket_url + "XXX",
        "-O", self.output_file,
    ]
    with tempfile.TemporaryFile("wb+") as stderr, \
            tempfile.TemporaryFile("wb+") as stdout:
        exit_code = subprocess.call(cmd, stderr=stderr, stdout=stdout)
        self.assertEqual(exit_code, 1)
        stderr.seek(0)
        stdout.seek(0)
        self.assertGreater(len(stderr.read()), 0)
        self.assertEqual(len(stdout.read()), 0)
def test_bad_scheme(self):
    """A ticket containing an unsupported URL scheme must make run() raise ValueError."""
    with tempfile.TemporaryFile("w+") as temp_file:
        # NOTE(review): `bad_scheme` is never used inside the loop -- the
        # ticket is built from the same hard-coded URLs each iteration, so
        # the loop runs the identical check three times. Presumably
        # `bad_scheme` was meant to feed get_http_ticket(); confirm the
        # intended fixture before changing it.
        for bad_scheme in ["htt://as", "file:///home", "ftp://x.y/sdf"]:
            ticket = get_ticket(urls=[
                get_http_ticket("http://a.b"),
                get_http_ticket("htp")])
            dm = StoringUrlsDownloadManager(ticket, temp_file)
            self.assertRaises(ValueError, dm.run)
def test_basic_http_parsing(self):
    """The download manager records the URL and headers from an HTTP ticket."""
    headers = {"a": "b", "b": "c"}
    ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL, headers)])
    with tempfile.TemporaryFile("w+") as temp_file:
        manager = StoringUrlsDownloadManager(ticket, temp_file)
        manager.run()
        self.assertEqual(manager.stored_urls[0], (EXAMPLE_URL, headers))
def test_basic_data_uri_parsing(self):
    """A data: URI in the ticket is stored as its parsed form."""
    data_uri = "data:application/vnd.ga4gh.bam;base64,SGVsbG8sIFdvcmxkIQ=="
    ticket = get_ticket(urls=[get_data_uri_ticket(data_uri)])
    with tempfile.TemporaryFile("w+") as temp_file:
        manager = StoringUrlsDownloadManager(ticket, temp_file)
        manager.run()
        self.assertEqual(manager.stored_urls[0], urlparse(data_uri))
def test_num_retries(self):
    """max_retries drives attempt count, sleep count and warning count."""
    ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL)])
    with tempfile.TemporaryFile("w+") as temp_file:
        for retries in range(10):
            with mock.patch("time.sleep") as mock_sleep, \
                    mock.patch("logging.warning") as mock_warning:
                manager = RetryCountDownloadManager(
                    ticket, temp_file, max_retries=retries)
                self.assertEqual(manager.max_retries, retries)
                self.assertRaises(exceptions.RetryableError, manager.run)
                # One initial attempt plus `retries` retries.
                self.assertEqual(
                    manager.attempt_counts[EXAMPLE_URL], retries + 1)
                self.assertEqual(mock_sleep.call_count, retries)
                self.assertEqual(mock_warning.call_count, retries)