python类getppid()的实例源码

runserver.py 文件源码 项目:django-webpack 作者: csinchok 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self, *args, **options):
        p_path = os.path.join('/proc', str(os.getppid()), 'cmdline')
        with open(p_path, 'rb') as f:
            p_cmdline = f.read().split(b'\x00')

        p = None
        if b'runserver' not in p_cmdline:
            self.stdout.write("Starting webpack-dev-server...")
            p = webpack_dev_server()
            wrapper = io.TextIOWrapper(p.stdout, line_buffering=True)
            first_line = next(wrapper)
            webpack_host = first_line.split()[-1]

            print(webpack_host)

        super().run(**options)

        if p:
            p.kill()
            p.wait()
processes.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def startEventLoop(name, port, authkey, ppid, debug=False):
    if debug:
        import os
        cprint.cout(debug, '[%d] connecting to server at port localhost:%d, authkey=%s..\n' 
                    % (os.getpid(), port, repr(authkey)), -1)
    conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
    if debug:
        cprint.cout(debug, '[%d] connected; starting remote proxy.\n' % os.getpid(), -1)
    global HANDLER
    #ppid = 0 if not hasattr(os, 'getppid') else os.getppid()
    HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)
    while True:
        try:
            HANDLER.processRequests()  # exception raised when the loop should exit
            time.sleep(0.01)
        except ClosedError:
            break
processes.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def startEventLoop(name, port, authkey, ppid, debug=False):
    if debug:
        import os
        cprint.cout(debug, '[%d] connecting to server at port localhost:%d, authkey=%s..\n' 
                    % (os.getpid(), port, repr(authkey)), -1)
    conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
    if debug:
        cprint.cout(debug, '[%d] connected; starting remote proxy.\n' % os.getpid(), -1)
    global HANDLER
    #ppid = 0 if not hasattr(os, 'getppid') else os.getppid()
    HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)
    while True:
        try:
            HANDLER.processRequests()  # exception raised when the loop should exit
            time.sleep(0.01)
        except ClosedError:
            break
benchmarking.py 文件源码 项目:NordVPN-NetworkManager 作者: Chadsr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_num_processes(num_servers):
    # Since each process is not resource heavy and simply takes time waiting for pings, maximise the number of processes (within constraints of the current configuration)

    # Maximum open file descriptors of current configuration
    soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)

    # Find how many file descriptors are already in use by the parent process
    ppid = os.getppid()
    used_file_descriptors = int(subprocess.run('ls -l /proc/' + str(ppid) + '/fd | wc -l', shell=True, stdout=subprocess.PIPE).stdout.decode('utf-8'))

    # Max processes is the number of file descriptors left, before the sof limit (configuration maximum) is reached
    max_processes = int((soft_limit - used_file_descriptors) / 2)

    if num_servers > max_processes:
        return max_processes
    else:
        return num_servers
get_os_info.py 文件源码 项目:misc 作者: duboviy 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def get_os_info():
    """ Get some OS info with psutils and humanfriendly. """
    is_win = lambda: True if os.name == 'nt' else False
    pid = os.getgid() if not is_win() else None
    ppid = os.getppid()

    now = time.time()

    current_process = psutil.Process(pid=ppid)
    process_uptime = current_process.create_time()
    process_uptime_delta = now - process_uptime
    process_uptime_human = humanfriendly.format_timespan(process_uptime_delta)

    system_uptime = psutil.boot_time()
    system_uptime_delta = now - system_uptime
    system_uptime_human = humanfriendly.format_timespan(system_uptime_delta)

    free_memory = psutil.disk_usage('/').free
    total_memory = psutil.disk_usage('/').total
    percent_used_memory = psutil.disk_usage('/').percent
    used_memory = psutil.disk_usage('/').used
    free_memory_human = humanfriendly.format_size(free_memory)

    return vars()
