python类gethostname()的实例源码

DBSQLite.py 文件源码 项目:d-tailor 作者: jcg 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def registerWorker(self):
        """Insert this worker into the ``worker`` table and cache solution ids.

        Stores ``self.worker_id`` together with the machine's hostname, the
        IP address its hostname resolves to and the current local time.
        ``time_finish`` is inserted as NULL.  Afterwards every row of
        ``desired_solution`` is copied into ``self.des_solutions`` and every
        id found in ``generated_solution`` is flagged in
        ``self.gen_solutions_id``.

        Rows are read by column name, so the cursor is assumed to yield
        mapping-like rows (e.g. ``sqlite3.Row``) -- TODO confirm against the
        connection setup.

        Returns:
            Always ``0``.
        """
        started_at = strftime("%Y-%m-%d %H:%M:%S %Z")
        host = check_output("hostname").rstrip()
        address = gethostbyname(gethostname()).rstrip()

        worker_row = (self.worker_id, host, address, started_at)
        self.cur.execute("insert into worker(worker_id, hostname, ip, time_start, time_finish) values (?,?,?,?,NULL);",
                         worker_row)

        # Cache the status of every desired solution, keyed by its id.
        self.cur.execute("select * from desired_solution")
        for record in self.cur.fetchall():
            solution_id = str(record['des_solution_id'])
            self.des_solutions[solution_id] = {
                'status': str(record['status']),
                'des_solution_id': solution_id,
            }

        # Mark every already generated solution id as seen.
        self.cur.execute("select generated_solution_id from generated_solution")
        for record in self.cur.fetchall():
            generated_id = str(record['generated_solution_id'])
            self.gen_solutions_id[generated_id] = '1'

        return 0
resolver.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def reset(self):
        """Restore every resolver setting to its default value."""
        # The default domain is the local host name with its first label
        # dropped; when nothing is left, the DNS root is used instead.
        local_name = dns.name.from_text(socket.gethostname())
        domain = dns.name.Name(local_name[1:])
        if len(domain) == 0:
            domain = dns.name.root
        self.domain = domain

        # Remaining settings, applied in the listed order.
        defaults = (
            ('nameservers', []),
            ('search', []),
            ('port', 53),
            ('timeout', 2.0),
            ('lifetime', 30.0),
            ('keyring', None),
            ('keyname', None),
            ('keyalgorithm', dns.tsig.default_algorithm),
            ('edns', -1),
            ('ednsflags', 0),
            ('payload', 0),
            ('cache', None),
            ('flags', None),
            ('retry_servfail', False),
        )
        for attribute, value in defaults:
            setattr(self, attribute, value)
server.py 文件源码 项目:sysdweb 作者: ogarcia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_main():
    """Render the ``index`` template with one entry per configured service.

    Each section of ``config`` is queried for its status; the status is
    mapped to a display class and to flags saying which of the
    start/stop/restart actions are disabled for it.
    """
    # Display class for each known status; anything else is a warning.
    class_by_status = {
        'not-found': 'active',
        'inactive': 'danger',
        'failed': 'danger',
        'active': 'success',
    }

    services = []
    for section in config.sections():
        state = get_service_action(section, 'status')['status']
        css = class_by_status.get(state, 'warning')
        # Stop and restart are disabled under exactly the same conditions.
        cannot_start = css in ('active', 'success')
        cannot_stop = css in ('active', 'danger')
        entry = {
            'class': css,
            'disabled_start': cannot_start,
            'disabled_stop': cannot_stop,
            'disabled_restart': cannot_stop,
            'title': config.get(section, 'title'),
            'service': section,
        }
        services.append(entry)

    return template('index', hostname=gethostname(), services=services)
board.py 文件源码 项目:jcchess 作者: johncheetham 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_game(self):
        """Return a PGN game built from the board, with its headers filled in.

        Any moves waiting on the redo list are replayed first so the board
        holds the full game, and are undone again once the game object has
        been created.
        """
        # Replay pending redo moves so the whole game is on the board.
        pending = len(gv.jcchess.get_redolist())
        for _ in range(pending):
            gv.jcchess.redo_move()

        pgn_game = chess.pgn.Game.from_board(self.chessboard)

        # Put the move list back where it was before the replay.
        for _ in range(pending):
            gv.jcchess.undo_move()

        headers = pgn_game.headers
        headers["Event"] = "Computer Chess Game"
        headers["Site"] = socket.gethostname()
        headers["Date"] = datetime.now().strftime('%Y.%m.%d')
        headers["Round"] = "-"
        headers["White"] = gv.jcchess.get_player(WHITE)
        headers["Black"] = gv.jcchess.get_player(BLACK)
        return pgn_game
