def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
# Python example source code for dump_certificate()
def make_identity_pem_string(given_key,given_cert,given_ca_cert):
    '''Given objects for client key, client cert, and CA cert,
    extract their contents in PEM format and combine them
    into a single text string. Return that string or None.

    The three PEM blocks are joined with newlines in the order key,
    client certificate, CA certificate. None is returned (after logging
    an error) as soon as one of the objects cannot be dumped.'''
    common.logging_info("Generating client identity PEM.")
    # (dump function, object to dump, message logged when dumping fails)
    steps = (
        (crypto.dump_privatekey, given_key,
         "Could not get PEM contents of private key."),
        (crypto.dump_certificate, given_cert,
         "Could not get PEM contents of client certificate."),
        (crypto.dump_certificate, given_ca_cert,
         "Could not get PEM contents of CA certificate."),
    )
    pem_parts = []
    for dump, obj, failure_message in steps:
        try:
            pem = dump(crypto.FILETYPE_PEM, obj)
        except crypto.Error:
            common.logging_error(failure_message)
            return None
        # On Python 3 the dump functions return bytes; formatting bytes into
        # text would embed "b'...'" reprs, so decode to text first. On
        # Python 2 bytes is str and the value is left untouched.
        if not isinstance(pem, str):
            pem = pem.decode('ascii')
        pem_parts.append(pem)
    return '\n'.join(pem_parts)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def createInvokeProposalForBDD(context, ccSpec, chainID, signersCert, Mspid, type):
    """Returns a proposal wrapping a chaincode invocation spec for ``ccSpec``.

    :param context: not used by this function.
    :param ccSpec: chaincode spec; wrapped in a ChaincodeInvocationSpec, and
        its ``chaincode_id`` is placed in the header extension.
    :param chainID: passed as ``channel_id`` of the channel header.
    :param signersCert: certificate dumped to PEM and used as the identity bytes.
    :param Mspid: ``mspid`` of the serialized identity.
    :param type: name of a ``HeaderType`` enum value (shadows the builtin;
        the name is kept so keyword callers keep working).
    :return: a ``proposal_pb2.Proposal``.
    """
    import binascii

    invocation_spec = chaincode_pb2.ChaincodeInvocationSpec(chaincode_spec=ccSpec)
    header_extension = proposal_pb2.ChaincodeHeaderExtension(chaincode_id=ccSpec.chaincode_id)
    proposal_payload = proposal_pb2.ChaincodeProposalPayload(
        input=invocation_spec.SerializeToString())

    identity = identities_pb2.SerializedIdentity(
        mspid=Mspid,
        id_bytes=crypto.dump_certificate(crypto.FILETYPE_PEM, signersCert))
    # Serialize once; the same bytes feed the signature header and the tx id.
    creator_bytes = identity.SerializeToString()
    nonce = bootstrap_util.BootstrapHelper.getNonce()
    signature_header = bootstrap_util.make_signature_header(creator_bytes, nonce)

    # Calculate the transaction ID
    tx_id = binascii.hexlify(bootstrap_util.computeCryptoHash(nonce + creator_bytes))

    channel_header = bootstrap_util.make_chain_header(
        type=common_dot_common_pb2.HeaderType.Value(type),
        channel_id=chainID,
        txID=tx_id,
        extension=header_extension.SerializeToString())
    header = common_dot_common_pb2.Header(
        channel_header=channel_header.SerializeToString(),
        signature_header=signature_header.SerializeToString())

    # make proposal
    return proposal_pb2.Proposal(
        header=header.SerializeToString(),
        payload=proposal_payload.SerializeToString())
def create_v1_id_card(self, node_admin_tuple, conn_profile):
    """Return the bytes of an in-memory zip archive for ``node_admin_tuple``.

    The archive contains ``metadata.json`` (from ``self.create_v1_metadata``),
    ``connection.json`` (``conn_profile`` as compact JSON), and the PEM
    certificate and private key under ``credentials``.

    :param node_admin_tuple: supplies ``user`` and ``nodeName`` and is used
        to look up the certificate in ``self.directory``.
    :param conn_profile: JSON-serializable connection profile.
    :return: the complete zip archive as bytes.
    """
    from io import BytesIO
    from zipfile import ZipFile

    user = self.directory.getUser(userName=node_admin_tuple.user)
    buffer = BytesIO()
    # The context manager closes the archive even if a write raises.
    with ZipFile(buffer, mode="w") as card:
        metadata = self.create_v1_metadata(user_name=node_admin_tuple.nodeName)
        card.writestr("metadata.json", json.dumps(metadata, separators=(',', ':')))
        card.writestr("connection.json", json.dumps(conn_profile, separators=(',', ':')))
        cert = self.directory.findCertForNodeAdminTuple(node_admin_tuple)
        card.writestr(os.path.join("credentials", "certificate"),
                      crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        card.writestr(os.path.join("credentials", "privateKey"),
                      crypto.dump_privatekey(crypto.FILETYPE_PEM, user.pKey))
    return buffer.getvalue()
def dump(self, output):
    'Will dump the directory to the provided store'
    import cPickle

    def pem_of_cert(cert):
        return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)

    def pem_of_key(key):
        return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)

    data = {'users' : {}, 'organizations' : {}, 'nats' : {}}
    users, organizations, nats = data['users'], data['organizations'], data['nats']

    # user.tags is stored as-is, so it has to be picklable.
    for userName, user in self.users.iteritems():
        users[userName] = (
            user.ecdsaSigningKey.to_pem(),
            pem_of_key(user.rsaSigningKey),
            user.tags,
        )

    for orgName, org in self.organizations.iteritems():
        network_names = [network.name for network in org.networks]
        organizations[orgName] = (
            org.ecdsaSigningKey.to_pem(),
            pem_of_key(org.rsaSigningKey),
            pem_of_cert(org.getSelfSignedCert()),
            network_names,
        )

    for nat, cert in self.ordererAdminTuples.iteritems():
        nats[nat] = pem_of_cert(cert)

    cPickle.dump(data, output)