process.py 文件源码 项目:monitorstack 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_cmdlines():
    """Retrieve the cmdline of each process running on the system."""
    processes = []

    # Get our current PID as well as the parent so we can exclude them.
    current_pid = os.getpid()
    parent_pid = os.getppid()

    for proc in psutil.process_iter():
        try:
            if proc.pid not in [current_pid, parent_pid]:
                processes.append(' '.join(proc.cmdline()))
        except psutil.NoSuchProcess:
            pass

    return processes
daemon.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def is_process_started_by_init():
    """ Determine whether the current process is started by `init`.

        :return: ``True`` iff the parent process is `init`; otherwise
            ``False``.

        The `init` process is the one with process ID of 1.

        """
    result = False

    init_pid = 1
    if os.getppid() == init_pid:
        result = True

    return result
bot_manager.py 文件源码 项目:django-botmanager 作者: dimoha 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        setproctitle(self.process_name)
        while True:

            if os.getppid() != self.parent_pid:
                logging.info(u"Parent process is die. Exit..")
                break

            for task_class_string, processes_count in self.config['tasks'].iteritems():
                task_class = Command.import_from_string(task_class_string)

                if self._time_to_set_tasks_for(task_class) and task_class.SELF_SUPPORT:
                    self.shedule_cache[task_class.name] = datetime.now()
                    try:
                        task_class.set_tasks()
                    except Exception as e:
                        logging.exception(e)

            sleep(self.SET_PERIOD_SECONDS)
bot_manager.py 文件源码 项目:django-botmanager 作者: dimoha 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        setproctitle(self.process_name)
        while True:
            try:

                if os.getppid() != self.parent_pid:
                    logging.info(u"Parent process is die. Exit..")
                    break

                task = self.queue.get_nowait()
                try:
                    self.run_task(task)
                except Exception as e:
                    logging.exception(u"Worker {0} catch exception on task {1}: {2}".format(
                        self, task.id, e
                    ))
                finally:
                    pass
            except QueueEmpty:
                sleep(1)
            except Exception as e:
                logging.exception(
                    u"Error in queue preparing: %s".format(e)
                )
common.py 文件源码 项目:qqbot 作者: pandolia 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getppid():
        '''
        :return: The pid of the parent of this process.
        '''
        pe = PROCESSENTRY32()
        pe.dwSize = ctypes.sizeof(PROCESSENTRY32)
        mypid = GetCurrentProcessId()
        snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)

        result = 0
        try:
            have_record = Process32First(snapshot, ctypes.byref(pe))

            while have_record:
                if mypid == pe.th32ProcessID:
                    result = pe.th32ParentProcessID
                    break

                have_record = Process32Next(snapshot, ctypes.byref(pe))

        finally:
            CloseHandle(snapshot)

        return result
arbiter.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
threading-verification.py 文件源码 项目:biocommons.seqrepo 作者: biocommons 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def fetch_in_thread(sr, nsa):
    """fetch a sequence in a thread

    """

    def fetch_seq(q, nsa):
        pid, ppid = os.getpid(), os.getppid()
        q.put((pid, ppid, sr[nsa]))

    q = Queue()
    p = Process(target=fetch_seq, args=(q, nsa))
    p.start()
    pid, ppid, seq = q.get()
    p.join()

    assert pid != ppid, "sequence was not fetched from thread"
    return pid, ppid, seq
agent.py 文件源码 项目:MalwrAgent 作者: michaelschratt 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, name, mode, chain, interval, result_queue, logging_level=0):
        self.name = name
        self.mode = mode
        self.chain = chain
        self.interval = interval
        self.result_queue = result_queue

        self.pid = str(os.getpid())
        if hasattr(os, 'getppid'):  # only available on Unix
            self.ppid = str(os.getppid())
        # self.job_id = job_id
        self.registered = False
        self.logging_level = logging_level

        self.logger = Logger(self.name, self.logging_level)

        try:
            self.logger.log_debug('Starting new process with PID ' + self.pid)
            self.run()
        except KeyboardInterrupt:
            pass
