python类getfqdn()的实例源码

smtpd.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, server, conn, addr):
        """Initialise the channel for one client connection and greet it.

        Stores the owning server, the connected socket and the client
        address, resets the per-session SMTP state, then queues the 220
        banner built from this host's FQDN and ``__version__``.
        """
        asynchat.async_chat.__init__(self, conn)

        # Who we are talking to, and on whose behalf.
        self.__server, self.__conn, self.__addr = server, conn, addr

        # Per-session protocol state, reset to "nothing received yet".
        self.__line = []
        self.__state = self.COMMAND
        self.__greeting = 0
        self.__mailfrom, self.__rcpttos, self.__data = None, [], ''

        # Local and remote identities used in the banner and debug output.
        self.__fqdn = socket.getfqdn()
        self.__peer = conn.getpeername()
        print >> DEBUGSTREAM, 'Peer:', repr(self.__peer)

        banner = '220 %s %s' % (self.__fqdn, __version__)
        self.push(banner)
        # Incoming data is split on CRLF.
        self.set_terminator('\r\n')

    # Overrides base class for convenience
webserver.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, server_address):
        """Open the listening socket on *server_address* and record where
        it was actually bound.

        Sets ``listen_socket``, ``server_name`` (the FQDN for the bound
        host), ``server_port`` and an empty ``headers_set`` list.
        """
        sock = socket.socket(self.address_family, self.socket_type)
        self.listen_socket = sock

        # SO_REUSEADDR lets the same address be bound again.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(server_address)
        sock.listen(self.request_queue_size)

        # Ask the socket where it ended up rather than trusting the argument.
        bound_host, bound_port = sock.getsockname()[:2]
        self.server_name = socket.getfqdn(bound_host)
        self.server_port = bound_port

        # Headers set by the Web framework/Web application are kept here.
        self.headers_set = []
core.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def generate_comment(self):
        """Fill the ``HELP`` template and return the resulting text.

        The help section is the cleaned class docstring of every extension
        in ``self.bot.extensions`` that has one, joined by blank lines.
        """
        class_docs = (ext.__class__.__doc__ for ext in self.bot.extensions)
        cleaned = [inspect.cleandoc(text) for text in class_docs if text]

        context = {
            'extensions': ','.join(sorted(self.bot.extensions_map.keys())),
            'help': '\n\n'.join(cleaned),
            'host': socket.getfqdn(),
            'me': self.current.head.repository.SETTINGS.NAME,
            'mentions': ', '.join(
                sorted('@' + m for m in self.current.help_mentions)
            ),
            'software': self.DISTRIBUTION.project_name,
            'version': self.DISTRIBUTION.version,
        }
        return self.HELP % context
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def report_failure(self, server_uuid, errno):
        """Tell Fabric that the server *server_uuid* failed with *errno*.

        Nothing is sent when error reporting is disabled or when *errno*
        is in neither REPORT_ERRORS nor REPORT_ERRORS_EXTRA. A failure to
        deliver the report is logged at debug level and otherwise ignored.
        """
        if not self._report_errors:
            return

        errno = int(errno)
        current_host = socket.getfqdn()

        if errno not in REPORT_ERRORS and errno not in REPORT_ERRORS_EXTRA:
            return

        _LOGGER.debug("Reporting error %d of server %s", errno,
                      server_uuid)
        fabric_inst = self.get_instance()
        try:
            reply = fabric_inst.execute('threat', 'report_failure',
                                        server_uuid, current_host, errno)
            FabricResponse(reply)
        except (Fault, socket.error) as exc:
            # Reporting is best effort: no further action is required.
            _LOGGER.debug("Failed reporting server to Fabric (%s)",
                          str(exc))
