def vulture():
    """Try to find dead code paths by running ``vulture`` on the package.

    If ``which vulture`` does not succeed, vulture is installed with pip
    first. The command line is built from ``PKG_NAME``, an optional
    ``--exclude`` list taken from ``VULTURE_EXCLUDE_PATHS`` and an
    ``egrep -v`` filter built from ``VULTURE_IGNORE_FUNCTIONS``. It is run
    from the directory containing this file and its captured output is
    printed.

    Raises:
        SystemExit: always, carrying the return code of the shell command.
    """
    with api.quiet():
        if not api.local('which vulture').succeeded:
            # print() with a single argument behaves identically on
            # Python 2 and Python 3, unlike the print statement.
            print('vulture not found, installing it')
            api.local('pip install vulture')
    ignore_functions_grep = 'egrep -v "{0}"'.format(
        '|'.join(VULTURE_IGNORE_FUNCTIONS))
    excluded = ",".join(VULTURE_EXCLUDE_PATHS)
    excluded_paths = (' --exclude ' + excluded) if excluded else ''
    vulture_cmd = '\n vulture {pkg_name}{exclude}{pipes}'.format(
        pkg_name=PKG_NAME,
        exclude=excluded_paths,
        # Joining with a leading empty item prefixes the grep with '|'.
        pipes='|'.join(['', ignore_functions_grep]))
    changedir = api.lcd(os.path.dirname(__file__))
    warn_only = api.settings(warn_only=True)
    hide_warnings = api.hide('warnings')
    # A multi-manager ``with`` replaces contextlib.nested(), which is
    # deprecated in Python 2.7 and does not exist in Python 3.
    with changedir, warn_only, hide_warnings:
        result = api.local(vulture_cmd, capture=True)
        exit_code = result.return_code
    print(result.strip())
    raise SystemExit(exit_code)
python类quiet()的实例源码
def list_platforms(root_dir):
    """Return the names of the platform directories directly under root_dir.

    The immediate sub-directories of ``root_dir`` are listed with ``find``
    through run(); entries matching the ``grep -vE`` filter (four
    dot-separated numbers, or ``lyServers``) are dropped. A remaining
    directory counts as a platform when it contains a ``version.lua`` file.

    Args:
        root_dir: directory whose sub-directories are inspected.

    Returns:
        A list of directory names without the leading ``./``.
    """
    def is_platform(dir_name):
        """Tell whether ``root_dir/dir_name/version.lua`` is a regular file."""
        with quiet():
            return run('test -f "{}/{}/version.lua"'.format(root_dir, dir_name)).succeeded

    with cd(root_dir), hide('stdout'):
        result = run('''find ./ -mindepth 1 -maxdepth 1 -type d -print |grep --color=never -vE '([0-9]+(\.[0-9]+){3}\\b)|(lyServers)' ''')
    # Remove only the literal './' prefix printed by find. str.lstrip('./')
    # strips any run of '.' and '/' characters, so it would also eat the
    # leading dot of a hidden directory such as './.cache'.
    dirs = [each[2:] if each.startswith('./') else each
            for each in result.splitlines()]
    return [each for each in dirs if is_platform(each)]
def fetch_and_inflat_package(template_matchServer_ip, remote_dir, matchServer):
    """Unpack ``package.tgz`` in remote_dir and install a match server from it.

    Every command runs through run() inside ``remote_dir``:
    extract ``package.tgz``; move an existing nginx vhost config for
    ``matchServer`` into ``backup/``; install ``nginx_matchServer.conf`` as
    the vhost config; create the database and load ``matchServer_init.sql``
    through ``pandora``; extract ``matchServer.tgz`` into
    ``/app/<matchServer>``.

    Args:
        template_matchServer_ip: referenced only by the disabled download
            step below; the active code does not use it.
        remote_dir: working directory, prepared with mk_remote_dir().
        matchServer: match server name, used for the vhost file name, the
            database name and the ``/app`` directory.
    """
    vhost_conf = '/app/nginx/conf/vhost/{}.conf'.format(matchServer)
    app_dir = '/app/{}'.format(matchServer)

    mk_remote_dir(remote_dir)
    with cd(remote_dir):
        #wget = 'wget -c -t 10 -T 10 -q'
        #server_name = 'match_download_{}'.format(TIME)
        #run('''{} --header="Host:{}" http://{}/package.tgz'''.format(wget, server_name, template_matchServer_ip))
        run('tar zxf package.tgz')

        # Keep a copy of a pre-existing vhost config before overwriting it.
        with quiet():
            has_old_conf = run('test -f {}'.format(vhost_conf)).succeeded
        if has_old_conf:
            run('mkdir backup')
            run('mv {} backup/'.format(vhost_conf))
        run('cp nginx_matchServer.conf {}'.format(vhost_conf))

        run('''pandora --update -e 'create database {}' '''.format(matchServer))
        run('''pandora --update {} <matchServer_init.sql'''.format(matchServer))

        run('''mkdir -p {}'''.format(app_dir))
        run('''tar zxf matchServer.tgz -C {}'''.format(app_dir))
