def post_phonemes():
    """Run phoneme recognition on an uploaded wave file.

    Expects the upload in the ``wave`` field of ``request.files`` and aborts
    with HTTP 400 when that field is missing.

    Returns:
        str: JSON document of the form ``{"phonemes": ...}`` holding the value
        of ``wave.get_phoneme_map()`` after prediction.
    """
    import os

    print(request.files)
    if 'wave' not in request.files:
        abort(400)
    upload = request.files['wave']

    # delete=False keeps the reserved file on disk after close(), so the name
    # stays ours until we remove it explicitly below (the default would delete
    # the file on close and leave the name free for anybody else to claim).
    tf = tempfile.NamedTemporaryFile(dir=UPLOAD_FOLDER, delete=False)
    tmp_filename = tf.name
    tf.close()
    try:
        upload.save(tmp_filename)
        wave = Wave()
        wave.load(tmp_filename)
        recogn = PhonemeRecognition()
        recogn.load('model_en_full')
        recogn.predict(wave)
        result = {'phonemes': wave.get_phoneme_map()}
    finally:
        # Always clean up the uploaded copy, even when recognition fails.
        os.remove(tmp_filename)
    print(result)
    return json.dumps(result)
# Example source code for Python's dumps()
def run(self, Resource, path=None, query_string=None, kwargs=None, **rkwargs):
"""Run given resource manually.
See werkzeug.test.EnvironBuilder for rkwargs.
"""
resource = Resource(self, raw=True)
if isinstance(query_string, dict):
if 'where' in query_string:
query_string['where'] = json.dumps(query_string['where'])
query_string = urlencode(query_string)
rkwargs['query_string'] = query_string
args = []
if path:
args.append(path)
ctx = self.app.test_request_context(*args, **rkwargs)
with ctx:
kwargs = kwargs or {}
return resource.dispatch_request(**kwargs)
def test_user_can_follow_another_user(client, users_repo):
    """Following a user via the API makes them show up in ``followed_users``."""
    follower = User(name='Eve')
    followee = User(name='Adam')
    for person in (follower, followee):
        users_repo.store(person)

    follow_url = '/users/{}/follow'.format(follower.pk)
    follow_body = json.dumps({'pk': followee.pk})
    follow_resp = client.post(
        follow_url, data=follow_body, content_type='application/json')
    assert follow_resp.status_code == 200

    profile_resp = client.get('/users/{}'.format(follower.pk))
    assert profile_resp.status_code == 200

    profile = json.loads(profile_resp.data)
    names = [entry['name'] for entry in profile['followed_users']]
    assert 'Adam' in names
def create(self, data, suffix=''):
    """Store a new object in the data store.

    Args:
        data (dict): Values to store; serialized to JSON before writing.
        suffix (str): Optional key suffix. When it is empty the write is
            issued with ``append=True``, otherwise with ``append=False``.

    Returns:
        etcd.Result: The created object, expanded by ``_load_from_etcd``.
    """
    use_append = suffix == ''
    target_key = '/{}/{}'.format(self.KEY, suffix)
    payload = json.dumps(data)
    written = self.client.write(target_key, payload, append=use_append)
    return self._load_from_etcd(written)
def update(self, entity):
    """Write an existing object back to the data store.

    ``entity.value`` is replaced by its JSON serialization before the
    client update is attempted.

    Args:
        entity (etcd.Result): An expanded result object to serialize and save.

    Returns:
        etcd.Result: The updated object, expanded by ``_load_from_etcd``, or
        ``None`` when the client raises ``EtcdCompareFailed`` or
        ``EtcdKeyNotFound``.
    """
    entity.value = json.dumps(entity.value)
    try:
        stored = self.client.update(entity)
    except (etcd.EtcdCompareFailed, etcd.EtcdKeyNotFound):
        return None
    else:
        return self._load_from_etcd(stored)
def test_server_deregister_service(dynamodb):
    """A registered service can be deleted once; later lookups and deletes give 404."""
    registration = json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'})
    app.test_client().post('/service', data=registration)

    first_delete = app.test_client().delete('/service/foo')
    assert first_delete.status_code == 200

    lookup = app.test_client().get('/service/foo')
    assert lookup.status_code == 404

    # Deleting again must fail with an explanatory message.
    second_delete = app.test_client().delete('/service/foo')
    assert second_delete.status_code == 404
    message = second_delete.data.decode('UTF-8')
    assert message == 'Service: foo not currently registered with Flyby.'