resolver.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def override_system_resolver(resolver=None):
    """Swap the socket module's name-resolution functions for this
    module's replacements, which use dnspython's resolver.

    Handy in tests that need to steer how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # (attribute on the socket module, replacement function)
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
account.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, regr, key, meta=None):
        """Store the registration and key, and derive ``meta`` and ``id``.

        When *meta* is not supplied, a ``Meta`` is built from the current
        UTC time (microseconds zeroed) and this host's FQDN. ``id`` is the
        MD5 hex digest of the PEM-encoded SubjectPublicKeyInfo of the key.
        """
        self.key = key
        self.regr = regr

        if meta is None:
            # pyrfc3339 drops microseconds, make sure __eq__ is sane
            now_utc = datetime.datetime.now(tz=pytz.UTC)
            meta = self.Meta(
                creation_dt=now_utc.replace(microsecond=0),
                creation_host=socket.getfqdn())
        self.meta = meta

        public_pem = self.key.key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)
        self.id = hashlib.md5(public_pem).hexdigest()
        # Implementation note: Email? Multiple accounts can have the
        # same email address. Registration URI? Assigned by the
        # server, not guaranteed to be stable over time, nor
        # canonical URI can be generated. ACME protocol doesn't allow
        # account key (and thus its fingerprint) to be updated...
tracker.py 文件源码 项目:dlbench 作者: hclhkbu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_host_ip(hostIP=None):
    """Work out the address this host should be known by.

    Args:
        hostIP: ``None`` or ``'auto'`` (both treated as ``'ip'``),
            ``'dns'`` to get the fully qualified domain name, ``'ip'`` to
            get an IPv4 address, or any other value, which is returned
            unchanged.

    Returns:
        The FQDN, an IPv4 address string, or the value passed in.
    """
    if hostIP is None or hostIP == 'auto':
        hostIP = 'ip'

    if hostIP == 'dns':
        hostIP = socket.getfqdn()
    elif hostIP == 'ip':
        try:
            hostIP = socket.gethostbyname(socket.getfqdn())
        except socket.gaierror:
            logging.warning('gethostbyname(socket.getfqdn()) failed... trying on hostname()')
            hostIP = socket.gethostbyname(socket.gethostname())
        if hostIP.startswith("127."):
            # The name resolved to loopback: ask the OS which local address
            # it would use for an outbound datagram instead.
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                # doesn't have to be reachable
                s.connect(('10.255.255.255', 1))
                hostIP = s.getsockname()[0]
            finally:
                # Always release the descriptor, even if connect() fails.
                s.close()
    return hostIP
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def report_failure(self, server_uuid, errno):
        """Tell Fabric that the server *server_uuid* failed with *errno*.

        Nothing is sent when error reporting is disabled or when *errno*
        is in neither REPORT_ERRORS nor REPORT_ERRORS_EXTRA. A failure to
        deliver the report is logged at debug level and otherwise ignored.
        """
        if not self._report_errors:
            return

        errno = int(errno)
        current_host = socket.getfqdn()

        if errno not in REPORT_ERRORS and errno not in REPORT_ERRORS_EXTRA:
            return

        _LOGGER.debug("Reporting error %d of server %s", errno,
                      server_uuid)
        fabric_inst = self.get_instance()
        try:
            reply = fabric_inst.execute('threat', 'report_failure',
                                        server_uuid, current_host, errno)
            FabricResponse(reply)
        except (Fault, socket.error) as exc:
            # Reporting is best effort: no further action is required.
            _LOGGER.debug("Failed reporting server to Fabric (%s)",
                          str(exc))
resolver.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def override_system_resolver(resolver=None):
    """Swap the socket module's name-resolution functions for this
    module's replacements, which use dnspython's resolver.

    Handy in tests that need to steer how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # (attribute on the socket module, replacement function)
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
barrier.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, client, path, num_clients, identifier=None):
        """Set up a Double Barrier.

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The barrier path to use.
        :param num_clients: Number of clients that have to enter the
                            barrier before it lets them proceed.
        :type num_clients: int
        :param identifier: Name for this member while it takes part in
                           the barrier. When omitted (or falsy) it is
                           built as ``<fqdn>-<process id>``.

        """
        self.client = client
        self.path = path
        self.num_clients = num_clients

        if identifier:
            self._identifier = identifier
        else:
            self._identifier = '%s-%s' % (socket.getfqdn(), os.getpid())

        self.participating = False
        self.assured_path = False

        # Each instance gets its own random node name under the barrier path.
        self.node_name = uuid.uuid4().hex
        self.create_path = "/".join((self.path, self.node_name))
base.py 文件源码 项目:floto 作者: babbel 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, swf=None, identity=None):
        """Base class for deciders.

        Parameters
        ----------
        swf: floto.api.Swf
            The SWF client; a new ``floto.api.Swf()`` is created when
            none (or a falsy value) is given
        identity:
            Value stored as ``self.identity``; falls back to the FQDN of
            this host's name when none (or a falsy value) is given
        """
        # State that starts out unset.
        self.task_token = None
        self.last_response = None
        self.history = None
        self.decisions = []
        self.run_id = None
        self.workflow_id = None
        self.domain = None

        if identity:
            self.identity = identity
        else:
            self.identity = socket.getfqdn(socket.gethostname())

        if swf:
            self.swf = swf
        else:
            self.swf = floto.api.Swf()
        self.task_list = None

        # Termination flags, both off initially.
        self.terminate_workflow = False
        self.terminate_decider = False

        self._separate_process = None
