def checkMailAccount(server, user, password, ssl=False, port=None):
    """Check whether a POP3 account accepts the given credentials.

    Args:
        server: Host name or address of the POP3 server.
        user: Account name sent with the USER command.
        password: Password sent with the PASS command.
        ssl: When true, connect with ``poplib.POP3_SSL`` instead of
            ``poplib.POP3``.
        port: TCP port; when falsy it defaults to 995 for SSL and 110
            otherwise.

    Returns:
        True when the server answered the PASS command with ``+OK``,
        False on any connection or protocol failure.
    """
    if not port:
        port = 995 if ssl else 110
    pop3 = None
    try:
        if ssl:
            pop3 = poplib.POP3_SSL(server, port)
        else:
            pop3 = poplib.POP3(server, port)
        pop3.user(user)
        auth = pop3.pass_(password)
    except Exception:
        return False
    finally:
        # Always say goodbye so a rejected login does not leave the socket
        # open; a failing QUIT must not change the verdict.
        if pop3 is not None:
            try:
                pop3.quit()
            except Exception:
                pass
    # Python 3's poplib returns bytes, Python 2's returns str.
    if isinstance(auth, bytes):
        auth = auth.decode("ascii", "replace")
    return "+OK" in auth
python类POP3的实例源码
def popPeek(server, user, port=110):
    """Interactively show message headers and optionally delete messages.

    Connects to ``server`` as ``user`` (the password is read with
    ``getpass.getpass()``), prints the header lines of every message and
    asks whether to delete it.  If anything goes wrong (including Ctrl-C)
    while walking the mailbox, pending deletions are rolled back with RSET.

    Args:
        server: Host name or address of the POP3 server.
        user: Account name.
        port: TCP port of the POP3 service.

    Raises:
        SystemExit: with status 1 when connecting or logging in fails.
    """
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3

    try:
        P = poplib.POP3(server, port)
        P.user(user)
        P.pass_(getpass.getpass())
    except (Exception, KeyboardInterrupt):
        print("Failed to connect to server.")
        sys.exit(1)

    deleted = 0
    # Defined up front so the error handler can still report a count when
    # LIST itself fails.
    msgcount = 0
    try:
        msgcount = len(P.list()[1])
        for msg in range(1, msgcount + 1):
            # TOP with 0 body lines returns only the headers.
            for line in P.top(msg, 0)[1]:
                print(line)
            answer = read_line("D to delete, any other key to leave message on server: ")
            if answer == "D":
                P.dele(msg)
                deleted += 1
        P.quit()
        print("%d messages deleted. %d messages left on server" % (deleted, msgcount - deleted))
    except (Exception, KeyboardInterrupt):
        # Undo deletion marks before closing so nothing is removed.
        P.rset()
        P.quit()
        deleted = 0
        print("\n%d messages deleted. %d messages left on server" % (deleted, msgcount - deleted))
def popauth(popHost, user, passwd):
    """
    Log in and log out, only to verify user identity.

    Args:
        popHost: Host name or address of the POP3 server (default port).
        user: Account name.
        passwd: Password for the account.

    Raises:
        StandardError (``Exception`` where that builtin does not exist):
        when the connection cannot be established or the credentials
        cannot be verified.
    """
    import poplib
    try:
        error_type = StandardError  # Python 2 builtin
    except NameError:
        error_type = Exception

    try:
        pop = poplib.POP3(popHost)
    except Exception:
        raise error_type("Could not establish connection "
                         "to %s for password check" % popHost)

    try:
        # Log in and perform a small sanity check
        pop.user(user)
        pop.pass_(passwd)
        length, size = pop.stat()
        verified = isinstance(length, int) and isinstance(size, int)
    except Exception:
        verified = False
    finally:
        # Close the session on every path; a failed QUIT is not an
        # authentication failure.
        try:
            pop.quit()
        except Exception:
            pass

    if not verified:
        raise error_type("Could not verify identity. \n"
                         "User name %s or password incorrect." % user)
