python类getpid()的实例源码

recipe-580672.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_err_in_fun(self):
            """Check the reported status when the registered exit function raises.

            The snippet handed to ``pyrun`` loads this very file as ``mod``,
            registers ``foo`` (which fails with ZeroDivisionError) through
            ``mod.register_exit_fun`` and then signals its own process.
            ``pyrun`` presumably executes the snippet in a separate
            interpreter and returns its exit status -- TODO confirm.
            """
            # Test that the original signal this process was hit with
            # is not returned in case fun raise an exception. Instead,
            # we're supposed to see retsig = 1.
            ret = pyrun(textwrap.dedent(
                """
                import os, signal, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    sys.stderr = os.devnull
                    1 / 0

                sig = signal.SIGTERM if os.name == 'posix' else \
                    signal.CTRL_C_EVENT
                mod.register_exit_fun(foo)
                os.kill(os.getpid(), sig)
                """.format(os.path.abspath(__file__), TESTFN)
                # NOTE(review): the template has a single "{}" placeholder, so
                # the second format() argument (TESTFN) is never substituted.
            ))
            # The status is only checked when the POSIX flag is set.
            if POSIX:
                self.assertEqual(ret, 1)
                assert ret != signal.SIGTERM, strfsig(ret)
googleImageDownload.py 文件源码 项目:imageDownloader 作者: whcacademy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def openBrowserRecursively(total, idName, browser):
    """Load ``total`` Google image-search result pages in ``browser``.

    Page ``i`` is requested with ``start = i * 100`` and ``ijn = i``.  A
    connection reset raised by ``browser.get`` is ignored; any other socket
    error propagates to the outer handler, which kills the browser (Windows)
    or signals the current process (elsewhere) and retries from scratch.

    :param total: number of result pages to request
    :param idName: search term substituted into the query string
    :param browser: object exposing ``get(url)`` (presumably a Selenium
        WebDriver -- TODO confirm)
    """
    try:
        for i in range(total):
            iterator = i * 100
            url = r"https://www.google.com/search?q={word}&newwindow=1&biw=300&bih=629&tbm=isch&ijn={times}&start={start}"
            try:
                browser.get(url.format(word= idName, start=iterator,times = i))
            except SocketError as e:
                if e.errno != errno.ECONNRESET:
                    raise # raise to reset the connection
        # NOTE(review): this pause runs once after the loop, not once per
        # request -- confirm that is the intended throttling.
        time.sleep(1.5) # 1.5 seconds is the tuned time for HKU service not to be monitored and closed
    except Exception:
        # Only ordinary errors trigger the reset-and-retry path, so that
        # KeyboardInterrupt / SystemExit can still stop the program instead
        # of being swallowed and retried forever.
        if isWindows:
            os.system("taskkill /im chrome.exe /F")
        else :
            os.system("kill " + str(os.getpid()))
        openBrowserRecursively(total, idName, browser)

# basic session setup
bridge.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def connect(self, vif, ifname, netns):
        """Create a veth pair for ``vif`` and bring both of its ends up.

        :param vif: object supplying ``vif_name``, ``address`` and
            ``network.mtu`` (its type is not visible here)
        :param ifname: name given to the end created through the ``netns``
            IPDB
        :param netns: handed to ``b_base.get_ipdb``; presumably identifies a
            network namespace -- TODO confirm
        """
        host_ifname = vif.vif_name

        # Work in the IPDB obtained for ``netns``: create the pair and
        # configure the ``ifname`` end.
        with b_base.get_ipdb(netns) as c_ipdb:
            with c_ipdb.create(ifname=ifname, peer=host_ifname,
                               kind='veth') as c_iface:
                c_iface.mtu = vif.network.mtu
                c_iface.address = str(vif.address)
                c_iface.up()

            if netns:
                # Point the peer's net_ns_pid at this process; presumably this
                # moves the peer into the current process's namespace.
                with c_ipdb.interfaces[host_ifname] as h_iface:
                    h_iface.net_ns_pid = os.getpid()

        # Configure the peer end through the default IPDB.
        with b_base.get_ipdb() as h_ipdb:
            with h_ipdb.interfaces[host_ifname] as h_iface:
                h_iface.mtu = vif.network.mtu
                h_iface.up()
pidlockfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        The file is created exclusively (``O_CREAT | O_EXCL``) with mode
        0o644, so ``OSError`` is raised if the path already exists.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    # The context manager closes the descriptor even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