pull.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def select_playbook(self, path):
        """Pick the playbook under *path* to run, or return None.

        If a playbook name was given as the first argument, only that one
        is tried. Otherwise ``<fqdn>.yml``, ``<short hostname>.yml`` and
        the default playbook are tried in that order and the first that
        ``try_playbook`` accepts (returns 0 for) wins. A warning is shown
        for the candidates that were rejected when nothing is selected.
        """
        if len(self.args) > 0 and self.args[0] is not None:
            requested = os.path.join(path, self.args[0])
            status = self.try_playbook(requested)
            if status == 0:
                return requested
            display.warning("%s: %s" % (requested, self.PLAYBOOK_ERRORS[status]))
            return None

        fqdn = socket.getfqdn()
        candidates = [
            os.path.join(path, fqdn + '.yml'),
            os.path.join(path, fqdn.split('.')[0] + '.yml'),
            os.path.join(path, self.DEFAULT_PLAYBOOK),
        ]
        failures = []
        for candidate in candidates:
            status = self.try_playbook(candidate)
            if status == 0:
                return candidate
            failures.append("%s: %s" % (candidate, self.PLAYBOOK_ERRORS[status]))

        # Every candidate was rejected: report all the reasons at once.
        display.warning("\n".join(failures))
        return None
resource_mgr.py 文件源码 项目:opsmgr 作者: open-power-ref-design-toolkit 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _check_address(address):
    """Return ``(ipv4, hostname)`` for *address*.

    When ``is_valid_address`` accepts *address*, it is used as the IPv4
    value and a reverse lookup supplies the hostname. Otherwise the
    hostname is the FQDN for *address* and a forward lookup supplies the
    IPv4 value. Whichever lookup fails leaves its half as ``""``.
    """
    if not is_valid_address(address):
        hostname = socket.getfqdn(address)
        ipv4 = ""
        try:
            ipv4 = socket.gethostbyname(hostname)
        except Exception:
            pass  # host not valid or offline
        return ipv4, hostname

    hostname = ""
    try:
        hostname = socket.gethostbyaddr(address)[0]
    except Exception:
        pass  # no DNS
    return address, hostname
gist.py 文件源码 项目:twitter-message-bus 作者: clickyotomy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def post(content, token=None, username=None, public=False, debug=False):
    '''
    Create a gist on GitHub holding *content* in a file named 'message'.

    Returns (gist id, random hash) when the response carries an 'id',
    otherwise (None, None). The hash is the one embedded in the gist
    description.
    '''
    nonce = hashlib.sha1(os.urandom(16)).hexdigest()
    if username is None:
        username = getuser()
    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    template = ('{hash} (twitter-message-bus); from {host} by {user} '
                'at {time} UTC.')
    description = template.format(host=getfqdn(), user=username,
                                  time=timestamp, hash=nonce)

    # A missing message is posted as an empty string.
    message = {'content': '' if content is None else content}
    payload = json.dumps({
        'files': {'message': message},
        'public': public,
        'description': description
    })

    response = github(http='post', uri='gists', token=token, payload=payload,
                      debug=debug)
    if 'id' in response:
        return (response['id'], nonce)
    return (None, None)
__main__.py 文件源码 项目:virtual-core 作者: tiagoantao 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def welcome(run=0):
    """Show the welcome form, or store it and move on once it is complete.

    With ``run == 0`` the form is pre-filled from the 'General' section
    of the wizard configuration. On later runs the submitted form is
    used; if it has every required parameter it is saved and the user is
    redirected to ``determine_ssh_status``. The 'host' field defaults to
    this machine's FQDN.
    """
    required = ['host', 'country', 'state', 'locality',
                'orgname', 'orgunit', 'commonname',
                'email']
    if run == 0:
        form = wizard.config.get('General', {})
    else:
        form = _delist(request.form)
        if _has_all_parameters(form, required):
            wizard.change_config('General', **form)
            return redirect(url_for('determine_ssh_status', run=0))

    if 'host' not in form:
        form['host'] = socket.getfqdn()

    # Form values win over the 'run' default if both define the key.
    all_params = {'run': run}
    all_params.update(form)
    return render_template('welcome.html',
                           menu_options=wizard.get_available_options(),
                           **all_params)