def pillage(self, username, password, server, port, domain, outputdir="."):
    """Pick a mail client by port, connect it and hand it to a worker thread.

    Ports 993, 143, 995 and 110 select IMAPS, IMAP, POP3S and POP3
    respectively; any other port prints an error and returns without
    connecting.  The worker thread runs ``self.tworker``.
    """
    print("%s, %s, %s, %s" % (username, password, server, domain))

    if port == 993:
        client = IMAPS(outputdir=outputdir)
    elif port == 143:
        client = IMAP(outputdir=outputdir)
    elif port == 995:
        client = POP3S(outputdir=outputdir)
    elif port == 110:
        client = POP3(outputdir=outputdir)
    else:
        print("ERROR, unknown port provided")
        return

    client.connect(server)
    worker_args = (client, username, password, domain, server, port)
    worker = Thread(target=self.tworker, args=worker_args)
    worker.start()
#-----------------------------------------------------------------------------
# main test code
#-----------------------------------------------------------------------------
def run(self):
    """Try one user/password pair against the POP3 host in ``sys.argv[1]``.

    The pair comes from ``getword()``.  On a successful login the mailbox
    status is printed, ``work.join()`` is called and the process exits
    with status 2.  POP3 protocol errors (for example a rejected login)
    are ignored.
    """
    value, user = getword()
    pop = None
    try:
        print("-" * 12)
        print("User: %s Password: %s" % (user, value))
        pop = poplib.POP3(sys.argv[1])
        pop.user(user)
        pop.pass_(value)
        print("\t\nLogin successful: %s %s" % (value, user))
        print(pop.stat())
        pop.quit()
        pop = None
        work.join()
        sys.exit(2)
    except poplib.error_proto:
        # Rejected credentials are the expected outcome for most guesses.
        pass
    finally:
        # Close the session after a failed attempt so sockets do not pile up.
        if pop is not None:
            try:
                pop.quit()
            except Exception:
                pass
def run(self):
    """Try one password from ``getword()`` against the host ``ipaddr[0]``.

    The account name is the module-level ``user`` with its last character
    dropped.  On a successful login the mailbox status is printed,
    ``work.join()`` is called and the process exits with status 2.
    POP3 protocol errors and socket errors are ignored.
    """
    value = getword()
    pop = None
    try:
        print("-" * 12)
        print("User: %s Password: %s" % (user[:-1], value))
        pop = poplib.POP3(ipaddr[0])
        pop.user(user[:-1])
        pop.pass_(value)
        print("\t\nLogin successful: %s %s" % (value, user))
        print(pop.stat())
        pop.quit()
        pop = None
        work.join()
        sys.exit(2)
    except (poplib.error_proto, socket.gaierror, socket.error, socket.herror):
        # Rejected credentials and unreachable hosts are expected here.
        pass
    finally:
        # Close the session after a failed attempt so sockets do not pile up.
        if pop is not None:
            try:
                pop.quit()
            except Exception:
                pass
def run(self):
    """Try one password from ``getword()`` against the POP3 host ``ip``.

    The account name is the module-level ``user`` with its last character
    dropped.  On a successful login the mailbox status is printed,
    ``work.join()`` is called and the process exits with status 2.
    POP3 protocol errors (for example a rejected login) are ignored.
    """
    value = getword()
    pop = None
    try:
        print("-" * 12)
        print("User: %s Password: %s" % (user[:-1], value))
        pop = poplib.POP3(ip)
        pop.user(user[:-1])
        pop.pass_(value)
        print("\t\nLogin successful: %s %s" % (value, user))
        print(pop.stat())
        pop.quit()
        pop = None
        work.join()
        sys.exit(2)
    except poplib.error_proto:
        # Rejected credentials are the expected outcome for most guesses.
        pass
    finally:
        # Close the session after a failed attempt so sockets do not pile up.
        if pop is not None:
            try:
                pop.quit()
            except Exception:
                pass
def connect(self):
    """Connects to and authenticates with a POP3 mail server.

    Uses ``poplib.POP3_SSL`` when both ``self.keyfile`` and
    ``self.certfile`` are set or ``self.ssl`` is true, otherwise plain
    ``poplib.POP3``.

    Returns:
        The authenticated poplib connection object.

    Raises:
        Whatever poplib raises for connection or login failures; the
        exception is propagated unchanged.
    """
    import poplib
    if (self.keyfile and self.certfile) or self.ssl:
        # NOTE(review): newer Python releases dropped the positional
        # keyfile/certfile arguments of POP3_SSL -- confirm target version.
        M = poplib.POP3_SSL(self.host, self.port, self.keyfile, self.certfile)
    else:
        M = poplib.POP3(self.host, self.port)

    authenticated = False
    try:
        M.user(self.username)
        M.pass_(self.password)
        authenticated = True
    finally:
        # On a failed login close the session instead of leaking the socket;
        # the original exception still propagates to the caller.
        if not authenticated:
            try:
                M.quit()
            except Exception:
                pass
    return M
