python类SO_BROADCAST的实例源码

test_power.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_send_magic_packets(self, mock_socket):
        """Verify the socket calls made by ``wol_power._send_magic_packets``.

        A test port with a fixed MAC is created for ``self.node``; the helper
        is then run under a shared task and the fake socket is expected to be
        created once, switched to broadcast, used for two ``sendto`` calls to
        the broadcast address and finally closed.

        ``mock_socket`` is presumably the patched socket constructor supplied
        by a decorator that is not visible in this snippet.
        """
        broadcast_host = '255.255.255.255'
        broadcast_port = 9

        fake_socket = mock.Mock(spec=socket, spec_set=True)
        mock_socket.return_value = fake_socket()
        obj_utils.create_test_port(self.context,
                                   uuid=uuidutils.generate_uuid(),
                                   address='aa:bb:cc:dd:ee:ff',
                                   node_id=self.node.id)
        with task_manager.acquire(
                self.context, self.node.uuid, shared=True) as task:
            wol_power._send_magic_packets(task, broadcast_host, broadcast_port)

            # The payload itself is not checked, only the destination.
            expected_calls = [mock.call()]
            expected_calls.append(
                mock.call().setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1))
            for _ in range(2):
                expected_calls.append(
                    mock.call().sendto(mock.ANY,
                                       (broadcast_host, broadcast_port)))
            expected_calls.append(mock.call().close())

            fake_socket.assert_has_calls(expected_calls)
            self.assertEqual(1, mock_socket.call_count)
network.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def __init__(self, port=17935, clients=None, broadcast=True):
        """Prepare the announcement message and sockets, then start the thread.

        :param port: port number embedded in the announcement message.
        :param clients: list of clients stored on the instance; a new empty
            list is created when omitted.
        :param broadcast: when true, a second UDP socket is bound to an
            ephemeral port and enabled for broadcasting; otherwise
            ``self.bcast_sckt`` is ``None``.
        """
        util.Thread.__init__(self)
        self.port = port
        # A literal ``[]`` default would be one list shared by every instance
        # created without an explicit ``clients`` argument.
        self.clients = [] if clients is None else clients
        # Message layout: NUL-joined fields followed by their SHA-1 digest.
        # NOTE(review): hashing a native str and appending the raw digest
        # only works with Python 2 byte strings - confirm target interpreter.
        msg = '\x00'.join(["PyritServerAnnouncement",
                        '',
                        str(port)])
        md = hashlib.sha1()
        md.update(msg)
        self.msg = msg + md.digest()
        self.ucast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if broadcast:
            self.bcast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.bcast_sckt.bind(('', 0))
            self.bcast_sckt.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1)
        else:
            self.bcast_sckt = None
        self.setDaemon(True)
        self.start()
vloed.py 文件源码 项目:pixelvloed 作者: JanKlopper 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, queue, options):
    """Initialise the pixelVloed server from parsed options.

    Each option falls back to a default when it is falsy. ``canvas()`` and
    ``set_title()`` are invoked after the display-related attributes are
    set, and a UDP socket with SO_BROADCAST and SO_REUSEADDR enabled is
    stored on ``self.broadcastsocket``.
    """
    self.debug = options.debug or False
    self.pixeloffset = 2
    self.fps = 30
    self.screen = None
    self.udp_ip = options.ip or UDP_IP
    self.udp_port = options.port or UDP_PORT
    self.factor = options.factor or 1
    self.canvas()
    self.set_title()
    self.queue = queue
    self.limit = options.maxpixels or MAX_PIXELS
    self.pixels = None

    announcer = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.broadcastsocket = announcer
    for flag in (socket.SO_BROADCAST, socket.SO_REUSEADDR):
        announcer.setsockopt(socket.SOL_SOCKET, flag, 1)
