def chat(host, port, task=None):
    """Chat server task: accept clients and start a sender task for each.

    Binds a listening TCP socket on ``(host, port)``, wraps it in a
    ``pycos.AsyncSocket`` and accepts connections forever.  Every accepted
    connection is added to the shared ``clients`` set and handed to a new
    ``pycos.Task`` running ``client_send``.

    When the accept loop is interrupted by any exception, every known
    client is shut down and closed, the listening socket is closed, and
    the exception propagates to the caller.

    Args:
        host: address to bind to.
        port: TCP port to bind to.
        task: the running task; ``set_daemon()`` is called on it.
    """
    task.set_daemon()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock = pycos.AsyncSocket(sock)
    sock.bind((host, port))
    sock.listen(128)
    print('server at %s' % str(sock.getsockname()))

    clients = set()
    try:
        while True:
            conn, addr = yield sock.accept()
            clients.add(conn)
            pycos.Task(client_send, clients, conn)
    finally:
        # The loop never ends normally, so this only runs while an exception
        # is propagating; the exception is re-raised after cleanup.
        for client in clients:
            try:
                client.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # Best effort: a peer that already went away must not stop
                # the remaining clients from being closed.
                pass
            client.close()
        sock.close()
# Example source code for Python's socket() class
def __init__(self, addr="127.0.0.1", port=4444):
    """Wait for one TCP client and run pdb over that connection.

    Saves the current ``sys.stdin``/``sys.stdout``, listens on
    ``(addr, port)``, blocks until a client connects, then initialises
    ``pdb.Pdb`` with the connection's file object and redirects
    ``sys.stdout`` and ``sys.stdin`` to it.
    """
    # Keep the real streams so they are still reachable after redirection.
    self.old_stdout = sys.stdout
    self.old_stdin = sys.stdin

    # SO_REUSEADDR lets the web app reload and bind the same port again.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.skt = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    listener.bind((addr, port))
    listener.listen(1)

    # Blocks here until the debugger client connects.
    peer, _peer_addr = listener.accept()
    handle = peer.makefile('rw')
    pdb.Pdb.__init__(self, completekey='tab', stdin=handle, stdout=handle)
    sys.stdout = handle
    sys.stdin = handle
def get_iphostname():
    """Return this machine's hostname and the IPv4 address of its NIC.

    The address is read with the ``SIOCGIFADDR`` ioctl (Linux), first from
    interface ``eth0`` and, if that raises ``IOError``, from ``eno1``.

    Returns:
        dict with keys ``'hostname'`` and ``'ip'``.

    Raises:
        IOError: if neither ``eth0`` nor ``eno1`` can be queried.
    """
    def get_ip(ifname):
        # struct.pack('256s', ...) needs bytes on Python 3.
        if not isinstance(ifname, bytes):
            ifname = ifname.encode('utf-8')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            reply = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15])
            )
        finally:
            # Close the probe socket even when the ioctl fails.
            sock.close()
        # The packed IPv4 address is taken from bytes 20..23 of the reply.
        return socket.inet_ntoa(reply[20:24])

    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip': ip}