pidlockfile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        The file is created exclusively (``O_CREAT | O_EXCL``) with mode
        0o644, so ``OSError`` is raised if the path already exists.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    # The context manager closes the descriptor even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
test_01_DeviceManager.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_deadDeviceManager(self):
        """Launch the self-terminating-device node, shut its manager down and
        check that the launcher object reports an exit status."""
        nodebooter, manager = self.launchDeviceManager("/nodes/test_SelfTerminatingDevice_node/DeviceManager.dcd.xml")
        self.assertNotEqual(manager, None)

        # NOTE: the expected device count must stay in step with
        # DeviceManager.dcd.xml
        self.assertEqual(len(manager._get_registeredDevices()), 1)

        # The device list is queried a second time; the result is not used.
        manager._get_registeredDevices()

        # Remember the last child of this process that has children itself.
        manager_pid = 0
        for child in getChildren(os.getpid()):
            grandchildren = getChildren(child)
            if len(grandchildren) > 0:
                manager_pid = child
        self.assertNotEqual(manager_pid, 0)

        manager.shutdown()

        # Give the launcher a moment, then require that poll() is not None.
        time.sleep(1.0)
        self.assertNotEqual(nodebooter.poll(), None, "Nodebooter did not die after shutdown")
connection.py 文件源码 项目:gimel 作者: Alephbet 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, path='', db=0, password=None,
                 socket_timeout=None, encoding='utf-8',
                 encoding_errors='strict', decode_responses=False,
                 retry_on_timeout=False,
                 parser_class=DefaultParser, socket_read_size=65536):
        """Store the connection settings and build the response parser.

        No socket is opened here: ``_sock`` starts out as ``None``.  The id
        of the creating process is recorded in ``pid``.
        """
        # Process that created this object.
        self.pid = os.getpid()

        # Plain settings copied from the arguments.
        self.path, self.db, self.password = path, db, password
        self.socket_timeout = socket_timeout
        self.retry_on_timeout = retry_on_timeout
        self.encoding, self.encoding_errors = encoding, encoding_errors
        self.decode_responses = decode_responses

        # Runtime state.
        self._sock = None
        self._parser = parser_class(socket_read_size=socket_read_size)
        self._description_args = {'path': path, 'db': db}
        self._connect_callbacks = []
multipartform.py 文件源码 项目:HeaTDV4A 作者: HeaTTheatR 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def choose_boundary():
    """Return a dotted string for use as a multipart boundary.

    The result is ``<prefix>.<timestamp>.<seed>`` where the prefix is built
    once (and cached in the module-level ``_prefix``) from the host address,
    user id and process id, the timestamp is ``time.time()`` with three
    decimals, and the seed is a random integer in [0, 32767].
    """
    global _prefix

    if _prefix is None:
        try:
            hostid = socket.gethostbyname(socket.gethostname())
        except socket.error:
            # Host name cannot be resolved: fall back to loopback.
            hostid = '127.0.0.1'

        # os.getuid / os.getpid are missing on some platforms.
        try:
            uid = repr(os.getuid())
        except AttributeError:
            uid = '1'
        try:
            pid = repr(os.getpid())
        except AttributeError:
            pid = '1'

        _prefix = hostid + '.' + uid + '.' + pid

    timestamp = '%.3f' % time.time()
    # repr() replaces the Python-2-only backtick syntax.
    seed = repr(random.randint(0, 32767))
    return _prefix + '.' + timestamp + '.' + seed
daemon.py 文件源码 项目:spoon 作者: SpamExperts 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run_daemon(server, pidfile, daemonize=True):
    """Run the server as a daemon

    :param server: cutlery (a Spoon or Spork)
    :param pidfile: the file to keep the parent PID; may be empty/None, in
      which case no PID file is written or removed
    :param daemonize: if True fork the processes into
      a daemon.
    :return:
    """
    logger = logging.getLogger(server.server_logger)
    if daemonize:
        detach(pidfile=pidfile, logger=logger)
    elif pidfile:
        with open(pidfile, "w+") as pidf:
            pidf.write("%s\n" % os.getpid())
    try:
        server.serve_forever()
    finally:
        # Only attempt removal when a path was given: os.remove(None) raises
        # TypeError, which the OSError handler below would not catch.
        if pidfile:
            try:
                os.remove(pidfile)
            except OSError:
                pass
