def __call__(self, topic=None):
    """Write help output as HTML to ``sys.stdout``.

    Without a *topic*, the ``repr`` of this object is written inside a
    ``<span class=help>`` element.  With a *topic*, ``pydoc.help`` is run,
    whatever ``sys.stdout.reset()`` returns is split into paragraphs, and
    the result is rendered through ``HELP_HTML``.

    NOTE(review): relies on ``sys.stdout`` offering ``_write`` and
    ``reset`` -- presumably a capturing stream installed elsewhere.
    """
    if topic is None:
        sys.stdout._write('<span class=help>%r</span>' % (self,))
        return

    import pydoc
    pydoc.help(topic)

    captured = sys.stdout.reset()
    if isinstance(captured, bytes):
        # Undecodable bytes are dropped rather than raising.
        captured = captured.decode('utf-8', 'ignore')

    parts = _paragraph_re.split(captured)
    if len(parts) > 1:
        # First paragraph is the heading, everything after it the body.
        title, text = parts[0], '\n\n'.join(parts[1:])
    else:  # pragma: no cover
        title, text = 'Help', parts[0]
    sys.stdout._write(HELP_HTML % {'title': title, 'text': text})
# Example source code for Python's decode() (original heading: python类decode()的实例源码)
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
             max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
    """Store the parser configuration on the instance.

    :param stream_factory: stored as ``self.stream_factory``; falls back
                           to ``default_stream_factory`` when ``None``.
    :param charset: stored as ``self.charset``.
    :param errors: stored as ``self.errors``.
    :param max_form_memory_size: stored as ``self.max_form_memory_size``.
    :param cls: stored as ``self.cls``; falls back to ``MultiDict`` when
                ``None``.
    :param buffer_size: stored as ``self.buffer_size``; must be a
                        multiple of 4 and at least 1024.
    :raises AssertionError: if *buffer_size* violates either constraint.
    """
    # Resolve the fallbacks first.  The attribute used to be assigned
    # before the fallback was applied, which left ``self.stream_factory``
    # as ``None`` whenever no factory was passed in.
    if stream_factory is None:
        stream_factory = default_stream_factory
    if cls is None:
        cls = MultiDict

    self.stream_factory = stream_factory
    self.charset = charset
    self.errors = errors
    self.max_form_memory_size = max_form_memory_size
    self.cls = cls

    # make sure the buffer size is divisible by four so that we can base64
    # decode chunk by chunk
    assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
    # also the buffer size has to be at least 1024 bytes long or long headers
    # will freak out the system
    assert buffer_size >= 1024, 'buffer size has to be at least 1KB'
    self.buffer_size = buffer_size
def http_basic_auth(func):
    """Decorate a view so HTTP Basic credentials log the user in.

    When the request carries an ``HTTP_AUTHORIZATION`` header using the
    ``basic`` scheme, the base64 payload is decoded, split into username
    and password and handed to Django's ``authenticate``.  A user that
    authenticates and passes ``is_authorized`` is logged in; otherwise
    ``PermissionDenied`` is raised.  Requests without the header, or with
    another scheme, go straight to the wrapped view.
    """
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        from django.contrib.auth import authenticate, login

        if "HTTP_AUTHORIZATION" in request.META:
            scheme, credentials = request.META["HTTP_AUTHORIZATION"].split(b" ", 1)
            if scheme.lower() == b"basic":
                decoded = codecs.decode(credentials.strip(), "base64")
                username, password = decoded.split(b":", 1)
                user = authenticate(username=username, password=password)
                # Reject both unknown credentials and unauthorized users.
                if not (user and is_authorized(user)):
                    raise PermissionDenied()
                login(request, user)
        return func(request, *args, **kwargs)

    return _decorator
