def gpu_status(self,av_type_list):
    """Send one ``nvidia-smi -q`` report per display type to 'filehelper'.

    For every entry of ``av_type_list`` the command
    ``nvidia-smi -q --display=<entry>`` is run through the shell. Its output
    lines are joined with single spaces, trimmed to start at the
    'Attached GPUs' marker, stripped of all spaces and trailing newlines,
    and handed to ``self.t_send`` with ``toUserName='filehelper'``.

    Args:
        av_type_list: iterable of strings appended verbatim to
            ``--display=``. The command line is built by plain concatenation
            and executed by the shell, so entries must come from a trusted
            source.
    """
    marker = 'Attached GPUs'
    for t in av_type_list:
        cmd = 'nvidia-smi -q --display=' + t
        # The context manager closes the pipe even when reading raises.
        with os.popen(cmd) as pipe:
            content = " ".join(pipe.readlines())
        # str.find returns -1 when the marker is absent; slicing from -1
        # would keep only the last character, so fall back to the full output.
        index = max(content.find(marker), 0)
        s = content[index:].replace(' ', '').rstrip('\n')
        self.t_send(s, toUserName='filehelper')
        # Pause briefly between consecutive sends.
        time.sleep(.5)
#==============================================================================
#
#==============================================================================
python类content()的实例源码
def WhatToPaiDui(self, groupName):
    """Yield every message content of a group that should be repeated now.

    A content qualifies when it occurs more than once in
    ``self.groupLastMsgsDict[groupName]``. A qualifying content is yielded
    only when its ``(groupName, content)`` entry in ``self.selfPaiDuiTTL`` is
    missing or equal to 0; its TTL is then reset to
    ``self.maxSelfPaiDuiTTL``. Otherwise the TTL is decremented by one and
    the content is not yielded.

    This is a generator: the TTL bookkeeping only happens while the caller
    iterates over the result.

    Args:
        groupName: key into ``self.groupLastMsgsDict``; each stored message
            is a mapping with a 'Content' entry.

    Yields:
        The repeated content values, in order of first appearance.
    """
    msgCount = {}
    for msg in self.groupLastMsgsDict[groupName]:
        text = msg['Content']
        msgCount[text] = msgCount.get(text, 0) + 1

    # Several contents may qualify at the same time; each one is gated by
    # its own TTL entry.
    for content, count in msgCount.items():
        if count <= 1:
            continue
        key = (groupName, content)
        # Compare this key's TTL with 0. The previous code compared the whole
        # dict with 0, which is never true, so an entry whose TTL had run
        # down to 0 was decremented forever instead of being re-enabled.
        if self.selfPaiDuiTTL.get(key, 0) == 0:
            self.selfPaiDuiTTL[key] = self.maxSelfPaiDuiTTL
            # yield (not return) so the remaining TTLs are still updated
            yield content
        else:
            self.selfPaiDuiTTL[key] -= 1
def note():
    """Register the group-chat handler that records shared links."""
    @itchat.msg_register(['Sharing'],isGroupChat=True)
    def collect_links(msg):
        """Pull the fields out of a 'Sharing' message and pass them to store().

        Exceptions raised while reading the message or storing it are caught
        and printed instead of being propagated.
        """
        try:
            record = {'received_date': datetime.datetime.now()}
            record['title'] = msg['FileName']
            record['url'] = msg['Url']
            record['sharer'] = msg['ActualNickName']
            # The description is taken from the <des> element of the raw content.
            record['text'] = find_text(content=msg['Content'])
            print(record['title'], record['sharer'], record['text'],
                  record['url'], record['received_date'])
            store(**record)
            print('data stored done')
        except Exception as err:
            print(err)
def text_reply(msg):
    """Reply to a message with the answer returned by the Tuling chatbot API.

    The message's 'Content' is sent as the ``info`` query parameter. The
    'text' field of the JSON answer is always sent back to
    ``msg['FromUserName']``; depending on the answer's 'code', the 'url'
    (200000) or 'list' (302000 / 308000) field is sent as well.

    NOTE(review): the API key appears as the literal '?????' in this block;
    presumably a redacted placeholder -- confirm before deploying.
    """
    # cont = alice.respond(msg['Text'])
    # Let requests build the query string so that characters such as '&',
    # '#', '+' or '%' in the message text are percent-encoded instead of
    # truncating or corrupting the request.
    reply = requests.get(
        'http://www.tuling123.com/openapi/api',
        params={'key': '?????', 'info': msg['Content']}
    )
    m = json.loads(reply.content)
    itchat.send(m['text'], msg['FromUserName'])
    if m['code'] == 200000:
        itchat.send(m['url'], msg['FromUserName'])
    # Both codes carry their payload in the 'list' field.
    if m['code'] in (302000, 308000):
        itchat.send(m['list'], msg['FromUserName'])
