def main():
args, cmd, capturer = arg.parse_args()
log.configure_logging(args.output_directory, args.log_to_stderr)
log.log_header()
result = cache.retrieve(cmd, args, capturer)
if not result:
print "DLJC: Build command failed."
sys.exit(1)
javac_commands, jars, stats = result
log.info('Results: %s', pprint.pformat(javac_commands))
output_json(os.path.join(args.output_directory, 'javac.json'), javac_commands)
output_json(os.path.join(args.output_directory, 'jars.json'), jars)
output_json(os.path.join(args.output_directory, 'stats.json'), stats)
tools.run(args, javac_commands, jars)
python类info()的实例源码
def self_correct(bot, event, irc, args):
match = re.match(r"^s[/](.*)[/](.*)[/]?$", " ".join(args))
if match is not None:
nick = event.source.nick
channel = event.target
for i in bot.userdb[channel][nick]['seen']:
msg = i['message']
output = msg.replace(match.group(1), match.group(2))
if msg == output:
pass
else:
break
irc.reply(event, '<{0}> {1}'.format(nick, output))
log.info('Changing %s to %s', msg, output)
else:
pass
def user_correct(bot, event, irc, args):
match = re.match(r"^u[/]([\w]+)[/](.*)[/](.*)[/]?$", " ".join(args))
if match is not None:
nick = match.group(1)
channel = event.target
for i in bot.userdb[channel][nick]['seen']:
msg = i['message']
output = msg.replace(match.group(2), match.group(3))
if msg == output:
pass
else:
break
irc.reply(event, '<{0}> {1}'.format(nick, output))
log.info('Changing %s to %s', args, output)
else:
pass
def prepare_xml_product(product_infos):
xml_data="<customer_product>\
<is_active>1</is_active>\
<is_from_vendor>0</is_from_vendor>\
<currency_id>58</currency_id>\
<vat_id>607</vat_id>\
<activity_classification_choice>commerce</activity_classification_choice>\
<type_of_product_id>20004</type_of_product_id>"
for tag, value in product_infos.iteritems():
if tag in INCWO_PARAMS:
xml_data+="<"+tag+">"+str(value).replace("& ","& ")+"</"+tag+">"
# log.debug("xml info of product : tag {}, value {} ".format(tag, value))
xml_data+="</customer_product>"
return xml_data
def start(self, join=False):
self._t = threading.Thread(target=self._server.serve_forever)
self._t.setDaemon(True) # don't hang on exit
self._t.start()
log.info("Listening on %s", self.address)
if join:
self._t.join()
def stop(self):
log.info("Closing server...")
self._server.shutdown()
self._server.server_close()
# self._t.join()
def close(self):
if self._com is not None:
log.info("Closing connection to: %s", self.dev)
self._com.close()
def _connect(self):
try:
if(self.dev == ""):
SerialGamePad.findSerialDevices(self._hardwareID)
if len(SerialGamePad.foundDevices) > 0:
self.dev = SerialGamePad.foundDevices[0]
log.info("Using COM Port: %s", self.dev)
try:
self._com = serial.Serial(self.dev, timeout=5)
except serial.SerialException as e:
ports = SerialGamePad.findSerialDevices(self._hardwareID)
error = "Invalid port specified. No COM ports available."
if len(ports) > 0:
error = "Invalid port specified. Try using one of: \n" + \
"\n".join(ports)
log.info(error)
raise SerialPadError(error)
packet = SerialGamePad._generateHeader(CMDTYPE.INIT, 0)
self._com.write(packet)
resp = self._com.read(1)
if len(resp) == 0:
SerialGamePad._comError()
return ord(resp)
except serial.SerialException as e:
error = "Unable to connect to the device. Please check that it is connected and the correct port is selected."
log.exception(e)
log.error(error)
raise e
def click_handler(channel, click_type, was_queued, time_diff):
log.info(channel.bd_addr + " " + str(click_type))
if str(click_type) == 'ClickType.ButtonSingleClick':
try:
log.info("Switching on lights associated with button " + channel.bd_addr)
for light in groups[channel.bd_addr]['group']:
bridge.get(light).on()
except KeyError:
log.warning("Light not found for button " + str(channel.bd_addr))
elif str(click_type) == 'ClickType.ButtonHold':
# turn off all lights
log.info("Turning off all lights...")
for light in bridge:
light.off()
return
def got_button(bd_addr):
cc = fliclib.ButtonConnectionChannel(bd_addr)
# Assign function to call when a button is clicked
cc.on_button_single_or_double_click_or_hold = click_handler
cc.on_connection_status_changed = \
lambda channel, connection_status, disconnect_reason: \
log.info(channel.bd_addr + " " + str(connection_status) + (" " + str(disconnect_reason) if connection_status == fliclib.ConnectionStatus.Disconnected else ""))
client.add_connection_channel(cc)
def got_info(items):
log.info('Checking verified flic buttons')
for bd_addr in items["bd_addr_of_verified_buttons"]:
log.success(bd_addr)
got_button(bd_addr)
def call_command(bot, event, irc, arguments):
command = ' '.join(arguments).split(' ')
if not command[0].startswith("?"):
del command[0]
name = command[0]
else:
name = command[0][1:]
if not name == '' and not name.find("?") != -1:
privmsg = event.target == bot.config['nickname']
args = command[1:] if len(command) > 1 else ''
host = event.source.host
chan = event.target if not privmsg else False
try:
perms = commands[name]['perms']
min_args = commands[name]['minArgs']
if check_perms(host, chan, owner=perms[2], admin=perms[1],
trusted=perms[0]):
if len(args) < min_args:
irc.reply(event, config.argsMissing)
else:
target = "a private message" if privmsg else event.target
source = event.source
log.info("%s called %s in %s", source, name, target)
commands[name]['func'](bot, event, irc, args)
else:
if not event.source.host.find("/bot/"):
irc.reply(event, config.noPerms)
except KeyError:
irc.notice(event.source.nick, config.invalidCmd.format(name))
except Exception:
irc.reply(event, 'Oops, an error occured!')
print_error(irc, event)
def add_ignore(irc, event, args):
host = args[0]
base_message = "Ignoring %s for %s seconds"
indefinite = "Ignoring %s indefinately"
if len(args) > 1:
if args[1] == 'random':
duration = random.randrange(100, 10000)
expires = duration + int(time.time())
else:
duration = int(args[1])
expires = duration + int(time.time())
else:
expires = None
channel = args[2] if len(args) > 2 else None
if channel is not None:
try:
i = config.ignores['channels'][channel]
except KeyError:
i = config.ignores['channels'][channel] = []
i.append([host, expires])
else:
i = config.ignores['global']
i.append([host, expires])
if expires is not None:
if channel is not None:
logging.info(base_message + " in %s", host, duration, channel)
else:
logging.info(base_message, host, duration)
else:
if channel is not None:
logging.info(indefinite + " in %s", host, channel)
else:
logging.info(indefinite, host)
def on_endofmotd(event, irc):
log.info("Received MOTD from network")
def on_ctcp(irc, event, raw):
log.info("Received CTCP reply " + raw)
def on_join(self, event, irc):
if event.source.nick == self.config['nickname']:
log.info("Joining %s", event.target)
if event.target not in self.userdb:
self.userdb[event.target] = {}
irc.send("WHO {0} nuhs%nhuac".format(event.target))
irc.send("NAMES {0}".format(event.target))
else:
irc.send("WHO {0} nuhs%nhuac".format(event.source.nick))
def on_invite(event, irc):
hostmask = event.source.host
channel = event.arguments[0]
if util.check_perms(hostmask, channel, trusted=True):
log.info("Invited to %s by %s", channel, hostmask)
irc.join(channel)
def on_notice(self, event, irc):
source = event.source.host
if not event.target == "*":
if not event.target == self.config['nickname']:
channel = event.target
log.info("Received channel notice from %s in %s",
source,
channel)
else:
log.info("Received private notice from %s", source)
def on_endofnames(event, irc):
log.info('Received end of NAMES reply.')
def run_test(self, suite):
if not os.path.exists('report'): # ??????????????????
os.makedirs('report')
report_name = "report\{}-{}.html".format("report",
time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time())))
with open(report_name, "wb") as f:
runner = BSTestRunner(stream=f, title='{}???????'.format(self.name))
runner.run(suite)
log.info("{}?????????????".format(self.name))
os.system("start {}".format(report_name))
def run(self, test):
"Run the given test case or test suite."
log.info("??????...")
result = _TestResult(self.verbosity)
test(result)
self.stopTime = datetime.datetime.now()
self.generateReport(test, result)
print('\nTime Elapsed: %s' % (self.stopTime - self.startTime), file=sys.stderr)
return result
def main():
db.init_db_engine(config.SQLALCHEMY_DATABASE_URI)
total = db.data.get_count_pending_songs()
done = 0
starttime = time.time()
thisdone, rem = lookup()
done += thisdone
while rem > 0:
thisdone, rem = lookup()
done += thisdone
durdelta, remdelta = util.stats(done, total, starttime)
log.info("Done %s/%s in %s; %s remaining", done, total, str(durdelta), str(remdelta))
def get(self):
ticket = DataCenter().get_jsapi_ticket()
sign = Sign(ticket, 'http://www.test.com/wx/getjsapiticket')
sign_str = sign.sign()
#print 'weixin_JSAPI_ticket: '
#print sign_str
log.info('weixin_JSAPI_ticket: %s'%(sign_str))
self.write(sign_str)
def post(self):
body = json.loads(self.request.body)
response = {'code': 0}
if body.has_key('songId'):
search_result = BaiduMusicSearch().search_song_byid(body['songId'])
response['searchResult'] = search_result
else:
response['code'] = -1
response['error'] = "not found song id."
json_data = json.dumps(response, ensure_ascii=False)
log.info(json_data)
self.write(json_data)
def get_access_token(self):
appId = "appId"
appSecret = "appSecret"
postUrl = ("https://api.weixin.qq.com/cgi-bin/token?grant_type="
"client_credential&appid=%s&secret=%s" % (appId, appSecret))
urlResp = urllib.urlopen(postUrl)
urlResp = json.loads(urlResp.read())
if urlResp.has_key('access_token'):
self.__accessToken = urlResp['access_token']
self.__leftTime = urlResp['expires_in']
#get jsapi_ticket
postUrl = ("https://api.weixin.qq.com/cgi-bin/ticket/getticket?access_token=%s&type=jsapi" % (self.__accessToken))
urlResp = urllib.urlopen(postUrl)
urlResp = json.loads(urlResp.read())
if urlResp.has_key('ticket') and urlResp['errcode']==0:
self.__jsapi_ticket = urlResp['ticket']
# restore in datacenter
self.__data_global_obj.set_access_token(self.__accessToken)
self.__data_global_obj.set_jsapi_ticket(self.__jsapi_ticket)
#print "access_token: %s" % self.__accessToken
#print "saved access_token: %s" % DataCenter().get_access_token()
#print "expires_in: %s" % self.__leftTime
log.info("access_token: %s"%(self.__accessToken))
log.info("saved access_token: %s"%(DataCenter().get_access_token()))
log.info("expires_in: %s"%(self.__leftTime))
def query_menu(self):
postUrl = "https://api.weixin.qq.com/cgi-bin/menu/get?access_token=%s" % self.__data_global_obj.get_access_token()
urlResp = urllib.urlopen(url=postUrl)
#print urlResp.read()
log.info(urlResp.read())
def delete_menu(self):
postUrl = "https://api.weixin.qq.com/cgi-bin/menu/delete?access_token=%s" % self.__data_global_obj.get_access_token()
urlResp = urllib.urlopen(url=postUrl)
#print urlResp.read()
log.info(urlResp.read())
#???????????
def get_current_selfmenu_info(self):
postUrl = "https://api.weixin.qq.com/cgi-bin/get_current_selfmenu_info?access_token=%s" % self.__data_global_obj.get_access_token()
urlResp = urllib.urlopen(url=postUrl)
#print urlResp.read()
log.info(urlResp.read())
#### user operation ####
def get_user_info(self, openid):
postUrl = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=%s&openid=%s&lang=zh_CN " \
% (self.__data_global_obj.get_access_token(), openid)
urlResp = urllib.urlopen(postUrl)
recJson = urlResp.read() #read??????????????
#print recJson
log.info(recJson)
urlResp = json.loads(recJson)
#return _decode_dict(urlResp)
return urlResp
def get_access_token(self):
grant_type = 'client_credentials'
client_id = ''
client_secret = ''
#scope = '301 302 303 304 305 306 307'
#scope = 'music_media_basic music_musicdata_basic music_userdata_basic music_search_basic music_media_premium music_audio_premium music_audio_hq'
postUrl = ("https://openapi.baidu.com/oauth/2.0/token?"
"grant_type=%s&client_id=%s&client_secret=%s" %
(grant_type, client_id, client_secret))
postUrl = urllib.quote(postUrl) #URL?????
urlResp = urllib.urlopen(postUrl)
urlResp = urlResp.read()
#print urlResp
urlResp = json.loads(urlResp)
#print urlResp
DebugPrint(urlResp)
if urlResp.has_key('access_token'):
self.__accessToken = urlResp['access_token']
self.__session_key = urlResp['session_key']
#print "baidu music access_token: %s" % self.__accessToken
#print "baidu music session_key: %s" % self.__session_key
log.info("baidu music access_token: %s" % self.__accessToken)
log.info("baidu music session_key: %s" % self.__session_key)