async def send_properties(self, **kwargs):
    """Serialize the stored UI properties to XML and send them via ``self._instance.gbx``.

    Returns without doing anything when ``self._properties`` is empty or has
    no ``'ui_properties'`` key. Failures while building the XML or while
    sending it are logged as warnings and swallowed.

    This must be a coroutine function because the send step is awaited.

    :param kwargs: Not used by this method.
    """
    if not self._properties or 'ui_properties' not in self._properties:
        return

    # Decide the method to use.
    if self._instance.game.game == 'tm':
        method = 'Trackmania.UI.SetProperties'
    else:
        method = 'Shootmania.UI.SetProperties'

    # Create XML document
    try:
        xml = xd.unparse(self._properties, full_document=False, short_empty_elements=True)
    except Exception as e:
        logger.warning('Can\'t convert UI Properties to XML document! Error: {}'.format(str(e)))
        return

    # Send the serialized document; a transport failure is logged, not raised.
    try:
        await self._instance.gbx(method, xml, encode_json=False, response_id=False)
    except Exception as e:
        logger.warning('Can\'t send UI Properties! Error: {}'.format(str(e)))
        return
# Example source code using Python's unparse()
def prepare_request(self, method, path, params):
    """Assemble the method, URL and keyword arguments for a signed XML request.

    The base parameters from ``get_base_params`` are merged into ``params``
    (the caller's dict is updated in place), the result is filtered and
    signed, and the signed parameters are serialized under an ``xml`` root.

    :param method: passed through unchanged as the first return value
    :param path: handed to ``self._full_url`` to build the request URL
    :param params: request parameters; mutated by this call
    :return: ``(method, url, kwargs)`` where ``kwargs`` holds ``data`` and,
        when both ``mch_cert`` and ``mch_key`` are set, ``cert``
    """
    params.update(self.get_base_params())
    filtered, sign_source = params_filter(params)
    signature = build_mysign(sign_source, self.partner_key)

    # NOTE(review): the original comment here was mis-encoded; presumably the
    # values are converted to unicode because xmltodict expects it — confirm.
    payload = params_encoding(filtered)
    payload['sign'] = signature

    request_kwargs = {'data': smart_str(xmltodict.unparse({'xml': payload}))}
    full_url = self._full_url(path)

    # Attach the client certificate pair only when both halves are configured.
    if self.mch_cert and self.mch_key:
        request_kwargs['cert'] = (self.mch_cert, self.mch_key)

    return method, full_url, request_kwargs
# WeChat Pay JSAPI reference (original comment text was lost to mis-encoding):
# https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=9_1
def prepare_request(self, method, path, params):
    """Assemble the method, URL and keyword arguments for a signed XML request.

    The base parameters from ``get_base_params`` are merged into ``params``
    (the caller's dict is updated in place), the result is filtered and
    signed with ``partner_key``, and the signed parameters are serialized
    under an ``xml`` root.

    :param method: passed through unchanged as the first return value
    :param path: handed to ``self._full_url`` to build the request URL
    :param params: request parameters; mutated by this call
    :return: ``(method, url, kwargs)`` where ``kwargs`` holds ``data`` and,
        when both ``mch_cert`` and ``mch_key`` are set, ``cert``
    """
    params.update(self.get_base_params())
    filtered, sign_source = params_filter(params)
    signature = build_mysign(sign_source, key=self.partner_key)

    # NOTE(review): the original comment here was mis-encoded; presumably the
    # values are converted to unicode because xmltodict expects it — confirm.
    payload = params_encoding(filtered)
    payload['sign'] = signature

    request_kwargs = {'data': smart_str(xmltodict.unparse({'xml': payload}))}
    full_url = self._full_url(path)

    # Attach the client certificate pair only when both halves are configured.
    if self.mch_cert and self.mch_key:
        request_kwargs['cert'] = (self.mch_cert, self.mch_key)

    return method, full_url, request_kwargs
