Zionetrix::Git
Repositories
Help
Report an Issue
mass_validate_email
Code
Commits
Branches
Tags
Search
Tree:
bc4cf99
Branches
Tags
master
mass_validate_email
mass_validate_email.py
Initial commit
Benjamin Renard
committed
bc4cf99
at 2013-05-29 22:45:50
mass_validate_email.py
Blame
History
Raw
#!/usr/bin/python
#
# Python module for mass validation of email addresses
#
# This module was inspired by (and uses) the validate_email library
# written by Syrus Akbary :
#
#   https://github.com/SyrusAkbary/validate_email
#
# Its main goal is to optimize mass validation by caching
# bad (or good) domains and MX servers.
#
# Author: Benjamin Renard
# Website: http://git.zionetrix.net/mass_validate_email
# Licence: LGPL

import sys
import logging
from validate_email import validate_email
import smtplib
import socket
import DNS

try:
    DNS.DiscoverNameServers()
except DNS.ServerError as e:
    logging.fatal("Error discovering DNS servers : %s", e)
    sys.exit(1)


# options
class OptionsClass(object):
    """Runtime switches read by the validation functions of this module."""

    def __init__(self):
        # Not read anywhere in this module.
        self.debug = False
        # Turn on smtplib's debug output on connections opened by
        # connect_to_mx().
        self.debugsmtp = False
        # mass_validate_email() also checks the domain's MX servers.
        self.checkmx = False
        # check_mx() asks the MX servers whether they accept the address
        # (implies checkmx, see mass_validate_email()).
        self.verifyaddress = False
        # check_mail_on_mx() tries the SMTP VRFY command before RCPT TO.
        self.usesmtpvrfy = False
        # Refuse an address when the server does not let us verify it.
        self.refusemailifnotpermit = True
        # Refuse an address when RCPT TO answers with a 4xx status.
        self.refuseontemporaryerror = True


options = OptionsClass()


def clean_mail(mail):
    """Return *mail* converted to a lower-cased, stripped string."""
    return str(mail).lower().strip()


# Caches shared by every call: domain -> list of MX host names, and the
# domains already known to be usable / unusable.
domain_mx = {}
valid_domain = []
invalid_domain = []


def _close_smtp(smtp):
    """Terminate an SMTP session, tolerating an already dead connection."""
    try:
        smtp.quit()
    except (smtplib.SMTPException, socket.error):
        smtp.close()


def _lookup_mx_hosts(dom):
    """Return the host names found by DNS.mxlookup() for *dom*."""
    return [mx[1] for mx in DNS.mxlookup(dom)]


def _mark_domain_invalid(dom, error):
    """Log a DNS failure for *dom* and remember the domain as invalid."""
    logging.debug('Error getting MX servers of domain %s : %s', dom, error)
    invalid_domain.append(dom)


def _check_domain(dom):
    """Return True when *dom* has an MX record or accepts SMTP itself."""
    if dom in valid_domain:
        return True
    if dom in invalid_domain:
        return False

    try:
        mx_hosts = _lookup_mx_hosts(dom)
    except DNS.ServerError as e:
        _mark_domain_invalid(dom, e)
        return False

    if not mx_hosts:
        # No MX record: fall back on the domain itself as mail server.
        con = connect_to_mx(dom)
        if not con:
            logging.debug("No valid MX of domain %s found", dom)
            invalid_domain.append(dom)
            return False
        # The connection was only a probe; do not leave it open.
        _close_smtp(con)
        mx_hosts = [dom]

    domain_mx[dom] = mx_hosts
    logging.debug("MX of domain %s : %s", dom, ','.join(mx_hosts))
    valid_domain.append(dom)
    return True


def _check_address(dom, mail):
    """Return True when a mail server of *dom* accepts *mail*."""
    if dom in invalid_domain:
        return False

    if_not_permit = not options.refusemailifnotpermit

    if dom not in domain_mx:
        try:
            mx_hosts = _lookup_mx_hosts(dom)
        except DNS.ServerError as e:
            _mark_domain_invalid(dom, e)
            return False

        if mx_hosts:
            domain_mx[dom] = mx_hosts
            if dom not in valid_domain:
                valid_domain.append(dom)
        else:
            # Directly check MX and mail
            con = connect_to_mx(dom)
            if not con:
                invalid_domain.append(dom)
                return False
            domain_mx[dom] = [dom]
            try:
                return check_mail_on_mx(dom, con, mail,
                                        ifNotPermit=if_not_permit)
            finally:
                _close_smtp(con)

    for mx in domain_mx[dom]:
        con = connect_to_mx(mx)
        if not con:
            continue
        try:
            # Key the "refuses verification" cache by MX host, not by
            # domain, so one refusing server does not disqualify the others.
            if check_mail_on_mx(mx, con, mail, ifNotPermit=if_not_permit):
                return True
        finally:
            _close_smtp(con)
    return False


def check_mx(mail):
    """Check the mail servers of the domain of *mail*.

    When options.verifyaddress is false, return True if the domain has at
    least one MX record, or accepts an SMTP connection itself.  When it is
    true, additionally ask the servers whether they accept *mail*.
    Results are cached in domain_mx, valid_domain and invalid_domain.
    """
    dom = mail[mail.find('@') + 1:]
    if options.verifyaddress:
        return _check_address(dom, mail)
    return _check_domain(dom)


