def _save_pkcs1_der(self):
    """Encode this private key as a PKCS#1 DER blob.

    @returns: the DER-encoded private key.
    """
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # (ASN.1 field name, attribute of this key holding its value), in
    # declaration order; the leading 'version' field is handled separately.
    key_fields = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    components = [namedtype.NamedType('version', univ.Integer())]
    components.extend(
        namedtype.NamedType(field, univ.Integer()) for field, _ in key_fields
    )

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*components)

    # Create the ASN object; the version field is always written as 0.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for field, attr in key_fields:
        asn_key.setComponentByName(field, getattr(self, attr))
    return encoder.encode(asn_key)
python类Integer()的实例源码
def _save_pkcs1_der(self):
    """Serialize the private key into PKCS#1 DER form.

    @returns: the DER-encoded private key.
    """
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # Sequence members after 'version': ASN.1 name paired with the name of
    # the attribute on this key that supplies the value.
    members = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    named_types = [namedtype.NamedType('version', univ.Integer())]
    for asn_name, _ in members:
        named_types.append(namedtype.NamedType(asn_name, univ.Integer()))

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*named_types)

    # Populate the ASN object: version is fixed at 0, then each key part.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for asn_name, attr_name in members:
        asn_key.setComponentByName(asn_name, getattr(self, attr_name))
    return encoder.encode(asn_key)
def _save_pkcs1_der(self):
    '''Encode this private key as a PKCS#1 DER blob.

    @returns: the DER-encoded private key.
    '''
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # (ASN.1 field name, attribute of this key holding its value), in
    # declaration order; the leading 'version' field is handled separately.
    key_fields = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    components = [namedtype.NamedType('version', univ.Integer())]
    components.extend(
        namedtype.NamedType(field, univ.Integer()) for field, _ in key_fields
    )

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*components)

    # Create the ASN object; the version field is always written as 0.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for field, attr in key_fields:
        asn_key.setComponentByName(field, getattr(self, attr))
    return encoder.encode(asn_key)
def value(self):
    """Return this object's ``outlet_number`` wrapped in ``univ.Integer``."""
    outlet = self.outlet_number
    return univ.Integer(outlet)
def test_read_outlet_config_index(self):
    """The entry mapped at the outlet config index OID must hold Integer(1)."""
    mapped_entry = self.pdu.oid_mapping[self.outlet_config_index_oid]
    expected = univ.Integer(1)
    self.assertEqual(expected, mapped_entry.value)
def test_read_load_status_load(self):
    """The load status OID must report ``apc_rackpdu.amp_10`` as an Integer."""
    expected = univ.Integer(apc_rackpdu.amp_10)
    reported = self.pdu.oid_mapping[apc_rackpdu.rPDU_load_status_load].value
    self.assertEqual(expected, reported)
def test_read_load_status_load_state(self):
    """The load state OID must report ``apc_rackpdu.phase_load_normal``."""
    expected = univ.Integer(apc_rackpdu.phase_load_normal)
    state_oid = apc_rackpdu.rPDU_load_status_load_state
    reported = self.pdu.oid_mapping[state_oid].value
    self.assertEqual(expected, reported)
def test_set_unknown_oid(self):
    """Setting an OID that is not mapped must yield ``NoSuchInstance``."""
    expected = NoSuchInstance()
    response = self.snmp_set(enterprises + (42,), univ.Integer(7))
    self.assertEqual(expected, response)
def test_harness_set(self):
    """An SNMP set sent through the harness must land on the mapped PDU entry.

    Starts an ``SNMPPDUHarness`` on a random local port, sets OID
    ``(1, 3, 6, 98)`` to ``Integer(99)`` via ``SnmpClient`` and checks that
    the value was stored on the mock mapped at that OID.
    """
    mock_pdu = mock.Mock()
    port = randint(20000, 30000)
    harness = pysnmp_handler.SNMPPDUHarness(pdu=mock_pdu,
                                            listen_address='127.0.0.1',
                                            listen_port=port,
                                            community='bleh')
    harness.start()
    # Stop the harness even if the client call or the assertion fails;
    # otherwise a failing test would leave the started harness running.
    try:
        client = snmp_client.SnmpClient(oneliner_cmdgen=cmdgen,
                                        host='127.0.0.1',
                                        port=port,
                                        community='bleh',
                                        timeout=1,
                                        retries=1)
        mock_pdu.oid_mapping = dict()
        mock_pdu.oid_mapping[(1, 3, 6, 98)] = mock.Mock()
        client.set((1, 3, 6, 98), univ.Integer(99))
        self.assertEqual(univ.Integer(99),
                         mock_pdu.oid_mapping[(1, 3, 6, 98)].value)
    finally:
        harness.stop()
