def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):
    """Build the signed request headers for an authenticated call.

    For ``self.version == 'v1'`` the payload is the base64-encoded JSON of the
    request parameters extended with ``request`` and ``nonce``.  For any other
    version the payload is the text
    ``'/api/' + endpoint_path + nonce + json.dumps(params)``.
    The signature is the hex HMAC-SHA384 of the payload keyed with
    ``self.secret``.

    :param url: request URL; returned unchanged.
    :param endpoint: not used by this method.
    :param endpoint_path: path that is embedded in the signed payload.
    :param method_verb: not used by this method.
    :param kwargs: may carry ``params``, the request parameters.
    :return: tuple ``(url, {'headers': headers})``.
    """
    req = kwargs.get('params', {})
    if self.version == 'v1':
        # Work on a copy so 'request'/'nonce' never leak into the caller's dict.
        req = dict(req)
        req['request'] = endpoint_path
        req['nonce'] = self.nonce()
        data = base64.standard_b64encode(json.dumps(req).encode('utf8'))
    else:
        data = '/api/' + endpoint_path + self.nonce() + json.dumps(req)

    # hmac requires bytes; the non-v1 payload is text and must be encoded first.
    message = data if isinstance(data, bytes) else data.encode('utf8')
    signature = hmac.new(self.secret.encode('utf8'), message,
                         hashlib.sha384).hexdigest()

    headers = {"X-BFX-APIKEY": self.key,
               "X-BFX-SIGNATURE": signature,
               "X-BFX-PAYLOAD": data}
    if self.version == 'v2':
        headers['content-type'] = 'application/json'
    return url, {'headers': headers}
python类standard_b64encode()的实例源码
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
    """Build the signed request headers for an authenticated call.

    The payload is the base64-encoded JSON of the request parameters extended
    with ``nonce`` and ``request``; the signature is the hex HMAC-SHA384 of
    that payload keyed with ``self.secret``.

    :param uri: request URI; returned unchanged.
    :param endpoint: not used by this method.
    :param endpoint_path: stored in the payload under ``request``.
    :param method_verb: not used by this method.
    :param kwargs: may carry ``params``, a dict of request parameters.
    :return: tuple ``(uri, {'headers': headers})``.
    """
    nonce = self.nonce()
    # Copy the parameters so the caller's dict is not polluted with the
    # 'nonce'/'request' bookkeeping fields.
    payload = dict(kwargs.get('params', {}))
    payload['nonce'] = nonce
    payload['request'] = endpoint_path

    data = base64.standard_b64encode(json.dumps(payload).encode('utf8'))
    signature = hmac.new(self.secret.encode('utf8'), data,
                         hashlib.sha384).hexdigest()
    headers = {'X-GEMINI-APIKEY': self.key,
               'X-GEMINI-PAYLOAD': data,
               'X-GEMINI-SIGNATURE': signature}
    return uri, {'headers': headers}
def _from_inst(inst, rostype):
    """Convert ``inst`` to its output form, dispatching on ``rostype``.

    Binary types are base64-encoded, time types become a
    ``{"secs", "nsecs"}`` dict, primitives pass through unchanged, lists and
    tuples are delegated to ``_from_list_inst`` and everything else to
    ``_from_object_inst``.
    """
    if rostype in ros_binary_types:
        # uint8[] special case: the raw string is shipped base64-encoded.
        converted = standard_b64encode(inst)
    elif rostype in ros_time_types:
        converted = {"secs": inst.secs, "nsecs": inst.nsecs}
    elif rostype in ros_primitive_types:
        converted = inst
    elif type(inst) in list_types:
        converted = _from_list_inst(inst, rostype)
    else:
        # Anything left is treated as a full ros msg object.
        converted = _from_object_inst(inst, rostype)
    return converted