def __call__(self, topic=None):
    """Emit HTML help on ``sys.stdout``.

    With no *topic* the ``repr`` of this object is written wrapped in a
    ``<span class=help>`` element.  Otherwise ``pydoc.help(topic)`` runs,
    the value returned by ``sys.stdout.reset()`` is split into paragraphs
    and formatted with ``HELP_HTML``.

    NOTE(review): ``sys.stdout`` must provide ``_write`` and ``reset`` --
    presumably a capturing stream set up elsewhere.
    """
    if topic is None:
        sys.stdout._write('<span class=help>%r</span>' % (self,))
        return

    import pydoc
    pydoc.help(topic)

    output = sys.stdout.reset()
    if isinstance(output, bytes):
        # Bytes that are not valid UTF-8 are silently discarded.
        output = output.decode('utf-8', 'ignore')

    chunks = _paragraph_re.split(output)
    if len(chunks) > 1:
        # Leading paragraph becomes the title, the rest the body text.
        title, text = chunks[0], '\n\n'.join(chunks[1:])
    else:  # pragma: no cover
        title, text = 'Help', chunks[0]
    sys.stdout._write(HELP_HTML % {'title': title, 'text': text})
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
             max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
    """Record the parser settings as instance attributes.

    :param stream_factory: kept as ``self.stream_factory``;
                           ``default_stream_factory`` is used when ``None``.
    :param charset: kept as ``self.charset``.
    :param errors: kept as ``self.errors``.
    :param max_form_memory_size: kept as ``self.max_form_memory_size``.
    :param cls: kept as ``self.cls``; ``MultiDict`` is used when ``None``.
    :param buffer_size: kept as ``self.buffer_size``; has to be divisible
                        by 4 and no smaller than 1024.
    :raises AssertionError: if *buffer_size* breaks either rule.
    """
    # Apply the defaults before storing anything: assigning the raw
    # argument first meant ``self.stream_factory`` ended up ``None``
    # whenever the caller relied on the default.
    if stream_factory is None:
        stream_factory = default_stream_factory
    if cls is None:
        cls = MultiDict

    self.stream_factory = stream_factory
    self.charset = charset
    self.errors = errors
    self.max_form_memory_size = max_form_memory_size
    self.cls = cls

    # make sure the buffer size is divisible by four so that we can base64
    # decode chunk by chunk
    assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
    # also the buffer size has to be at least 1024 bytes long or long headers
    # will freak out the system
    assert buffer_size >= 1024, 'buffer size has to be at least 1KB'
    self.buffer_size = buffer_size
def test_resume_archive_from_file(self, mock_resume_file_upload):
    """Resuming from a file passes the listed part hashes to the uploader.

    ``list_all_parts`` is stubbed to report two uploaded parts; the call
    under test must forward the part size, the file object and a mapping
    of part index to hex-decoded tree hash.
    """
    part_size = 4
    uploaded_parts = [
        {'RangeInBytes': '0-3', 'SHA256TreeHash': '12'},
        {'RangeInBytes': '4-6', 'SHA256TreeHash': '34'},
    ]
    self.vault.list_all_parts = Mock(return_value={
        'PartSizeInBytes': part_size,
        'Parts': uploaded_parts,
    })

    self.vault.resume_archive_from_file(
        sentinel.upload_id, file_obj=sentinel.file_obj)

    expected_hashes = {
        0: codecs.decode('12', 'hex_codec'),
        1: codecs.decode('34', 'hex_codec'),
    }
    mock_resume_file_upload.assert_called_once_with(
        self.vault, sentinel.upload_id, part_size, sentinel.file_obj,
        expected_hashes)
def _load_channel(self, deposit_txid):
    """Load a stored payment channel from ``<deposit_txid>.server.tx``.

    :param deposit_txid: prefix of the JSON file holding the channel.
    :returns: dict with ``deposit_tx``, ``redeem_script`` and
              ``payment_tx`` (``None`` when no payment is stored).
    :raises PaymentChannelNotFoundError: if the file does not exist.
    """
    channel_path = deposit_txid + ".server.tx"

    # A missing file means no payment channel was stored under this id.
    if not os.path.exists(channel_path):
        raise PaymentChannelNotFoundError("Payment channel not found.")

    with open(channel_path) as f:
        stored = json.load(f)

    # Rebuild the objects from their serialized hex forms.
    deposit_tx = bitcoin.Transaction.from_hex(stored['deposit_tx'])
    script_bytes = codecs.decode(stored['redeem_script'], 'hex_codec')
    redeem_script = PaymentChannelRedeemScript.from_bytes(script_bytes)
    if stored['payment_tx']:
        payment_tx = bitcoin.Transaction.from_hex(stored['payment_tx'])
    else:
        payment_tx = None

    return {
        'deposit_tx': deposit_tx,
        'redeem_script': redeem_script,
        'payment_tx': payment_tx,
    }