test_socket.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def testHostnameRes(self):
        """Forward, reverse and FQDN lookups of this host must agree."""
        local_name = socket.gethostname()
        try:
            ip = socket.gethostbyname(local_name)
        except socket.error:
            # Name lookup is probably not set up right; give up quietly.
            return
        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")

        try:
            reverse = socket.gethostbyaddr(ip)
        except socket.error:
            # Probably the same kind of problem as above; give up quietly.
            return
        canonical, aliases, ipaddrs = reverse

        known_names = [local_name, canonical] + aliases
        fqhn = socket.getfqdn(ip)
        if fqhn in known_names:
            return
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
test_smtplib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def setUp(self):
        """Patch global state and start a DebuggingServer in a thread.

        The originals of ``socket.getfqdn``, ``sys.stdout`` and
        ``smtpd.DEBUGSTREAM`` are saved on the instance before being
        replaced. No restore happens in this method.
        """
        # Swap in the mock getfqdn, keeping the real one on the instance
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        # Events handed to the server thread together with the server
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        # NOTE(review): the port is already read above; presumably serv_evt
        # is set by debugging_server once it is running -- confirm there.
        self.serv_evt.wait()
        # Reset the event after the wait returns
        self.serv_evt.clear()
network.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_fqdn():
    """
    Return this machine's fqdn, going out of the way to avoid answering
    with a "localhost" name.

    This works around a NetworkManager bug which seems to make C{getfqdn}
    return localhost6.localdomain6 on machines without a domain since
    Maverick.
    """
    fqdn = socket.getfqdn()
    if "localhost" not in fqdn:
        return fqdn

    # Try the heavy artillery: ask getaddrinfo for the canonical name.
    addrinfo = socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET,
                                  socket.SOCK_DGRAM, socket.IPPROTO_IP,
                                  socket.AI_CANONNAME)
    canonical = addrinfo[0][3]
    if "localhost" in canonical:
        # Another fallback: the plain hostname.
        return socket.gethostname()
    return canonical
strack.py 文件源码 项目:strack_python_api 作者: cine-use 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def get_unique_code():
        """Return the unique code for this machine, caching it on first use.

        Lookup order:

        1. the ``STRACK_UNIQUE_CODE`` environment variable;
        2. the ``unique`` file under ``STRACK_USER_PATH`` (the value is
           also copied into the environment);
        3. otherwise the MD5 hex digest of the IPv4 address that this
           host's FQDN resolves to, which is then stored in both the
           environment and the cache file.

        Raises whatever ``socket.gethostbyname`` raises when the host name
        cannot be resolved in step 3.
        """
        # get code from env
        code_from_env = os.environ.get("STRACK_UNIQUE_CODE")
        if code_from_env:
            return code_from_env
        # read code from cache
        unique_code_cache_file = os.path.join(STRACK_USER_PATH, "unique")
        if os.path.isfile(unique_code_cache_file):
            with open(unique_code_cache_file) as f:
                code_from_file = f.read()
            os.environ.update({"STRACK_UNIQUE_CODE": code_from_file})
            return code_from_file
        # make dir
        if not os.path.isdir(STRACK_USER_PATH):
            os.makedirs(STRACK_USER_PATH)
        # generate the code
        computer_name = socket.getfqdn(socket.gethostname())
        ip = socket.gethostbyname(computer_name)
        # md5() needs bytes on Python 3; encoding an ASCII address is also
        # harmless on Python 2.
        unique_code = md5(ip.encode("utf-8")).hexdigest()
        # save cache
        os.environ.update({"STRACK_UNIQUE_CODE": unique_code})
        with open(unique_code_cache_file, "w") as f:
            f.write(unique_code)
        return unique_code
