def query(self, query):
    """Run a SPARQL query against ``self.endpoint`` and return its rows.

    The query text is dedented, handed to ``self.debug`` when that is set,
    URL-quoted into a GET request and sent with an ``Accept`` header asking
    for SPARQL JSON results. A ``cache-control: no-cache`` header is added
    when ``self.cache`` is falsy.

    Returns:
        A list with one dict per entry of ``results.bindings``, keyed by
        the names in ``head.vars``; a variable absent from a binding maps
        to ``None``. ``None`` is returned when the response body is empty
        or the decoded JSON has no ``results`` member.

    Raises:
        WdmapperError: if the request fails with ``URLError``, or if the
            body is not valid JSON. In the latter case the message is
            ``'query timeout'`` when the body contains
            ``QueryTimeoutException``.
    """
    query = textwrap.dedent(query)
    if self.debug:
        self.debug(query)

    url = '%s?query=%s' % (self.endpoint, quote(query))
    req = Request(url)
    if not self.cache:
        req.add_header('cache-control', 'no-cache')
    req.add_header('Accept', 'application/sparql-results+json')

    try:
        response = urlopen(req)
        try:
            raw = response.read()
        finally:
            # Release the connection even if reading fails.
            response.close()
    except URLError as e:
        raise WdmapperError(e)

    # Decode on Python 3 only, as before; the undecoded body is kept for
    # the timeout check below.
    res = raw.decode('utf8') if six.PY3 else raw
    if not res:
        return None

    try:
        data = json.loads(res)
    except ValueError as e:
        # Search the raw bytes: looking for a bytes needle in the decoded
        # str raises TypeError on Python 3 and would mask the real error.
        reason = 'query timeout' if b'QueryTimeoutException' in raw else e
        raise WdmapperError(reason)

    if not data or 'results' not in data:
        return None

    qvars = data['head']['vars']
    return [
        {var: (row[var]['value'] if var in row else None) for var in qvars}
        for row in data['results']['bindings']
    ]
评论列表
文章目录