# WeChat Pay JSAPI reference (original comment text was lost to mis-encoding):
# https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=9_1
def setUpClass(self):
    """Build the Maltego transform-request fixture and its XML form.

    Stores the nested request dict on ``m_data`` and the result of
    ``xmltodict.unparse`` on ``m_xml``. Keys prefixed with ``@`` become XML
    attributes and ``#text`` becomes element text (xmltodict convention).
    """
    self.m_xml = ""

    # The single entity carried by the request.
    entity = {
        '@Type': "IPAddress",
        'Value': "127.0.0.1",
        'Weight': "100",
        'AdditionalFields': {
            'Field': [
                {"@Name": "ipv4-address", "@DisplayName": "IP Address", "#text": "127.0.0.1"},
                {"@Name": "ipaddress.internal", "@DisplayName": "Internal", "#text": "true"},
            ],
        },
    }

    # Key order matches the order in which the elements are serialized.
    self.m_data = {
        'MaltegoMessage': {
            'MaltegoTransformRequestMessage': {
                'Entities': {'Entity': entity},
                'Limits': {"@HardLimit": 50, "@SoftLimit": 50},
                'TransformFields': {
                    'Field': [{"@Name": "api", "#text": "JUSTKIDDING"}],
                },
            },
        },
    }

    self.m_xml = xmltodict.unparse(self.m_data)
def instruction_to_svg(self, instruction):
    """Serialize the SVG dict of an instruction to an XML string.

    :return: an SVG representing the instruction.

    The SVG file is determined by the type attribute of the instruction.
    An instruction of type ``"knit"`` is looked for in a file named
    ``"knit.svg"``.

    Every element inside a group labeled ``"color"`` of mode ``"layer"``
    that has a ``"fill"`` style gets this fill replaced by the color of
    the instruction.
    Example of a rectangle that gets filled like the instruction:

    .. code:: xml

        <g inkscape:label="color" inkscape:groupmode="layer">
            <rect style="fill:#ff0000;fill-opacity:1;fill-rule:nonzero"
                id="rectangle1" width="10" height="10" x="0" y="0" />
        </g>

    If nothing was loaded to display this instruction, a default image is
    generated by :meth:`default_instruction_to_svg`.
    """
    svg_dict = self.instruction_to_svg_dict(instruction)
    return xmltodict.unparse(svg_dict)
def write_jmeter_config(self, jconfig, target):
    """Write a JMeter test plan derived from the default plan file.

    Reads ``ORIGINAL_JMETER_FILE``, overrides selected ``stringProp`` entries
    of the thread group and the HTTP sampler, and writes the result to
    ``NEW_JMETER_FILE`` as UTF-8.

    :param jconfig: mapping providing ``'threads'``, ``'ramp-up'``,
        ``'duration'``, ``'port'`` and ``'path'``
    :param target: value written into the sampler's target entry as-is
    """
    with open(ORIGINAL_JMETER_FILE, 'r') as f:
        default_jmeter_config = f.read()

    parsed_xml = xmltodict.parse(default_jmeter_config)
    xml_short = parsed_xml['jmeterTestPlan']['hashTree']['hashTree']

    # The positions below are fixed by the layout of the default plan file.
    thread_props = xml_short['ThreadGroup']['stringProp']
    thread_props[1]['#text'] = str(jconfig['threads'])   # thread number
    thread_props[2]['#text'] = str(jconfig['ramp-up'])   # ramp up time
    thread_props[3]['#text'] = str(jconfig['duration'])  # duration

    sampler_props = xml_short['hashTree']['HTTPSamplerProxy']['stringProp']
    sampler_props[0]['#text'] = target                   # target
    sampler_props[1]['#text'] = str(jconfig['port'])     # port
    sampler_props[6]['#text'] = str(jconfig['path'])     # path

    # The document is encoded to UTF-8 bytes, so the handle must be binary:
    # a text-mode file rejects bytes on Python 3.
    with open(NEW_JMETER_FILE, 'wb') as f:
        f.write(xmltodict.unparse(parsed_xml).encode('utf-8'))
def wx_dict2xml(d):
    """Serialize ``d`` under an ``xml`` root element, without an XML declaration."""
    wrapped = {'xml': d}
    return xmltodict.unparse(wrapped, full_document=False)
def pack(name, dict_data, *args, **kwargs):
    """Wrap ``dict_data`` in a SOAP envelope and serialize it to XML.

    ``dict_data`` gains an ``'@xmlns'`` entry when it has none; the caller's
    dict is modified in place. Extra positional and keyword arguments are
    forwarded to ``xmltodict.unparse``.

    :param name: key under which ``dict_data`` is placed inside ``s:Body``
    :param dict_data: body payload
    :return: whatever ``xmltodict.unparse`` returns for the envelope
    """
    dict_data.setdefault('@xmlns', 'http://chomikuj.pl/')
    envelope = {
        's:Body': {name: dict_data},
        '@s:encodingStyle': 'http://schemas.xmlsoap.org/soap/encoding/',
        '@xmlns:s': 'http://schemas.xmlsoap.org/soap/envelope/',
    }
    return xmltodict.unparse({'s:Envelope': envelope}, *args, **kwargs)