kerberos.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def perform_krb181_workaround():
    """Renew the Kerberos ticket cache with ``kinit -R``.

    The kinit path and credential cache are read from the 'kerberos'
    configuration section. If the renewal command exits with a non-zero
    status, an explanatory error is logged and the process exits with
    that same status.
    """
    cmdv = [configuration.get('kerberos', 'kinit_path'),
            "-c", configuration.get('kerberos', 'ccache'),
            "-R"]  # Renew ticket_cache

    log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " +
             " ".join(cmdv))

    ret = subprocess.call(cmdv, close_fds=True)

    if ret != 0:
        principal = "%s/%s" % (configuration.get('kerberos', 'principal'), socket.getfqdn())
        # The hint printed below is "kinit -f -c <ccache>", so it must show
        # the credential cache path, not the principal.
        fmt_dict = dict(princ=principal,
                        ccache=configuration.get('kerberos', 'ccache'))
        log.error("Couldn't renew kerberos ticket in order to work around "
                  "Kerberos 1.8.1 issue. Please check that the ticket for "
                  "'%(princ)s' is still renewable:\n"
                  "  $ kinit -f -c %(ccache)s\n"
                  "If the 'renew until' date is the same as the 'valid starting' "
                  "date, the ticket cannot be renewed. Please check your KDC "
                  "configuration, and the ticket renewal policy (maxrenewlife) "
                  "for the '%(princ)s' and `krbtgt' principals." % fmt_dict)
        sys.exit(ret)
account.py 文件源码 项目:certbot 作者: nikoloskii 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, regr, key, meta=None):
        """Store the registration and key, and derive ``meta`` and ``id``.

        When *meta* is not supplied, a ``Meta`` is built from the current
        UTC time (microseconds zeroed) and this host's FQDN. ``id`` is the
        MD5 hex digest of the PEM-encoded SubjectPublicKeyInfo of the key.
        """
        self.key = key
        self.regr = regr

        if meta is None:
            # pyrfc3339 drops microseconds, make sure __eq__ is sane
            now_utc = datetime.datetime.now(tz=pytz.UTC)
            meta = self.Meta(
                creation_dt=now_utc.replace(microsecond=0),
                creation_host=socket.getfqdn())
        self.meta = meta

        public_pem = self.key.key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)
        self.id = hashlib.md5(public_pem).hexdigest()
        # Implementation note: Email? Multiple accounts can have the
        # same email address. Registration URI? Assigned by the
        # server, not guaranteed to be stable over time, nor
        # canonical URI can be generated. ACME protocol doesn't allow
        # account key (and thus its fingerprint) to be updated...
pipelines.py 文件源码 项目:osp-scraper 作者: opensyllabus 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def update_warc_info_from_spider(record, spider):
    """Replace the payload of a WARC warcinfo record with fields that
    describe the running scrapy Spider and this host."""

    # An empty header object serves as the container for the fields.
    # XXX WARCHeader messes up capitalization here
    info = warc.WARCHeader({}, defaults=False)
    entries = (
        ('software', 'osp_scraper'),
        ('hostname', socket.getfqdn()),
        ('x-spider-name', spider.name),
        ('x-spider-run-id', spider.run_id),
        ('x-spider-revision', git_revision),
        ('x-spider-parameters', json.dumps(spider.get_parameters())),
    )
    for field_name, field_value in entries:
        info[field_name] = field_value

    # Serialise the fields without a version line or trailing CRLF.
    serialized = BytesIO()
    info.write_to(serialized, version_line=False, extra_crlf=False)
    record.update_payload(serialized.getvalue())
test_example_api_client.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_login(self):
        """The example API client must list exactly the hosts just added."""
        servers = config['lustre_servers']
        self.hosts = self.add_hosts([servers[0]['address'],
                                     servers[1]['address']])

        # Chroma puts its FQDN in the manager certificate, but the test config may
        # be pointing to localhost: if this is the case, substitute the FQDN in the
        # URL so that the client can validate the certificate.
        manager = config['chroma_managers'][0]
        url = manager['server_http_url']
        parsed = urlparse.urlparse(url)
        if parsed.hostname == 'localhost':
            parts = list(parsed)
            # Index 1 of the parsed URL is the network location.
            parts[1] = parts[1].replace("localhost", socket.getfqdn())
            url = urlparse.urlunparse(tuple(parts))

        example_api_client.setup_ca(url)
        user = manager['users'][0]
        hosts = example_api_client.list_hosts(url,
                                              user['username'],
                                              user['password'])
        expected_fqdns = [h['fqdn'] for h in self.hosts]
        self.assertListEqual(hosts, expected_fqdns)