def GetMiddleStr(self,content,startStr,endStr):
    """Return the part of ``content`` between ``startStr`` and ``endStr``.

    The first occurrence of ``startStr`` is located, then the first
    occurrence of ``endStr`` that follows it.

    Args:
        content: string to search.
        startStr: marker that precedes the wanted text.
        endStr: marker that follows the wanted text.

    Returns:
        The text between the two markers, or '' when either marker is
        missing or ``content`` is not a string.
    """
    try:
        startIndex = content.index(startStr) + len(startStr)
        # Look for the end marker only after the start marker. Searching from
        # the beginning gave a wrong (usually empty) result whenever endStr
        # also occurred earlier in the text or was identical to startStr.
        endIndex = content.index(endStr, startIndex)
    except (ValueError, TypeError, AttributeError):
        # ValueError: marker not found; the others: content is not a string.
        return ''
    return content[startIndex:endIndex]
#==============================================================================
#
#==============================================================================
def __init__(self, blacklist=None):
    """Set up the empty state of the hook.

    Args:
        blacklist: optional list stored as ``self.blacklist``. When omitted,
            every instance gets its own new empty list; a mutable default
            in the signature would be shared by all instances.
    """
    self.blacklist = [] if blacklist is None else blacklist
    self.groupLastMsgsDict = {}
    # A dictionary controlling not pai dui for more than one time
    # Key: (groupName, content), Value: TTL (0 or non-exist means OK to paidui)
    self.selfPaiDuiTTL = {}
    logging.info('PaiduiHook initialized.')
def process(self, msg, type):
    """Forward a group message to the chatroom it did not come from.

    Nothing is sent when the forwarder is not initialized or when
    ``self.shallSend(msg)`` reports ``shallSend`` as false. Otherwise the
    handling depends on ``type``:

    * TEXT    -- one line with origin tag, sender nickname and content.
    * PICTURE -- the file is saved via ``msg['Text']`` and moved into
      ``self.fileFolder``; an announcement line is sent, followed by the
      '@<kind>@<path>' message for the file itself.
    * SHARING -- one line with origin tag, nickname, text and URL.
    * anything else is only logged.

    NOTE(review): several non-ASCII literals appear as '?' in this block;
    they are reproduced unchanged.
    """
    if not self.isInitialized:
        logging.error('The forwarder was not properly initialized. Please send a message in the groups you want to connect and try again.')
        return

    verdict = self.shallSend(msg)
    if not verdict['shallSend']:
        return

    def originTag():
        # Display name of the originating chatroom, in square brackets.
        return '[{0}]'.format(self.chatroomDisplayNames[verdict['fromChatroom']])

    def targetRoom():
        # chatroomIds is indexed with the negated origin flag.
        return self.chatroomIds[not verdict['fromChatroom']]

    if type == TEXT:
        prefix = originTag()
        target = targetRoom()
        line = '{0} {1}: {2}'.format(prefix, msg['ActualNickName'], msg['Content'])
        logging.info(line)
        itchat.send(line, target)
        return

    if type == PICTURE:
        fileName = msg['FileName']
        storedPath = os.path.join(self.fileFolder, fileName)
        # msg['Text'] is called with the file name (presumably itchat's
        # download callback -- confirm), then the file is moved into place.
        msg['Text'](fileName)
        os.rename(fileName, storedPath)
        mediaTag = {'Picture': 'img', 'Video': 'vid'}.get(msg['Type'], 'fil')
        mediaLabel = {'Picture': '??', 'Video': '??'}.get(msg['Type'], '??')
        prefix = originTag()
        target = targetRoom()
        line = '{0} {1} ???{2}:'.format(prefix, self.nickNameLookup.lookupNickName(msg), mediaLabel)
        itchat.send(line, target)
        logging.info(line)
        itchat.send('@{0}@{1}'.format(mediaTag, storedPath), target)
        return

    if type == SHARING:
        prefix = originTag()
        target = targetRoom()
        line = '{0} {1} ?????: {2} {3}'.format(prefix, self.nickNameLookup.lookupNickName(msg), msg['Text'], msg['Url'])
        logging.info(line)
        itchat.send(line, target)
        return

    logging.info('Unknown type encoutered.')
