def __init__(self, server, conn, addr):
    """Set up the state for one accepted connection and queue the banner.

    Stores the owning server, the connection and the peer address, resets
    the command/envelope state, then pushes a ``220`` greeting built from
    the local FQDN and ``__version__`` and sets the line terminator to
    CRLF.
    """
    asynchat.async_chat.__init__(self, conn)

    # Collaborators handed over by the caller.
    self.__server, self.__conn, self.__addr = server, conn, addr

    # Command parser state: buffered line fragments and current mode.
    self.__line = []
    self.__state = self.COMMAND
    self.__greeting = 0

    # Envelope of the message currently being received.
    self.__mailfrom = None
    self.__rcpttos = []
    self.__data = ''

    # Names of both ends of the connection.
    self.__fqdn = socket.getfqdn()
    self.__peer = conn.getpeername()
    print >> DEBUGSTREAM, 'Peer:', repr(self.__peer)

    banner = '220 %s %s' % (self.__fqdn, __version__)
    self.push(banner)
    self.set_terminator('\r\n')
# Overrides base class for convenience
python类getfqdn()的实例源码
def __init__(self, server_address):
    """Create, bind and activate the listening socket.

    ``server_address`` is handed to ``bind`` unchanged. Once the socket is
    listening, the bound host is resolved through ``socket.getfqdn`` into
    ``server_name`` and the bound port is stored in ``server_port``.
    """
    # Build the listening socket from self.address_family / self.socket_type.
    sock = socket.socket(self.address_family, self.socket_type)
    self.listen_socket = sock

    # SO_REUSEADDR allows the same address to be reused.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(server_address)
    sock.listen(self.request_queue_size)

    # Only the first two items (host, port) of the bound address are used.
    bound_host, bound_port = sock.getsockname()[:2]
    self.server_name = socket.getfqdn(bound_host)
    self.server_port = bound_port

    # Headers set by the Web framework / Web application are kept here.
    self.headers_set = []
def generate_comment(self):
    """Fill the ``self.HELP`` template and return the resulting text.

    The ``help`` value is the cleaned class docstring of every bot
    extension that has one, separated by blank lines. The other values
    are the sorted extension names, the local FQDN, the repository's
    configured name, the sorted ``@``-prefixed help mentions and the
    distribution's project name and version.
    """
    # Extensions whose class has no docstring contribute nothing.
    docs = [
        inspect.cleandoc(ext.__class__.__doc__)
        for ext in self.bot.extensions
        if ext.__class__.__doc__
    ]
    values = {
        'extensions': ','.join(sorted(self.bot.extensions_map.keys())),
        'help': '\n\n'.join(docs),
        'host': socket.getfqdn(),
        'me': self.current.head.repository.SETTINGS.NAME,
        'mentions': ', '.join(
            sorted('@' + m for m in self.current.help_mentions)
        ),
        'software': self.DISTRIBUTION.project_name,
        'version': self.DISTRIBUTION.version,
    }
    return self.HELP % values
def report_failure(self, server_uuid, errno):
    """Report failure to Fabric

    Sends a ``threat.report_failure`` call for the MySQL server identified
    by server_uuid, together with this host's FQDN and the error number.
    Nothing is sent when error reporting is disabled or when the error
    number is in neither REPORT_ERRORS nor REPORT_ERRORS_EXTRA.
    """
    if not self._report_errors:
        return

    errno = int(errno)
    current_host = socket.getfqdn()

    reportable = errno in REPORT_ERRORS or errno in REPORT_ERRORS_EXTRA
    if not reportable:
        return

    _LOGGER.debug("Reporting error %d of server %s", errno, server_uuid)
    inst = self.get_instance()
    try:
        data = inst.execute('threat', 'report_failure',
                            server_uuid, current_host, errno)
        FabricResponse(data)
    except (Fault, socket.error) as exc:
        # Not requiring further action
        _LOGGER.debug("Failed reporting server to Fabric (%s)", str(exc))