manage.py 文件源码 项目:mongodb 作者: autopilotpattern 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def on_change():
    '''
    Called when there is a change in the list of IPs and ports for this
    backend.

    Asks the local MongoDB instance for its replica-set status and, when this
    node is the primary, hands over to ``mongo_update_replset_config``.

    Returns ``False`` when the replica-set status cannot be read, ``True``
    when this node is not the primary, and otherwise whatever
    ``mongo_update_replset_config`` returns.
    '''
    ip = get_ip()
    local_mongo = MongoClient(ip, connect=False)

    try:
        repl_status = local_mongo.admin.command('replSetGetStatus')
        # ref https://docs.mongodb.com/manual/reference/replica-states/
        is_mongo_primary = repl_status['myState'] == 1
    except Exception as e:
        # The format string has to be the first argument: passing the
        # exception first makes logging try ``str(e) % (text,)``, which
        # fails and loses the message.
        # NOTE(review): assumes ``log`` follows the stdlib logging
        # ``error(msg, *args)`` signature -- confirm.
        log.error('unable to get primary status: %s', e)
        return False

    if not is_mongo_primary:
        return True
    return mongo_update_replset_config(local_mongo, ip)

# ---------------------------------------------------------
test_eventfilter.py 文件源码 项目:masakari-monitors 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_vir_event_filter(self, mock_utcnow, mock_libvirt_event_callback,
        mock_save_and_reraise_exception):
        """Check that ``vir_event_filter`` forwards an event to the callback.

        The callback must be invoked exactly once with the looked-up event
        and detail names, the uuid, the VM type, this host's name and the
        time returned by the patched ``utcnow``; the exception helper must
        not be used.
        """
        # Make the patched utcnow return one fixed value.
        now = timeutils.utcnow()
        mock_utcnow.return_value = now
        for patched in (mock_libvirt_event_callback,
                        mock_save_and_reraise_exception):
            patched.return_value = None

        event_filter = eventfilter.EventFilter()
        event_id, event_type, detail = 0, 5, 5
        domain_uuid = uuid.uuid4()
        event_filter.vir_event_filter(event_id, event_type, detail, domain_uuid)

        expected_args = (
            evft.eventID_dic[event_id],
            evft.detail_dic[event_id][event_type][detail],
            domain_uuid,
            ec.EventConstants.TYPE_VM,
            socket.gethostname(),
            now,
        )
        mock_libvirt_event_callback.assert_called_once_with(*expected_args)
        mock_save_and_reraise_exception.assert_not_called()
handle_process.py 文件源码 项目:masakari-monitors 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _make_event(self, process_name):
        """Build the notification dict for a stopped-process event.

        The result has a single ``'notification'`` key holding the event
        type, this host's name, the current ``timeutils.utcnow()`` value and
        a payload naming ``process_name``.
        """
        host = socket.gethostname()
        generated_at = timeutils.utcnow()

        payload = {
            'event': ec.EventConstants.EVENT_STOPPED,
            'process_name': process_name,
        }
        notification = {
            'type': ec.EventConstants.TYPE_PROCESS,
            'hostname': host,
            'generated_time': generated_at,
            'payload': payload,
        }
        return {'notification': notification}
resolver.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def reset(self):
        """Put all resolver configuration back to its defaults."""
        # Default domain: the local host name minus its first label, or the
        # DNS root when no labels remain.
        host_name = dns.name.from_text(socket.gethostname())
        parent = dns.name.Name(host_name[1:])
        self.domain = dns.name.root if len(parent) == 0 else parent

        # Servers and search path
        self.nameservers, self.search = [], []
        self.port = 53

        # Timing
        self.timeout, self.lifetime = 2.0, 30.0

        # TSIG
        self.keyring = self.keyname = None
        self.keyalgorithm = dns.tsig.default_algorithm

        # EDNS
        self.edns, self.ednsflags, self.payload = -1, 0, 0

        # Everything else
        self.cache = self.flags = None
        self.retry_servfail = False
