def verifySessionKey():
if not "application/json" in request.headers["Content-Type"]:
return None, None, make_response("Expected content-type JSON", 400)
data = request.json
for key in ("appid", "key", "_sig"):
if not key in data:
return make_response("Missing argument: {key}".format(key=key), 400)
appid = str(data["appid"])
if not "appversion" in data:
appversion = "any"
else:
appversion = str(data["appversion"])
key = str(data["key"])
# calculate message that was signed
message = "{appid}:{appversion}:{key}".format(**locals())
# decode signature
import base64
signature = data["_sig"]
signature = base64.decodestring("\n".join([signature[x:x+64] for x in range(0, len(signature), 64)]))
# fetch and validate app information
lookup_key = appid + ":" + appversion
apps = _get_registered_apps()
if not lookup_key in apps or not apps[lookup_key]["enabled"] or not "pubkey" in apps[lookup_key]:
octoprint.server.appSessionManager.remove(key)
return make_response("Invalid app: {lookup_key}".format(lookup_key=lookup_key), 401)
pubkey_string = apps[lookup_key]["pubkey"]
pubkey_string = "\n".join([pubkey_string[x:x+64] for x in range(0, len(pubkey_string), 64)])
try:
pubkey = rsa.PublicKey.load_pkcs1("-----BEGIN RSA PUBLIC KEY-----\n" + pubkey_string + "\n-----END RSA PUBLIC KEY-----\n")
except:
octoprint.server.appSessionManager.remove(key)
return make_response("Invalid pubkey stored in server", 500)
# verify signature
try:
rsa.verify(message, signature, pubkey)
except rsa.VerificationError:
octoprint.server.appSessionManager.remove(key)
return make_response("Invalid signature", 401)
# generate new session key and return it
result = octoprint.server.appSessionManager.verify(key)
if not result:
return make_response("Invalid key or already verified", 401)
verified_key, valid_until = result
return jsonify(key=verified_key, validUntil=valid_until)