def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    """Build the shell command line for an ``rclone move`` upload.

    Args:
        local: Source path; shell-quoted before insertion.
        remote: Destination; shell-quoted before insertion.
        transfers: Integer rendered into ``--transfers``.
        checkers: Integer rendered into ``--checkers``.
        bwlimit: Optional bandwidth limit; the flag is omitted when this is
            empty or ``None``.
        excludes: Iterable of exclude patterns (``None`` is treated as empty).
            Each one becomes its own ``--exclude`` flag.
        chunk_size: Value rendered verbatim into ``--drive-chunk-size``.
        dry_run: When truthy, ``--dry-run`` is appended.

    Returns:
        The complete command as a single string.
    """
    parts = [
        'rclone move %s %s' % (cmd_quote(local), cmd_quote(remote)),
        '--delete-after',
        '--no-traverse',
        '--stats=60s',
        '-v',
        '--transfers=%d' % transfers,
        '--checkers=%d' % checkers,
        '--drive-chunk-size=%s' % chunk_size,
    ]
    if bwlimit:
        # Shell-quote instead of wrapping in literal double quotes, so that
        # characters such as ", $ or ` inside the value stay literal.
        parts.append('--bwlimit=%s' % cmd_quote(str(bwlimit)))
    for item in excludes or ():
        parts.append('--exclude=%s' % cmd_quote(str(item)))
    if dry_run:
        parts.append('--dry-run')
    return ' '.join(parts)
# Example source code using Python's quote()
def remove_empty_directories(config, force_dry_run=False):
    """Run ``find ... -empty`` over every configured folder that exists.

    Nothing is done (apart from a debug message) when ``opened_files`` reports
    any open file for ``config['local_folder']``. Otherwise, for each
    ``folder -> mindepth`` entry of ``config['rclone_remove_empty_on_upload']``
    whose folder exists, a ``find`` command is built and passed to
    ``run_command``. ``-delete`` is added only when neither
    ``config['dry_run']`` nor ``force_dry_run`` is truthy.
    """
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    if len(open_files):
        logger.debug("Skipped removing empty directories because %d files are currently open: %r", len(open_files),
                     open_files)
        return

    cleared_any = False
    for folder, min_depth in config['rclone_remove_empty_on_upload'].items():
        if not os.path.exists(folder):
            continue
        cleared_any = True
        logger.debug("Removing empty directories from %r with mindepth %r", folder, min_depth)
        find_cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(folder), min_depth)
        if not (config['dry_run'] or force_dry_run):
            find_cmd += ' -delete'
        run_command(find_cmd)

    if cleared_any:
        logger.debug("Finished clearing empty directories")
############################################################
# CONFIG STUFF
############################################################
def test_activate(monkeypatch):
    """Check the export lines ``activate`` returns for a project with a redis service.

    The project file declares ``REDIS_URL`` as a child of ``services``; the
    expected result is exactly the ``PROJECT_DIR`` and ``REDIS_URL`` exports.
    """
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        if len(result) > 2:
            # Dump context to help diagnose unexpected extra exports.
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(result))
        assert ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379'] == result

    # REDIS_URL must be nested under ``services``; at column 0 it would be an
    # unrelated top-level key and ``services`` would be empty.
    with_directory_contents_completing_project_file(
        {DEFAULT_PROJECT_FILENAME: """
services:
  REDIS_URL: redis
"""}, activate_redis_url)
def test_activate_quoting(monkeypatch):
    """Check that ``activate`` shell-quotes a variable value containing ``$``, ``!`` and a space.

    ``FOO`` is declared under ``variables`` in the project file and given the
    value ``$! boo`` under ``variables`` in the local state file.
    """
    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        assert ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)] == result

    # FOO must be nested under ``variables`` in both files; at column 0 it
    # would be an unrelated top-level key.
    with_directory_contents_completing_project_file(
        {
            DEFAULT_PROJECT_FILENAME: """
variables:
  FOO: {}
""",
            DEFAULT_LOCAL_STATE_FILENAME: """
variables:
  FOO: $! boo
"""
        }, activate_foo)
