def load_into_pgsql(self, capture_stderr=True):
    """Load the downloaded Overpass file into PostgreSQL via osm2pgsql.

    Args:
        capture_stderr: When true (the default) the command's stderr is
            captured and used to build an error message on failure. When
            false the command writes to the inherited stderr and its exit
            status is not inspected.

    Returns:
        ``None`` when the command succeeded (or when ``capture_stderr`` is
        false). Otherwise a string describing the problem: a fixed message
        when ``self.overpass_filename`` is missing or empty,
        ``'out of memory'`` when the captured stderr contains
        ``Out of memory``, or the decoded stderr text.
    """
    no_data = 'no data from overpass to load with osm2pgsql'
    # One stat() answers both "missing" and "empty" and avoids the race
    # between separate exists()/stat() calls. ValueError is included because
    # os.path.exists() also treats an unrepresentable path as "missing".
    try:
        if os.stat(self.overpass_filename).st_size == 0:
            return no_data
    except (OSError, ValueError):
        return no_data

    cmd = self.osm2pgsql_cmd()
    # NOTE(review): the child receives ONLY PGPASSWORD in its environment
    # (no PATH, HOME, ...). Kept as in the original; confirm it is intended.
    env = {'PGPASSWORD': current_app.config['DB_PASS']}

    if not capture_stderr:
        subprocess.run(cmd, env=env)
        return None

    p = subprocess.run(cmd, stderr=subprocess.PIPE, env=env)
    if p.returncode == 0:
        return None
    if b'Out of memory' in p.stderr:
        return 'out of memory'
    # errors='replace' keeps a stray non-UTF-8 byte in the tool's output from
    # raising UnicodeDecodeError and hiding the real failure message.
    return p.stderr.decode('utf-8', errors='replace')
# Python run() example source code
def chunk(self):
    """Download the area chunk by chunk from Overpass and merge the results.

    The chunk size comes from ``utils.calc_chunk_size(self.area_in_sq_km)``.
    Each chunk is saved under the ``overpass`` directory using the name from
    ``self.chunk_filename``; chunks whose file already exists are not
    downloaded again. Finally all chunk files are merged into
    ``self.overpass_filename`` by running ``osmium merge``.

    Raises:
        AssertionError: if an Overpass query returns a falsy response (the
            query text is printed first).
    """
    chunk_size = utils.calc_chunk_size(self.area_in_sq_km)
    chunks = self.chunk_n(chunk_size)
    print('chunk size:', chunk_size)

    files = []
    for num, chunk in enumerate(chunks):
        filename = self.chunk_filename(num, len(chunks))
        full = os.path.join('overpass', filename)
        files.append(full)
        if os.path.exists(full):
            # Already downloaded on an earlier run.
            continue
        # Only the first chunk is queried with include_self=True.
        oql = self.oql_for_chunk(chunk, include_self=(num == 0))
        r = overpass.run_query_persistent(oql)
        if not r:
            print(oql)
        assert r
        # Context manager so the handle is flushed and closed before
        # osmium reads the file.
        with open(full, 'wb') as out:
            out.write(r.content)

    cmd = ['osmium', 'merge'] + files + ['-o', self.overpass_filename]
    print(' '.join(cmd))
    subprocess.run(cmd)
def run(self):
    """Run the validator, logging start/finish timestamps and the duration.

    Returns:
        Whatever ``self.validate()`` returns.
    """
    started_at = datetime.now()
    self.log("{} Starting validator: {}".format(started_at, self.vcf_file))

    outcome = self.validate()

    finished_at = datetime.now()
    elapsed = finished_at - started_at
    self.log("{} Finished validator, it took: {}".format(finished_at, elapsed))
    return outcome
# Validate vcf file with Vcftools
def _get_gitinfo():
    """Return ``(branch, commit)`` for the git checkout containing this file.

    Both values come from ``git rev-parse`` run in this file's directory.
    Falls back to ``('master', '')`` when no git executable is found or when
    either git invocation exits non-zero.
    """
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    fallback = ('master', '')

    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return fallback

    repo_dir = str(Path(__file__).parent)
    answers = []
    # First query yields the branch name, second the commit hash.
    for rev_args in (['--abbrev-ref', 'HEAD'], ['HEAD']):
        proc = subprocess.run(
            [git, 'rev-parse'] + rev_args,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=repo_dir,
            universal_newlines=True,
        )
        if proc.returncode != 0:
            # Quietly return defaults on fail
            return fallback
        answers.append(proc.stdout)

    branch, commit = answers
    return (branch.strip(), commit.strip())
