def get_information():
information = '#####################################################\n'
information += ('System: %s\n') % (platform.system())
information += ('Machine: %s\n') % (platform.machine())
information += ('Node: %s\n') % (platform.node())
information += ('Release: %s\n') % (platform.release())
information += ('Version: %s\n') % (platform.version())
information += ('Platform: %s\n') % (platform.platform())
information += ('%s version: %s\n') % (comun.APPNAME, comun.VERSION)
information += '#####################################################\n'
return information
python类node()的实例源码
def hostname():
return platform.node()
def defaults_hostname(self):
if self.model.hostname is None:
self.model.hostname = platform.node().split('.')[0]
def execute(self, args, console):
print 'architecture:', platform.architecture()[0]
print 'java_ver:', platform.java_ver()
print 'node:', platform.node()
print 'processor:', platform.processor()
print 'python_compiler:', platform.python_compiler()
print 'python_implementation:', platform.python_implementation()
print 'python_version:', platform.python_version()
print 'release:', platform.release()
def machineInfo():
machineDict = {"pythonVersion": sys.version,
"node": platform.node(),
"OSRelease": platform.release(),
"OSVersion": platform.platform(),
"processor": platform.processor(),
"machineType": platform.machine(),
"env": os.environ,
"syspaths": sys.path}
return machineDict
def UpdateRecord(rinex, path):
try:
cnn = dbConnection.Cnn('gnss_data.cfg')
Config = pyOptions.ReadOptions('gnss_data.cfg')
rnxobj = pyRinex.ReadRinex(rinex['NetworkCode'], rinex['StationCode'], path)
date = pyDate.Date(year=rinex['ObservationYear'], doy=rinex['ObservationDOY'])
if not verify_rinex_date_multiday(date, rnxobj, Config):
cnn.begin_transac()
# propagate the deletes
cnn.query(
'DELETE FROM gamit_soln WHERE "NetworkCode" = \'%s\' AND "StationCode" = \'%s\' AND "Year" = %i AND "DOY" = %i'
% (rinex['NetworkCode'], rinex['StationCode'], rinex['ObservationYear'], rinex['ObservationDOY']))
cnn.query(
'DELETE FROM ppp_soln WHERE "NetworkCode" = \'%s\' AND "StationCode" = \'%s\' AND "Year" = %i AND "DOY" = %i'
% (rinex['NetworkCode'], rinex['StationCode'], rinex['ObservationYear'], rinex['ObservationDOY']))
cnn.query(
'DELETE FROM rinex WHERE "NetworkCode" = \'%s\' AND "StationCode" = \'%s\' AND "ObservationYear" = %i AND "ObservationDOY" = %i'
% (rinex['NetworkCode'], rinex['StationCode'], rinex['ObservationYear'], rinex['ObservationDOY']))
cnn.commit_transac()
return 'Multiday rinex file moved out of the archive: ' + rinex['NetworkCode'] + '.' + rinex['StationCode'] + ' ' + str(rinex['ObservationYear']) + ' ' + str(rinex['ObservationDOY']) + ' using node ' + platform.node()
else:
cnn.update('rinex', rinex, Completion=rnxobj.completion)
except Exception:
return traceback.format_exc() + ' processing rinex: ' + rinex['NetworkCode'] + '.' + rinex['StationCode'] + ' ' + str(rinex['ObservationYear']) + ' ' + str(rinex['ObservationDOY']) + ' using node ' + platform.node()
def __init__(self, **kwargs):
dict.__init__(self)
self['EventDate'] = datetime.datetime.now()
self['EventType'] = 'info'
self['NetworkCode'] = None
self['StationCode'] = None
self['Year'] = None
self['DOY'] = None
self['Description'] = ''
self['node'] = platform.node()
self['stack'] = None
module = inspect.getmodule(inspect.stack()[1][0])
if module is None:
self['module'] = inspect.stack()[1][3] # just get the calling module
else:
self['module'] = module.__name__ + '.' + inspect.stack()[1][3] # just get the calling module
# initialize the dictionary based on the input
for key in kwargs:
if key not in self.keys():
raise Exception('Provided key not in list of valid fields.')
arg = kwargs[key]
self[key] = arg
if self['EventType'] == 'error':
self['stack'] = ''.join(traceback.format_stack()[0:-1]) # print the traceback until just before this call
else:
self['stack'] = None
def insert_warning(self, desc):
line = inspect.stack()[1][2]
caller = inspect.stack()[1][3]
mod = platform.node()
module = '[%s:%s(%s)]\n' % (mod, caller, str(line))
# get the module calling for insert_warning to make clear how is logging this message
self.insert_event_bak('warn', module, desc)
def insert_info(self, desc):
line = inspect.stack()[1][2]
caller = inspect.stack()[1][3]
mod = platform.node()
module = '[%s:%s(%s)]\n' % (mod, caller, str(line))
self.insert_event_bak('info', module, desc)
def run_test(config, args):
"""
Run the 'test' subcommand
:param config: configuration
:type config: :py:class:`~.Config`
:param args: command line arguments
:type args: :py:class:`argparse.Namespace`
"""
base_url = get_base_url(config, args)
logger.debug('API base url: %s', base_url)
endpoints = config.get('endpoints')
if args.endpoint_name is not None:
endpoints = {
args.endpoint_name: endpoints[args.endpoint_name]
}
dt = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
data = {
'message': 'testing via webhook2lambda2sqs CLI',
'version': VERSION,
'host': node(),
'datetime': dt
}
for ep in sorted(endpoints):
url = base_url + ep + '/'
print('=> Testing endpoint %s with %s: %s' % (
url, endpoints[ep]['method'], pformat(data))
)
if endpoints[ep]['method'] == 'POST':
res = requests.post(url, json=data)
elif endpoints[ep]['method'] == 'GET':
res = requests.get(url, params=data)
else:
raise Exception('Unimplemented method: %s'
'' % endpoints[ep]['method'])
print('RESULT: HTTP %d' % res.status_code)
for h in sorted(res.headers):
print('%s: %s' % (h, res.headers[h]))
print("\n%s\n" % res.content)
def init_with_context(self, context):
upMinionCount = Host.objects.filter(minion_status=1).count()
downMinionCount = Host.objects.filter(minion_status=0).count()
self.hostname = platform.node()
self.system_info = '%s, %s, %s' % (
platform.system(),
' '.join(platform.linux_distribution()),
platform.release())
self.arch = ' '.join(platform.architecture())
self.procesor = platform.processor(),
self.py_version = platform.python_version()
self.host_count = Host.objects.count()
self.buss_count = Project.objects.count()
self.minions_status = '??? %s,??? %s' % (upMinionCount, downMinionCount)
def tip(self, vcdir, abortOnError=True):
self.check_for_hg()
# We don't use run because this can be called very early before _opts is set
try:
return subprocess.check_output(['hg', 'tip', '-R', vcdir, '--template', '{node}'])
except subprocess.CalledProcessError:
if abortOnError:
abort('hg tip failed')
else:
return None
def release_version(self, snapshotSuffix='dev'):
"""
Gets the release tag from VC or create a time based once if VC is unavailable
"""
if not self._releaseVersion:
_version = self._get_early_suite_dict_property('version')
if not _version:
_version = self.vc.release_version_from_tags(self.vc_dir, self.name, snapshotSuffix=snapshotSuffix)
if not _version:
_version = 'unknown-{0}-{1}'.format(platform.node(), time.strftime('%Y-%m-%d_%H-%M-%S_%Z'))
self._releaseVersion = _version
return self._releaseVersion
def system(inp):
"""system -- Retrieves information about the host system."""
hostname = platform.node()
os = platform.platform()
python_imp = platform.python_implementation()
python_ver = platform.python_version()
architecture = '-'.join(platform.architecture())
cpu = platform.machine()
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
uptime = str(timedelta(seconds = uptime_seconds))
return "Hostname: \x02{}\x02, Operating System: \x02{}\x02, Python " \
"Version: \x02{} {}\x02, Architecture: \x02{}\x02, CPU: \x02{}" \
"\x02, Uptime: \x02{}\x02".format(hostname, os, python_imp, python_ver, architecture, cpu, uptime)
def get_admin_command(): # no cov
if ON_WINDOWS:
return [
'runas', r'/user:{}\{}'.format(
platform.node() or os.environ.get('USERDOMAIN', ''),
os.environ.get('_DEFAULT_ADMIN_', 'Administrator')
)
]
# Should we indeed use -H here?
else:
admin = os.environ.get('_DEFAULT_ADMIN_', '')
return ['sudo', '-H'] + (['--user={}'.format(admin)] if admin else [])
def process_send_data(socket, context):
"""
Send all memory, cpu, disk, network data of computer to server(master node)
"""
while True:
try:
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
memory_info = psutil.virtual_memory()
disk_info = psutil.disk_usage('/')
pids_computer = psutil.pids()
info_to_send = json.dumps({
"computer_utc_clock": str(datetime.datetime.utcnow()),
"computer_clock": str(datetime.datetime.now()),
"hostname": platform.node(),
"mac_address": mac_address(),
"ipv4_interfaces": internet_addresses(),
"cpu": {
"percent_used": cpu_percent
},
"memory": {
"total_bytes": memory_info.total,
"total_bytes_used": memory_info.used,
"percent_used": memory_info.percent
},
"disk": {
"total_bytes": disk_info.total,
"total_bytes_used": disk_info.used,
"total_bytes_free": disk_info.free,
"percent_used": disk_info.percent
},
"process": pids_active(pids_computer)
}).encode()
#send json data in the channel 'status', although is not necessary to send.
socket.send_multipart([b"status", info_to_send])
#time.sleep(0.500)
except (KeyboardInterrupt, SystemExit):
socket.close()
context.term()
def start(ip_address='127.0.0.1', port=5555):
"""
Connect to master node and each one second send data.
"""
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("tcp://%s:%s" % (ip_address, port))
socket.sndhwm = 1
handler_signal_keyboard(socket, context)
process_send_data(socket, context)
def process_send_data(socket, context):
"""
Send all memory, cpu, disk, network data of computer to server(master node)
"""
while True:
try:
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
memory_info = psutil.virtual_memory()
disk_info = psutil.disk_usage('/')
pids_computer = psutil.pids()
info_to_send = json.dumps({
"computer_utc_clock": str(datetime.datetime.utcnow()),
"computer_clock": str(datetime.datetime.now()),
"hostname": platform.node(),
"mac_address": mac_address(),
"ipv4_interfaces": internet_addresses(),
"cpu": {
"percent_used": cpu_percent
},
"memory": {
"total_bytes": memory_info.total,
"total_bytes_used": memory_info.used,
"percent_used": memory_info.percent
},
"disk": {
"total_bytes": disk_info.total,
"total_bytes_used": disk_info.used,
"total_bytes_free": disk_info.free,
"percent_used": disk_info.percent
},
"process": pids_active(pids_computer)
}).encode()
#send json data in the channel 'status', although is not necessary to send.
socket.send_multipart([b"status", info_to_send])
#time.sleep(0.500)
except (KeyboardInterrupt, SystemExit):
socket.close()
context.term()