lightstep_binary_propagator.py 文件源码
项目:lightstep-tracer-python
作者: lightstep
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def inject(self, span_context, carrier):
    """Serialize ``span_context`` and append it, base64-encoded, to ``carrier``.

    :param span_context: object providing ``trace_id``, ``span_id``,
        ``sampled`` and ``baggage`` (``baggage`` may be ``None``).
    :param carrier: must be exactly a ``bytearray``; it is extended in place.
    :raises InvalidCarrierException: if ``carrier`` is not a ``bytearray``.
    """
    if type(carrier) is not bytearray:
        raise InvalidCarrierException()

    binary_carrier = BinaryCarrier()
    ctx = binary_carrier.basic_ctx
    ctx.trace_id = span_context.trace_id
    ctx.span_id = span_context.span_id
    ctx.sampled = span_context.sampled

    baggage = span_context.baggage
    if baggage is not None:
        for name in baggage:
            ctx.baggage_items[name] = baggage[name]

    carrier.extend(standard_b64encode(binary_carrier.SerializeToString()))
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def _sign_payload(self, method, path, params=None, payload=None):
    """Return the authentication headers for a request.

    The signed message is ``method``, the route built from ``path`` and
    ``params``, the base64-encoded JSON body (only when ``payload`` is
    truthy) and a fresh nonce, separated by single spaces.  It is signed with
    HMAC-SHA384 keyed by ``self.SECRET``.

    :return: dict holding the API key, nonce, hex signature and content type.
    """
    route = build_route(path, params)
    nonce = gen_nonce()

    parts = [method, route]
    if payload:
        body_json = json.dumps(payload).encode('utf-8')
        parts.append(base64.standard_b64encode(body_json).decode('utf-8'))
    parts.append(nonce)
    message = ' '.join(parts)

    digest = hmac.new(key=self.SECRET.encode('utf-8'),
                      msg=message.encode('utf-8'),
                      digestmod=hashlib.sha384).hexdigest()
    return {
        'X-SBTC-APIKEY': self.KEY,
        'X-SBTC-NONCE': nonce,
        'X-SBTC-SIGNATURE': digest,
        'Content-Type': 'application/json',
    }
def __encrypt(self, plaintext):
    """
    Encrypt the given plaintext with the encryption key

    The plaintext is padded to a multiple of 16 and encrypted with AES in CBC
    mode under a freshly generated 16-byte IV.

    :param plaintext: data to encrypt
    :return: Serialized JSON String of the encryption Envelope
    """
    esn = self.kodi_helper.get_esn()
    iv = get_random_bytes(16)

    # Pad the plaintext, then encrypt it
    padded = Padding.pad(plaintext, 16)
    cipher = AES.new(self.encryption_key, AES.MODE_CBC, iv)
    citext = cipher.encrypt(padded)

    # base64 output is bytes on Python 3 and json.dumps rejects bytes, so the
    # encoded values are converted to text before serialisation.
    encryption_envelope = {
        'ciphertext': base64.standard_b64encode(citext).decode('ascii'),
        'keyid': esn + '_' + str(self.sequence_number),
        'sha256': 'AA==',
        'iv': base64.standard_b64encode(iv).decode('ascii')
    }
    return json.dumps(encryption_envelope)
def __save_msl_data(self):
    """
    Saves the keys and tokens in json file

    The encryption and sign keys are stored base64-encoded, and the master
    token is stored under ``tokens.mastertoken``.  The JSON text is handed to
    ``self.save_file`` with the filename ``msl_data.json``.

    :return: None
    """
    # base64 output is bytes on Python 3 and the JSON encoder rejects bytes,
    # so the encoded keys are converted to text first.
    data = {
        "encryption_key": base64.standard_b64encode(self.encryption_key).decode('ascii'),
        'sign_key': base64.standard_b64encode(self.sign_key).decode('ascii'),
        'tokens': {
            'mastertoken': self.mastertoken
        }
    }
    serialized_data = json.JSONEncoder().encode(data)
    self.save_file(
        msl_data_path=self.kodi_helper.msl_data_path,
        filename='msl_data.json',
        content=serialized_data)
status_config.py 文件源码
项目:Desert-Fireball-Maintainence-GUI
作者: CPedersen3245
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def GET(self):
    """
    Serves the dfnstation.cfg file to the user to read.

    Returns:
        result (str): A Base64 encoded string containing the contents of dfnstation.cfg.
        Nothing is returned when the user is not logged in.

    Raises:
        web.notfound
    """
    if not LoginChecker.loggedIn():
        return None

    path = constants.dfnconfigPath
    if not os.path.exists(path):
        raise web.notfound()

    # ``open`` replaces the Python 2-only ``file`` builtin, and the context
    # manager guarantees the handle is closed once the contents are read.
    with open(path, 'rb') as config_file:
        web.header('Content-type', 'application/octet-stream')
        web.header('Content-transfer-encoding', 'base64')
        return base64.standard_b64encode(config_file.read())