def override_system_resolver(resolver=None):
    """Point the socket module's resolver routines at dnspython-backed ones.

    Handy in tests that need to control how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf). When no resolver is given, the default resolver
    is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # socket attribute name -> replacement implementation
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
def __init__(self, regr, key, meta=None):
    """Store the registration and key, and derive metadata and the id.

    When ``meta`` is None a fresh ``self.Meta`` is built from the current
    UTC time (microseconds zeroed) and the local FQDN. ``self.id`` is the
    MD5 hex digest of the key's PEM-encoded SubjectPublicKeyInfo.
    """
    self.key = key
    self.regr = regr

    if meta is None:
        # pyrfc3339 drops microseconds, make sure __eq__ is sane
        created_at = datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0)
        meta = self.Meta(creation_dt=created_at,
                         creation_host=socket.getfqdn())
    self.meta = meta

    public_pem = self.key.key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo)
    self.id = hashlib.md5(public_pem).hexdigest()
    # Implementation note: Email? Multiple accounts can have the
    # same email address. Registration URI? Assigned by the
    # server, not guaranteed to be stable over time, nor
    # canonical URI can be generated. ACME protocol doesn't allow
    # account key (and thus its fingerprint) to be updated...
def get_host_ip(hostIP=None):
    """Return the host name or IP address to use for this machine.

    ``hostIP`` selects the strategy:

    * ``None`` or ``'auto'`` -- treated as ``'ip'``.
    * ``'dns'`` -- return ``socket.getfqdn()``.
    * ``'ip'`` -- resolve the FQDN to an address, falling back to the
      plain host name if that lookup raises ``socket.gaierror``. If the
      result is a ``127.*`` address, the local address of a UDP socket
      connected towards ``10.255.255.255`` is used instead.
    * anything else -- returned unchanged.
    """
    if hostIP is None or hostIP == 'auto':
        hostIP = 'ip'

    if hostIP == 'dns':
        return socket.getfqdn()
    if hostIP != 'ip':
        # Caller supplied an explicit value; pass it through.
        return hostIP

    try:
        hostIP = socket.gethostbyname(socket.getfqdn())
    except socket.gaierror:
        # logging.warn is a deprecated alias; use logging.warning.
        logging.warning('gethostbyname(socket.getfqdn()) failed... trying on hostname()')
        hostIP = socket.gethostbyname(socket.gethostname())

    if hostIP.startswith("127."):
        probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # doesn't have to be reachable
            probe.connect(('10.255.255.255', 1))
            hostIP = probe.getsockname()[0]
        finally:
            # Always release the probe socket, even if connect() fails.
            probe.close()
    return hostIP
def report_failure(self, server_uuid, errno):
    """Report failure to Fabric

    Sends a ``threat.report_failure`` call for the MySQL server identified
    by server_uuid, together with this host's FQDN and the error number.
    Nothing is sent when error reporting is disabled or when the error
    number is in neither REPORT_ERRORS nor REPORT_ERRORS_EXTRA.
    """
    if not self._report_errors:
        return

    errno = int(errno)
    current_host = socket.getfqdn()

    reportable = errno in REPORT_ERRORS or errno in REPORT_ERRORS_EXTRA
    if not reportable:
        return

    _LOGGER.debug("Reporting error %d of server %s", errno, server_uuid)
    inst = self.get_instance()
    try:
        data = inst.execute('threat', 'report_failure',
                            server_uuid, current_host, errno)
        FabricResponse(data)
    except (Fault, socket.error) as exc:
        # Not requiring further action
        _LOGGER.debug("Failed reporting server to Fabric (%s)", str(exc))
def override_system_resolver(resolver=None):
    """Point the socket module's resolver routines at dnspython-backed ones.

    Handy in tests that need to control how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf). When no resolver is given, the default resolver
    is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # socket attribute name -> replacement implementation
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
def __init__(self, client, path, num_clients, identifier=None):
    """Create a Double Barrier

    :param client: A :class:`~kazoo.client.KazooClient` instance.
    :param path: The barrier path to use.
    :param num_clients: How many clients must enter the barrier to
                        proceed.
    :type num_clients: int
    :param identifier: An identifier to use for this member of the
                       barrier when participating. Defaults to the
                       hostname + process id.

    """
    self.client = client
    self.path = path
    self.num_clients = num_clients

    # Any falsy identifier falls back to "<fqdn>-<pid>".
    if not identifier:
        identifier = '%s-%s' % (socket.getfqdn(), os.getpid())
    self._identifier = identifier

    self.participating = False
    self.assured_path = False

    # Each instance gets its own random node name under the barrier path.
    node = uuid.uuid4().hex
    self.node_name = node
    self.create_path = self.path + "/" + self.node_name