mimetools.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def choose_boundary():
    """Build a string that can serve as a multipart boundary.

    The value combines a per-process prefix (host address, user id and
    process id, each falling back to a fixed default when unavailable),
    the current time and a counter value, so it is unique within one
    program run.  It is very unlikely -- but not guaranteed -- that the
    string never occurs in message text.

    The boundary contains dots so you have to quote it in the header."""

    global _prefix
    import time
    if _prefix is None:
        import socket
        try:
            host_part = socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            host_part = '127.0.0.1'
        pieces = [host_part]
        # uid first, then pid; either accessor may be absent from ``os``.
        for accessor in ('getuid', 'getpid'):
            try:
                pieces.append(repr(getattr(os, accessor)()))
            except AttributeError:
                pieces.append('1')
        _prefix = '.'.join(pieces)
    return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())


# Subroutines for decoding some common content-transfer-types
queues.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, maxsize=0):
        """Set up the pipe, locks and semaphore backing the queue.

        A ``maxsize`` of zero or less is replaced by
        ``_multiprocessing.SemLock.SEM_VALUE_MAX``.
        """
        not_windows = sys.platform != 'win32'
        capacity = (_multiprocessing.SemLock.SEM_VALUE_MAX
                    if maxsize <= 0 else maxsize)

        self._maxsize = capacity
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        self._opid = os.getpid()
        # No write lock is created on win32.
        self._wlock = Lock() if not_windows else None
        self._sem = BoundedSemaphore(capacity)

        self._after_fork()

        if not_windows:
            register_after_fork(self, Queue._after_fork)
heap.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def malloc(self, size):
        """Return an ``(arena, start, stop)`` block of at least ``size`` bytes.

        The size is rounded up to the heap alignment (minimum 1 byte); any
        unused tail of the block obtained from ``_malloc`` is handed back
        through ``_free``.
        """
        assert 0 <= size < sys.maxint
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            needed = self._roundup(max(size,1), self._alignment)
            arena, begin, end = self._malloc(needed)
            cut = begin + needed
            if cut < end:
                # give the surplus back
                self._free((arena, cut, end))
            allocated = (arena, begin, cut)
            self._allocated_blocks.add(allocated)
            return allocated