def write_vm_file(self, path, content, uri="qemu:///system"):
    """Write ``content`` to ``path`` by sending guest-file-* commands via ``self.EXE``.

    The file is opened in ``w+`` mode, the base64-encoded content is written
    in a single guest-file-write command, and the handle is then flushed and
    closed.

    :param path: file path substituted into the guest-file-open command
    :param content: data to write; base64-encoded before it is sent
    :param uri: not used by this method; kept for interface compatibility
    :return: the ``count`` reported by guest-file-write, or -1 when opening
        or writing raised an exception
    """
    FILE_OPEN_WRITE = """{"execute":"guest-file-open", "arguments":{"path":"%s","mode":"w+"}}"""
    FILE_WRITE = """{"execute":"guest-file-write", "arguments":{"handle":%s,"buf-b64":"%s"}}"""
    FILE_CLOSE = """{"execute":"guest-file-close", "arguments":{"handle":%s}}"""
    FILE_FLUSH = """{"execute":"guest-file-flush", "arguments":{"handle":%s}}"""

    file_handle = -1
    enc_content = base64.standard_b64encode(content)
    if not isinstance(enc_content, str):
        # On Python 3 the encoder returns bytes, which "%s" would render as
        # "b'...'" and corrupt the command; convert to text first.
        enc_content = enc_content.decode('ascii')

    try:
        file_handle = self.EXE(FILE_OPEN_WRITE % path)["return"]
        write_count = self.EXE(FILE_WRITE % (file_handle, enc_content))["return"]["count"]
        logger.debug("content:\n%s\npath:\n%s" % (content, path))
    except Exception as ex:
        # Single-argument print keeps the same output on Python 2 and 3.
        print("%s : %s" % (Exception, ex))
        return -1
    finally:
        # Only flush/close a handle that was actually obtained; previously a
        # failed open still issued flush/close for the placeholder handle -1.
        if file_handle != -1:
            self.EXE(FILE_FLUSH % file_handle)
            self.EXE(FILE_CLOSE % file_handle)
    return write_count
def authenticate_with_apikey(self, api_key, scope=None):
    """perform authentication by api key and store result for execute_request method

    api_key -- secret api key from account settings
    scope -- optional scope of authentication request. If None, "auto" is sent as the scope.

    The parsed token is stored in self._token together with the time it was retrieved.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "client_credentials",
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The header name was previously misspelled as "ContentType".
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    # HTTP Basic credentials: base64 of "APIKEY:<api_key>".
    credentials = base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode()
    request.add_header("Authorization", 'Basic ' + credentials)
    response = urllib.request.urlopen(request)
    self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None):
    """perform authentication by contact credentials and store result for execute_request method

    username -- typically a contact email
    password -- contact password
    scope -- optional scope of authentication request. If None, "auto" is sent as the scope.

    The parsed token is stored in self._token together with the time it was retrieved.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "password",
        "username": username,
        "password": password,
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The header name was previously misspelled as "ContentType".
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    # HTTP Basic credentials: base64 of "<client_id>:<client_secret>".
    auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
    request.add_header("Authorization", 'Basic ' + auth_header)
    response = urllib.request.urlopen(request)
    self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def get(self):
    """Fetch the user's gists from GitHub and post them to the opener window.

    The access code from the request arguments is exchanged for an access
    token, the gists listing is requested with it, and the response text is
    written base64-encoded inside a script that calls
    ``window.opener.postMessage``.
    """
    # Exchange the access code for a GitHub access token.
    code = extract_code_from_args(self.request.arguments)
    token = self.request_access_token(code)

    gists_response = request_session.get(
        "https://api.github.com/gists",
        headers={"Accept": "application/json",
                 "Authorization": "token " + token})
    gists_bytes = bytearray(gists_response.text, 'utf-8')

    self.write("<script>var gists = '")
    self.write(base64.standard_b64encode(gists_bytes))
    self.write("';")
    self.write("window.opener.postMessage(gists, window.opener.location)")
    self.finish(";</script>")