def test_harness_get_next(self):
    """A get-next through the harness must return the next mapped OID.

    Starts an ``SNMPPDUHarness`` on a random local port, maps
    ``(1, 3, 6, 1, 5)`` to ``Integer(42)`` and checks that a get-next on
    ``(1, 3, 6, 1)`` returns that OID and the value 42.
    """
    mock_pdu = mock.Mock()
    port = randint(20000, 30000)
    harness = pysnmp_handler.SNMPPDUHarness(pdu=mock_pdu,
                                            listen_address='127.0.0.1',
                                            listen_port=port,
                                            community='bleh')
    harness.start()
    # Stop the harness even if the client call or an assertion fails;
    # otherwise a failing test would leave the started harness running.
    try:
        client = snmp_client.SnmpClient(oneliner_cmdgen=cmdgen,
                                        host='127.0.0.1',
                                        port=port,
                                        community='bleh',
                                        timeout=1,
                                        retries=1)
        mock_pdu.oid_mapping = dict()
        mock_pdu.oid_mapping[(1, 3, 6, 1, 5)] = mock.Mock()
        mock_pdu.oid_mapping[(1, 3, 6, 1, 5)].value = univ.Integer(42)
        oid, val = client.get_next((1, 3, 6, 1))
        self.assertEqual((1, 3, 6, 1, 5), oid)
        self.assertEqual(42, val)
    finally:
        harness.stop()
def _save_pkcs1_der(self):
    """Serialize the private key into PKCS#1 DER form.

    @returns: the DER-encoded private key.
    """
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # Sequence members after 'version': ASN.1 name paired with the name of
    # the attribute on this key that supplies the value.
    members = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    named_types = [namedtype.NamedType('version', univ.Integer())]
    for asn_name, _ in members:
        named_types.append(namedtype.NamedType(asn_name, univ.Integer()))

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*named_types)

    # Populate the ASN object: version is fixed at 0, then each key part.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for asn_name, attr_name in members:
        asn_key.setComponentByName(asn_name, getattr(self, attr_name))
    return encoder.encode(asn_key)
def _save_pkcs1_der(self):
    """Encode this private key as a PKCS#1 DER blob.

    @returns: the DER-encoded private key.
    """
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # (ASN.1 field name, attribute of this key holding its value), in
    # declaration order; the leading 'version' field is handled separately.
    key_fields = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    components = [namedtype.NamedType('version', univ.Integer())]
    components.extend(
        namedtype.NamedType(field, univ.Integer()) for field, _ in key_fields
    )

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*components)

    # Create the ASN object; the version field is always written as 0.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for field, attr in key_fields:
        asn_key.setComponentByName(field, getattr(self, attr))
    return encoder.encode(asn_key)
def decodeMessageVersion(wholeMsg):
    """Return the version component decoded from the start of an SNMP message.

    The message is decoded in two non-recursive steps: first against a
    ``univ.Sequence`` spec, then the remainder against a ``univ.Integer``
    spec, whose result is returned.

    Raises ``ProtocolError`` if the decoded version is an end-of-octets
    marker or if decoding raises ``PyAsn1Error``.
    """
    try:
        # The decoded outer sequence itself is not needed, only what follows.
        _, remainder = decoder.decode(
            wholeMsg, asn1Spec=univ.Sequence(), recursiveFlag=0
        )
        version, remainder = decoder.decode(
            remainder, asn1Spec=univ.Integer(), recursiveFlag=0
        )
        if eoo.endOfOctets.isSameTypeWith(version):
            raise ProtocolError('EOO at SNMP version component')
    except PyAsn1Error:
        raise ProtocolError('Invalid BER at SNMP version component')
    return version
def _save_pkcs1_der(self):
    """Serialize the private key into PKCS#1 DER form.

    :returns: the DER-encoded private key.
    :rtype: bytes
    """
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # Sequence members after 'version': ASN.1 name paired with the name of
    # the attribute on this key that supplies the value.
    members = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    named_types = [namedtype.NamedType('version', univ.Integer())]
    for asn_name, _ in members:
        named_types.append(namedtype.NamedType(asn_name, univ.Integer()))

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*named_types)

    # Populate the ASN object: version is fixed at 0, then each key part.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for asn_name, attr_name in members:
        asn_key.setComponentByName(asn_name, getattr(self, attr_name))
    return encoder.encode(asn_key)