def __init__(self, swf=None, identity=None):
    """Base class for deciders.

    Parameters
    ----------
    swf: floto.api.Swf
        The SWF client, if swf is None an instance is created
    identity:
        Stored as ``self.identity``; when falsy, the FQDN of the local
        host name is used instead
    """
    # State that starts out empty.
    self.task_token = None
    self.last_response = None
    self.history = None
    self.decisions = []
    self.run_id = None
    self.workflow_id = None
    self.domain = None

    if not identity:
        identity = socket.getfqdn(socket.gethostname())
    self.identity = identity

    if not swf:
        swf = floto.api.Swf()
    self.swf = swf

    self.task_list = None
    # Termination flags start cleared.
    self.terminate_workflow = False
    self.terminate_decider = False
    self._separate_process = None
def select_playbook(self, path):
    """Pick the playbook under ``path`` to run, or return None.

    If a first argument was given, only that playbook is tried. Otherwise
    the candidates are ``<fqdn>.yml``, ``<short hostname>.yml`` and
    ``self.DEFAULT_PLAYBOOK``, in that order, and the first one for which
    ``try_playbook`` returns 0 wins. Failures are reported through
    ``display.warning``.
    """
    if len(self.args) > 0 and self.args[0] is not None:
        # An explicit playbook was requested: it is the only candidate.
        requested = os.path.join(path, self.args[0])
        status = self.try_playbook(requested)
        if status == 0:
            return requested
        display.warning("%s: %s" % (requested, self.PLAYBOOK_ERRORS[status]))
        return None

    fqdn = socket.getfqdn()
    candidates = [
        os.path.join(path, fqdn + '.yml'),
        os.path.join(path, fqdn.split('.')[0] + '.yml'),
        os.path.join(path, self.DEFAULT_PLAYBOOK),
    ]
    errors = []
    for candidate in candidates:
        status = self.try_playbook(candidate)
        if status == 0:
            return candidate
        errors.append("%s: %s" % (candidate, self.PLAYBOOK_ERRORS[status]))

    # Nothing usable was found: report every failure at once.
    display.warning("\n".join(errors))
    return None
def _check_address(address):
    """Return an ``(ipv4, hostname)`` pair for ``address``.

    If ``is_valid_address`` accepts ``address`` it is used as the IPv4
    value and a reverse lookup supplies the host name; otherwise the
    address is treated as a name, expanded with ``socket.getfqdn`` and
    resolved forward. A failed lookup leaves that half as ``""``.
    """
    if is_valid_address(address):
        ipv4, hostname = address, ""
        try:
            hostname = socket.gethostbyaddr(address)[0]
        except Exception:
            pass  # no DNS
        return ipv4, hostname

    hostname = socket.getfqdn(address)
    ipv4 = ""
    try:
        ipv4 = socket.gethostbyname(hostname)
    except Exception:
        pass  # host not valid or offline
    return ipv4, hostname
