def register(args, config):
    """Register this host with the CMDB and print the API key it returns.

    The lower-cased fully qualified domain name of the local machine is
    JSON-encoded as ``{'fqdn': ...}`` and POSTed to the
    ``/clients/register/`` URL built by ``lib.lib.get_config_url``. The
    response body is parsed as JSON and handed to
    ``lib.lib.check_for_error`` before anything is printed.

    Args:
        args: Not used by this function.
        config: Passed to ``lib.lib.get_config_url`` to build the URL.
    """
    path = '/clients/register/'
    my_hostname = socket.getfqdn().lower()
    # Parent of the directory that contains this module; the warning
    # printed below points at the config.py inside it.
    agent_root_dir = os.path.dirname(os.path.dirname(__file__))

    url = lib.lib.get_config_url(config, path)
    req = requests.post(url, data=json.dumps({'fqdn': my_hostname}))
    resp_body = lib.lib.get_response_body(req)
    obj = json.loads(resp_body)
    # Checked before the success message so nothing is printed if this raises.
    lib.lib.check_for_error(obj)

    print('Host `{}` successfully registered with CMDB!'.format(my_hostname))
    print()
    print('API Key: {}'.format(obj.get('api_key')))
    print()
    # The message previously misspelled "config" as "conifg".
    print('!!! WARNING: save this API Key in your config file: `{}`'.format(
        os.path.join(agent_root_dir, 'config.py')
    ))
Python 类 getfqdn() 的实例源码
def testHostnameRes(self):
    """Check that the name from getfqdn() is among the names known for this host.

    The local host name is resolved to an address and then looked up in
    reverse; the test returns early (without failing) if either lookup
    raises ``socket.error``.
    """
    local_name = socket.gethostname()
    try:
        address = socket.gethostbyname(local_name)
    except socket.error:
        # Forward lookup is probably not configured; nothing to test.
        return
    self.assertTrue('.' in address, "Error resolving host to ip.")

    try:
        primary_name, alias_names, _ipaddrs = socket.gethostbyaddr(address)
    except socket.error:
        # Reverse lookup is probably not configured either; nothing to test.
        return

    known_names = [local_name, primary_name] + alias_names
    qualified_name = socket.getfqdn(address)
    if qualified_name in known_names:
        return
    self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (qualified_name, repr(known_names)))
def setUp(self):
    """Start a DebuggingServer in a background thread with output captured.

    Replaces ``socket.getfqdn`` with the mock version, redirects
    ``sys.stdout`` and ``smtpd.DEBUGSTREAM`` into ``io.StringIO`` buffers
    (keeping the previous objects on ``self``), then launches the server
    thread and waits for it to signal ``serv_evt``.
    """
    # Events handed to the server thread
    self.serv_evt = threading.Event()
    self.client_evt = threading.Event()

    # Keep the real getfqdn on self and install the mock one
    self.real_getfqdn = socket.getfqdn
    socket.getfqdn = mock_socket.getfqdn

    # Capture DebuggingServer output by temporarily replacing sys.stdout
    self.old_stdout, self.output = sys.stdout, io.StringIO()
    sys.stdout = self.output

    # Capture SMTPChannel debug output as well
    self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
    smtpd.DEBUGSTREAM = io.StringIO()

    # Port number 0 picks a random unused port; record which one was assigned
    self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
    self.port = self.serv.socket.getsockname()[1]

    self.thread = threading.Thread(
        target=debugging_server,
        args=(self.serv, self.serv_evt, self.client_evt),
    )
    self.thread.start()

    # Block until the server thread sets serv_evt, then reset the event
    self.serv_evt.wait()
    self.serv_evt.clear()
