def log(msg, level=xbmc.LOGNOTICE):
    """
    Write a message to the Kodi log, prefixed with the add-on ID.

    :param msg: message to output; byte strings are decoded as UTF-8
        (undecodable bytes are replaced) before formatting
    :param level: xbmc log level. Values:
        xbmc.LOGDEBUG = 0
        xbmc.LOGINFO = 1
        xbmc.LOGNOTICE = 2
        xbmc.LOGWARNING = 3
        xbmc.LOGERROR = 4
        xbmc.LOGSEVERE = 5
        xbmc.LOGFATAL = 6
        xbmc.LOGNONE = 7
    """
    # Formatting a non-ASCII byte string into a unicode template forces an
    # implicit ASCII decode (UnicodeDecodeError on Python 2), so decode first.
    if isinstance(msg, bytes):
        msg = msg.decode("utf-8", "replace")
    log_message = u'{0}: {1}'.format(addonID, msg)
    xbmc.log(log_message.encode("utf-8"), level)
python类LOGDEBUG的实例源码
def run(self):
    """
    Keep the Spotify Connect daemon (spotty) running until exit is requested.

    Starts spotty with --onstart/--onstop hooks that call the local proxy,
    forwards every non-empty stderr line to the debug log, and starts the
    daemon again when it exits with a positive return code.
    """
    self.daemon_active = True
    while not self.__exit:
        log_msg("Start Spotify Connect Daemon")
        #spotty_args = ["-v"]
        spotty_args = ["--onstart", "curl -s -f -m 2 http://localhost:%s/playercmd/start" % PROXY_PORT,
                       "--onstop", "curl -s -f -m 2 http://localhost:%s/playercmd/stop" % PROXY_PORT]
        self.__spotty_proc = self.__spotty.run_spotty(arguments=spotty_args)
        thread.start_new_thread(self.fill_fake_buffer, ())
        while not self.__exit:
            line = self.__spotty_proc.stderr.readline().strip()
            if line:
                log_msg(line, xbmc.LOGDEBUG)
            # NOTE(review): assumes run_spotty() returns a subprocess.Popen - confirm.
            # Popen.returncode only changes after poll()/wait()/communicate(),
            # so poll here; reading the attribute alone never sees a crash.
            returncode = self.__spotty_proc.poll()
            if returncode and returncode > 0 and not self.__exit:
                # daemon crashed: leave the read loop so the outer loop restarts it
                break
    self.daemon_active = False
    log_msg("Stopped Spotify Connect Daemon")
def __next__(self):
    """
    Return the next track from the radio track buffer.

    While the buffer holds no more than MIN_BUFFER_SIZE tracks, waits with
    exponential back-off (1, 2, 4, 8, 16 and 32 seconds between attempts).

    :return: the oldest buffered track
    :raises StopIteration: if the buffer is still not filled after six attempts
    """
    # For the most part, the buffer-filling thread should prevent the need
    # for waiting here.
    log_msg("Spotify radio track buffer asked for next item", xbmc.LOGDEBUG)
    for attempt in range(6):
        track_ready = False
        # The context manager releases the lock even if an exception is raised.
        with self._buffer_lock:
            if len(self._buffer) > self.MIN_BUFFER_SIZE:
                track = self._buffer.pop(0)
                track_ready = True
        if track_ready:
            log_msg("Got track '%s' from Spotify radio track buffer" % track["id"], xbmc.LOGDEBUG)
            return track
        # Sleep outside the lock so the filling thread can make progress.
        sleep_time = pow(2, attempt)
        log_msg("Spotify radio track buffer empty, sleeping for %d seconds" % sleep_time, xbmc.LOGDEBUG)
        time.sleep(sleep_time)
    raise StopIteration