def open(self, deposit_tx, redeem_script):
    """Validate a new payment channel and store it.

    :param deposit_tx: hex-serialized deposit transaction.
    :param redeem_script: hex-encoded payment channel redeem script.
    :raises AssertionError: if the redeem script cannot be parsed or the
        deposit transaction fails any of the checks below.
    """
    deposit_tx = bitcoin.Transaction.from_hex(deposit_tx)
    try:
        script_bytes = codecs.decode(redeem_script, 'hex_codec')
        redeem_script = PaymentChannelRedeemScript.from_bytes(script_bytes)
    except ValueError:
        raise AssertionError("Invalid payment channel redeem script.")

    # The deposit must carry a P2SH output paying to the redeem script.
    assert len(deposit_tx.outputs) > 1, "Invalid deposit tx outputs."
    output_index = deposit_tx.output_index_for_address(redeem_script.hash160())
    assert output_index is not None, "Missing deposit tx P2SH output."
    assert deposit_tx.outputs[output_index].script.is_p2sh(), \
        "Invalid deposit tx output P2SH script."
    assert deposit_tx.outputs[output_index].script.get_hash160() == redeem_script.hash160(), \
        "Invalid deposit tx output script P2SH address."

    # Persist the channel; no payment has been made yet.
    deposit_txid = str(deposit_tx.hash)
    new_channel = {
        'deposit_tx': deposit_tx,
        'redeem_script': redeem_script,
        'payment_tx': None,
    }
    self._store_channel(deposit_txid, new_channel)
def open(self, deposit_tx, redeem_script):
    """Validate a new payment channel and keep it in ``self.channels``.

    :param deposit_tx: hex-serialized deposit transaction.
    :param redeem_script: hex-encoded payment channel redeem script.
    :raises AssertionError: if the redeem script's merchant key differs
        from ``self.PRIVATE_KEY``'s public key, or the deposit
        transaction fails any of the checks below.
    """
    deposit_tx = bitcoin.Transaction.from_hex(deposit_tx)
    deposit_txid = str(deposit_tx.hash)
    script_bytes = codecs.decode(redeem_script, 'hex_codec')
    redeem_script = statemachine.PaymentChannelRedeemScript.from_bytes(script_bytes)

    # The script must name our own public key as the merchant key.
    assert (redeem_script.merchant_public_key.compressed_bytes
            == self.PRIVATE_KEY.public_key.compressed_bytes)

    # The single deposit output must be P2SH and pay to the redeem script.
    assert len(deposit_tx.outputs) == 1, "Invalid deposit tx outputs."
    output_index = deposit_tx.output_index_for_address(redeem_script.hash160())
    assert output_index is not None, "Missing deposit tx P2SH output."
    assert deposit_tx.outputs[output_index].script.is_p2sh(), \
        "Invalid deposit tx output P2SH script."
    assert deposit_tx.outputs[output_index].script.get_hash160() == redeem_script.hash160(), \
        "Invalid deposit tx output script P2SH address."

    # Register the channel; no payment has been made yet.
    self.channels[deposit_txid] = {
        'deposit_tx': deposit_tx,
        'redeem_script': redeem_script,
        'payment_tx': None,
    }