def post(content, token=None, username=None, public=False, debug=False):
    '''
    Post a gist on GitHub.

    The gist holds a single file named ``message`` with ``content`` (an
    empty string when content is None). Its description carries a random
    SHA-1 hex digest, the local FQDN, the user name (the current user
    when not given) and the current UTC time. Returns ``(id, digest)``
    when the response contains an ``id``, otherwise ``(None, None)``.
    '''
    digest = hashlib.sha1(os.urandom(16)).hexdigest()
    if username is None:
        username = getuser()
    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    template = ('{hash} (twitter-message-bus); from {host} by {user} '
                'at {time} UTC.')
    description = template.format(host=getfqdn(), user=username,
                                  time=timestamp, hash=digest)

    body = '' if content is None else content
    payload = json.dumps({
        'files': {'message': {'content': body}},
        'public': public,
        'description': description,
    })

    response = github(http='post', uri='gists', token=token, payload=payload,
                      debug=debug)
    if 'id' in response:
        return (response['id'], digest)
    return (None, None)
def welcome(run=0):
    """Render the welcome page, or move on once the form is complete.

    With ``run == 0`` the form values come from the stored ``General``
    configuration. Otherwise they come from the submitted request form;
    if every required field is present the configuration is updated and
    the user is redirected to ``determine_ssh_status``. A missing
    ``host`` value defaults to the local FQDN before rendering.

    NOTE(review): the listing this was taken from had lost its
    indentation; the validate-and-redirect step is assumed to belong to
    the submitted-form branch -- confirm against the original.
    """
    required = ['host', 'country', 'state', 'locality',
                'orgname', 'orgunit', 'commonname',
                'email']
    if run == 0:
        form = wizard.config.get('General', {})
    else:
        form = _delist(request.form)
        if _has_all_parameters(form, required):
            wizard.change_config('General', **form)
            return redirect(url_for('determine_ssh_status', run=0))

    if 'host' not in form:
        form['host'] = socket.getfqdn()

    # Form values are passed to the template alongside the run counter.
    all_params = {'run': run, **form}
    menu = wizard.get_available_options()
    return render_template('welcome.html', menu_options=menu, **all_params)
def testHostnameRes(self):
    """Check that the FQDN of the local address is a known host name.

    Resolves the local host name to an address, reverse-resolves that
    address, and fails unless ``socket.getfqdn(ip)`` is the host name,
    the reverse-resolved name or one of its aliases. If either lookup
    raises ``socket.error`` the test is reported as skipped rather than
    silently passing.
    """
    # Testing hostname resolution mechanisms
    hostname = socket.gethostname()
    try:
        ip = socket.gethostbyname(hostname)
    except socket.error:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
    try:
        hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
    except socket.error:
        # Probably a similar problem as above; skip this test
        self.skipTest('address lookup failure')
    all_host_names = [hostname, hname] + aliases
    fqhn = socket.getfqdn(ip)
    if fqhn not in all_host_names:
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def setUp(self):
    """Install the mocks and start a debugging SMTP server in a thread.

    Replaces ``socket.getfqdn``, ``sys.stdout`` and ``smtpd.DEBUGSTREAM``
    (keeping the originals on ``self``), starts a ``DebuggingServer`` on
    an OS-assigned port, and waits until the server thread signals
    ``serv_evt``.
    """
    # Swap in the mock getfqdn, remembering the real one.
    self.real_getfqdn = socket.getfqdn
    socket.getfqdn = mock_socket.getfqdn

    # temporarily replace sys.stdout to capture DebuggingServer output
    self.old_stdout, self.output = sys.stdout, io.StringIO()
    sys.stdout = self.output

    self.serv_evt = threading.Event()
    self.client_evt = threading.Event()

    # Capture SMTPChannel debug output
    self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
    smtpd.DEBUGSTREAM = io.StringIO()

    # Pick a random unused port by passing 0 for the port number
    self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
    # Keep a note of what port was assigned
    self.port = self.serv.socket.getsockname()[1]

    self.thread = threading.Thread(
        target=debugging_server,
        args=(self.serv, self.serv_evt, self.client_evt))
    self.thread.start()

    # wait until server thread has assigned a port number
    self.serv_evt.wait()
    self.serv_evt.clear()