# Support both Python 2.7 & Python 3.0
def onInit(self):
    """
    Load the ignored-league list, show the live scores once and then keep
    refreshing them while ``self.isRunning`` is true.
    """
    xbmc.log(msg="[Match Center] Script started", level=xbmc.LOGDEBUG)

    ignored = []
    if os.path.exists(ignored_league_list_file):
        # NOTE(review): eval() executes whatever the file contains; if the
        # file can be written by anything untrusted this should be replaced
        # by a literal/JSON parser.
        stored_leagues = eval(FileIO.fileread(ignored_league_list_file))
        ignored = [league.lower() for league in stored_leagues if league]
    self.ignored_leagues = ignored

    xbmc.executebuiltin("ClearProperty(no-games,Home)")
    goal_image = os.path.join(addon_path, "resources", "img", "goal.png")
    self.getControl(32540).setImage(goal_image)

    # Flag the window as "loading" only for the duration of the first fetch.
    xbmc.executebuiltin("SetProperty(loading-script-matchcenter-livescores,1,home)")
    self.livescoresThread()
    xbmc.executebuiltin("ClearProperty(loading-script-matchcenter-livescores,Home)")

    ticks = 0
    while self.isRunning:
        # Each tick is one 200 ms sleep, so this is the elapsed time in ms.
        elapsed_ms = ticks * 200
        on_interval = (float(elapsed_ms) / (livescores_update_time * 60 * 1000)).is_integer()
        if on_interval and (elapsed_ms / (3 * 60 * 1000)) != 0:
            self.livescoresThread()
        xbmc.sleep(200)
        ticks += 1

    xbmc.log(msg="[Match Center] Script stopped", level=xbmc.LOGDEBUG)
