def auth(func):
    """Decorator that guards a view with the heartbeat access rules.

    The wrapped view is called directly when the configuration returned by
    ``get_auth()`` has ``disable`` set to ``True``, or when it lists
    ``authorized_ips`` and ``is_authorized`` accepts the client IP.
    Otherwise the request must carry HTTP Basic credentials equal to
    ``HEARTBEAT['auth']['username']`` / ``HEARTBEAT['auth']['password']``.

    :param func: the view callable, invoked as ``func(request, *args, **kwargs)``.
    :returns: the wrapping callable; it returns the view's result or a
        401 ``HttpResponse`` carrying a ``WWW-Authenticate`` header.
    """
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        config = get_auth()
        if config.get('disable', False) is True:
            return func(request, *args, **kwargs)
        if 'authorized_ips' in config:
            ip = get_client_ip(request)
            if is_authorized(ip, config['authorized_ips']):
                return func(request, *args, **kwargs)
        prepare_credentials(config)
        header = request.META.get('HTTP_AUTHORIZATION')
        if header:
            # partition() never raises, unlike unpacking split(' ') on a
            # header with zero or several spaces.
            scheme, _, encoded = header.partition(' ')
            if scheme.lower() == 'basic':
                try:
                    decoded = base64.b64decode(encoded.strip()).decode('utf-8')
                except (TypeError, ValueError):
                    # Bad base64 or non-UTF-8 payload: treat as failed login.
                    decoded = ''
                # Only the first ':' separates user and password, so
                # passwords containing ':' keep working.
                username, sep, password = decoded.partition(':')
                if (sep and username == HEARTBEAT['auth']['username'] and
                        password == HEARTBEAT['auth']['password']):
                    return func(request, *args, **kwargs)
        response = HttpResponse(
            "Authentication failed", status=401)
        response['WWW-Authenticate'] = 'Basic realm="Welcome to 1337"'
        return response
    return _decorator
# Example source code for Python's b64decode()
def get_encryption():
    """Return generated source text defining AES ``encrypt``/``decrypt`` helpers.

    The template placeholders are filled from names defined outside this
    function: ``{0}`` is ``st_obf[0]`` (used as the key variable's name),
    ``{1}`` is ``aes_encoded`` and ``{2}`` is ``aes_abbrev``.
    """
    template = '''
import base64
from Crypto import Random
from Crypto.Cipher import AES
abbrev = '{2}'
{0} = base64.b64decode('{1}')
def encrypt(raw):
iv = Random.new().read( AES.block_size )
cipher = AES.new({0}, AES.MODE_CFB, iv )
return (base64.b64encode( iv + cipher.encrypt( raw ) ) )
def decrypt(enc):
enc = base64.b64decode(enc)
iv = enc[:16]
cipher = AES.new({0}, AES.MODE_CFB, iv )
return cipher.decrypt( enc[16:] )
'''
    return template.format(st_obf[0], aes_encoded, aes_abbrev)
################################################################################
# st_protocol.py stitch_gen variables #
################################################################################
def parse_cookie(cookie, securekey):
    """Validate a signed cookie and return the ``name`` stored in it.

    The cookie has the form ``<base64 JSON>.<hex digest>`` where the digest
    is ``md5(json_text + securekey)``.

    :param cookie: the raw cookie string.
    :param securekey: the secret appended to the JSON text before hashing.
    :returns: the ``name`` value of the JSON payload, or ``None`` when the
        cookie cannot be decoded, the digest does not match, or the payload
        has no ``name``.
    """
    import hmac

    logger.info(">> parse cookie : %s", cookie)
    parts = cookie.split('.')
    payload_b64 = parts[0]
    claimed_digest = parts[1] if len(parts) > 1 else ''
    try:
        text = base64.b64decode(payload_b64.encode('ascii')).decode('utf-8')
        # NOTE(review): MD5 with a suffix secret is a weak MAC; kept because
        # changing it would invalidate cookies that were already issued.
        expected_digest = hashlib.md5(
            (text + securekey).encode('ascii')).hexdigest()
    except (TypeError, ValueError):
        # Covers bad base64 plus Unicode encode/decode failures.
        logger.info("decode cookie failed")
        return None
    logger.info("cookie content : %s", text)
    logger.info("hash from part1 : %s", expected_digest)
    logger.info("hash from part2 : %s", claimed_digest)
    result = None
    # Constant-time comparison so the digest cannot be guessed via timing.
    if hmac.compare_digest(claimed_digest.encode('utf-8'),
                           expected_digest.encode('utf-8')):
        try:
            result = json.loads(text)['name']
        except (ValueError, KeyError, TypeError):
            result = None
    logger.info("parse from cookie : %s", result)
    return result
