def _joinAddr2(self, interface, addr, join):
addr = socket.inet_aton(addr)
interface = socket.inet_aton(interface)
if join:
cmd = socket.IP_ADD_MEMBERSHIP
else:
cmd = socket.IP_DROP_MEMBERSHIP
try:
self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
except socket.error as e:
return failure.Failure(error.MulticastJoinError(addr, interface,
*e.args))
python类IP_ADD_MEMBERSHIP的实例源码
def _joinAddr2(self, interface, addr, join):
addr = socket.inet_aton(addr)
interface = socket.inet_aton(interface)
if join:
cmd = socket.IP_ADD_MEMBERSHIP
else:
cmd = socket.IP_DROP_MEMBERSHIP
try:
self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
except socket.error as e:
return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
def run(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
try:
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except socket.error as le:
# RHEL6 defines SO_REUSEPORT but it doesn't work
if le.errno == ENOPROTOOPT:
pass
else:
raise
addr = socket.inet_aton(SSDP_ADDR)
interface = socket.inet_aton('0.0.0.0')
cmd = socket.IP_ADD_MEMBERSHIP
self.sock.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
self.sock.bind(('0.0.0.0', SSDP_PORT))
self.sock.settimeout(1)
while True:
try:
data, addr = self.sock.recvfrom(1024)
self.datagram_received(data, addr)
except socket.timeout:
continue
self.shutdown()
def join(self, multicast_addr):
with self.lock:
if multicast_addr not in self.multicastSet:
self.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(multicast_addr) + socket.inet_aton(self.bind_addr))
self.multicastSet.add(multicast_addr)
if self.callback_obj is not None:
self.callback_obj.on_join(self, multicast_addr)
# for RECEIVER to stop receiving datagram from the multicast group
def getData( self, mgroup, hostip, port=29495, pkts=1000, pktlen=1080, block=True, returnSddsAnalyzer=True):
totalRead=0.0
startTime = _time.time()
sock = None
ismulticast=False
blen=10240
bytesRead=0
requestedBytes=pkts*pktlen
data=[]
rawdata=''
try:
try:
ip_class=int(mgroup.split('.')[0])
if ip_class == '224' or ip_class == '239':
ismulticast=True
except:
pass
#print " Capturing ", mgroup, " host ", hostip, " port ", port
sock = _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM, _socket.IPPROTO_UDP)
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
sock.bind(("",port))
if ismulticast:
mreq=struct.pack('4s4s',_socket.inet_aton(mgroup),_socket.inet_aton(hostip))
sock.setsockopt(_socket.IPPROTO_IP, _socket.IP_ADD_MEMBERSHIP, mreq)
print "Capturing Socket Interface: (MULTICAST) Host Interface: " + hostip + " Multicast: " + mgroup + " Port: "+ str(port)
else:
print "Capturing Socket Interface: (UDP) Host Interface: " + hostip + " Source Address: " + mgroup + " Port: "+ str(port)
ncnt=0
while totalRead < requestedBytes:
rcvddata = sock.recv(blen,_socket.MSG_WAITALL)
rawdata=rawdata+rcvddata
data=data+list(rcvddata)
totalRead = totalRead + len(rcvddata)
ncnt += 1
print " read ", ncnt, " pkt ", len(rcvddata)
except KeyboardInterrupt,e :
traceback.print_exc()
print "Exception during packet capture: " + str(e)
except Exception, e :
traceback.print_exc()
print "Exception during packet capture: " + str(e)
finally:
endTime=_time.time()
deltaTime=endTime -startTime
if sock: sock.close()
print "Elapsed Time: ", deltaTime, " Total Data (kB): ", totalRead/1000.0, " Rate (kBps): ", (totalRead/1000.0)/deltaTime
if returnSddsAnalyzer:
from ossie.utils.sdds import SDDSAnalyzer
return SDDSAnalyzer( rawdata, pkts, pktlen, totalRead )
else:
return data, rawdata, (pktlen,pkts,totalRead)
def get_multicast_socket(sock=None):
if not sock:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(0.001)
# set multicast interface to any local interface
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton('0.0.0.0'))
# Enable multicast, TTL should be <32 (local network)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 5)
# Allow reuse of addresses
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Allow receiving multicast broadcasts (subscribe to multicast group)
try:
mreq = struct.pack('4sL', socket.inet_aton(multicast_ip), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
# Do not loop back own messages
sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 0)
except OSError as e:
logger.error('Unable to obtain socket with multicast enabled.')
raise e
port = None
for i in range(30100, 30105):
try:
# Binding to 0.0.0.0 results in multiple messages if there is multiple interfaces available
# Kept as-is to avoid losing messages
sock.bind(('0.0.0.0', i))
port = i
break
except OSError as e:
# Socket already in use without SO_REUSEADDR enabled
continue
if not port:
raise RuntimeError('No IMC multicast ports free on local interface.')
return sock
def run(self):
try:
PLUGIN.responderLogger.debug("Responder.run called")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
try:
sock.bind(('', UPNP_PORT))
sock.setsockopt(socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(BCAST_IP) + socket.inet_aton(PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['host']))
sock.settimeout(1)
start_time = time.time()
end_time = start_time + (PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] * 60)
while True:
try:
data, addr = sock.recvfrom(1024)
# Following code will only time out the Broadcaster Thread if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] > 0 (valid values 0 thru 10 inclusive)
# A value of zero means 'always on'
if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] and time.time() > end_time:
PLUGIN.responderLogger.debug("Responder.run thread timed out")
self.stop()
raise socket.error
except socket.error:
if self.interrupted:
PLUGIN.responderLogger.debug("Responder.run: self.interrupted: True")
PLUGIN.setDeviceDiscoveryState(False, self.ahbDevId)
sock.close()
return
else:
if M_SEARCH_REQ_MATCH in data:
PLUGIN.responderLogger.debug("Responder.run: received: {}".format(str(data)))
self.respond(addr)
except socket.error as e:
# This is the exception thrown when someone else has bound to the UPNP port, so write some errors and
# stop the thread (which really isn't needed, but it logs a nice stop debug message).
if e.errno == errno.EADDRINUSE:
PLUGIN.responderLogger.error(u"'{}' Responder startup failed because another app or plugin is using the UPNP port.".format(indigo.devices[self.ahbDevId].name))
PLUGIN.responderLogger.error(u"Open a terminal window and type 'sudo lsof -i :{}}' to see a list of processes that have bound to that port and quit those applications.".format(UPNP_PORT))
self.stop()
elif e.errno == errno.EADDRNOTAVAIL:
PLUGIN.responderLogger.error(u"'{}' Responder startup failed because host address is not available.".format(indigo.devices[self.ahbDevId].name))
PLUGIN.responderLogger.error(u"Double check that the host is specified correctly in the Plugin Config. Correct if invalid and then reload the plugin.")
self.stop()
else:
PLUGIN.responderLogger.error("'{}' Responder.run: socket error: {}".format(indigo.devices[self.ahbDevId].name, e))
PLUGIN.setDeviceDiscoveryState(False, self.ahbDevId)
except StandardError, e:
PLUGIN.responderLogger.error(u"StandardError detected in Responder.Run for '{}'. Line '{}' has error='{}'".format(indigo.devices[self.ahbDevId].name, sys.exc_traceback.tb_lineno, e))
def __init__(self, bindaddress=None):
"""Creates an instance of the Zeroconf class, establishing
multicast communications, listening and reaping threads."""
globals()['_GLOBAL_DONE'] = 0
self.intf = bindaddress
self.group = ('', _MDNS_PORT)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except Exception:
# SO_REUSEADDR should be equivalent to SO_REUSEPORT for
# multicast UDP sockets (p 731, "TCP/IP Illustrated,
# Volume 2"), but some BSD-derived systems require
# SO_REUSEPORT to be specified explicity. Also, not all
# versions of Python have SO_REUSEPORT available. So
# if you're on a BSD-based system, and haven't upgraded
# to Python 2.3 yet, you may find this library doesn't
# work as expected.
#
pass
self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 255)
self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
try:
self.socket.bind(self.group)
except Exception:
# Some versions of linux raise an exception even though
# the SO_REUSE* options have been set, so ignore it
#
pass
if self.intf is not None:
self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(self.intf) + socket.inet_aton('0.0.0.0'))
self.socket.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0'))
self.listeners = []
self.browsers = []
self.services = {}
self.cache = DNSCache()
self.condition = threading.Condition()
self.engine = Engine(self)
self.listener = Listener(self)
self.reaper = Reaper(self)
def client_update (self):
update_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
#Set socket reuse, may not work on all OSs.
try:
update_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
except:
pass
#Attempt to bind to the socket to recieve and send data. If we can;t do this, then we cannot send registration
try:
update_sock.bind(('0.0.0.0',self.client_update_port))
except:
self.__printDebug( "Error: Unable to bind to port [%s] - client will not be registered" % self.client_update_port, 0)
return
update_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)
status = update_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._multicast_address) + socket.inet_aton('0.0.0.0'))
update_sock.setblocking(0)
self.__printDebug("Sending registration data: HELLO %s\r\n%s" % (self.client_header, self.client_data), 3)
#Send initial client registration
try:
update_sock.sendto("HELLO %s\r\n%s" % (self.client_header, self.client_data), self.client_register_group)
except:
self.__printDebug( "Error: Unable to send registeration message" , 0)
#Now, listen for client discovery reguests and respond.
while self._registration_is_running:
try:
data, addr = update_sock.recvfrom(1024)
self.__printDebug("Recieved UDP packet from [%s] containing [%s]" % (addr, data.strip()), 3)
except socket.error, e:
pass
else:
if "M-SEARCH * HTTP/1." in data:
self.__printDebug("Detected client discovery request from %s. Replying" % ( addr ,) , 2)
try:
update_sock.sendto("HTTP/1.0 200 OK\r\n%s" % self.client_data, addr)
except:
self.__printDebug( "Error: Unable to send client update message",0)
self.__printDebug("Sending registration data: HTTP/1.0 200 OK\r\n%s" % (self.client_data), 3)
self.client_registered = True
time.sleep(0.5)
self.__printDebug("Client Update loop stopped",1)
#When we are finished, then send a final goodbye message to deregister cleanly.
self.__printDebug("Sending registration data: BYE %s\r\n%s" % (self.client_header, self.client_data), 3)
try:
update_sock.sendto("BYE %s\r\n%s" % (self.client_header, self.client_data), self.client_register_group)
except:
self.__printDebug( "Error: Unable to send client update message" ,0)
self.client_registered = False
def __init__(
self,
interfaces=InterfaceChoice.All,
):
"""Creates an instance of the Zeroconf class, establishing
multicast communications, listening and reaping threads.
:type interfaces: :class:`InterfaceChoice` or sequence of ip addresses
"""
# hook for threads
self._GLOBAL_DONE = False
self._listen_socket = new_socket()
interfaces = normalize_interface_choice(interfaces, socket.AF_INET)
self._respond_sockets = []
for i in interfaces:
log.debug('Adding %r to multicast group', i)
try:
self._listen_socket.setsockopt(
socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(_MDNS_ADDR) + socket.inet_aton(i))
except socket.error as e:
if get_errno(e) == errno.EADDRINUSE:
log.info(
'Address in use when adding %s to multicast group, '
'it is expected to happen on some systems', i,
)
elif get_errno(e) == errno.EADDRNOTAVAIL:
log.info(
'Address not available when adding %s to multicast '
'group, it is expected to happen on some systems', i,
)
continue
else:
raise
respond_socket = new_socket()
respond_socket.setsockopt(
socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(i))
self._respond_sockets.append(respond_socket)
self.listeners = []
self.browsers = {}
self.services = {}
self.servicetypes = {}
self.cache = DNSCache()
self.condition = threading.Condition()
self.engine = Engine(self)
self.listener = Listener(self)
self.engine.add_reader(self.listener, self._listen_socket)
self.reaper = Reaper(self)
self.debug = None
def run(self):
"""Run the server."""
# Listen for UDP port 1900 packets sent to SSDP multicast address
ssdp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ssdp_socket.setblocking(False)
# Required for receiving multicast
ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssdp_socket.setsockopt(
socket.SOL_IP,
socket.IP_MULTICAST_IF,
socket.inet_aton(self.host_ip_addr))
ssdp_socket.setsockopt(
socket.SOL_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton("239.255.255.250") +
socket.inet_aton(self.host_ip_addr))
ssdp_socket.bind(("239.255.255.250", 1900))
while True:
if self._interrupted:
clean_socket_close(ssdp_socket)
return
try:
read, _, _ = select.select(
[self._interrupted_read_pipe, ssdp_socket], [],
[ssdp_socket])
if self._interrupted_read_pipe in read:
# Implies self._interrupted is True
clean_socket_close(ssdp_socket)
return
elif ssdp_socket in read:
data, addr = ssdp_socket.recvfrom(1024)
else:
continue
except socket.error as ex:
if self._interrupted:
clean_socket_close(ssdp_socket)
return
_LOGGER.error("UPNP Responder socket exception occured: %s",
ex.__str__)
if "M-SEARCH" in data.decode('utf-8'):
# SSDP M-SEARCH method received, respond to it with our info
resp_socket = socket.socket(
socket.AF_INET, socket.SOCK_DGRAM)
resp_socket.sendto(self.upnp_response, addr)
resp_socket.close()