def trace(method):
    """
    @trace decorator: log the wrapped callable's run time and arguments.

    The choice is made once, at decoration time: if ``__is_debugging()`` is
    true the timing wrapper is returned, otherwise a plain pass-through.
    Both wrappers keep the wrapped callable's name and docstring.

    :param method: callable to wrap
    :return: wrapper accepting the same arguments as ``method``
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(method)
    def method_trace_on(*args, **kwargs):
        start = time.time()
        result = method(*args, **kwargs)
        end = time.time()
        log('{name!r} time: {time:2.4f}s args: |{args!r}| kwargs: |{kwargs!r}|'
            .format(name=method.__name__, time=end - start, args=args, kwargs=kwargs), LOGDEBUG)
        return result

    @functools.wraps(method)
    def method_trace_off(*args, **kwargs):
        return method(*args, **kwargs)

    if __is_debugging():
        return method_trace_on
    return method_trace_off
def log(self, txt='', level=xbmc.LOGDEBUG):
    '''
    Log a text into the Kodi-Logfile.

    Nothing is written unless ``self.detailLevel`` is above 0 or the level is
    LOGERROR. With detailLevel 2, LOGDEBUG messages are raised to LOGNOTICE;
    with detailLevel 3, LOGDEBUG and LOGSEVERE messages are. Messages still
    at LOGSEVERE after that step are dropped.

    :param txt: text to log; unicode text is passed through unidecode() first
    :param level: xbmc log level
    '''
    try:
        if not (self.detailLevel > 0 or level == xbmc.LOGERROR):
            return
        promote = (
            (self.detailLevel == 2 and level == xbmc.LOGDEBUG) or
            (self.detailLevel == 3 and level in (xbmc.LOGDEBUG, xbmc.LOGSEVERE))
        )
        if promote:
            level = xbmc.LOGNOTICE
        if level != xbmc.LOGSEVERE:
            if isinstance(txt, unicode):
                txt = unidecode(txt)
            xbmc.log(b"[%s] %s" % (self.pluginName, txt), level)
    except Exception:
        # Best effort: logging must not break the caller, but SystemExit and
        # KeyboardInterrupt are no longer swallowed as a bare except would.
        xbmc.log(b"[%s] Unicode Error in message text" % self.pluginName, xbmc.LOGERROR)
def __init__(self):
    """
    Class initialisation.

    :raises RuntimeError: if the VPN Manager add-on is not installed, or is
        older than v3.1.0
    :raises ValueError: if a component of the add-on version is not an integer
    """
    self.filtered_addons = []
    self.filtered_windows = []
    self.primary_vpns = []
    self.last_updated = 0
    self.refreshLists()
    self.default = self.getConnected()
    xbmc.log("VPN Mgr API : Default is " + self.default, level=xbmc.LOGDEBUG)
    self.timeout = 30
    if not xbmc.getCondVisibility("System.HasAddon(service.vpn.manager)"):
        xbmc.log("VPN Mgr API : VPN Manager is not installed, cannot use filtering", level=xbmc.LOGERROR)
        raise RuntimeError("VPN Manager is not installed")
    version = xbmcaddon.Addon("service.vpn.manager").getAddonInfo("version").strip()
    # Compare component by component: gluing the digits together ranks
    # "2.10.0" (2100) above "3.1.0" (310) and "3.1" (31) below it.
    parts = [int(piece) for piece in version.split(".")]
    parts += [0] * (3 - len(parts))
    if tuple(parts) < (3, 1, 0):
        raise RuntimeError("VPN Manager " + version + " installed, but needs to be v3.1.0 or later")
def connectToValidated(self, connection, wait):
    """
    Connect to the validated connection with the given number (1 to 10).

    :param connection: number of the validated connection
    :param wait: if true, return the result of waitForConnection(); if false
        the request is fire and forget and True is returned
    :return: False if the VPN is not set up, the number is out of range, or
        no VPN is stored for that number; True if that VPN is already
        connected or the request was sent without waiting
    """
    if not self.isVPNSetUp():
        return False
    if connection < 1 or connection > 10:
        return False
    index = connection - 1
    self.refreshLists()
    # A list shorter than the requested slot means "no VPN for this number",
    # which is reported as False instead of raising IndexError.
    if index >= len(self.primary_vpns):
        return False
    target = self.primary_vpns[index]
    if target == "":
        return False
    if self.getConnected() == target:
        return True
    xbmc.log(msg="VPN Mgr API : Connecting to " + target, level=xbmc.LOGDEBUG)
    self.setAPICommand(target)
    if wait:
        return self.waitForConnection(target)
    return True
def __init__(self, addon, force):
    """
    Set the initial state flags.

    :param addon: not used by the active code (only by the disabled block)
    :param force: stored unchanged as ``self.force``
    """
    self.needReset = self.fetchError = False
    self.start = True
    self.force = force
    # The set-up code below is disabled: it is an inert string literal.
    '''
self.xmltvInterval = int(addon.getSetting('sd.interval'))
self.logoSource = int(addon.getSetting('logos.source'))
self.addonsType = int(addon.getSetting('addons.ini.type'))
# make sure the folder in the user's profile exists or create it!
if not os.path.exists(self.PLUGIN_DATA):
os.makedirs(self.PLUGIN_DATA)
# make sure the ini file is fetched as well if necessary
if self.addonsType != self.INI_TYPE_DEFAULT:
customFile = str(addon.getSetting('addons.ini.file'))
if os.path.exists(customFile):
# uses local file provided by user!
xbmc.log('[%s] Use local file: %s' % (ADDON.getAddonInfo('id'), customFile), xbmc.LOGDEBUG)
else:
# Probably a remote file
xbmc.log('[%s] Use remote file: %s' % (ADDON.getAddonInfo('id'), customFile), xbmc.LOGDEBUG)
self.updateLocalFile(customFile, addon, True, force=force)
'''
def scan_empty(self, datas):
    """
    Log the ids of all categories and of those for which no video was added.

    Replaces this module's ``addDir`` and ``addLink`` with local collectors
    (they are not restored afterwards), then calls ``get_categories`` and,
    for each collected category, ``get_videos``.

    :param datas: unused
    """
    collected_cats = []

    def addCat(name, img, **kargs):
        # Only the keyword arguments are kept as the category record.
        collected_cats.append(kargs)

    collected_vids = []

    def addVid(title, vurl, img):
        # Only the number of added videos matters, not their content.
        collected_vids.append(1)

    this_module = sys.modules[__name__]
    this_module.addDir = addCat
    this_module.addLink = addVid

    self.get_categories(False)

    total = len(collected_cats)
    empty_ids = []
    visited_ids = []
    for position, cat in enumerate(collected_cats):
        xbmc.log(str(position) + '/' + str(total), xbmc.LOGDEBUG)
        # Fresh list per category; addVid sees the rebinding via its closure.
        collected_vids = []
        self.get_videos(cat)
        if not len(collected_vids):
            empty_ids.append(cat['id'])
        visited_ids.append(cat['id'])

    xbmc.log('done: ' + ','.join(visited_ids), xbmc.LOGDEBUG)
    xbmc.log('id2skip: ' + ','.join(empty_ids), xbmc.LOGDEBUG)
def enableService(self, serviceId, addonFile):
    """
    Run a service script in the ``__main__`` namespace and log the outcome.

    :param serviceId: identifier used in the success log message
    :param addonFile: path of the script handed to execfile()
    :return: whatever execfile() returned on success, None if it raised an
        Exception (the error text and traceback are logged at LOGERROR)
    """
    import xbmc
    srvThread = None
    try:
        srvThread = execfile(addonFile, sys.modules['__main__'].__dict__)
    except Exception as e:
        # Log inside the handler: a shared ``finally`` block would hit unbound
        # names (and mask the real exception) when something that is not an
        # Exception subclass, e.g. SystemExit, passes through.
        xbmc.log(str(e), xbmc.LOGERROR)
        xbmc.log(traceback.format_exc(), xbmc.LOGERROR)
    else:
        msg = 'Service %s, succesfully loaded from %s'
        xbmc.log(msg % (serviceId, addonFile), xbmc.LOGDEBUG)
    return srvThread
def trace(method):
    """
    @debug decorator: time a call and log its duration and arguments.

    ``__is_debugging()`` is evaluated once, when the decorator is applied;
    if it is false the returned wrapper only forwards the call. Either
    wrapper carries the wrapped callable's name and docstring.

    :param method: callable to wrap
    :return: wrapper accepting the same arguments as ``method``
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(method)
    def method_trace_on(*args, **kwargs):
        start = time.time()
        result = method(*args, **kwargs)
        end = time.time()
        log('{name!r} time: {time:2.4f}s args: |{args!r}| kwargs: |{kwargs!r}|'.format(name=method.__name__, time=end - start, args=args, kwargs=kwargs), LOGDEBUG)
        return result

    @functools.wraps(method)
    def method_trace_off(*args, **kwargs):
        return method(*args, **kwargs)

    if __is_debugging():
        return method_trace_on
    return method_trace_off