def test_server_deregister_target_group(dynamodb):
    """Deleting a target group empties the service's list; a repeat delete gives 404."""
    service_spec = {'name': 'foo', 'fqdn': 'foo.example.com'}
    app.test_client().post('/service', data=json.dumps(service_spec))

    target_spec = {
        'service_name': 'foo',
        'target_group_name': 'foo-blue',
        'weight': 50,
    }
    app.test_client().post('/target', data=json.dumps(target_spec))

    first_delete = app.test_client().delete('/target/foo/foo-blue')
    assert first_delete.status_code == 200
    after_first = json.loads(app.test_client().get('/service/foo').data)
    assert after_first['target_groups'] == []

    # The group is gone already, so the second attempt is rejected and the
    # service still reports no target groups.
    repeat_delete = app.test_client().delete('/target/foo/foo-blue')
    assert repeat_delete.status_code == 404
    after_repeat = json.loads(app.test_client().get('/service/foo').data)
    assert after_repeat['target_groups'] == []
def test_server_register_backend(dynamodb):
    """Registering a backend on an existing target group returns the backend's host."""
    service_spec = {'name': 'foo', 'fqdn': 'foo.example.com'}
    app.test_client().post('/service', data=json.dumps(service_spec))

    target_spec = {'target_group_name': 'foo-blue', 'service_name': 'foo', 'weight': 80}
    app.test_client().post('/target', data=json.dumps(target_spec))

    backend_spec = {
        'host': '10.0.0.1:80',
        'service_name': 'foo',
        'target_group_name': 'foo-blue',
    }
    backend_resp = app.test_client().post('/backend', data=json.dumps(backend_spec))

    assert backend_resp.status_code == 200
    registered = json.loads(backend_resp.data)
    assert registered['host'] == '10.0.0.1:80'
def chapter(comic_id, chapter_id):
    """Render the image page for one chapter of a comic.

    Responds with 404 when the chapter does not belong to ``comic_id``. The
    chapter's image list is read from ``chapter.images`` (stored as JSON); when
    that is empty the list is fetched from the source URL, stored on the
    chapter and committed.

    Args:
        comic_id: Identifier of the comic the chapter must belong to.
        chapter_id: Identifier of the chapter to show.

    Returns:
        The rendered ``images.html`` template.
    """
    comic = data.get_comic(comic_id)
    current = data.get_chapter(chapter_id)
    # Validate first: the neighbour lookups below read attributes of the
    # chapter, so a missing or foreign chapter must be rejected before them.
    # NOTE(review): assumes get_chapter may return None for unknown ids - confirm.
    if current is None or current.comic_id != comic_id:
        abort(404)

    next_chapter = data.get_next_chapter(comic_id, current.chapter_number)
    prev_chapter = data.get_prev_chapter(comic_id, current.chapter_number)
    url = 'http://www.ishuhui.net/ComicBooks/ReadComicBooksToIsoV1/' + str(
        chapter_id) + '.html'

    if current.images:
        images = json.loads(current.images)
    else:
        # Nothing stored yet: fetch once and persist for later requests.
        images = get_images_from_url(url)
        current.images = json.dumps(images)
        db.session.commit()

    return render_template(
        'images.html',
        comic=comic,
        chapter=current,
        next_chapter=next_chapter,
        prev_chapter=prev_chapter,
        images=images,
        url=url)
def slack_stats():
    """
    Screen-scrape slack signup app since it's dynamic with node.js and grabs
    from slack API.

    Returns a JSON response ``{"text": <stats>}`` where ``<stats>`` is the
    content of the first ``<p class="status">`` element of the page, or an
    empty string when the page could not be fetched or has no such element.
    """
    stats = ''
    resp = requests.get(SLACK_URL)
    if resp.status_code == 200:
        # Match against the decoded body: a str pattern applied to the raw
        # bytes in resp.content raises TypeError on Python 3.
        user_count = re.search(r'<p class="status">(.*?)</p>', resp.text)
        if user_count is not None:
            stats = user_count.group(1)
    return Response(response=json.dumps({'text': stats}), status=200,
                    mimetype='application/json')