def set(self, *oidvalues):
    """Issue one SNMP set command for the given ``(oid, value)`` pairs.

    Each ``oid`` may be:

    * a tuple containing at least one string: passed on as
      ``cmdgen.MibVariable(*oid)``; ``value`` must be an int, str or bool;
    * any other tuple: passed on unchanged; unless its last element is 0,
      ``value`` must be a ``univ`` Integer, OctetString or ObjectIdentifier;
    * a string: resolved with ``nodeid``; unless it ends with ".0",
      ``value`` must be one of the same ``univ`` types.

    Pairs whose ``oid`` is neither a tuple nor a string are left out.

    Returns the resulting varbinds wrapped in ``SnmpVarBinds``. Raises
    ``SnmpError`` when the command reports an error indication or status.
    """
    assert self.alive is True
    translated = []
    for oid, value in oidvalues:
        if isinstance(oid, tuple):
            if any(isinstance(part, str) for part in oid):
                # A tuple containing strings is translated through
                # cmdgen.MibVariable; value must then be a Python type.
                assert isinstance(value, (int, str, bool))
                translated.append((cmdgen.MibVariable(*oid), value))
                continue
            # value must be a rfc1902/pyasn1 type
            if not oid[-1] == 0:
                assert isinstance(value, (univ.Integer, univ.OctetString, univ.ObjectIdentifier))
            translated.append((oid, value))
        elif isinstance(oid, str):
            # A string oid goes through nodeid lookup; value must be a
            # rfc1902/pyasn1 type if oid is not a scalar.
            if not oid.endswith(".0"):
                assert isinstance(value, (univ.Integer, univ.OctetString, univ.ObjectIdentifier))
            translated.append((nodeid(oid), value))

    generator = cmdgen.CommandGenerator()
    target = cmdgen.UdpTransportTarget(
        (self.host, self.port), timeout=self.timeout, retries=self.retries
    )
    error_indication, error_status, error_index, varbinds = generator.setCmd(self.auth, target, *translated)  # pylint: disable=W0612
    if error_indication or error_status:
        self.__set_error(error_indication, error_status, error_index, varbinds)
        raise SnmpError("SNMP set command on %s of oid values %r failed" % (self.host, translated), error_indication, error_status, error_index, varbinds)
    return SnmpVarBinds(varbinds)
def __get_json(self, keytype=str):
    """Build a dict of plain Python values from ``self.get_dict()``.

    Keys are passed through ``keytype``. ``univ.OctetString`` and
    ``univ.ObjectIdentifier`` values become ``str``; ``univ.Integer`` values
    become ``int``. Any other value type raises ``AssertionError``.
    """
    # Checked in this order; the first matching type decides the conversion.
    converters = (
        (univ.OctetString, str),
        (univ.Integer, int),
        (univ.ObjectIdentifier, str),
    )
    plain = {}
    for oid, raw in list(self.get_dict().items()):
        for asn_type, convert in converters:
            if isinstance(raw, asn_type):
                plain[keytype(oid)] = convert(raw)
                break
        else:
            raise AssertionError("Unknown type %s encountered for oid %s" % (raw.__class__.__name__, oid))
    return plain
def _save_pkcs1_der(self):
    '''Encode this private key as a PKCS#1 DER blob.

    @returns: the DER-encoded private key.
    '''
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # (ASN.1 field name, attribute of this key holding its value), in
    # declaration order; the leading 'version' field is handled separately.
    key_fields = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    components = [namedtype.NamedType('version', univ.Integer())]
    components.extend(
        namedtype.NamedType(field, univ.Integer()) for field, _ in key_fields
    )

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*components)

    # Create the ASN object; the version field is always written as 0.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for field, attr in key_fields:
        asn_key.setComponentByName(field, getattr(self, attr))
    return encoder.encode(asn_key)
def _save_pkcs1_der(self):
    """Serialize the private key into PKCS#1 DER form.

    @returns: the DER-encoded private key.
    """
    from pyasn1.type import univ, namedtype
    from pyasn1.codec.der import encoder

    # Sequence members after 'version': ASN.1 name paired with the name of
    # the attribute on this key that supplies the value.
    members = (
        ('modulus', 'n'),
        ('publicExponent', 'e'),
        ('privateExponent', 'd'),
        ('prime1', 'p'),
        ('prime2', 'q'),
        ('exponent1', 'exp1'),
        ('exponent2', 'exp2'),
        ('coefficient', 'coef'),
    )
    named_types = [namedtype.NamedType('version', univ.Integer())]
    for asn_name, _ in members:
        named_types.append(namedtype.NamedType(asn_name, univ.Integer()))

    class AsnPrivKey(univ.Sequence):
        componentType = namedtype.NamedTypes(*named_types)

    # Populate the ASN object: version is fixed at 0, then each key part.
    asn_key = AsnPrivKey()
    asn_key.setComponentByName('version', 0)
    for asn_name, attr_name in members:
        asn_key.setComponentByName(asn_name, getattr(self, attr_name))
    return encoder.encode(asn_key)
