def _ruby_version():
re_ver = re.compile(r'^ruby ([^\s]+) \(([^\)]+)\)')
try:
ruby_version = str(sh.ruby('--version')).strip()
matches = re_ver.match(ruby_version)
if not matches:
return ruby_version
return {
'version': matches.group(1),
'revision': matches.group(2)
}
except sh.CommandNotFound:
return None
python类CommandNotFound()的实例源码
def _ruby_version():
re_ver = re.compile(r'^ruby ([^\s]+) \(([^\)]+)\)')
try:
ruby_version = str(sh.ruby('--version')).strip()
matches = re_ver.match(ruby_version)
if not matches:
return ruby_version
return {
'version': matches.group(1),
'revision': matches.group(2)
}
except sh.CommandNotFound:
return None
def list_vms(running=False):
"""
Return the list of VM in for the form name => uuid, when the running bool
is set to true, only return running VMs
:param running: bool
:return: dict[str,str]
"""
try:
LIST_PARSER = re.compile(r'"(?P<name>[^"]+)" \{(?P<uuid>[^\}]+)\}')
vms = {}
list = running and 'runningvms' or 'vms'
for line in VBoxManage('list', list, _iter=True):
res = re.match(LIST_PARSER, line)
if res:
vms[res.group('name')] = res.group('uuid')
return vms
except CommandNotFound:
return {}
def _npm_install_static():
print("NPM Install packages...")
static_path = os.path.join(CWD, "application/static")
package_json = os.path.join(static_path, "package.json")
try:
if os.path.isfile(package_json):
with sh.pushd(static_path):
sh.npm("install", "-f")
else:
print("**ERROR: Can't install static files, `package.json` is missing at `%s`" % package_json)
print("*" * 80)
except sh.CommandNotFound as e:
print("")
print("*** Error Command Not Found: `{0}` is not found. You need to install `{0}` to continue".format(str(e)))
print("*" * 80)
def __init__(self, base_path):
self._base_path = base_path
try:
self._btrfs = sh.Command('btrfs')
self._btrfs('quota', 'enable', self._base_path)
except sh.CommandNotFound as e:
print(self._err('driver init', e.stderr, e.full_cmd))
def __init__(self,
training_label_prefix,
dataset_name=None,
epochs=None,
time_limit=None,
num_gpus=None):
if not ((epochs is None) ^ (time_limit is None)):
raise ValueError('epochs or time_limit must present, '
'but not both!')
self._training_label_prefix = training_label_prefix
self._dataset_name = dataset_name or active_config().dataset_name
self._validate_training_label_prefix()
self._epochs = epochs
self._time_limit = time_limit
fixed_config_keys = dict(dataset_name=self._dataset_name,
epochs=self._epochs,
time_limit=self._time_limit)
self._config_builder = Embed300FineRandomConfigBuilder(
fixed_config_keys)
try:
self._num_gpus = len(sh.nvidia_smi('-L').split('\n')) - 1
except sh.CommandNotFound:
self._num_gpus = 1
self._num_gpus = num_gpus or self._num_gpus
# TODO ! Replace set with a thread-safe set
self._available_gpus = set(range(self.num_gpus))
self._semaphore = Semaphore(self.num_gpus)
self._running_commands = [] # a list of (index, sh.RunningCommand)
self._stop_search = False
self._lock = Lock()
def test_keygen(self):
comment = 'Hello how are you I\'m a turtle'
try:
priv, pub = utils.JinjaUtils.keygen(
bits=1024, keytype='rsa', comment=comment)
except sh.CommandNotFound:
pytest.skip('ssh-keygen is not available')
assert 'RSA PRIVATE KEY' in priv
assert pub.endswith(comment + '\n')
def htpasswd_fn():
try:
return _sh_htpasswd()
except sh.CommandNotFound:
logging.error("'htpasswd' not found in the path. Install 'htpasswd' and try again")
sys.exit(1)
def test_importing_htpasswd_should_show_error_message_if_cant_find_it(self, mock_sys, mock_logging, mock_sh_htpasswd):
mock_sh_htpasswd.side_effect = sh.CommandNotFound()
htpasswd_fn()
mock_logging.error.assert_called_with("'htpasswd' not found in the path. Install 'htpasswd' and try again")
mock_sys.exit.assert_called_with(1)
def cli(is_json):
"""
See the aeriscloud version information
"""
versions = {
'aeriscloud': {'version': ac_version},
'ansible': {'version': ansible_version},
'vagrant': {'version': vagrant_version()},
'virtualbox': {'version': virtualbox_version()},
'ruby': _ruby_version(),
'python': {'version': _python_version()},
'git': {'version': str(sh.git('--version'))[12:].strip()}
}
# aeriscloud get information
if os.path.exists(os.path.join(aeriscloud_path, '.git')):
repo = Repo(aeriscloud_path)
rev = str(repo.head.commit)[:8]
branch = str(repo.active_branch)
versions['aeriscloud']['revision'] = rev
versions['aeriscloud']['branch'] = branch
# operating system
linux_version = _linux_version()
if linux_version:
versions['linux'] = linux_version
else:
try:
# this is for osx
sw_vers = dict([map(unicode.strip, line.split(':'))
for line in sh.sw_vers()])
versions['osx'] = {
'name': sw_vers['ProductName'],
'version': sw_vers['ProductVersion'],
'build': sw_vers['BuildVersion']
}
except sh.CommandNotFound:
pass
try:
uname = str(sh.uname('-sr')).strip()
versions['kernel'] = {'version': uname}
except sh.CommandNotFound:
pass
if is_json:
click.echo(json.dumps(versions))
else:
click.echo(render_cli('version', **versions))
def cli(is_json):
"""
See the aeriscloud version information
"""
versions = {
'aeriscloud': {'version': ac_version},
'ansible': {'version': ansible_version},
'vagrant': {'version': vagrant_version()},
'virtualbox': {'version': virtualbox_version()},
'ruby': _ruby_version(),
'python': {'version': _python_version()},
'git': {'version': str(sh.git('--version'))[12:].strip()}
}
# aeriscloud get information
if os.path.exists(os.path.join(aeriscloud_path, '.git')):
repo = Repo(aeriscloud_path)
rev = str(repo.head.commit)[:8]
branch = str(repo.active_branch)
versions['aeriscloud']['revision'] = rev
versions['aeriscloud']['branch'] = branch
# operating system
linux_version = _linux_version()
if linux_version:
versions['linux'] = linux_version
else:
try:
# this is for osx
sw_vers = dict([map(unicode.strip, line.split(':'))
for line in sh.sw_vers()])
versions['osx'] = {
'name': sw_vers['ProductName'],
'version': sw_vers['ProductVersion'],
'build': sw_vers['BuildVersion']
}
except sh.CommandNotFound:
pass
try:
uname = str(sh.uname('-sr')).strip()
versions['kernel'] = {'version': uname}
except sh.CommandNotFound:
pass
if is_json:
click.echo(json.dumps(versions))
else:
click.echo(render_cli('version', **versions))
def local_ip():
"""
Retrieve the first ip from the interface linked to the default route
:return str
"""
sys_name = system()
if sys_name == 'Darwin':
# OSX
route = Command('route')
ifconfig = Command('ifconfig')
iface = [
line.strip()
for line in route('-n', 'get', 'default')
if line.strip().startswith('interface')
][0].split(':')[1].strip()
return [
line.strip()
for line in ifconfig(iface)
if line.strip().startswith('inet ')
][0].split(' ')[1]
elif sys_name == 'Linux':
try:
ip = Command('ip')
iface = [
line.strip()
for line in ip('route')
if line.strip().startswith('default ')
][0].split(' ')[4]
except CommandNotFound:
route = Command('route')
iface = [
line.strip()
for line in route('-n')
if line.startswith('0.0.0.0')
][0].split(' ').pop()
try:
# try with IP
ip = Command('ip')
return [
line.strip()
for line in ip('addr', 'show', iface)
if line.strip().startswith('inet ')
][0].split(' ')[1].split('/')[0]
except CommandNotFound:
pass
# fallback to ifconfig
ifconfig = Command('ifconfig')
return [
line.strip()
for line in ifconfig(iface)
if line.strip().startswith('inet ')
][0].split(' ')[1]
return None