def get_fqdn():
    """
    Return the current fqdn of the machine, trying hard to return a meaningful
    name.

    In particular, it means working against a NetworkManager bug which seems to
    make C{getfqdn} return localhost6.localdomain6 for machine without a domain
    since Maverick.
    """
    name = socket.getfqdn()
    if "localhost" not in name:
        return name

    # Try the heavy artillery
    infos = socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET,
                               socket.SOCK_DGRAM, socket.IPPROTO_IP,
                               socket.AI_CANONNAME)
    # Item 3 of the first result is its canonical name.
    name = infos[0][3]
    if "localhost" in name:
        # Another fallback
        name = socket.gethostname()
    return name
def get_unique_code():
    """Return this machine's unique code, generating and caching it if needed.

    Lookup order:

    1. the ``STRACK_UNIQUE_CODE`` environment variable;
    2. the ``unique`` file under ``STRACK_USER_PATH`` (its content is also
       copied into the environment variable);
    3. a freshly generated code -- the MD5 hex digest of the IP address
       that the local FQDN resolves to -- which is then stored in both
       the environment variable and the cache file.
    """
    # get code from env
    code_from_env = os.environ.get("STRACK_UNIQUE_CODE")
    if code_from_env:
        return code_from_env

    # read code from cache
    unique_code_cache_file = os.path.join(STRACK_USER_PATH, "unique")
    if os.path.isfile(unique_code_cache_file):
        with open(unique_code_cache_file) as f:
            code_from_file = f.read()
        os.environ["STRACK_UNIQUE_CODE"] = code_from_file
        return code_from_file

    # make dir
    if not os.path.isdir(STRACK_USER_PATH):
        os.makedirs(STRACK_USER_PATH)

    # generate the code
    computer_name = socket.getfqdn(socket.gethostname())
    ip = socket.gethostbyname(computer_name)
    # Hash constructors need bytes on Python 3; encoding keeps the same
    # digest on Python 2, where the address is already a byte string.
    unique_code = md5(ip.encode("utf-8")).hexdigest()

    # save cache
    os.environ["STRACK_UNIQUE_CODE"] = unique_code
    with open(unique_code_cache_file, "w") as f:
        f.write(unique_code)
    return unique_code
def perform_krb181_workaround():
    """Renew the Kerberos ticket cache to work around a Kerberos 1.8.1 issue.

    Runs ``<kinit_path> -c <ccache> -R`` using the ``kerberos`` section of
    the configuration. If the command exits with a non-zero status, an
    error describing how to check ticket renewability is logged and the
    process exits with that status.
    """
    cmdv = [configuration.get('kerberos', 'kinit_path'),
            "-c", configuration.get('kerberos', 'ccache'),
            "-R"]  # Renew ticket_cache
    log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " +
             " ".join(cmdv))
    ret = subprocess.call(cmdv, close_fds=True)
    if ret == 0:
        return

    principal = "%s/%s" % (configuration.get('kerberos', 'principal'), socket.getfqdn())
    # The hint below prints "kinit -f -c <ccache>", so it must be given the
    # credential cache setting, not the principal.
    fmt_dict = dict(princ=principal,
                    ccache=configuration.get('kerberos', 'ccache'))
    log.error("Couldn't renew kerberos ticket in order to work around "
              "Kerberos 1.8.1 issue. Please check that the ticket for "
              "'%(princ)s' is still renewable:\n"
              "  $ kinit -f -c %(ccache)s\n"
              "If the 'renew until' date is the same as the 'valid starting' "
              "date, the ticket cannot be renewed. Please check your KDC "
              "configuration, and the ticket renewal policy (maxrenewlife) "
              "for the '%(princ)s' and `krbtgt' principals." % fmt_dict)
    sys.exit(ret)