sdlvloed.py 文件源码 项目:pixelvloed 作者: JanKlopper 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, queue, options):
    """Initialise the pixelVloed server from parsed options.

    Each option falls back to a default when it is falsy. ``canvas()`` is
    invoked after the display-related attributes are set, and a UDP socket
    with SO_BROADCAST and SO_REUSEADDR enabled is stored on
    ``self.broadcastsocket``.
    """
    self.debug = options.debug or False
    self.pixeloffset = 2
    self.fps = 30
    self.screen = None
    self.udp_ip = options.ip or UDP_IP
    self.udp_port = options.port or UDP_PORT
    self.factor = options.factor or 1
    self.canvas()

    self.queue = queue
    self.limit = options.maxpixels or MAX_PIXELS
    self.pixels = None

    announcer = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.broadcastsocket = announcer
    for flag in (socket.SO_BROADCAST, socket.SO_REUSEADDR):
        announcer.setsockopt(socket.SOL_SOCKET, flag, 1)
nmb.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _setup_connection(self, dstaddr, timeout=None):
        """Create a broadcast-enabled UDP socket bound to a random local port.

        The socket family/type/protocol come from the first ``getaddrinfo``
        result for ``dstaddr``. Up to ten random ports in 10000-60000 are
        tried; the first successful bind wins.

        :param dstaddr: destination address used to resolve socket parameters.
        :param timeout: accepted but not used by this method.
        :raises NetBIOSError: if none of the ten bind attempts succeeds.
        """
        port = randint(10000, 60000)
        af, socktype, proto, _canonname, _sa = socket.getaddrinfo(dstaddr, port, socket.AF_INET, socket.SOCK_DGRAM)[0]
        s = socket.socket(af, socktype, proto)
        # Start pessimistic: the flag used to be initialised to 1, which made
        # the failure check below unreachable.
        has_bind = 0
        for _i in range(0, 10):
            # We try to bind to a port for 10 tries
            try:
                s.bind((INADDR_ANY, randint(10000, 60000)))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                has_bind = 1
                # Stop here: binding an already-bound socket again only fails.
                break
            except socket.error:
                pass
        if not has_bind:
            s.close()
            raise NetBIOSError('Cannot bind to a good UDP port', ERRCLASS_OS, errno.EAGAIN)
        self.__sock = s
utils.py 文件源码 项目:bpy_lambda 作者: bcongdon 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def clientScan(report = None):
    """Wait up to 30 seconds for a datagram on UDP port 8000.

    The datagram payload is decoded as UTF-8 and parsed as an integer port;
    the sender's IP is taken as the address.

    :param report: passed through to ``reporting`` together with a message.
    :returns: ``(address, port)`` taken from the received datagram, or
        ``("", 8000)`` if nothing arrives before the timeout (assuming
        ``reporting`` returns rather than raises - confirm).
    """
    try:
        # The context manager releases port 8000 on every exit path; the
        # socket was previously left open after the function returned.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            s.settimeout(30)

            s.bind(('', 8000))

            buf, address = s.recvfrom(64)

        address = address[0]
        port = int(str(buf, encoding='utf8'))

        reporting(report, "Master server found")

        return (address, port)
    except socket.timeout:
        reporting(report, "No master server on network", IOError)

        return ("", 8000) # return default values
network_udp.py 文件源码 项目:BiblioPixel2 作者: ManiacalLabs 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _connect(self):
        """Create the UDP socket, store it on ``self._sock`` and return it.

        SO_BROADCAST is enabled when ``self._broadcast`` is truthy. A
        ``socket.gaierror`` is logged and re-raised as ``IOError``.
        """
        try:
            udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._sock = udp

            # Only the broadcast flag is set; the original keeps the bind to
            # (self._broadcast_interface, self._port) commented out.
            if self._broadcast:
                udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        except socket.gaierror:
            template = "Unable to connect to or resolve host: {}"
            error = template.format(self._host)
            log.error(error)
            raise IOError(error)
        else:
            return self._sock

    # Push new data to strand
__init__.py 文件源码 项目:SonyAPI 作者: kdschlosser 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _wake_on_lan(self):
        """Broadcast a Wake-on-LAN packet for ``self.wol_mac`` to UDP port 9.

        ``self.wol_mac`` is split on ``:`` and each part parsed as a hex
        byte. The packet is six 0xFF bytes followed by the packed hardware
        address repeated sixteen times.
        """
        _LOGGER.debug('WOL ' + self.wol_mac)

        octets = tuple(int(part, 16) for part in self.wol_mac.split(':'))
        _LOGGER.debug(octets)

        packed_mac = struct.pack('BBBBBB', *octets)
        magic_packet = b'\xff' * 6 + packed_mac * 16

        wol_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        wol_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        wol_socket.sendto(magic_packet, ('<broadcast>', 9))
        wol_socket.close()
        _LOGGER.debug('WOL Packet Sent')