def update_backend(gameServer, version, mainland=True):
    """Back up and rewrite the backend config files of a game server.

    For each of ``app.properties`` and ``plugins.xml`` found in
    ``/app/<gameServer>/backend/apps``, the file is copied to
    ``/app/opbak/<TIME>/<gameServer>/`` and then edited in place with sed:
    on lines matching ``http://.*/<GAME>``, the ``<GAME>test_N-N-N`` token
    (with an extra ``[a-z]{2,5}_`` tag when ``mainland`` is false) is
    replaced by ``version``.

    Args:
        gameServer: name of the game server directory under ``/app``.
        version: replacement text inserted by sed.
        mainland: selects which of the two sed patterns is used.
    """
    backup_dir = '/app/opbak/{}/{}'.format(TIME, gameServer)
    run(''' [ -d {0} ] || mkdir -p {0} '''.format(backup_dir))

    apps_dir = '/app/{}/backend/apps'.format(gameServer)
    with cd(apps_dir):
        for conf_file in ('app.properties', 'plugins.xml'):
            # Skip config files that are not present.
            with quiet():
                found = run('test -f {}'.format(conf_file)).succeeded
            if not found:
                continue

            run('cp {} {}/'.format(conf_file, backup_dir))
            if mainland:
                cmd = ''' sed -i '/http:\/\/.*\/%s/s/%stest_[0-9]\{1,3\}-[0-9]\{1,3\}-[0-9]\{1,3\}/%s/g' %s ''' % (GAME, GAME, version, conf_file)
            else:
                cmd = ''' sed -i '/http:\/\/.*\/%s/s/%stest_[a-z]\{2,5\}_[0-9]\{1,3\}-[0-9]\{1,3\}-[0-9]\{1,3\}/%s/g' %s ''' % (GAME, GAME, version, conf_file)
            run(cmd)
def ip():
    """The ip where the addons site can be accessed.

    Prints the host part of a ``tcp://host:port`` docker address. The
    address is read from the ``DOCKER_HOST`` environment variable or, when
    that is unset or empty, from the output of ``docker-machine env addons``.
    If neither yields an address, the first IP that ``local.docker``
    resolves to is printed; when that lookup fails as well, abort() is
    called.
    """
    docker_host = os.environ.get("DOCKER_HOST") or ""
    if not docker_host:
        with quiet():
            docker_env = local("docker-machine env addons", capture=True)
        if docker_env:
            match = re.search(r'DOCKER_HOST="(tcp://[^"]+?)"', docker_env)
            if match:
                docker_host = match.group(1)

    match = re.search(r'tcp://([^:]+):', docker_host)
    if match:
        print(match.group(1))
        return

    try:
        # host used by dlite
        _, _, ips = socket.gethostbyname_ex("local.docker")
    except socket.error:
        # Only a failed lookup means "no docker host"; a bare except would
        # also swallow KeyboardInterrupt and SystemExit.
        abort("Could not determine docker-machine host; perhaps localhost?")
    else:
        print(ips[0])
def create_virtualenv():
    """Create the virtualenv for the selected server unless it exists.

    The environment lives at ``<env.envs_path>/dprr-<env.srvr>``. When
    ``ls`` on that path succeeds, a notice is printed and nothing else
    happens; otherwise ``virtualenv`` is run for the path.
    """
    require('srvr', 'path', 'within_virtualenv', provided_by=env.servers)

    with quiet():
        env_vpath = os.path.join(env.envs_path, 'dprr-' + env.srvr)
        already_present = run('ls {}'.format(env_vpath)).succeeded

    if already_present:
        notice = 'virtual environment at [{}] exists'.format(env_vpath)
        print(green(notice))
        return

    print(yellow('setting up virtual environment in [{}]'.format(env_vpath)))
    run('virtualenv {}'.format(env_vpath))