def addon_enabled(self, addon_id):
    """
    Ask Kodi over JSON-RPC (Addons.GetAddonDetails) whether an add-on is enabled.

    :param addon_id: id of the add-on to query
    :return: True only if the reply reports ``enabled`` as True; False if it
        reports anything else or the expected keys are missing (the reply's
        error message and code, if any, are logged at LOGDEBUG)
    """
    rpc_request = json.dumps({"jsonrpc": "2.0",
                              "method": "Addons.GetAddonDetails",
                              "id": 1,
                              "params": {"addonid": "%s" % addon_id,
                                         "properties": ["enabled"]}
                              })
    response = json.loads(xbmc.executeJSONRPC(rpc_request))
    try:
        return response['result']['addon']['enabled'] is True
    except KeyError:
        # .get() so that a reply without an 'error' object is still logged
        # instead of raising a second KeyError from inside this handler.
        failure = response.get('error', {})
        message = failure.get('message')
        code = failure.get('code')
        error = 'Requested |%s| received error |%s| and code: |%s|' % (rpc_request, message, code)
        xbmc.log(error, xbmc.LOGDEBUG)
        return False
def set_addon_enabled(self, addon_id, enabled=True):
    """
    Enable or disable an add-on over JSON-RPC (Addons.SetAddonEnabled).

    :param addon_id: id of the add-on to change
    :param enabled: value sent as the ``enabled`` parameter
    :return: True if the reply's result equals 'OK'; False otherwise (when
        the reply has no result, its error message and code, if any, are
        logged at LOGDEBUG)
    """
    rpc_request = json.dumps({"jsonrpc": "2.0",
                              "method": "Addons.SetAddonEnabled",
                              "id": 1,
                              "params": {"addonid": "%s" % addon_id,
                                         "enabled": enabled}
                              })
    response = json.loads(xbmc.executeJSONRPC(rpc_request))
    try:
        return response['result'] == 'OK'
    except KeyError:
        # .get() so that a reply without an 'error' object is still logged
        # instead of raising a second KeyError from inside this handler.
        failure = response.get('error', {})
        message = failure.get('message')
        code = failure.get('code')
        error = 'Requested |%s| received error |%s| and code: |%s|' % (rpc_request, message, code)
        xbmc.log(error, xbmc.LOGDEBUG)
        return False