def pop3_Connection(ip, username, password, port):
    """Try a POP3 login and record the credentials when it succeeds.

    A hit is reported (via ``printGreen`` and the shared ``result`` list)
    only when the login succeeds and STAT reports at least one message.
    Any failure is printed as a failed login.

    Args:
        ip: Host name or address of the POP3 server.
        username: Account name to try.
        password: Password to try.
        port: TCP port to connect to; a falsy value uses poplib's default.
    """
    pp = None
    try:
        # Connect to the port that is reported in the messages below.
        pp = poplib.POP3(ip, port) if port else poplib.POP3(ip)
        #pp.set_debuglevel(1)
        pp.user(username)
        pp.pass_(password)
        (mailCount, size) = pp.stat()
        pp.quit()
        pp = None
        if mailCount:
            hit = "%s pop3 at %s has weaken password!!-------%s:%s\r\n" % (ip, port, username, password)
            lock.acquire()
            try:
                printGreen(hit)
                result.append(hit)
            finally:
                # Release even if reporting fails, otherwise the handler
                # below would block forever on the same lock.
                lock.release()
    except Exception as e:
        print(e)
        lock.acquire()
        try:
            print("%s pop3 service 's %s:%s login fail " % (ip, username, password))
        finally:
            lock.release()
    finally:
        # Close the session after a failed attempt.
        if pp is not None:
            try:
                pp.quit()
            except Exception:
                pass
def poll():
    """
    Iterates over all the active services in the database and attempt to execute that service's functionality.
    The success or failure of the service and any error messages are stored in the database.

    For each active service the service type name is looked up and the
    matching ``poll_*`` task is queued with ``.delay``.  Services whose
    type cannot be found, or whose type is not handled here, are skipped.
    """
    for service in execute_db_query('select * from service where service_active = 1'):
        sleep(2)
        # Grab the service from the database
        rows = execute_db_query('select * from service_type join service ON (service_type.service_type_id = service.service_type_id) where service.service_type_id = ?', [service['service_type_id']])
        # An empty result would make rows[0] raise; skip the service instead.
        row = rows[0] if rows else None
        if not row:
            continue

        service_type = row['service_type_name']
        # Perform DNS Request
        if service_type == 'dns':
            poll_dns.delay(timeout, service['service_id'], service['service_connection'], service['service_request'], service['service_expected_result'])
        # Perform HTTP(S) Request
        elif service_type in ('http', 'https'):
            poll_web.delay(timeout, service['service_id'], service_type, service['service_connection'], service['service_request'], service['service_expected_result'])
        # Perform FTP Request
        elif service_type == 'ftp':
            poll_ftp.delay(timeout, service['service_id'], service['service_connection'], service['service_request'], service['service_expected_result'])
        # Perform SMTP request to send mail, POP3 to retrieve it back
        elif service_type == 'mail':
            poll_mail.delay(timeout, service['service_id'], service['service_connection'], service['service_request'], service['service_expected_result'])
def pop3_Connection(ip, username, password, port):
    """Try a POP3 login and record the credentials when it succeeds.

    A hit is reported (via ``printGreen`` and the shared ``result`` list)
    only when the login succeeds and STAT reports at least one message.
    Any failure is printed as a failed login.

    Args:
        ip: Host name or address of the POP3 server.
        username: Account name to try.
        password: Password to try.
        port: TCP port to connect to; a falsy value uses poplib's default.
    """
    pp = None
    try:
        # Connect to the port that is reported in the messages below.
        pp = poplib.POP3(ip, port) if port else poplib.POP3(ip)
        #pp.set_debuglevel(1)
        pp.user(username)
        pp.pass_(password)
        (mailCount, size) = pp.stat()
        pp.quit()
        pp = None
        if mailCount:
            hit = "%s pop3 at %s has weaken password!!-------%s:%s\r\n" % (ip, port, username, password)
            lock.acquire()
            try:
                printGreen(hit)
                result.append(hit)
            finally:
                # Release even if reporting fails, otherwise the handler
                # below would block forever on the same lock.
                lock.release()
    except Exception as e:
        print(e)
        lock.acquire()
        try:
            print("%s pop3 service 's %s:%s login fail " % (ip, username, password))
        finally:
            lock.release()
    finally:
        # Close the session after a failed attempt.
        if pp is not None:
            try:
                pp.quit()
            except Exception:
                pass
