def decode(self, data):
self.data = base64.decodebytes(data)
python类decodebytes()的实例源码
def getparser(use_datetime=False, use_builtin_types=False):
"""getparser() -> parser, unmarshaller
Create an instance of the fastest available parser, and attach it
to an unmarshalling object. Return both objects.
"""
if FastParser and FastUnmarshaller:
if use_builtin_types:
mkdatetime = _datetime_type
mkbytes = base64.decodebytes
elif use_datetime:
mkdatetime = _datetime_type
mkbytes = _binary
else:
mkdatetime = _datetime
mkbytes = _binary
target = FastUnmarshaller(True, False, mkbytes, mkdatetime, Fault)
parser = FastParser(target)
else:
target = Unmarshaller(use_datetime=use_datetime, use_builtin_types=use_builtin_types)
if FastParser:
parser = FastParser(target)
else:
parser = ExpatParser(target)
return parser, target
##
# Convert a Python tuple or a Fault instance to an XML-RPC packet.
#
# @def dumps(params, **options)
# @param params A tuple or Fault instance.
# @keyparam methodname If given, create a methodCall request for
# this method name.
# @keyparam methodresponse If given, create a methodResponse packet.
# If used with a tuple, the tuple must be a singleton (that is,
# it must contain exactly one element).
# @keyparam encoding The packet encoding.
# @return A string containing marshalled data.
def from_jsonb64(msg: str):
return base64.decodebytes(msg.encode())
# TODO: monkeypatch is ugly but I'm in a hurry...
def set_payout_address(client, wallet):
""" Set a new address from the HD wallet for payouts.
Note that is set server-side on a per-account basis. Thus, in the
case where a single user has different wallets on different
machines, all mining proceeds on all machines are sent to this
address.
Args:
client (TwentyOneRestClient): rest client used for communication with the backend api
wallet (two1.wallet.Wallet): a user's wallet instance
Returns:
bytes: extra nonce 1 which is required for computing the coinbase transaction
int: the size in bytes of the extra nonce 2
int: reward amount given upon successful solution found
"""
payout_address = wallet.current_address
auth_resp = client.account_payout_address_post(payout_address)
user_info = json.loads(auth_resp.text)
enonce1_base64 = user_info["enonce1"]
enonce1 = base64.decodebytes(enonce1_base64.encode())
enonce2_size = user_info["enonce2_size"]
reward = user_info["reward"]
return enonce1, enonce2_size, reward
def get_work(client):
""" Get work from the pool using the rest client.
Args:
client (TwentyOneRestClient): rest client used for communication with the backend api
Returns:
WorkNotification: a Swirl work notification message
"""
try:
response = client.get_work()
except exceptions.ServerRequestError as e:
profile_url = "{}/{}".format(two1.TWO1_WWW_HOST, client.username)
profile_cta = uxstring.UxString.mining_profile_call_to_action.format(profile_url)
err_string = None
if e.status_code == 403 and e.data.get("error") == "TO201":
err_string = uxstring.UxString.Error.suspended_account
elif e.status_code == 403 and e.data.get("error") == "TO501":
err_string = uxstring.UxString.monthly_mining_limit_reached
elif e.status_code == 403 and e.data.get("error") == "TO502":
err_string = uxstring.UxString.lifetime_earn_limit_reached
elif e.status_code == 403 and e.data.get("error") == "TO503":
err_string = uxstring.UxString.no_earn_allocations.format(two1.TWO1_WWW_HOST,
client.username)
elif e.status_code == 404:
if bitcoin_computer.has_mining_chip():
err_string = uxstring.UxString.monthly_mining_limit_reached
else:
err_string = uxstring.UxString.earn_limit_reached
if err_string:
raise exceptions.MiningDisabledError("{}\n\n{}".format(err_string, profile_cta))
else:
raise e
msg_factory = message_factory.SwirlMessageFactory()
msg = base64.decodebytes(response.content)
work = msg_factory.read_object(msg)
return work
def verify(self, message, signature):
# assume we're dealing with a base64 encoded signature
signature = b64decode(signature.encode())
message = self.ensure_bytes(message)
# ensure we have a public key we can use use to verify
if not self._public_key:
raise Exception("Key object does not have public key defined, and thus cannot be used to verify.")
return self._verify(message, signature)
def base64_decode(input, errors='strict'):
assert errors == 'strict'
return (base64.decodebytes(input), len(input))
def decode(self, input, final=False):
assert self.errors == 'strict'
return base64.decodebytes(input)
def setUp(self):
if not os.path.isdir(LOCALEDIR):
os.makedirs(LOCALEDIR)
with open(MOFILE, 'wb') as fp:
fp.write(base64.decodebytes(GNU_MO_DATA))
with open(UMOFILE, 'wb') as fp:
fp.write(base64.decodebytes(UMO_DATA))
with open(MMOFILE, 'wb') as fp:
fp.write(base64.decodebytes(MMO_DATA))
self.env = support.EnvironmentVarGuard()
self.env['LANGUAGE'] = 'xx'
gettext._translations.clear()
def test_encoding(self):
payload = self._au.get_payload()
self.assertEqual(base64.decodebytes(bytes(payload, 'ascii')),
self._audiodata)
def open_data(self, url, data=None):
"""Use "data" URL."""
if not isinstance(url, str):
raise URLError('data error', 'proxy support for data protocol currently not implemented')
# ignore POSTed data
#
# syntax of data URLs:
# dataurl := "data:" [ mediatype ] [ ";base64" ] "," data
# mediatype := [ type "/" subtype ] *( ";" parameter )
# data := *urlchar
# parameter := attribute "=" value
try:
[type, data] = url.split(',', 1)
except ValueError:
raise IOError('data error', 'bad data URL')
if not type:
type = 'text/plain;charset=US-ASCII'
semi = type.rfind(';')
if semi >= 0 and '=' not in type[semi:]:
encoding = type[semi+1:]
type = type[:semi]
else:
encoding = ''
msg = []
msg.append('Date: %s'%time.strftime('%a, %d %b %Y %H:%M:%S GMT',
time.gmtime(time.time())))
msg.append('Content-type: %s' % type)
if encoding == 'base64':
# XXX is this encoding/decoding ok?
data = base64.decodebytes(data.encode('ascii')).decode('latin1')
else:
data = unquote(data)
msg.append('Content-Length: %d' % len(data))
msg.append('')
msg.append(data)
msg = '\n'.join(msg)
headers = email.message_from_string(msg)
f = io.StringIO(msg)
#f.fileno = None # needed for addinfourl
return addinfourl(f, headers, url)
def decode(self, data):
self.data = base64.decodebytes(data)
def construct_yaml_binary(self, node):
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(None, None,
"failed to convert base64 data into ascii: %s" % exc,
node.start_mark)
try:
if hasattr(base64, 'decodebytes'):
return base64.decodebytes(value)
else:
return base64.decodestring(value)
except binascii.Error as exc:
raise ConstructorError(None, None,
"failed to decode base64 data: %s" % exc, node.start_mark)
def construct_python_bytes(self, node):
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(None, None,
"failed to convert base64 data into ascii: %s" % exc,
node.start_mark)
try:
if hasattr(base64, 'decodebytes'):
return base64.decodebytes(value)
else:
return base64.decodestring(value)
except binascii.Error as exc:
raise ConstructorError(None, None,
"failed to decode base64 data: %s" % exc, node.start_mark)
def base64_decode(input, errors='strict'):
assert errors == 'strict'
return (base64.decodebytes(input), len(input))
def decode(self, input, final=False):
assert self.errors == 'strict'
return base64.decodebytes(input)