def client_proc(host, port, input, task=None):
    """Client task: stream a file to ``(host, port)`` and print its SHA-1.

    The file is read in 16 KiB chunks with ``os.read`` and each chunk is
    written in full through an asynchronous file wrapped around the
    connected socket.  The input file and the asynchronous file are closed
    even if reading or writing fails.

    Args:
        host: server address to connect to.
        port: server TCP port.
        input: path of the file to send.
        task: the running task (unused here).
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = pycos.AsyncSocket(sock)
    yield sock.connect((host, port))
    # data can be written to this asynchronous socket; however, for
    # illustration, convert its file descriptor to asynchronous file
    # and write to that instead
    afd = pycos.asyncfile.AsyncFile(sock)
    csum = hashlib.sha1()
    try:
        with open(input) as src:
            while True:
                data = os.read(src.fileno(), 16*1024)
                if not data:
                    break
                csum.update(data)
                yield afd.write(data, full=True)
    finally:
        afd.close()
    print('client sha1 csum: %s' % csum.hexdigest())
def server_proc(conn, task=None):
    """Server task: read lines from *conn*, then print their SHA-1 and count.

    *conn* is wrapped in a ``pycos.asyncfile.AsyncFile`` and read line by
    line until an empty read; the asynchronous file is closed afterwards.
    """
    # conn comes from a synchronous 'accept'; wrap it so lines can be read
    # asynchronously.
    reader = pycos.asyncfile.AsyncFile(conn)
    digest = hashlib.sha1()
    line_count = 0

    line = yield reader.readline()
    while line:
        digest.update(line)
        line_count += 1
        line = yield reader.readline()

    reader.close()
    print('server sha1 csum: %s' % (digest.hexdigest()))
    print('lines: %s' % (line_count))
def chat(host, port, task=None):
    """Chat server task: accept clients and start a sender task for each.

    Binds a listening TCP socket on ``(host, port)``, wraps it in a
    ``pycos.AsyncSocket`` and accepts connections forever.  Every accepted
    connection is added to the shared ``clients`` set and handed to a new
    ``pycos.Task`` running ``client_send``.

    When the accept loop is interrupted by any exception, every known
    client is shut down and closed, the listening socket is closed, and
    the exception propagates to the caller.

    Args:
        host: address to bind to.
        port: TCP port to bind to.
        task: the running task; ``set_daemon()`` is called on it.
    """
    task.set_daemon()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock = pycos.AsyncSocket(sock)
    sock.bind((host, port))
    sock.listen(128)
    print('server at %s' % str(sock.getsockname()))

    clients = set()
    try:
        while True:
            conn, addr = yield sock.accept()
            clients.add(conn)
            pycos.Task(client_send, clients, conn)
    finally:
        # The loop never ends normally, so this only runs while an exception
        # is propagating; the exception is re-raised after cleanup.
        for client in clients:
            try:
                client.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # Best effort: a peer that already went away must not stop
                # the remaining clients from being closed.
                pass
            client.close()
        sock.close()
def client(host, port, n, task=None):
    """Client task: connect to ``(host, port)`` and send one message.

    The message is ``"<n>: "`` followed by 100 to 300 dashes and a
    terminating ``'/'``, encoded to bytes.  The socket is closed after the
    message has been sent.
    """
    conn = pycos.AsyncSocket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    yield conn.connect((host, port))
    print('%s connected' % n)

    # Payload length is arbitrary on purpose.
    prefix = '%d: ' % n
    filler = '-' * random.randint(100,300)
    payload = (prefix + filler + '/').encode()

    yield conn.sendall(payload)
    conn.close()
# pycos.logger.setLevel(pycos.Logger.DEBUG)
# run 10 client tasks
def client_proc(host, port, input, task=None):
    """Client task: stream a file to ``(host, port)`` and print its SHA-1.

    The file is read in 16 KiB chunks with ``os.read`` and each chunk is
    written in full through an asynchronous file wrapped around the
    connected socket.  The input file and the asynchronous file are closed
    even if reading or writing fails.

    Args:
        host: server address to connect to.
        port: server TCP port.
        input: path of the file to send.
        task: the running task (unused here).
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = pycos.AsyncSocket(sock)
    yield sock.connect((host, port))
    # data can be written to this asynchronous socket; however, for
    # illustration, convert its file descriptor to asynchronous file
    # and write to that instead
    afd = pycos.asyncfile.AsyncFile(sock)
    csum = hashlib.sha1()
    try:
        with open(input) as src:
            while True:
                data = os.read(src.fileno(), 16*1024)
                if not data:
                    break
                csum.update(data)
                yield afd.write(data, full=True)
    finally:
        afd.close()
    print('client sha1 csum: %s' % csum.hexdigest())
def server_proc(conn, task=None):
    """Server task: read lines from *conn*, then print their SHA-1 and count.

    *conn* is wrapped in a ``pycos.asyncfile.AsyncFile`` and read line by
    line until an empty read; the asynchronous file is closed afterwards.
    """
    # conn comes from a synchronous 'accept'; wrap it so lines can be read
    # asynchronously.
    reader = pycos.asyncfile.AsyncFile(conn)
    digest = hashlib.sha1()
    line_count = 0

    line = yield reader.readline()
    while line:
        digest.update(line)
        line_count += 1
        line = yield reader.readline()

    reader.close()
    print('server sha1 csum: %s' % (digest.hexdigest()))
    print('lines: %s' % (line_count))