def compile_object(self, in_file, out_file):
    """Compile ``in_file`` to ``out_file`` by running the command in ``self._args``.

    If ``out_file`` already exists, ``self.recompile.should_recompile`` is
    consulted first and the compile is skipped when it returns false.

    Args:
        in_file: Path (str or ``Path``) of the source file.
        out_file: Path (str or ``Path``) of the object file; its parent
            directory is created when missing.

    Returns:
        ``True`` when the compile was skipped or the command exited with
        status 0, ``False`` otherwise.
    """
    import subprocess

    in_file = Path(in_file)
    out_file = Path(out_file)

    # Per the original author's note, should_recompile() may go through
    # preprocess_source, which modifies self._args. Snapshot a COPY: a plain
    # alias would observe the very same in-place mutations and restore nothing.
    # TODO: Maybe use something more elegant than self._args?
    saved_args = list(self._args)
    if out_file.is_file():
        needs_rebuild = self.recompile.should_recompile(str(in_file))
        # Restore before either outcome so a skipped compile does not leave
        # preprocessing arguments behind for the next call.
        self._args = saved_args
        if not needs_rebuild:
            return True

    out_file.parent.mkdir(parents=True, exist_ok=True)
    self._args.extend(self._build_compiler_flags())
    result = subprocess.run(
        self._args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    # TODO: do something useful with output
    if result.stdout:
        print(result.stdout)
    if result.stderr:
        print(result.stderr)
    return result.returncode == 0
def link_executable(self, in_files, out_file):
    """Run the link command held in ``self._args``.

    The parent directory of ``out_file`` is created first, then the flags
    from ``self._build_linker_flags()`` are appended to ``self._args`` and
    the command is executed. ``in_files`` is not referenced in this method.

    Returns:
        ``True`` when the command exits with status 0, ``False`` otherwise.
    """
    import subprocess

    target_dir = Path(out_file).parent
    target_dir.mkdir(parents=True, exist_ok=True)

    linker_flags = self._build_linker_flags()
    self._args.extend(linker_flags)

    completed = subprocess.run(self._args)
    return completed.returncode == 0
def link_library(self, in_files, out_file):
    """Run the link command held in ``self._args``.

    The parent directory of ``out_file`` is created first, then the flags
    from ``self._build_linker_flags()`` are appended to ``self._args`` and
    the command is executed. ``in_files`` is not referenced in this method.

    Returns:
        ``True`` when the command exits with status 0, ``False`` otherwise.
    """
    import subprocess

    target_dir = Path(out_file).parent
    target_dir.mkdir(parents=True, exist_ok=True)

    linker_flags = self._build_linker_flags()
    self._args.extend(linker_flags)

    completed = subprocess.run(self._args)
    return completed.returncode == 0
def clear():
    """Run the external ``clear`` command; its exit status is ignored."""
    command = ["clear"]
    subprocess.run(command)
def sleep(_time):
    """Pause for the number of seconds given by _time.

    The pause is performed by running the external ``sleep`` command with
    the argument ``"<_time>s"``; its exit status is ignored.

    Args:
        _time (int): length of the pause in seconds
    """
    duration = "".join((str(_time), "s"))
    subprocess.run(["sleep", duration])
def sys_run(command, check=False):
    """Run ``command`` through the shell with stderr merged into stdout.

    Args:
        command: Command handed to ``subprocess.run`` with ``shell=True``,
            so it is interpreted by the shell; never pass untrusted text.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.

    Returns:
        The ``subprocess.CompletedProcess``; ``stdout`` holds the combined
        output as bytes.
    """
    run_options = dict(
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        check=check,
    )
    return subprocess.run(command, **run_options)
def list_links():
    """List the links reported by ``ip link show``.

    Returns:
        ``[True, names]`` where ``names`` is the list of keys of the mapping
        produced by ``ipcontrol.parse`` on the command output, or
        ``[False, message]`` when the command exits non-zero.
    """
    command = ['ip', 'link', 'show']
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
        parsed = ipcontrol.parse(completed.stdout.decode('utf-8'))
        return [True, list(parsed.keys())]
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "list links failed : %s" % output]
def link_exist(linkname):
    """Return whether ``ip link show dev <linkname>`` exits successfully.

    Returns:
        ``True`` on exit status 0, ``False`` when the command exits non-zero.
    """
    command = ['ip', 'link', 'show', 'dev', str(linkname)]
    try:
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError:
        return False
    return True
