def test_classification_result(self):
    """Check that a ClassificationResult compares equal after a JSON round trip."""
    original = ClassificationResult(5)
    correction = StressCorrection(0, 0, 0, "", 0)
    original.additions["iambos"].append(correction)

    # Serialise with the object's own to_json() and rebuild it via jsonpickle.
    restored = jsonpickle.decode(original.to_json())
    self.assertEqual(original, restored)
# Python decode() example source code
def loads(json, **args):
    """Decode a jsonpickle string into a Python object.

    Extra keyword arguments are accepted for call compatibility but are
    not forwarded to the decoder.
    """
    decoded = jsonpickle.decode(json)
    return decoded
def get_creds(request):
    """Finish the OAuth2 code exchange and store the resulting credential.

    The flow saved in the session under ``'flow'`` is restored, the ``code``
    query parameter is exchanged for a credential, and that credential is
    written through ``DjangoORMStorage`` for ``request.user``.

    Returns a redirect to the URL-unquoted ``state`` query parameter when it
    is present and truthy, otherwise an empty 200 response.
    """
    oauth_flow = jsonpickle.decode(request.session['flow'])
    query = request.GET
    new_credential = oauth_flow.step2_exchange(query.get('code', False))
    DjangoORMStorage(
        GoogleCredentials, 'id', request.user, 'credential'
    ).put(new_credential)

    # NOTE(review): the redirect target is taken straight from the query
    # string -- confirm it is validated somewhere before being trusted.
    if not query.get('state', False):
        return HttpResponse(status=200)
    return redirect(unquote_plus(query.get('state')))
def _get_flow_for_token(csrf_token, request):
    """Recover the OAuth2 flow stored in the session for a CSRF token.

    Args:
        csrf_token: The token passed in the callback request; it is
            formatted into ``_FLOW_KEY`` to build the session key.
        request: The request whose ``session`` is searched.

    Returns:
        The flow decoded from the session entry, or None when the session
        holds nothing (or None) under that key.
    """
    session_key = _FLOW_KEY.format(csrf_token)
    pickled_flow = request.session.get(session_key, None)
    if pickled_flow is None:
        return None
    return jsonpickle.decode(pickled_flow)
def to_python(self, value):
    """Convert a stored value back into a credentials object.

    Overrides the ``models.Field`` method.

    * ``None`` is returned unchanged.
    * An existing ``oauth2client.client.Credentials`` instance is returned
      as is.
    * Anything else is treated as base64 text: it is decoded once and the
      resulting bytes are parsed with jsonpickle, falling back to
      ``pickle`` when jsonpickle raises ``ValueError``.

    Raises:
        ValueError: if the value is not valid base64 (``binascii.Error``
            is a ``ValueError`` subclass).
    """
    if value is None:
        return None
    if isinstance(value, oauth2client.client.Credentials):
        return value

    # Decode the base64 payload a single time; both deserializers below
    # consume the same bytes.
    raw = base64.b64decode(encoding.smart_bytes(value))
    try:
        return jsonpickle.decode(raw.decode())
    except ValueError:
        # NOTE(security): both jsonpickle and pickle can execute arbitrary
        # code while loading.  This fallback must only ever see values
        # written by this application -- confirm the column cannot hold
        # attacker-controlled data.
        return pickle.loads(raw)
def prepare_connectivity(self, context, request, cancellation_context):
    """
    Prepare the connectivity for the Sandbox.

    According to the original notes this covers the resource group, the
    storage account, the key pair, the network security group and a subnet;
    the actual work is delegated to ``prepare_connectivity_operation``.

    :param context: command context providing ``resource`` and ``reservation``
    :param request: jsonpickle-encoded request; its ``driverRequest`` member
        (or None when absent) is handed to the operation
    :param cancellation_context: cloudshell.shell.core.driver_context.CancellationContext instance
    :return: the command result built from ``{'driverResponse': {'actionResults': ...}}``
    """
    with LoggingSessionContext(context) as logger, ErrorHandlingContext(logger):
        logger.info('Preparing Connectivity for Azure VM...')

        # NOTE(review): the CloudShell session is kept open until the result
        # is returned -- confirm this scope matches the intended one.
        with CloudShellSessionContext(context) as cloudshell_session:
            parser = self.model_parser
            cloud_provider_model = parser.convert_to_cloud_provider_resource_model(
                resource=context.resource,
                cloudshell_session=cloudshell_session)
            azure_clients = AzureClientsManager(cloud_provider_model)

            request_holder = DeployDataHolder(jsonpickle.decode(request))
            driver_request = getattr(request_holder, 'driverRequest', None)

            action_results = self.prepare_connectivity_operation.prepare_connectivity(
                reservation=parser.convert_to_reservation_model(context.reservation),
                cloud_provider_model=cloud_provider_model,
                storage_client=azure_clients.storage_client,
                resource_client=azure_clients.resource_client,
                network_client=azure_clients.network_client,
                logger=logger,
                request=driver_request,
                cancellation_context=cancellation_context)

            logger.info('End Preparing Connectivity for Azure VM')
            response = {'driverResponse': {'actionResults': action_results}}
            return self.command_result_parser.set_command_result(response)