def __init__(self, config, env_switches, mib_dir):
"""Initialize SNMPCmd class.
Reads the SNMP community strings from the config, records a host/port pair
for every switch, sets up a MIB builder and view controller, and creates
instances of the basic SNMP types.
Args:
config(list[dict]): environment config; entries may carry 'get_community' and/or 'set_community'
env_switches(dict): switches dictionary in format {switch_id: switch_object}
mib_dir(str): entry appended to the MIB builder's MIB path
"""
# NOTE(review): indentation was lost in this copy of the code, so the nesting
# of the statements below cannot be read off the text - confirm against the
# original file before relying on the comments about which branch runs when.
self.switches = {}
# get community info from config file:
for conf in config:
if 'get_community' in conf:
self.get_community = conf['get_community']
self.mib_dir = mib_dir
if 'set_community' in conf:
self.set_community = conf['set_community']
# get switches ip addresses and ports
for switch_id in list(env_switches.keys()):
sw_ipaddr = env_switches[switch_id].ipaddr
if 'sshtun_port' in env_switches[switch_id].config:
sw_port = 161
if env_switches[switch_id].config['sshtun_port'] != 22:
# The address is rebuilt from the tunnel port: its decimal string is split
# on '0' and the first two pieces fill the last two octets
# (e.g. 2023 -> "10.10.2.23").
sw_ipaddr = "10.10.{0}.{1}".format(*str(env_switches[switch_id].config['sshtun_port']).split('0'))
else:
# presumably the branch for switches without 'sshtun_port': the SNMP port is
# derived from the switch port by a fixed offset (-8080 + 4700) - verify.
sw_port = int(env_switches[switch_id].port) - 8080 + 4700
self.switches.update({switch_id: {'host': sw_ipaddr, 'port': sw_port}})
# Extend the MIB builder's existing MIB path with mib_dir.
self.mib_builder = builder.MibBuilder()
mib_path = self.mib_builder.getMibPath() + (mib_dir, )
self.mib_builder.setMibPath(*mib_path)
# self.suite_logger.debug("mib_builder __modPathsSeen: %s" % (self.mib_builder._MibBuilder__modPathsSeen, ))
# self.suite_logger.debug("mib_builder __modSeen: %s" % (self.mib_builder._MibBuilder__modSeen, ))
self.mibViewController = view.MibViewController(self.mib_builder)
# loading SNMP types as instances
# NOTE(review): self.suite_logger is not assigned in the visible part of this
# method; presumably it is provided elsewhere on the class - confirm.
self.suite_logger.debug("Loading basic types from standard MIB modules")
self.OctetString, Integer = self.mib_builder.importSymbols('ASN1', 'OctetString', 'Integer')[0:2]
Counter32, Unsigned32, Counter64 = self.mib_builder.importSymbols('SNMPv2-SMI', 'Counter32', 'Unsigned32', 'Counter64')[0:3]
InetAddressType, self.InetAddress, InetAddressIPv4, InetAddressIPv6, InetAddressIPv4z, InetAddressIPv6z, InetAddressDNS = \
self.mib_builder.importSymbols('INET-ADDRESS-MIB', 'InetAddressType', 'InetAddress', 'InetAddressIPv4',
'InetAddressIPv6', 'InetAddressIPv4z', 'InetAddressIPv6z', 'InetAddressDNS')[0:7]
# Keep one instance of each basic type on the object.
self.__integer = Integer()
self.__counter32 = Counter32()
self.__unsigned32 = Unsigned32()
self.__counter64 = Counter64()
self.__octetString = self.OctetString()
# creating InetAddress types dict with keys corresponded to InetAddressType named values
self.InetAddresses = {'ipv4': InetAddressIPv4(), 'ipv6': InetAddressIPv6(), 'ipv4z': InetAddressIPv4z(),
'ipv6z': InetAddressIPv6z(), 'dns': InetAddressDNS()}