def link_info(linkname):
    """Fetch the parsed ``ip address show dev <linkname>`` record for a link.

    Returns:
        ``[True, info]`` where ``info`` is the entry for ``str(linkname)`` in
        the mapping produced by ``ipcontrol.parse``, or ``[False, message]``
        when the command exits non-zero.
    """
    device = str(linkname)
    try:
        completed = subprocess.run(
            ['ip', 'address', 'show', 'dev', device],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
        parsed = ipcontrol.parse(completed.stdout.decode('utf-8'))
        return [True, parsed[str(linkname)]]
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "get link info failed : %s" % output]
def link_state(linkname):
    """Fetch the ``state`` field of a link from ``ip link show dev <linkname>``.

    Returns:
        ``[True, state]`` where ``state`` is the ``'state'`` item of the
        entry for ``str(linkname)`` in the mapping produced by
        ``ipcontrol.parse``, or ``[False, message]`` when the command exits
        non-zero.
    """
    device = str(linkname)
    try:
        completed = subprocess.run(
            ['ip', 'link', 'show', 'dev', device],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
        record = ipcontrol.parse(completed.stdout.decode('utf-8'))[str(linkname)]
        return [True, record['state']]
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "get link state failed : %s" % output]
def down_link(linkname):
    """Run ``ip link set dev <linkname> down``.

    Returns:
        ``[True, str(linkname)]`` on success, or ``[False, message]`` when
        the command exits non-zero.
    """
    command = ['ip', 'link', 'set', 'dev', str(linkname), 'down']
    try:
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "set link down failed : %s" % output]
    return [True, str(linkname)]
def add_addr(linkname, address):
    """Run ``ip address add <address> dev <linkname>``.

    Args:
        linkname: Link name; converted with ``str()`` for the command line.
        address: Passed to the command unchanged.

    Returns:
        ``[True, str(linkname)]`` on success, or ``[False, message]`` when
        the command exits non-zero.
    """
    command = ['ip', 'address', 'add', address, 'dev', str(linkname)]
    try:
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "add address failed : %s" % output]
    return [True, str(linkname)]
def list_bridges():
    """List the bridges printed by ``ovs-vsctl list-br``.

    Returns:
        ``[True, names]`` where ``names`` is the whitespace-split command
        output, or ``[False, message]`` when the command exits non-zero.
    """
    command = ['ovs-vsctl', 'list-br']
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
        text = completed.stdout.decode('utf-8')
        return [True, text.split()]
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "list bridges failed : %s" % output]
def bridge_exist(bridge):
    """Return whether ``ovs-vsctl br-exists <bridge>`` exits successfully.

    Returns:
        ``True`` on exit status 0, ``False`` when the command exits non-zero.
    """
    command = ['ovs-vsctl', 'br-exists', str(bridge)]
    try:
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError:
        return False
    return True
def add_bridge(bridge):
    """Run ``ovs-vsctl --may-exist add-br <bridge>``.

    Returns:
        ``[True, str(bridge)]`` on success, or ``[False, message]`` when the
        command exits non-zero.
    """
    command = ['ovs-vsctl', '--may-exist', 'add-br', str(bridge)]
    try:
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "add bridge failed : %s" % output]
    return [True, str(bridge)]
def del_bridge(bridge):
    """Run ``ovs-vsctl del-br <bridge>``.

    Returns:
        ``[True, str(bridge)]`` on success, or ``[False, message]`` when the
        command exits non-zero.
    """
    command = ['ovs-vsctl', 'del-br', str(bridge)]
    try:
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        output = exc.stdout.decode('utf-8')
        return [False, "del bridge failed : %s" % output]
    return [True, str(bridge)]