def qyt_rec_mail(mailserver, mailuser, mailpasswd, mailprefix):
    """Download every message in a POP3 mailbox into numbered text files.

    Message ``k`` (1-based) is written to ``<mailprefix>_<k>.txt`` in
    binary mode, one message line per file line.

    Args:
        mailserver: Host name or address of the POP3 server (default port).
        mailuser: Account name.
        mailpasswd: Password for the account.
        mailprefix: Prefix used for the output file names.
    """
    print('Connecting...')
    server = poplib.POP3(mailserver)
    try:
        # Log in inside the try so a rejected login still closes the session.
        server.user(mailuser)
        server.pass_(mailpasswd)
        print(server.getwelcome())
        msgCount, msgBytes = server.stat()
        print('There are', msgCount, 'mail message in', msgBytes, 'bytes')
        print(server.list())
        for msg_num in range(1, msgCount + 1):
            hdr, message, octets = server.retr(msg_num)
            mail_file_name = mailprefix + '_' + str(msg_num) + '.txt'
            with open(mail_file_name, 'wb') as mail_file:
                # poplib strips the line terminators, so put one back after
                # each line; otherwise the message collapses into one line.
                mail_file.writelines(line + b'\n' for line in message)
            print(mail_file_name + ' Recieved!!!')
    finally:
        server.quit()
        print('Bye.')
def connect(self):
    """Open a mail connection using the linked source's server settings.

    When ``self.source_id`` is set, an IMAP or POP connection is opened to
    ``source_id.server``/``source_id.port`` (SSL when ``source_id.is_ssl``),
    authenticated with ``self.user``/``self.password``, and given a socket
    timeout of ``MAIL_TIMEOUT``.  In every other case the parent class
    implementation is used.

    Returns:
        The authenticated connection object.
    """
    if not (self.source_id and self.source_id.id):
        return super(vieterp_fetchmail_server, self).connect()

    self.ensure_one()
    source = self.source_id
    port = int(source.port)
    connection = None
    if source.type == 'imap':
        if source.is_ssl:
            connection = IMAP4_SSL(source.server, port)
        else:
            connection = IMAP4(source.server, port)
        connection.login(self.user, self.password)
    # NOTE(review): the IMAP branch tests source_id.type but this one tests
    # self.type -- confirm which field is intended.
    elif self.type == 'pop':
        if source.is_ssl:
            connection = POP3_SSL(source.server, port)
        else:
            connection = POP3(source.server, port)
        # TODO: use this to remove only unread messages
        # connection.user("recent:"+server.user)
        connection.user(self.user)
        connection.pass_(self.password)

    if connection is None:
        # Neither branch matched: defer to the parent implementation rather
        # than failing on an unbound local below.
        return super(vieterp_fetchmail_server, self).connect()

    # Add timeout on socket
    connection.sock.settimeout(MAIL_TIMEOUT)
    return connection
def LoginServer(login_server, login_port, login_user, login_pass):
    """Log in to a POP3 server and report how far the login got.

    Port 995 selects ``poplib.POP3_SSL``; any other port uses plain
    ``poplib.POP3``.

    Returns:
        A three-item list ``[user_ok, pass_ok, detail]``: whether USER was
        accepted, whether PASS was accepted, and either a success message,
        ``"Login user is not correct"`` (failure before USER was accepted)
        or the exception raised afterwards.
    """
    def _accepted(response):
        # poplib answers with bytes on Python 3 and str on Python 2.
        if isinstance(response, bytes):
            response = response.decode("ascii", "replace")
        return "+OK" in response.upper()

    login_server_return = [False, False, '']
    is_auth_user = False
    pop_login = None
    try:
        if login_port == 995:
            pop_login = poplib.POP3_SSL(login_server, login_port)
        else:
            pop_login = poplib.POP3(login_server, login_port)
        user_auth = pop_login.user(login_user)
        if _accepted(user_auth):
            login_server_return[0] = True
            is_auth_user = True
        pass_auth = pop_login.pass_(login_pass)
        if _accepted(pass_auth):
            login_server_return[1] = True
            login_server_return[2] = "[-]Login is successful!"
        pop_login.quit()
        pop_login = None
    except Exception as e:
        if not is_auth_user:
            login_server_return[2] = "Login user is not correct"
        else:
            login_server_return[2] = e
    finally:
        # Close the session after a failed attempt; cleanup errors are
        # irrelevant to the reported status.
        if pop_login is not None:
            try:
                pop_login.quit()
            except Exception:
                pass
    # Returned outside ``finally`` so KeyboardInterrupt/SystemExit are not
    # silently discarded.
    return login_server_return