def clone_repo():
    """Clone REPOSITORY into ``env.path`` unless a checkout already exists.

    A checkout is considered present when ``ls <env.path>/.git`` succeeds;
    in that case only a notice is printed. Otherwise the repository is
    cloned with ``git clone --recursive``.
    """
    require('srvr', 'path', 'within_virtualenv', provided_by=env.servers)

    git_dir = os.path.join(env.path, '.git')
    with quiet():
        already_cloned = run('ls {}'.format(git_dir)).succeeded

    if already_cloned:
        notice = ('repository at'
                  ' [{}] exists').format(env.path)
        print(green(notice))
        return

    print(yellow('cloning repository to [{}]'.format(env.path)))
    run('git clone --recursive {} {}'.format(REPOSITORY, env.path))
def status_rabbitmq():
    """Query the RabbitMQ status locally, report it and return it.

    Returns:
        ``STATUS.STOPPED`` when the status command exits with 2 or 69,
        ``STATUS.RUNNING`` when it exits with 0.

    Raises:
        Exception: for any other exit code.
    """
    with settings(warn_only=True), quiet():
        res = local(RABBITMQ_CMD(action='status'), capture=True)

    code = res.return_code
    if code in (2, 69):
        status = STATUS.STOPPED
    elif code == 0:
        status = STATUS.RUNNING
    else:
        raise Exception("Rabbitmq: unknown status " + str(code))

    # print() with one argument works on both Python 2 and Python 3; the
    # former print statement is a SyntaxError on Python 3.
    print(status)
    print_status(status, 'rabbitmq')
    return status
def git_branch():
    """Return ``(hash, branch_name)`` for the current git HEAD.

    The hash comes from ``git rev-parse HEAD`` and the branch name from
    ``git rev-parse --abbrev-ref HEAD``, both run locally with captured
    output.

    Note the order: the hash is the first element of the returned tuple and
    the branch name the second.
    """
    with api.quiet():
        current_branch = api.local('git rev-parse --abbrev-ref HEAD',
                                   capture=True)
        current_hash = api.local('git rev-parse HEAD', capture=True)
    return current_hash, current_branch
def _local_resolver(domain):
    """Resolve ``domain`` with a local ``nslookup`` and return the result.

    The nslookup output is filtered down to the second field of the
    ``Address`` lines that do not contain ``#``.

    Raises:
        Exception: when the filtered output is empty.
    """
    lookup_cmd = '''nslookup %s |grep "^Address" |grep -v '#'|awk '{print $2}' ''' % domain
    with quiet():
        resolved = local(lookup_cmd, capture=True)
    if not resolved:
        raise Exception('Fail to resolve {} to ip address'.format(domain))
    return resolved
def remote_dir_exists(dir):
    """Return whether ``test -d <dir>`` succeeds when executed with run()."""
    check_cmd = 'test -d {}'.format(dir)
    with quiet():
        return run(check_cmd).succeeded
def file_exists_check(file):
    """Return whether ``test -f <file>`` succeeds when executed with run()."""
    check_cmd = 'test -f {}'.format(file)
    with quiet():
        return run(check_cmd).succeeded
def __init__(self, filename, remote_dir=REMOTE_DIR):
    """Verify that ``filename`` exists and copy it to a backup directory.

    The backup directory is ``<remote_dir>/<gameServer>``, where
    ``gameServer`` is the element at index 2 of the current working
    directory split on ``/`` (presumably ``/app/<gameServer>/...`` -- confirm
    against the callers).

    Args:
        filename: file to check and back up, relative to the current
            working directory.
        remote_dir: root under which the per-server backup directory is
            created.

    Raises:
        Exception: when ``test -f filename`` fails.
    """
    self.filename = filename

    with quiet():
        found = run('test -f {}'.format(filename)).succeeded
    if not found:
        raise Exception('File {} NOT exists under backend/apps/'.format(filename))

    self.dir = run('pwd')
    path_parts = self.dir.split('/')
    backup_dir = '{}/{}'.format(remote_dir, path_parts[2])
    run('[ -d {0} ] || mkdir -p {0}'.format(backup_dir))
    run('cp {} {}/'.format(self.filename, backup_dir))
def has_the_key(self, key):
    """Return whether ``self.filename`` has a line assigning ``key``.

    A line counts when it starts with ``key``, followed by any number of
    spaces and an ``=`` sign (checked with grep).
    """
    grep_cmd = 'grep --color=never "^{} *=" {}'.format(key, self.filename)
    with quiet():
        return run(grep_cmd).succeeded
def remote_file_exists(file):
    """Return whether ``test -f "<file>"`` succeeds when executed with run()."""
    check_cmd = 'test -f "{}"'.format(file)
    with quiet():
        outcome = run(check_cmd)
    return outcome.succeeded