def _async_send(self, *args):
    """Internal use only; use 'send' with 'yield' instead.

    Asynchronous version of socket send method: the current task is put
    into the waiting state and a callback that performs the actual
    ``send`` is installed as ``self._write_fn``.
    """
    def _disarm():
        # Drop the callback and stop watching for write events.
        self._write_fn = None
        self._notifier.clear(self, _AsyncPoller._Write)

    def _send():
        try:
            count = self._rsock.send(*args)
        except:
            # Hand any failure to the waiting task.
            _disarm()
            self._write_task.throw(*sys.exc_info())
        else:
            _disarm()
            self._write_task._proceed_(count)

    # Attach to the scheduler on first use.
    if not self._scheduler:
        scheduler = Pycos.scheduler()
        self._scheduler = scheduler
        self._notifier = scheduler._notifier
        self._register()

    waiter = Pycos.cur_task(self._scheduler)
    self._write_task = waiter
    waiter._await_()
    self._write_fn = _send
    self._notifier.add(self, _AsyncPoller._Write)
def _async_sendto(self, *args):
    """Internal use only; use 'sendto' with 'yield' instead.

    Asynchronous version of socket sendto method: the current task is put
    into the waiting state and a callback that performs the actual
    ``sendto`` is installed as ``self._write_fn``.
    """
    def _disarm():
        # Drop the callback and stop watching for write events.
        self._write_fn = None
        self._notifier.clear(self, _AsyncPoller._Write)

    def _sendto():
        try:
            count = self._rsock.sendto(*args)
        except:
            # Hand any failure to the waiting task.
            _disarm()
            self._write_task.throw(*sys.exc_info())
        else:
            _disarm()
            self._write_task._proceed_(count)

    # Attach to the scheduler on first use.
    if not self._scheduler:
        scheduler = Pycos.scheduler()
        self._scheduler = scheduler
        self._notifier = scheduler._notifier
        self._register()

    waiter = Pycos.cur_task(self._scheduler)
    self._write_task = waiter
    waiter._await_()
    self._write_fn = _sendto
    self._notifier.add(self, _AsyncPoller._Write)
def _socketpair():
    """Return a pair of connected sockets.

    Uses ``socket.socketpair()`` when the platform provides it.  Otherwise
    a TCP connection over the loopback interface is built by hand and
    ``(read_sock, write_sock)`` is returned.
    """
    if hasattr(socket, 'socketpair'):
        return socket.socketpair()

    # Fallback: temporary listener on an ephemeral loopback port.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen(1)
    writer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Connect without blocking so the accept below can complete it.
        writer.setblocking(False)
        try:
            writer.connect(listener.getsockname()[:2])
        except socket.error as exc:
            # "In progress" is the expected outcome of a non-blocking connect.
            if exc.args[0] not in (EINPROGRESS, EWOULDBLOCK):
                raise
        writer.setblocking(True)
        reader, _peer = listener.accept()
    except:
        writer.close()
        raise
    finally:
        listener.close()
    return (reader, writer)
def _timed_out(self):
    """Fail the pending read and/or write task with ``socket.timeout``.

    For stream sockets with a recorded overlapped read or write, the
    outstanding I/O is cancelled with ``win32file.CancelIo`` first.  For
    datagram sockets the matching notifier event and callback are cleared
    before the waiting task is thrown the timeout.
    """
    def _has_type(kind):
        # Re-read self._rsock on every call, as each check in the
        # original did.
        return self._rsock and self._rsock.type & kind

    if _has_type(socket.SOCK_STREAM) and (self._read_overlap or self._write_overlap):
        win32file.CancelIo(self._fileno)

    if self._read_task:
        if _has_type(socket.SOCK_DGRAM):
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_fn = None
        self._read_task.throw(socket.timeout('timed out'))
        self._read_result = None
        self._read_task = None

    if self._write_task:
        if _has_type(socket.SOCK_DGRAM):
            self._notifier.clear(self, _AsyncPoller._Write)
            self._write_fn = None
        self._write_task.throw(socket.timeout('timed out'))
        self._write_result = None
        self._write_task = None