worker.py 文件源码 项目:pyree-old 作者: DrLuke 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self):
        """Open the listening TCP socket, the discovery UDP socket and one
        ``glfwWorker`` per monitor reported by glfw."""
        self.tcpPort = 31337
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcpsocket = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(("0.0.0.0", self.tcpPort))
        listener.listen(10)

        discovery = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.discoverysocket = discovery
        for flag in (socket.SO_REUSEADDR, socket.SO_BROADCAST):
            discovery.setsockopt(socket.SOL_SOCKET, flag, 1)

        self.glfwWorkers = {}

        self.controllerConn = None
        self.controllerAddr = None
        self.outbuf = ""    # IO buffer for controller
        self.inbuf = ""

        self.monitors = glfw.get_monitors()

        # Workers are keyed by the decoded monitor name.
        for mon in glfw.get_monitors():
            worker = glfwWorker(mon)
            monitor_name = bytes.decode(glfw.get_monitor_name(mon))
            self.glfwWorkers[monitor_name] = worker
__init__.py 文件源码 项目:python-broadlink 作者: mjg59 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def __init__(self, host, mac, timeout=10):
    """Store connection parameters, open the UDP socket and pick the AES backend.

    :param host: stored as ``self.host``.
    :param mac: stored as ``self.mac``.
    :param timeout: stored as ``self.timeout``.

    The socket is bound to an ephemeral port with SO_REUSEADDR and
    SO_BROADCAST enabled. The ``*_pyaes`` methods are used when ``pyaes`` has
    already been imported, otherwise the ``*_pycrypto`` ones.
    """
    self.host = host
    self.mac = mac
    self.timeout = timeout
    self.count = random.randrange(0xffff)
    self.key = bytearray([
        0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23,
        0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02,
    ])
    self.iv = bytearray([
        0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28,
        0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58,
    ])
    self.id = bytearray(4)

    conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.cs = conn
    for flag in (socket.SO_REUSEADDR, socket.SO_BROADCAST):
        conn.setsockopt(socket.SOL_SOCKET, flag, 1)
    conn.bind(('', 0))

    self.type = "Unknown"
    self.lock = threading.Lock()

    backend = 'pyaes' if 'pyaes' in sys.modules else 'pycrypto'
    self.encrypt = getattr(self, 'encrypt_' + backend)
    self.decrypt = getattr(self, 'decrypt_' + backend)
test_discovery.py 文件源码 项目:networkzero 作者: tjguk 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_beacon_already_running():
    """Starting a beacon on a port that is already bound should select the
    remote beacon.

    NB this one has to run without the beacon fixture.
    """
    discovery = nw0.discovery

    # Occupy a random dynamic port first, then ask for a beacon on that
    # same port.
    port = random.choice(nw0.config.DYNAMIC_PORTS)
    blocker = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    blocker.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    blocker.bind(("", port))
    try:
        assert discovery._beacon is None
        discovery._start_beacon(port=port)
        assert discovery._beacon is discovery._remote_beacon
    finally:
        blocker.close()
        # Reset so that later beacon use does not assume one is already
        # running.
        discovery._stop_beacon()
_utils.py 文件源码 项目:caproto 作者: NSLS-II 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def bcast_socket(socket_module=socket):
    """
    Create a UDP socket with address reuse and broadcasting enabled.

    Parameters
    ----------
    socket_module: module, optional
        Default is the built-in :mod:`socket` module, but anything with the
        same interface may be used, such as :mod:`curio.socket`.

    Returns
    -------
    The socket object produced by ``socket_module.socket``.
    """
    mod = socket_module
    sock = mod.socket(mod.AF_INET, mod.SOCK_DGRAM)
    sock.setsockopt(mod.SOL_SOCKET, mod.SO_REUSEADDR, 1)
    sock.setsockopt(mod.SOL_SOCKET, mod.SO_BROADCAST, 1)

    # SO_REUSEPORT is not defined by every socket module (the original notes
    # it as BSD/Darwin only), so set it only when the constant exists.
    if hasattr(mod, 'SO_REUSEPORT'):
        sock.setsockopt(mod.SOL_SOCKET, mod.SO_REUSEPORT, 1)
    return sock