def remote_dir_exists(file):
    """Return whether ``test -d "<file>"`` succeeds when executed with run()."""
    check_cmd = 'test -d "{}"'.format(file)
    with quiet():
        outcome = run(check_cmd)
    return outcome.succeeded
def __init__(self, filename, remote_dir=REMOTE_DIR):
    """Verify that ``filename`` exists and copy it to a backup directory.

    The backup directory is ``<remote_dir>/<tag>``, where ``tag`` is the
    part of ``filename`` before its first ``/``.

    Args:
        filename: file to check and back up, relative to the current
            working directory.
        remote_dir: root under which the backup directory is created.

    Raises:
        Exception: when ``test -f filename`` fails.
    """
    self.filename = filename
    self.dir = run('pwd')

    with quiet():
        found = run('test -f {}'.format(filename)).succeeded
    if not found:
        raise Exception('File {}/{} NOT exists'.format(self.dir, filename))

    leading_component = self.filename.split('/')[0]
    backup_dir = '{}/{}'.format(remote_dir, leading_component)
    run('[ -d {0} ] || mkdir -p {0}'.format(backup_dir))
    run('cp {} {}/'.format(self.filename, backup_dir))
def check_file(file):
    """Validate a hotswap archive path before it is used.

    Three conditions are evaluated, and the first one that fails is
    reported:

    1. ``file`` is an absolute path whose last component is ``hotswap.zip``.
    2. ``/app/online/<GAME><file>`` exists locally as a regular file.
    3. ``md5.txt`` exists next to that file.

    Args:
        file: path of the archive, starting with ``/``.

    Raises:
        Exception: with a message describing the first failed condition.
    """
    # The dot is escaped so that only a literal "hotswap.zip" is accepted;
    # an unescaped '.' would also match names such as "hotswapXzip".
    pattern = r'^/.*/hotswap\.zip$'
    file_with_full_path = '/app/online/{}{}'.format(GAME, file)
    file_path = os.path.dirname(file_with_full_path)

    pattern_matched = re.match(pattern, file)
    # Both local checks run even when the name check failed; only the
    # reporting below is ordered.
    with quiet():
        file_exists = local('test -f {}'.format(file_with_full_path)).succeeded
        md5_exists = local('test -f {}/md5.txt'.format(file_path)).succeeded

    checks = [
        (pattern_matched, 'Hotswap filename should be hotswap.zip'),
        (file_exists, '{} does NOT exists on FTP, please check'.format(file)),
        (md5_exists, 'md5.txt does NOT exists on FTP, please check'),
    ]
    for passed, message in checks:
        if not passed:
            raise Exception(message)
def remote_file_exists(file):
    """Return whether ``test -f "<file>"`` succeeds when executed with run()."""
    check_cmd = 'test -f "{}"'.format(file)
    with quiet():
        outcome = run(check_cmd)
    return outcome.succeeded
def remote_dir_exists(file):
    """Return whether ``test -d "<file>"`` succeeds when executed with run()."""
    check_cmd = 'test -d "{}"'.format(file)
    with quiet():
        outcome = run(check_cmd)
    return outcome.succeeded
def print_help():
    """Print the output of ``dnsapi -h`` executed on ``DNS_SERVER``."""
    with quiet():
        @hosts(DNS_SERVER)
        def _dnsapi():
            # Capture the command output and echo it with print().
            print(run('/app/opbin/dns/dnsapi -h'))

        execute(_dnsapi)
def remote_file_exists(file):
    """Return whether ``test -f "<file>"`` succeeds when executed with run()."""
    check_cmd = 'test -f "{}"'.format(file)
    with quiet():
        outcome = run(check_cmd)
    return outcome.succeeded
def remote_dir_exists(file):
    """Return whether ``test -d "<file>"`` succeeds when executed with run()."""
    check_cmd = 'test -d "{}"'.format(file)
    with quiet():
        outcome = run(check_cmd)
    return outcome.succeeded
def remote_dir_exists(dir):
    """Return whether ``test -d <dir>`` succeeds when executed with run()."""
    check_cmd = 'test -d {}'.format(dir)
    with quiet():
        return run(check_cmd).succeeded
def matchServer_exists(matchServer, ip):
    """Raise if the match server is already known; return None otherwise.

    Two checks are made in order: a whole-word grep for ``matchServer`` in
    the local ``/etc/hosts``, then remote_dir_exists() for
    ``/app/<matchServer>`` executed on host ``ip``.

    Args:
        matchServer: name of the match server.
        ip: host on which the ``/app`` directory is checked.

    Raises:
        Exception: when either check finds the match server.
    """
    hosts_grep = '''grep "\\b{}\\b" /etc/hosts '''.format(matchServer)
    with quiet():
        listed_in_hosts = local(hosts_grep).succeeded
    if listed_in_hosts:
        raise Exception('''The match server {} already exists in /etc/hosts'''.format(matchServer))

    # execute() returns a per-host mapping; pick the entry for ``ip``.
    per_host = execute(remote_dir_exists, '/app/{}'.format(matchServer), hosts=[ip])
    if per_host[ip]:
        raise Exception('''The match dir: /app/{} already exists on {}'''.format(matchServer, ip))