warden_server.py 文件源码 项目:STaaS 作者: CESNET 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def getDebug(self):
        return {
            "environment": self.req.env,
            "client": self.req.client.__dict__,
            "database": self.db.get_debug(),
            "system": {
                "uname": os.uname()
            },
            "process": {
                "cwd": os.getcwdu(),
                "pid": os.getpid(),
                "ppid": os.getppid(),
                "pgrp": os.getpgrp(),
                "uid": os.getuid(),
                "gid": os.getgid(),
                "euid": os.geteuid(),
                "egid": os.getegid(),
                "groups": os.getgroups()
            }
        }
arbiter.py 文件源码 项目:tabmaster 作者: NicolasMinghetti 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
test_atfork.py 文件源码 项目:procszoo 作者: procszoo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def procinfo(str):
        if "sched_getcpu" not in show_available_c_functions():
            cpu_idx = -1
        else:
            cpu_idx = sched_getcpu()
        pid = os.getpid()
        ppid = os.getppid()
        uid = os.getuid()
        gid = os.getgid()
        euid = os.geteuid()
        egid = os.getegid()
        hostname = gethostname()
        procs = os.listdir("/proc")
        printf("""%s:
        cpu: %d pid: %d ppid: %d
        uid %d gid %d euid %d egid %d
        hostname: %s
        procs: %s"""
        % (str, cpu_idx, pid, ppid, uid, gid, euid, egid,
               hostname, ", ".join(procs[-4:])))
worker.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _check_alive(self):
        # If our parent changed then we shut down.
        pid = os.getpid()
        try:
            while self.alive:
                self.notify()

                req_count = sum(
                    self.servers[srv]["requests_count"] for srv in self.servers
                )
                if self.max_requests and req_count > self.max_requests:
                    self.alive = False
                    self.log.info("Max requests exceeded, shutting down: %s",
                                  self)
                elif pid == os.getpid() and self.ppid != os.getppid():
                    self.alive = False
                    self.log.info("Parent changed, shutting down: %s", self)
                else:
                    await asyncio.sleep(1.0, loop=self.loop)
        except (Exception, BaseException, GeneratorExit, KeyboardInterrupt):
            pass
arbiter.py 文件源码 项目:infiblog 作者: RajuKoushik 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
util.py 文件源码 项目:AmqpCode 作者: SVADemoAPP 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_client_properties_with_defaults(provided_client_properties={}):
  ppid = 0
  try:
    ppid = os.getppid()
  except:
    pass

  client_properties = {"product": "qpid python client",
                       "version": "development",
                       "platform": os.name,
                       "qpid.client_process": os.path.basename(sys.argv[0]),
                       "qpid.client_pid": os.getpid(),
                       "qpid.client_ppid": ppid}

  if provided_client_properties:
    client_properties.update(provided_client_properties)
  return client_properties
parentpoller.py 文件源码 项目:qudi 作者: Ulm-IQO 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        """ Run the parentpoller.
        """
        # We cannot use os.waitpid because it works only for child processes.
        from errno import EINTR
        while True:
            try:
                if os.getppid() == 1:
                    if hasattr(self.quitfunction, '__call__'):
                        self.quitfunction()
                    waitForClose()
                    os._exit(1)
                time.sleep(1.0)
            except OSError as e:
                if e.errno == EINTR:
                    continue
                raise
sysinfo_test.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_proc_info(self):
        """Proc info test."""
        proc_info = sysinfo.proc_info(os.getpid())
        # Handle running python with options, as in:
        # sys.argv[0] == 'python -m unittest'
        expected = os.path.basename(sys.argv[0].split()[0])

        # TODO: When running coverage, script is execed under python.
        #                but sys.argv[0] reports as setup.py
        #
        # train starts subprocess for the test with altnose.py
        # this makes this assert unusable
        expected_progs = ['setup.py', 'altnose.py', 'sysinfo_test.py']
        if expected not in expected_progs:
            self.assertEqual(expected, proc_info.filename)
        self.assertEqual(os.getppid(), proc_info.ppid)

        # We do not check the starttime, but just verify that calling
        # proc_info twice returns same starttime, which can be used as part of
        # process signature.
        self.assertEqual(
            proc_info.starttime,
            sysinfo.proc_info(os.getpid()).starttime
        )