def log(txt, log_level=xbmc.LOGDEBUG):
    """
    Write a line to the kodi.log file, prefixed with the add-on id.

    LOGDEBUG messages are only written when the add-on setting "debug" is
    "true"; every other level is always written.

    :param txt: Text to write to the log
    :param log_level: Severity of the log text
    :return: None
    """
    debug_enabled = _addon.getSetting("debug") == "true"
    if not debug_enabled and log_level == xbmc.LOGDEBUG:
        return

    text = txt
    if isinstance(text, str):
        try:
            text = text.decode("utf-8")
        except UnicodeDecodeError:
            # Undecodable input is reported as a warning and then dropped.
            xbmc.log('Could not decode to Unicode: {0}'.format(text), level=xbmc.LOGWARNING)
            return

    entry = u'%s: %s' % (_addonid, text)
    xbmc.log(msg=entry.encode("utf-8"), level=log_level)
def addon_enabled(self, addon_id):
    """
    Ask Kodi over JSON-RPC (Addons.GetAddonDetails) whether an add-on is enabled.

    :param addon_id: id of the add-on to query
    :return: True only if the reply reports ``enabled`` as True; False if it
        reports anything else or the expected keys are missing (the reply's
        error message and code, if any, are logged at LOGDEBUG)
    """
    rpc_request = json.dumps({"jsonrpc": "2.0",
                              "method": "Addons.GetAddonDetails",
                              "id": 1,
                              "params": {"addonid": "%s" % addon_id,
                                         "properties": ["enabled"]}
                              })
    response = json.loads(xbmc.executeJSONRPC(rpc_request))
    try:
        return response['result']['addon']['enabled'] is True
    except KeyError:
        # .get() so that a reply without an 'error' object is still logged
        # instead of raising a second KeyError from inside this handler.
        failure = response.get('error', {})
        message = failure.get('message')
        code = failure.get('code')
        error = 'Requested |%s| received error |%s| and code: |%s|' % (rpc_request, message, code)
        xbmc.log(error, xbmc.LOGDEBUG)
        return False
