#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# This script converts Thunderbird mail filters to SOGo filters. It
# takes the msgFilterRules.dat path as input (-f parameter), parses the
# file and tries to convert each filter to its SOGo equivalent. Some
# filter conditions or actions can't be converted to SOGo filters, so
# warning or critical messages will inform you of possible conversion
# problems. Output is in JSON format. By default, the JSON string is
# written to stdout, but you can provide with the -o parameter a file
# path where the JSON has to be written.
#
# To know how to load JSON in SOGo, see README file.
#
# Author : Benjamin Renard
# Date : Wed, 25 Dec 2013 20:41:39 +0100
# Source : http://git.zionetrix.net/thunderbird2sogo

from optparse import OptionParser
import sys
import thunderbirdFilters
import logging
import json
import re

# Thunderbird comparison operators mapped to the SOGo operator names.
SOGO_OPERATORS = {
    'contains': 'contains',
    'is': 'is',
    "isn't": 'is_not',
    "doesn't contain": 'contains_not',
}

# Thunderbird actions that are known but have no SOGo counterpart.
UNSUPPORTED_ACTIONS = ['Mark unread', 'Copy to folder', 'Change priority', 'Reply']


def _convert_conditions(f, options):
    """Convert the conditions of Thunderbird filter ``f`` to SOGo rules.

    ``options`` is the parsed command line; only the ``dontwarncc`` and
    ``dontwarnalladdresses`` flags are read here.

    Return a ``(rules, bool_ops)`` tuple: the list of SOGo rule dicts and
    the list of distinct boolean operators met while walking the
    conditions.

    Raise ``Exception`` when an operator or an operand is not recognized,
    or when an "all addresses" condition can't be expressed.
    """
    conditions = thunderbirdFilters.convert_conditions(f["condition"])
    bool_ops = []
    rules = []
    for c in conditions:
        rule = {}

        operator = c['cri_operator']
        if operator not in SOGO_OPERATORS:
            raise Exception('Condition operator "%s" not recognized. Pass' % operator)
        rule['operator'] = SOGO_OPERATORS[operator]

        operand = c['cri_operand']
        if operand in ['subject', 'to', 'from']:
            rule['field'] = operand
        elif operand == 'to or cc':
            rule['field'] = 'to_or_cc'
        elif operand == 'cc':
            rule['field'] = 'to_or_cc'
            if not options.dontwarncc:
                logging.warning('Filter %s : Condition operator cc convert to to_or_cc', f['name'])
        elif operand == 'all addresses':
            # "all addresses" is split into two rules (from + to_or_cc).
            # Positive operators need the two rules OR-ed, negative ones
            # need them AND-ed. The opposite boolean operator can only be
            # swapped when the filter has this single condition.
            if operator in ('contains', 'is'):
                forbidden, required = 'AND', 'OR'
            else:
                forbidden, required = 'OR', 'AND'
            if c['bool_operator'] == forbidden:
                if len(conditions) == 1:
                    c['bool_operator'] = required
                else:
                    raise Exception(
                        'Condition operand "all addresses" with condition operator "%s" and bool operator "%s" supported only if only one condition'
                        % (operator, forbidden)
                    )
            rules.append({
                'field': 'from',
                'operator': rule['operator'],
                'value': c['value'],
            })
            if not options.dontwarnalladdresses:
                logging.warning(
                    'Filter %s : Condition operator "all addresses" convert to two filters matching from, to or cc but not bcc field as Thunderbird',
                    f['name']
                )
            rule['field'] = 'to_or_cc'
        elif operand.startswith('"') and operand.endswith('"'):
            # A quoted operand is a custom header name: drop the quotes.
            rule['field'] = 'header'
            rule['custom_header'] = re.sub('^"(.*)"$', r'\1', operand, count=1)
        else:
            raise Exception('Condition operand "%s" not recognized. Pass' % operand)

        # Read the boolean operator only now: it may have been swapped above.
        if c['bool_operator'] not in bool_ops:
            bool_ops.append(c['bool_operator'])
        rule['value'] = c['value']
        rules.append(rule)
    return rules, bool_ops


def _convert_action(a, filter_name, replaceaccents):
    """Convert one Thunderbird action ``a`` to a SOGo action dict.

    ``a`` is a sequence whose first item is the action name and whose
    second item, when the action needs one, is its value.
    ``filter_name`` is only used in error messages and ``replaceaccents``
    is forwarded to the folder path conversion.

    Raise ``Exception`` when the action has no SOGo equivalent or is
    unknown.
    """
    action = a[0]
    if action == 'Move to folder':
        return {
            'method': 'fileinto',
            'argument': thunderbirdFilters.convert_uri_path_to_maildir(
                a[1], separator='/', replaceaccents=replaceaccents
            ),
        }
    if action == 'Forward':
        return {'method': 'redirect', 'argument': a[1]}
    if action == 'Mark read':
        return {'method': 'addflag', 'argument': 'seen'}
    if action == 'JunkScore':
        # NOTE(review): the score presumably reaches us as text read from
        # msgFilterRules.dat, in which case a bare `== 0` never matches
        # and every score is flagged as junk. Accept both forms.
        if a[1] == 0 or str(a[1]).strip() == '0':
            argument = 'not_junk'
        else:
            argument = 'junk'
        return {'method': 'addflag', 'argument': argument}
    if action == 'Mark flagged':
        return {'method': 'addflag', 'argument': 'flagged'}
    if action == 'Stop execution':
        return {'method': 'stop', 'argument': None}
    if action in UNSUPPORTED_ACTIONS:
        raise Exception("Action %s doesn't have equivalent in SOGo" % action)
    raise Exception('Filter %s : Action %s not supported' % (filter_name, action))