def add_heart():
    """
    Add a heart to the referenced article and return the new heart count as
    JSON. Responds with 401 and an error message when no user is logged in.
    """
    current_user = models.find_user()
    if current_user is None:
        body = {'error': 'Cannot heart unless logged in'}
        code = 401
    else:
        form = request.form
        total = models.add_heart(form['stack'], form['title'],
                                 current_user.login)
        body = {'count': total}
        code = 200
    return Response(response=json.dumps(body), status=code,
                    mimetype='application/json')
def remove_heart():
    """
    Remove a heart from the referenced article and return the new heart count
    as JSON. Responds with 401 and an error message when no user is logged in.
    """
    current_user = models.find_user()
    if current_user is None:
        body = {'error': 'Cannot heart unless logged in'}
        code = 401
    else:
        form = request.form
        total = models.remove_heart(form['stack'], form['title'],
                                    current_user.login)
        body = {'count': total}
        code = 200
    return Response(response=json.dumps(body), status=code,
                    mimetype='application/json')
def indicator_bulk_add():
    """Add a batch of indicators described by the JSON request body.

    The body must pass ``_valid_json`` for the keys ``control``, ``data_type``,
    ``event_id``, ``pending`` and ``data``; ``data`` is iterated as
    ``(value, description)`` pairs.

    Returns:
        str: JSON with ``results`` set to ``'success'`` (and the outcome of
        ``_add_indicators`` in ``data``) or ``'error'`` (and a message).
    """
    req_keys = ('control', 'data_type', 'event_id', 'pending', 'data')
    try:
        pld = request.get_json(silent=True)
    except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
        return json.dumps({'results': 'error', 'data': '%s' % e})

    if not _valid_json(req_keys, pld):
        return json.dumps({'results': 'error', 'data': 'bad json'})

    # load related stuff
    res_dict = ResultsDict(pld['event_id'], pld['control'])
    for val, desc in pld['data']:
        res_dict.new_ind(data_type=pld['data_type'],
                         indicator=val,
                         date=None,
                         description=desc)
    results = _add_indicators(res_dict, pld['pending'])
    return json.dumps({'results': 'success', 'data': results})
def api_indicator_get():
    """Validate an indicator query request and build the filtered query.

    The JSON body must contain ``conditions``; every condition must contain
    ``field``, ``operator`` and ``val``. Any validation failure yields a JSON
    error document.
    """
    req_keys = ('conditions',)
    req_keys2 = ('field', 'operator', 'val')
    try:
        pld = request.get_json(silent=True)
    except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
        return json.dumps({'results': 'error', 'data': '%s' % e})

    if not _valid_json(req_keys, pld):
        return json.dumps({'results': 'error', 'data': 'Invalid json'})

    for item in pld.get('conditions'):
        if not _valid_json(req_keys2, item):
            return json.dumps({'results': 'error', 'data': 'Invalid json'})

    # NOTE(review): the visible code stops here and never uses or returns `q`;
    # the remainder of this function appears to be missing - confirm.
    q = filter_query(Indicator.query.join(Event), pld.get('conditions'))
def pushbullet(ALERTID=None, TOKEN=None):
    """
    Send a `link` notification to all devices on pushbullet with a link back to the alert's query.
    If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account
    """
    if not PUSHBULLETURL:
        return ("PUSHBULLET parameter must be set, please edit the shim!", 500, None)

    # Use a separate local name: assigning to PUSHBULLETTOKEN inside the
    # function would make it a local variable and raise UnboundLocalError
    # whenever TOKEN is not supplied.
    token = TOKEN if TOKEN is not None else PUSHBULLETTOKEN
    if not token:
        return ("PUSHBULLETTOKEN parameter must be set, please edit the shim!", 500, None)

    a = parse(request)
    payload = {
        "body": a['info'],
        "title": a['AlertName'],
        "type": "link",
        "url": a['url'],
    }
    headers = {'Content-type': 'application/json', 'Access-Token': token}
    return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