def activate(dirname, ui_mode, conda_environment, command_name):
    """Prepare the project in ``dirname`` and build ``export`` lines for it.

    Only variables that are missing from ``os.environ`` or differ from its
    current value are exported; values are shell-quoted.

    Future direction: should also activate the proper conda env.

    Returns:
        None if preparation failed, otherwise a list of lines to print.
    """
    project = load_project(dirname)
    outcome = prepare_with_ui_mode_printing_errors(project,
                                                   ui_mode=ui_mode,
                                                   env_spec_name=conda_environment,
                                                   command_name=command_name)
    if outcome.failed:
        return None

    prepared = outcome.environ
    # Iterate in sorted key order so the output is deterministic for tests.
    return [
        "export {key}={value}".format(key=name, value=quote(prepared[name]))
        for name in sorted(prepared.keys())
        if name not in os.environ or os.environ[name] != prepared[name]
    ]
def _debug_cmd(self, args, exe=None):
    """Print the command line ``args`` via ``to_screen`` when verbose is on.

    Args:
        args: Command and arguments; each is passed through ``decodeArgument``.
        exe: Label for the message. Defaults to the basename of the first
            decoded argument.

    Does nothing unless ``self.params['verbose']`` is truthy.
    """
    if not self.params.get('verbose', False):
        return

    str_args = [decodeArgument(a) for a in args]
    if exe is None:
        exe = os.path.basename(str_args[0])

    # Prefer shlex.quote; the ``pipes`` module is gone in Python 3.13, and
    # relying on it alone would silently degrade the output to repr().
    try:
        from shlex import quote as quote_arg
    except ImportError:
        try:
            from pipes import quote as quote_arg
        except ImportError:
            quote_arg = None

    if quote_arg is None:
        cmdline = repr(str_args)
    else:
        cmdline = ' '.join(map(quote_arg, str_args))

    self.to_screen('[debug] %s command line: %s' % (exe, cmdline))
def is_available(cls):
    """Report whether this engine can be used.

    Requires the parent's ``is_available()`` to pass and both the
    ``text2wave`` and ``festival`` executables to be found. ``festival --pipe``
    is then run with empty stdin; the result is False when its combined
    stdout/stderr contains 'No default voice found'.
    """
    prerequisites_met = (super(cls, cls).is_available() and
                         diagnose.check_executable('text2wave') and
                         diagnose.check_executable('festival'))
    if not prerequisites_met:
        return False

    log = logging.getLogger(__name__)
    command = ['festival', '--pipe']
    with tempfile.SpooledTemporaryFile() as captured:
        with tempfile.SpooledTemporaryFile() as empty_stdin:
            quoted = ' '.join(pipes.quote(part) for part in command)
            log.debug('Executing %s', quoted)
            # stdout and stderr share one buffer so either stream can carry
            # the voice error message.
            subprocess.call(command, stdin=empty_stdin, stdout=captured,
                            stderr=captured)
            captured.seek(0)
            text = captured.read().strip()
            if text:
                log.debug("Output was: '%s'", text)
            return 'No default voice found' not in text
def say(self, phrase, *args):
    """Pipe ``phrase`` through ``text2wave`` and play the resulting WAV.

    The phrase is fed on stdin, the audio is written to a named temporary
    ``.wav`` file, and anything printed on stderr is logged at debug level.
    Playback happens while the temporary file is still open, i.e. before it
    is deleted.
    """
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    command = ['text2wave']
    with tempfile.NamedTemporaryFile(suffix='.wav') as wav_file:
        with tempfile.SpooledTemporaryFile() as text_in:
            text_in.write(phrase)
            text_in.seek(0)
            with tempfile.SpooledTemporaryFile() as stderr_buf:
                quoted = ' '.join(pipes.quote(part) for part in command)
                self._logger.debug('Executing %s', quoted)
                subprocess.call(command, stdin=text_in, stdout=wav_file,
                                stderr=stderr_buf)
                stderr_buf.seek(0)
                messages = stderr_buf.read()
                if messages:
                    self._logger.debug("Output was: '%s'", messages)
        self.play(wav_file.name)