def _sign_payload(self, method, path, params=None, payload=None):
    """Return the authentication headers for a request.

    The signed message is ``method``, the route built from ``path`` and
    ``params``, the base64-encoded JSON body (only when ``payload`` is
    truthy) and a fresh nonce, separated by single spaces.  It is signed with
    HMAC-SHA384 keyed by ``self.SECRET``.

    :return: dict holding the API key, nonce, hex signature and content type.
    """
    route = build_route(path, params)
    nonce = gen_nonce()

    parts = [method, route]
    if payload:
        body_json = json.dumps(payload).encode('utf-8')
        parts.append(base64.standard_b64encode(body_json).decode('utf-8'))
    parts.append(nonce)
    message = ' '.join(parts)

    digest = hmac.new(key=self.SECRET.encode('utf-8'),
                      msg=message.encode('utf-8'),
                      digestmod=hashlib.sha384).hexdigest()
    return {
        'X-SBTC-APIKEY': self.KEY,
        'X-SBTC-NONCE': nonce,
        'X-SBTC-SIGNATURE': digest,
        'Content-Type': 'application/json',
    }
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def get_digest_and_size_for_file(file_name):
    """
    Gets file digest and size

    The file is read in fixed-size chunks so it is never held in memory as a
    whole.

    :param file_name: a file name
    :return: tuple of (base64-encoded SHA256 digest as text, file size as
        reported by ``os.stat``)
    """
    CHUNK_SIZE = 16 * 4 * 1024
    m = SHA256.new()
    # The context manager closes the handle; it was previously leaked.
    with open(file_name, 'rb') as f:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
            m.update(chunk)

    statinfo = os.stat(file_name)
    file_size = statinfo.st_size
    digest = base64.standard_b64encode(m.digest()).decode(UTF8)
    logger = getLogger(__name__)
    logger.debug(u'getting digest and size: %s, %s, file=%s', digest,
                 file_size, file_name)
    return digest, file_size
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def _generate_packages_dict(self, package_files):
    """Assemble the ``{'packages': [...]}`` document from the package files.

    The package JSON is the base document.  The optional command, config and
    resource files are parsed as JSON and attached under their own keys; the
    optional marathon file is attached base64-encoded under
    ``marathon.v2AppMustacheTemplate``.

    :param package_files: mapping from file name to that file's text.
    :return: dict with a single ``packages`` list holding the package.
    """
    def parse(text):
        # Parse into OrderedDicts so the key order of the input is retained.
        return json.loads(text, object_pairs_hook=collections.OrderedDict)

    package = parse(package_files[_package_json_filename])
    package['releaseVersion'] = 0
    package['packagingVersion'] = '{}.0'.format(self._cosmos_packaging_version)

    command_text = package_files.get(_command_json_filename)
    if command_text is not None:
        package['command'] = parse(command_text)

    config_text = package_files.get(_config_json_filename)
    if config_text is not None:
        package['config'] = parse(config_text)

    marathon_text = package_files.get(_marathon_json_filename)
    if marathon_text is not None:
        template_bytes = bytearray(marathon_text, 'utf-8')
        package['marathon'] = {
            'v2AppMustacheTemplate': base64.standard_b64encode(template_bytes).decode()}

    resource_text = package_files.get(_resource_json_filename)
    if resource_text is not None:
        package['resource'] = parse(resource_text)

    return {'packages': [package]}
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-byte lines, and
        the end marker, joined by newlines with a trailing newline, as bytes.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b'\n', b'')

    # Split the base64 payload into 64-byte lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b'']
    return b'\n'.join(lines)
def process_challenge(self, challenge_parameters):
    """Build the response to a password-verifier challenge.

    The signature is the base64-encoded HMAC-SHA256, keyed with the password
    authentication key, over the pool name (the part of ``self.pool_id``
    after the underscore), the SRP user id, the decoded secret block and the
    timestamp.

    :param challenge_parameters: dict providing ``USER_ID_FOR_SRP``,
        ``SALT``, ``SRP_B`` and ``SECRET_BLOCK``.
    :return: dict of challenge responses; ``SECRET_HASH`` is included only
        when ``self.client_secret`` is set.
    """
    srp_user_id = challenge_parameters['USER_ID_FOR_SRP']
    salt = challenge_parameters['SALT']
    server_b = challenge_parameters['SRP_B']
    secret_block = challenge_parameters['SECRET_BLOCK']

    # AWS Cognito requires the day number without a leading zero.
    raw_timestamp = datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y")
    timestamp = re.sub(r" 0(\d) ", r" \1 ", raw_timestamp)

    auth_key = self.get_password_authentication_key(
        srp_user_id, self.password, hex_to_long(server_b), salt)
    decoded_block = base64.standard_b64decode(secret_block)

    message = bytearray(self.pool_id.split('_')[1], 'utf-8')
    message += bytearray(srp_user_id, 'utf-8')
    message += bytearray(decoded_block)
    message += bytearray(timestamp, 'utf-8')

    mac = hmac.new(auth_key, message, digestmod=hashlib.sha256)
    signature = base64.standard_b64encode(mac.digest())

    response = {'TIMESTAMP': timestamp,
                'USERNAME': srp_user_id,
                'PASSWORD_CLAIM_SECRET_BLOCK': secret_block,
                'PASSWORD_CLAIM_SIGNATURE': signature.decode('utf-8')}
    if self.client_secret is not None:
        response["SECRET_HASH"] = self.get_secret_hash(
            self.username, self.client_id, self.client_secret)
    return response
def save_pem(contents, pem_marker):
    """Render ``contents`` in PEM format and return the result.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY'
        when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 body split into 64-character
        lines, and the end marker, joined by newlines with a trailing newline.
    """
    header, footer = _markers(pem_marker)
    encoded = base64.standard_b64encode(contents).replace(b('\n'), b(''))

    # Split the base64 payload into 64-character lines.
    body = [encoded[offset:offset + 64]
            for offset in range(0, len(encoded), 64)]

    # The final empty element makes the joined output end with a newline.
    lines = [header] + body + [footer, b('')]
    return b('\n').join(lines)
def get_md5(data):
    """Return the base64-encoded MD5 digest of ``data``."""
    raw_digest = hashlib.md5(data).digest()
    return base64.standard_b64encode(raw_digest)