def socialcast(ALERTID=None, NUMRESULTS=10, TEAM=None, I=None, X=None):
    """
    Create a post on Socialcast containing log events.
    Limited to `NUMRESULTS` (default 10) or 1MB.
    If `TEAM/I/X` is not passed, requires `SOCIALCASTURL` defined in the form `https://TEAM.socialcast.com/api/webhooks/IIIIIIIIII/XXXXXXXXXXX`
    For more information see https://socialcast.github.io/socialcast/apidoc/incoming_webhook.html
    """
    if X is not None:
        URL = 'https://' + TEAM + '.socialcast.com/api/webhooks/' + I + '/' + X
    # Only fall back to (and validate) SOCIALCASTURL when no TEAM/I/X was
    # given; otherwise the URL built above would be overridden or rejected.
    elif not SOCIALCASTURL or 'socialcast.com/api/webhooks' not in SOCIALCASTURL:
        return ("SOCIALCASTURL parameter must be set properly, please edit the shim!\n", 500, None)
    else:
        URL = SOCIALCASTURL
    # Socialcast cares about the order of the information in the body
    # json.dumps() does not preserve the order so just use raw get_data()
    return callapi(URL, 'post', request.get_data())
def active_tasks():
    """Return the scheduler's active tasks as a JSON array.

    Reads the optional query parameters ``project`` (default ``""``) and
    ``limit`` (default 100). Each task gets ``updatetime`` and
    ``updatetime_text`` added, plus ``status_text`` when it has a ``status``.
    Responds with 502 and ``{}`` when the scheduler RPC call raises
    ``socket.error``.
    """
    scheduler = app.config['scheduler_rpc']
    task_store = app.config['taskdb']
    project_name = request.args.get('project', "")
    max_tasks = int(request.args.get('limit', 100))

    try:
        active = scheduler.get_active_tasks(project_name, max_tasks)
    except socket.error as err:
        app.logger.warning('connect to scheduler rpc error: %r', err)
        return '{}', 502, {'Content-Type': 'application/json'}

    annotated = []
    for stamp, entry in active:
        entry['updatetime'] = stamp
        entry['updatetime_text'] = utils.format_date(stamp)
        if 'status' in entry:
            entry['status_text'] = task_store.status_to_string(entry['status'])
        annotated.append(entry)

    body = json.dumps(annotated)
    return body, 200, {'Content-Type': 'application/json'}
def counter():
    """Return per-project counters and pause status from the scheduler as JSON.

    The result maps each project to a dict holding one entry per counter type
    and a ``paused`` flag. An empty JSON object is returned when no scheduler
    RPC is configured or when the RPC call raises ``socket.error``.
    """
    json_header = {'Content-Type': 'application/json'}
    rpc = app.config['scheduler_rpc']
    if rpc is None:
        # Same status and Content-Type as every other return of this view.
        return json.dumps({}), 200, json_header

    result = {}
    try:
        data = rpc.webui_update()
        # Local names avoid shadowing the builtin `type` and this function.
        for kind, counters in iteritems(data['counter']):
            for project, value in iteritems(counters):
                result.setdefault(project, {})[kind] = value
        for project, paused in iteritems(data['pause_status']):
            result.setdefault(project, {})['paused'] = paused
    except socket.error as e:
        app.logger.warning('connect to scheduler rpc error: %r', e)
        return json.dumps({}), 200, json_header

    return json.dumps(result), 200, json_header
def test_plan_add_valid_body(app, manager):
    """Posting a valid plan body returns 200 with the JSON message ``ok``."""
    url = url_for("plans.add_plan")

    # add a planner and an attack
    manager.attacks_store.add(attack1_module)
    manager.planners_store.add(planner1_module)

    request_body = json.dumps(valid_request_body)
    with app.test_request_context(url):
        http_client = app.test_client()
        res = http_client.post(
            url_for("plans.add_plan"),
            content_type="application/json",
            data=request_body,
        )

    assert res.status_code == 200
    assert res.mimetype == "application/json"
    assert res.json == {"msg": "ok"}
