def list_identity(target_ip='255.255.255.255', target_port=44818, udp=True):
    """Send an ENIP ListIdentity request from each local network and collect replies.

    One socket is opened per entry returned by ``networking.list_networks()``.
    The request is sent to ``(target_ip, target_port)``, the function waits
    slightly longer than the advertised response delay, then drains whatever
    arrived and parses each packet into a ``ListIdentityRsp``.

    Args:
        target_ip: Destination address; defaults to the limited broadcast
            address.
        target_port: Destination port; defaults to 44818.
        udp: When true, use broadcast-enabled UDP sockets bound to each local
            network entry. When false, open a TCP connection to the target
            instead.

    Returns:
        A list of ``ListIdentityRsp`` objects, one per received packet that
        was at least 42 bytes long.

    Raises:
        OSError: If a socket cannot be created, bound, connected or written.
    """
    # NOTE: this ideally will use asyncio to manage connections.
    delay = 1000  # milliseconds: passed to the header and used for the wait below
    header = ENIPEncapsulationHeader(ENIPCommandCode.ListIdentity, 0, 0, 0, delay, 0)
    data = header.export_data()
    wait_seconds = 1.1 * delay / 1000

    sockets = []
    packets = []
    try:
        for ifc in networking.list_networks():
            sock_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM
            s = socket.socket(socket.AF_INET, sock_type)
            # Track the socket before configuring it so the ``finally`` below
            # closes it even if bind/connect/send raises.
            sockets.append(s)
            if udp:
                s.bind((ifc, 0))
                s.setblocking(False)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                s.sendto(data, (target_ip, target_port))
            else:
                # A stream socket must be connected before it can send; bound
                # the connect so an unreachable target cannot hang the scan.
                s.settimeout(wait_seconds)
                s.connect((target_ip, target_port))
                s.sendall(data)
                # Switch to non-blocking so the drain loop below terminates.
                s.setblocking(False)

        # Give devices slightly longer than the advertised delay to answer.
        time.sleep(wait_seconds)

        for s in sockets:
            try:
                while True:
                    chunk = s.recv(65535)
                    if not chunk and not udp:
                        # Empty read on TCP means the peer closed the stream;
                        # without this check the loop would never end.
                        break
                    packets.append(chunk)
            except BlockingIOError:
                # Nothing more queued on this socket.
                pass
    finally:
        for s in sockets:
            s.close()

    responses = []
    for packet in packets:
        # Skip packets below the 42-byte minimum used by the original code
        # (presumably header plus the smallest identity item; confirm against
        # the ListIdentityRsp layout).
        if len(packet) < 42:
            continue
        rsp_header = ENIPEncapsulationHeader()
        offset = rsp_header.import_data(packet)
        li = ListIdentityRsp()
        li.import_data(packet, offset)
        responses.append(li)
    return responses
评论列表
文章目录