def test_background_queue():
    """A registered task runs inline, and delay()/process_event() round-trips it."""
    publish_event = MagicMock(return_value='aaa')
    task_queue = TaskQueue(publish_event)

    @task_queue.task()
    def funcy():
        global funcy_called
        funcy_called += 1
        return "blah"

    # Calling the wrapped function directly still executes it.
    assert funcy() == "blah"
    # delay() publishes the serialized invocation instead of running it.
    assert funcy.delay() is True
    event = jsonpickle.dumps((funcy.path, (), {}))
    publish_event.assert_called_once_with(event)
    # Consuming the published event executes the task a second time.
    task_queue.process_event(event)
    assert funcy_called == 2
# Example source snippets of Python dumps() usage (collected from real projects)
def to_dict(cls, node):
    """
    Create a dict representation of a node.

    :param node: datamodel.base.node.Node instance
    :return: dict
    """
    serialized_params = {}
    for key, val in node._params.items():
        if callable(val):
            # Callables cannot be JSON-encoded directly: pickle them with
            # cloudpickle first, then wrap the result with jsonpickle.
            serialized_params[key] = jsonpickle.dumps(cloudpickle.dumps(val))
        else:
            serialized_params[key] = val
    return {
        'class': str(node.__class__),
        'name': node._name,
        'params': serialized_params,
        'output_label': node._output_label,
    }
def _get_encoded_attachments(self):
    """Serialize this notification's attachments to a jsonpickle string.

    File-like attachments are rewound and flattened into
    (name, content, guessed-mimetype) tuples; anything else is kept as-is.
    """
    attachments = self.get_attachments()
    if attachments:
        encoded = []
        for item in attachments:
            if isinstance(item, File):
                # Rewind in case the file was already read elsewhere.
                item.seek(0)
                encoded.append((item.name, item.read(), guess_type(item.name)[0]))
            else:
                encoded.append(item)
        attachments = encoded
    return jsonpickle.dumps(attachments)
def _execute(self, ctx):
    """Start the operation for *ctx* asynchronously in a subprocess.

    The pickled arguments dict is handed to the child through a temp file
    whose path is passed on the command line; the running subprocess is
    tracked in ``self._tasks`` keyed by the task id.
    """
    self._check_closed()
    # Temporary file used to pass arguments to the started subprocess.
    file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
    # Write through the fd mkstemp already opened instead of closing it and
    # re-opening by path (avoids a redundant open and a filename race).
    with os.fdopen(file_descriptor, 'wb') as f:
        f.write(pickle.dumps(self._create_arguments_dict(ctx)))
    env = self._construct_subprocess_env(task=ctx.task)
    # Asynchronously start the operation in a subprocess.
    proc = subprocess.Popen(
        [
            sys.executable,
            os.path.expanduser(os.path.expandvars(__file__)),
            os.path.expanduser(os.path.expandvars(arguments_json_path))
        ],
        env=env)
    self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc)
def site_map():
    """Return a JSON response listing every registered Flask route.

    Each entry is a fixed-width line: "endpoint  methods  url" with
    placeholder values for URL arguments.
    """
    # Hoisted out of the loop (the original re-imported per iteration), and
    # unquote's documented home is urllib.parse, not urllib.request.
    import urllib.parse
    output = []
    for rule in app.url_map.iter_rules():
        options = {arg: "[{0}]".format(arg) for arg in rule.arguments}
        methods = ','.join(rule.methods)
        url = url_for(rule.endpoint, **options)
        line = urllib.parse.unquote("{:50s} {:20s} {}".format(rule.endpoint, methods, url))
        output.append(line)
    logging.info(str(output))
    response = app.response_class(
        response=jsonpickle.dumps(output),
        status=200,
        mimetype='application/json'
    )
    return response
def site_map():
    """Return a JSON response listing every registered Flask route.

    Each entry is a fixed-width line: "endpoint  methods  url" with
    placeholder values for URL arguments.
    """
    # Hoisted out of the loop (the original re-imported per iteration), and
    # unquote's documented home is urllib.parse, not urllib.request.
    import urllib.parse
    output = []
    for rule in app.url_map.iter_rules():
        options = {arg: "[{0}]".format(arg) for arg in rule.arguments}
        methods = ','.join(rule.methods)
        url = url_for(rule.endpoint, **options)
        line = urllib.parse.unquote("{:50s} {:20s} {}".format(rule.endpoint, methods, url))
        output.append(line)
    logging.info(str(output))
    response = app.response_class(
        response=jsonpickle.dumps(output),
        status=200,
        mimetype='application/json'
    )
    return response
