def bind_unix_listener(self, path, backlog=50, user=None):
    """Create a non-blocking UNIX-domain stream socket listening on *path*.

    Steps, in order: create a gevent ``AF_UNIX``/``SOCK_STREAM`` socket, put
    it in non-blocking mode, call ``self.unlink(path)`` (presumably to remove
    a stale socket file -- confirm against that helper), bind to *path*,
    optionally hand the socket file over to *user*, and start listening.

    Args:
        path: Filesystem path of the socket file to bind.
        backlog: Value passed to ``listen()``.
        user: Optional account name. When given, the socket file is chowned
            to that account's uid/gid (looked up with ``pwd.getpwnam``) and
            its mode is set to 0o777.

    Returns:
        The listening socket, or ``None`` if any step failed. Failures are
        reported through ``self.logger.error`` rather than raised.
    """
    sock = None
    try:
        sock = gevent.socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.setblocking(0)
        self.unlink(path)
        sock.bind(path)
        if user is not None:
            # Imported lazily: ``pwd`` only exists on Unix platforms.
            import pwd
            account = pwd.getpwnam(user)
            os.chown(path, account.pw_uid, account.pw_gid)
            # NOTE(review): indentation was lost in the reviewed source, so it
            # is ambiguous whether this chmod belongs inside the ``user``
            # branch or should run unconditionally -- confirm.
            # NOTE(review): 0o777 lets every local user connect to the socket.
            os.chmod(path, 0o777)
        sock.listen(backlog)
    except Exception as e:
        # Deliberately broad: callers rely on a ``None`` return for any failure.
        self.logger.error("Create unix socket failed: %s", e)
        if sock is not None:
            # Release the descriptor instead of leaking it on a failed setup.
            try:
                sock.close()
            except Exception:
                # Best-effort cleanup; the original failure is already logged.
                pass
        return None
    return sock
评论列表
文章目录