#! /usr/bin/python
# -*- coding: iso-8859-1 -*-
'''
module name : iplist2forti.py
purpose     : convert list of IP addresses into
              1- a FORTINET batch commandfile and
              2- a 'purge' commandfile to remove the entries again
              create address definitions, address groups and supergroup
              to use in one or more policies
              numeric IPs or FQDNs allowed
              uses sockets for resolving FQDNs
              recognizes hosts.deny file syntax as input file
required    : python 3.4 or higher
created     : 2010-07-18
last changes: 2015-12-01  code base
              2021-10-31  adopted to python 3.x
                          added is_valid_IPv4()
                          relaxed text file handling
              2021-12-30  (math.ceil, print(), except)
              2022-01-01  allow/create netmasks, fix bugs
              2022-01-02  relaxed FQDN check; max. values of FOS v6.0
              2022-01-04  more concise messages
              2022-01-05  fixed dummy address needs netmask

Copyright by E/S/P Dr. Beneicke, Heidelberg
(http://beneicke-edv.de)
(https://www.beneicke-edv.de/support/tools)
'''

import sys
import argparse
import socket                   # for nslookup()
import os.path
import time
from math import ceil

# globals
DEBUG = False

# number of addresses per address group; main() computes the real value
# from the model data, this default only keeps grpnum()/purgecmd() usable
# when the module is imported instead of run
groupsize = 1

# ----------------
# FortiOS specific strings
# object names used in FortiOS
AddrName = 'zblock{0:05d}'
DummyName = 'block_dummy'
DummyIP = '169.254.254.254/32'  # FOS demands IP+netmask!
AddrGrpName = 'blockgroup{0:03d}'
SuperGrpName = 'sblockgroup{0:03d}'

# FortiOS CLI commands to create addresses and address groups
cmdAddr_start = 'config firewall address\n'
cmdAddr = 'edit {0:s}\nset subnet {1:s}\nnext\n'
cmdDelete = 'delete {0:s}\n'
cmdAddrGrp_start = 'config firewall addrgrp\n'
# the group number placeholder of AddrGrpName survives this first format()
cmdAddrGrp = 'edit {0:s}\nset member'.format(AddrGrpName)
cmdSuperGrp = 'edit {0:s}\nset member'
cmdNoSuperGrp = 'edit {0:s}\nset member {1:s}'.format(SuperGrpName, DummyName)
cmdEnd = 'end\n\n'
cmdNext = '\nnext\n'
cmdNextEnd = '\nnext\nend\n\n'

# hard limits for all models
maxgroups = 2500        # number of address groups
maxgroupsize = 1500     # addresses per group

modeldata = {
    # max # of addresses, desired_groupsize <= maxgroupsize
    's': (5000, 100),       # < FG-100x
    'm': (20000, 200),      # < FG-1000x
    'l': (40000, 1500),
}
# ----------------


def createDatedFile(fname, mode='w', withminutes=False):
    ''' Open a file and return the file object, or None on error.

        Creates missing path components if the mode writes or appends.
        Default mode suits text files; use 'wb' for binary access.
        Filename will be 'fname' + current date (century skipped),
        inserted before the extension.
        if withminutes, hour and minute will be included in name.
        Existing file will be overwritten.
        Errors are reported on stderr.
    '''
    err = sys.stderr.write  # alias

    create = 'w' in mode or 'a' in mode
    if create:
        path = os.path.split(fname)[0]
        if path:
            try:
                # exist_ok avoids the check-then-create race
                os.makedirs(path, exist_ok=True)
            except OSError as errmsg:
                err('error in createDatedfile()/makedirs(%s):\n%s\n'
                    % (path, errmsg))
                return None

    # construct filename
    # skip century...
    tmstamp = time.strftime("%Y%m%d%H%M" if withminutes else "%Y%m%d")[2:]
    filename = filename_insert(fname, '_' + tmstamp)

    # now open the file
    try:
        return open(filename, mode)
    except OSError as errmsg:
        # both placeholders need a value, otherwise the report itself fails
        err('error in createDatedFile(%s)\n%s\n' % (filename, errmsg))
        return None


def nslookup(fqdn):
    ''' returns the IPv4 address of host name fqdn as string,
        None if the name cannot be resolved
    '''
    try:
        return socket.gethostbyname(fqdn)
    except (socket.error, UnicodeError):
        # UnicodeError: name cannot be encoded for the lookup,
        # e.g. empty or over-long label ('foo..bar')
        if DEBUG:
            print('\t\tcannot resolve "%s"' % fqdn)
        return None


def filename_insert(fname, insert):
    ''' returns fname with string insert placed before the extension '''
    filename, ext = os.path.splitext(fname)
    return ''.join((filename, insert, ext))