def test_schedule_queue():
    """process_schedule publishes an event for each task bound to that schedule."""
    publisher = MagicMock(return_value='aaa')
    queue = TaskQueue(publisher)

    @queue.task(schedules=['bbb'])
    def funcy():
        return "blah"

    queue.process_schedule('bbb')
    expected_event = jsonpickle.dumps((funcy.path, (), {}))
    publisher.assert_called_once_with(expected_event)
def test_sns_background_queue():
    """End-to-end: delay() publishes over SNS and the SNS consumer runs the task."""
    with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message:
        mock_send_sns_message.return_value = {"MessageId": "1234"}
        publish_event = publish_sns_message("sns:topic")
        task_queue = TaskQueue(publish_event)

        @task_queue.task(schedules=['bbb'])
        def funcy():
            return "Blah"

        # delay() should serialize the call and send it to the SNS topic.
        funcy.delay()
        event = jsonpickle.dumps((funcy.path, (), {}))
        mock_send_sns_message.assert_called_once_with(
            Message=event,
            TopicArn="sns:topic",
        )

        # Register a mock task, then feed its event through the SNS consumer.
        mock_background_func = MagicMock()
        mock_background_func.__name__ = "background_func"
        mock_background_func.__module__ = "testing"
        mock_background_func.return_value = "awesome"
        composed_mock = task_queue.task()(mock_background_func)
        event = jsonpickle.dumps((composed_mock.path, (), {}))

        sns_consumer = handle_sns_message(task_queue.process_event)
        sns_consumer({
            "Records": [{
                'Sns': {
                    'Message': event
                }
            }]
        }, {})
        mock_background_func.assert_called_once_with()
def delay(self, *args, **kwargs):
    """Publish this task invocation to the queue instead of running it.

    :return: True when the serialized event was handed to the publisher,
             False when the task has no publisher registered yet.
    """
    if not self.publish_event:
        logger.error("This task has not yet been registered with a task queue")
        return False
    # Serialize only after the publisher check so unregistered tasks don't
    # pay (or fail on) serialization for an event that can't be sent.
    event = jsonpickle.dumps((self.path, args, kwargs))
    self.publish_event(event)
    return True
def serialize(cls, node):
    """
    Create a JSON representation of a node.

    :param node: datamodel.base.node.Node instance
    :return: str
    """
    node_dict = cls.to_dict(node)
    return json.dumps(node_dict)
def serialize(cls, graph):
    """
    Create a JSON representation of a graph.

    :param graph: datamodel.base.graph.Graph instance
    :return: str
    """
    # Docstring fixed: the parameter is `graph`, not `node`.
    return json.dumps(cls.to_dict(graph))
def send(self, raise_exception=False, user=None):
    """
    Prepare the notification for sending; called to trigger the send from code.

    If raise_exception is True, exceptions are raised rather than simply
    logged.

    :return: bool, whether the notification was sent successfully
    """
    context = self.get_context_data()
    recipients = self.get_recipients()
    # Render only the content types this notification class declares.
    text_content = self.render('text', context) if 'text' in self.render_types else None
    html_content = self.render('html', context) if 'html' in self.render_types else None
    extra_data = self.get_extra_data()
    sent_notification = SentNotification(
        recipients=','.join(recipients),
        text_content=text_content,
        html_content=html_content,
        sent_from=self.get_sent_from(),
        subject=self.get_subject(),
        extra_data=json.dumps(extra_data) if extra_data else None,
        notification_class=self.get_class_path(),
        attachments=self._get_encoded_attachments(),
        user=user,
    )
    return self.resend(sent_notification, raise_exception=raise_exception)
def dump(obj, file_name):
    """Persist *obj* to *file_name*, choosing a format from the name/type.

    ``.json`` files get jsonpickle text; numpy arrays are stored with
    ``np.save``; everything else goes through joblib.
    """
    if file_name.endswith('.json'):
        with open(file_name, 'w') as f:
            f.write(jsonpickle.dumps(obj))
    elif isinstance(obj, np.ndarray):
        np.save(file_name, obj)
    else:
        # Using joblib instead of pickle because of http://bugs.python.org/issue11564
        joblib.dump(obj, file_name, protocol=pickle.HIGHEST_PROTOCOL)
def get_state(self):
    """Persist the ids of all delayed orders as UTF-8 jsonpickle bytes."""
    order_ids = [order.order_id for _, order in self._delayed_orders]
    return jsonpickle.dumps(order_ids).encode('utf-8')