def set_addon_enabled(self, addon_id, enabled=True):
    """
    Enable or disable an add-on over JSON-RPC (Addons.SetAddonEnabled).

    :param addon_id: id of the add-on to change
    :param enabled: value sent as the ``enabled`` parameter
    :return: True if the reply's result equals 'OK'; False otherwise (when
        the reply has no result, its error message and code, if any, are
        logged at LOGDEBUG)
    """
    rpc_request = json.dumps({"jsonrpc": "2.0",
                              "method": "Addons.SetAddonEnabled",
                              "id": 1,
                              "params": {"addonid": "%s" % addon_id,
                                         "enabled": enabled}
                              })
    response = json.loads(xbmc.executeJSONRPC(rpc_request))
    try:
        return response['result'] == 'OK'
    except KeyError:
        # .get() so that a reply without an 'error' object is still logged
        # instead of raising a second KeyError from inside this handler.
        failure = response.get('error', {})
        message = failure.get('message')
        code = failure.get('code')
        error = 'Requested |%s| received error |%s| and code: |%s|' % (rpc_request, message, code)
        xbmc.log(error, xbmc.LOGDEBUG)
        return False
def AddAudioEntry(self, a):
    """
    Add one audio record to the plugin directory as a playable 'Music' item.

    The label is "<duration> - <artist> : <title>"; the artist part is left
    out when the record has no artist, and a missing title is treated as
    empty instead of raising TypeError.

    :param a: mapping with the keys "duration" and "url", and optionally
        "artist" and "title"
    """
    artist = a.get("artist") or u""
    track = a.get("title") or u""
    title = artist + u" : " + track if artist else track
    d = unicode(datetime.timedelta(seconds=int(a["duration"])))
    listTitle = d + u" - " + title
    listitem = xbmcgui.ListItem(PrepareString(listTitle))
    xbmc.log(str(a), xbmc.LOGDEBUG)
    # NOTE(review): 'album' is filled with the artist value, as before -
    # confirm that this is intended.
    listitem.setInfo(type='Music', infoLabels={'title': track,
                                               'artist': artist,
                                               'album': artist,
                                               'duration': a.get('duration') or 0})
    listitem.setProperty('mimetype', 'audio/mpeg')
    xbmcplugin.addDirectoryItem(self.handle, a["url"], listitem, False)
def GetAlbums(self, uid=None):
    """
    Fetch photo albums through ``self.api`` ("photos.getAlbums" with covers).

    :param uid: passed on as ``uid`` only when truthy
    :return: list of tuples (title with photo count, description,
        thumb_src or None, album id as str, owner id)
    """
    call_args = {"need_covers": 1}
    if uid:
        call_args["uid"] = uid
    albums = self.api.call("photos.getAlbums", **call_args)

    items = []
    for album in albums:
        thumb = album.get('thumb_src')
        xbmc.log(thumb, xbmc.LOGDEBUG)
        label = album["title"] + unicode(" (%s photo)" % album["size"])
        items.append((label,
                      album["description"],
                      thumb,
                      str(album["aid"]),
                      album["owner_id"]))
    return items
def __init__(self):
    """
    Class initialisation.

    :raises RuntimeError: if the VPN Manager add-on is not installed, or is
        older than v3.1.0
    :raises ValueError: if a component of the add-on version is not an integer
    """
    self.filtered_addons = []
    self.filtered_windows = []
    self.primary_vpns = []
    self.last_updated = 0
    self.refreshLists()
    self.default = self.getConnected()
    xbmc.log("VPN Mgr API : Default is " + self.default, level=xbmc.LOGDEBUG)
    self.timeout = 30
    if not xbmc.getCondVisibility("System.HasAddon(service.vpn.manager)"):
        xbmc.log("VPN Mgr API : VPN Manager is not installed, cannot use filtering", level=xbmc.LOGERROR)
        raise RuntimeError("VPN Manager is not installed")
    version = xbmcaddon.Addon("service.vpn.manager").getAddonInfo("version").strip()
    # Compare component by component: gluing the digits together ranks
    # "2.10.0" (2100) above "3.1.0" (310) and "3.1" (31) below it.
    parts = [int(piece) for piece in version.split(".")]
    parts += [0] * (3 - len(parts))
    if tuple(parts) < (3, 1, 0):
        raise RuntimeError("VPN Manager " + version + " installed, but needs to be v3.1.0 or later")