def __call__(self, topic=None):
    """Render help as HTML and write it to ``sys.stdout``.

    If *topic* is ``None`` the ``repr`` of this object is written in a
    ``<span class=help>`` element.  Otherwise ``pydoc.help`` is invoked
    for *topic*, the text returned by ``sys.stdout.reset()`` is split
    into paragraphs and substituted into ``HELP_HTML``.

    NOTE(review): expects ``sys.stdout`` to expose ``_write`` and
    ``reset`` -- presumably a capturing stream installed elsewhere.
    """
    if topic is None:
        sys.stdout._write('<span class=help>%r</span>' % (self,))
        return

    import pydoc
    pydoc.help(topic)

    help_text = sys.stdout.reset()
    if isinstance(help_text, bytes):
        # Invalid UTF-8 sequences are skipped instead of raising.
        help_text = help_text.decode('utf-8', 'ignore')

    sections = _paragraph_re.split(help_text)
    if len(sections) > 1:
        # The first section is the title, the remainder the body.
        title, text = sections[0], '\n\n'.join(sections[1:])
    else:  # pragma: no cover
        title, text = 'Help', sections[0]
    sys.stdout._write(HELP_HTML % {'title': title, 'text': text})
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
             max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
    """Store the parser configuration on the instance.

    :param stream_factory: stored as ``self.stream_factory``;
                           ``default_stream_factory`` when ``None``.
    :param charset: stored as ``self.charset``.
    :param errors: stored as ``self.errors``.
    :param max_form_memory_size: stored as ``self.max_form_memory_size``.
    :param cls: stored as ``self.cls``; ``MultiDict`` when ``None``.
    :param buffer_size: stored as ``self.buffer_size``; must be a
                        multiple of 4 and at least 1024.
    :raises AssertionError: if *buffer_size* violates either constraint.
    """
    # Fall back to the module defaults for anything left unspecified.
    if stream_factory is None:
        stream_factory = default_stream_factory
    if cls is None:
        cls = MultiDict

    self.charset = charset
    self.errors = errors
    self.max_form_memory_size = max_form_memory_size
    self.stream_factory = stream_factory
    self.cls = cls

    # base64 is decoded chunk by chunk, which needs a multiple of four
    assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
    # long headers need at least 1024 bytes of buffer
    assert buffer_size >= 1024, 'buffer size has to be at least 1KB'
    self.buffer_size = buffer_size
def __call__(self, topic=None):
    """Produce HTML help on ``sys.stdout``.

    Called without a *topic*, writes the ``repr`` of this object inside a
    ``<span class=help>`` element.  Called with one, runs
    ``pydoc.help(topic)``, splits what ``sys.stdout.reset()`` returns
    into paragraphs and writes them formatted through ``HELP_HTML``.

    NOTE(review): needs ``sys.stdout`` to implement ``_write`` and
    ``reset`` -- presumably a capturing stream installed elsewhere.
    """
    if topic is None:
        sys.stdout._write('<span class=help>%r</span>' % (self,))
        return

    import pydoc
    pydoc.help(topic)

    raw_help = sys.stdout.reset()
    if isinstance(raw_help, bytes):
        # Bytes that fail UTF-8 decoding are ignored.
        raw_help = raw_help.decode('utf-8', 'ignore')

    blocks = _paragraph_re.split(raw_help)
    if len(blocks) > 1:
        # Heading comes first; the remaining blocks form the body.
        title, text = blocks[0], '\n\n'.join(blocks[1:])
    else:  # pragma: no cover
        title, text = 'Help', blocks[0]
    sys.stdout._write(HELP_HTML % {'title': title, 'text': text})
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
             max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
    """Record the parser settings as instance attributes.

    :param stream_factory: kept as ``self.stream_factory``;
                           ``default_stream_factory`` when ``None``.
    :param charset: kept as ``self.charset``.
    :param errors: kept as ``self.errors``.
    :param max_form_memory_size: kept as ``self.max_form_memory_size``.
    :param cls: kept as ``self.cls``; ``MultiDict`` when ``None``.
    :param buffer_size: kept as ``self.buffer_size``; has to be divisible
                        by 4 and no smaller than 1024.
    :raises AssertionError: if *buffer_size* breaks either rule.
    """
    # Substitute the module defaults for omitted collaborators.
    if stream_factory is None:
        stream_factory = default_stream_factory
    if cls is None:
        cls = MultiDict

    self.charset = charset
    self.errors = errors
    self.max_form_memory_size = max_form_memory_size
    self.stream_factory = stream_factory
    self.cls = cls

    # chunk-wise base64 decoding requires a size divisible by four
    assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
    # anything below 1024 bytes cannot hold long headers
    assert buffer_size >= 1024, 'buffer size has to be at least 1KB'
    self.buffer_size = buffer_size