def say(self, phrase, *args):
    """Synthesize ``phrase`` with ``flite`` into a temporary WAV and play it.

    ``-voice`` is passed only when ``self.voice`` is truthy. The combined
    stdout/stderr of flite is logged at debug level. The temporary WAV file
    is removed even when synthesis or playback raises.
    """
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    cmd = ['flite']
    if self.voice:
        cmd.extend(['-voice', self.voice])
    cmd.extend(['-t', phrase])
    # delete=False: only a unique file name is needed here; flite writes the
    # file itself, so it has to be removed manually afterwards.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        fname = f.name
    try:
        cmd.append(fname)
        with tempfile.SpooledTemporaryFile() as out_f:
            self._logger.debug('Executing %s',
                               ' '.join([pipes.quote(arg)
                                         for arg in cmd]))
            subprocess.call(cmd, stdout=out_f, stderr=out_f)
            out_f.seek(0)
            output = out_f.read().strip()
        if output:
            self._logger.debug("Output was: '%s'", output)
        self.play(fname)
    finally:
        # Clean up on every path, so a failure does not leak the WAV file.
        os.remove(fname)
def say(self, phrase, *args):
    """Synthesize ``phrase`` with ``espeak`` into a temporary WAV and play it.

    ``self.voice``, ``self.pitch_adjustment`` and ``self.words_per_minute``
    are passed as ``-v``, ``-p`` and ``-s``; every argument is converted with
    ``str()``. The combined stdout/stderr of espeak is logged at debug level.
    The temporary WAV file is removed even when synthesis or playback raises.
    """
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    # delete=False: only a unique file name is needed here; espeak writes the
    # file itself, so it has to be removed manually afterwards.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        fname = f.name
    try:
        cmd = ['espeak', '-v', self.voice,
               '-p', self.pitch_adjustment,
               '-s', self.words_per_minute,
               '-w', fname,
               phrase]
        cmd = [str(x) for x in cmd]
        self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                     for arg in cmd]))
        with tempfile.TemporaryFile() as out_f:
            subprocess.call(cmd, stdout=out_f, stderr=out_f)
            out_f.seek(0)
            output = out_f.read()
        if output:
            self._logger.debug("Output was: '%s'", output)
        self.play(fname)
    finally:
        # Clean up on every path, so a failure does not leak the WAV file.
        os.remove(fname)
def say(self, phrase, *args):
    """Synthesize ``phrase`` with ``pico2wave`` into a temporary WAV and play it.

    Raises:
        ValueError: If ``self.language`` is not in ``self.languages``. This is
            checked before any temporary file is created.

    The combined stdout/stderr of pico2wave is logged at debug level. The
    temporary WAV file is removed even when synthesis or playback raises.
    """
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    if self.language not in self.languages:
        # Format the message here: exceptions do not interpolate %-style
        # arguments the way logging calls do.
        raise ValueError("Language '%s' not supported by '%s'" %
                         (self.language, self.SLUG))
    # delete=False: only a unique file name is needed here; pico2wave writes
    # the file itself, so it has to be removed manually afterwards.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        fname = f.name
    try:
        cmd = ['pico2wave', '--wave', fname]
        cmd.extend(['-l', self.language])
        cmd.append(phrase)
        self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                     for arg in cmd]))
        with tempfile.TemporaryFile() as out_f:
            subprocess.call(cmd, stdout=out_f, stderr=out_f)
            out_f.seek(0)
            output = out_f.read()
        if output:
            self._logger.debug("Output was: '%s'", output)
        self.play(fname)
    finally:
        # Clean up on every path, so a failure does not leak the WAV file.
        os.remove(fname)