def is_valid_FQDN(s):
    ''' returns TRUE if string could be a FQDN
        needs at least one dot and must not consist of digits and dots only
        input may contain a netmask
    '''
    sep = '/' if s.count('/') > 0 else ' '
    name = s.split(sep)[0]
    return name.count('.') > 0 and not name.replace('.', '').isdigit()


def is_valid_IPv4(s):
    ''' returns TRUE if string could be an IPv4 numeric address
        input may contain a netmask
    '''
    sep = '/' if s.count('/') > 0 else ' '
    parts = s.split(sep)[0].split('.')
    if len(parts) != 4:
        return False
    valid_bytevalue = range(0, 256)
    try:
        # 255 valid if not in last byte
        return (all(int(p) in valid_bytevalue for p in parts)
                and int(parts[3]) < 255)
    except ValueError:
        return False


def clean_ints(s):
    ''' remove any prefixed zeroes in IP string
        returns '' if any dot-separated part is not a number in 0..255
    '''
    new = []
    for p in s.split('.'):
        if not p.isdigit():
            # empty part ('1..2'), sign, or any non-digit character
            return ''
        try:
            value = int(p)
        except ValueError:
            # isdigit() also accepts characters int() cannot convert
            return ''
        if value not in range(256):
            return ''
        new.append(str(value))
    return '.'.join(new)


def _clean_mask(mask):
    ''' returns the normalized netmask, '' if it is invalid
        accepts a prefix length (0..32) or a dotted mask with 4 bytes
    '''
    cleaned = clean_ints(mask)
    if not cleaned:
        return ''
    dots = cleaned.count('.')
    if dots == 0:
        return cleaned if int(cleaned) <= 32 else ''
    return cleaned if dots == 3 else ''


def readIPlist(fd):
    ''' read all lines from open file handle fd
        ignore comment lines starting with '#' or empty lines
        return list of strings; FQDNs are prefixed with '@'
        fd is closed on return
        recognizes alternative host.deny file format

        sample:
            # bla bla
            1.34.163.           ==> 1.34.163.0/24
            2.86.93.35
            2.93.72.124
        or
            # /etc/hosts.deny
            # See "man tcpd" and "man 5 hosts_access" as well as /etc/hosts.allow
            # for a detailed description.
            http-rman : ALL EXCEPT LOCAL
            # DenyHosts: Sun May 2 09:38:19 2010 | sshd: 221.11.70.139
            sshd: 221.11.70.139
            # DenyHosts: Sun May 2 17:26:29 2010 | sshd: 85.17.155.134
            sshd: 85.17.155.134

        Each IP will contain a netmask on return, either '/nn' or ' 255.2xx.yyy.zzz'
        Lines with an invalid IP or netmask are reported and skipped.
    '''
    IPlist = []
    try:
        for line in fd:
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            # check for valid IP lines from here on
            if line.count(':') > 1 or line.count('.') < 1:
                continue
            ip = line.split(':')[-1].strip()
            if is_valid_FQDN(ip):
                IPlist.append('@' + ip)     # FQDN marker = @
                continue

            # may contain trailing netmask...a.b.c.d/m
            iplist = ip.split('/', 1)
            ip = iplist[0]

            # numeric IP may only contain digits and dots!
            if not ip.replace('.', '').isdigit():
                print('\tinvalid IP (non-numeric): %s' % ip)
                continue

            # may be shorter than 4 bytes, fill up with '.0' bytes
            rawip = ip
            if ip[-1] == '.':
                ip = ip[:-1]
            missing = 3 - ip.count('.')
            ip += '.0' * missing
            masklen = 32 - 8 * missing
            if DEBUG and ip != rawip:
                print('IP modified: %s -> %s' % (rawip, ip))

            ip = clean_ints(ip)
            if not is_valid_IPv4(ip):
                print('\tinvalid IP (value): %s' % rawip)
                continue

            if len(iplist) == 1:        # no mask specified
                mask = str(masklen)
            else:
                mask = _clean_mask(iplist[1])
                if not mask:
                    print('\tinvalid netmask: %s/%s' % (rawip, iplist[1]))
                    continue

            if mask.count('.') > 0:
                ip = ip + ' ' + mask    # 1.2.3.4 255.0.0.0
            else:
                ip = ip + '/' + mask    # 1.2.3.4/8
            IPlist.append(ip)
    finally:
        fd.close()
    return IPlist