def __init__(self, regr, key, meta=None):
    """Store the registration and key, and derive metadata and the id.

    When ``meta`` is None a fresh ``self.Meta`` is built from the current
    UTC time (microseconds zeroed) and the local FQDN. ``self.id`` is the
    MD5 hex digest of the key's PEM-encoded SubjectPublicKeyInfo.
    """
    self.key = key
    self.regr = regr

    if meta is None:
        # pyrfc3339 drops microseconds, make sure __eq__ is sane
        created_at = datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0)
        meta = self.Meta(creation_dt=created_at,
                         creation_host=socket.getfqdn())
    self.meta = meta

    public_pem = self.key.key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo)
    self.id = hashlib.md5(public_pem).hexdigest()
    # Implementation note: Email? Multiple accounts can have the
    # same email address. Registration URI? Assigned by the
    # server, not guaranteed to be stable over time, nor
    # canonical URI can be generated. ACME protocol doesn't allow
    # account key (and thus its fingerprint) to be updated...
def update_warc_info_from_spider(record, spider):
    """update a WARC warcinfo record from a scrapy Spider

    The payload is rebuilt from a header block holding the software name,
    the local FQDN and the spider's name, run id, revision and
    JSON-encoded parameters.
    """
    # make empty header object to use for fields
    # XXX WARCHeader messes up capitalization here
    fields = warc.WARCHeader({}, defaults=False)
    entries = (
        ('software', 'osp_scraper'),
        ('hostname', socket.getfqdn()),
        ('x-spider-name', spider.name),
        ('x-spider-run-id', spider.run_id),
        ('x-spider-revision', git_revision),
        ('x-spider-parameters', json.dumps(spider.get_parameters())),
    )
    for field_name, field_value in entries:
        fields[field_name] = field_value

    # Serialise the fields (no version line, no extra CRLF) as the payload.
    buf = BytesIO()
    fields.write_to(buf, version_line=False, extra_crlf=False)
    record.update_payload(buf.getvalue())
test_example_api_client.py 文件源码
项目:intel-manager-for-lustre
作者: intel-hpdd
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def test_login(self):
    """List hosts through the example API client and compare FQDNs.

    Adds the first two configured Lustre servers, then asks the example
    client for the host list using the first manager's first user, and
    expects the FQDNs of the added hosts.
    """
    servers = config['lustre_servers']
    self.hosts = self.add_hosts([
        servers[0]['address'],
        servers[1]['address']])

    # Chroma puts its FQDN in the manager certificate, but the test config may
    # be pointing to localhost: if this is the case, substitute the FQDN in the
    # URL so that the client can validate the certificate.
    manager = config['chroma_managers'][0]
    url = manager['server_http_url']
    parsed = urlparse.urlparse(url)
    if parsed.hostname == 'localhost':
        parts = list(parsed)
        # Index 1 of the parsed URL is the network location.
        parts[1] = parts[1].replace("localhost", socket.getfqdn())
        url = urlparse.urlunparse(tuple(parts))

    example_api_client.setup_ca(url)

    user = manager['users'][0]
    hosts = example_api_client.list_hosts(url,
                                          user['username'],
                                          user['password'])
    expected = [h['fqdn'] for h in self.hosts]
    self.assertListEqual(hosts, expected)
