def CreatePods(pod_name, yaml_file):
    """Create pods from a kubernetes config and wait for them to show up.

    Runs the ``_KUBECTL`` binary with ``create --filename=<yaml_file>`` and
    then waits through ``_WaitUntil(100, _GetPodNames, pod_name)``.

    Args:
      pod_name: 'name-prefix' selector for the pods.
      yaml_file: kubernetes yaml config.

    Raises:
      subprocess.CalledProcessError: if the create command exits non-zero.
      TimeoutError: if the pods didn't come up for a long time.
    """
    create_cmd = [_KUBECTL, 'create', '--filename=%s' % yaml_file]
    logging.info('Creating pods: %s', subprocess.list2cmdline(create_cmd))
    subprocess.check_call(create_cmd)

    # NOTE(review): the unit of the 100 passed to _WaitUntil is not visible
    # here (seconds vs. attempts) -- confirm against _WaitUntil.
    pods_are_up = _WaitUntil(100, _GetPodNames, pod_name)
    if pods_are_up:
        return
    raise TimeoutError(
        'Timed out waiting for %s pod to come up.' % pod_name)
python类list2cmdline()的实例源码
def call(self, args, devnull=False):
    """Run an external command and return its exit status.

    args - list of command args
    devnull - whether to pipe stdout to /dev/null (or equivalent)

    When ``self.debug`` is truthy the command line is echoed first and the
    user is asked to confirm; declining aborts (``abort=True``).
    Returns the value of ``subprocess.call``, or False if
    ``subprocess.CalledProcessError`` is raised.
    """
    if self.debug:
        click.echo(subprocess.list2cmdline(args))
        click.confirm('Continue?', default=True, abort=True)

    try:
        # Silence the child: stdout goes to self.FNULL and stderr is folded
        # into stdout.
        redirect = (
            {'stderr': subprocess.STDOUT, 'stdout': self.FNULL}
            if devnull else {}
        )
        return subprocess.call(args, **redirect)
    except subprocess.CalledProcessError:
        return False
def run_adb(*args, **kwargs):
    """Run an ``adb`` command through the shell and return its output.

    Args:
        *args: adb sub-command and its arguments; each one is converted
            with ``str()`` before the command line is built.
        **kwargs: optional ``serialno`` (emitted as ``-s``), ``host``
            (``-H``) and ``port`` (``-P``). Falsy values are skipped.

    Returns:
        Whatever ``check_output`` returns for the command.

    Raises:
        EnvironmentError: if running the command fails for any reason; the
            message carries the command line and the original error.
    """
    cmds = ['adb']
    for flag, key in (('-s', 'serialno'), ('-H', 'host'), ('-P', 'port')):
        value = kwargs.get(key)
        if value:
            cmds.extend([flag, value])
    cmds.extend(args)

    # Build a real list so the result does not depend on map() being lazy.
    cmdline = subprocess.list2cmdline([str(part) for part in cmds])
    try:
        # NOTE(review): list2cmdline quotes for the Windows C runtime, not
        # for a POSIX shell, so shell metacharacters in arguments are still
        # interpreted here. Kept as-is because callers may rely on it.
        return check_output(cmdline, shell=True)
    except Exception as e:  # "as" form is valid on Python 2.6+ and 3.x
        raise EnvironmentError('run cmd: {} failed. {}'.format(cmdline, e))
def _gen_wordnet(self, wdnet):
    """Generate the word network file ``wdnet`` and remember its path.

    If ``self._languageModel`` is exactly True, ``!ENTER``/``!EXIT`` entries
    are appended to the viseme list and dictionary via ``append_to_file``,
    then ``HLStats`` and ``HBuild`` are run through the shell. Otherwise
    ``HParse`` is run on ``self._grammar``. Each command is printed before
    it runs, and a failing command raises (``check=True``).
    ``self._word_net`` is set to ``wdnet`` once the commands succeed.
    """
    def announce_and_run(command, **run_options):
        # Echo the command exactly as list2cmdline renders it, then run it.
        print(list2cmdline(command))
        run(command, check=True, **run_options)

    if self._languageModel is not True:
        announce_and_run(['HParse', self._grammar, wdnet])
    else:
        hmm_list_with_markers = append_to_file(
            self._viseme_list, ('!ENTER', '!EXIT'))
        self._viseme_dict = append_to_file(
            self._viseme_dict, ('!ENTER []', '!EXIT []'))

        stats_command = ('HLStats -b ./run/bigrams -o '
                         + self._viseme_list + ' ' + self._labels)
        announce_and_run([stats_command], shell=True)

        build_command = ('HBuild -n ./run/bigrams '
                         + hmm_list_with_markers + ' ' + wdnet)
        announce_and_run([build_command], shell=True)

    self._word_net = wdnet
