def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative`
    path.

    Each of `suffix`, `prefix` and `dir` is converted with ``path2fsn`` when
    given; when `None`, ``fsnative()``, ``gettempprefix()`` and
    ``gettempdir()`` are used respectively.

    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
        text (bool): if the file should be opened in text mode
    Returns:
        Tuple[`int`, `fsnative`]:
            A tuple containing the file descriptor and the file path
    Raises:
        EnvironmentError
    """

    if suffix is None:
        suffix = fsnative()
    else:
        suffix = path2fsn(suffix)

    if prefix is None:
        prefix = gettempprefix()
    else:
        prefix = path2fsn(prefix)

    if dir is None:
        dir = gettempdir()
    else:
        dir = path2fsn(dir)

    return tempfile.mkstemp(suffix, prefix, dir, text)
# Example source code for Python's gettempdir()
def mkdtemp(suffix=None, prefix=None, dir=None):
    """Like :func:`python3:tempfile.mkdtemp` but always returns a `fsnative`
    path.

    Each argument is converted with ``path2fsn`` when given; when `None`,
    ``fsnative()``, ``gettempprefix()`` and ``gettempdir()`` are used for
    `suffix`, `prefix` and `dir` respectively.

    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
    Returns:
        `fsnative`: A path to a directory
    Raises:
        EnvironmentError
    """

    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)
    return tempfile.mkdtemp(suffix, prefix, dir)
def runTest(self):
    """Check that the tools and files the other tests rely on are present.

    Runs ``ruby --help`` through the shell, redirecting its output to a file
    in the system temp directory, then checks that the three
    ``py2rb/builtins`` Ruby files exist relative to the current directory.
    On the first failed check the test is stopped and ``RuntimeError`` is
    raised.
    """
    scratch_file = os.path.join(
        tempfile.gettempdir(),
        tempfile.gettempprefix()
    )
    # A non-zero exit status from the shell means ruby could not be run.
    exit_status = os.system("ruby --help > %s" % scratch_file)
    if exit_status:
        self.stop()
        raise RuntimeError("""Can't find the "ruby" command.""")
    self.reportProgres()

    required_files = (
        "py2rb/builtins/module.rb",
        "py2rb/builtins/using.rb",
        "py2rb/builtins/require.rb",
    )
    for required in required_files:
        if os.path.exists(required):
            continue
        self.stop()
        raise RuntimeError("""Can't find the "%s" command.""" % required)
    self.reportProgres()
def serve(content):
    """Write content to a temp file and serve it in browser.

    The file gets a random (uuid4-based) ``.html`` name inside the system
    temp directory.  The call then blocks until interrupted with Ctrl-C
    (``KeyboardInterrupt``), at which point the temp file is deleted.
    """
    folder = tempfile.gettempdir()
    random_name = tempfile.gettempprefix() + str(uuid.uuid4()) + ".html"
    html_path = os.path.join(folder, random_name)

    save(html_path, content)
    webbrowser.open("file://{}".format(html_path))

    # Keep the process alive so the file stays available to the browser.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Clean up the temp file once the user stops serving.
        os.remove(html_path)
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    The local file is named after the last ``/``-separated component of
    ``url`` and is placed in the system temp directory.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    :raises ValueError: if the file name does not end with
        ``.alfredworkflow`` or ``.alfred3workflow``

    """
    filename = url.rsplit("/", 1)[-1]

    if not filename.endswith(('.alfredworkflow', '.alfred3workflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as target:
        target.write(response.content)

    return local_path
def load_names_data():
    """Load the names dataset into a single DataFrame indexed by name.

    The archive is read from ``ZIP_NAME`` in the system temp directory; if
    that file does not exist yet it is first downloaded from ``URL_NAMES``.
    Every archive member whose name starts with ``yob`` is parsed as a
    headerless CSV with columns ``name``, ``gender``, ``count``; characters
    3..6 of the member name are taken as the year and stored in a ``year``
    column.

    Returns:
        The concatenation of all per-year frames (in member-name order),
        with ``name`` as the index.

    Raises:
        requests.HTTPError: if the download returns an HTTP error status.
    """
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        # Fail here instead of caching an error page as the "zip" file,
        # which would break every later call until it is deleted by hand.
        r.raise_for_status()
        with open(fp, 'wb') as f:
            f.write(r.content)

    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda info: info.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                # Close each member handle as soon as it has been parsed.
                with zf.open(zi) as member:
                    df = pd.read_csv(
                        member,
                        header=None,
                        names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df

    df = pd.concat(post.values())
    df.set_index('name', inplace=True, drop=True)
    return df
def render_xlsx(cls):
    '''
    Render the sample names DataFrame to ``example1.xlsx`` in the system
    temp directory, printing the target path first.
    '''
    # load data
    names_df = sample_names_data()

    # build presentation model
    presentation_model = tc.build_presentation_model(
        df=names_df, output_format='xlsx')

    # render to xlsx
    target = os.path.join(tempfile.gettempdir(), 'example1.xlsx')
    print('Writing to ' + target)
    xlsxw.XLSXWriter.to_xlsx([presentation_model], output_fp=target)
# end_XLSXExample1
# start_XLSXExample2
def load_names_data():
    """Return the names dataset as one DataFrame indexed by ``name``.

    Uses ``ZIP_NAME`` in the system temp directory as a local copy of the
    archive, downloading it from ``URL_NAMES`` when the file is missing.
    Archive members whose name starts with ``yob`` are read as headerless
    CSVs (columns ``name``, ``gender``, ``count``); the year is parsed from
    characters 3..6 of the member name and added as a ``year`` column.

    Returns:
        All per-year frames concatenated in member-name order, indexed by
        ``name``.

    Raises:
        requests.HTTPError: if the download returns an HTTP error status.
    """
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        # Do not persist the body of a failed request: it would be treated
        # as the cached archive on every subsequent call.
        r.raise_for_status()
        with open(fp, 'wb') as f:
            f.write(r.content)

    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda info: info.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                # Release the member handle once pandas has consumed it.
                with zf.open(zi) as member:
                    df = pd.read_csv(
                        member,
                        header=None,
                        names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df

    df = pd.concat(post.values())
    df.set_index('name', inplace=True, drop=True)
    return df
def render_html(cls):
    """Render the first 100 rows of the names data to an HTML file.

    The HTML is written to ``example1.html`` in the system temp directory;
    that path is printed before rendering.
    """
    # load data
    df = load_names_data()
    df = df[:100]

    # build presentation model
    pm = tc.build_presentation_model(df=df, output_format='html')

    # render to html
    # Announce the path that is actually written (the message previously
    # named a different file, 'example_1.html').
    output_fp = os.path.join(tempfile.gettempdir(), 'example1.html')
    layout = [pm]
    print('Writing to ' + output_fp)
    html = htmlw.HTMLWriter.to_html(layout, border=1)
    with open(output_fp, 'w') as f:
        f.write(html)
# end_HTMLExample1
# start_HTMLExample2
def compile_so(libs):
    """Link `libs` into one shared object in the temp dir and return its path.

    The output is ``lib<sorted lib names joined by '.'>.so`` in the system
    temp directory.  If that file already exists it is reused; otherwise the
    compiler named by the ``CLANG`` environment variable (default ``clang``)
    is invoked with ``-shared`` and one ``-l`` flag per library.
    """
    # I don't know how else to find these .so files other than just asking
    # clang to make a .so file out of all of them
    compiler = os.getenv('CLANG', 'clang')

    so_name = 'lib' + '.'.join(sorted(libs)) + '.so'
    target = join(tempfile.gettempdir(), so_name)
    if os.path.exists(target):
        return target

    link_flags = ['-l' + lib for lib in libs]
    subprocess.check_call([compiler, '-o', target, '-shared'] + link_flags)
    return target
def _find_grail_rc(self):
    """Return a socket connected to a live Grail remote-control endpoint.

    Candidate socket paths are ``<tempdir>/.grail-unix/<user>-*``.  Each one
    is tried in turn; paths that cannot be connected to are removed
    (best-effort).

    Returns:
        A connected ``AF_UNIX`` stream socket, or ``None`` when there are no
        candidates or none of them accepts a connection.
    """
    import glob
    import pwd
    import socket
    import tempfile
    tempdir = os.path.join(tempfile.gettempdir(),
                           ".grail-unix")
    user = pwd.getpwuid(os.getuid())[0]
    filename = os.path.join(tempdir, user + "-*")
    maybes = glob.glob(filename)
    if not maybes:
        return None
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    for fn in maybes:
        # need to PING each one until we find one that's live
        try:
            s.connect(fn)
        except socket.error:
            # no good; attempt to clean it out, but don't fail:
            try:
                os.unlink(fn)
            except OSError:
                # os.unlink reports failures as OSError, so that is what
                # must be swallowed to really be best-effort.
                pass
        else:
            return s
    # Nothing was live: release the socket instead of leaking it.
    s.close()
    return None
def cmdargs_for_style(self, formatstyle, filename=None):
    # type: (Style, Optional[str]) -> List[str]
    """Return the command line arguments that select `formatstyle`.

    The style text is written to a config file in the system temp directory
    whose name contains the hash of its content, unless
    ``self.tempfile_exists`` reports it is already there.  `filename` is not
    used by this implementation.
    """
    assert isinstance(formatstyle, Style)
    style_bytes = bytestr(self.styletext(formatstyle))
    cfg_name = 'whatstyle_uncrustify_%s.cfg' % shahex(style_bytes)
    cfg_path = os.path.join(tempfile.gettempdir(), cfg_name)
    if not self.tempfile_exists(cfg_path):
        writebinary(cfg_path, style_bytes)
        self.add_tempfile(cfg_path)

    args = ['-c', cfg_path]
    # The filename extension might be ambiguous so we choose from the
    # languages registered in identify_language.
    if self.languages:
        args.extend(['-l', self.languages[0]])
    return args
def cmdargs_for_style(self, formatstyle, filename=None):
    # type: (Style, Optional[str]) -> List[str]
    """Return the command line arguments that select `formatstyle`.

    The style text is written to ``self.configfilename`` inside a directory
    in the system temp directory whose name contains the hash of the style
    content.  The directory is created if needed; the file is written unless
    ``self.tempfile_exists`` reports it is already there.  `filename` is not
    used by this implementation.
    """
    assert isinstance(formatstyle, Style)
    style_bytes = bytestr(self.styletext(formatstyle))
    digest = shahex(style_bytes)
    cfg_path = os.path.join(
        tempfile.gettempdir(),
        'whatstyle_rustfmt_%s/%s' % (digest, self.configfilename))

    try:
        cfg_dir = os.path.dirname(cfg_path)
        os.makedirs(cfg_dir)
        self.add_tempfile(cfg_dir)
    except OSError as exc:
        # An already existing directory is fine; anything else is not.
        if exc.errno != errno.EEXIST:
            raise

    if not self.tempfile_exists(cfg_path):
        writebinary(cfg_path, style_bytes)
        self.add_tempfile(cfg_path)
    return ['--config-path', cfg_path]
def fromrepo(cls, path=None):
    """Build and sync a repo object for the program at `path`.

    When `path` is None the program is looked up with ``Repo.findparent``
    starting from the current directory, and ``error`` is called if none is
    found.  The cache location is set only when ``cache_repositories`` is
    truthy and the ``CACHE`` setting is neither empty nor one of
    ``none``/``off``/``disabled``: ``on``/``enabled`` select
    ``<tempdir>/mbed-repo-cache``, any other value is used as the location.
    A warning is issued when the synced repo has no SCM.
    """
    repo = cls()
    if path is None:
        path = Repo.findparent(getcwd())
        if path is None:
            error(
                "Could not find mbed program in current path \"%s\".\n"
                "You can fix this by calling \"mbed new .\" or \"mbed config root .\" in the root of your program." % getcwd())

    repo.path = os.path.abspath(path)
    repo.name = os.path.basename(repo.path)

    cache_cfg = Global().get_cfg('CACHE', '')
    if cache_repositories and cache_cfg and cache_cfg not in ('none', 'off', 'disabled'):
        if cache_cfg in ('on', 'enabled'):
            # Caching requested without an explicit location.
            repo.cache = os.path.join(tempfile.gettempdir(), 'mbed-repo-cache')
        else:
            repo.cache = cache_cfg

    repo.sync()

    if repo.scm is None:
        warning(
            "Program \"%s\" in \"%s\" does not use source control management.\n"
            "To fix this you should use \"mbed new .\" in the root of your program." % (repo.name, repo.path))

    return repo
def __init__(self, max_age):
    """Constructor.

    Stores the expiration time and the cache file path (``FILENAME`` in the
    system temp directory), then reads or initializes the cache file while
    holding its lock.  Any exception raised while doing so is logged as a
    warning instead of propagating.

    Args:
      max_age: Cache expiration in seconds.
    """
    self._max_age = max_age
    self._file = os.path.join(tempfile.gettempdir(), FILENAME)
    cache_file = LockedFile(self._file, 'a+', 'r')
    try:
        cache_file.open_and_lock()
        if cache_file.is_locked():
            _read_or_initialize_cache(cache_file)
        # If we can not obtain the lock, other process or thread must
        # have initialized the file.
    except Exception as err:
        logging.warning(err, exc_info=True)
    finally:
        cache_file.unlock_and_close()
def compare_component_output(self, input_path, expected_output_path):
    """Assert that processing `input_path` reproduces `expected_output_path`.

    The archive is processed twice with the engine from
    ``self.get_rendering_engine()``: once into a plain directory and once
    into a zip file that is then extracted.  Each result is compared against
    `expected_output_path` with ``dircmp.is_same``.  All output goes to
    uniquely named paths in the system temp directory, which are removed
    afterwards even when an assertion fails.
    """
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))

    # Pass 1: uncompressed output directory.
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    try:
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    finally:
        shutil.rmtree(output_dir)

    storage.clear()

    # Pass 2: compressed output, extracted back into output_dir.
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    try:
        # Close the archive before it is deleted; an open handle leaks and
        # prevents removal on Windows.
        with zipfile.ZipFile(output_zip) as z:
            z.extractall(output_dir)
        try:
            self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        finally:
            shutil.rmtree(output_dir)
    finally:
        os.remove(output_zip)
def create_local_temp_dir(name, directory=None):
    r"""
    Create a directory for storing temporary files needed for testing neo

    If directory is None or not specified, automatically create the directory
    in {tempdir}/files_for_testing_neo on linux/unix/mac or
    {tempdir}\files_for_testing_neo on windows, where {tempdir} is the system
    temporary directory returned by tempfile.gettempdir().

    The sub-directory `name` is created inside that directory (missing
    parents included) and its path is returned.  Directories that already
    exist are reused.
    """
    if directory is None:
        directory = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')

    directory = os.path.join(directory, name)
    # Create first, check afterwards: a separate exists() test can race with
    # another process creating the same directory.
    try:
        os.makedirs(directory)
    except OSError:
        if not os.path.isdir(directory):
            raise
    return directory
def create_local_temp_dir(name, directory=None):
    r"""
    Create a directory for storing temporary files needed for testing neo

    If directory is None or not specified, automatically create the directory
    in {tempdir}/files_for_testing_neo on linux/unix/mac or
    {tempdir}\files_for_testing_neo on windows, where {tempdir} is the system
    temporary directory returned by tempfile.gettempdir().

    Returns the path of the `name` sub-directory inside that directory,
    creating it (and any missing parents) when it does not exist yet.
    """
    if directory is None:
        directory = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')

    directory = os.path.join(directory, name)
    try:
        os.makedirs(directory)
    except OSError:
        # Only an already-present directory is acceptable here; checking
        # after the attempt avoids the exists()/mkdir() race.
        if not os.path.isdir(directory):
            raise
    return directory
def activate_debug(module):
    """
    activates the given module for debugging

    Unknown or already active modules are reported with ``print_error`` and
    left unchanged.  On the first activation ``LOGFILE`` is set to
    ``outis.log`` in the system temp directory and announced.

    :param module: the module name to activate
    :return: None
    """
    global LOGFILE

    module = str(module)

    if module not in AVAILABLE_DEBUG_MODULES:
        print_error("debug module '{}' is unknown, cannot activate it".format(module))
        return

    if module in ACTIVATED_DEBUG_MODULES:
        print_error("debug module '{}' is already active".format(module))
        return

    # if this is the first activation set LOGFILE and print it
    if not ACTIVATED_DEBUG_MODULES:
        import os
        log_path = os.path.join(tempfile.gettempdir(), "outis.log")
        LOGFILE = str(log_path)
        print_message("DEBUGGING is active, writing to debug file " + str(LOGFILE))

    # add module to ACTIVATED_DEBUG_MODULES
    ACTIVATED_DEBUG_MODULES.append(module)
def main():
    """Command line entry point: parse the arguments and launch the scrape."""
    args = parse_args()

    log_level = logging.WARN if args.quiet else logging.INFO
    logging.basicConfig(level=log_level)

    # Don't allow more than 10 concurrent requests to the wayback machine
    concurrency = min(args.concurrency, 10)

    # Scrape results are stored in a temporary folder if no folder specified
    target_folder = args.target_folder or tempfile.gettempdir()
    logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))

    # Parse the period entered by the user (throws an exception if the dates
    # are not correctly formatted)
    from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
    to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)

    # The scraper downloads the elements matching the given xpath expression
    # in the target folder
    scraper = Scraper(target_folder, args.xpath)

    # Launch the scraping using the scraper previously instantiated
    scrape_archives(
        args.website_url,
        scraper.scrape,
        from_date,
        to_date,
        args.user_agent,
        timedelta(days=args.delta),
        concurrency,
    )
def update_sideshow_file(fname,
                         server_fname,
                         server=SIDESHOW_SERVER,
                         temp_path=gettempdir()):
    """
    Update the JPL side show file stored locally at *fname*. The
    remote file is accessed via FTP on *server* at *server_fname*. The
    path *temp_path* is used to store intermediate files. Return
    *fname*.

    The downloaded file is gzip-compressed; it is stored under *temp_path*
    and then decompressed into *fname*.
    """
    dest_fname = replace_path(temp_path, server_fname)
    logger.info('opening connection to {}'.format(server))
    # retrbinary hands raw bytes to the callback, so the destination must be
    # opened in binary mode.
    with closing(FTP(server)) as ftp, open(dest_fname, 'wb') as fid:
        logger.info('logging in')
        ftp.login()
        logger.info('writing to {}'.format(dest_fname))
        ftp.retrbinary('RETR ' + server_fname, fid.write)
    logger.info('uncompressing file to {}'.format(fname))
    # GzipFile.read() returns bytes; write them unmodified.
    with GzipFile(dest_fname) as gzip_fid, open(fname, 'wb') as fid:
        fid.write(gzip_fid.read())
    return fname
def kernel():
    """Create a ipykernel conda environment separate from this test
    environment where jupyter console is installed.

    The environments must be separate otherwise we cannot easily check
    if kernel start is activating the environment or if it was already
    active when the test suite started.

    Yields a ``Kernel`` built from the kernel name, the path of its
    ``kernel.json`` spec and the environment path; the environment directory
    is removed after the generator is resumed.
    """
    # unique name for the kernel and environment
    name = str(uuid4())
    env_path = '{}/kernel-env-{name}'.format(gettempdir(), name=name)

    setup_command = (
        '/bin/bash -c "conda create -y -p {env_path} ipykernel && '
        'source activate {env_path} && '
        'python -m ipykernel install --user '
        '--name {name}"'
    ).format(env_path=env_path, name=name)
    pexpect.run(setup_command)

    # query jupyter for the user data directory in a separate command to
    # make parsing easier
    data_dir_output = pexpect.run('jupyter --data-dir')
    user_path = data_dir_output.decode('utf-8').strip()

    # the kernel spec resides in the jupyter user data path
    spec_file = os.path.join(user_path, 'kernels', name, 'kernel.json')

    yield Kernel(name, spec_file, env_path)

    shutil.rmtree(env_path)
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    The text after the final ``/`` in ``url`` is used as the file name
    inside the system temp directory.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    :raises ValueError: if the file name has neither the
        ``.alfredworkflow`` nor the ``.alfred3workflow`` extension

    """
    workflow_extensions = ('.alfredworkflow', '.alfred3workflow')
    filename = url.split("/")[-1]

    if not filename.endswith(workflow_extensions):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as out_file:
        out_file.write(response.content)

    return local_path
def _get_storage_path(cls):
    """Return the directory used for session files, caching it on `cls`.

    The ``SESSION_FILE_PATH`` setting is used when set to a truthy value,
    otherwise the system temp directory.  The result is stored in
    ``cls._storage_path`` and returned directly on later calls.

    Raises ``ImproperlyConfigured`` if the chosen path is not an existing
    directory.
    """
    try:
        return cls._storage_path
    except AttributeError:
        configured = getattr(settings, "SESSION_FILE_PATH", None)
        storage_path = configured or tempfile.gettempdir()

        # Make sure the storage path is valid.
        if not os.path.isdir(storage_path):
            raise ImproperlyConfigured(
                "The session storage path %r doesn't exist. Please set your"
                " SESSION_FILE_PATH setting to an existing directory in which"
                " Django can store session data." % storage_path)

        cls._storage_path = storage_path
        return storage_path
def _get_storage_path(cls):
    """Resolve, validate and memoize the session file directory.

    A value already stored in ``cls._storage_path`` is returned as is.
    Otherwise the ``SESSION_FILE_PATH`` setting is used, falling back to the
    system temp directory when it is missing or falsy; the path must be an
    existing directory or ``ImproperlyConfigured`` is raised.
    """
    try:
        return cls._storage_path
    except AttributeError:
        path = getattr(settings, "SESSION_FILE_PATH", None)
        if not path:
            path = tempfile.gettempdir()

        # Make sure the storage path is valid.
        path_is_dir = os.path.isdir(path)
        if not path_is_dir:
            raise ImproperlyConfigured(
                "The session storage path %r doesn't exist. Please set your"
                " SESSION_FILE_PATH setting to an existing directory in which"
                " Django can store session data." % path)

        cls._storage_path = path
        return path
def _should_continue():
    """Tell whether an update check is due, recording the time if it is.

    The time of the last check is kept in
    ``<tempdir>/uavcan_gui_tool/update_check_timestamp``.  Returns False if
    less than 24 hours have passed since that time; otherwise writes the
    current time to the file and returns True.  An unreadable or missing
    file counts as "never checked".
    """
    stamp_path = os.path.join(
        tempfile.gettempdir(), 'uavcan_gui_tool', 'update_check_timestamp')
    min_check_interval = 3600 * 24

    try:
        with open(stamp_path, 'r') as stamp_file:
            last_check = float(stamp_file.read().strip())
    except Exception:
        logger.debug('Update timestamp file could not be read', exc_info=True)
        last_check = 0

    if (time.time() - last_check) < min_check_interval:
        return False

    stamp_dir = os.path.dirname(stamp_path)
    try:
        os.makedirs(stamp_dir)
    except Exception:
        pass    # Nobody cares.

    with open(stamp_path, 'w') as stamp_file:
        stamp_file.write(str(time.time()))

    return True
def setUp(self):
    """Write a template file to the temp dir and build the Mako backend.

    The backend is configured with the system temp directory as its only
    template directory.
    """
    self.tmp_dir = tempfile.gettempdir()
    self.template_name = 'good_template.html'
    self.bad_string = '<% name="Jazzar" My name is ${name}.'
    self.template_string = '<% name="Jazzar" %> My name is ${name}.'

    template_path = os.path.join(self.tmp_dir, self.template_name)
    with open(template_path, 'w') as template_file:
        template_file.write(self.template_string)

    self.mako_backend = MakoBackend({
        'NAME': 'mako',
        'DIRS': [self.tmp_dir],
        'APP_DIRS': False,
        'OPTIONS': {}
    })
def __init__(self):
    """Set ``DUMMY_FILE`` to ``dummy.hosts`` in the system temp directory."""
    temp_dir = tempfile.gettempdir()
    self.DUMMY_FILE = '{0}/dummy.hosts'.format(temp_dir)
# def generate_hostname(self, minlen=5, maxlen=20):
# """
# Generate a random hostname between minlen and maxlen, appending
# '.example.net' (eg. oskpijsrss7.example.net)
#
# :param minlen: minimum length of the hostname (default: 5)
#
# :param maxlen: maximum length of the hostname (default: 20)
# """
#
# len = random.randint(minlen, maxlen)
#
# host_str = ''.join(
# random.choice(string.ascii_lowercase) for _ in range(len -1))
#
# host_str = '{0}{1}.example.net'.format(
# host_str, random.choice(string.digits))
#
# return host_str
def __init__(self, max_age):
    """Constructor.

    Remembers the expiration time and the cache file location (``FILENAME``
    in the system temp directory) and, under the file lock, reads or
    initializes the cache file.  Exceptions raised while doing so are logged
    as warnings and do not propagate.

    Args:
      max_age: Cache expiration in seconds.
    """
    self._max_age = max_age
    self._file = os.path.join(tempfile.gettempdir(), FILENAME)
    locked_file = LockedFile(self._file, 'a+', 'r')
    try:
        locked_file.open_and_lock()
        if locked_file.is_locked():
            _read_or_initialize_cache(locked_file)
        # If we can not obtain the lock, other process or thread must
        # have initialized the file.
    except Exception as exc:
        LOGGER.warning(exc, exc_info=True)
    finally:
        locked_file.unlock_and_close()
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    The file keeps the name found after the last ``/`` of ``url`` and is
    stored in the system temp directory.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    :raises ValueError: if the file name ends with neither
        ``.alfredworkflow`` nor ``.alfred3workflow``

    """
    filename = url.split("/")[-1]

    is_workflow = (filename.endswith('.alfredworkflow') or
                   filename.endswith('.alfred3workflow'))
    if not is_workflow:
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as destination:
        destination.write(response.content)

    return local_path