def qyt_rec_mail(mailserver, mailuser, mailpasswd, mailprefix):
    """Download every message in a POP3 mailbox into numbered text files.

    Message ``k`` (1-based) is written to ``<mailprefix>_<k>.txt`` in
    binary mode, one message line per file line.

    Args:
        mailserver: Host name or address of the POP3 server (default port).
        mailuser: Account name.
        mailpasswd: Password for the account.
        mailprefix: Prefix used for the output file names.
    """
    print('Connecting...')
    server = poplib.POP3(mailserver)  # connect to the server
    try:
        # Log in inside the try so a rejected login still closes the session.
        server.user(mailuser)
        server.pass_(mailpasswd)
        print(server.getwelcome())  # server greeting
        msgCount, msgBytes = server.stat()  # message count and mailbox size
        print('There are', msgCount, 'mail message in', msgBytes, 'bytes')
        print(server.list())  # per-message listing
        for msg_num in range(1, msgCount + 1):
            hdr, message, octets = server.retr(msg_num)  # fetch one message
            mail_file_name = mailprefix + '_' + str(msg_num) + '.txt'
            with open(mail_file_name, 'wb') as mail_file:
                # poplib strips the line terminators, so put one back after
                # each line; otherwise the message collapses into one line.
                mail_file.writelines(line + b'\n' for line in message)
            print(mail_file_name + ' Recieved!!!')
    finally:
        server.quit()  # end the session
        print('Bye.')
def connect(self, mailserver, port="993"):
    """Open an IMAP-over-SSL connection and store it in ``self.srv``.

    ``self.mailserver`` and ``self.port`` record the arguments.  When the
    connection cannot be opened ``self.srv`` is set to None instead of
    raising.
    """
    self.mailserver = mailserver
    self.port = port
    try:
        self.srv = imaplib.IMAP4_SSL(self.mailserver, self.port)
    except Exception:
        # Best effort: callers check ``self.srv``.  KeyboardInterrupt and
        # SystemExit are deliberately not swallowed.
        self.srv = None
#-----------------------------------------------------------------------------
# POP3 subclass of Pillager Class
#-----------------------------------------------------------------------------
def connect(self, mailserver, port="110"):
    """Open a plain POP3 connection and store it in ``self.srv``.

    ``self.mailserver`` and ``self.port`` record the arguments.  When the
    connection cannot be opened ``self.srv`` is set to None instead of
    raising.
    """
    self.mailserver = mailserver
    self.port = port
    try:
        self.srv = poplib.POP3(self.mailserver, self.port)
    except Exception:
        # Best effort: callers check ``self.srv``.  KeyboardInterrupt and
        # SystemExit are deliberately not swallowed.
        self.srv = None
def getMessages(self):
    """Fill ``self.msg_list`` with every message on the server, once.

    Does nothing when there is no connection (``self.srv`` is falsy) or
    when ``self.msg_list`` is already non-empty.  Each entry is the value
    returned by ``self.srv.retr`` for message numbers 1..N, where N comes
    from ``self.srv.stat()``.
    """
    if not self.srv or self.msg_list:
        return

    message_count = self.srv.stat()[0]
    self.msg_list = []
    for number in range(1, message_count + 1):
        self.msg_list.append(self.srv.retr(number))
#-----------------------------------------------------------------------------
# POP3S subclass of POP3 Class
#-----------------------------------------------------------------------------
def __init__(self, outputdir="."):
    """Initialise through ``POP3.__init__`` with the given output directory."""
    POP3.__init__(self, outputdir=outputdir)
def fetch(self):
    """Fetches email messages from a POP3 server.

    Returns:
        A dict mapping each 1-based POP3 message number to the result of
        ``self.parse_email`` applied to that message's lines joined with
        newlines.
    """
    messages = {}
    num = len(self.handle.list()[1])
    for number in range(1, num + 1):
        message = '\n'.join(self.handle.retr(number)[1])
        # Key by the message's own number; keying every entry by the total
        # count would keep only the last message.
        messages[number] = self.parse_email(message)
    return messages