def connectToValidated(self, connection, wait):
    """
    Connect to the validated connection with the given number (1 to 10).

    :param connection: number of the validated connection
    :param wait: if true, return the result of waitForConnection(); if false
        the request is fire and forget and True is returned
    :return: False if the VPN is not set up, the number is out of range, or
        no VPN is stored for that number; True if that VPN is already
        connected or the request was sent without waiting
    """
    if not self.isVPNSetUp():
        return False
    if connection < 1 or connection > 10:
        return False
    index = connection - 1
    self.refreshLists()
    # A list shorter than the requested slot means "no VPN for this number",
    # which is reported as False instead of raising IndexError.
    if index >= len(self.primary_vpns):
        return False
    target = self.primary_vpns[index]
    if target == "":
        return False
    if self.getConnected() == target:
        return True
    xbmc.log(msg="VPN Mgr API : Connecting to " + target, level=xbmc.LOGDEBUG)
    self.setAPICommand(target)
    if wait:
        return self.waitForConnection(target)
    return True
def log_msg(msg, loglevel=xbmc.LOGDEBUG):
    '''Write msg to the Kodi log file with the "Skin Helper Backup" prefix.'''
    # unicode input is encoded to UTF-8 bytes before it is formatted
    text = msg.encode('utf-8') if isinstance(msg, unicode) else msg
    xbmc.log("Skin Helper Backup --> %s" % text, level=loglevel)
SubEsUtilities.py 文件源码
项目:service.subtitles.tusubtitulo
作者: josecurioso2
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def log(module, msg):
    """Write a LOGDEBUG entry of the form "### [module] - msg" to the Kodi log."""
    entry = u"### [%s] - %s" % (module, msg)
    xbmc.log(entry.encode('utf-8'), level=xbmc.LOGDEBUG)
def log(message, loglevel=xbmc.LOGNOTICE):
    """Save a message to kodi.log, prefixed with the add-on id.

    Args:
        message: has to be unicode, http://wiki.xbmc.org/index.php?title=Add-on_unicode_paths#Logging
        loglevel: xbmc.LOGDEBUG, xbmc.LOGINFO, xbmc.LOGNOTICE, xbmc.LOGWARNING, xbmc.LOGERROR, xbmc.LOGFATAL
    """
    prefixed = __addon_id__ + u": " + message
    xbmc.log(encode(prefixed), level=loglevel)
def debug(content):
    """
    Output content to the log file at LOGDEBUG level.

    :param content: content which should be output; byte strings (including
        str subclasses) are decoded as UTF-8 with undecodable bytes replaced,
        so malformed input cannot make a debug call raise
    :return: None
    """
    if isinstance(content, str):
        message = unicode(content, "utf-8", "replace")
    else:
        message = content
    log(message, xbmc.LOGDEBUG)
def _fill_buffer(self):
    """
    Keep the track buffer topped up while ``self._running`` is true.

    Whenever the buffer holds no more than MIN_BUFFER_SIZE tracks, the result
    of ``_fetch()`` is appended while holding the buffer lock; otherwise the
    loop sleeps for CHECK_BUFFER_PERIOD seconds before checking again.
    """
    while self._running:
        # The context manager releases the lock even if an exception is raised.
        with self._buffer_lock:
            needs_fill = len(self._buffer) <= self.MIN_BUFFER_SIZE
            if needs_fill:
                log_msg("Spotify radio track buffer was %d, below minimum size of %d - filling" %
                        (len(self._buffer), self.MIN_BUFFER_SIZE), xbmc.LOGDEBUG)
                self._buffer += self._fetch()
        if not needs_fill:
            # NOTE(review): there is no pause after a fill, so a fetch that
            # returns nothing is retried immediately - confirm this is intended.
            time.sleep(self.CHECK_BUFFER_PERIOD)