def write_config_xml(xmlfile, dict):
    """Write ``dict`` to ``xmlfile`` as pretty-printed XML.

    :param xmlfile: path of the file to create or overwrite
    :param dict: mapping to serialize (the name shadows the builtin but is
        kept because it is part of the call signature)
    :return: ``True`` on success, ``False`` when an ``IOError`` occurred;
        the error is printed rather than raised
    """
    try:
        with open(xmlfile, "wt") as fo:
            xmltodict.unparse(dict, fo, pretty=True)
    except IOError as e:
        # A single pre-formatted argument prints the same text under both the
        # Python 2 print statement and the Python 3 print function.
        print("Error writing XML file:  %s" % (e,))
        return False
    return True
def test_known_transaction_known_at_sofort_received(self):
    """Posting a notification for transaction '123-abc-received' must yield HTTP 202."""
    transaction_id = '123-abc-received'
    client = Client()
    self._create_test_transaction(transaction_id=transaction_id)
    payload = xmltodict.unparse({'status_notification': {'transaction': transaction_id}})
    response = client.post('/sofort/notify/', data=payload, content_type='application/hal+json')
    self.assertEqual(response.status_code, 202)
def test_known_transaction_known_at_sofort_loss(self):
    """Posting a notification for transaction '123-abc-loss' must yield HTTP 202."""
    transaction_id = '123-abc-loss'
    client = Client()
    self._create_test_transaction(transaction_id=transaction_id)
    payload = xmltodict.unparse({'status_notification': {'transaction': transaction_id}})
    response = client.post('/sofort/notify/', data=payload, content_type='application/hal+json')
    self.assertEqual(response.status_code, 202)
def test_known_transaction_unknown_at_sofort(self):
    """Posting a notification for transaction '123-abc-unknown' must yield HTTP 400."""
    transaction_id = '123-abc-unknown'
    client = Client()
    self._create_test_transaction(transaction_id=transaction_id)
    payload = xmltodict.unparse({'status_notification': {'transaction': transaction_id}})
    response = client.post('/sofort/notify/', data=payload, content_type='application/hal+json')
    self.assertEqual(response.status_code, 400)
def _winrm_send_input(self, protocol, shell_id, command_id, stdin, eof=False):
    """Build a WS-Management ``Send`` message carrying ``stdin`` and send it.

    :param protocol: object providing ``_get_soap_header`` and ``send_message``
    :param shell_id: forwarded to ``_get_soap_header``
    :param command_id: written to the stream's ``CommandId`` attribute
    :param stdin: data to send; converted with ``to_bytes`` and base64-encoded
    :param eof: when true, the stream is marked with ``End="true"``
    """
    envelope = protocol._get_soap_header(
        resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
        action='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Send',
        shell_id=shell_id,
    )

    # Walk down to env:Body/rsp:Send/rsp:Stream, creating levels as needed.
    body = envelope.setdefault('env:Body', {})
    send = body.setdefault('rsp:Send', {})
    stream = send.setdefault('rsp:Stream', {})

    stream['@Name'] = 'stdin'
    stream['@CommandId'] = command_id
    stream['#text'] = base64.b64encode(to_bytes(stdin))
    if eof:
        stream['@End'] = 'true'

    message = xmltodict.unparse({'env:Envelope': envelope})
    protocol.send_message(message)
def _output_convert(output_type, data):
    """Return ``data`` rendered in the requested output format.

    Only the requested conversion is executed, so asking for ``'dict'`` or
    ``'raw'`` never fails because ``data`` cannot be serialized to JSON or XML.

    :param output_type: one of ``'dict'``, ``'raw'``, ``'json'``, ``'xml'``
    :param data: value to convert
    :return: ``data`` itself for ``'dict'``/``'raw'``, a JSON string indented
        by 4 for ``'json'``, XML with ``data`` under a ``root`` element for
        ``'xml'``, or ``None`` for any other ``output_type``
    """
    # Deferred callables: building the table runs no conversion.
    converters = {
        'dict': lambda: data,
        'raw': lambda: data,
        'json': lambda: json.dumps(data, indent=4),
        'xml': lambda: xmltodict.unparse({'root': data}),
    }
    convert = converters.get(output_type)
    if convert is None:
        return None
    return convert()
