def get_cgroup_containers():
containers = subprocess.getoutput("find %s -type d 2>/dev/null | awk -F\/ '{print $(NF-1)}'" % (cgroup_manager.__default_prefix__ % ('cpu', '*', '.'))).split()
uuids = []
for item in containers:
if item.startswith('docker-') and item.endswith('.scope') and len(item) > 64:
uuids.append(item[7:-6])
else:
uuids.append(item)
return uuids
python类getoutput()的实例源码
def signal_handler(signal, frame):
if sys.argv[1] == 'master':
subprocess.getoutput('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
else:
subprocess.getoutput('ovs-vsctl del-br ovs-minion >/dev/null 2>&1')
sys.exit(0)
def get_score_by_uuid(uuid):
user = uuid.split('-')[0]
online = subprocess.getoutput('cat /var/lib/docklet/global/users/%s/status 2>/dev/null' % user) == 'live'
return 10.0 if online else 1.0
def mpv_pause_status():
stdoutdata = subprocess.getoutput('echo \'{ "command": ["get_property", "pause"] }\' | socat - "' + mpv_socket + '"')
try:
return loads(stdoutdata)['data']
except:
return mpv_pause_status()
def mpv_fullscreen_status():
stdoutdata = subprocess.getoutput('echo \'{ "command": ["get_property", "fullscreen"] }\' | socat - "' + mpv_socket + '"')
try:
return loads(stdoutdata)['data']
except:
return mpv_fullscreen_status()
def create_jp2(img_name, img_id):
image = Image.open('{}.tiff'.format(img_name))
if image.mode == 'RGB':
kdu_com = kdu_command_rgb.format(img_name, img_name)
else:
kdu_com = kdu_command.format(img_name, img_name)
kdu_com = kdu_com.replace('{', '\{')
kdu_com = kdu_com.replace('}', '\}')
res = subprocess.getoutput(kdu_com)
if res.startswith('Kakadu Error'):
# Probably not uncompressed tiff
print('probably not uncompressed tiff')
print(res)
subprocess.getoutput('mv {}.tiff {}-2.tiff'.format(img_name, img_name))
subprocess.getoutput(convert_command.format(img_name, img_name))
kdu_com2 = kdu_command.format(img_name, img_name)
kdu_com2 = kdu_com2.replace('{', '\{')
kdu_com2 = kdu_com2.replace('}', '\}')
res = subprocess.getoutput(kdu_com2)
print('new response')
print(res)
if res.startswith('Kakadu Error') or res.startswith('Kakadu Core Error'):
print('Still broken :(')
raise ValueError(img_name)
k = Key(b, '{}{}.jp2'.format(image_base, img_id))
k.set_contents_from_filename('{}.jp2'.format(img_name))
def get_gitversion() -> dict:
"""
Uses GitVersion (https://github.com/GitTools/GitVersion) to infer project's current version
Returns Gitversion JSON output as a dict
"""
if os.environ.get('APPVEYOR'):
exe = find_executable('gitversion', r'C:\ProgramData\chocolatey\bin')
else:
exe = find_executable('gitversion')
if not exe:
click.secho(
'"gitversion.exe" not been found in your PATH.\n'
'GitVersion is used to infer the current version from the Git repository.\n'
'setuptools_scm plans on switching to using the Semver scheme in the future; when that happens, '
'I\'ll remove the dependency to GitVersion.\n'
'In the meantime, GitVersion can be obtained via Chocolatey (recommended): '
'https://chocolatey.org/packages/GitVersion.Portable\n'
'If you already have chocolatey installed, you can simply run the following command (as admin):\n\n'
'\t\t"choco install gitversion.portable -pre -y"\n\n'
'If you\'re not comfortable using the command line, there is a GUI tool for Chocolatey available at:\n\n'
'\t\thttps://github.com/chocolatey/ChocolateyGUI/releases\n\n'
'Or you can install directly from :\n\n'
'\t\thttps://github.com/GitTools/GitVersion/releases',
err=True,
)
exit(-1)
return loads(subprocess.getoutput([exe]).rstrip())
def get_gitversion() -> dict:
"""
Runs the gitversion executable and returns a dictionary of values
Example "gitversion" output::
"Major":0,
"Minor":4,
"Patch":4,
"PreReleaseTag":"dev.11",
"PreReleaseTagWithDash":"-dev.11",
"PreReleaseLabel":"dev",
"PreReleaseNumber":11,
"BuildMetaData":"",
"BuildMetaDataPadded":"",
"FullBuildMetaData":"Branch.develop.Sha.b22387288a19ac67641fac2711b940c4cab6d021",
"MajorMinorPatch":"0.4.4",
"SemVer":"0.4.4-dev.11",
"LegacySemVer":"0.4.4-dev11",
"LegacySemVerPadded":"0.4.4-dev0011",
"AssemblySemVer":"0.4.4.0",
"FullSemVer":"0.4.4-dev.11",
"InformationalVersion":"0.4.4-dev.11+Branch.develop.Sha.b22387288a19ac67641fac2711b940c4cab6d021",
"BranchName":"develop",
"Sha":"b22387288a19ac67641fac2711b940c4cab6d021",
"NuGetVersionV2":"0.4.4-dev0011",
"NuGetVersion":"0.4.4-dev0011",
"NuGetPreReleaseTagV2":"dev0011",
"NuGetPreReleaseTag":"dev0011",
"CommitsSinceVersionSource":11,
"CommitsSinceVersionSourcePadded":"0011",
"CommitDate":"2017-07-18"
"""
# This is a potential security breach, but I'm leaving it as is as it should only be running either from a dev
# machine or on Appveyor
cmd = r'C:\ProgramData\chocolatey\bin\gitversion.exe' if os.environ.get('APPVEYOR') else 'gitversion'
return loads(subprocess.getoutput([cmd]).rstrip())
def _minify(self, extension):
for root, dirs, files in os.walk(settings.STATIC_ROOT):
for f in files:
path = os.path.join(root, f)
if path.endswith(extension):
if getattr(self, 'dry_run', False):
print('[dry run] skipping minifying ' + path + '...')
continue
mini = getoutput('yui-compressor ' + path)
print('minifying "' + path + '" ... ', end='')
with open(path, 'w') as p:
p.write(mini)
print('done')
def time(self):
output=subprocess.getoutput("ps aux")
if u.uid == "":
if "google" in output:
subprocess.getoutput("killall -9 chrome")
from gir import Ui_Dialog
elif "firefox" in output:
subprocess.getoutput("killall -9 firefox")
from gir import Ui_Dialog
def resolve(self,request,handler):
reply = request.reply()
qname = request.q.qname
cmd = self.routes.get(qname)
if cmd:
output = getoutput(cmd).encode()
reply.add_answer(RR(qname,QTYPE.TXT,ttl=self.ttl,
rdata=TXT(output[:254])))
else:
reply.header.rcode = RCODE.NXDOMAIN
return reply
def resolve(self,request,handler):
reply = request.reply()
qname = request.q.qname
cmd = self.routes.get(qname)
if cmd:
output = getoutput(cmd).encode()
reply.add_answer(RR(qname,QTYPE.TXT,ttl=self.ttl,
rdata=TXT(output[:254])))
else:
reply.header.rcode = RCODE.NXDOMAIN
return reply
arpscan_tracker.py 文件源码
项目:home-assistant-custom-components
作者: cyberjunky
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def _update_info(self):
"""Scan the network for devices.
Returns boolean if scanning successful.
"""
_LOGGER.debug("Update_info called")
options = self._options
last_results = []
exclude_hosts = self.exclude
scandata = subprocess.getoutput("arp-scan "+options)
now = dt_util.now()
for line in scandata.splitlines():
ipv4 = re.findall(r'[0-9]+(?:\.[0-9]+){3}', line)
if not ipv4:
continue
parts = line.split()
ipv4 = parts[0]
for exclude in exclude_hosts:
if exclude == ipv4:
_LOGGER.debug("Excluded %s", exclude)
continue
name = ipv4
mac = parts[1]
last_results.append(Device(mac, name, ipv4, now))
self.last_results = last_results
_LOGGER.debug("Update_info successful")
return True
def resolve(self,request,handler):
reply = request.reply()
qname = request.q.qname
cmd = self.routes.get(qname)
if cmd:
output = getoutput(cmd).encode()
reply.add_answer(RR(qname,QTYPE.TXT,ttl=self.ttl,
rdata=TXT(output[:254])))
else:
reply.header.rcode = RCODE.NXDOMAIN
return reply
def test_terminate(self):
self._kill_process('terminate')
# The module says:
# "NB This only works (and is only relevant) for UNIX."
#
# Actually, getoutput should work on any platform with an os.popen, but
# I'll take the comment as given, and skip this suite.
def get_token(numero):
headers = {
"x-requested-with": "com.services.movistar.ar",
"Content-Type": "application/x-www-form-urlencoded"
}
encoded_number = subprocess.getoutput(
"java -jar Movistar_Exploit_V2-1.0-SNAPSHOT-jar-with-dependencies.jar %s" % numero)
print("Numero codificado: %s" % encoded_number)
data = {
"grant_type": "mobile",
"username": encoded_number,
"client_id": "appcontainer",
"client_secret": "YXBwY29udGFpbmVy"
}
r = requests.post(
url=MOVISTAR_API_ROOT + "oauth/token",
headers=headers,
data=data
)
if r.status_code == requests.codes.ok:
parsed_response = r.json()
token = parsed_response["access_token"]
print("Token: %s" % token)
return token
else:
print("Error 1")
return None
def test_that_verifies_if_mincheader_tool_is_callable(self):
self.actualMincHeaderOutput = subprocess.getoutput(['mincheader'])
self.assertEqual(self.expectedMincHeaderOutput, self.actualMincHeaderOutput)
def exc(ctx, *args):
if Henry(ctx):
query = " ".join(args)
await bot.say("```"+subprocess.getoutput(query)+"```")
else:
await bot.say("You ain't my master! Shoo!")
def exc(ctx, *args):
if Henry(ctx):
query = " ".join(args)
await bot.say("```"+subprocess.getoutput(query)+"```")
else:
await bot.say("You ain't my master! Shoo!")
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--toc-maker", help="path to ToC making tool")
parser.add_argument("--twitter-poster", default="t update", help="twitter poster command")
parser.add_argument("-t", "--use-twitter", action="store_true")
known_args, unknown_args = parser.parse_known_args()
if not known_args.toc_maker:
known_args.toc_maker = "./gh-md-toc"
if not os.path.isfile(known_args.toc_maker):
s = cmd.getoutput("uname -s").lower()
f = "gh-md-toc.%s.amd64.tgz" % s
URL = "https://github.com/ekalinin/github-markdown-toc.go/releases/download/0.6.0/%s" % f
if not os.path.isfile(f):
if cmd.getstatusoutput("wget %s" % URL)[0] != 0:
raise EnvironmentError("Cannot download toc maker from URL: %s" % URL)
if cmd.getstatusoutput("tar xzf %s" % f)[0] != 0:
raise EnvironmentError("Cannot untar toc maker from file %s" % f)
os.remove(f)
current_permissions = stat.S_IMODE(os.lstat(known_args.toc_maker).st_mode)
os.chmod(known_args.toc_maker, current_permissions & stat.S_IXUSR)
if unknown_args:
filepath = unknown_args[0]
else:
print("You should specify the path for file to work with!")
quit(1)
return known_args, filepath