# MX hosts already known to accept / refuse an SMTP connection.
valid_mx = []
invalid_mx = []


def verify_mx(mx, mail, check_mail=False):
    """Return the cached state of *mx*: True, False, or None if unknown.

    Only the caches are consulted: None is returned when *check_mail* is
    true or when *mx* is in neither cache.  *mail* is not used.
    """
    if check_mail:
        return None
    if mx in valid_mx:
        return True
    if mx in invalid_mx:
        return False
    return None


def connect_to_mx(mx):
    """Open an SMTP connection to *mx*.

    Return the connected smtplib.SMTP object, which the caller must close.
    Return False when *mx* is already cached as unreachable, and None when
    the connection attempt fails (the host is then added to invalid_mx).
    """
    if mx in invalid_mx:
        return False
    smtp = None
    try:
        smtp = smtplib.SMTP(timeout=5)
        # SMTP.connect() reports a refused greeting through its return
        # value only; it does not raise SMTPConnectError by itself.
        code, msg = smtp.connect(mx)
        if code != 220:
            raise smtplib.SMTPConnectError(code, msg)
        if options.debugsmtp:
            smtp.set_debuglevel(True)
        if mx not in valid_mx:
            valid_mx.append(mx)
        return smtp
    except smtplib.SMTPConnectError:
        logging.debug("MX server %s does not respond from SMTP", mx)
    except smtplib.SMTPServerDisconnected:
        logging.debug("MX server %s unexpectedly closed connection", mx)
    except socket.gaierror:
        logging.debug("Can't resolv MX server %s", mx)
    except socket.timeout:
        logging.debug("Connection timeout to SMTP server %s", mx)
    except socket.error:
        logging.debug("Connection error on SMTP server %s", mx)
    except Exception as e:
        logging.error("Unknown error (%s) connecting to SMTP server %s : %s",
                      type(e), mx, e)
    # Do not leak a half-open socket of the failed attempt.
    if smtp is not None:
        smtp.close()
    invalid_mx.append(mx)
    return None


# MX hosts that do not let us verify recipients.
mx_refuse_check_mail = []


def check_mail_on_mx(mx, smtp, mail, ifNotPermit=False):
    """Ask the server behind *smtp* whether it accepts *mail*.

    *mx* is the key used in the mx_refuse_check_mail cache.  *ifNotPermit*
    is the value returned when the server does not permit verification
    (HELO refused or connection dropped).  A 4xx answer to RCPT TO returns
    ``not options.refuseontemporaryerror``.  The connection is left open;
    closing it is up to the caller.
    """
    if mx in mx_refuse_check_mail:
        return ifNotPermit
    try:
        status, _ = smtp.helo()
        if status != 250:
            mx_refuse_check_mail.append(mx)
            return ifNotPermit

        if options.usesmtpvrfy:
            status, msg = smtp.verify(mail)
            if 250 <= status < 260:
                # Python 3's smtplib returns the reply as bytes.
                if not isinstance(msg, str):
                    msg = msg.decode('ascii', 'replace')
                # The server normally returns a normalized email address
                for word in msg.split(' '):
                    if validate_email(word):
                        return True

        smtp.mail('')
        status, msg = smtp.rcpt(mail)
        if 400 <= status < 500:
            logging.debug(
                'SMTP server return temporary error (code=%s) : %s',
                status, msg)
            return not options.refuseontemporaryerror
        return status == 250
    except smtplib.SMTPServerDisconnected:
        # Server does not permit verifying users
        mx_refuse_check_mail.append(mx)
        return ifNotPermit
    except smtplib.SMTPConnectError:
        return False


def mass_validate_email(mail, simple=False):
    """Validate *mail* and return True or False.

    The syntax is always checked with validate_email().  Unless *simple*
    is true, the mail servers of the domain are also checked when
    options.checkmx or options.verifyaddress is set.
    """
    mail = clean_mail(mail)
    if not validate_email(mail):
        return False
    if simple:
        return True
    # verifyaddress implies checkmx, whenever the option was switched on.
    if options.checkmx or options.verifyaddress:
        return check_mx(mail)
    return True


def main():
    """Command line demo: run the three validation levels on one address."""
    if len(sys.argv) != 2:
        print("Usage : %s [email]" % sys.argv[0])
        sys.exit(0)

    logging.basicConfig(level=logging.DEBUG)
    options.debugsmtp = True

    mail = sys.argv[1]

    print("Simple syntax validation :")
    print("==========================")
    print("Return : %s" % mass_validate_email(mail))

    options.checkmx = True
    print("\n\n")
    print("Syntax validation and domain MX check :")
    print("=======================================")
    print("Return : %s" % mass_validate_email(mail))

    options.verifyaddress = True
    print("\n\n")
    print("Syntax validation, domain MX check and validation of email address by SMTP server :")
    print("===================================================================================")
    print("Return : %s" % mass_validate_email(mail))


if __name__ == '__main__':
    main()