def test_handle_empty_input(self, mocked_send_to_chat, mocked_celery_chain):
    # An empty Update must give HTTP 200 with an empty JSON body, trigger the
    # chat reply task with an empty payload and never call the celery chain.
    payload = json.dumps(TelegramUpdates.EMPTY)
    resp = self.client.post(
        TelegramUpdates.URL_HANDLE_WEBHOOK,
        data=payload,
        follow_redirects=True,
        headers=TelegramUpdates.HEADERS,
    )

    status_ok = resp.status_code == 200
    self.assertTrue(status_ok,
                    'Failed to return status code 200 for empty input')
    body = json.loads(resp.data)
    self.assertEqual({}, body,
                     'Failed to return an empty JSON for empty input')

    mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

    # assert_called() has to fail, proving the chain was never invoked.
    with self.assertRaises(AssertionError) as chain_err:
        mocked_celery_chain.assert_called()
    failure_text = str(chain_err.exception)
    self.assertIn("Expected 'celery_chain' to have been called", failure_text)
    self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_bad_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain):
    # An Update with a bad reply_message_id and bad text length must give
    # HTTP 200 with an empty JSON body and never call the celery chain.
    payload = json.dumps(TelegramUpdates.TEXT_BAD_ID_BAD_TEXT)
    resp = self.client.post(
        TelegramUpdates.URL_HANDLE_WEBHOOK,
        data=payload,
        follow_redirects=True,
        headers=TelegramUpdates.HEADERS,
    )

    status_ok = resp.status_code == 200
    self.assertTrue(status_ok,
                    'Failed to return status code 200 for an Update with '
                    'bad reply_message_id and bad text length')
    body = json.loads(resp.data)
    self.assertEqual({}, body,
                     'Failed to return an empty JSON for an Update with '
                     'bad reply_message_id and bad text length')

    mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

    # assert_called() has to fail, proving the chain was never invoked.
    with self.assertRaises(AssertionError) as chain_err:
        mocked_celery_chain.assert_called()
    failure_text = str(chain_err.exception)
    self.assertIn("Expected 'celery_chain' to have been called", failure_text)
    self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_ok_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain):
    # An Update with bad text length must give HTTP 200 with an empty JSON
    # body and never call the celery chain.
    payload = json.dumps(TelegramUpdates.TEXT_OK_ID_BAD_TEXT)
    resp = self.client.post(
        TelegramUpdates.URL_HANDLE_WEBHOOK,
        data=payload,
        follow_redirects=True,
        headers=TelegramUpdates.HEADERS,
    )

    status_ok = resp.status_code == 200
    self.assertTrue(status_ok,
                    'Failed to return status code 200 for an Update with '
                    'bad text length')
    body = json.loads(resp.data)
    self.assertEqual({}, body,
                     'Failed to return an empty JSON for an Update with '
                     'bad text length')

    mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

    # assert_called() has to fail, proving the chain was never invoked.
    with self.assertRaises(AssertionError) as chain_err:
        mocked_celery_chain.assert_called()
    failure_text = str(chain_err.exception)
    self.assertIn("Expected 'celery_chain' to have been called", failure_text)
    self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_bad_id_ok_text(self, mocked_send_to_chat, mocked_celery_chain):
    # An Update with a bad reply_message_id must give HTTP 200 with an empty
    # JSON body and never call the celery chain.
    payload = json.dumps(TelegramUpdates.TEXT_BAD_ID_OK_TEXT)
    resp = self.client.post(
        TelegramUpdates.URL_HANDLE_WEBHOOK,
        data=payload,
        follow_redirects=True,
        headers=TelegramUpdates.HEADERS,
    )

    status_ok = resp.status_code == 200
    self.assertTrue(status_ok,
                    'Failed to return status code 200 for an Update with '
                    'bad reply_message_id')
    body = json.loads(resp.data)
    self.assertEqual({}, body,
                     'Failed to return an empty JSON for an Update with '
                     'bad reply_message_id')

    mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

    # assert_called() has to fail, proving the chain was never invoked.
    with self.assertRaises(AssertionError) as chain_err:
        mocked_celery_chain.assert_called()
    failure_text = str(chain_err.exception)
    self.assertIn("Expected 'celery_chain' to have been called", failure_text)
    self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_malformed_Message(self, mocked_send_to_chat, mocked_celery_chain):
    # An Update with a malformed Message must give HTTP 200 with an empty
    # JSON body and never call the celery chain.
    payload = json.dumps(TelegramUpdates.TEXT_MALFORMED_NO_MESSAGE)
    resp = self.client.post(
        TelegramUpdates.URL_HANDLE_WEBHOOK,
        data=payload,
        follow_redirects=True,
        headers=TelegramUpdates.HEADERS,
    )

    status_ok = resp.status_code == 200
    self.assertTrue(status_ok,
                    'Failed to return status code 200 for an Update with '
                    'a malformed Message')
    body = json.loads(resp.data)
    self.assertEqual({}, body,
                     'Failed to return an empty JSON for an Update with '
                     'a malformed Message')

    mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

    # assert_called() has to fail, proving the chain was never invoked.
    with self.assertRaises(AssertionError) as chain_err:
        mocked_celery_chain.assert_called()
    failure_text = str(chain_err.exception)
    self.assertIn("Expected 'celery_chain' to have been called", failure_text)
    self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_update_id_already_used(self, mocked_send_to_chat, mocked_celery_chain):
    # An Update whose ID was already used must give HTTP 200 with an empty
    # JSON body and never call the celery chain.
    #
    # Celery tasks themselves are not under test in the view; that belongs to
    # a separate test suite, so both mocks simply return None.
    mocked_send_to_chat.return_value = None
    mocked_celery_chain.return_value = None

    payload = json.dumps(TelegramUpdates.TEXT_SAME_UPDATE_ID)
    resp = self.client.post(
        TelegramUpdates.URL_HANDLE_WEBHOOK,
        data=payload,
        follow_redirects=True,
        headers=TelegramUpdates.HEADERS,
    )

    status_ok = resp.status_code == 200
    self.assertTrue(status_ok,
                    'Failed to return status code 200 for an Update with '
                    'an ID already used')
    body = json.loads(resp.data)
    self.assertEqual({}, body,
                     'Failed to return an empty JSON for an Update with '
                     'an ID already used')

    mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

    # assert_called() has to fail, proving the chain was never invoked.
    with self.assertRaises(AssertionError) as chain_err:
        mocked_celery_chain.assert_called()
    failure_text = str(chain_err.exception)
    self.assertIn("Expected 'celery_chain' to have been called", failure_text)
    self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_valid_input(self, mocked_send_to_chat, mocked_celery_chain):
    # A valid Update must be echoed back with HTTP 200, start the celery chain
    # with the extracted message fields and never call send_message_to_chat.
    response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                data=json.dumps(TelegramUpdates.TEXT_OK_ID_OK_TEXT),
                                follow_redirects=True,
                                headers=TelegramUpdates.HEADERS)
    self.assertTrue(response.status_code == 200,
                    'Failed to return status code 200 for a valid input')

    # Compare the decoded body: a dict never equals the raw response bytes, so
    # checking `{}` against response.data directly could not fail.
    returned = json.loads(response.data)
    self.assertNotEqual({}, returned,
                        'Failed to return a non-empty JSON for a valid input')
    self.assertEqual(TelegramUpdates.TEXT_OK_ID_OK_TEXT,
                     returned,
                     'Failed to return an Update itself for a valid input Update')

    # watch out! parse_update eliminates Updates with message_id already processed
    # so we have to use not parsed Update here
    message = TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']
    mocked_update = {'chat_id': message['chat']['id'],
                     'reply_to_message_id': message['message_id'],
                     'text': message['text']}
    mocked_celery_chain.assert_called_with(mocked_update)

    # assert_called() has to fail, proving no direct chat message was sent.
    with self.assertRaises(AssertionError) as send_err:
        mocked_send_to_chat.assert_called()
    self.assertIn("Expected 'send_message_to_chat' to have been called",
                  str(send_err.exception))
    self.assertEqual(mocked_send_to_chat.call_args_list, [])