def cleanup_connectivity(self, command_context, request):
    """
    Tear down the connectivity of a reservation.

    The work is delegated to ``delete_azure_vm_operation.cleanup_connectivity``
    using the reservation id as the resource group name.

    :param command_context: command context providing ``resource`` and ``reservation``
    :param request: jsonpickle-encoded request; its ``driverRequest`` member
        (or None when absent) is handed to the operation
    :return: the command result built from ``{'driverResponse': {'actionResults': [...]}}``
    """
    with LoggingSessionContext(command_context) as logger, ErrorHandlingContext(logger):
        logger.info('Teardown...')

        # NOTE(review): the CloudShell session is kept open until the result
        # is returned -- confirm this scope matches the intended one.
        with CloudShellSessionContext(command_context) as cloudshell_session:
            cloud_provider_model = self.model_parser.convert_to_cloud_provider_resource_model(
                resource=command_context.resource,
                cloudshell_session=cloudshell_session)
            azure_clients = AzureClientsManager(cloud_provider_model)
            resource_group_name = command_context.reservation.reservation_id

            request_holder = DeployDataHolder(jsonpickle.decode(request))
            driver_request = getattr(request_holder, 'driverRequest', None)

            action_result = self.delete_azure_vm_operation.cleanup_connectivity(
                network_client=azure_clients.network_client,
                resource_client=azure_clients.resource_client,
                cloud_provider_model=cloud_provider_model,
                resource_group_name=resource_group_name,
                request=driver_request,
                logger=logger)

            logger.info('End Teardown')
            response = {'driverResponse': {'actionResults': [action_result]}}
            return self.command_result_parser.set_command_result(response)
def _set_base_deploy_azure_vm_model_params(self, deployed_resource, resource):
    """Fill in the parameters shared by all VM deploy models.

    Values are copied from ``resource.attributes``; the user name and
    password are read from the logical resource attributes of the decoded
    app request.

    :param deployed_resource: deploy_azure_vm_resource_models.BaseDeployAzureVMResourceModel subclass instance
    :param resource: The context of the resource
    :return:
    """
    attributes = resource.attributes

    deployed_resource.group_name = ""  # needs to be auto generated
    deployed_resource.vm_name = ""  # needs to be auto generated

    # The cloud provider is optional; every other attribute must be present.
    if 'Cloud Provider' in attributes.keys():
        deployed_resource.cloud_provider = attributes['Cloud Provider']
    else:
        deployed_resource.cloud_provider = None

    deployed_resource.vm_size = attributes['VM Size']

    boolean_fields = (
        ('autoload', 'Autoload'),
        ('add_public_ip', 'Add Public IP'),
    )
    for field_name, attribute_name in boolean_fields:
        setattr(deployed_resource, field_name,
                self._convert_to_bool(attributes[attribute_name]))

    plain_fields = (
        ('inbound_ports', 'Inbound Ports'),
        ('public_ip_type', 'Public IP Type'),
        ('disk_type', 'Disk Type'),
        ('extension_script_file', 'Extension Script file'),
        ('extension_script_configurations', 'Extension Script Configurations'),
    )
    for field_name, attribute_name in plain_fields:
        setattr(deployed_resource, field_name, attributes[attribute_name])

    deployed_resource.extension_script_timeout = int(attributes['Extension Script Timeout'])

    logical_attributes = jsonpickle.decode(
        resource.app_context.app_request_json)["logicalResource"]["attributes"]
    lookup = AzureModelsParser.get_attribute_value_by_name_ignoring_namespace
    deployed_resource.username = lookup(logical_attributes, "User")
    deployed_resource.password = lookup(logical_attributes, "Password")
def convert_app_resource_to_deployed_app(resource):
    """Wrap the decoded deployed-app JSON of *resource* in a DeployDataHolder.

    :param resource: object exposing ``app_context.deployed_app_json``
    :return: DeployDataHolder built from the decoded JSON
    """
    decoded_app = jsonpickle.decode(resource.app_context.deployed_app_json)
    return DeployDataHolder(decoded_app)