def disconnect(self):
    """End the POP3 session by calling ``quit()`` on ``self.handle``."""
    handle = self.handle
    handle.quit()
def found_terminator(self):
    """Dispatch the command line collected in ``self.in_buffer``.

    The buffered byte chunks are joined, decoded as ISO-8859-1 and the
    buffer is reset.  The text before the first space, lower-cased, is the
    command; everything after that space is its argument.  The command is
    passed to ``self.cmd_<command>`` when such an attribute exists,
    otherwise an ``-ERR`` reply is pushed.
    """
    text = b''.join(self.in_buffer).decode('ISO-8859-1')
    self.in_buffer = []

    verb, _, arg = text.partition(' ')
    cmd = verb.lower()
    handler_name = 'cmd_' + cmd
    if not hasattr(self, handler_name):
        self.push('-ERR unrecognized POP3 command "%s".' % cmd)
        return
    getattr(self, handler_name)(arg)
def setUp(self):
    """Start a DummyPOP3Server on (HOST, PORT) and connect a client to it."""
    self.server = dummy = DummyPOP3Server((HOST, PORT))
    dummy.start()
    self.client = poplib.POP3(dummy.host, dummy.port)
def testTimeoutDefault(self):
    """A connection made without a timeout picks up the global default."""
    self.assertTrue(socket.getdefaulttimeout() is None)
    socket.setdefaulttimeout(30)
    try:
        client = poplib.POP3(HOST, self.port)
    finally:
        # Restore the global default no matter how the connect went.
        socket.setdefaulttimeout(None)
    client_sock = client.sock
    self.assertEqual(client_sock.gettimeout(), 30)
    client_sock.close()
def testTimeoutNone(self):
    """An explicit ``timeout=None`` overrides the global default timeout."""
    self.assertTrue(socket.getdefaulttimeout() is None)
    socket.setdefaulttimeout(30)
    try:
        client = poplib.POP3(HOST, self.port, timeout=None)
    finally:
        # Restore the global default no matter how the connect went.
        socket.setdefaulttimeout(None)
    client_sock = client.sock
    self.assertTrue(client_sock.gettimeout() is None)
    client_sock.close()
def testTimeoutValue(self):
    """An explicit timeout value is applied to the connection's socket."""
    client = poplib.POP3(HOST, self.port, timeout=30)
    client_sock = client.sock
    self.assertEqual(client_sock.gettimeout(), 30)
    client_sock.close()
def found_terminator(self):
    """Dispatch the command line collected in ``self.in_buffer``.

    The buffered string chunks are joined and the buffer is reset.  The
    text before the first space, lower-cased, is the command; everything
    after that space is its argument.  The command is passed to
    ``self.cmd_<command>`` when such an attribute exists, otherwise an
    ``-ERR`` reply is pushed.
    """
    text = ''.join(self.in_buffer)
    self.in_buffer = []

    verb, _, arg = text.partition(' ')
    cmd = verb.lower()
    handler_name = 'cmd_' + cmd
    if not hasattr(self, handler_name):
        self.push('-ERR unrecognized POP3 command "%s".' % cmd)
        return
    getattr(self, handler_name)(arg)
def setUp(self):
    """Start a DummyPOP3Server on (HOST, 0) and connect a client to it."""
    self.server = dummy = DummyPOP3Server((HOST, 0))
    dummy.start()
    self.client = poplib.POP3(dummy.host, dummy.port)
def testTimeoutDefault(self):
    """A connection made without a timeout picks up the global default."""
    self.assertIsNone(socket.getdefaulttimeout())
    socket.setdefaulttimeout(30)
    try:
        client = poplib.POP3(HOST, self.port)
    finally:
        # Restore the global default no matter how the connect went.
        socket.setdefaulttimeout(None)
    client_sock = client.sock
    self.assertEqual(client_sock.gettimeout(), 30)
    client_sock.close()
def testTimeoutNone(self):
    """An explicit ``timeout=None`` overrides the global default timeout."""
    self.assertIsNone(socket.getdefaulttimeout())
    socket.setdefaulttimeout(30)
    try:
        client = poplib.POP3(HOST, self.port, timeout=None)
    finally:
        # Restore the global default no matter how the connect went.
        socket.setdefaulttimeout(None)
    client_sock = client.sock
    self.assertIsNone(client_sock.gettimeout())
    client_sock.close()