def setUp(self):
    """Start a SimSMTPServer in a background thread with getfqdn mocked.

    Replaces ``socket.getfqdn`` with the mock version (keeping the real one
    on ``self``), launches the server thread and waits for it to signal
    ``serv_evt``.
    """
    # Events handed to the server thread
    self.serv_evt = threading.Event()
    self.client_evt = threading.Event()

    # Keep the real getfqdn on self and install the mock one
    self.real_getfqdn = socket.getfqdn
    socket.getfqdn = mock_socket.getfqdn

    # Port number 0 picks a random unused port; record which one was assigned
    self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
    self.port = self.serv.socket.getsockname()[1]

    self.thread = threading.Thread(
        target=debugging_server,
        args=(self.serv, self.serv_evt, self.client_evt),
    )
    self.thread.start()

    # Block until the server thread sets serv_evt, then reset the event
    self.serv_evt.wait()
    self.serv_evt.clear()
def __init__(self):
    """Initialise election state and start the periodic leader check.

    Reads the coordinator group and the polling frequency from
    ``CONF.service`` and schedules ``self.periodic_leader_check`` on a
    fixed-interval looping call.
    """
    super(TransformService, self).__init__()

    self.coordinator = None
    self.group = CONF.service.coordinator_group

    # The host's FQDN serves as the unique name for election candidacy
    self.my_host_name = socket.getfqdn()

    # Run the leader check repeatedly at the configured frequency
    election_timer = loopingcall.FixedIntervalLoopingCall(
        self.periodic_leader_check)
    polling_interval = float(CONF.service.election_polling_frequency)
    election_timer.start(interval=polling_interval)
def _connect(self):
    """Open a paramiko transport to the URL's host and create an SFTP client.

    The transport targets ``self.url.host`` on ``self.url.port``, falling
    back to ``self._DEFAULT_PORT`` when the port is falsy. It is connected
    with the host key from ``self._get_hostkey()``, the URL's user and
    password, and the class-level GSS settings.
    """
    endpoint = (self.url.host, self.url.port or self._DEFAULT_PORT)
    # Stored on self before connect() so the attribute exists even if
    # connecting raises.
    self._transport = paramiko.Transport(endpoint)

    hostkey = self._get_hostkey()
    username = self.url.user
    password = self.url.password
    # GSS is given the fully qualified name of the target host
    gss_target = socket.getfqdn(self.url.host)
    self._transport.connect(
        hostkey,
        username,
        password,
        gss_host=gss_target,
        gss_auth=self.GSS_AUTH,
        gss_kex=self.GSS_KEX,
    )

    sftp = paramiko.SFTPClient.from_transport(self._transport)
    self.__client = sftp
    sftp.chdir()
def __init__(self, server, conn, addr):
    """Initialise per-connection SMTP state and send the 220 greeting.

    Written with syntax that is valid on both Python 2.6+ and Python 3:
    ``except ... as err`` instead of ``except ..., err``, ``err.args[0]``
    instead of indexing the exception, and an explicit ``write`` instead of
    the ``print >> stream`` statement form.

    Args:
        server: Stored on the instance; not otherwise used here.
        conn: Connection passed to ``asynchat.async_chat.__init__`` and
            queried for its peer name.
        addr: Stored on the instance; not otherwise used here.

    Raises:
        socket.error: Re-raised when ``conn.getpeername()`` fails with an
            error code other than ``errno.ENOTCONN``.
    """
    asynchat.async_chat.__init__(self, conn)
    self.__server = server
    self.__conn = conn
    self.__addr = addr
    self.__line = []
    self.__state = self.COMMAND
    self.__greeting = 0
    self.__mailfrom = None
    self.__rcpttos = []
    self.__data = ''
    self.__fqdn = socket.getfqdn()
    try:
        self.__peer = conn.getpeername()
    except socket.error as err:
        # a race condition may occur if the other end is closing
        # before we can get the peername
        self.close()
        if err.args[0] != errno.ENOTCONN:
            raise
        # The peer is already gone: leave without sending a greeting.
        return
    # Same text the print statement produced: two items joined by a space,
    # followed by a newline.
    DEBUGSTREAM.write('Peer: %s\n' % repr(self.__peer))
    self.push('220 %s %s' % (self.__fqdn, __version__))
    self.set_terminator('\r\n')
# Overrides base class for convenience