thread_modules.py 文件源码 项目:kawaii-player 作者: kanishka-linux 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        """Broadcast a discovery message once per second while
        ``ui.broadcast_server`` is truthy.

        The destination is built by replacing the last octet of
        ``ui.local_ip_stream`` with 255, on UDP port 12345. A notification is
        sent before the loop starts and after it ends.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            port = str(ui.local_port_stream)
            print(ui.local_ip_stream, '-----ip--', ui.local_port_stream)
            msg = 'this is kawaii-player At: port={} https={} msg={}'.format(
                port, ui.https_media_server, ui.broadcast_message)
            msg = bytes(msg , 'utf-8')
            if ui.https_media_server:
                https_val = 'https'
            else:
                https_val = 'http'
            # This is a broadcast address, not a subnet mask.
            broadcast_addr = ui.local_ip_stream.rsplit('.', 1)[0] + '.255'
            notify_msg = '{0}://{1}:{2} started broadcasting. Now Clients can Discover it'.format(
                https_val, ui.local_ip_stream, ui.local_port_stream)
            send_notification(notify_msg)
            print(broadcast_addr)
            while ui.broadcast_server:
                s.sendto(msg, (broadcast_addr,12345))
                time.sleep(1)
        finally:
            # The socket used to be left open after the loop ended or failed.
            s.close()
        send_notification('Broadcasting Stopped')
networkMngr.py 文件源码 项目:WMIControl 作者: crutchcorn 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def sendWoL(mac, broadcast=None):
    """Given string mac and broadcast: Turn on computer using WoL
    This was taken from http://code.activestate.com/recipes/358449-wake-on-lan/. Thanks, Fadly!

    :param mac: MAC address in any format accepted by ``EUI``.
    :param broadcast: destination address; when omitted, ``findBroadcast()``
        is called at send time.
    :raises ValueError: if ``mac`` cannot be parsed.
    """
    # Cleans and removes delimiters from MAC
    try:
        mac = format_mac(EUI(mac), mac_bare)
    except AddrFormatError:
        raise ValueError('Incorrect MAC address format')

    # Resolve the default lazily. As a default-argument expression it ran
    # once at import time and its result was reused for every later call.
    if broadcast is None:
        broadcast = findBroadcast()

    # Pad the synchronization stream.
    data = ''.join(['FFFFFFFFFFFF', mac * 20])

    # Split up the hex values and pack, joining once instead of rebuilding
    # the byte string on every iteration.
    send_data = b''.join(
        pack('B', int(data[i: i + 2], 16)) for i in range(0, len(data), 2))

    # Broadcast it to the LAN, closing the socket whether or not it succeeds.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(send_data, (broadcast, 7))
    finally:
        sock.close()
Network.py 文件源码 项目:CAAPR 作者: Stargrazer82301 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def __init__(self, host, port, target_port):

      """
      Store the endpoint settings and open the bound broadcast socket.
      :param host: local address the UDP socket is bound to
      :param port: local port the UDP socket is bound to
      :param target_port: stored as ``self.targetPort``
      """

      super(UDPThreadBroadcastClient, self).__init__()
      self.host, self.port, self.targetPort = host, port, target_port
      self.data = self.sentBytes = None
      self.sentBytesLock = threading.Lock()

      # UDP socket with broadcasting enabled, bound to (host, port).
      udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      self.sock = udp
      udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
      udp.bind((host, port))

    # -----------------------------------------------------------------
Network.py 文件源码 项目:CAAPR 作者: Stargrazer82301 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, host, port, target_port):

      """
      Store the endpoint settings and open the bound broadcast socket.
      :param host: local address the UDP socket is bound to
      :param port: local port the UDP socket is bound to
      :param target_port: stored as ``self.targetPort``
      """

      super(UDPThreadBroadcastClient, self).__init__()
      self.host, self.port, self.targetPort = host, port, target_port
      self.data = self.sentBytes = None
      self.sentBytesLock = threading.Lock()

      # UDP socket with broadcasting enabled, bound to (host, port).
      udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      self.sock = udp
      udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
      udp.bind((host, port))

    # -----------------------------------------------------------------
nmb.py 文件源码 项目:CVE-2017-7494 作者: joxeankoret 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _setup_connection(self, dstaddr, timeout=None):
        """Create a broadcast-enabled UDP socket bound to a random local port.

        The socket family/type/protocol come from the first ``getaddrinfo``
        result for ``dstaddr``. Up to ten random ports in 10000-60000 are
        tried; the first successful bind wins.

        :param dstaddr: destination address used to resolve socket parameters.
        :param timeout: accepted but not used by this method.
        :raises NetBIOSError: if none of the ten bind attempts succeeds.
        """
        port = randint(10000, 60000)
        af, socktype, proto, _canonname, _sa = socket.getaddrinfo(dstaddr, port, socket.AF_INET, socket.SOCK_DGRAM)[0]
        s = socket.socket(af, socktype, proto)
        # Start pessimistic: the flag used to be initialised to 1, which made
        # the failure check below unreachable.
        has_bind = 0
        for _i in range(0, 10):
            # We try to bind to a port for 10 tries
            try:
                s.bind((INADDR_ANY, randint(10000, 60000)))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                has_bind = 1
                # Stop here: binding an already-bound socket again only fails.
                break
            except socket.error:
                pass
        if not has_bind:
            s.close()
            raise NetBIOSError('Cannot bind to a good UDP port', ERRCLASS_OS, errno.EAGAIN)
        self.__sock = s
network.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def __init__(self, port=17935, clients=None, broadcast=True):
        """Prepare the announcement message and sockets, then start the thread.

        :param port: port number embedded in the announcement message.
        :param clients: list of clients stored on the instance; a new empty
            list is created when omitted.
        :param broadcast: when true, a second UDP socket is bound to an
            ephemeral port and enabled for broadcasting; otherwise
            ``self.bcast_sckt`` is ``None``.
        """
        util.Thread.__init__(self)
        self.port = port
        # A literal ``[]`` default would be one list shared by every instance
        # created without an explicit ``clients`` argument.
        self.clients = [] if clients is None else clients
        # Message layout: NUL-joined fields followed by their SHA-1 digest.
        # NOTE(review): hashing a native str and appending the raw digest
        # only works with Python 2 byte strings - confirm target interpreter.
        msg = '\x00'.join(["PyritServerAnnouncement",
                        '',
                        str(port)])
        md = hashlib.sha1()
        md.update(msg)
        self.msg = msg + md.digest()
        self.ucast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if broadcast:
            self.bcast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.bcast_sckt.bind(('', 0))
            self.bcast_sckt.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1)
        else:
            self.bcast_sckt = None
        self.setDaemon(True)
        self.start()
ecolab.py 文件源码 项目:EcoLab 作者: basfom 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def wakeOnLan(macaddress):
    """Broadcast a Wake-on-LAN packet for ``macaddress`` to UDP port 7.

    :param macaddress: twelve hex digits, either bare or written as six
        pairs separated by a single repeated separator character.
    :raises ValueError: if the address does not reduce to twelve hex digits.
    """
    if len(macaddress) == 12 + 5:
        # The character after the first pair is taken as the separator.
        macaddress = macaddress.replace(macaddress[2], '')
    if len(macaddress) != 12:
        raise ValueError('Formato de MAC incorrecto!') #(EN) Incorrect MAC format!

    data = ''.join(['FFFFFFFFFFFF', macaddress * 20])

    # Build the payload as bytes: joining packed values onto a text string
    # raises TypeError on Python 3. int() rejects non-hex digits with
    # ValueError before any socket is opened.
    send_data = b''.join(
        struct.pack('B', int(data[i: i + 2], 16))
        for i in range(0, len(data), 2))

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(send_data, ('<broadcast>', 7))
    finally:
        sock.close()

## SEND CMD
wolpython.py 文件源码 项目:ngas 作者: ICRAR 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def WakeOnLan(ethernet_address):
  """Broadcast a Wake-on-LAN "Magic Packet" for ``ethernet_address``.

  :param ethernet_address: six colon-separated hexadecimal bytes.
  :raises IndexError: if fewer than six parts are present.
  :raises ValueError: if a part is not valid hexadecimal.
  """

  # Construct a six-byte hardware address

  addr_byte = ethernet_address.split(':')
  hw_addr = struct.pack('BBBBBB', *[int(addr_byte[i], 16) for i in range(6)])

  # Build the Wake-On-LAN "Magic Packet"...
  # The sync prefix must be a bytes literal: struct.pack returns bytes, and
  # str + bytes raises TypeError on Python 3.

  msg = b'\xff' * 6 + hw_addr * 16

  # ...and send it to the broadcast address using UDP

  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  try:
    s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    s.sendto(msg, ('<broadcast>', 9))
  finally:
    s.close()
main.py 文件源码 项目:python_light_detector 作者: bkosciow 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_current_state():
    """Broadcast a state request for ``NODE`` and return the reported state.

    A JSON "state" event is sent to 192.168.1.255 on ``PORT`` and a single
    reply is read.

    :returns: the reply's ``response`` value when its ``protocol`` is
        ``"iot:1"``; otherwise ``"unknown"`` (also for malformed replies).

    NOTE: no receive timeout is set, so this blocks until a datagram arrives.
    """
    address = ('192.168.1.255', PORT)
    packet = {
        "protocol": "iot:1",
        "node": "computer",
        "event": "state",
        "targets": [
            NODE
        ]
    }

    # The context manager closes the socket, which used to be left open.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        msg = json.dumps(packet)
        s.sendto(msg.encode(), address)
        data, _sender = s.recvfrom(1024)

    state = "unknown"
    try:
        msg = json.loads(data.decode())
        if msg['protocol'] == "iot:1":
            state = msg['response']
    except (ValueError, KeyError, TypeError):
        # Undecodable, non-JSON, non-object or incomplete replies all count
        # as "unknown" rather than raising out of the function.
        pass

    return state
plugin.py 文件源码 项目:domoticz 作者: ericstaal 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def sendWOL(self):
    """Broadcast a Wake-on-LAN packet for ``self.mac`` to ``self.wolport``.

    Does nothing when ``self.mac`` is None. ``self.mac`` is assumed to be a
    bare hex string without separators - confirm against the caller.
    """
    # only if WOL exists
    if self.mac is None:
      return

    self.LogMessage("Send WOL to MAC: "+ self.mac, 3)
    data = b'FFFFFFFFFFFF' + (self.mac * 20).encode()

    # Split up the hex values and pack.
    send_data = b''.join(
      struct.pack('B', int(data[i: i + 2], 16))
      for i in range(0, len(data), 2))

    # Broadcast it to the LAN; the socket is closed afterwards instead of
    # being left for the garbage collector.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
      sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
      sock.sendto(send_data, ('255.255.255.255', self.wolport))
    finally:
      sock.close()

####################### Generic helper member functions for plugin #######################
utils.py 文件源码 项目:blender-addons 作者: scorpion81 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def clientScan(report = None):
    """Wait up to 30 seconds for a datagram on UDP port 8000.

    The datagram payload is decoded as UTF-8 and parsed as an integer port;
    the sender's IP is taken as the address.

    :param report: passed through to ``reporting`` together with a message.
    :returns: ``(address, port)`` taken from the received datagram, or
        ``("", 8000)`` if nothing arrives before the timeout (assuming
        ``reporting`` returns rather than raises - confirm).
    """
    try:
        # The context manager releases port 8000 on every exit path; the
        # socket was previously left open after the function returned.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            s.settimeout(30)

            s.bind(('', 8000))

            buf, address = s.recvfrom(64)

        address = address[0]
        port = int(str(buf, encoding='utf8'))

        reporting(report, "Master server found")

        return (address, port)
    except socket.timeout:
        reporting(report, "No master server on network", IOError)

        return ("", 8000) # return default values
nmb.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _setup_connection(self, dstaddr):
        """Create a broadcast-enabled UDP socket bound to a random local port.

        The socket family/type/protocol come from the first ``getaddrinfo``
        result for ``dstaddr``. Up to ten random ports in 10000-60000 are
        tried; the first successful bind wins.

        :param dstaddr: destination address used to resolve socket parameters.
        :raises NetBIOSError: if none of the ten bind attempts succeeds.
        """
        port = randint(10000, 60000)
        af, socktype, proto, _canonname, _sa = socket.getaddrinfo(dstaddr, port, socket.AF_INET, socket.SOCK_DGRAM)[0]
        s = socket.socket(af, socktype, proto)
        # Start pessimistic: the flag used to be initialised to 1, which made
        # the failure check below unreachable.
        has_bind = 0
        for _i in range(0, 10):
            # We try to bind to a port for 10 tries
            try:
                s.bind(( INADDR_ANY, randint(10000, 60000) ))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                has_bind = 1
                # Stop here: binding an already-bound socket again only fails.
                break
            except socket.error:
                pass
        if not has_bind:
            s.close()
            raise NetBIOSError('Cannot bind to a good UDP port', ERRCLASS_OS, errno.EAGAIN)
        self.__sock = s

    # Set the default NetBIOS domain nameserver.
WOL.py 文件源码 项目:Hardware-Bruteforce-Framework-2 作者: cervoise 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def WOL(macaddress):
    """ Switches on remote computers using WOL.

    :param macaddress: twelve hex digits, either bare or written as six
        pairs separated by a single repeated separator character.
    :raises ValueError: if the address does not reduce to twelve hex digits.
    """

    # Check macaddress format and try to compensate.
    if len(macaddress) == 12 + 5:
        # The character after the first pair is taken as the separator.
        macaddress = macaddress.replace(macaddress[2], '')
    if len(macaddress) != 12:
        raise ValueError('Incorrect MAC address format')

    # Pad the synchronization stream.
    data = ''.join(['FFFFFFFFFFFF', macaddress * 20])

    # Split up the hex values and pack. The payload is built as bytes:
    # joining packed values onto a text string raises TypeError on Python 3.
    send_data = b''.join(
        struct.pack('B', int(data[i: i + 2], 16))
        for i in range(0, len(data), 2))

    # Broadcast it to the LAN, closing the socket whether or not it succeeds.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(send_data, ('<broadcast>', 7))
    finally:
        sock.close()
client.py 文件源码 项目:decentralized-chat 作者: Phil9l 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, port=8888, render_message=None, get_nickname=None):
        """Store the callbacks and open the read and write UDP sockets.

        :param port: local port the read socket is bound to on all interfaces.
        :param render_message: stored as ``self.render_message``.
        :param get_nickname: callable invoked once to obtain
            ``self.nickname``.
        :raises ConnectionError: if creating or configuring a socket fails.
        """
        self.render_message = render_message
        self.get_nickname = get_nickname
        self.nickname = self.get_nickname()
        self.port = port
        self.sock_to_read = None
        self.sock_to_write = None
        # connecting to server
        try:
            self.sock_to_read = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.sock_to_read.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock_to_read.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            self.sock_to_read.bind(('0.0.0.0', port))

            self.sock_to_write = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.sock_to_write.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        except Exception:
            # Release whatever was opened before reporting the failure, so a
            # failed construction does not leave sockets open.
            for opened in (self.sock_to_read, self.sock_to_write):
                if opened is not None:
                    opened.close()
            raise ConnectionError('Unable to connect')
bravialib.py 文件源码 项目:bravialib 作者: 8none1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def wakeonlan(self, mac=None):
        """Broadcast a Wake-on-LAN packet to UDP port 9.

        :param mac: colon-separated MAC address; falls back to
            ``self.mac_addr`` when omitted.
        :returns: True once the packet has been sent, False when no MAC
            address is available.
        """
        # Thanks: Taken from https://github.com/aparraga/braviarc/blob/master/braviarc/braviarc.py
        # Not using another library for this as it's pretty small...
        if mac is None:
            mac = self.mac_addr
        if mac is None:
            # Nothing to wake; this used to fail with a TypeError on the
            # string concatenation below.
            return False
        # Parenthesised form is valid on both Python 2 and Python 3.
        print("Waking MAC: " + mac)
        addr_byte = mac.split(':')
        hw_addr = struct.pack('BBBBBB', *[int(addr_byte[i], 16) for i in range(6)])
        msg = b'\xff' * 6 + hw_addr * 16
        socket_instance = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            socket_instance.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            socket_instance.sendto(msg, ('<broadcast>', 9))
        finally:
            socket_instance.close()
        return True
ap-ro.py 文件源码 项目:networking 作者: RussianOtter 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def discover(match="", timeout=2):
    """Send the ``ssdpre`` request and collect replies containing ``match``.

    :param match: substring a reply must contain to be printed and kept.
    :param timeout: socket timeout in seconds; receiving stops at the first
        timeout or other socket error.
    :returns: list of matching raw replies.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    responses = []
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.sendto(ssdpre, (ssdpsrc["ip_address"], ssdpsrc["port"]))
        s.settimeout(timeout)
        print("")
        try:
            while True:
                response = s.recv(1000)
                if match in response:
                    print(response)
                    responses.append(response)
        except socket.error:
            # socket.timeout is a subclass of socket.error, so this still
            # ends the loop on timeout, but Ctrl-C is no longer swallowed as
            # it was by the bare except.
            pass
    finally:
        # The socket used to be left open after the function returned.
        s.close()
    return responses