def _convert_filter(f, options):
    """Convert Thunderbird filter ``f`` to a SOGo filter dict.

    Return the SOGo filter, or ``None`` when the filter has to be skipped
    (the reason is logged). Exceptions raised by the condition and action
    converters are not caught here.
    """
    sf = {}
    if f["enabled"] == 'yes':
        sf["active"] = 1
    else:
        sf["active"] = 0

    if 'name' not in f:
        logging.error('Thunderbird filter does not have name ! Pass this filter : %s', f)
        return None
    sf['name'] = f['name']

    if f["condition"] == 'ALL':
        sf['match'] = 'allmessages'
    else:
        rules, bool_ops = _convert_conditions(f, options)
        if not rules:
            logging.error('Filter %s : No condition found ! Pass this filter', f['name'])
            return None
        sf['rules'] = rules

        # SOGo has a single match mode per filter, so all conditions must
        # share the same boolean operator.
        if len(bool_ops) != 1:
            logging.error(
                'Filter %s : Multiple boolean operator not supported (%s). Pass this filter.',
                f['name'], ','.join(bool_ops)
            )
            return None
        if bool_ops[0] == 'AND':
            sf['match'] = 'all'
        elif bool_ops[0] == 'OR':
            sf['match'] = 'any'
        else:
            logging.error(
                "Filter %s : Boolean operator not recognized %s, pass this filter.",
                f['name'], bool_ops[0]
            )
            return None

    actions = [_convert_action(a, f['name'], options.replaceaccents) for a in f['actions']]
    if not actions:
        logging.error('Filter %s : no action found ! Pass this filter.', f['name'])
        return None
    sf['actions'] = actions
    return sf


parser = OptionParser()

parser.add_option('-f',
                  action="store",
                  type="string",
                  dest="file",
                  help="The msgFilterRules.dat path",
                  default=None)

parser.add_option('-o',
                  action="store",
                  type="string",
                  dest="out",
                  help="Output path (default : '-' => stdout)",
                  default='-')

parser.add_option('-p', '--pretty',
                  action="store_true",
                  dest="pretty",
                  help="Pretty JSON output")

parser.add_option('-j', '--just-try',
                  action="store_true",
                  dest="justtry",
                  help="Just-try mode (no output)")

parser.add_option('--dont-warn-cc',
                  action="store_true",
                  dest="dontwarncc",
                  help="Don't warn about cc -> to_or_cc convertion",
                  default=False)

parser.add_option('--dont-warn-all-addresses',
                  action="store_true",
                  dest="dontwarnalladdresses",
                  help="Don't warn about all addresses -> from + to_or_cc convertion",
                  default=False)

parser.add_option('-r', '--replace-accents',
                  help='Remove accent in folder names',
                  action="store_true",
                  dest="replaceaccents")

parser.add_option('-v', '--verbose',
                  action="store_true",
                  dest="verbose")

parser.add_option('-d', '--debug',
                  action="store_true",
                  dest="debug")

(options, args) = parser.parse_args()

if options.debug:
    loglevel = logging.DEBUG
elif options.verbose:
    loglevel = logging.INFO
else:
    loglevel = logging.WARNING

logging.basicConfig(level=loglevel,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p')

if options.file is None:
    logging.fatal('You must provide msgFilterRules.dat path (-f)')
    sys.exit(1)

(tbf, warning) = thunderbirdFilters.read_from_file(options.file)
if warning:
    logging.warning('Parsing msgFilterRules.dat return warnings :\n - %s', '\n - '.join(warning))

logging.debug('Parsing return :\n%s', tbf)

sfs = []
for f in tbf:
    # The type check lives inside the try block so that a malformed
    # filter without a 'type' key is reported and skipped like any other
    # conversion failure instead of aborting the whole run.
    try:
        if f['type'] != "17":
            logging.error("Filter type '%s' no recognized, pass this filter", f['type'])
            continue
        sf = _convert_filter(f, options)
    except Exception as e:
        name = f['name'] if 'name' in f else f
        logging.fatal('Failed to convert filter %s, pass : %s', name, e)
        continue
    if sf is not None:
        sfs.append(sf)

output_data = {'SOGoSieveFilters': sfs}

if options.justtry:
    sys.exit(0)

if options.pretty:
    output_text = json.dumps(output_data, indent=4, separators=(',', ': '))
else:
    output_text = json.dumps(output_data)

if options.out == '-':
    sys.stdout.write(output_text + '\n')
else:
    # The output file is opened only once there is something to write, so
    # just-try mode and failed runs never create or truncate it.
    try:
        output_file = open(options.out, 'w')
    except Exception as e:
        logging.fatal('Error opening output file %s : %s', options.out, e)
        sys.exit(2)
    with output_file:
        logging.info('Write SOGo filters on %s file', options.out)
        output_file.write(output_text)