base.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def distributed_transaction_commit(*instances):
        """Commit one prepare/commit transaction across several databases.

        Every instance is first checked for distributed-transaction support,
        then each one is prepared under its own key and, once all of them
        have been prepared, each one is committed.  Calling with no
        instances is a no-op.

        Args:
            *instances: database objects exposing ``_adapter`` (with
                ``support_distributed_transaction``, ``prepare``,
                ``commit_prepared`` and ``rollback_prepared``) and
                ``_dbname``.

        Raises:
            SyntaxError: if an adapter does not support distributed
                transactions.
            RuntimeError: if preparing any instance fails; every instance is
                asked to roll back first.
        """
        if not instances:
            return
        # enumerate() returns a one-shot iterator.  It has to be turned into
        # a list, otherwise building ``keys`` exhausts it and every loop
        # below silently does nothing.
        instances = list(enumerate(instances))
        thread_key = '%s.%s' % (
            socket.gethostname(), threading.current_thread())
        keys = ['%s.%i' % (thread_key, i) for (i, db) in instances]
        for (i, db) in instances:
            if not db._adapter.support_distributed_transaction():
                raise SyntaxError(
                    'distributed transaction not suported by %s' % db._dbname)
        try:
            for (i, db) in instances:
                db._adapter.prepare(keys[i])
        except BaseException:
            # Same reach as the original bare ``except``: the rollback is
            # attempted whatever interrupted the prepare phase.
            for (i, db) in instances:
                db._adapter.rollback_prepared(keys[i])
            raise RuntimeError('failure to commit distributed transaction')
        else:
            for (i, db) in instances:
                db._adapter.commit_prepared(keys[i])
        return
nova_compute_hooks.py 文件源码 项目:charm-nova-compute 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def compute_joined(rid=None):
    """Publish this unit's compute settings on the relation ``rid``.

    The settings always contain the local hostname and the relation IP for
    ``cloud-compute``; migration and resize add their own keys when enabled.

    NOTE(review): ``relation_set`` is only called inside the migration and
    resize branches, so nothing is published when both are off, and it is
    called twice when both are on -- confirm this is intended.
    """
    # NOTE(james-page) in MAAS environments the actual hostname is a CNAME
    # record, so it is not picked up from private-address (which is an IP);
    # the locally configured hostname is therefore added to the relation.
    settings = {}
    settings['hostname'] = gethostname()
    settings['private-address'] = get_relation_ip(
        'cloud-compute', cidr_network=config('os-internal-network'))

    def publish():
        # Send the settings collected so far.
        relation_set(relation_id=rid, **settings)

    if migration_enabled():
        migration_auth = config('migration-auth-type')
        settings['migration_auth_type'] = migration_auth
        if migration_auth == 'ssh':
            settings['ssh_public_key'] = public_ssh_key()
        publish()

    if config('enable-resize'):
        settings['nova_ssh_public_key'] = public_ssh_key(user='nova')
        publish()
cluster.py 文件源码 项目:charm-nova-compute 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def is_crm_dc():
    """
    Report whether this unit is the pacemaker Designated Controller.

    Runs ``crm status`` and compares the node named on its "Current DC" line
    with this unit's hostname.  Raises CRMDCNotFound when the command exits
    with an error or when the reported DC is ``NONE``.
    """
    try:
        output = subprocess.check_output(['crm', 'status'],
                                         stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))
    if not isinstance(output, six.text_type):
        output = six.text_type(output, "utf-8")

    designated = ''
    for entry in output.split('\n'):
        if not entry.startswith('Current DC'):
            continue
        # Example line:
        # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
        designated = entry.split(':')[1].split()[0]

    if designated == get_unit_hostname():
        return True
    if designated == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')
    return False