def testHostnameRes(self):
    """Check that the FQDN of the local address is a known host name.

    The test is skipped when the forward or the reverse lookup raises
    ``socket.error``.
    """
    # Testing hostname resolution mechanisms
    hostname = socket.gethostname()
    try:
        ip = socket.gethostbyname(hostname)
    except socket.error:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")

    try:
        hname, aliases, _ipaddrs = socket.gethostbyaddr(ip)
    except socket.error:
        # Probably a similar problem as above; skip this test
        self.skipTest('address lookup failure')

    # Every name the local host is known by.
    all_host_names = [hostname, hname]
    all_host_names = all_host_names + aliases
    fqhn = socket.getfqdn(ip)
    if fqhn in all_host_names:
        return
    self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def testHostnameRes(self):
    """Check that the FQDN of the local address is a known host name.

    The test is skipped when the forward or the reverse lookup raises
    ``socket.error``.
    """
    # Testing hostname resolution mechanisms
    hostname = socket.gethostname()
    try:
        ip = socket.gethostbyname(hostname)
    except socket.error:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")

    try:
        hname, aliases, _ipaddrs = socket.gethostbyaddr(ip)
    except socket.error:
        # Probably a similar problem as above; skip this test
        self.skipTest('address lookup failure')

    # Every name the local host is known by.
    all_host_names = [hostname, hname]
    all_host_names = all_host_names + aliases
    fqhn = socket.getfqdn(ip)
    if fqhn in all_host_names:
        return
    self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def save(self, *args, **kwargs):
    """Normalise the record, fill creation defaults, save, then opt out.

    Before saving, the mail status is refreshed and ``status_code`` is
    reduced to the part matched by ``rfc3463_regex``. Records without a
    primary key get a creation date and source host name if those are
    unset. After saving, newly created records whose status is DROPPED
    or BOUNCED trigger ``should_optout(create=True)``.

    NOTE(review): the listing this was taken from had lost its
    indentation; the ``source_hostname`` default is assumed to apply
    only to records without a primary key -- confirm.
    """
    created = not self.pk
    self.update_mail_status()

    if self.status_code:
        matched = rfc3463_regex.match(self.status_code)
        if matched:
            self.status_code = matched.group(0)

    if not self.pk:
        if not self.creation_date:
            self.creation_date = timezone.now()
        if not self.source_hostname:
            self.source_hostname = socket.getfqdn()

    super().save(*args, **kwargs)

    if not created:
        return
    if self.status in [self.DROPPED, self.BOUNCED]:
        self.should_optout(create=True)
def override_system_resolver(resolver=None):
    """Point the socket module's resolver routines at dnspython-backed ones.

    Handy in tests that need to control how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf). When no resolver is given, the default resolver
    is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # socket attribute name -> replacement implementation
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
def store_process_system_document(_process_id, _name, _parent_id=None):
    """
    Creates a process instance structure, automatically sets systemPid, spawnedBy, host and spawnedWhen

    :param _process_id: Stored as the document's ``_id``.
    :param _name: Stored as the document's ``name``.
    :param _parent_id: Optional; when truthy it is stored as ``parent_id``.
    :return: The process document as a dict.
    """
    _struct = {
        "_id": _process_id,
        "systemPid": os.getpid(),
        "spawnedBy": get_current_login(),
        "name": _name,
        "host": socket.getfqdn(),
        "spawnedWhen": str(datetime.datetime.utcnow()),
        "schemaRef": "ref://of.process.system"
    }
    if _parent_id:
        # Store the id itself: a trailing comma on this assignment would
        # wrap it in a one-element tuple.
        _struct["parent_id"] = _parent_id
    return _struct
simple_webserver_with_wsgi.py 文件源码
项目:python_programing
作者: lzhaoyang
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def __init__(self, server_address):
    """Create, bind and activate the listening socket.

    ``server_address`` is handed to ``bind`` unchanged. Once the socket is
    listening, the bound host is resolved through ``socket.getfqdn`` into
    ``server_name`` and the bound port is stored in ``server_port``.
    """
    # Build the listening socket from self.address_family / self.socket_type.
    sock = socket.socket(self.address_family, self.socket_type)
    self.listen_socket = sock

    # SO_REUSEADDR allows the same socket address to be reused.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(server_address)
    sock.listen(self.request_queue_size)

    # Only the first two items (host, port) of the bound address are used.
    bound_host, bound_port = sock.getsockname()[:2]
    self.server_name = socket.getfqdn(bound_host)
    self.server_port = bound_port

    # Headers set by web frameworks / applications are kept here.
    self.headers_set = []
def override_system_resolver(resolver=None):
    """Point the socket module's resolver routines at dnspython-backed ones.

    Handy in tests that need to control how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf). When no resolver is given, the default resolver
    is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # socket attribute name -> replacement implementation
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)