def get_command(self, file, **options):
    """Return a backgrounded shell command that opens ``file`` in Preview.

    The command removes ``file`` after a 20 second sleep; the file name is
    shell-quoted in both places.
    """
    # `open` on darwin returns immediately, so without the sleep the temp
    # file would be removed while the app is still opening it.
    target = quote(file)
    opener = "open -a /Applications/Preview.app"
    return "(%s %s; sleep 20; rm -f %s)&" % (opener, target, target)
def show_file(self, file, **options):
    """Launch the viewer for ``file`` in a background shell and return 1.

    The viewer command comes from ``get_command_ex``. The shell line runs it
    on the quoted file name and removes the file once the viewer exits.
    """
    viewer_cmd, _executable = self.get_command_ex(file, **options)
    target = quote(file)
    os.system("(%s %s; rm -f %s)&" % (viewer_cmd, target, target))
    return 1
# implementations
def get_command_ex(self, file, title=None, **options):
    """Return ``(command, executable)`` for the ``xv`` viewer.

    When ``title`` is truthy it is appended to the command as a shell-quoted
    ``-name`` option; the executable is always ``"xv"``.
    """
    # note: xv is pretty outdated. most modern systems have
    # imagemagick's display command instead.
    executable = "xv"
    name_option = " -name %s" % quote(title) if title else ""
    return executable + name_option, executable
def __uploadJenkins(self, step, buildIdFile, resultFile, suffix):
    """Return the shell snippet that uploads an artifact, or "" if disabled.

    Copying the artifact straight to its final location would not be atomic.
    The script therefore copies it to a temporary file at the repository
    root and hard-links that file to the final location. A failing link is
    usually caused by a concurrent upload, so the script then checks that
    the artifact is readable to tell this apart from other fatal errors.

    When ``self._ignoreErrors()`` is truthy a failure of the subshell is only
    echoed.
    """
    if not self.canUploadJenkins():
        return ""

    template = """\
# upload artifact
cd $WORKSPACE
BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
BOB_UPLOAD_FILE="{DIR}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
if [[ ! -e ${{BOB_UPLOAD_FILE}} ]] ; then
(
set -eE
T="$(mktemp -p {DIR})"
trap 'rm -f $T' EXIT
cp {RESULT} "$T"
mkdir -p "${{BOB_UPLOAD_FILE%/*}}"
if ! ln -T "$T" "$BOB_UPLOAD_FILE" ; then
[[ -r "$BOB_UPLOAD_FILE" ]] || exit 2
fi
){FIXUP}
fi"""
    # Substitute first, then dedent, so the order of operations is unchanged.
    script = template.format(
        DIR=self.__basePath,
        BUILDID=quote(buildIdFile),
        RESULT=quote(resultFile),
        FIXUP=" || echo Upload failed: $?" if self._ignoreErrors() else "",
        GEN=ARCHIVE_GENERATION,
        SUFFIX=suffix)
    return "\n" + textwrap.dedent(script)
def download(self, step, buildIdFile, tgzFile):
    """Return the shell snippet that copies an artifact from the base path.

    Returns "" when ``self.canDownloadJenkins()`` is false. The script only
    runs ``cp`` when ``tgzFile`` does not exist yet, and a failed copy is
    merely echoed.
    """
    if not self.canDownloadJenkins():
        return ""

    template = """\
if [[ ! -e {RESULT} ]] ; then
BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
BOB_DOWNLOAD_FILE="{DIR}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
cp "$BOB_DOWNLOAD_FILE" {RESULT} || echo Download failed: $?
fi
"""
    # Substitute first, then dedent, so the order of operations is unchanged.
    script = template.format(
        DIR=self.__basePath,
        BUILDID=quote(buildIdFile),
        RESULT=quote(tgzFile),
        GEN=ARCHIVE_GENERATION,
        SUFFIX=ARTIFACT_SUFFIX)
    return "\n" + textwrap.dedent(script)