def Deploy(self, context, request=None, cancellation_context=None):
    """Dispatch a deploy request to the handler registered for its service.

    The request is decoded only to read ``'DeploymentServiceName'``; the
    handler found in ``self.deployments`` receives the original, still
    encoded request.

    :raises Exception: when no handler is registered under that name
    """
    service_name = jsonpickle.decode(request)['DeploymentServiceName']
    if service_name not in self.deployments:
        raise Exception('Could not find the deployment')

    handler = self.deployments[service_name]
    return handler(context, request, cancellation_context)
def load_game():
    """Restore the game, player and camera from ``savegame.json``.

    The save file is a JSON document whose entries each hold a
    jsonpickle-encoded value; the player is looked up in
    ``game.level.current_entities`` by the stored index.

    Returns a ``(game, player, camera)`` tuple.
    """
    with open('savegame.json', 'r') as save_file:
        saved = json.load(save_file)

    # Decoded lazily and in this order: game, player index, camera.
    entry_keys = ('serialized_game', 'serialized_player_index', 'serialized_cam')
    game, player_index, camera = (jsonpickle.decode(saved[key]) for key in entry_keys)

    player = game.level.current_entities[player_index]
    return game, player, camera
# main loop
def read(path):
    """Load a jsonpickle-encoded object from *path*.

    The decoded object's ``_init()`` method is called before the object is
    returned.
    """
    with open(path, 'r') as source:
        payload = source.read()

    loaded = jsonpickle.decode(payload)
    loaded._init()
    return loaded
def test___str__(self):
    """
    Check that ``__str__`` yields a string that jsonpickle can decode back
    into an object with the same string representation.
    """
    encoded = str(self.o)
    round_tripped = jsonpickle.decode(encoded)

    # The decoded object must render to exactly the string it came from.
    self.assertEqual(encoded, str(round_tripped))
def deserialize(self, byte_array):
    """Decode *byte_array* as UTF-8 text; falsy input (None, empty) gives None."""
    return byte_array.decode("UTF-8") if byte_array else None
def deserialize(self, byte_array):
    """Decode UTF-8 bytes and parse them with jsonpickle.

    Falsy input (None or empty bytes) gives None without touching the
    decoder.
    """
    if not byte_array:
        return None
    text = byte_array.decode("UTF-8")
    return jsonpickle.decode(text)
def create_new_customer():
    """Create a new customer from the body of the current request.

    The request body is decoded as UTF-8, parsed with jsonpickle and passed
    to ``Customer.create_new`` as the customer's attributes.

    Returns the jsonpickle encoding of ``{"success": True}``.
    """
    body = request.data.decode("utf-8")
    attributes = jsonpickle.decode(body)
    Customer.create_new(attributes=attributes).create()
    return jsonpickle.encode({"success": True})
def deserialize_session(session):
    """
    Rebuild a ``requests`` session from its jsonpickle-encoded attributes.

    *session* is decoded into a mapping of attribute values which is then
    merged into the ``__dict__`` of a freshly created session object.
    """
    attributes = jsonpickle.decode(session)
    restored = requests.session()
    vars(restored).update(attributes)
    return restored
def json_deserialize(json):
    """Deserialize a JSON string with jsonpickle.

    Args:
        json (str): The JSON serialized string to deserialize.

    Returns:
        The object decoded from the string, or None when ``json`` is None.
    """
    return None if json is None else jsonpickle.decode(json)
def on_message(self, message):
    """Hand an incoming websocket message to a newly created handler.

    Falsy messages are ignored.  Otherwise the message is decoded, logged,
    a handler is built from its ``'handler_name'`` entry, the ``'data'``
    entry is passed to that handler, and the handler is remembered in
    ``self.handlers``.
    """
    if not message:
        return

    payload = jsonpickle.decode(message)
    logging.info("incomding ws message %s " % payload)

    message_handler = ZynthianWebSocketMessageHandlerFactory(payload['handler_name'], self)
    message_handler.on_websocket_message(payload['data'])
    self.handlers.append(message_handler)
# client disconnected
def load(self, filepath):
    """Load the optimizer parameters from the specified path as JSON.

    The file content is decoded with jsonpickle and the attributes of the
    decoded object are merged into this instance.

    Parameters
    ----------
    filepath: str
        The file path.
    """
    with open(filepath, 'r') as source:
        serialized = source.read()

    restored = jsonpickle.decode(serialized)
    vars(self).update(vars(restored))