ap-ro.py 文件源码 项目:networking 作者: RussianOtter 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def reboot(timeout=2):
    # Sends the module-level `exptpack1` payload over UDP: first to the
    # address in `ssdpsrc`, then to a target host/port read from the console.
    # Python 2 code (raw_input, print statement).
    #
    # timeout: socket timeout in seconds.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    s.sendto(exptpack1, (ssdpsrc["ip_address"], ssdpsrc["port"]))
    s.settimeout(timeout)
    # NOTE(review): duplicate of the previous line.
    s.settimeout(timeout)
    trg = raw_input("\nTarget: ")
    # NOTE(review): on Python 2, input() evaluates what the user types.
    tpg = int(input("Port: "))
    # Cosmetic progress indicator only.
    for i in range(4):
        sys.stdout.write("\rSending Reboot Payload" + "." * i)
        time.sleep(0.05)
    print ""
    s.sendto(exptpack1, (trg, tpg))
    try:
        # NOTE(review): the host argument is str(tpg), i.e. the port number
        # as text, not `trg` - looks unintended; confirm.
        s.connect((str(tpg), int(tpg)))
        time.sleep(0.1)
        s.send(u"`REBOOT`")
        s.close()
        time.sleep(1)
        # NOTE(review): this connect runs on the socket closed just above, so
        # it presumably always raises and lands in the except branch.
        s.connect((str(tpg), int(tpg)))
    except:
        # NOTE(review): the bare except reports success for any error at all.
        print "UPnP Device Rebooted"
    s.close()
detect.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def send_dhcp_request_packet(
        request: DHCPDiscoverPacket, ifname: str) -> None:
    """Broadcast `request` as a DHCP discover packet from interface `ifname`.

    The interface's MAC address is written into the request, the socket is
    bound to the interface's IP on the BOOTP client port, and the packet is
    sent to the broadcast address on the BOOTP server port.

    :param request: packet object; its MAC is set before sending.
    :param ifname: name of the interface to look up the MAC and IP for.
    """
    with udp_socket() as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        request.set_mac(get_interface_mac(sock, ifname))
        source = (get_interface_ip(sock, ifname), BOOTP_CLIENT_PORT)
        sock.bind(source)
        destination = ('<broadcast>', BOOTP_SERVER_PORT)
        sock.sendto(request.packet, destination)


# Packets will be sent at the following intervals (in seconds).
# The length of `DHCP_REQUEST_TIMING` indicates the number of packets
# that will be sent. The values should get progressively larger, to mimic
# the exponential back-off retry behavior of a real DHCP client.


问题


面经


文章

微信
公众号

扫码关注公众号