def run_adb(*args, **kwargs):
    """Run an ``adb`` command without a shell and return its output.

    Args:
        *args: adb sub-command and its arguments; each one is converted
            with ``str()`` before the command is run.
        **kwargs: optional ``serialno`` (emitted as ``-s``), ``host``
            (``-H``) and ``port`` (``-P``). Falsy values are skipped.

    Returns:
        Whatever ``check_output`` returns for the command.

    Raises:
        EnvironmentError: if running the command fails for any reason; the
            message carries the command line and the original error.
    """
    cmds = ['adb']
    for flag, key in (('-s', 'serialno'), ('-H', 'host'), ('-P', 'port')):
        value = kwargs.get(key)
        if value:
            cmds.extend([flag, value])
    cmds.extend(args)

    argv = [str(part) for part in cmds]
    # The joined form is only used for the error message below.
    cmdline = subprocess.list2cmdline(argv)
    try:
        # With shell=False a single string is treated as the program name on
        # POSIX, so the argument list must be passed. On Windows subprocess
        # joins the list with list2cmdline itself, giving the same command.
        return check_output(argv, shell=False)
    except Exception as e:  # "as" form is valid on Python 2.6+ and 3.x
        raise EnvironmentError('run cmd: {} failed. {}'.format(cmdline, e))
def test_list2cmdline(self):
self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
'"a b c" d e')
self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
'ab\\"c \\ d')
self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
'ab\\"c " \\\\" d')
self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
'a\\\\\\b "de fg" h')
self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
'a\\\\\\"b c d')
self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
'"a\\\\b c" d e')
self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
'"a\\\\b\\ c" d e')
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
def test_list2cmdline(self):
self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
'"a b c" d e')
self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
'ab\\"c \\ d')
self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
'ab\\"c " \\\\" d')
self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
'a\\\\\\b "de fg" h')
self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
'a\\\\\\"b c d')
self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
'"a\\\\b c" d e')
self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
'"a\\\\b\\ c" d e')
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
def test_list2cmdline(self):
self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
'"a b c" d e')
self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
'ab\\"c \\ d')
self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
'ab\\"c " \\\\" d')
self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
'a\\\\\\b "de fg" h')
self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
'a\\\\\\"b c d')
self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
'"a\\\\b c" d e')
self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
'"a\\\\b\\ c" d e')
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
def test_list2cmdline(self):
self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
'"a b c" d e')
self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
'ab\\"c \\ d')
self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
'ab\\"c " \\\\" d')
self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
'a\\\\\\b "de fg" h')
self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
'a\\\\\\"b c d')
self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
'"a\\\\b c" d e')
self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
'"a\\\\b\\ c" d e')
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)
def install_options(self, script_text):
    """Set ``self.options`` from the options found in ``script_text``.

    The text returned by ``self._extract_options(script_text)`` is split
    with ``shlex.split``. If the command line rendered from ``self`` is not
    ASCII according to ``isascii``, ``'-x'`` is put at the front of the
    options.
    """
    raw_options = self._extract_options(script_text)
    self.options = shlex.split(raw_options)
    # NOTE(review): self is handed to list2cmdline as the argument sequence,
    # so it is presumably list-like -- confirm against the enclosing class.
    if isascii(subprocess.list2cmdline(self)):
        return
    self.options.insert(0, '-x')
def _render(items):
    """Render ``items`` as a ``#!`` line terminated by a newline.

    Every item is stripped of surrounding whitespace and passed through
    ``CommandSpec._strip_quotes`` before the pieces are joined with
    ``subprocess.list2cmdline``.
    """
    cleaned = [CommandSpec._strip_quotes(piece.strip()) for piece in items]
    return '#!' + subprocess.list2cmdline(cleaned) + '\n'
# For pbr compat; will be removed in a future version.
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)
def install_options(self, script_text):
    """Set ``self.options`` from the options found in ``script_text``.

    The text returned by ``self._extract_options(script_text)`` is split
    with ``shlex.split``. If the command line rendered from ``self`` is not
    ASCII according to ``isascii``, ``'-x'`` is put at the front of the
    options.
    """
    raw_options = self._extract_options(script_text)
    self.options = shlex.split(raw_options)
    # NOTE(review): self is handed to list2cmdline as the argument sequence,
    # so it is presumably list-like -- confirm against the enclosing class.
    if isascii(subprocess.list2cmdline(self)):
        return
    self.options.insert(0, '-x')
def _render(items):
    """Render ``items`` as a ``#!`` line terminated by a newline.

    Every item is stripped of surrounding whitespace and passed through
    ``CommandSpec._strip_quotes`` before the pieces are joined with
    ``subprocess.list2cmdline``.
    """
    cleaned = [CommandSpec._strip_quotes(piece.strip()) for piece in items]
    return '#!' + subprocess.list2cmdline(cleaned) + '\n'