# Handle one JSON request from a device: optionally store an uploaded image,
# persist reported values, update trends and push the model to websocket
# clients.
#   deviceId -- key into self.service.sessions; hex-encoded for logs and paths
#   payload  -- JSON text; the keys read here are "image", "type" and "values"
def handleDeviceCall(self, deviceId, payload):
deviceIdHex = binascii.hexlify(deviceId)
self.logger.debug("Handling request from device {0} with payload {1}".format(deviceIdHex, payload))
payloadDict = json.loads(payload)
session = self.service.sessions[deviceId]
model = DeviceModel().loadFromSession(session, self.deviceConfig, payloadDict)
if "image" in payloadDict:
# NOTE(review): "type" comes straight from the device payload and is used as
# the file extension below without validation -- a value containing path
# separators could escape the device folder. Confirm and sanitise.
imageType = payloadDict.get("type", "jpg")
imageData = base64.b64decode(payloadDict["image"])
# File name is the current local timestamp plus the reported extension.
fn = "{0}.{1}".format(datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), imageType)
imageFile = os.path.join(self.getDeviceFolder(deviceIdHex, "images"), fn)
self.logger.info("Received {0} bytes of image data from device {1} - saving to {2}".format(len(imageData), deviceIdHex, imageFile))
with open(imageFile, 'wb+') as f:
f.write(imageData)
model.newImageUrl = "/upload/{0}/images/{1}".format(deviceIdHex, fn)
if "values" in payloadDict:
# Store every reported variable together with session metadata.
for variable, value in payloadDict["values"].items():
self.database.save(deviceIdHex, variable, session.protocol, session.clientAddr[0], session.lastUpdateTime, value)
# NOTE(review): the original indentation is lost here, so it is unclear
# which of the following statements run while session.lock is held.
with session.lock:
model.saveTrends(self.trends)
model.computeTrends(self.trends)
self.webServer.websocketSend(model.toJSON())
def choose_aws_role(assertion):
    """Prompt the user to pick one of the AWS roles in a SAML assertion.

    :param assertion: base64-encoded SAML assertion XML.
    :returns: a ``RoleTuple(principal_arn, role_arn)`` for the chosen role.
    :raises ValueError: if the typed selection is not an integer.
    :raises IndexError: if the selection is outside ``1..len(roles)``.
    """
    aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
    attribute_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'
    attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
    role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])
    root = ET.fromstring(base64.b64decode(assertion))
    # Each AttributeValue text is split on ',' into the two tuple fields.
    roles = [
        role_tuple(*value.text.split(','))
        for attribute in root.iter(attribute_urn)
        if attribute.get('Name') == aws_attribute_role
        for value in attribute.iter(attribute_value_urn)
    ]
    for number, role in enumerate(roles, 1):
        role_name = role.role_arn.split('/')[1]
        print("%d: %s" % (number, role_name))
    # input() returns text on Python 3, so convert before the arithmetic.
    role_choice = int(input('Please select the AWS role: ')) - 1
    # Reject 0 explicitly: a negative index would silently pick the last role.
    if not 0 <= role_choice < len(roles):
        raise IndexError('role selection out of range')
    return roles[role_choice]