def generateTagCloudForGroupV2(self, groupName, userName=None):
    """Render a TF-IDF weighted tag cloud and return the image file name.

    The target document is built from the latest ``self.recordMaxNum``
    messages sent to ``groupName`` (restricted to ``userName`` when given).
    It is weighted against comparison documents built from up to five times
    as many other messages:

    * ``userName`` is None -- messages of all other groups, one document
      per group ('to' field).
    * ``userName`` given   -- messages of all other users in the same
      group, one document per user ('from' field).

    Records whose content starts with '<<<IMG' are ignored.

    Args:
        groupName: value matched against the 'to' field.
        userName: optional value matched against the 'from' field.

    Returns:
        The file name produced by ``self.generateTmpFileName()`` under which
        the image was saved.
    """
    newestFirst = [ ('timestamp', DESCENDING) ]
    if userName is None:
        ownQuery = { 'to': groupName }
        othersQuery = { 'to': { '$ne': groupName } }
        groupField = 'to'
    else:
        ownQuery = { 'from': userName, 'to': groupName }
        othersQuery = { 'from': { '$ne': userName }, 'to': groupName }
        groupField = 'from'
    records = self.coll.find(ownQuery).sort(newestFirst).limit(self.recordMaxNum)
    allRecords = self.coll.find(othersQuery).sort(newestFirst).limit(self.recordMaxNum * 5)

    def groupKey(record):
        return record[groupField]

    def tokenize(recs):
        # Join the non-image records into one text and cut it into words.
        return list(jieba.cut(' '.join([ r['content'] for r in recs if re.match('<<<IMG', r['content']) is None ])))

    # itertools.groupby only merges adjacent items, so the records must be
    # sorted and grouped by the SAME field. Previously the per-user branch
    # sorted by 'from' but grouped by 'to', which collapsed all other users
    # into a single comparison document.
    othersSorted = sorted(allRecords, key=groupKey)
    docThisGroup = tokenize(records)
    docsOtherGroups = [ tokenize(g) for _, g in itertools.groupby(othersSorted, groupKey) ]

    # The target document must stay at index 0: its weights are read below.
    docs = [ docThisGroup ] + docsOtherGroups
    dictionary = gensim.corpora.Dictionary(docs)
    bows = [ dictionary.doc2bow(doc) for doc in docs ]
    id2token = { v: k for k, v in dictionary.token2id.items() }
    tfidf = gensim.models.tfidfmodel.TfidfModel(corpus=bows)
    tagCloudFrequencies = { id2token[tokenId]: weight for tokenId, weight in tfidf[bows[0]] }
    img = self.wordCloud.generate_from_frequencies(tagCloudFrequencies).to_image()
    fn = self.generateTmpFileName()
    img.save(fn)
    return fn
# Generate a tag cloud image from the latest self.recordMaxNum messages. Return the file name.
def generateTagCloudForGroup(self, groupName, userName=None):
    """Render a word-count tag cloud and return the image file name.

    Uses the latest ``self.recordMaxNum`` messages sent to ``groupName``
    (only those from ``userName`` when it is given). Words of length 1 are
    dropped and every count is capped at ``self.maxFrequency``.
    """
    if userName is None:
        query = { 'to': groupName }
    else:
        query = { 'from': userName, 'to': groupName }
    cursor = self.coll.find(query).sort([ ('timestamp', DESCENDING) ]).limit(self.recordMaxNum)

    messages = [ record['content'] for record in cursor ]
    counts = Counter(
        word
        for message in messages
        for word in jieba.cut(message, cut_all=False)
        if len(word) > 1
    )
    # Cap the counts so that no single word dominates the picture.
    capped = { word: min(self.maxFrequency, n) for word, n in counts.items() }

    image = self.wordCloud.generate_from_frequencies(capped).to_image()
    fileName = self.generateTmpFileName()
    image.save(fileName)
    return fileName
def find_text(content):
    """Return the body of the first <des>...</des> element in ``content``.

    Every space, newline and tab is replaced by the letter 'A' before
    matching, so the returned text carries 'A' in place of that whitespace.
    Raises IndexError when no <des>...</des> pair is present.
    """
    for blank in (' ', '\n', '\t'):
        content = content.replace(blank, 'A')
    return re.findall(r"<des>(.*?)</des>", content)[0]