def chat(host, port, task=None):
    """Chat server task: accept clients and start a sender task for each.

    Binds a listening TCP socket on ``(host, port)``, wraps it in a
    ``pycos.AsyncSocket`` and accepts connections forever.  Every accepted
    connection is added to the shared ``clients`` set and handed to a new
    ``pycos.Task`` running ``client_send``.

    When the accept loop is interrupted by any exception, every known
    client is shut down and closed, the listening socket is closed, and
    the exception propagates to the caller.

    Args:
        host: address to bind to.
        port: TCP port to bind to.
        task: the running task; ``set_daemon()`` is called on it.
    """
    task.set_daemon()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock = pycos.AsyncSocket(sock)
    sock.bind((host, port))
    sock.listen(128)
    print('server at %s' % str(sock.getsockname()))

    clients = set()
    try:
        while True:
            conn, addr = yield sock.accept()
            clients.add(conn)
            pycos.Task(client_send, clients, conn)
    finally:
        # The loop never ends normally, so this only runs while an exception
        # is propagating; the exception is re-raised after cleanup.
        for client in clients:
            try:
                client.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # Best effort: a peer that already went away must not stop
                # the remaining clients from being closed.
                pass
            client.close()
        sock.close()
def server_proc(conn, task=None):
    """Server task: read lines from *conn*, then print their SHA-1 and count.

    *conn* is wrapped in a ``pycos.asyncfile.AsyncFile`` and read line by
    line until an empty read; the asynchronous file is closed afterwards.
    """
    # conn comes from a synchronous 'accept'; wrap it so lines can be read
    # asynchronously.
    reader = pycos.asyncfile.AsyncFile(conn)
    digest = hashlib.sha1()
    line_count = 0

    line = yield reader.readline()
    while line:
        digest.update(line)
        line_count += 1
        line = yield reader.readline()

    reader.close()
    print('server sha1 csum: %s' % (digest.hexdigest()))
    print('lines: %s' % (line_count))
def _async_recvfrom(self, *args):
    """Internal use only; use 'recvfrom' with 'yield' instead.

    Asynchronous version of socket recvfrom method: the current task is
    put into the waiting state and a callback that performs the actual
    ``recvfrom`` is installed as ``self._read_fn``.
    """
    def _disarm():
        # Drop the callback and stop watching for read events.
        self._read_fn = None
        self._notifier.clear(self, _AsyncPoller._Read)

    def _recvfrom():
        try:
            received = self._rsock.recvfrom(*args)
        except:
            # Hand any failure to the waiting task.
            _disarm()
            self._read_task.throw(*sys.exc_info())
        else:
            _disarm()
            self._read_task._proceed_(received)

    # Attach to the scheduler on first use.
    if not self._scheduler:
        scheduler = Pycos.scheduler()
        self._scheduler = scheduler
        self._notifier = scheduler._notifier
        self._register()

    waiter = Pycos.cur_task(self._scheduler)
    self._read_task = waiter
    waiter._await_()
    self._read_fn = _recvfrom
    self._notifier.add(self, _AsyncPoller._Read)
def _async_send(self, *args):
    """Internal use only; use 'send' with 'yield' instead.

    Asynchronous version of socket send method: the current task is put
    into the waiting state and a callback that performs the actual
    ``send`` is installed as ``self._write_fn``.
    """
    def _disarm():
        # Drop the callback and stop watching for write events.
        self._write_fn = None
        self._notifier.clear(self, _AsyncPoller._Write)

    def _send():
        try:
            count = self._rsock.send(*args)
        except:
            # Hand any failure to the waiting task.
            _disarm()
            self._write_task.throw(*sys.exc_info())
        else:
            _disarm()
            self._write_task._proceed_(count)

    # Attach to the scheduler on first use.
    if not self._scheduler:
        scheduler = Pycos.scheduler()
        self._scheduler = scheduler
        self._notifier = scheduler._notifier
        self._register()

    waiter = Pycos.cur_task(self._scheduler)
    self._write_task = waiter
    waiter._await_()
    self._write_fn = _send
    self._notifier.add(self, _AsyncPoller._Write)
