def _create_sa(self, src_selector, dst_selector, src_port, dst_port, spi, ip_proto,
               ipsec_proto, mode, src, dst, enc_algorith, sk_e, auth_algorithm, sk_a):
    """Assemble an XFRM_MSG_NEWSA request for an IPv4 SA and hand it to ``send_recv``.

    The traffic selector takes its addresses from element 0 of ``src_selector`` and
    ``dst_selector`` and its prefix lengths from their ``prefixlen`` attribute.
    A port equal to 0 gets a port mask of 0; any other port gets the mask 0xFFFF.

    When ``ipsec_proto`` equals ``Proposal.Protocol.ESP`` the SA uses IPPROTO_ESP and
    carries an XFRMA_ALG_CRYPT attribute built from ``enc_algorith`` / ``sk_e``; for
    any other value IPPROTO_AH is used. An XFRMA_ALG_AUTH attribute built from
    ``auth_algorithm`` / ``sk_a`` is attached in both cases.
    """
    uses_esp = ipsec_proto == Proposal.Protocol.ESP

    selector = XfrmSelector(
        family=socket.AF_INET,
        daddr=XfrmAddress.from_ipaddr(dst_selector[0]),
        saddr=XfrmAddress.from_ipaddr(src_selector[0]),
        dport=dst_port,
        sport=src_port,
        dport_mask=0xFFFF if dst_port != 0 else 0,
        sport_mask=0xFFFF if src_port != 0 else 0,
        prefixlen_d=dst_selector.prefixlen,
        prefixlen_s=src_selector.prefixlen,
        proto=ip_proto,
    )

    sa_id = XfrmId(
        daddr=XfrmAddress.from_ipaddr(dst),
        proto=socket.IPPROTO_ESP if uses_esp else socket.IPPROTO_AH,
        spi=create_byte_array(spi),
    )

    sa_info = XfrmUserSaInfo(
        sel=selector,
        id=sa_id,
        family=socket.AF_INET,
        saddr=XfrmAddress.from_ipaddr(src),
        mode=mode,
        lft=XfrmLifetimeCfg.infinite(),
    )

    # Collect (attribute type, payload) pairs: cipher first (ESP only), then auth.
    algo_pairs = []
    if uses_esp:
        cipher = XfrmAlgo.build(alg_name=self._cipher_names[enc_algorith], key=sk_e)
        algo_pairs.append((XFRMA_ALG_CRYPT, cipher))
    integrity = XfrmAlgo.build(alg_name=self._auth_names[auth_algorithm], key=sk_a)
    algo_pairs.append((XFRMA_ALG_AUTH, integrity))

    self.send_recv(XFRM_MSG_NEWSA, NLM_F_REQUEST | NLM_F_ACK, sa_info, dict(algo_pairs))
# python类IPPROTO_AH的实例源码 (Python: example source code using IPPROTO_AH)
def _create_policy(self, src_selector, dst_selector, src_port, dst_port, ip_proto, direction,
                   ipsec_proto, mode, src, dst, index=0):
    """Assemble an XFRM_MSG_NEWPOLICY request for an IPv4 policy and pass it to ``send_recv``.

    The policy selector takes its addresses from element 0 of ``src_selector`` and
    ``dst_selector`` and its prefix lengths from their ``prefixlen`` attribute.
    A port equal to 0 gets a port mask of 0; any other port gets the mask 0xFFFF.
    The policy action is always XFRM_POLICY_ALLOW.

    A single XFRMA_TMPL attribute is attached whose protocol is IPPROTO_ESP when
    ``ipsec_proto`` equals ``Proposal.Protocol.ESP`` and IPPROTO_AH otherwise.
    """
    # Same value for aalgos/ealgos/calgos; presumably "accept any algorithm" - confirm
    # against the XFRM template definition.
    all_bits = 0xFFFFFFFF

    selector = XfrmSelector(
        family=socket.AF_INET,
        daddr=XfrmAddress.from_ipaddr(dst_selector[0]),
        saddr=XfrmAddress.from_ipaddr(src_selector[0]),
        dport=dst_port,
        sport=src_port,
        dport_mask=0xFFFF if dst_port != 0 else 0,
        sport_mask=0xFFFF if src_port != 0 else 0,
        prefixlen_d=dst_selector.prefixlen,
        prefixlen_s=src_selector.prefixlen,
        proto=ip_proto,
    )

    policy_info = XfrmUserPolicyInfo(
        sel=selector,
        dir=direction,
        index=index,
        action=XFRM_POLICY_ALLOW,
        lft=XfrmLifetimeCfg.infinite(),
    )

    tmpl_daddr = XfrmAddress.from_ipaddr(dst)
    if ipsec_proto == Proposal.Protocol.ESP:
        tmpl_proto = socket.IPPROTO_ESP
    else:
        tmpl_proto = socket.IPPROTO_AH

    tmpl = XfrmUserTmpl(
        id=XfrmId(daddr=tmpl_daddr, proto=tmpl_proto),
        family=socket.AF_INET,
        saddr=XfrmAddress.from_ipaddr(src),
        aalgos=all_bits,
        ealgos=all_bits,
        calgos=all_bits,
        mode=mode,
    )

    self.send_recv(XFRM_MSG_NEWPOLICY, NLM_F_REQUEST | NLM_F_ACK, policy_info,
                   {XFRMA_TMPL: tmpl})
