def cmdDiffFolder( self, folder, head=False ):
    self.debugLog( 'cmdDiffFolder( %r )' % (folder,) )
    abs_folder = self.pathForSvn( folder )
    if head:
        old_rev = self.svn_rev_head
    else:
        old_rev = self.svn_rev_base
    diff_text = self.client().diff(
        tempfile.gettempdir(),
        abs_folder, old_rev,
        abs_folder, self.svn_rev_working,
        recurse=True,
        relative_to_dir=str( self.projectPath() ),
        use_git_diff_format=True
        )
    return diff_text
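A minimal standalone sketch of the same idea, assuming pysvn is installed: the first argument to Client.diff() is a scratch directory for pysvn's temporary files, and tempfile.gettempdir() supplies one. The working-copy path below is a placeholder.

# Sketch only; '/path/to/working-copy' is a placeholder.
import tempfile
import pysvn

client = pysvn.Client()
diff_text = client.diff(
    tempfile.gettempdir(),                                    # scratch dir for pysvn temp files
    '/path/to/working-copy',                                  # placeholder working-copy path
    revision1=pysvn.Revision(pysvn.opt_revision_kind.base),
    revision2=pysvn.Revision(pysvn.opt_revision_kind.working),
    recurse=True,
)
print(diff_text)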
def _get_storage_path(cls):
    try:
        return cls._storage_path
    except AttributeError:
        storage_path = getattr(settings, "SESSION_FILE_PATH", None)
        if not storage_path:
            storage_path = tempfile.gettempdir()

        # Make sure the storage path is valid.
        if not os.path.isdir(storage_path):
            raise ImproperlyConfigured(
                "The session storage path %r doesn't exist. Please set your"
                " SESSION_FILE_PATH setting to an existing directory in which"
                " Django can store session data." % storage_path)

        cls._storage_path = storage_path
        return storage_path
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    """
    filename = url.split('/')[-1]
    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('attachment not a workflow: {0}'.format(filename))
    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'downloading updated workflow from `%s` to `%s` ...', url, local_path)
    response = web.get(url)
    with open(local_path, 'wb') as output:
        output.write(response.content)
    return local_path
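A hedged usage sketch for the helper above; wf() and web come from the Alfred-Workflow package, and the release URL below is a placeholder.

# Hypothetical call; the URL is a placeholder release asset.
url = 'https://github.com/someuser/somerepo/releases/download/v1.0/My.alfredworkflow'
path = download_workflow(url)
print('workflow saved to', path)  # e.g. <tempdir>/My.alfredworkflow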
def __init__(self):
    self._target_size = 10
    logging.info("loading mnist data")
    path = os.path.join(tempfile.gettempdir(), "mnist.pkl.gz")
    if not os.path.exists(path):
        logging.info("downloading mnist data")
        urllib.urlretrieve(MNIST_URL, path)
    self._train_set, self._valid_set, self._test_set = cPickle.load(gzip.open(path, 'rb'))
    # Moving validation examples to training set, leaving 1000
    train_set_x = np.vstack((self._train_set[0], self._valid_set[0][:-1000]))
    train_set_y = np.hstack((self._train_set[1], self._valid_set[1][:-1000]))
    valid_set_x = self._valid_set[0][-1000:]
    valid_set_y = self._valid_set[1][-1000:]
    self._train_set = (train_set_x, train_set_y)
    self._valid_set = (valid_set_x, valid_set_y)
    logging.info("[mnist small validation] training data size: %d" % len(self._train_set[0]))
    logging.info("[mnist small validation] valid data size: %d" % len(self._valid_set[0]))
    logging.info("[mnist small validation] test data size: %d" % len(self._test_set[0]))
def ssl_authenticator():
    try:
        import pupy_credentials
        keystr = pupy_credentials.SSL_BIND_KEY
        certstr = pupy_credentials.SSL_BIND_CERT
    except ImportError:
        keystr = DEFAULT_SSL_BIND_KEY
        certstr = DEFAULT_SSL_BIND_CERT

    key_path = None
    cert_path = None
    if os.path.isfile("pupy.conf"):
        config = configparser.ConfigParser()
        config.read("pupy.conf")
        key_path = config.get("pupyd", "keyfile").replace("\\", os.sep).replace("/", os.sep)
        cert_path = config.get("pupyd", "certfile").replace("\\", os.sep).replace("/", os.sep)
    else:
        # No config file: write the embedded key/cert to randomly named temp files.
        tmpdir = tempfile.gettempdir()
        cert_path = os.path.join(tmpdir, ''.join(random.choice(string.lowercase + string.digits) for _ in range(random.randint(5, 8))))
        key_path = os.path.join(tmpdir, ''.join(random.choice(string.lowercase + string.digits) for _ in range(random.randint(5, 8))))
        with open(cert_path, 'wb') as f:
            f.write(certstr.strip())
        with open(key_path, 'wb') as f:
            f.write(keystr.strip())
    return SSLAuthenticator(key_path, cert_path, ciphers="SHA256+AES256:SHA1+AES256:@STRENGTH")
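A hedged alternative sketch for the temp-file step above: tempfile.mkstemp() creates a uniquely named file under gettempdir() atomically, instead of hand-rolling random names.

# Sketch only: mkstemp() returns an open file descriptor plus a unique path.
import os
import tempfile

fd, cert_path = tempfile.mkstemp(dir=tempfile.gettempdir())
with os.fdopen(fd, 'wb') as f:
    f.write(certstr.strip())  # certstr as in the snippet above (bytes under Python 2)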
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    """
    filename = url.split("/")[-1]
    if (not url.endswith('.alfredworkflow') or
            not filename.endswith('.alfredworkflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))
    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)
    response = web.get(url)
    with open(local_path, 'wb') as output:
        output.write(response.content)
    return local_path
def test_profiler(self):
    self.do_computation()
    profilers = self.sc.profiler_collector.profilers
    self.assertEqual(1, len(profilers))
    id, profiler, _ = profilers[0]
    stats = profiler.stats()
    self.assertTrue(stats is not None)
    width, stat_list = stats.get_print_list([])
    func_names = [func_name for fname, n, func_name in stat_list]
    self.assertTrue("heavy_foo" in func_names)

    old_stdout = sys.stdout
    sys.stdout = io = StringIO()
    self.sc.show_profiles()
    self.assertTrue("heavy_foo" in io.getvalue())
    sys.stdout = old_stdout

    d = tempfile.gettempdir()
    self.sc.dump_profiles(d)
    self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
def _get_pandoc_version(pandoc_path):
    new_env = os.environ.copy()
    if 'HOME' not in os.environ:
        new_env['HOME'] = tempfile.gettempdir()
    p = subprocess.Popen(
        [pandoc_path, '--version'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        env=new_env)
    comm = p.communicate()
    out_lines = comm[0].decode().splitlines(False)
    if p.returncode != 0 or len(out_lines) == 0:
        raise RuntimeError("Couldn't call pandoc to get version information. Output from "
                           "pandoc:\n%s" % str(comm))
    version_pattern = re.compile(r"^\d+(\.\d+){1,}$")
    version = None  # avoid UnboundLocalError if no token matches the pattern
    for tok in out_lines[0].split():
        if version_pattern.match(tok):
            version = tok
            break
    return version
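A hedged usage sketch, assuming a pandoc binary is available on PATH:

# Sketch: the helper returns a version string such as '2.19.2', or None
# (after the guard added above) if no version token is found.
version = _get_pandoc_version('pandoc')
major = int(version.split('.')[0]) if version else None
print('pandoc major version:', major)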
def configure(dir=None, format_strs=None):
    if dir is None:
        dir = os.getenv('OPENAI_LOGDIR')
    if dir is None:
        dir = osp.join(tempfile.gettempdir(),
            datetime.datetime.now().strftime("openai-%Y-%m-%d-%H-%M-%S-%f"))
    assert isinstance(dir, str)
    os.makedirs(dir, exist_ok=True)

    if format_strs is None:
        strs = os.getenv('OPENAI_LOG_FORMAT')
        format_strs = strs.split(',') if strs else LOG_OUTPUT_FORMATS
    output_formats = [make_output_format(f, dir) for f in format_strs]

    Logger.CURRENT = Logger(dir=dir, output_formats=output_formats)
    log('Logging to %s'%dir)
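A hedged usage sketch of the fallback order implemented above (explicit dir, then OPENAI_LOGDIR, then a timestamped folder under the system temp directory); log() is the same module-level helper used on the last line of configure():

# Sketch: with no explicit dir and OPENAI_LOGDIR unset, logs land in something
# like <tempdir>/openai-2024-01-01-12-00-00-000000.
import os
os.environ.pop('OPENAI_LOGDIR', None)  # force the gettempdir() fallback
configure()
log('hello from the temp-dir logger')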
def main():
    cachePath = os.path.join(tempfile.gettempdir(), CACHEFILE)
    cache = loadCache(cachePath, BASE64)
    chainInfo = rpcRetrieve('getblockchaininfo')
    bestHash = chainInfo['bestblockhash']
    height = int(chainInfo['blocks'])
    bip9forks = chainInfo['bip9_softforks']
    print(formatWelcome(cache, WINDOW, bestHash, height,
                        F(chainInfo['difficulty']), bip9forks, THRESHOLD))
    if cache.height == 0:
        print('Please wait while retrieving latest block versions and caching them...\n')
    if len(cache.hashes) < 1 or cache.hashes[0] != bestHash:
        retrieveBlock = lambda h: rpcRetrieve('getblock', h)
        cache = updateCache(cache, WINDOW, HASHES_SIZE, bestHash, height, retrieveBlock)
        saveCache(cache, cachePath, BASE64)
    print(formatAllData(cache, bip9forks, THRESHOLD, WINDOW))
def download_and_save_icons(dist_dir):
    saved_icons = dict()
    temp_dir = tempfile.gettempdir()
    temp_path = download_icons(AWS_ICON_DOWNLOAD_URL, os.path.join(temp_dir, 'icons.zip'))
    for image, friendly_name in get_images_from_archive_file(temp_path):
        emoji_icon = emojinize_image(image)
        dist_path = os.path.join(
            dist_dir,
            '{name}.{ext}'.format(name=friendly_name, ext='png'),
        )
        emoji_icon.save(dist_path, 'PNG', optimize=True)
        logging.info("Save emoji: %s", dist_path)
        saved_icons[friendly_name] = dist_path
    return saved_icons
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    """
    filename = url.split("/")[-1]
    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))
    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)
    response = web.get(url)
    with open(local_path, 'wb') as output:
        output.write(response.content)
    return local_path
def prepare_test(test_type):
    """prepare the geodatabase data for running tests on"""
    cfg = TEST_CONFIG[test_type]
    test_type = cfg['name']
    out_report_folder = os.path.join(tempfile.gettempdir(), test_type)
    if not os.path.exists(out_report_folder):
        os.mkdir(out_report_folder)
    if arcpy_found:
        xml_schema = cfg['xml_schema']
        in_gdb = arcpy.CreateFileGDB_management(out_report_folder, test_type).getOutput(0)
        arcpy.ImportXMLWorkspaceDocument_management(in_gdb, xml_schema, "SCHEMA_ONLY")
    else:
        in_gdb = cfg['ogr_geodatabase']
    json_results = cfg['json_results']
    return (in_gdb, out_report_folder, json_results)

# adding the project folder to support running test files individually and from the IDE
@contextmanager  # requires `from contextlib import contextmanager`; without it the "with" usage below fails
def zip_folder(folder_path):
    """
    Zip the entire folder and return a file to the zip. Use this inside
    a "with" statement to cleanup the zipfile after it is used.

    :param folder_path:
    :return: Name of the zipfile
    """
    filename = os.path.join(
        tempfile.gettempdir(), "data-" + uuid.uuid4().hex)
    zipfile_name = make_zip(filename, folder_path)
    try:
        yield zipfile_name
    finally:
        if os.path.exists(zipfile_name):
            os.remove(zipfile_name)
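A hedged usage sketch, assuming make_zip() is defined as referenced above and 'my_data' is a placeholder folder:

# Sketch: the temporary archive is removed automatically when the block exits.
with zip_folder('my_data') as zipfile_name:
    print('temporary archive at', zipfile_name)
# the file at zipfile_name has been deleted here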
def _find_grail_rc(self):
    import glob
    import pwd
    import socket
    import tempfile
    tempdir = os.path.join(tempfile.gettempdir(),
                           ".grail-unix")
    user = pwd.getpwuid(os.getuid())[0]
    filename = os.path.join(tempdir, user + "-*")
    maybes = glob.glob(filename)
    if not maybes:
        return None
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    for fn in maybes:
        # need to PING each one until we find one that's live
        try:
            s.connect(fn)
        except socket.error:
            # no good; attempt to clean it out, but don't fail:
            try:
                os.unlink(fn)
            except IOError:
                pass
        else:
            return s
def rtask_save_proc(task=None):
    import os
    import tempfile
    # save data in /tmp/tickdata
    with open(os.path.join(os.sep, tempfile.gettempdir(), 'tickdata'), 'w') as fd:
        while True:
            i, n = yield task.receive()
            if n is None:
                break
            fd.write('%s: %s\n' % (i, n))
    raise StopIteration(0)

# This task runs on client. It gets trend messages from remote task that
# computes moving window average.