def find_text(content):
    """Extract the first <des>...</des> body from ``content``.

    Spaces, newlines and tabs are turned into 'A' first, which lets the
    pattern match across line breaks; the result therefore contains 'A'
    wherever the input had such whitespace. An IndexError is raised when
    there is no match.
    """
    flattened = content.replace(' ','A')
    flattened = flattened.replace('\n', 'A')
    flattened = flattened.replace('\t','A')
    des_pattern = re.compile(r"<des>(.*?)</des>")
    return des_pattern.findall(flattened)[0]
def note():
    """Register two group-chat handlers: one for NOTE messages, one for shared links.

    NOTE(review): the non-ASCII literals in this block appear as '?' in the
    source as received; they are reproduced unchanged.
    """
    @itchat.msg_register(NOTE, isGroupChat=True)
    def deal_note(msg):
        """Return a greeting that mentions the name quoted in a NOTE message.

        Exceptions are caught and printed; the handler then returns None.
        """
        greeting = u"""
@%s
??????
??????
?????????????,
????? ???? ?? ??
????? 14? ?? ?????
ps: ??????sharing.zhuojiayuan.com"""
        try:
            # NOTE(review): as it stands the pattern starts with '?', which
            # is presumably mojibake for the original text -- verify.
            matcher = re.compile(r'??"(.*?)"?????')
            mentioned = matcher.findall(msg['Content'])[0]
            return greeting % mentioned
        except Exception as err:
            print(err)

    @itchat.msg_register(['Sharing'],isGroupChat=True)
    def collect_links(msg):
        """Store the fields of a 'Sharing' message, then request the /glinks/new URL.

        Exceptions are caught and printed instead of being propagated.
        """
        try:
            record = {'received_date': datetime.datetime.now()}
            record['title'] = msg['FileName']
            record['url'] = msg['Url']
            record['sharer'] = msg['ActualNickName']
            record['text'] = find_text(content=msg['Content'])
            print(record['title'], record['sharer'], record['text'],
                  record['url'], record['received_date'])
            store(**record)
            print('data stored done')
            # After storing, issue a GET to the web application and print
            # the HTTP status code of the answer.
            answer = requests.get('http://www.zhuojiayuan.com:66/glinks/new')
            print(answer.status_code)
        except Exception as err:
            print(err)
def handle_text_msg(msg):
    """Classify a group text message and build the bot's answer.

    Returns a dict with the keys 'type' and 'response':

    * 'b' -- the sender is listed in ``setting.ACTION_ADMIN`` and the text
      contains ``setting.ACTION_KEYWORD``; ``begin_action()`` is called.
    * 'q' -- the text carries the first tag while ``IN_ACTION`` is truthy and
      there is text after the tag; that text is passed to
      ``plugin.msg_input``.
    * 'h' -- the text carries the second tag; a usage hint is returned.
    * None -- nothing matched.

    NOTE(review): the non-ASCII literals (including both tags) appear as '?'
    in the source as received; they are reproduced unchanged.
    """
    global IN_ACTION
    sender = msg['ActualNickName']
    text = msg['Text']
    # Not used below; the lookup is kept so a missing key still raises.
    avatar = msg["UserImg"]

    if sender in setting.ACTION_ADMIN and setting.ACTION_KEYWORD in text:
        begin_action()
        return {'type': 'b', 'response': "????! 2?????:)"}

    if '[??]' in text and IN_ACTION:
        # Keep only what follows the last tag.
        question = re.split(r'\[??\]', text)[-1]
        if question:
            decorated = "<span class='api_icon'></span><span class='api_nickname'>" + sender + "</span>" + question
            try:
                plugin.msg_input(msg=decorated)
            except Exception as err:
                # A failing plugin must not prevent the acknowledgement.
                logger.error(str(err))
            return {'type': 'q', 'response': "@{} ??????".format(sender)}

    # Disabled branch, kept for reference:
    # if '/bot/t' in content:
    # if content.startswith('[??]'):
    #     thread_id,clean_content = re.split(r'\[??\].*?(?P<id>\d+)', content)[-2:]
    #     response = "????:)"
    #     return {'type':'t','response':response}

    # if '/bot/h' in content:
    if '[??]' in text:
        hint = 'Hi @{} ???????\n??:[??]\n??:[??] ????'.format(
            msg['ActualNickName'])
        return {'type': 'h', 'response': hint}

    return {'type': None, 'response': None}
# ????