def get_state(self):
    """Snapshot the delayed-order ids as UTF-8 encoded jsonpickle bytes."""
    ids = [entry[1].order_id for entry in self._delayed_orders]
    return jsonpickle.dumps(ids).encode('utf-8')
def get_state(self):
    """Collect and jsonpickle the state of every stateful child object.

    Children whose get_state() returns None are omitted from the snapshot.
    """
    states = {}
    for name, tracked in six.iteritems(self._objects):
        child_state = tracked.get_state()
        if child_state is not None:
            states[name] = child_state
    return jsonpickle.dumps(states).encode('utf-8')
def get_state(self):
    """Snapshot open and delayed orders as UTF-8 jsonpickle bytes."""
    snapshot = {
        'open_orders': [order.get_state() for account, order in self._open_orders],
        'delayed_orders': [order.get_state() for account, order in self._delayed_orders],
    }
    return jsonpickle.dumps(snapshot).encode('utf-8')
def get_state(self):
    """Serialize the states of open and delayed orders to UTF-8 bytes."""
    state = {}
    state['open_orders'] = [order.get_state() for account, order in self._open_orders]
    state['delayed_orders'] = [order.get_state() for account, order in self._delayed_orders]
    return jsonpickle.dumps(state).encode('utf-8')
def _send_message(connection, message):
    """Serialize *message* with jsonpickle and send it over *connection*,
    prefixed with a struct-packed length header (format _INT_FMT) so the
    receiver knows how many bytes to read.
    """
    # Packing the length of the entire msg using struct.pack.
    # This enables later reading of the content.
    def _pack(data):
        return struct.pack(_INT_FMT, len(data))
    data = jsonpickle.dumps(message)
    # NOTE(review): jsonpickle.dumps returns str; on Python 3,
    # connection.sendall(data) would need bytes — confirm an encode happens
    # elsewhere or that this code targets Python 2.
    msg_metadata = _pack(data)
    connection.send(msg_metadata)
    connection.sendall(data)
def get(self):
    """Return all categories as a JSON payload.

    On any failure (including an empty result) the error is printed and
    None is returned, preserving the original best-effort behavior.
    """
    try:
        data = dao.get_all_categories()
        if data is None:
            # The original used a bare `raise` with no active exception,
            # which produces an unhelpful RuntimeError; raise something
            # meaningful instead (still handled by the except below).
            raise ValueError("no categories found")
        # Round-trip through jsonpickle/json to get plain dicts for jsonify.
        return jsonify({"categoryList": json.loads(jsonpickle.dumps(data, unpicklable=False))})
    except Exception as e:
        print("AllCategoryApi", e)
def publish(self, payload, channel=None):
    """Publish *payload* on *channel*, defaulting to the pub channel.

    The payload is jsonpickle-encoded when possible; if encoding raises
    ValueError the raw payload is published instead (best effort).
    """
    if channel is None:
        channel = self._pub_channel_name
    log.debug("publishing to channel: %s \n %s" % (channel, payload))
    try:
        encoded = jsonpickle.dumps(payload)
    except ValueError:
        # Deliberate best-effort: fall back to publishing the raw payload.
        encoded = payload
    self._redis_conn.publish(channel, encoded)
def get_state(self):
    """Dump open and delayed order states to UTF-8 encoded JSON bytes."""
    payload = {
        'open_orders': [o.get_state() for _, o in self._open_orders],
        'delayed_orders': [o.get_state() for _, o in self._delayed_orders],
    }
    encoded = jsonpickle.dumps(payload)
    return encoded.encode('utf-8')
def get_state(self):
    """Gather non-None child states into one jsonpickle UTF-8 blob.

    Children whose get_state() returns None contribute nothing.
    """
    collected = {
        key: state
        for key, state in (
            (key, obj.get_state()) for key, obj in six.iteritems(self._objects)
        )
        if state is not None
    }
    return jsonpickle.dumps(collected).encode('utf-8')
def get_state(self):
    """Serialize the delayed orders' ids to UTF-8 jsonpickle bytes."""
    pending = [delayed.order_id for _, delayed in self._delayed_orders]
    return jsonpickle.dumps(pending).encode('utf-8')
# Source listing: plugin_firebase_stats_consumer.py
# Project: indy-plenum — Author: hyperledger
# (aggregator stats: 18 reads, 0 bookmarks, 0 likes, 0 comments)
def _send(self, data: Dict[str, object]):
    """jsonpickle-encode *data* and hand it to the stats publisher."""
    encoded = jsonpickle.dumps(data)
    self.statsPublisher.send(encoded)