def checkRawIPs(rawIPs, topN, dontResolve):
    ''' returns a list of numeric IP addresses (at most topN, no duplicates)
        and the number of skipped FQDNs
        entries marked with '@' are FQDNs, they are either resolved
        (and get a /32 netmask) or skipped
        rawIPs itself is not modified
    '''
    IPs = []
    FQDNs_skipped = 0
    uniqueIPs = set()   # used for check only

    # start with the most recent entry first
    # in case only the first TopN entries are used
    for ip in reversed(rawIPs):
        # may be an IP or an FQDN
        if len(IPs) >= topN:
            print('skipping IPs as maximum of %d is reached!' % topN)
            break
        if ip.startswith('@'):      # is a FQDN
            host = ip[1:]
            ip = ''
            if not dontResolve:
                ip = nslookup(host)
                if DEBUG and ip:
                    print('{0:55s} is {1:>17s}'.format(host, ip))
            if not ip:
                FQDNs_skipped += 1
                continue
            ip += '/32'

        if ip in uniqueIPs:
            print('\tduplicate IP: %s' % ip)
        else:
            IPs.append(ip)
            uniqueIPs.add(ip)

    return IPs, FQDNs_skipped


def grpnum(i):
    ''' map item #i to group number containing groupsize items
        group number: 0..
        uses the module global groupsize
    '''
    return i // groupsize


def createcmd(IPs, size, splitcount):
    ''' returns the create commands for all addresses in IPs as a list,
        plus the number of address groups and of super groups created

        addresses are numbered 0.., collected in address groups of at most
        size members, and the address groups are distributed over
        (about) splitcount super groups
    '''
    cmd = []
    add = cmd.append    # an alias
    ipcount = len(IPs)

    # create all new address objects
    add(cmdAddr_start)
    for i, ip in enumerate(IPs):
        add(cmdAddr.format(AddrName.format(i), ip))
    add(cmdEnd)

    # create address groups with max. size addresses each
    add(cmdAddrGrp_start)
    ngroups = ceil(ipcount / size)
    for g in range(ngroups):
        if g > 0:
            add(cmdNext)
        add(cmdAddrGrp.format(g))
        for i in range(g * size, min((g + 1) * size, ipcount)):
            add(' ' + AddrName.format(i))   # add address to group
    add(cmdNext)

    # create/edit super group(s), referenced in DENY policy/policies
    # these go into the same 'config firewall addrgrp' section
    supergroupsize = min(ceil(ngroups / splitcount), maxgroupsize)
    nsuper = ceil(ngroups / supergroupsize) if supergroupsize else 0
    for sg in range(nsuper):
        if sg > 0:
            add(cmdNext)
        add(cmdSuperGrp.format(SuperGrpName.format(sg)))
        lastgroup = min((sg + 1) * supergroupsize, ngroups)
        for g in range(sg * supergroupsize, lastgroup):
            add(' ' + AddrGrpName.format(g))    # add group to super group
    add(cmdNextEnd)

    return cmd, ngroups, nsuper


def purgecmd(first, last, splitcount):
    ''' returns delete commands for IP # to # (inclusive) as a list

        delete groups but not the group of groups (SuperGrp)
        because it's used in the DENY policy
        delete all address groups from SuperGrp by inserting one dummy address
        only resets splitcount supergroups
    '''
    cmd = []
    add = cmd.append    # an alias

    # create dummy address entry if not existent
    add(cmdAddr_start)
    add(cmdAddr.format(DummyName, DummyIP))
    add(cmdEnd)

    group1 = grpnum(first)
    groupN = grpnum(last)

    # create/edit super group(s), referenced in DENY policy/policies
    add(cmdAddrGrp_start)
    for isplit in range(splitcount):
        add(cmdNoSuperGrp.format(isplit))
        add(cmdNext)
    add(cmdEnd)

    # delete address groups, they are no longer referenced
    add(cmdAddrGrp_start)
    for g in range(group1, groupN + 1):
        add(cmdDelete.format(AddrGrpName.format(g)))
    add(cmdEnd)

    # delete addresses
    add(cmdAddr_start)
    for i in range(first, last + 1):
        add(cmdDelete.format(AddrName.format(i)))
    add(cmdEnd)

    return cmd