def bytes_to_hex_str(arg_bytes):
    """Return *arg_bytes* as a text string of hexadecimal digits.

    The input is hex-encoded through ``codecs`` and the resulting bytes
    are decoded as ASCII.
    """
    hex_bytes = codecs.encode(arg_bytes, 'hex')
    return hex_bytes.decode('ascii')
def hex_str_to_bytes(arg_str):
    """Return the bytes described by the hexadecimal text *arg_str*.

    The text is encoded as ASCII and then hex-decoded through ``codecs``.
    """
    ascii_hex = arg_str.encode('ascii')
    return codecs.decode(ascii_hex, 'hex')
def parse_42_guid(guid):
    """Unpack a dash-separated hex GUID into three unsigned integers.

    Dashes are removed, the first 32 hex digits are decoded to 16 bytes,
    and those bytes are read big-endian as two 32-bit values followed by
    one 64-bit value.
    """
    hex_digits = guid.replace('-', '')[:32]
    raw = codecs.decode(hex_digits, "hex")
    return struct.unpack('>IIQ', raw)
def unistr(text, errors='strict'):
    # type: (Union[str, bytes], str) -> str
    """Return *text* as a text string.

    Values that already are ``text_type`` pass through untouched.
    Anything else is decoded as UTF-8 using *errors*; if that fails and
    ``HAS_FSCODEC`` is set, ``os.fsdecode`` is used instead, otherwise
    the ``UnicodeDecodeError`` propagates.
    """
    if not isinstance(text, text_type):
        try:
            text = text.decode('utf-8', errors=errors)
        except UnicodeDecodeError:
            if not HAS_FSCODEC:
                raise
            text = os.fsdecode(text)
    return text
def unifilename(filename):
    # type: (Union[str, bytes]) -> str
    """Return *filename* as a text string.

    ``text_type`` values are returned as-is.  Otherwise decoding is tried
    with the filesystem encoding, then with UTF-8, and finally -- only if
    ``HAS_FSCODEC`` is set -- with ``os.fsdecode``.  When every attempt
    fails the UTF-8 ``UnicodeDecodeError`` propagates.
    """
    if isinstance(filename, text_type):
        return filename

    fs_encoding = sys.getfilesystemencoding()
    try:
        decoded = filename.decode(fs_encoding)
    except UnicodeDecodeError:
        try:
            decoded = filename.decode('utf-8')
        except UnicodeDecodeError:
            if not HAS_FSCODEC:
                raise
            decoded = os.fsdecode(filename)
    return decoded
def use_startstyle(self, inlinestyletext):
    # type: (Optional[str]) -> None
    """Set ``self.initial_style`` from an inline style description.

    Nothing happens when *inlinestyletext* is empty or ``None``.
    Otherwise the text is rewritten into JSON, decoded, and the result is
    passed to ``style_make``.
    """
    if not inlinestyletext:
        return

    # The input is YAML-like JSON with bare words, for example
    #   '{based_on_style: pep8, column_limit: 79}'
    # Wrapping each word in double quotes yields parseable JSON:
    #   '{"based_on_style": "pep8", "column_limit": 79}'
    quoted = re.sub(r'([a-zA-Z_]\w+)', r'"\1"', inlinestyletext)
    decoder = json.JSONDecoder()
    styledef = decoder.decode(quoted)  # type: ignore
    self.initial_style = style_make(styledef)
def surrdecode(s, enc='utf-8'):
    # type: (bytes, str) -> str
    """Decode *s* using *enc*, substituting undecodable bytes.

    The ``'replace'`` error handler is used, so malformed input produces
    replacement characters instead of raising.
    """
    decoded = s.decode(enc, errors='replace')
    return decoded