def update(cls, **kwargs):
    """Update the row identified by the primary key and return a new instance.

    Every name in ``cls.__fields__`` is written, taking its value from
    ``kwargs`` (``None`` when absent); fields listed in ``cls.__json_fields__``
    are stored as JSON text. The change is committed before returning
    ``cls(kwargs)``.
    """
    cursor = get_db().cursor()

    assignments = ', '.join("'%s' = ?" % str(name) for name in cls.__fields__)
    sql = 'UPDATE %s SET %s WHERE %s = ?' % (
        cls.__table_name__, assignments, cls.__priamry_key__)

    # One parameter per field, in field order, then the primary key value
    # for the WHERE clause.
    params = [
        json.dumps(kwargs.get(name)) if name in cls.__json_fields__
        else kwargs.get(name)
        for name in cls.__fields__
    ]
    params.append(kwargs.get(cls.__priamry_key__))

    cursor.execute(sql, tuple(params))
    get_db().commit()
    return cls(kwargs)
def git_status(self):
    """Describe the repository in the current directory as a JSON string.

    Fetches ``origin`` first, then reports all tags, the head commit, the
    active branch name (``None`` when it cannot be determined) and the commits
    in ``master..origin/master``.
    """
    repo = Repo('./')
    repo.remotes.origin.fetch()

    # Tags
    tags = [
        {"name": t.name, "commit": str(t.commit), "date": t.commit.committed_date,
         "committer": t.commit.committer.name, "message": t.commit.message}
        for t in repo.tags
    ]

    try:
        branch_name = repo.active_branch.name
    except Exception:
        # Catch Exception, not everything, so KeyboardInterrupt/SystemExit
        # still propagate.
        # NOTE(review): presumably this guards against a detached HEAD - confirm.
        branch_name = None

    changes = [
        {"committer": c.committer.name, "message": c.message}
        for c in repo.iter_commits('master..origin/master')
    ]

    return json.dumps({"tags": tags, "headcommit": str(repo.head.commit), "branchname": branch_name,
                       "master": {"changes": changes}})