#def create_nginx_conf(remote_dir):
# server_name = 'match_download_{}'.format(TIME)
# conf_name = 'download_{}.conf'.format(TIME)
# with cd('/app/nginx/conf/vhost'):
# run('''echo -e "server {\\n listen 80;\\n server_name %s;\\n root %s;\\n index Main.html;\\n access_log logs/default.access.log main;\\n location / {\\n expires 0;\\n }\\n\\n error_page 404 500 502 503 504 /404.html;\\n}" >%s''' % (server_name, remote_dir, conf_name))
def load_file(game_servers, local_file, remote_file, load_type='upload'):
    """Upload a file to, or download a file from, a set of game servers.

    Args:
        game_servers: game servers to act on; validated with
            check_game_servers() and mapped with transform_gameservers()
            into a dict that is indexed by host string below.
        local_file: file handed to upload() and to the pre-upload checks
            (upload mode only).
        remote_file: passed to replace_file() in upload mode; in download
            mode it is the path below ``/app/<game_server>/`` to fetch.
        load_type: ``'upload'`` or ``'download'``. Any other value makes the
            function return after the initial checks without doing anything.

    Raises:
        Exception: in download mode, when the target file does not exist
            for one of the game servers.
    """
    test_server_info = get_test_server_info(GAME)
    check_game_servers(game_servers, test_server_info)
    locate_game_servers = transform_gameservers(game_servers, test_server_info)
    # The keys of the mapping are used as the host list for both tasks.
    ips = locate_game_servers.keys()

    @hosts(ips)
    def _upload_file():
        # Push the file once per host, then apply it to every game server
        # that the mapping lists for the current host.
        upload(local_file, REMOTE_DIR, env.host_string)
        for game_server in locate_game_servers[env.host_string]:
            replace_file(game_server, remote_file)

    @hosts(ips)
    def _download_file():
        # local_root_path is a closure variable that the 'download' branch
        # below assigns before execute() runs this task.
        for game_server in locate_game_servers[env.host_string]:
            local_path = '{}/{}/'.format(local_root_path, game_server)
            local('su - astd -c "mkdir -p {}"'.format(local_path))
            target_file = '/app/{}/{}'.format(game_server, remote_file)
            with quiet():
                target_file_exists = run('test -f {}'.format(target_file)).succeeded
            if target_file_exists:
                get(target_file, local_path)
            else:
                raise Exception('File {} NOT exists on {}'.format(target_file, game_server))
        # Hand ownership of everything downloaded so far to astd.
        local('chown -R astd.astd {}'.format(local_root_path))

    if load_type == 'upload':
        ftp_file_check(local_file)
        file_name_consistence_check(local_file, remote_file)
        execute(_upload_file)
        print('{} was uploaded to {} successfully.'.format(local_file, game_servers))
    elif load_type == 'download':
        ftp_path = 'download/{}/{}'.format(GAME, TIMESTAMP)
        local_root_path = '/app/online/{}'.format(ftp_path)
        execute(_download_file)
        print('Downloaded remote file: {} to FTP: {}/'.format(remote_file, ftp_path))
def get_clusterdock_container_id():
    """Returns the container ID of the Docker container running clusterdock.

    The ID is taken from the first line of ``/proc/self/cgroup`` that
    contains ``docker``: it is the text after the last ``/`` on that line.

    Raises:
        ContainerNotFoundException: when no line mentions ``docker``.
    """
    with quiet():
        cgroup_lines = local('cat /proc/self/cgroup', capture=True).stdout.split('\n')

    docker_line = next((line for line in cgroup_lines if 'docker' in line), None)
    if docker_line is not None:
        return docker_line.rsplit('/')[-1]

    # No cgroup line mentioned docker, so something has gone very wrong.
    raise ContainerNotFoundException('Could not find container name from /proc/self/cgroup.')
def _get_images():
    """Return whatever ``client.images(quiet=True)`` reports."""
    listing = client.images(quiet=True)
    return listing
def _get_running_containers():
    """Return whatever ``client.containers(all=False, quiet=True)`` reports."""
    listing = client.containers(all=False, quiet=True)
    return listing