def findFreePort(interface='127.0.0.1', family=socket.AF_INET,
                 type=socket.SOCK_STREAM):
    """
    Ask the platform to allocate a free port on the specified interface, then
    release the socket and return the address which was allocated.

    The probe socket is closed before this function returns, so the port is
    only known to have been free at the moment of the probe.

    @param interface: The local address to try to bind the port on.
    @type interface: C{str}

    @param family: The address family of the probe socket.  It is also used
        to restrict name resolution of C{interface}, so the resolved address
        always matches the socket it is bound to.

    @param type: The socket type which will use the resulting port.

    @return: The address the probe socket was bound to, exactly as returned
        by L{socket.getsockname}.  For C{socket.AF_INET} this is a two-tuple
        of address and port.
    """
    # Resolve with the requested family and type: an unrestricted lookup may
    # return an address of another family first (for example an IPv6 result
    # for a host name while the probe socket is IPv4), which bind() rejects.
    addr = socket.getaddrinfo(interface, 0, family, type)[0][4]
    probe = socket.socket(family, type)
    try:
        # Port 0 asks the platform to pick any currently unused port.
        probe.bind(addr)
        return probe.getsockname()
    finally:
        probe.close()
python类getsockname()的实例源码
def __init__(self, socket, proto):
    """
    Capture the endpoints of a connected socket together with its protocol.

    Stores the socket's local address in C{host}, its remote address in
    C{peer}, the given C{proto} in C{protocall}, and the current wall-clock
    time (from C{time.time()}) in C{recevied_time}.
    """
    local_address = socket.getsockname()
    self.host = local_address
    remote_address = socket.getpeername()
    self.peer = remote_address
    # Attribute spellings are kept as-is; other code may read them by name.
    self.protocall, self.recevied_time = proto, time.time()
def get_port(socket):
    """Return the port to which a socket is bound.

    Works for both IPv4 sockets, whose address is ``(host, port)``, and IPv6
    sockets, whose address is ``(host, port, flowinfo, scope_id)``.

    Raises:
        ValueError: If the socket's address is a path-style string (as for
            Unix domain sockets), which carries no port.
    """
    address = socket.getsockname()
    if isinstance(address, (str, bytes)):
        raise ValueError(
            'socket address %r has no port component' % (address,))
    # Index instead of unpacking into two names: IPv6 addresses are 4-tuples.
    return address[1]
def handle_request(self, socket, address):
    """
    Dispatch an incoming connection to the plugin registered for its port.

    The local port of ``socket`` is looked up in ``self.message_mapping``.
    When a plugin is found, the socket is wrapped in an ``EasySession``,
    passed to the plugin's ``handle_request`` and the session is closed
    afterwards, even if the plugin raises.  When no plugin is registered
    nothing happens.

    :param socket: The accepted connection.
    :param address: Unused by this method.
    """
    # Index rather than unpack into two names: IPv6 sockets report a
    # 4-tuple (host, port, flowinfo, scope_id).
    port = socket.getsockname()[1]
    plugin = self.message_mapping.get(port)
    if not plugin:
        return
    session = EasySession(socket)
    try:
        plugin.handle_request(session)
    finally:
        # Always release the session so a failing plugin cannot leak it.
        session.close()
# Handle requests arriving over a unix_socket (Unix domain socket)
def handle_uds_request(self, socket, address):
    """
    Dispatch an incoming connection to the plugin registered for its name.

    The value returned by ``socket.getsockname()`` is used directly as the
    key into ``self.message_mapping``.  When a plugin is found, the socket is
    wrapped in an ``EasySession``, passed to the plugin's ``handle_request``
    and the session is closed afterwards, even if the plugin raises.  When no
    plugin is registered nothing happens.

    :param socket: The accepted connection.
    :param address: Unused by this method.
    """
    name = socket.getsockname()
    plugin = self.message_mapping.get(name)
    if not plugin:
        return
    session = EasySession(socket)
    try:
        plugin.handle_request(session)
    finally:
        # Always release the session so a failing plugin cannot leak it.
        session.close()
# Port lookup helper (the original comment text was lost to a character-encoding error)
def get_port(socket):
    """Return the port to which a socket is bound.

    Works for both IPv4 sockets, whose address is ``(host, port)``, and IPv6
    sockets, whose address is ``(host, port, flowinfo, scope_id)``.

    Raises:
        ValueError: If the socket's address is a path-style string (as for
            Unix domain sockets), which carries no port.
    """
    bound_address = socket.getsockname()
    if isinstance(bound_address, (str, bytes)):
        raise ValueError(
            'socket address %r has no port component' % (bound_address,))
    # Index instead of unpacking into two names: IPv6 addresses are 4-tuples.
    return bound_address[1]