# For pbr compat; will be removed in a future version.
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)
def install_options(self, script_text):
    """Set ``self.options`` from the options found in ``script_text``.

    The text returned by ``self._extract_options(script_text)`` is split
    with ``shlex.split``. If the command line rendered from ``self`` is not
    ASCII according to ``isascii``, ``'-x'`` is put at the front of the
    options.
    """
    raw_options = self._extract_options(script_text)
    self.options = shlex.split(raw_options)
    # NOTE(review): self is handed to list2cmdline as the argument sequence,
    # so it is presumably list-like -- confirm against the enclosing class.
    if isascii(subprocess.list2cmdline(self)):
        return
    self.options.insert(0, '-x')
def _render(items):
    """Render ``items`` as a ``#!`` line terminated by a newline.

    Every item is stripped of surrounding whitespace and passed through
    ``CommandSpec._strip_quotes`` before the pieces are joined with
    ``subprocess.list2cmdline``.
    """
    cleaned = [CommandSpec._strip_quotes(piece.strip()) for piece in items]
    return '#!' + subprocess.list2cmdline(cleaned) + '\n'
# For pbr compat; will be removed in a future version.
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)
def install_options(self, script_text):
    """Set ``self.options`` from the options found in ``script_text``.

    The text returned by ``self._extract_options(script_text)`` is split
    with ``shlex.split``. If the command line rendered from ``self`` is not
    ASCII according to ``isascii``, ``'-x'`` is put at the front of the
    options.
    """
    raw_options = self._extract_options(script_text)
    self.options = shlex.split(raw_options)
    # NOTE(review): self is handed to list2cmdline as the argument sequence,
    # so it is presumably list-like -- confirm against the enclosing class.
    if isascii(subprocess.list2cmdline(self)):
        return
    self.options.insert(0, '-x')
def _render(items):
cmdline = subprocess.list2cmdline(items)
return '#!' + cmdline + '\n'
# For pbr compat; will be removed in a future version.
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)
def install_options(self, script_text):
    """Set ``self.options`` from the options found in ``script_text``.

    The text returned by ``self._extract_options(script_text)`` is split
    with ``shlex.split``. If the command line rendered from ``self`` is not
    ASCII according to ``isascii``, ``'-x'`` is put at the front of the
    options.
    """
    raw_options = self._extract_options(script_text)
    self.options = shlex.split(raw_options)
    # NOTE(review): self is handed to list2cmdline as the argument sequence,
    # so it is presumably list-like -- confirm against the enclosing class.
    if isascii(subprocess.list2cmdline(self)):
        return
    self.options.insert(0, '-x')
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)
def install_options(self, script_text):
    """Set ``self.options`` from the options found in ``script_text``.

    The text returned by ``self._extract_options(script_text)`` is split
    with ``shlex.split``. If the command line rendered from ``self`` is not
    ASCII according to ``isascii``, ``'-x'`` is put at the front of the
    options.
    """
    raw_options = self._extract_options(script_text)
    self.options = shlex.split(raw_options)
    # NOTE(review): self is handed to list2cmdline as the argument sequence,
    # so it is presumably list-like -- confirm against the enclosing class.
    if isascii(subprocess.list2cmdline(self)):
        return
    self.options.insert(0, '-x')
def _render(items):
cmdline = subprocess.list2cmdline(items)
return '#!' + cmdline + '\n'
# For pbr compat; will be removed in a future version.
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)
def install_options(self, script_text):
    """Set ``self.options`` from the options found in ``script_text``.

    The text returned by ``self._extract_options(script_text)`` is split
    with ``shlex.split``. If the command line rendered from ``self`` is not
    ASCII according to ``isascii``, ``'-x'`` is put at the front of the
    options.
    """
    raw_options = self._extract_options(script_text)
    self.options = shlex.split(raw_options)
    # NOTE(review): self is handed to list2cmdline as the argument sequence,
    # so it is presumably list-like -- confirm against the enclosing class.
    if isascii(subprocess.list2cmdline(self)):
        return
    self.options.insert(0, '-x')
def _render(items):
    """Render ``items`` as a ``#!`` line terminated by a newline.

    Every item is stripped of surrounding whitespace and passed through
    ``CommandSpec._strip_quotes`` before the pieces are joined with
    ``subprocess.list2cmdline``.
    """
    cleaned = [CommandSpec._strip_quotes(piece.strip()) for piece in items]
    return '#!' + subprocess.list2cmdline(cleaned) + '\n'
# For pbr compat; will be removed in a future version.
def nt_quote_arg(arg):
    """Quote one command-line argument according to Windows parsing rules.

    The argument is wrapped in a one-element list and rendered with
    ``subprocess.list2cmdline``.
    """
    single_argument = [arg]
    return subprocess.list2cmdline(single_argument)