def main(argv=None):
    ''' command line entry point
        reads the IP list, writes the dated command file and the dated
        purge file, and prints a report
        argv: list of arguments, defaults to the command line
    '''
    global DEBUG, groupsize

    # ----------------
    # parse command line args
    p = argparse.ArgumentParser(
        description='Create (a lot of) address objects and groups '
                    'from list for use in FortiOS.')
    p.set_defaults(
        model='m',
        maxAddr=999999,
        prevAddr=0,
        dontresolve=False,
        debug=False,
        cmdfname='blocklist.bcmd',
        splitcount=1)
    addarg = p.add_argument     # an alias
    # a tuple of choices: a plain string would also accept 'sm' or 'ml'
    addarg('-m', '--model', action='store', dest='model',
           choices=('s', 'm', 'l'),
           help='FortiGate model: small (< FGT-100) / '
                'medium (< FGT-1000) / large')
    addarg('-n', '--newest', action='store', dest='maxAddr', type=int,
           help='use only newest/last <%(dest)s> addresses from list')
    addarg('-p', '--prev', action='store', dest='prevAddr', type=int,
           help='replace <%(dest)s> old addresses')
    addarg('-d', '--dontresolve', action='store_true', dest='dontresolve',
           help='skip non-numeric addresses (FQDNs) in input')
    addarg('-D', '--debug', action='store_true', dest='debug',
           help='print debug output')
    # stray non-decodable bytes must not abort reading the list
    addarg('infile', type=argparse.FileType('r', errors='replace'),
           help='read IPs from <%(dest)s>')
    addarg('-o', '--outfile', dest='cmdfname',
           help='write output to <%(dest)s>')
    addarg('-s', '--split', action='store', dest='splitcount', type=int,
           help='split output into <%(dest)s> supergroups')
    args = p.parse_args(argv)

    # ----------------
    # get and set parameters
    model = args.model
    maxAddr = min(modeldata[model][0], args.maxAddr)
    DEBUG = args.debug
    splitcount = args.splitcount
    if splitcount < 1:
        # 0 would divide by zero when sizing the super groups
        p.error('number of supergroups (-s) must be 1 or more')
    print()

    # ----------------
    # read IPs from file
    print('reading file "{0:s}"...'.format(args.infile.name))
    rawIPs = readIPlist(args.infile)
    nRaw = len(rawIPs)

    # ----------------
    # check and resolve the IPs
    print('processing...')
    maxAddr = min(nRaw, maxAddr)
    IPs, FQDNs_skipped = checkRawIPs(rawIPs, maxAddr, args.dontresolve)
    ipcount = len(IPs)
    skipped = nRaw - ipcount

    # ----------------
    # report to user
    print()
    print('{0:7d} IPs in file'.format(nRaw))
    print('{0:7d} IP{1:s} skipped'.format(
        skipped, '' if skipped == 1 else 's'))
    print('{0:7d} FQDN{1:s} skipped'.format(
        FQDNs_skipped, '' if FQDNs_skipped == 1 else 's'))

    if ipcount == 0:
        sys.exit(0)

    # ----------------
    # def group size:
    # max(desired_groupsize, ipcount/maxgroups) <= size < maxgroupsize
    desired_groupsize = modeldata[model][1]
    groupsize = max(desired_groupsize, ipcount // maxgroups)
    groupsize = min(ipcount, groupsize, maxgroupsize)

    newcmds, ngroups, nsuper = createcmd(IPs, groupsize, splitcount)

    # ----------------
    # write bulk command files
    # one for defining, one for purging
    cmdfile = createDatedFile(args.cmdfname, 'w', True)
    if cmdfile is None:
        sys.exit(-1)

    delprev = ''
    with cmdfile:
        # first, delete old definitions to free up memory
        # delete only if there are more old addresses than new ones
        first = ipcount
        last = args.prevAddr
        if first < last:
            # from# .. to# inclusive
            cmdfile.writelines(purgecmd(first, last - 1, splitcount))
            delprev = 'deleted old addresses {0:d} to {1:d}'.format(
                first, last - 1)
        cmdfile.writelines(newcmds)

    # ----------------
    # write a purgefile to be able to delete all definitions created here
    purgefn = filename_insert(args.cmdfname, '_purge')
    fd = createDatedFile(purgefn, 'w', True)
    if fd is None:
        sys.exit(-1)
    with fd as purgefile:
        # from# .. to# inclusive; reset the super groups actually created
        purgefile.writelines(purgecmd(0, ipcount - 1, nsuper))
        purgefn = purgefile.name

    # ----------------
    print('{0:7d} IPs in {1:d} address group{2:s} of size {3:d}'.format(
        ipcount, ngroups, '' if ngroups == 1 else 's', groupsize))
    print('{0:7d} address groups in {1:d} super group{2:s}'.format(
        ngroups, nsuper, '' if nsuper == 1 else 's'))
    if delprev:
        print(delprev)
    print('specific for {0:s} Fortigate model'.format(
        {'s': 'small', 'm': 'medium', 'l': 'large'}[model]))
    print()
    print('now import bulk command file "{0:s}"'.format(cmdfile.name))
    print('refer to address group "{0:s}" in DENY policy'.format(
        SuperGrpName.format(0)))
    print('to get rid of these addresses, import "{0:s}"'.format(purgefn))


if __name__ == '__main__':
    main()