def create_envelope_for_msg(directory, nodeAdminTuple, chainId, msg, typeAsString):
    """Wrap ``msg`` in a signed ``Envelope``.

    :param directory: used to look up the organization, user and certificate
        for ``nodeAdminTuple``.
    :param nodeAdminTuple: supplies the ``organization`` and ``user`` names.
    :param chainId: passed as ``channel_id`` of the channel header.
    :param msg: message whose ``SerializeToString()`` output becomes the
        payload data.
    :param typeAsString: name of a ``HeaderType`` enum value.
    :return: a ``common_dot_common_pb2.Envelope`` whose signature is
        ``user.sign()`` over the serialized payload.
    """
    payloadChainHeader = make_chain_header(
        type=common_dot_common_pb2.HeaderType.Value(typeAsString), channel_id=chainId)

    # Now the SignatureHeader
    org = directory.getOrganization(nodeAdminTuple.organization)
    user = directory.getUser(nodeAdminTuple.user)
    cert = directory.findCertForNodeAdminTuple(nodeAdminTuple)
    serializedIdentity = identities_pb2.SerializedIdentity(
        mspid=org.name, id_bytes=crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    serializedCreatorCertChain = serializedIdentity.SerializeToString()
    payloadSignatureHeader = common_dot_common_pb2.SignatureHeader(
        creator=serializedCreatorCertChain,
        nonce=BootstrapHelper.getNonce(),
    )

    payloadHeader = common_dot_common_pb2.Header(
        channel_header=payloadChainHeader.SerializeToString(),
        signature_header=payloadSignatureHeader.SerializeToString(),
    )
    payload = common_dot_common_pb2.Payload(header=payloadHeader, data=msg.SerializeToString())
    payloadBytes = payload.SerializeToString()
    # The signature covers exactly the bytes carried in the envelope.
    envelope = common_dot_common_pb2.Envelope(payload=payloadBytes, signature=user.sign(payloadBytes))
    return envelope
def test_dump_certificate(self):
    """
    :py:obj:`dump_certificate` can emit PEM, DER (ASN.1), and text output.

    The PEM dump is compared with the original certificate PEM, the DER
    and text dumps with the output of ``_runopenssl``, and the DER dump is
    loaded again and re-dumped to PEM.
    """
    combined_pem = cleartextCertificatePEM + cleartextPrivateKeyPEM
    certificate = load_certificate(FILETYPE_PEM, combined_pem)

    # PEM output
    pem_out = dump_certificate(FILETYPE_PEM, certificate)
    self.assertEqual(pem_out, cleartextCertificatePEM)

    # DER output
    der_out = dump_certificate(FILETYPE_ASN1, certificate)
    expected_der = _runopenssl(pem_out, b"x509", b"-outform", b"DER")
    self.assertEqual(der_out, expected_der)

    # DER -> certificate -> PEM
    reloaded = load_certificate(FILETYPE_ASN1, der_out)
    pem_again = dump_certificate(FILETYPE_PEM, reloaded)
    self.assertEqual(pem_again, cleartextCertificatePEM)

    # Text output
    text_out = dump_certificate(FILETYPE_TEXT, certificate)
    expected_text = _runopenssl(pem_out, b"x509", b"-noout", b"-text")
    self.assertEqual(text_out, expected_text)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server.

    The certificate and private key returned by ``generate_adhoc_ssl_pair()``
    are dumped as PEM into two temporary files, which are registered for
    removal at interpreter exit and then passed to ``load_ssl_context()``.

    :return: whatever ``load_ssl_context(cert_file, pkey_file)`` returns.
    """
    import atexit
    import tempfile

    crypto = _get_openssl_crypto_module()
    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    # Register cleanup before writing so the files go away even on failure.
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)
    try:
        os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    finally:
        # Close both descriptors even when dumping or writing raises.
        os.close(cert_handle)
        os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)