def delete_sa(self, daddr, proto, spi):
    """Send an XFRM_MSG_DELSA request for the IPv4 SA identified by the arguments.

    ``proto`` equal to ``Proposal.Protocol.ESP`` selects IPPROTO_ESP; any other
    value selects IPPROTO_AH. A ``NetlinkError`` raised by ``send_recv`` is
    logged as an error and not re-raised.
    """
    if proto == Proposal.Protocol.ESP:
        wire_proto = socket.IPPROTO_ESP
    else:
        wire_proto = socket.IPPROTO_AH

    sa_key = XfrmUserSaId(
        daddr=XfrmAddress.from_ipaddr(daddr),
        family=socket.AF_INET,
        proto=wire_proto,
        spi=create_byte_array(spi),
    )

    try:
        self.send_recv(XFRM_MSG_DELSA, NLM_F_REQUEST | NLM_F_ACK, sa_key)
    except NetlinkError as ex:
        # Deletion failures are reported but deliberately not propagated.
        message = 'Could not delete IPsec SA with SPI: {}. {}'.format(hexstring(spi), ex)
        logging.error(message)
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse from the serialised form. bytes is str on Python 2, so
        # bytes() is valid on both Python 2 and Python 3.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    # Floor division keeps the value an integer on Python 3 as well.
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3,
    # matching the bytes() serialisation used further down.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse the encapsulated packet from its serialised form.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse from the serialised form. bytes is str on Python 2, so
        # bytes() is valid on both Python 2 and Python 3.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    # Floor division keeps the value an integer on Python 3 as well.
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3,
    # matching the bytes() serialisation used further down.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse the encapsulated packet from its serialised form.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse from the serialised form. bytes is str on Python 2, so
        # bytes() is valid on both Python 2 and Python 3.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    # Floor division keeps the value an integer on Python 3 as well.
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse from the serialised form. bytes is str on Python 2, so
        # bytes() is valid on both Python 2 and Python 3.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    # Floor division keeps the value an integer on Python 3 as well.
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse from the serialised form. bytes is str on Python 2, so
        # bytes() is valid on both Python 2 and Python 3.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    # Floor division keeps the value an integer on Python 3 as well.
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse from the serialised form. bytes is str on Python 2, so
        # bytes() is valid on both Python 2 and Python 3.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    # Floor division keeps the value an integer on Python 3 as well.
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt
def _encrypt_ah(self, pkt, seq_num=None):
    """Insert an AH header into *pkt* and return the output of ``self.auth_algo.sign``.

    pkt: packet to protect. When ``self.tunnel_header`` is set, *pkt* is first
        encapsulated in a copy of that header.
    seq_num: sequence number for the AH header. When left as None,
        ``self.seq_num`` is used and incremented afterwards.
        NOTE: any falsy value (e.g. 0) also falls back to ``self.seq_num``
        because of the ``or`` below, but does not trigger the increment.
    """
    # b"\x00" equals chr(0) on Python 2 and stays a byte string on Python 3,
    # matching the bytes() serialisation used further down.
    ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
            icv=b"\x00" * self.auth_algo.icv_size)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Clear the length/next-protocol/checksum fields of the copied header;
        # presumably they are recomputed when the packet is rebuilt below.
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # Re-parse the encapsulated packet from its serialised form.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
    ah.nh = nh

    if ip_header.version == 6 and len(ah) % 8 != 0:
        # For IPv6, the total length of the header must be a multiple of
        # 8-octet units.
        ah.padding = b"\x00" * (-len(ah) % 8)
    elif len(ah) % 4 != 0:
        # For IPv4, the total length of the header must be a multiple of
        # 4-octet units.
        ah.padding = b"\x00" * (-len(ah) % 4)

    # RFC 4302 - Section 2.2. Payload Length
    # This 8-bit field specifies the length of AH in 32-bit words (4-byte
    # units), minus "2".
    ah.payloadlen = len(ah) // 4 - 2

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(ah) + len(payload)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)

    signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return signed_pkt