def pythonJsonToXml():
    """Convert a small sample dict to XML with ``xmltodict.unparse`` and print it."""
    revision = {'id': 547909091}
    page = {'title': 'King Crimson', 'ns': 0, 'revision': revision}
    xml_text = xmltodict.unparse({'page': page})
    print("convertedXml=\n", xml_text)
###############################################################################
def html2xml(html):
    """Convert the ``.table`` element of parsed ``html`` to pretty-printed XML.

    :param html: markup handed to ``BeautifulSoup``
    :return: XML produced by ``xmltodict.unparse`` from the structure that
        ``get_deep_table`` builds for the table
    """
    table = BeautifulSoup(html).table
    table_data = get_deep_table(table)
    return xmltodict.unparse(table_data, pretty=True)
def default_instruction_to_svg(self, instruction):
    """Serialize the default SVG dict of an instruction to an XML string.

    Works like :meth:`instruction_to_svg` but only takes the ``default.svg``
    file into account.

    When :meth:`instruction_to_svg` finds no file for an instruction, this
    method determines the default SVG for it.

    The content is created by replacing the text ``{instruction.type}`` in
    the whole SVG file named ``default.svg``.

    If no file ``default.svg`` was loaded, an empty string is returned.
    """
    return xmltodict.unparse(self.default_instruction_to_svg_dict(instruction))
def write_to_file(self, file):
    """Write the current SVG to the :paramref:`file` as pretty-printed XML.

    :param file: a file-like object
    """
    xmltodict.unparse(self._structure, output=file, pretty=True)
def _dump_to_file(self, file):
    """Serialize ``self.object()`` as pretty-printed XML into ``file``."""
    structure = self.object()
    xmltodict.unparse(structure, output=file, pretty=True)
def _winrm_send_input(self, protocol, shell_id, command_id, stdin, eof=False):
    """Build a WS-Management ``Send`` message carrying ``stdin`` and send it.

    :param protocol: object providing ``_get_soap_header`` and ``send_message``
    :param shell_id: forwarded to ``_get_soap_header``
    :param command_id: written to the stream's ``CommandId`` attribute
    :param stdin: data to send; converted with ``to_bytes`` and base64-encoded
    :param eof: when true, the stream is marked with ``End="true"``
    """
    envelope = protocol._get_soap_header(
        resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
        action='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Send',
        shell_id=shell_id,
    )

    # Walk down to env:Body/rsp:Send/rsp:Stream, creating levels as needed.
    body = envelope.setdefault('env:Body', {})
    send = body.setdefault('rsp:Send', {})
    stream = send.setdefault('rsp:Stream', {})

    stream['@Name'] = 'stdin'
    stream['@CommandId'] = command_id
    stream['#text'] = base64.b64encode(to_bytes(stdin))
    if eof:
        stream['@End'] = 'true'

    message = xmltodict.unparse({'env:Envelope': envelope})
    protocol.send_message(message)
def _request(self, method, path, params=None, data=None, files=None):
    """Issue an HTTP request and hand the response to ``_check_response``.

    :param method: HTTP method passed to ``requests.request``
    :param path: converted to a URL with ``self._get_url``
    :param params: query parameters, forwarded unchanged
    :param data: when not ``None``, wrapped under a ``prestashop`` root,
        serialized to XML and encoded as UTF-8
    :param files: forwarded unchanged; when truthy the response body is not
        parsed
    :return: the result of ``self._check_response``
    """
    body = data
    if body is not None:
        body = xmltodict.unparse({'prestashop': body}).encode('utf-8')

    res = requests.request(
        method,
        self._get_url(path),
        auth=(self.key, ''),
        params=params,
        data=body,
        files=files,
    )

    # Parse the body only for non-file requests that returned some text.
    if not files and res.text:
        parsed = xmltodict.parse(res.text)['prestashop']
    else:
        parsed = None
    return self._check_response(res, parsed)
def xmlify(data):
    """Serialize courses to pretty-printed XML with keys in sorted order.

    Each course's ``'revisions'`` list, when present, is replaced in place
    by a list of key-sorted ``OrderedDict`` objects. The courses themselves
    are then copied into key-sorted ``OrderedDict`` objects and emitted as
    ``course`` elements under a ``root`` element.

    :param data: iterable of course mappings; iterated twice
    :return: the XML produced by ``xmltodict.unparse``
    """
    def _key_sorted(mapping):
        # Rebuild the mapping with its items ordered by key.
        return OrderedDict(sorted(mapping.items()))

    for course in data:
        if 'revisions' not in course:
            continue
        course['revisions'] = list(map(_key_sorted, course['revisions']))

    ordered_courses = list(map(_key_sorted, data))
    return xmltodict.unparse({'root': {'course': ordered_courses}}, pretty=True)