def _fetch(self):
    """
    Request recommendations seeded with the first five buffered tracks.

    :return: the "tracks" list of the recommendations response, or an empty
        list if anything raised (the failure is logged via log_exception)
    """
    log_msg("Spotify radio track buffer invoking recommendations() via spotipy", xbmc.LOGDEBUG)
    try:
        # The auth token is read from a property of the Home window.
        token = xbmc.getInfoLabel("Window(Home).Property(spotify-token)").decode("utf-8")
        spotify = spotipy.Spotify(token)
        seed_ids = [entry["id"] for entry in self._buffer[0: 5]]
        response = spotify.recommendations(seed_tracks=seed_ids, limit=self.FETCH_SIZE)
        tracks = response["tracks"]
        log_msg("Spotify radio track buffer got %d results back" % len(tracks))
        return tracks
    except Exception:
        log_exception("SpotifyRadioTrackBuffer", "Failed to fetch recommendations, returning empty result")
        return []
def log_exception(modulename, exceptiondetails):
    '''
    Helper to properly log an exception.

    Logs the traceback of the exception currently being handled at LOGDEBUG
    and a one-line summary at LOGWARNING.

    :param modulename: name shown in the summary line
    :param exceptiondetails: details shown in the summary line
    '''
    # NOTE(review): assumes format_exc is traceback.format_exc - confirm.
    # It picks up the active exception by itself; its first parameter is
    # ``limit``, so passing sys.exc_info() there was wrong (and raises
    # TypeError on Python 3).
    log_msg(format_exc(), xbmc.LOGDEBUG)
    log_msg("Exception in %s ! --> %s" % (modulename, exceptiondetails), xbmc.LOGWARNING)
def request_token_spotty(spotty):
    '''
    Request a token by using the spotty binary.

    :param spotty: helper exposing ``playback_supported`` and ``run_spotty()``
    :return: token info dict in spotipy compatible format, or None if
        playback is not supported, no token line was found in the output, or
        anything raised (the failure is logged via log_exception)
    '''
    if not spotty.playback_supported:
        return None
    token_info = None
    try:
        args = ["-t", "--client-id", CLIENTID, "--scope", ",".join(SCOPE), "-n", "temp"]
        # Separate name for the process so the ``spotty`` argument is not shadowed.
        process = spotty.run_spotty(arguments=args)
        stdout, stderr = process.communicate()
        log_msg(stdout, xbmc.LOGDEBUG)
        result = None
        for line in stdout.split():
            line = line.strip()
            if line.startswith("{\"accessToken\""):
                # NOTE(review): eval() executes the subprocess output as
                # Python code; a JSON/literal parser would be safer here.
                result = eval(line)
        # transform token info to spotipy compatible format
        if result:
            expires_in = result["expiresIn"]
            # Build the whole dict before publishing it, so a missing key can
            # no longer leave a half-filled token_info to be returned.
            token_info = {
                "access_token": result["accessToken"],
                "expires_in": expires_in,
                "token_type": result["tokenType"],
                "scope": ' '.join(result["scope"]),
                "expires_at": int(time.time()) + expires_in,
                # the access token is stored as the refresh token as well
                "refresh_token": result["accessToken"],
            }
            log_msg("Token from spotty: %s" % token_info, xbmc.LOGDEBUG)
    except Exception as exc:
        log_exception(__name__, exc)
    return token_info
def log(self, message, level=xbmc.LOGDEBUG):
    """
    Write a message to the Kodi log, prefixed with the add-on id and version.

    :param message: message to be written into the Kodi log; unicode input
        is encoded to UTF-8 first
    :type message: str
    :param level: log level. :mod:`xbmc` module provides the necessary
        symbolic constants. Default: ``xbmc.LOGDEBUG``
    :type level: int
    """
    text = message.encode('utf-8') if isinstance(message, unicode) else message
    entry = '{0} [v.{1}]: {2}'.format(self.id, self.version, text)
    xbmc.log(entry, level)