Example source code using Python's quote()

def xcheck_envar(conf, name, wafname=None, cross=False):
    wafname = wafname or name
    envar = os.environ.get(name, None)
    if envar is None:
        return
    value = Utils.to_list(envar) if envar != '' else [envar]
    conf.env[wafname] = value
    if cross:
        pretty = 'cross-compilation %s' % wafname
    else:
        pretty = wafname
    conf.msg('Will use %s' % pretty,
             " ".join(quote(x) for x in value))

def printCommand(arg1: "typing.Union[str, typing.Sequence[typing.Any]]", *remainingArgs, outputFile=None,
                 colour=AnsiColour.yellow, cwd=None, env=None, sep=" ", printVerboseOnly=False, **kwargs):
    if not _cheriConfig or (_cheriConfig.quiet or (printVerboseOnly and not _cheriConfig.verbose)):
        return
    # also allow passing a single string
    if not type(arg1) is str:
        allArgs = arg1
        arg1 = allArgs[0]
        remainingArgs = allArgs[1:]
    newArgs = ("cd", shlex.quote(str(cwd)), "&&") if cwd else tuple()
    if env:
        # only print the changed environment entries
        filteredEnv = __filterEnv(env)
        if filteredEnv:
            newArgs += ("env",) + tuple(map(shlex.quote, (k + "=" + str(v) for k, v in filteredEnv.items())))
    # comma in tuple is required otherwise it creates a tuple of string chars
    newArgs += (shlex.quote(str(arg1)),) + tuple(map(shlex.quote, map(str, remainingArgs)))
    if outputFile:
        newArgs += (">", str(outputFile))
    print(coloured(colour, newArgs, sep=sep), flush=True, **kwargs)

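A standalone sketch of the "cd ... && env K=V cmd" line that printCommand assembles for display (__filterEnv and coloured belong to the project; the directory, environment, and arguments below are made-up values):

import shlex

cwd = '/tmp/build dir'
env = {'CC': 'clang', 'CFLAGS': '-O2 -g'}
args = ('ninja', 'all')

parts = ('cd', shlex.quote(cwd), '&&', 'env')
parts += tuple(shlex.quote('%s=%s' % (k, v)) for k, v in env.items())
parts += tuple(map(shlex.quote, args))
print(' '.join(parts))
# -> cd '/tmp/build dir' && env CC=clang 'CFLAGS=-O2 -g' ninja all
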
def process_pre(self):
    import tempfile
    self.temp_dir = tempfile.TemporaryDirectory()
    self.environ = {'PYTHONPATH': pythonpath()}
    self.outfname = bpy.path.ensure_ext(self.filepath, ".zip")
    self.command = (
        bpy.app.binary_path_python,
        '-m', 'bam.pack',
        # file to pack
        "--input", bpy.data.filepath,
        # file to write
        "--output", self.outfname,
        "--temp", self.temp_dir.name,
    )
    if self.log.isEnabledFor(logging.INFO):
        import shlex
        cmd_to_log = ' '.join(shlex.quote(s) for s in self.command)
        self.log.info('Executing %s', cmd_to_log)

def shell_script(self):
    def _trace(command):
        return 'echo + {}\n{} '.format(
            shlex.quote(command),
            command
        )

    commands = []
    after_success = [_trace(cmd) for cmd in self.after_success]
    after_failure = [_trace(cmd) for cmd in self.after_failure]
    for service in self.services:
        commands.append(_trace('service {} start'.format(service)))
    for script in self.scripts:
        commands.append(_trace(script))
    command_encoded = shlex.quote(to_text(base64.b64encode(to_binary('\n'.join(commands)))))
    context = {
        'command': command_encoded,
        'after_success': ' \n'.join(after_success),
        'after_failure': ' \n'.join(after_failure),
    }
    script = render_template('script.sh', **context)
    logger.debug('Build script: \n%s', script)
    return script

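The command_encoded line above base64-encodes the whole multi-line script and then quotes it, so it can be spliced into a shell template as a single argument. A standalone sketch of that round trip (to_text/to_binary are the project's helpers; plain encode()/decode() stand in for them here):

import base64
import shlex

commands = 'service redis start\npytest -x'
blob = base64.b64encode(commands.encode()).decode()
command_encoded = shlex.quote(blob)
# base64 output only contains shell-safe characters, so quote() typically
# returns it unchanged; it still guards empty strings and odd inputs.
# A template would then run something along the lines of:
#   echo <command_encoded> | base64 -d | /bin/sh
print(command_encoded == blob)  # -> True for this input
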
def files_in_archive(self, force_refresh=False):
    if self._files_in_archive and not force_refresh:
        return self._files_in_archive
    cmd = [ self.cmd_path, '--list', '-C', self.dest ]
    if self.zipflag:
        cmd.append(self.zipflag)
    if self.opts:
        cmd.extend([ '--show-transformed-names' ] + self.opts)
    if self.excludes:
        cmd.extend([ '--exclude=' + quote(f) for f in self.excludes ])
    cmd.extend([ '-f', self.src ])
    rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
    if rc != 0:
        raise UnarchiveError('Unable to list files in the archive')
    for filename in out.splitlines():
        # Compensate for locale-related problems in gtar output (octal unicode representation) #11348
        # filename = filename.decode('string_escape')
        filename = codecs.escape_decode(filename)[0]
        if filename and filename not in self.excludes:
            self._files_in_archive.append(to_native(filename))
    return self._files_in_archive

def unarchive(self):
    cmd = [ self.cmd_path, '--extract', '-C', self.dest ]
    if self.zipflag:
        cmd.append(self.zipflag)
    if self.opts:
        cmd.extend([ '--show-transformed-names' ] + self.opts)
    if self.file_args['owner']:
        cmd.append('--owner=' + quote(self.file_args['owner']))
    if self.file_args['group']:
        cmd.append('--group=' + quote(self.file_args['group']))
    if self.module.params['keep_newer']:
        cmd.append('--keep-newer-files')
    if self.excludes:
        cmd.extend([ '--exclude=' + quote(f) for f in self.excludes ])
    cmd.extend([ '-f', self.src ])
    rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
    return dict(cmd=cmd, rc=rc, out=out, err=err)

def test_alias_magic():
    """Test %alias_magic."""
    ip = get_ipython()
    mm = ip.magics_manager
    # Basic operation: both cell and line magics are created, if possible.
    ip.run_line_magic('alias_magic', 'timeit_alias timeit')
    nt.assert_in('timeit_alias', mm.magics['line'])
    nt.assert_in('timeit_alias', mm.magics['cell'])
    # --cell is specified, line magic not created.
    ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
    nt.assert_not_in('timeit_cell_alias', mm.magics['line'])
    nt.assert_in('timeit_cell_alias', mm.magics['cell'])
    # Test that line alias is created successfully.
    ip.run_line_magic('alias_magic', '--line env_alias env')
    nt.assert_equal(ip.run_line_magic('env', ''),
                    ip.run_line_magic('env_alias', ''))
    # Test that line alias with parameters passed in is created successfully.
    ip.run_line_magic('alias_magic', '--line history_alias history --params ' + shlex.quote('3'))
    nt.assert_in('history_alias', mm.magics['line'])

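In the test above shlex.quote('3') is a no-op, since digits are shell-safe; the quoting only matters once the --params value contains spaces or shell metacharacters. A quick illustration:

import shlex

print(shlex.quote('3'))             # -> 3
print(shlex.quote('-l 5 -f "%H"'))  # -> '-l 5 -f "%H"'
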
def opened_files(path, excludes):
    files = []
    try:
        process = os.popen('lsof -wFn +D %s | tail -n +2 | cut -c2-' % cmd_quote(path))
        data = process.read()
        process.close()
        for item in data.split('\n'):
            if not item or len(item) <= 2 or os.path.isdir(item) or item.isdigit() or file_excluded(item, excludes):
                continue
            files.append(item)
        return files
    except Exception as ex:
        logger.exception("Exception checking %r: ", path)
        return None

def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    upload_cmd = 'rclone move %s %s' \
                 ' --delete-after' \
                 ' --no-traverse' \
                 ' --stats=60s' \
                 ' -v' \
                 ' --transfers=%d' \
                 ' --checkers=%d' \
                 ' --drive-chunk-size=%s' % \
                 (cmd_quote(local), cmd_quote(remote), transfers, checkers, chunk_size)
    if bwlimit and len(bwlimit):
        upload_cmd += ' --bwlimit="%s"' % bwlimit
    for item in excludes:
        upload_cmd += ' --exclude="%s"' % item
    if dry_run:
        upload_cmd += ' --dry-run'
    return upload_cmd

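Since the returned string is handed to a shell, the local and remote paths need per-argument quoting. Assuming cmd_quote is an alias for shlex.quote (pipes.quote on Python 2), a minimal check of what that buys:

from shlex import quote as cmd_quote

local = '/mnt/My Media/'
remote = 'gdrive:/My Media/'
print('rclone move %s %s --dry-run' % (cmd_quote(local), cmd_quote(remote)))
# -> rclone move '/mnt/My Media/' 'gdrive:/My Media/' --dry-run
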
def remove_empty_directories(config, force_dry_run=False):
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    # opened_files() may return None on error; "not open_files" covers both
    # an empty list and None without raising TypeError on len(None)
    if not open_files:
        clearing = False
        for dir, depth in config['rclone_remove_empty_on_upload'].items():
            if os.path.exists(dir):
                clearing = True
                logger.debug("Removing empty directories from %r with mindepth %r", dir, depth)
                cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(dir), depth)
                if not config['dry_run'] and not force_dry_run:
                    cmd += ' -delete'
                run_command(cmd)
        if clearing:
            logger.debug("Finished clearing empty directories")
    else:
        logger.debug("Skipped removing empty directories because %d files are currently open: %r", len(open_files),
                     open_files)

############################################################
# CONFIG STUFF
############################################################

def test_activate(monkeypatch):
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        if len(result) > 2:
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(result))
        assert ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379'] == result

    with_directory_contents_completing_project_file(
        {DEFAULT_PROJECT_FILENAME: """
services:
  REDIS_URL: redis
"""}, activate_redis_url)

def test_activate_quoting(monkeypatch):
    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        assert ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)] == result

    with_directory_contents_completing_project_file(
        {
            DEFAULT_PROJECT_FILENAME: """
variables:
  FOO: {}
""",
            DEFAULT_LOCAL_STATE_FILENAME: """
variables:
  FOO: $! boo
"""
        }, activate_foo)

def _shell_replace_vars(cmd, local_vars):
    spans = []
    replacements = []
    for match in _dollar_var_re.finditer(cmd):
        varname = match.group(1)
        if varname.isdigit():
            # $1, $2 for sys.argv, just like bash
            value = sys.argv[int(varname)]
        elif varname == '#':
            value = len(sys.argv) - 1
        elif varname == '@':
            value = ' '.join(map(shlex.quote, sys.argv[1:]))
        else:
            assert is_variable_name(varname), 'not a valid var name: ' + varname
            if varname not in local_vars:
                continue
            value = local_vars[varname]
            if isinstance(value, str):
                value = shlex.quote(value)
            else:
                value = str(value)
        spans.append(match.span())
        replacements.append(value)
    return _replace_n(cmd, spans, replacements)

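The '@' branch above mirrors the shell's "$@": each argument is quoted individually, then joined with spaces. A quick standalone illustration:

import shlex

args = ['report.txt', 'my notes.md', "it's done"]  # stand-in for sys.argv[1:]
print(' '.join(map(shlex.quote, args)))
# -> report.txt 'my notes.md' 'it'"'"'s done'
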
def run(self, context, KEY, ACTION):
    keymap = self.tui.keymap
    key = KEY
    if len(ACTION) == 1 and ACTION[0][0] == '<' and ACTION[0][-1] == '>':
        # ACTION is another key (e.g. 'j' -> 'down')
        action = keymap.mkkey(ACTION[0])
    else:
        action = ' '.join(shlex.quote(x) for x in ACTION)

    if context is None:
        from ...tui.keymap import DEFAULT_CONTEXT
        context = DEFAULT_CONTEXT
    elif context not in _get_KEYMAP_CONTEXTS():
        log.error('Invalid context: {!r}'.format(context))
        return False
    try:
        keymap.bind(key, action, context=context)
    except ValueError as e:
        log.error(e)
        return False
    else:
        return True

def get_output_nocheck(self, *cmd, **kwargs):
    proc = self.create_subprocess(cmd,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True,
                                  **kwargs)
    # FIXME: support Python 2?
    with proc:
        stdout = proc.communicate()[0]
    stdout = stdout.rstrip()
    exitcode = proc.wait()
    if exitcode:
        cmd_str = ' '.join(map(shlex.quote, cmd))
        self.logger.error("Command %s failed with exit code %s"
                          % (cmd_str, exitcode))
    return (exitcode, stdout)

def _spawn_ffmpeg(self):
    if self.streaming:
        url = self._stream_url
    elif self.playing:
        url = self._song_context.song_url
    else:
        raise RuntimeError('Player is in an invalid state')

    args = shlex.split(self._ffmpeg_command.format(shlex.quote(url)))
    try:
        self._ffmpeg = subprocess.Popen(args)
    except FileNotFoundError as e:
        raise RuntimeError('ffmpeg executable was not found') from e
    except subprocess.SubprocessError as e:
        raise RuntimeError('Popen failed: {0.__name__} {1}'.format(type(e), str(e))) from e

#
# Player FSM
#

def testQuote(self):
    safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
    unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s
    unsafe = '"`$\\!' + unicode_sample

    self.assertEqual(shlex.quote(''), "''")
    self.assertEqual(shlex.quote(safeunquoted), safeunquoted)
    self.assertEqual(shlex.quote('test file name'), "'test file name'")
    for u in unsafe:
        self.assertEqual(shlex.quote('test%sname' % u),
                         "'test%sname'" % u)
    for u in unsafe:
        self.assertEqual(shlex.quote("test%s'name'" % u),
                         "'test%s'\"'\"'name'\"'\"''" % u)

# Allow this test to be used with old shlex.py

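The '"'"' sequence in the expected strings above is how shlex.quote escapes embedded single quotes: it wraps the whole value in single quotes and splices each inner single quote in as a double-quoted one. A short demonstration:

import shlex

print(shlex.quote("don't"))      # -> 'don'"'"'t'
print(shlex.quote('test$name'))  # -> 'test$name'
print(shlex.quote('plain.txt'))  # -> plain.txt  (safe characters pass through unquoted)
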
def restore_vm(self, new_vm, new_name, size, qvm_create_args, vm_keys, backup_storage_vm):
    subprocess.check_call("qvm-create "+shlex.quote(new_name)+" "+qvm_create_args, shell=True)
    subprocess.check_call(["qvm-prefs", "-s", new_name, "netvm", "none"])  # Safe approach…
    if size is not None:
        subprocess.check_call(["qvm-grow-private", new_name, size])
    with Dvm() as dvm:
        dvm.attach("xvdz", new_vm.private_volume())
        try:
            if size is not None:
                dvm.check_call("sudo e2fsck -f -p /dev/xvdz")
                dvm.check_call("sudo resize2fs /dev/xvdz")
            dvm.check_call("sudo mkdir /mnt/clone")
            dvm.check_call("sudo mount /dev/xvdz /mnt/clone")
            try:
                self.upload_agent(dvm)
                with self.add_permissions(backup_storage_vm, dvm, vm_keys.encrypted_name):
                    dvm.check_call("/tmp/restore-agent "+shlex.quote(backup_storage_vm.get_name())+" "+shlex.quote(vm_keys.encrypted_name), input=vm_keys.key, stdout=None, stderr=None)
            finally:
                dvm.check_call("sudo umount /mnt/clone")
        finally:
            dvm.detach_all()

# abstract def upload_agent(self, dvm)

def main():
    if not shutil.which('borg'):
        print('The \'borg\' command can\'t be found in the PATH. Please correctly install borgbackup first.')
        print('See instructions at https://borgbackup.readthedocs.io/en/stable/installation.html')
        return 1

    parser = build_parser()
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level, format='%(message)s')
    if 'function' not in args:
        return parser.print_help()
    try:
        return args.function(args)
    except subprocess.CalledProcessError as cpe:
        print('{} invocation failed with status {}'.format(cpe.cmd[0], cpe.returncode))
        print('Command line was:', *[shlex.quote(s) for s in cpe.cmd])
        return cpe.returncode

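On Python 3.8 and later, shlex.join() performs the same quote-and-join in one call, which is a drop-in way to reconstruct a failed command line like the one printed above:

import shlex

cmd = ['borg', 'create', '--stats', 'repo::my archive', '/home/user/my docs']
print(shlex.join(cmd))
# -> borg create --stats 'repo::my archive' '/home/user/my docs'
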
async def get_package_version(prefix, connection, package_name):
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)
    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name, result.stderr.strip()
            )
        )
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()
            )
        )

async def run(self):
    data = cluster_data.ClusterData.find_one(self.cluster.model_id)
    cluster_name = data.global_vars.get("cluster", self.cluster.name)
    cluster_name = shlex.quote(cluster_name)

    cluster_servers = {item._id: item for item in self.cluster.server_list}
    mons = [
        cluster_servers[item["server_id"]]
        for item in self.cluster.configuration.state
        if item["role"] == "mons"]
    if not mons:
        return

    version_result = await self.execute_cmd(
        "ceph --cluster {0} health --format json".format(cluster_name),
        random.choice(mons))
    self.manage_errors(
        "Cannot execute ceph health command on %s (%s): %s",
        "Not all hosts have working ceph command",
        version_result.errors
    )
    self.manage_health(version_result)

async def run(self):
    data = cluster_data.ClusterData.find_one(self.cluster.model_id)
    cluster_name = data.global_vars.get("cluster", self.cluster.name)
    cluster_name = shlex.quote(cluster_name)

    version_result = await self.execute_cmd(
        "ceph --cluster {0} version".format(cluster_name),
        *self.cluster.server_list)
    self.manage_errors(
        "Cannot execute ceph version command on %s (%s): %s",
        "Not all hosts have working ceph command",
        version_result.errors
    )
    results = list(parse_results(version_result.ok))
    self.manage_versions(results)
    self.manage_commits(results)

def _write_command_to_file(self, env, arglist):
    envvar_settings_list = []
    if "DIALOGRC" in env:
        envvar_settings_list.append(
            "DIALOGRC={0}".format(_shell_quote(env["DIALOGRC"])))
    for var in self._lowlevel_exit_code_varnames:
        varname = "DIALOG_" + var
        envvar_settings_list.append(
            "{0}={1}".format(varname, _shell_quote(env[varname])))

    command_str = ' '.join(envvar_settings_list +
                           list(imap(_shell_quote, arglist)))
    s = "{separator}{cmd}\n\nArgs: {args!r}\n".format(
        separator="" if self._debug_first_output else ("-" * 79) + "\n",
        cmd=command_str, args=arglist)

    self._debug_logfile.write(s)
    if self._debug_always_flush:
        self._debug_logfile.flush()
    self._debug_first_output = False

def action(action_name=None, keep_comments=False, escape=True):
    def decorator(func):
        action = action_name or func.__name__.replace('_', '-')

        def function_wrapper(*args, **kw):
            if not keep_comments:
                args = [i for i in args if not i.startswith('#')]
            if escape:
                args = [shlex.quote(i) for i in args]
            res = func(*args, **kw)
            # allow actions to yield each line
            if not isinstance(res, str) and res is not None:
                res = '\n'.join(res)
            return res

        state['actions'][action] = function_wrapper
        return function_wrapper
    return decorator

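A hypothetical use of the decorator above, assuming it and its module-level `state` registry live in the same module (the action name and command below are made up):

state = {'actions': {}}  # module-level registry the decorator writes into

@action()
def copy_file(src, dst):
    # arguments arrive already shell-quoted because escape=True by default
    return 'cp {} {}'.format(src, dst)

print(state['actions']['copy-file']('my file.txt', '/tmp/out dir/'))
# -> cp 'my file.txt' '/tmp/out dir/'
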
def brute(i):
    global flag
    global last_breakpoint
    for c in charset:
        flag[i] = c
        output = gdb.execute('r < <(echo {})'.format(shlex.quote(''.join(flag))), True, True)
        # skip floating point exception
        while "SIGFPE" in output:
            output = gdb.execute('c', True, True)
        output = gdb.execute('x $pc', True, True)
        pc = output.split(":")[0]
        pc = int(pc, 16)
        if pc > last_breakpoint:
            last_breakpoint = pc
            break
    print(''.join(flag))

def submit(self, values, cwd):
    queue_command = [
        'qsub', '-cwd', '-e', 'stderr.txt', '-o', 'stdout.txt'
    ] + self.qargs
    process = subprocess.Popen(
        queue_command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdin=subprocess.PIPE,
        cwd=cwd,
        env=self.env,
        universal_newlines=True
    )
    command_chunks = self.bin + self.get_options(values)
    command = ' '.join(shlex.quote(s) for s in command_chunks)
    stdin = ("echo > started;\n"
             "%s;\n"
             "echo > finished;") % command
    stdout, stderr = process.communicate(stdin)
    match = self.job_submission_regex.match(stdout)
    return match.group(1)

def _list(self):
    # Do a long listing to avoid connection reset
    # remote_dir = urllib.unquote(self.parsed_url.path.lstrip('/')).rstrip()
    remote_dir = urllib.unquote(self.parsed_url.path)
    # print remote_dir
    quoted_path = cmd_quote(self.remote_path)
    # failing to cd into the folder might be because it was not created already
    commandline = "lftp -c \"source %s; ( cd %s && ls ) || ( mkdir -p %s && cd %s && ls )\"" % (
        cmd_quote(self.tempname),
        quoted_path, quoted_path, quoted_path
    )
    log.Debug("CMD: %s" % commandline)
    _, l, e = self.subprocess_popen(commandline)
    log.Debug("STDERR:\n"
              "%s" % (e))
    log.Debug("STDOUT:\n"
              "%s" % (l))
    # Look for our files as the last element of a long list line
    return [x.split()[-1] for x in l.split('\n') if x]

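The lftp invocation above wraps a small lftp script in double quotes and relies on cmd_quote to protect the individual paths inside it. A standalone sketch of that nested quoting (cmd_quote assumed to be shlex.quote; the path is illustrative):

from shlex import quote as cmd_quote

remote_path = 'backups/my photos'
commandline = 'lftp -c "cd %s && ls"' % cmd_quote(remote_path)
print(commandline)
# -> lftp -c "cd 'backups/my photos' && ls"
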
def rm_doubled_senses(entry):
    """Some entries have multiple senses. A few of them are exactly the same,
    so remove these.
    This function returns True if an element has been altered."""
    senses = list(findall(entry, 'sense'))
    if len(senses) == 1:
        return
    # obtain a mapping from XML node -> list of words within `<quote>…</quote>`
    senses = {sense: tuple(q.text.strip() for q in tei_iter(sense, 'quote')
                           if q.text) for sense in senses}
    changed = False
    # pair each sense with another and compare their content
    for s1, s2 in itertools.combinations(senses.items(), 2):
        if len(s1[1]) == len(s2[1]):
            # if two senses are *exactly* identical
            if all(e1 == e2 for e1, e2 in zip(s1[1], s2[1])):
                try:
                    entry.remove(s2[0])  # sense node object
                    changed = True
                except ValueError:  # already removed?
                    pass
    return changed

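The deduplication above boils down to pairwise comparison of the extracted word tuples. A standalone sketch of that core idea, with a plain dict standing in for the XML nodes (keys and values are made up):

import itertools

senses = {'s1': ('run', 'sprint'), 's2': ('run', 'sprint'), 's3': ('walk',)}
duplicates = set()
for (k1, v1), (k2, v2) in itertools.combinations(senses.items(), 2):
    if v1 == v2:
        duplicates.add(k2)  # keep the first occurrence, drop the later one
print(duplicates)  # -> {'s2'}
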