def load_data_from_server(term, dry_run=False):
    """Fetch, sort and optionally save the course data for ``term``.

    The static term URL is tried first. If that raises ``BadDataException``,
    the term URL from ``build_term_url`` is requested up to three times.

    :param term: term identifier; also written into every course as ``'term'``
    :param dry_run: when true, the data is not serialized or saved
    :return: the parsed data, or ``None`` when all attempts failed or the
        ``'searchresults'`` entry is empty
    """
    try:
        parsed_data = request_data(build_static_term_url(term), term)
    except BadDataException:
        print(f'{term}: static file is invlid xml; attempting database query')
        url = build_term_url(term)
        for attempt in (1, 2, 3):
            try:
                parsed_data = request_data(url, term)
            except BadDataException:
                print(f'{term}: fetching attempt #{attempt} failed')
            else:
                break
        else:
            # Reached only when no attempt succeeded.
            print(f'{term}: no xml returned after three tries')
            return None

    if not parsed_data['searchresults']:
        logging.info(f'No data returned for {term}')
        return None

    courses = parsed_data['searchresults']['course']

    # We sort the courses here, before we save it to disk, so that we don't
    # need to re-sort every time we load from disk.
    courses.sort(key=lambda c: c['clbid'])

    # Embed the term into each course individually
    for course in courses:
        course['term'] = term

    if not dry_run:
        destination = make_xml_term_path(term)
        save_data(xmltodict.unparse(parsed_data, pretty=True), destination)
        logging.debug(f'Fetched {destination}')

    return parsed_data
def get_cors(bucket_name):
    """Build a 200 response whose body is the bucket's CORS configuration as XML.

    When ``BUCKET_CORS`` has no (or an empty) entry for ``bucket_name``, an
    empty ``CORSConfiguration`` document is returned instead.

    :param bucket_name: key looked up in ``BUCKET_CORS``
    :return: a ``Response`` with ``_content`` and ``status_code`` set
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    cors = BUCKET_CORS.get(bucket_name) or {'CORSConfiguration': []}

    response = Response()
    response._content = xmltodict.unparse(cors)
    response.status_code = 200
    return response
def get_lifecycle(bucket_name):
    """Build a 200 response whose body is the bucket's lifecycle configuration as XML.

    When ``BUCKET_LIFECYCLE`` has no (or an empty) entry for ``bucket_name``,
    an empty ``LifecycleConfiguration`` document is returned instead.

    :param bucket_name: key looked up in ``BUCKET_LIFECYCLE``
    :return: a ``Response`` with ``_content`` and ``status_code`` set
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name) or {'LifecycleConfiguration': []}

    response = Response()
    response._content = xmltodict.unparse(lifecycle)
    response.status_code = 200
    return response
def dict2xml(_dict):
    """Return a pretty-printed XML string for ``_dict``, passed through ``remove_namespace``."""
    return remove_namespace(xmltodict.unparse(_dict, pretty=True))
def dict_to_xml(spec, full_document=False):
    """
    Convert dict to XML

    Args:
        spec(dict): dict to convert
        full_document(bool): whether to add XML headers

    Returns:
        lxml.etree.Element: XML tree
    """
    middle = xmltodict.unparse(spec, full_document=full_document, pretty=True)
    # lxml rejects a text string that carries an encoding declaration, which
    # is what unparse emits when full_document is true. Parsing UTF-8 bytes
    # works both with and without the declaration.
    return lxml.etree.fromstring(middle.encode('utf-8'))
def get_cors(bucket_name):
    """Build a 200 response whose body is the bucket's CORS configuration as XML.

    When ``BUCKET_CORS`` has no (or an empty) entry for ``bucket_name``, an
    empty ``CORSConfiguration`` document is returned instead.

    :param bucket_name: key looked up in ``BUCKET_CORS``
    :return: a ``Response`` with ``_content`` and ``status_code`` set
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    cors = BUCKET_CORS.get(bucket_name) or {'CORSConfiguration': []}

    response = Response()
    response._content = xmltodict.unparse(cors)
    response.status_code = 200
    return response