def test_password_empty_file():
    """Run the CLI against a YAML file that only declares a language.

    Checks that a base64-decodable ``password.secure`` entry is added and
    that the existing ``language`` key is kept.
    """
    runner = CliRunner()
    with runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump({'language': 'python'}, handle)

        outcome = runner.invoke(
            cli,
            ['mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert 'password' in config
        assert 'secure' in config['password']
        assert config['language'] == 'python'
        assert base64.b64decode(config['password']['secure'])
def test_password_nonempty_file():
    """Run the CLI against a YAML file that already holds a password entry.

    Checks that the untouched keys survive, the new ``password.secure``
    value is base64-decodable and the key order is preserved.
    """
    seed = OrderedDict([
        ('language', 'python'),
        ('dist', 'trusty'),
        ('password', {'secure': 'SUPER_INSECURE_PASSWORD'}),
    ])
    runner = CliRunner()
    with runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = runner.invoke(
            cli,
            ['mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['password']['secure'])
        assert ['language', 'dist', 'password'] == list(config.keys())
def test_deploy_empty_file():
    """Run the CLI with ``--deploy`` against a YAML file with only a language.

    Checks that ``deploy.password.secure`` is base64-decodable and that the
    ``language`` key is kept.
    """
    runner = CliRunner()
    with runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump({'language': 'python'}, handle)

        outcome = runner.invoke(
            cli,
            ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        assert base64.b64decode(config['deploy']['password']['secure'])
def test_deploy_nonempty_file():
    """Run the CLI with ``--deploy`` against a YAML file with a deploy entry.

    Checks that the untouched keys survive, the new
    ``deploy.password.secure`` value is base64-decodable and the key order
    is preserved.
    """
    seed = OrderedDict([
        ('language', 'python'),
        ('dist', 'trusty'),
        ('deploy', {'password': {'secure': 'SUPER_INSECURE_PASSWORD'}}),
    ])
    runner = CliRunner()
    with runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = runner.invoke(
            cli,
            ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['deploy']['password']['secure'])
        assert ['language', 'dist', 'deploy'] == list(config.keys())
def test_environment_variable_nonempty_file():
    """Run the CLI with ``--env`` against a YAML file with an env entry.

    Checks that the untouched keys survive, the new ``env.global.secure``
    value is base64-decodable and the key order is preserved.
    """
    seed = OrderedDict([
        ('language', 'python'),
        ('dist', 'trusty'),
        ('env', {'global': {'secure': 'API_KEY="SUPER_INSECURE_KEY"'}}),
    ])
    runner = CliRunner()
    with runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = runner.invoke(
            cli,
            ['--env', 'mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_API_KEY',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['env']['global']['secure'])
        assert ['language', 'dist', 'env'] == list(config.keys())
def _key():
    """Return the process-wide key, loading or creating it on first use.

    The key is cached in the module global ``__key``. It is stored
    base64-encoded in the file ``key`` under ``_key_dir()``; the value
    returned is always the DECODED bytes, both when the file already exists
    and when it has just been created. Previously the freshly generated key
    was returned in its encoded form, so the first run used a different key
    from every later run.

    :returns: the raw key bytes.
    :raises OSError: if the key directory cannot be created for a reason
        other than already existing.
    """
    global __key
    import errno

    if __key:
        return __key
    data_dir = _key_dir()
    key_file = os.path.join(data_dir, 'key')
    if os.path.isfile(key_file):
        with open(key_file, 'rb') as f:
            __key = base64.b64decode(f.read())
        return __key
    raw_key = os.urandom(16)
    try:
        os.makedirs(data_dir)
    except OSError as e:
        # An already existing directory is fine; anything else is fatal.
        if e.errno != errno.EEXIST:
            raise
    with open(key_file, 'wb') as f:
        f.write(base64.b64encode(raw_key))
    # Cache only after the key has been persisted successfully.
    __key = raw_key
    return __key
# Derive a cookie string from the obfuscated script embedded in `result`.
# On success the "name=value" string is stored in self.cookie and returned;
# any failure is swallowed and the method falls through (returns None).
# SECURITY NOTE(review): the text passed to exec() below is derived from
# `result`; if that is remote page content this executes untrusted code.
def get(self, result):
try:
# First single-quoted value assigned to S, then base64-decoded.
s = re.compile("S\s*=\s*'([^']+)").findall(result)[0]
s = base64.b64decode(s)
s = s.replace(' ', '')
# Rewrite JavaScript constructs into Python expressions:
#   String.fromCharCode(x) -> chr(x)
#   .slice(a,b)            -> [a:b]
#   .charAt(i)             -> [i]
#   .substr(a,n)           -> [a:a+n]
s = re.sub('String\.fromCharCode\(([^)]+)\)', r'chr(\1)', s)
s = re.sub('\.slice\((\d+),(\d+)\)', r'[\1:\2]', s)
s = re.sub('\.charAt\(([^)]+)\)', r'[\1]', s)
s = re.sub('\.substr\((\d+),(\d+)\)', r'[\1:\1+\2]', s)
# Drop the page reload call and newlines; rename document.cookie to cookie.
s = re.sub(';location.reload\(\);', '', s)
s = re.sub(r'\n', '', s)
s = re.sub(r'document\.cookie', 'cookie', s)
# Presumably the executed text assigns the local `cookie` (this relies on
# Python 2 exec semantics) -- TODO confirm.
cookie = '' ; exec(s)
# Keep only the first name=value pair.
self.cookie = re.compile('([^=]+)=(.*)').findall(cookie)[0]
self.cookie = '%s=%s' % (self.cookie[0], self.cookie[1])
return self.cookie
except:
pass
def resolve(self, url):
    # Resolve a "target|post-data|headers" link into a final http URL.
    # Links whose two-label host suffix is not found in the decoded
    # self.b_link are returned unchanged; any error yields None.
    try:
        host = urlparse.urlparse(url).netloc
        host = re.compile('([\w]+[.][\w]+)$').findall(host)[0]
        if host not in base64.b64decode(self.b_link):
            return url
        target, post_data, header_qs = url.split('|')
        referer = urlparse.parse_qs(header_qs)['Referer'][0]
        #u += '&app_id=Exodus'
        cookie = self.request(referer, output='cookie', close=False)
        result = self.request(target, post=post_data, referer=referer, cookie=cookie)
        # The last 'url=' fragment that looks like an http link wins.
        pieces = [urllib.unquote_plus(piece.strip()) for piece in result.split('url=')]
        links = [piece for piece in pieces if piece.startswith('http')]
        return links[-1]
    except:
        return
def get_cainfo(self):
    """Query the CA service for its certificate chain.

    Posts to the ``cainfo`` path, sending ``caname`` only when
    ``self._ca_name`` is not empty.

    Returns:
        The base64-decoded ``CAChain`` field of the response.

    Raises:
        ValueError: if the response is not successful or reports a
            different CA name.
    """
    body_data = {"caname": self._ca_name} if self._ca_name != "" else {}
    response = self._send_ca_post(path="cainfo", json=body_data)
    _logger.debug("Raw response json {0}".format(response))
    name_matches = (response['success']
                    and response['result']['CAName'] == self._ca_name)
    if not name_matches:
        raise ValueError("get_cainfo failed with errors {0}"
                         .format(response['errors']))
    return base64.b64decode(response['result']['CAChain'])
def test_inject_file_with_old_agent(self):
    # With the agent reporting the 'injectfile' feature, the record written
    # to xenstore must carry "<path>,<contents>" re-encoded as one base64
    # value.
    # NOTE: tmp_arg_dict aliases FAKE_ARG_DICT, so the updates below are
    # visible through both names.
    tmp_arg_dict = FAKE_ARG_DICT
    request_id = tmp_arg_dict["id"]
    raw_path = base64.b64decode(tmp_arg_dict["b64_path"])
    raw_file = base64.b64decode(tmp_arg_dict["b64_contents"])
    new_b64 = base64.b64encode("%s,%s" % (raw_path, raw_file))

    self.mock_patch_object(self.agent, '_get_agent_features', 'injectfile')

    expected_value = {"name": "injectfile", "value": new_b64}
    tmp_arg_dict["value"] = json.dumps(expected_value)
    tmp_arg_dict["path"] = "data/host/%s" % request_id

    self.agent.inject_file(self.agent, FAKE_ARG_DICT)

    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        tmp_arg_dict)
    self.agent._get_agent_features.assert_called_once()
def test_kms(self):
    """Check that cfpp-produced KMS ciphertexts decrypt as expected.

    Runs only when ``CFPP_RUN_KMS_TESTS`` is set, because it calls the real
    KMS service. Verifies that the value encrypted without a context
    decrypts, and that the value encrypted with a context decrypts only
    when that context is supplied.
    """
    if "CFPP_RUN_KMS_TESTS" not in os.environ:
        # Report the test as skipped rather than as a silent pass.
        self.skipTest("CFPP_RUN_KMS_TESTS is not set")
    import boto3
    import botocore
    output = subprocess.check_output(["cfpp", "-s", "tests",
                                      "tests/kms_test.template"])
    parsed = json.loads(output)["Parameters"]
    without_context = parsed["EncryptedValue"]["Default"]
    with_context = parsed["EncryptedValueWithContext"]["Default"]
    kms = boto3.client('kms')
    kms.decrypt(CiphertextBlob=base64.b64decode(without_context))
    try:
        # Decode first so the failure really is the missing context and not
        # an invalid (still base64-encoded) ciphertext blob.
        kms.decrypt(CiphertextBlob=base64.b64decode(with_context))
    except botocore.exceptions.ClientError:
        pass
    else:
        self.fail("expected KMS to fail due to lack of context")
    kms.decrypt(CiphertextBlob=base64.b64decode(with_context),
                EncryptionContext={"ContextKey": "ContextValue"})
def unquote(cls, value):
    """Reverse the cookie quoting of *value*.

    Base64-decodes the value when ``cls.quote_base64`` is set, then
    deserializes it with ``cls.serialization_method`` when one is set.

    :param value: the value to unquote.
    :raises UnquoteError: if any step fails.
    """
    try:
        decoded = base64.b64decode(value) if cls.quote_base64 else value
        serializer = cls.serialization_method
        if serializer is None:
            return decoded
        return serializer.loads(decoded)
    except Exception:
        # Deserializers can raise nearly anything, so every failure is
        # folded into UnquoteError.
        raise UnquoteError()
def loads(self, value):
    """Parse JSON text, expanding single-key tagged objects.

    An object with exactly one key is converted according to that key:
    ``' t'`` -> tuple, ``' u'`` -> ``uuid.UUID``, ``' b'`` -> base64-decoded
    value, ``' m'`` -> ``Markup``, ``' d'`` -> ``parse_date`` result.
    All other objects are returned unchanged.
    """
    def object_hook(obj):
        if len(obj) == 1:
            tag, payload = next(iteritems(obj))
            if tag == ' t':
                return tuple(payload)
            if tag == ' u':
                return uuid.UUID(payload)
            if tag == ' b':
                return b64decode(payload)
            if tag == ' m':
                return Markup(payload)
            if tag == ' d':
                return parse_date(payload)
        return obj
    return json.loads(value, object_hook=object_hook)
def post(self):
    """Answer 'OK' to requests with a Basic Authorization header, else 401.

    A header counts only if it uses the ``Basic`` scheme and its payload is
    decodable base64; a malformed header now produces the 401 response
    instead of an unhandled exception.

    NOTE(review): the decoded credentials are not compared with anything,
    so ANY well-formed Basic header is accepted. The expected credentials
    are not visible in this block -- a real check must be added.
    """
    authenticated = False
    if 'Authorization' in self.request.headers:
        auth = self.request.headers['Authorization']
        # The first six characters are the scheme prefix "Basic ".
        if auth[:6].lower() == 'basic ':
            try:
                base64.b64decode(auth[6:])
                authenticated = True
            except (TypeError, ValueError):
                authenticated = False
    if authenticated:
        self.response.out.write('OK')
    else:
        site = GetSite()
        template_values = {}
        template_values['site'] = site
        template_values['message'] = "Authentication required"
        path = os.path.join(os.path.dirname(__file__), 'tpl', 'api')
        t = self.get_template(path, 'error.json')
        output = t.render(template_values)
        self.set_status(401, 'Unauthorized')
        self.set_header('Content-type', 'application/json')
        self.set_header('WWW-Authenticate', 'Basic realm="' + site.domain + '"')
        self.write(output)
# Replies
# /api/replies/show.json
def handshake_cookie_verify(b64_cookie):
    '''
    Check a base-64 encoded privcount cookie and decode it.

    Returns the decoded cookie when both the encoded length and the decoded
    length match the sizes declared on PrivCountProtocol; otherwise logs a
    warning and returns False.
    Raises an exception if the cookie is not correctly padded base64.
    '''
    encoded_len = len(b64_cookie)
    if encoded_len != PrivCountProtocol.COOKIE_B64_BYTES:
        logging.warning(
            "Invalid cookie: wrong encoded length {} expected {}"
            .format(encoded_len, PrivCountProtocol.COOKIE_B64_BYTES))
        return False
    cookie = b64decode(b64_cookie)
    decoded_len = len(cookie)
    if decoded_len == PrivCountProtocol.COOKIE_BYTES:
        return cookie
    logging.warning(
        "Invalid cookie: wrong decoded length {} expected {}"
        .format(decoded_len, PrivCountProtocol.COOKIE_BYTES))
    return False
def decode(self, encoded):
    """Decode *encoded* using the encoding selected in the decoder combobox.

    ``raw`` returns ``str(encoded)``; ``hex``, ``b64`` and ``py2`` return
    the decoded value, or None after logging an alert if decoding fails.
    """
    mode = self._encodings[self._decoder.currentIndex()]
    if mode == 'raw':
        return str(encoded)
    try:
        if mode == 'hex':
            return encoded.decode('hex')
        elif mode == 'b64':
            return b64decode(encoded)
        elif mode == 'py2':
            # NOTE(review): eval() runs whatever expression was entered.
            return eval(encoded)
    except:
        log_alert("Failed to decode input")
    return None
def unquote(cls, value):
    """Turn a quoted cookie value back into the original object.

    Applies base64 decoding when ``cls.quote_base64`` is set, followed by
    ``cls.serialization_method.loads`` when a serialization method is set.

    :param value: the value to unquote.
    :raises UnquoteError: if decoding or deserialization fails.
    """
    try:
        result = value
        if cls.quote_base64:
            result = base64.b64decode(result)
        loader = cls.serialization_method
        return result if loader is None else loader.loads(result)
    except Exception:
        # Serialization modules may raise almost any error type; all of
        # them are reported uniformly as UnquoteError.
        raise UnquoteError()
def loads(self, value):
    """Deserialize JSON text, restoring values wrapped in one-key tag objects.

    Recognised tags: ``' t'`` (tuple), ``' u'`` (``uuid.UUID``), ``' b'``
    (base64-decoded value), ``' m'`` (``Markup``), ``' d'``
    (``parse_date``). Objects with another key, or with more or fewer than
    one key, are left as they are.
    """
    def object_hook(obj):
        if len(obj) != 1:
            return obj
        tag, payload = next(iteritems(obj))
        if tag == ' t':
            converted = tuple(payload)
        elif tag == ' u':
            converted = uuid.UUID(payload)
        elif tag == ' b':
            converted = b64decode(payload)
        elif tag == ' m':
            converted = Markup(payload)
        elif tag == ' d':
            converted = parse_date(payload)
        else:
            converted = obj
        return converted
    return json.loads(value, object_hook=object_hook)
def _generate_assertion(self):
    """Build the signed JWT assertion sent with the token request.

    The claims are ``aud``, ``scope``, ``iat``, ``exp`` and ``iss``,
    updated with ``self.kwargs``. They are signed with the base64-decoded
    ``self.private_key``.
    """
    issued_at = long(time.time())
    payload = {
        'aud': self.token_uri,
        'scope': self.scope,
        'iat': issued_at,
        'exp': issued_at + SignedJwtAssertionCredentials.MAX_TOKEN_LIFETIME_SECS,
        'iss': self.service_account_name
    }
    payload.update(self.kwargs)
    logger.debug(str(payload))

    key_bytes = base64.b64decode(self.private_key)
    signer = crypt.Signer.from_string(key_bytes, self.private_key_password)
    return crypt.make_signed_jwt(signer, payload)
# Only used in verify_id_token(), which is always calling to the same URI
# for the certs.
def proof_open(team, proof):
    """Verify a base64-encoded proof and return the challenge it names.

    The proof is opened first with the claimed challenge's ``pk`` and then
    with the team's ``sign_pk``; the inner message must equal the
    challenge id read from the proof's tail.

    :raises ValueError: if the signed challenge id differs from the
        claimed one.
    """
    assert isinstance(proof, bytes)
    signed = b64decode(proof)
    # Presumably skips two 64-byte signatures in front of the id -- verify.
    claimed_chall_id = signed[2 * 64:].decode('utf-8')
    claimed_chall = Challenge(claimed_chall_id)
    chall_pk = claimed_chall['pk']
    team_pk = team['sign_pk']
    membership_proof = pysodium.crypto_sign_open(signed, chall_pk)
    opened = pysodium.crypto_sign_open(membership_proof, team_pk)
    chall_id = opened.decode('utf-8')
    if claimed_chall_id == chall_id:
        return claimed_chall
    raise ValueError('invalid proof')
def setup_environment():
    """Prepare PATH, git and ssh settings from the process environment.

    Reads ``LAMBDA_TASK_ROOT`` and ``SSH_IDENTITY`` (base64-encoded key
    material). Appends ``<root>/bin`` to ``PATH``, sets ``GIT_EXEC_PATH``,
    writes the decoded identity (created with mode 0600) and an ssh config
    file into a fresh temporary directory, and points ``GIT_SSH_COMMAND``
    at that config.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir
    ssh_dir = tempfile.mkdtemp()
    ssh_identity = os.path.join(ssh_dir, 'identity')
    # b64decode() returns bytes, so the file must be opened in binary mode;
    # text mode raises TypeError on Python 3.
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600),
                   'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))
    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))
    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def get_monitor_script(self, name):
    """Return the base64-decoded script of the monitor called *name*.

    The monitor is looked up by name first because the script URL needs
    its id.

    :raises ItemNotFoundError: if the monitor does not exist or the script
        request reports that the item was not found.
    """
    monitor = self.get_monitor_by_name(name)
    if not monitor:
        raise ItemNotFoundError('Monitor {} not found'.format(name))

    script_url = '{}/v3/monitors/{}/script'.format(self.base_url, monitor['id'])
    try:
        response = self._get(
            script_url,
            headers=self.default_headers,
            timeout=self.timeout
        )
    except ItemNotFoundError:
        # Re-raise with a message that names the script, not the monitor.
        raise ItemNotFoundError(
            'Script for monitor {} not found'
            .format(name)
        )
    return base64.b64decode(response.json()['scriptText'])
def prepare_v(instance):
    """Return the JSON message carrying V encrypted with the instance's key.

    The encrypted value is cached in ``instance['b64_encrypted_V']`` and
    reused when already present and non-empty.
    """
    # be very careful printing K, U, or V as they leak in logs stored on unprotected disks
    if common.DEVELOP_IN_ECLIPSE:
        logger.debug("b64_V (non encrypted): " + instance['v'])

    if instance.get('b64_encrypted_V', "") == "":
        # Nothing cached yet: encrypt V with the public key and remember it.
        pubkey = crypto.rsa_import_pubkey(instance['public_key'])
        plain_v = str(base64.b64decode(instance['v']))
        b64_encrypted_V = base64.b64encode(crypto.rsa_encrypt(pubkey, plain_v))
        instance['b64_encrypted_V'] = b64_encrypted_V
    else:
        b64_encrypted_V = instance['b64_encrypted_V']
        logger.debug("Re-using cached encrypted V")

    logger.debug("b64_encrypted_V:" + b64_encrypted_V)
    return json.dumps({'encrypted_key': b64_encrypted_V})
def parse_data_uri(data_uri):
    """Decode the base64 payloads of newline-separated data URIs.

    :param data_uri: text with one URI per line, or ``None``.
    :returns: a list with the decoded payload of each line; lines whose
        payload is not valid base64 are skipped. ``None`` if *data_uri* is
        ``None`` or any line contains no comma.
    """
    if data_uri is None:
        return None
    data = []
    for uri in data_uri.split("\n"):
        fpos = uri.find(",")
        if fpos == -1:
            return None
        try:
            # The payload starts AFTER the comma; the previous slice kept
            # the separator and relied on the decoder discarding it.
            data.append(base64.b64decode(uri[fpos + 1:]))
        except (TypeError, ValueError):
            # skip bad data
            continue
    return data
def activate_group(uuid, keyblob):
    """Send a group-activate command and return the key it yields.

    Returns ``common.TEST_AES_REG_KEY`` immediately when
    ``common.STUB_TPM`` is set. Otherwise the base64-decoded *keyblob*
    (which must be 256 bytes) is sent for the group number derived from
    *uuid*, and the response body after its ``>IHH`` header is returned.
    """
    if common.STUB_TPM:
        return common.TEST_AES_REG_KEY

    group_id = get_group_num(uuid)
    priv_ca = base64.b64decode(keyblob)
    assert len(priv_ca) == 256

    logger.debug('Activating group number %d', group_id)
    request = struct.pack('>II', group_id, 256) + priv_ca
    response = vtpm_cmd(VTPM_ORD_GROUP_ACTIVATE, request)

    header, body = unpack('>IHH', response)
    algId, encScheme, size = header
    logger.info('Received Key. AlgID: 0x%x, encScheme: 0x%x, size: 0x%x',
                algId, encScheme, size)
    logger.info('Key: %r', body)
    assert size == len(body)
    return body