#
# Class representing a chunk of an mmap -- can be inherited
#
process.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def start(self):
        '''
        Launch the child process for this object and record it among the
        current process's children
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
        assert not _current_process._daemonic, \
               'daemonic processes are not allowed to have children'
        _cleanup()
        # Use the object's own launcher when it has one, else the default.
        launcher = self._Popen
        if launcher is None:
            from .forking import Popen as launcher
        self._popen = launcher(self)
        _current_process._children.add(self)
process.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def __repr__(self):
        """Return ``<Type(name, status[ daemon])>`` describing this process."""
        if self is _current_process:
            state = 'started'
        elif self._parent_pid != os.getpid():
            state = 'unknown'
        elif self._popen is None:
            state = 'initial'
        elif self._popen.poll() is not None:
            state = self.exitcode
        else:
            state = 'started'

        # An integer state is an exit code: translate it to text.
        if type(state) is int:
            if state == 0:
                state = 'stopped'
            else:
                state = 'stopped[%s]' % _exitcode_to_name.get(state, state)

        suffix = ' daemon' if self._daemonic else ''
        return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
                                   state, suffix)

    ##
runtime.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def read_current_rss():
    """Return the value matched by ``VmRSSRe`` in ``/proc/<pid>/status``.

    :return: the first captured group converted to ``int`` (presumably the
        resident set size in kB -- TODO confirm against ``VmRSSRe``), or
        ``None`` when the file cannot be read or the pattern is not found.
    """
    try:
        # ``with`` closes the file even if reading fails part-way.
        with open('/proc/{0}/status'.format(os.getpid())) as f:
            output = f.read()
    except Exception:
        # Best effort: e.g. no /proc on this platform.
        return None

    m = VmRSSRe.search(output)
    if m:
        return int(float(m.group(1)))

    return None
runtime.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read_vm_size():
    """Return the value matched by ``VmSizeRe`` in ``/proc/<pid>/status``.

    :return: the first captured group converted to ``int`` (presumably the
        virtual memory size in kB -- TODO confirm against ``VmSizeRe``), or
        ``None`` when the file cannot be read or the pattern is not found.
    """
    try:
        # ``with`` closes the file even if reading fails part-way.
        with open('/proc/{0}/status'.format(os.getpid())) as f:
            output = f.read()
    except Exception:
        # Best effort: e.g. no /proc on this platform.
        return None

    m = VmSizeRe.search(output)
    if m:
        return int(float(m.group(1)))

    return None
runtime.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def register_signal(signal_number, handler_func, once = False):
    """Install ``handler_func`` for ``signal_number``, chaining to the
    handler that was installed before.

    After ``handler_func`` runs, the previous handler is invoked unless
    ``handler_func`` returned a true value.  With ``once`` set, the previous
    handler is re-installed before it is invoked; if the previous
    disposition was ``SIG_DFL``, it is restored and the signal is re-sent to
    this process.
    """
    previous = [None]

    def _handler(signum, frame):
        if handler_func(signum, frame):
            # the new handler asked to skip the previous one
            return

        earlier = previous[0]
        if callable(earlier):
            if once:
                signal.signal(signum, earlier)
            earlier(signum, frame)
            return

        if earlier == signal.SIG_DFL and once:
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)

    previous[0] = signal.signal(signal_number, _handler)
pidfile.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Open and exclusively lock the PID file, then write our PID to it.

        When ``self.path`` is ``None`` nothing is opened and the current
        ``self.pidfile`` is returned unchanged.

        :return: the open PID file object, positioned at offset 0
        :raises SystemExit: if another holder already has the lock
        """
        if self.path is None:
            return self.pidfile

        self.pidfile = open(self.path, "a+")
        try:
            # Non-blocking: fail immediately if the lock is already held.
            fcntl.flock(self.pidfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Close the handle explicitly instead of leaving it to the
            # garbage collector.
            self.pidfile.close()
            self.pidfile = None
            raise SystemExit("Already running according to " + self.path)
        # Replace any previous content with the current PID.
        self.pidfile.seek(0)
        self.pidfile.truncate()
        self.pidfile.write(str(os.getpid()))
        self.pidfile.flush()
        self.pidfile.seek(0)
        return self.pidfile
topology.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def open(self):
        """Begin monitoring; may also be called again after a fork.

        Calling it repeatedly has no further effect.

        .. warning:: To avoid a deadlock during Python's getaddrinfo call,
          a warning is emitted when open() runs in a process other than the
          one that first opened the Topology. To prevent this, create
          MongoClient after any forking OR start MongoClient with
          connect=False.
        """
        with self._lock:
            if self._pid is not None:
                # Opened before: warn when we are now in a different process.
                if self._pid != os.getpid():
                    warnings.warn(
                        "MongoClient opened before fork. Create MongoClient "
                        "with connect=False, or create client after forking. "
                        "See PyMongo's documentation for details: http://api."
                        "mongodb.org/python/current/faq.html#using-pymongo-"
                        "with-multiprocessing>")
            else:
                # First call: remember the owning process.
                self._pid = os.getpid()

            self._ensure_opened()
processes.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def startEventLoop(name, port, authkey, ppid, debug=False):
    """Connect to the server on localhost and serve requests until closed.

    The handler is stored in the module-level ``HANDLER``.  The loop polls
    ``processRequests`` every 0.01 s and ends on ``ClosedError``.
    """
    global HANDLER
    if debug:
        import os
        greeting = '[%d] connecting to server at port localhost:%d, authkey=%s..\n' % (
            os.getpid(), port, repr(authkey))
        cprint.cout(debug, greeting, -1)
    address = ('localhost', int(port))
    conn = multiprocessing.connection.Client(address, authkey=authkey)
    if debug:
        cprint.cout(debug, '[%d] connected; starting remote proxy.\n' % os.getpid(), -1)
    HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)
    try:
        while True:
            # raises ClosedError when the loop should exit
            HANDLER.processRequests()
            time.sleep(0.01)
    except ClosedError:
        pass
processes.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def startQtEventLoop(name, port, authkey, ppid, debug=False):
    """Connect to the server on localhost and serve requests from a Qt
    event loop.

    An existing ``QApplication`` instance is reused; otherwise one is
    created and configured not to quit when its last window closes.  The
    handler is stored in the module-level ``HANDLER``.
    """
    global HANDLER
    if debug:
        import os
        greeting = '[%d] connecting to server at port localhost:%d, authkey=%s..\n' % (
            os.getpid(), port, repr(authkey))
        cprint.cout(debug, greeting, -1)
    address = ('localhost', int(port))
    conn = multiprocessing.connection.Client(address, authkey=authkey)
    if debug:
        cprint.cout(debug, '[%d] connected; starting remote proxy.\n' % os.getpid(), -1)
    from ..Qt import QtGui, QtCore
    qt_app = QtGui.QApplication.instance()
    if qt_app is None:
        qt_app = QtGui.QApplication([])
        # Generally we want the event loop to stay open until it is
        # explicitly closed by the parent process.
        qt_app.setQuitOnLastWindowClosed(False)

    HANDLER = RemoteQtEventHandler(conn, name, ppid, debug=debug)
    HANDLER.startEventTimer()
    qt_app.exec_()
processes.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def startEventLoop(name, port, authkey, ppid, debug=False):
    """Connect to the server on localhost and serve requests until closed.

    The handler is stored in the module-level ``HANDLER``.  The loop polls
    ``processRequests`` every 0.01 s and ends on ``ClosedError``.
    """
    global HANDLER
    if debug:
        import os
        greeting = '[%d] connecting to server at port localhost:%d, authkey=%s..\n' % (
            os.getpid(), port, repr(authkey))
        cprint.cout(debug, greeting, -1)
    address = ('localhost', int(port))
    conn = multiprocessing.connection.Client(address, authkey=authkey)
    if debug:
        cprint.cout(debug, '[%d] connected; starting remote proxy.\n' % os.getpid(), -1)
    HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)
    try:
        while True:
            # raises ClosedError when the loop should exit
            HANDLER.processRequests()
            time.sleep(0.01)
    except ClosedError:
        pass
asyncorereactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self):
        """Initialise loop state and choose the loop dispatcher.

        ``self._loop_dispatch_class`` is instantiated and validated; if
        either step raises, the failure is logged and a
        ``_BusyWaitDispatcher`` is used instead.  A cleanup callback holding
        only a weak reference to this object is registered with ``atexit``.
        """
        self._pid = os.getpid()
        self._loop_lock = Lock()
        self._started = False
        self._shutdown = False

        self._thread = None

        self._timers = TimerManager()

        # Pre-bind so the except branch can tell whether construction itself
        # failed (in which case there is nothing to close).
        dispatcher = None
        try:
            dispatcher = self._loop_dispatch_class()
            dispatcher.validate()
            log.debug("Validated loop dispatch with %s", self._loop_dispatch_class)
        except Exception:
            log.exception("Failed validating loop dispatch with %s. Using busy wait execution instead.", self._loop_dispatch_class)
            if dispatcher is not None:
                dispatcher.close()
            dispatcher = _BusyWaitDispatcher()
        self._loop_dispatcher = dispatcher

        atexit.register(partial(_cleanup, weakref.ref(self)))
pidlockfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        The file is created exclusively (``O_CREAT | O_EXCL``) with mode
        0o644, so ``OSError`` is raised if the path already exists.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    # The context manager closes the descriptor even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
evaluation.py 文件源码 项目:deep_srl 作者: luheng 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, data, label_dict,
               gold_props_file=None,
               use_se_marker=False,
               pred_props_file=None, 
               word_dict=None):
    """Store the evaluation inputs and reset the best-score tracking.

    When neither a gold nor a predicted props file is supplied, a mock gold
    file is written under ``ROOT_DIR`` (named after the current PID) and
    used as ``pred_props_file``; ``word_dict`` is required in that case.
    """
    self.data, self.label_dict = data, label_dict
    self.best_accuracy, self.has_best = 0.0, False
    self.gold_props_file = gold_props_file
    self.pred_props_file = pred_props_file
    self.use_se_marker = use_se_marker

    if gold_props_file is not None or pred_props_file is not None:
      return

    print ('Warning: not using official gold predicates. Not for formal evaluation.')
    # Output to mock gold
    assert word_dict != None
    mock_gold_path = join(ROOT_DIR, 'temp/srl_pred_%d.gold.tmp' % os.getpid())
    print_gold_to_conll(self.data, word_dict, label_dict, mock_gold_path)
    self.pred_props_file = mock_gold_path
introspect.py 文件源码 项目:TACTIC-Handler 作者: listyque 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def commit(my, xml=None):
        """Insert a ``prod/session_contents`` entry through the TACTIC server.

        The entry carries the current process id, the login reported by the
        server and the XML payload (``my.xml`` when ``xml`` is empty).  The
        server stub is stored in ``my.server``.
        """
        payload = xml if xml else my.xml

        from tactic_client_lib import TacticServerStub
        my.server = TacticServerStub.get()

        # gather the extra fields, pid first and then the login
        record = {'pid': os.getpid()}
        record['login'] = my.server.get_login()
        record['data'] = payload

        my.server.insert("prod/session_contents", record)
mbootuz.py 文件源码 项目:mbootuz 作者: ckhung 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def mounted_at(dev='', loopback=''):
    """Return the mount point of ``dev`` or ``loopback``, mounting if needed.

    The output of ``df`` is searched first.  When no existing mount is
    found, a directory ``/tmp/mbootuz-<pid>-<basename>`` is created, the
    device or image is mounted there, and ``cleanup`` is registered to run
    on that directory at exit.

    :param dev: device path; takes precedence over ``loopback``
    :param loopback: image path, looked up under ``/lib/live/`` in ``df``
    :return: the mount point path
    :raises SystemExit: if neither argument is given or ``mount`` fails
    """
    # Validate the arguments before spawning any subprocess.
    if not dev and not loopback:
        sys.exit('mounted_at() needs at least one arg')

    # universal_newlines gives text output, so the str patterns below also
    # work on Python 3.
    df = subprocess.check_output(['df'], universal_newlines=True)
    if dev:
        fn = dev[dev.rfind('/')+1:]
        dev_or_loop = dev
        # re.escape: the path is literal text, not a pattern.
        m = re.search('^' + re.escape(dev) + r'\s.*\s(\S+)$', df,
                      flags=re.MULTILINE)
    else:
        dev_or_loop = loopback
        fn = loopback[loopback.rfind('/')+1:]
        m = re.search(r'\s(/lib/live/\S*' + re.escape(fn) + ')$', df,
                      flags=re.MULTILINE)

    if m:
        return m.group(1)

    target_mp = '/tmp/mbootuz-' + str(os.getpid()) + '-' + fn
    subprocess.call(['mkdir', target_mp])
    try:
        subprocess.check_output(['mount', dev_or_loop, target_mp],
                                universal_newlines=True)
    except subprocess.CalledProcessError as e:
        subprocess.call(['rmdir', target_mp])
        sys.exit('mount failure [' + e.output +
            '], mbootuz aborted')
    atexit.register(cleanup, target_mp)
    return target_mp
web_app.py 文件源码 项目:react-tornado-graphql-example 作者: yatsu 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _execute_command(self, command):
        """Send ``command`` to the next job server in round-robin order.

        The command is JSON-encoded and sent over a ZeroMQ DEALER socket
        whose identity is the current PID; replies are delivered to
        ``self.response_handler``.  Logs an error and returns when there are
        no job servers.
        """
        if len(self.job_servers) == 0:
            app_log.error('there is no job server')
            return

        # Pick the current server, then advance the round-robin index.
        target = self.job_servers[self.job_server_index]
        self.job_server_index = (self.job_server_index + 1) % len(self.job_servers)

        sock = zmq.Context.instance().socket(zmq.DEALER)
        sock.linger = 1000
        sock.identity = bytes(str(os.getpid()), 'ascii')

        # A wildcard address is reached through localhost.
        host = target['ip']
        if host == '*':
            host = 'localhost'
        endpoint = 'tcp://{0}:{1}'.format(host, target['zmq_port'])
        app_log.info('connect %s', endpoint)
        sock.connect(endpoint)

        payload = json_encode({'command': command})
        app_log.info('command: %s', payload)
        sock.send_multipart([b'0', bytes(payload, 'ascii')])

        ZMQStream(sock).on_recv(self.response_handler)
pidlockfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        The file is created exclusively (``O_CREAT | O_EXCL``) with mode
        0o644, so ``OSError`` is raised if the path already exists.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    # The context manager closes the descriptor even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
pool.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def reset(self):
        """Start a new pool generation and close all pooled sockets.

        ``pool_id`` is incremented, ``pid`` is set to the current process
        id, the socket set is swapped for an empty one under the lock, and
        every socket from the old set is then closed outside the lock.
        """
        # Ignore this race condition -- if many threads are resetting at once,
        # the pool_id will definitely change, which is all we care about.
        self.pool_id += 1
        self.pid = os.getpid()

        # Swapping variables is not atomic. We need to ensure no other
        # thread is modifying self.sockets, or replacing it, in this
        # critical section.  ``with`` releases the lock only if it was
        # actually acquired.
        with self.lock:
            sockets, self.sockets = self.sockets, set()

        for sock_info in sockets:
            sock_info.close()


问题


面经


文章

微信
公众号

扫码关注公众号