test_socket.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def testHostnameRes(self):
        """Forward, reverse and FQDN lookups of this host must agree."""
        local_name = socket.gethostname()
        try:
            ip = socket.gethostbyname(local_name)
        except socket.error:
            # Name lookup is probably not set up right; skip this test
            self.skipTest('name lookup failure')
        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")

        try:
            reverse = socket.gethostbyaddr(ip)
        except socket.error:
            # Probably the same kind of problem as above; skip this test
            self.skipTest('address lookup failure')
        canonical, aliases, ipaddrs = reverse

        known_names = [local_name, canonical] + aliases
        fqhn = socket.getfqdn(ip)
        if fqhn in known_names:
            return
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
test_socket.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def testHostnameRes(self):
        """Forward, reverse and FQDN lookups of this host must agree."""
        local_name = socket.gethostname()
        try:
            ip = socket.gethostbyname(local_name)
        except socket.error:
            # Name lookup is probably not set up right; skip this test
            self.skipTest('name lookup failure')
        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")

        try:
            reverse = socket.gethostbyaddr(ip)
        except socket.error:
            # Probably the same kind of problem as above; skip this test
            self.skipTest('address lookup failure')
        canonical, aliases, ipaddrs = reverse

        known_names = [local_name, canonical] + aliases
        fqhn = socket.getfqdn(ip)
        if fqhn in known_names:
            return
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
models.py 文件源码 项目:munch-core 作者: crunchmail 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        """Normalise the instance, fill creation defaults, then save it.

        For an instance without a primary key, ``creation_date`` and
        ``source_hostname`` are filled in when empty. After saving,
        ``should_optout(create=True)`` is called if the instance had no
        primary key on entry and its status is DROPPED or BOUNCED.
        """
        is_new = not self.pk

        self.update_mail_status()

        # Keep only the part of status_code that rfc3463_regex matches.
        if self.status_code:
            matched = rfc3463_regex.match(self.status_code)
            if matched:
                self.status_code = matched.group(0)

        if not self.pk:
            if not self.creation_date:
                self.creation_date = timezone.now()
            if not self.source_hostname:
                self.source_hostname = socket.getfqdn()
        super().save(*args, **kwargs)

        if not is_new:
            return
        if self.status in [self.DROPPED, self.BOUNCED]:
            self.should_optout(create=True)
resolver.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def override_system_resolver(resolver=None):
    """Swap the socket module's name-resolution functions for this
    module's replacements, which use dnspython's resolver.

    Handy in tests that need to steer how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # (attribute on the socket module, replacement function)
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
factory.py 文件源码 项目:of 作者: OptimalBPM 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def store_process_system_document(_process_id, _name, _parent_id=None):
    """
    Creates a process instance structure, automatically sets systemPid, spawnedBy, host and spawnedWhen

    :param _process_id: Stored as the document's "_id"
    :param _name: Stored as the document's "name"
    :param _parent_id: If truthy, stored as "parent_id"; otherwise the key is omitted
    :return: The process document as a dict
    """
    _struct = {
        "_id": _process_id,
        "systemPid": os.getpid(),
        "spawnedBy": get_current_login(),
        "name": _name,
        "host": socket.getfqdn(),
        "spawnedWhen": str(datetime.datetime.utcnow()),
        "schemaRef": "ref://of.process.system"
    }
    if _parent_id:
        # Store the id itself; a trailing comma here would wrap it in a tuple.
        _struct["parent_id"] = _parent_id

    return _struct
simple_webserver_with_wsgi.py 文件源码 项目:python_programing 作者: lzhaoyang 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self,server_address):
        """Open the listening socket on *server_address* and record where
        it was actually bound.

        Sets ``listen_socket``, ``server_name`` (the FQDN for the bound
        host), ``server_port`` and an empty ``headers_set`` list.
        """
        sock = socket.socket(self.address_family, self.socket_type)
        self.listen_socket = sock

        # SO_REUSEADDR lets the same socket address be bound again.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(server_address)
        sock.listen(self.request_queue_size)

        # Ask the socket where it ended up rather than trusting the argument.
        bound_host, bound_port = sock.getsockname()[:2]
        self.server_name = socket.getfqdn(bound_host)
        self.server_port = bound_port

        # Headers set by web frameworks/applications are kept here.
        self.headers_set = []
resolver.py 文件源码 项目:cheapstream 作者: miltador 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def override_system_resolver(resolver=None):
    """Swap the socket module's name-resolution functions for this
    module's replacements, which use dnspython's resolver.

    Handy in tests that need to steer how python code resolves names
    without touching the system's resolver settings (e.g.
    /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver

    # (attribute on the socket module, replacement function)
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)


问题


面经


文章

微信
公众号

扫码关注公众号