def do_Send():
    """
    Send the text in the entry widget to every linked peer of the chatroom.

    Steps, in order:

    1. Read the entry; if it contains only spaces, report it and stop.
    2. Under ``stateLock``, check that the current state is after
       ``States['NAMED']``; otherwise report the error in ``CmdWin`` and stop.
    3. Under ``stateLock``, collect the forward link (if any) and all
       backward links, plus the room name.
    4. Under ``userInfoLock``, read the user's name, IP and port and obtain a
       new message ID.
    5. Build ``'T:' + ':'.join(fields) + PROTOCAL_END`` and pass it to
       ``socketOperation`` for each collected link.
    6. Echo the message into ``MsgWin`` and clear the entry.  This happens
       whether or not individual sends reported ``SOCKET_ERROR``.

    Every lock is released in a ``finally`` clause so that an exception
    raised inside a critical section cannot leave the lock held.
    """
    inputData = userentry.get()
    if not inputData.strip(' '):
        print('Detect nothing to send!\n')
        return
    CmdWin.insert(1.0, "\nPress Send")

    # check stage
    stateLock.acquire()
    try:
        checkFlag = currentState.isAfter(States['NAMED'])
    finally:
        stateLock.release()
    if not checkFlag:
        CmdWin.insert(1.0, "\nSend Error: You are not in any chatroom, please join a chatroom first!")
        return

    # check for all back and forward link
    sendingList = []
    stateLock.acquire()
    try:
        forwardLinkTuple = currentState._getforwardlink()
        if forwardLinkTuple is not None:
            # Element 1 of a link tuple is what gets handed to
            # socketOperation below.
            sendingList.append(forwardLinkTuple[1])
        backwardLinks = [link[1] for link in currentState._getbackwardlinks()]
        roomName = currentState._getroomname()
    finally:
        stateLock.release()
    sendingList.extend(backwardLinks)

    # get all infos desired by sending Text message
    userInfoLock.acquire()
    try:
        userName = user._getname()
        userIp = user._getip()
        userPort = user._getport()
        # update msgID
        msgID = currentState.newMsgID()
    finally:
        userInfoLock.release()

    # construct the protocol message
    originHID = sdbm_hash(userName + userIp + str(userPort))
    fields = [roomName, str(originHID), userName, str(msgID),
              str(len(inputData)), inputData]
    requestMessage = 'T:' + ':'.join(fields) + PROTOCAL_END
    print("\nMain Thread: perform the sending process, dispatch data to other peers\n")
    print('Message:', requestMessage)

    # The loop variable is deliberately not called ``socket`` so it cannot
    # be confused with the standard-library module of that name.
    for peerSocket in sendingList:
        output = socketOperation(peerSocket, requestMessage, receive=False)
        if output == Exceptions['SOCKET_ERROR']:
            # NOTE(review): getsockname() is the local endpoint, not the
            # peer's address - confirm this is what should be reported.
            print('Send Error: cannot sent the message to', peerSocket.getsockname())

    MsgWin.insert(1.0, '\n[' + userName + ']: ' + inputData)
    # clear the entry once the message has been dispatched
    userentry.delete(0, END)
def do_Send():
    """
    Send the text in the entry widget to every linked peer of the chatroom.

    Steps, in order:

    1. Under ``stateLock``, check that the current state is after
       ``States['NAMED']``; otherwise print the error, clear the entry and
       stop.
    2. Read the entry; if it contains only spaces, print an error and stop.
    3. Under ``stateLock``, collect the forward link (if any) and all
       backward links, plus the room name.
    4. Under ``userInfoLock``, read the user's name, IP and port and obtain a
       new message ID.
    5. Build ``'T:' + ':'.join(fields) + PROTOCAL_END`` and pass it to
       ``socketOperation`` for each collected link.
    6. Print the message locally and clear the entry.  This happens whether
       or not individual sends reported ``SOCKET_ERROR``.

    Every lock is released in a ``finally`` clause so that an exception
    raised inside a critical section cannot leave the lock held.
    """
    print("\nPress Send")

    # check stage
    stateLock.acquire()
    try:
        checkFlag = currentState.isAfter(States['NAMED'])
    finally:
        stateLock.release()
    if not checkFlag:
        print("\nSend Error: You are not in any chatroom, please join a chatroom first!")
        userentry.delete(0, END)
        return

    inputData = userentry.get()
    if not inputData.strip(' '):
        print("\nSend Error: Invalid message!")
        return

    # check for all back and forward link
    sendingList = []
    stateLock.acquire()
    try:
        forwardLinkTuple = currentState._getforwardlink()
        if forwardLinkTuple is not None:
            # Element 1 of a link tuple is what gets handed to
            # socketOperation below.
            sendingList.append(forwardLinkTuple[1])
        backwardLinks = [link[1] for link in currentState._getbackwardlinks()]
        roomName = currentState._getroomname()
    finally:
        stateLock.release()
    sendingList.extend(backwardLinks)

    # get all infos desired by sending Text message
    print("locked at 1067")
    userInfoLock.acquire()
    try:
        userName = user._getname()
        userIp = user._getip()
        userPort = user._getport()
        # update msgID
        msgID = currentState.newMsgID()
    finally:
        userInfoLock.release()
    print("1067 released")

    # construct the protocol message
    originHID = sdbm_hash(userName + userIp + str(userPort))
    fields = [roomName, str(originHID), userName, str(msgID),
              str(len(inputData)), inputData]
    requestMessage = 'T:' + ':'.join(fields) + PROTOCAL_END
    print("\nMain Thread: perform the sending process, dispatch data to other peers\n")
    print('Message:', requestMessage)

    # The loop variable is deliberately not called ``socket`` so it cannot
    # be confused with the standard-library module of that name.
    for peerSocket in sendingList:
        output = socketOperation(peerSocket, requestMessage, receive=False)
        if output == Exceptions['SOCKET_ERROR']:
            # NOTE(review): getsockname() is the local endpoint, not the
            # peer's address - confirm this is what should be reported.
            print('Send Error: cannot sent the message to', peerSocket.getsockname())

    print('\n[' + userName + ']: ' + inputData)
    # clear the entry once the message has been dispatched
    userentry.delete(0, END)