def get(self):
    """List the brews stored in the uploaded ``kbh.db`` SQLite database.

    Returns:
        A JSON array of ``{"id", "name", "brewed"}`` objects read from the
        ``Sud`` table; ``('', 404)`` when the database file does not exist;
        ``('', 500)`` when anything else fails. A notification is sent through
        ``self.api.notify`` in both error cases.
    """
    conn = None
    try:
        db_path = self.api.app.config['UPLOAD_FOLDER'] + '/kbh.db'
        if not os.path.exists(db_path):
            self.api.notify(headline="File Not Found", message="Please upload a Kleiner Brauhelfer Database", type="danger")
            return ('', 404)

        conn = sqlite3.connect(db_path)
        c = conn.cursor()
        c.execute('SELECT ID, Sudname, BierWurdeGebraut FROM Sud')
        result = [{"id": row[0], "name": row[1], "brewed": row[2]}
                  for row in c.fetchall()]
        return json.dumps(result)
    except Exception as e:
        # Function-call form prints the same on Python 2 and is valid on Python 3.
        print(e)
        self.api.notify(headline="Failed to load KHB database", message="ERROR", type="danger")
        return ('', 500)
    finally:
        # Close the connection on every path once it has been opened.
        if conn:
            conn.close()
def get_logs_as_json(self, t, id):
    """Return chart/log data for a sensor, kettle or fermenter as JSON.

    Args:
        t: Kind of object: ``"s"`` (sensor), ``"k"`` (kettle) or ``"f"``
            (fermenter). Any other value yields an empty JSON list.
        id: Key of the object in the matching ``cbpi.cache`` collection.

    Returns:
        str: JSON list. For a sensor it holds one ``{"name", "data"}`` entry;
        for a kettle or fermenter it holds the controller's chart entries
        converted with ``convert_chart_data_to_json``.
    """
    result = []
    if t == "s":
        name = cbpi.cache.get("sensors").get(id).name
        result.append({"name": name, "data": self.read_log_as_json("sensor", id)})
    elif t == "k":
        kettle = cbpi.cache.get("kettle").get(id)
        charts = cbpi.get_controller(kettle.logic).get("class").chart(kettle)
        # Build a real list: on Python 3 map() returns an iterator, which
        # json.dumps cannot serialize.
        result = [self.convert_chart_data_to_json(chart) for chart in charts]
    elif t == "f":
        fermenter = cbpi.cache.get("fermenter").get(id)
        charts = cbpi.get_fermentation_controller(fermenter.logic).get("class").chart(fermenter)
        result = [self.convert_chart_data_to_json(chart) for chart in charts]
    return json.dumps(result)