cluster.py 文件源码 项目:charm-nova-compute 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def is_crm_leader(resource, retry=False):
    """
    Tell whether the calling charm unit is the elected corosync leader for
    ``resource``, according to the external "crm" command.

    The DC resource name is delegated to ``is_crm_dc``.  Otherwise the output
    of ``crm resource show`` is searched for this unit's hostname.  A
    CRMResourceNotFound is raised when crm reports the resource as not
    running.

    ``retry`` is not read in this body.  The operation is meant to be
    retryable to avoid false negatives; see LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()

    try:
        output = subprocess.check_output(
            ['crm', 'resource', 'show', resource], stderr=subprocess.STDOUT)
        if not isinstance(output, six.text_type):
            output = six.text_type(output, "utf-8")
    except subprocess.CalledProcessError:
        output = None

    # No usable output means leadership cannot be confirmed.
    if not output:
        return False

    if get_unit_hostname() in output:
        return True

    if "resource %s is NOT running" % (resource) in output:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
resolver.py 文件源码 项目:SameKeyProxy 作者: xzhou 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def reset(self):
        """Return all resolver configuration to the default values."""
        # The default domain is the local host name with its first label
        # dropped; when nothing is left, the DNS root is used instead.
        local_name = dns.name.from_text(socket.gethostname())
        domain = dns.name.Name(local_name[1:])
        if len(domain) == 0:
            domain = dns.name.root
        self.domain = domain

        # Remaining settings, applied in the listed order.
        defaults = (
            ('nameservers', []),
            ('search', []),
            ('port', 53),
            ('timeout', 2.0),
            ('lifetime', 30.0),
            ('keyring', None),
            ('keyname', None),
            ('keyalgorithm', dns.tsig.default_algorithm),
            ('edns', -1),
            ('ednsflags', 0),
            ('payload', 0),
            ('cache', None),
        )
        for attribute, value in defaults:
            setattr(self, attribute, value)
pg_gw_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_fabric_interface():
    '''
    Returns the fabric interface.

    When the ``fabric-interfaces`` option is ``MANAGEMENT`` the management
    interface is returned.  Otherwise the option is parsed as JSON mapping
    hostnames to interface names; this unit's entry is used, falling back to
    the ``DEFAULT`` entry.

    Raises ValueError when the option is not valid JSON, when neither this
    host nor ``DEFAULT`` has an entry, or when the chosen interface does not
    exist.
    '''
    fabric_interfaces = config('fabric-interfaces')
    if fabric_interfaces == 'MANAGEMENT':
        return get_mgmt_interface()

    try:
        all_fabric_interfaces = json.loads(fabric_interfaces)
    except (TypeError, ValueError):
        # TypeError covers a non-string value such as None (option unset),
        # which json.loads rejects before any parsing happens.
        raise ValueError('Invalid json provided for fabric interfaces')

    hostname = get_unit_hostname()
    if hostname in all_fabric_interfaces:
        node_fabric_interface = all_fabric_interfaces[hostname]
    elif 'DEFAULT' in all_fabric_interfaces:
        node_fabric_interface = all_fabric_interfaces['DEFAULT']
    else:
        raise ValueError('No fabric interface provided for node')

    if not interface_exists(node_fabric_interface):
        log('Provided fabric interface %s does not exist'
            % node_fabric_interface)
        raise ValueError('Provided fabric interface does not exist')
    return node_fabric_interface
service.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run(self):
        """Create the CNI pipeline and start watching this node's pods.

        The watcher is pointed at the pods endpoint filtered by
        ``spec.nodeName`` equal to the local hostname, and ``self.on_done``
        is registered on the pipeline through a callback handler.
        """
        pipeline = h_cni.CNIPipeline()
        self.pipeline = pipeline
        pipeline.register(h_cni.CallbackHandler(self.on_done))

        watcher = k_watcher.Watcher(pipeline)
        self.watcher = watcher

        selector_values = {
            'base': k_const.K8S_API_BASE,
            'node_name': socket.gethostname(),
        }
        pods_path = ("%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s"
                     % selector_values)
        watcher.add(pods_path)
        watcher.start()
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, path, threaded=True, timeout=None):
        """
        Set up the lock-file path and a unique per-holder file name.

        >>> lock = LockBase('somefile')
        >>> lock = LockBase('somefile', threaded=False)
        """
        super(LockBase, self).__init__(path)
        self.lock_file = os.path.abspath(path) + ".lock"
        self.hostname = socket.gethostname()
        self.pid = os.getpid()

        thread_suffix = ""
        if threaded:
            current = threading.current_thread()
            # Very old Thread objects (Python 2.4 and earlier) have no
            # "ident" attribute; the object's hash stands in for it.
            thread_id = getattr(current, "ident", hash(current))
            thread_suffix = "-%x" % (thread_id & 0xffffffff)
        self.tname = thread_suffix

        # The unique name mostly identifies the current process, but it also
        # has to include the path.  Without it, two locked files in the same
        # directory would share one unique file: locking the second would
        # overwrite the first one's file, and releasing one lock would
        # delete it, so the last release would fail.
        name_parts = (self.hostname, self.tname, self.pid, hash(self.path))
        self.unique_name = os.path.join(os.path.dirname(self.lock_file),
                                        "%s%s.%s%s" % name_parts)
        self.timeout = timeout
consul_check_postgres.py 文件源码 项目:consul-pg 作者: adamcstephens 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, service, role_source, configfile=DEFAULT_CONFIGFILE):
        """Store the check parameters and derive the API and host values.

        Args:
            service: service name; it is embedded in the leader key URI.
            role_source: stored as given on ``self.role_source``.
            configfile: stored as given on ``self.configfile``.
        """
        self.service = service
        self.role_source = role_source

        # HTTP API base address and the session used to talk to it.
        self.api_endpoint = 'http://127.0.0.1:8500/v1'
        self.api_session = requests.Session()

        # Full host name and the part before its first dot.
        full_name = gethostname()
        self.hostname = full_name
        self.short_hostname = full_name.partition('.')[0]

        self.update_service = False
        self.valid_states = ['master', 'slave', 'fail']

        self.configfile = configfile
        self.leader_uri = ''.join(
            (self.api_endpoint, '/kv/session/', self.service, '/leader'))
libbuild.py 文件源码 项目:Dockerfiles 作者: appscode 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def metadata(cwd, goos='', goarch=''):
    """Collect git and build-host metadata for the checkout at ``cwd``.

    The version is the exact-match tag when there is one; otherwise the
    branch name, unless the branch is ``master``, ``HEAD`` or starts with
    ``release-``; otherwise the output of ``git describe``.
    ``version_strategy`` records which of the three was used.  ``os`` and
    ``arch`` are added only when ``goos`` / ``goarch`` are non-empty.

    NOTE(review): under Python 3 ``check_output`` returns bytes, so the
    string comparisons on the branch name would not behave as they do on
    Python 2 -- confirm the target interpreter.
    """
    def sh(command):
        # Run ``command`` through the shell in ``cwd``; return stripped output.
        return subprocess.check_output(command, shell=True, cwd=cwd).strip()

    commit_hash = sh('git rev-parse --verify HEAD')
    branch = sh('git rev-parse --abbrev-ref HEAD')
    # http://stackoverflow.com/a/1404862/3476121
    tag = sh('git describe --exact-match --abbrev=0 2>/dev/null || echo ""')
    commit_epoch = int(sh('git show -s --format=%ct'))

    md = {
        'commit_hash': commit_hash,
        'git_branch': branch,
        'git_tag': tag,
        'commit_timestamp': datetime.datetime.utcfromtimestamp(
            commit_epoch).isoformat(),
        'build_timestamp': datetime.datetime.utcnow().isoformat(),
        'build_host': socket.gethostname(),
        'build_host_os': GOENV["GOHOSTOS"],
        'build_host_arch': GOENV["GOHOSTARCH"],
    }

    if tag:
        version, strategy = tag, 'tag'
    elif branch not in ['master', 'HEAD'] and not branch.startswith('release-'):
        version, strategy = branch, 'branch'
    else:
        version = sh('git describe --tags --always --dirty')
        strategy = 'commit_hash'
    md['version'] = version
    md['version_strategy'] = strategy

    if goos:
        md['os'] = goos
    if goarch:
        md['arch'] = goarch
    return md
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, path, threaded=True, timeout=None):
        """
        Record the lock file location and build this holder's unique name.

        >>> lock = LockBase('somefile')
        >>> lock = LockBase('somefile', threaded=False)
        """
        super(LockBase, self).__init__(path)
        self.lock_file = os.path.abspath(path) + ".lock"
        self.hostname = socket.gethostname()
        self.pid = os.getpid()

        if not threaded:
            self.tname = ""
        else:
            this_thread = threading.current_thread()
            # Thread objects from Python 2.4 and earlier carry no "ident"
            # attribute, so hash() of the thread is the fallback.
            ident = getattr(this_thread, "ident", hash(this_thread))
            self.tname = "-%x" % (ident & 0xffffffff)

        lock_dir = os.path.dirname(self.lock_file)

        # The unique name is mainly about the current process, yet it must
        # contain the path too.  Otherwise two neighbouring locked files
        # would clash: the second lock would overwrite the file created by
        # the first, one unlock would remove it, and the remaining lock
        # would fail on release.
        unique_file = "%s%s.%s%s" % (
            self.hostname, self.tname, self.pid, hash(self.path))
        self.unique_name = os.path.join(lock_dir, unique_file)
        self.timeout = timeout


问题


面经


文章

微信
公众号

扫码关注公众号