arbiter.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
arbiter.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
arbiter.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
arbiter.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
arbiter.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def maybe_promote_master(self):
        if self.master_pid == 0:
            return

        if self.master_pid != os.getppid():
            self.log.info("Master has been promoted.")
            # reset master infos
            self.master_name = "Master"
            self.master_pid = 0
            self.proc_name = self.cfg.proc_name
            del os.environ['GUNICORN_PID']
            # rename the pidfile
            if self.pidfile is not None:
                self.pidfile.rename(self.cfg.pidfile)
            # reset proctitle
            util._setproctitle("master [%s]" % self.proc_name)
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_ppid(self):
        if hasattr(os, 'getppid'):
            self.assertEqual(psutil.Process().ppid(), os.getppid())
        this_parent = os.getpid()
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        self.assertEqual(p.ppid(), this_parent)
        self.assertEqual(p.parent().pid, this_parent)
        # no other process is supposed to have us as parent
        reap_children(recursive=True)
        if APPVEYOR:
            # Occasional failures, see:
            # https://ci.appveyor.com/project/giampaolo/psutil/build/
            #     job/0hs623nenj7w4m33
            return
        for p in psutil.process_iter():
            if p.pid == sproc.pid:
                continue
            # XXX: sometimes this fails on Windows; not sure why.
            self.assertNotEqual(p.ppid(), this_parent, msg=p)
webserver3c.py 文件源码 项目:WebServer 作者: wanzifa 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(
            'Child PID:{pid}.Parent PID {ppid}'.format(
                pid=os.getpid(),
                ppid=os.getppid(),
        )
    )
    print(request.decode())
    http_response = """
HTTP/1.1 200 OK

Hello world!
"""
    client_connection.sendall(http_response)
    time.sleep(60)
validate_parent.py 文件源码 项目:pam-typopw 作者: rchatterjee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def is_valid_parent():
    """
    Authenticates the script by validating top 3 parents, if any of
    them belongs to VALID_PARENTS, with matching RECORDED_DIGEST.
    """
    # f = open('/tmp/typtop.log', 'a')
    RECORDED_DIGESTS = load_recoreded_digest()
    ppid = os.getppid()
    for _ in xrange(3):
        ppid, uid, user, exe = get_ppid_and_attr(ppid)
        if not ppid or int(ppid) <= 0: break
        ppid = int(ppid)
        continue;
        if uid and int(uid) == 0: # any of the uids is 0 (root)
            return True
        if sha256(exe) in RECORDED_DIGESTS:
            return True
    # f.close()
    return False
multiple_image_multiple_label_data_layer.py 文件源码 项目:shuffle-tuple 作者: imisra 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
        print 'BlobFetcher started: pid %d; ppid %d'%(os.getpid(), os.getppid())
        self._parent_pid = os.getppid()
        self._self_pid = os.getpid();
        self._prefetch_process_id_q.put(self._self_pid);
        global shared_mem_list
        while True:
            #blobs = get_minibatch(minibatch_db, self._num_classes)
            self.self_cleanup();
            if self._slots_used.empty():
                continue;
            slot = self._slots_used.get();
            im_datas = self._funct_to_call();
            for t in range(self._num_tops):
                shared_mem = shared_mem_list[t][slot];
                with shared_mem.get_lock():
                    s = np.frombuffer(shared_mem.get_obj(), dtype=np.float32);
                    # print s.size, self._shared_shapes[t];
                    shared_mem_arr = np.reshape(s, self._shared_shapes[t]);
                    shared_mem_arr[...] = im_datas[t].astype(np.float32, copy=True);
                    # print 'helper:: ',im_datas[t].min(), im_datas[t].max(), im_datas[t].mean()
            self._slots_filled.put(slot);


问题


面经


文章

微信
公众号

扫码关注公众号