def is_port_enabled(hostname, port, timeout=None):
    """Check whether a TCP connection to ``hostname:port`` can be opened.

    For example, to check whether the ssh port is enabled::

        is_port_enabled(HOSTNAME, 22)

    and to see whether the glusterd port is enabled::

        is_port_enabled(HOSTNAME, 24007)

    Args:
        hostname: host name or address to connect to.
        port: TCP port number.
        timeout: optional connect timeout in seconds; ``None`` (default)
            keeps the socket's default blocking behaviour.

    Returns:
        True if the connection succeeded, False if it failed with
        ``socket.error`` (which includes timeouts).
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if timeout is not None:
            s.settimeout(timeout)
        s.connect((hostname, port))
        return True
    except socket.error:
        return False
    finally:
        # Always release the descriptor, including on errors that are not
        # socket.error (e.g. an out-of-range port).
        s.close()
def port_knock_units(self, sentry_units, port=22,
                     timeout=15, expect_success=True):
    """Open a TCP socket to check for a listening service on each
    listed juju unit.

    The check stops at the first unit whose result does not match the
    expectation.

    :param sentry_units: list of sentry unit pointers
    :param port: TCP port number, default to 22
    :param timeout: Connect timeout, default to 15 seconds
    :expect_success: True by default, set False to invert logic
    :returns: None if successful, Failure message otherwise
    """
    for unit in sentry_units:
        address = unit.info['public-address']
        reachable = self.port_knock_tcp(address, port, timeout)
        if not reachable and expect_success:
            return 'Socket connect failed.'
        if reachable and not expect_success:
            return 'Socket connected unexpectedly.'
    return None
def get_my_ip():
    """
    Returns the actual ip of the local machine.

    This code figures out what source address would be used if some traffic
    were to be sent out to some well known address on the Internet. In this
    case, a Google DNS server is used, but the specific address does not
    matter much. No traffic is actually sent.

    Returns "127.0.0.1" when the address cannot be determined because a
    socket error occurred.
    """
    try:
        csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    except socket.error:
        return "127.0.0.1"
    try:
        csock.connect(('8.8.8.8', 80))
        (addr, port) = csock.getsockname()
        return addr
    except socket.error:
        return "127.0.0.1"
    finally:
        # Release the socket on the failure path too.
        csock.close()
def run(self):
    """Receive IRC traffic from ``irc`` and log or print it until it ends.

    Each received chunk is inspected for a few IRC commands:

    * ``PING``    - answered with ``PONG <token>``.
    * ``PRIVMSG`` - logged as ``<user> message``.
    * ``PART`` / ``JOIN`` - logged as a leave / join notice.

    Anything else is printed.  The loop stops when the peer closes the
    connection (empty read) or a ``socket.error`` is raised.
    """
    def _sender(raw):
        # Text between the leading ':' and the first '!'.
        return raw[1:raw.find('!')]

    while True:
        try:
            text = irc.recv(2040)
            if not text:
                # An empty read means the peer closed the connection;
                # without this check the loop would spin forever.
                break
            if 'PING' in text:
                irc.send('PONG ' + text.split()[1] + '\r\n')
                # NOTE(review): this binds a local name only; it has no
                # effect outside this method - confirm the intent.
                connected = True
            elif 'PRIVMSG' in text:
                username = _sender(text)
                start = text.find(channel + ' :')
                message = text[start + len(channel) + 2:]
                log('recv', '<' + username + '> ' + message)
            elif 'PART' in text:
                log('recv', '[' + _sender(text) + '] left \n')
            elif 'JOIN' in text:
                log('recv', '[' + _sender(text) + ']' + ' joined \n')
            else:
                print(text)
        except socket.error:
            break
def __init__(self):
    """Initialise the per-connection bookkeeping fields to their defaults."""
    # A new entry starts in the 'accept' state.
    self.state = 'accept'

    # Outstanding byte counts (presumably bytes still to read / write -
    # the original comments were lost to encoding damage).
    self.need_read, self.need_write = 10, 0

    # Progress counters (presumably bytes already read / written).
    self.have_read, self.have_write = 0, 0

    # Data buffers for each direction.
    self.buff_read, self.buff_write = "", ""

    # Placeholders for the socket object and the peer address.
    self.sock_obj = ""
    self.sock_addr = ""

    # Read timing fields: a start time (unset) and an interval of 30
    # (presumably seconds - TODO confirm against the code that reads them).
    self.read_stime = None
    self.read_itime = 30
def state_log(self):
    """Write this connection's state to the debug log.

    Does nothing unless the module-level ``debug`` flag is truthy.  The
    message lists the socket's file descriptor, the current state, the
    ``need_*`` and ``have_*`` counters and both buffers.
    """
    if not debug:
        return
    msg = (
        '\n current_fd:{fd} \n state:{state}'
        '\n need_read:{need_read} \n need_write:{need_write}'
        '\n have_read:{have_read}\n have_write:{have_write}'
        '\n buff_read:{buff_read} \n buff_write:{buff_write}'
    ).format(
        fd=self.sock_obj.fileno(), state=self.state,
        need_read=self.need_read, need_write=self.need_write,
        # Each field reports its own attribute; the have_* and buff_*
        # slots used to be filled with the need_* values by mistake.
        have_read=self.have_read, have_write=self.have_write,
        buff_read=self.buff_read, buff_write=self.buff_write,
    )
    logger.debug(msg)
#}}}
# ????????socket
#{{{bind_socket
def accept(self, fd):
    """Accept one pending connection on the listening socket stored for *fd*.

    The new connection is made non-blocking, registered with the epoll
    object for ``EPOLLIN``, passed to ``self.setFd`` and its entry in
    ``self.conn_state`` is put into the ``"read"`` state.

    ``ECONNABORTED`` and ``EAGAIN`` are ignored (the method returns None);
    any other ``socket.error`` is re-raised.
    """
    try:
        listener = self.conn_state[fd].sock_obj
        conn, addr = listener.accept()
        conn.setblocking(0)
        new_fd = conn.fileno()
        # Watch the new connection for incoming data.
        self.epoll_sock.register(new_fd, select.EPOLLIN)
        # setFd is expected to create the conn_state entry used below.
        self.setFd(conn)
        self.conn_state[new_fd].state = "read"
    except socket.error as msg:
        # Aborted or not-yet-ready accepts are harmless; give up quietly.
        if msg.args[0] not in (errno.ECONNABORTED, errno.EAGAIN):
            raise
def process(self, fd):
    """Run the business logic on the data read for *fd* and queue the reply.

    The reply is framed as a 10-digit zero-padded length followed by the
    payload, stored in ``buff_write``; the entry is switched to the
    ``"write"`` state, epoll is switched to ``EPOLLOUT`` for *fd*, and the
    state machine is run again for *fd*.
    """
    entry = self.conn_state[fd]
    reply = self.logic(entry.buff_read)

    # Length-prefixed frame: 10-digit header + payload.
    entry.buff_write = "%010d%s" % (len(reply), reply)
    entry.need_write = len(entry.buff_write)

    entry.state = "write"
    self.epoll_sock.modify(fd, select.EPOLLOUT)
    # Continue immediately with the write step.
    self.state_machine(fd)
def accept(self, fd):
    """Accept one pending connection on the listening socket stored for *fd*.

    The accepted connection is switched to non-blocking mode.

    Returns:
        ``(conn, ip)`` - the new socket and the peer's address - on
        success; the string ``"retry"`` when the accept failed with
        ``EAGAIN`` or ``ECONNABORTED``; ``None`` for any other
        ``socket.error`` (which is logged).
    """
    import errno

    logs.dblog("accept: accept client")
    try:
        sock = self.conn_state[fd].sock_obj
        conn, addr = sock.accept()
        conn.setblocking(0)
        logs.dblog("accept: find new socket client fd(%s)" % conn.fileno())
        return conn, addr[0]
    except socket.error as msg:
        # Named constants instead of the literal values 11 and 103.
        if msg.errno in (errno.EAGAIN, errno.ECONNABORTED):
            return "retry"
        # Keep the original result (None) for other errors, but leave a
        # trace instead of dropping the error silently.
        logs.dblog("accept: unexpected socket error: %s" % msg)
        return None
def process(self, fd):
    """Run the business logic on the data read for *fd* and queue the reply.

    The reply is framed as a 10-digit zero-padded length followed by the
    payload and stored in ``buff_write``; the entry is switched to the
    ``"write"`` state and epoll is switched to ``EPOLLOUT`` for *fd*.  The
    resulting state is written to the debug log.
    """
    logs.dblog("proces: proces start")
    entry = self.conn_state[fd]
    reply = self.logic(entry.buff_read)

    # Length-prefixed frame: 10-digit header + payload.
    entry.buff_write = "%010d%s" % (len(reply), reply)
    entry.need_write = len(entry.buff_write)

    # From now on only writability of this descriptor is of interest.
    entry.state = "write"
    self.epoll_sock.modify(fd, select.EPOLLOUT)

    logs.dblog("***process: process end fd state change to write***")
    entry.state_log()
def state_log(self):
    """Write this connection's state to the debug log.

    Does nothing unless the module-level ``debug`` flag is truthy.  The
    message lists the socket's file descriptor, the current state, the
    ``need_*`` and ``have_*`` counters and both buffers.
    """
    if not debug:
        return
    msg = (
        '\n current_fd:{fd} \n state:{state}'
        '\n need_read:{need_read} \n need_write:{need_write}'
        '\n have_read:{have_read}\n have_write:{have_write}'
        '\n buff_read:{buff_read} \n buff_write:{buff_write}'
    ).format(
        fd=self.sock_obj.fileno(), state=self.state,
        need_read=self.need_read, need_write=self.need_write,
        # Each field reports its own attribute; the have_* and buff_*
        # slots used to be filled with the need_* values by mistake.
        have_read=self.have_read, have_write=self.have_write,
        buff_read=self.buff_read, buff_write=self.buff_write,
    )
    logger.debug(msg)
#}}}
# ????????socket
#{{{bind_socket
def __init__(self, sock, logic):
    """Set up the epoll-driven server around the listening socket *sock*.

    *sock* is handed to ``self.setFd``, registered with a new epoll object
    for ``EPOLLIN``, and *logic* is stored as the request handler.  A
    state-name to handler table is stored in ``self.sm``.
    """
    # Per-descriptor state table; setFd presumably fills it - confirm.
    self.conn_state = {}
    self.setFd(sock)

    # One epoll object for all sockets; start by watching the listener
    # for readability.
    poller = select.epoll()
    self.epoll_sock = poller
    poller.register(sock.fileno(), select.EPOLLIN)

    # Callable applied to the data that was read.
    self.logic = logic

    # State name -> handler used to drive each connection.
    handlers = (
        ("accept", self.accept),
        ("read", self.read),
        ("write", self.write),
        ("process", self.process),
        ("closing", self.close),
    )
    self.sm = dict(handlers)
def accept(self, fd):
    """Accept one pending connection on the listening socket stored for *fd*.

    The new connection is made non-blocking, registered with the epoll
    object for ``EPOLLIN``, passed to ``self.setFd`` and its entry in
    ``self.conn_state`` is put into the ``"read"`` state.

    ``ECONNABORTED`` and ``EAGAIN`` are ignored (the method returns None);
    any other ``socket.error`` is re-raised.
    """
    try:
        listener = self.conn_state[fd].sock_obj
        conn, addr = listener.accept()
        conn.setblocking(0)
        new_fd = conn.fileno()
        # Watch the new connection for incoming data.
        self.epoll_sock.register(new_fd, select.EPOLLIN)
        # setFd is expected to create the conn_state entry used below.
        self.setFd(conn)
        self.conn_state[new_fd].state = "read"
    except socket.error as msg:
        # Aborted or not-yet-ready accepts are harmless; give up quietly.
        if msg.args[0] not in (errno.ECONNABORTED, errno.EAGAIN):
            raise