def download(self, step, buildIdFile, tgzFile):
    """Return the shell snippet that fetches an artifact with ``curl``.

    Returns "" unless downloading was requested, i.e. when
    ``self.canDownloadJenkins()`` is false. The script only runs ``curl``
    when ``tgzFile`` does not exist yet, and a failed download is merely
    echoed.
    """
    if not self.canDownloadJenkins():
        return ""

    template = """\
if [[ ! -e {RESULT} ]] ; then
BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
BOB_DOWNLOAD_URL="{URL}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
curl -sSg --fail -o {RESULT} "$BOB_DOWNLOAD_URL" || echo Download failed: $?
fi
"""
    # Substitute first, then dedent, so the order of operations is unchanged.
    script = template.format(
        URL=self.__url.geturl(),
        BUILDID=quote(buildIdFile),
        RESULT=quote(tgzFile),
        GEN=ARCHIVE_GENERATION,
        SUFFIX=ARTIFACT_SUFFIX)
    return "\n" + textwrap.dedent(script)
def download(self, step, buildIdFile, tgzFile):
    """Return the shell snippet that runs the configured download command.

    Returns "" unless downloading was requested, i.e. when
    ``self.canDownloadJenkins()`` is false. The script sets
    ``BOB_LOCAL_ARTIFACT`` and ``BOB_REMOTE_ARTIFACT`` and then inserts
    ``self.__downloadCmd`` verbatim; it only does so when ``tgzFile`` does
    not exist yet.
    """
    if not self.canDownloadJenkins():
        return ""

    template = """
if [[ ! -e {RESULT} ]] ; then
BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
BOB_LOCAL_ARTIFACT={RESULT}
BOB_REMOTE_ARTIFACT="${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
{CMD}
fi
"""
    return template.format(
        CMD=self.__downloadCmd,
        BUILDID=quote(buildIdFile),
        RESULT=quote(tgzFile),
        GEN=ARCHIVE_GENERATION,
        SUFFIX=ARTIFACT_SUFFIX)
def __getitem__(self, item):
    """Resolve an include expression and return its shell replacement.

    The first character of ``item`` is the mode, the rest is a glob pattern
    relative to ``self.baseDir``. All matching files are loaded in sorted
    order and concatenated, and the SHA-1 of the result is appended to
    ``self.incDigests``.

    * Mode ``<``: prolog lines are added that decode the base64 content into
      a ``mktemp`` file; the return value is a reference to the shell
      variable holding that file name.
    * Mode ``'``: the content is decoded as UTF-8 and returned shell-quoted.

    Raises:
        ParseError: If nothing matches the pattern or an OSError occurs
            while including.
    """
    mode, pattern = item[0], item[1:]
    try:
        matches = sorted(glob(os.path.join(self.baseDir, pattern)))
        if not matches:
            raise ParseError("No files matched in include pattern '{}'!"
                             .format(pattern))
        chunks = [self.fileLoader(match) for match in matches]
    except OSError as e:
        raise ParseError("Error including '"+pattern+"': " + str(e))

    data = b''.join(chunks)
    self.incDigests.append(asHexStr(hashlib.sha1(data).digest()))

    if mode != '<':
        assert mode == "'"
        return quote(data.decode('utf8'))

    var = "_{}{}".format(self.varBase, self.count)
    self.count += 1
    # Here-document that recreates the file content at script run time.
    header = [
        "{VAR}=$(mktemp)".format(VAR=var),
        "_BOB_TMP_CLEANUP+=( ${VAR} )".format(VAR=var),
        "base64 -d > ${VAR} <<EOF".format(VAR=var),
    ]
    self.prolog.extend(header)
    self.prolog.extend(sliceString(b64encode(data).decode("ascii"), 76))
    self.prolog.append("EOF")
    return "${" + var + "}"