Hide
Easily highlight source code for your blog with our Syntax Highlighter. Join Siafoo Now or Learn More

Utility for loading GeoIP data into a PostgreSQL database

Revision 5 vs. Revision 6

Changelog: * Added Zip reading capabilities * Added a check for using ip4r options the wrong way * Added a /usr/bin/python header * Fixed a bug in plain data loading * Updated

Legend:

Unmodified
Added
Removed
  • Description

    r5 r6  
    11This is a script for loading the `MaxMind GeoIP data <http://www.maxmind.com/app/geolitecity>`_ into an PostgreSQL database.  The script will create the database structure, load the data, add some utility functions and create the necessary indexes.  
    22  
    33The following command will create the structure and load the data from the given files into a database named geoip_db (the default)  
    44  
    5 ``python load_geoip.py -c -b ./GeoLiteCity_20080301/GeoLiteCity-Blocks.csv -l ./GeoLiteCity_20080301/GeoLiteCity-Location.csv``  
     5``python load_geoip.py -c -g -z GeoLiteCity_20090501.zip``  
    66  
    7 Run with ``--help`` to get usage options and database connection parameters 
     7if the above command fails try unziping the archive and providing the names of the individual files:  
     8  
     9``python load_geoip.py -c -g -b ./GeoLiteCity_20090501/GeoLiteCity-Blocks.csv -l ./GeoLiteCity_20090501/GeoLiteCity-Location.csv``  
     10  
     11Run with ``--help`` to get usage options and database connection parameters  
     12  
     13.. note:: If you get "psycopg2.DataError: extra data after last expected column" drop the tables and start again.  (The error means that there aren't enough columns in the table to fit the data... in our case the loading script dropped the columns because of ip4r ) 
  • Code

    r5 r6  
    1 '''  
    2 Script for loading GeoIP CSV data into a postgresql database  
    3 '''  
    4   
    5 import logging, psycopg2, psycopg2.extensions, sys  
    6   
    7 from optparse import OptionGroup, OptionParser  
    8 from StringIO import StringIO  
    9   
    10 class GeoIPDataLoader(object):  
    11   
    12     def __init__(self, dsn, blocks='GeoLiteCity-Blocks.csv', locations='GeoLiteCity-Location.csv', schema='public'):  
    13         self.con = psycopg2.connect(dsn)  
    14         # We don't need transactions... right?  
    15         self.con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)  
    16         # The data is in ISO8859_15 encoding  
    17         self.con.set_client_encoding('iso8859_15')  
    18         self.cur = self.con.cursor()  
    19   
    20         self.blocks_csv = blocks  
    21         self.location_csv = locations  
    22         self.schema = schema  
    23    
    24     def close(self):  
    25         self.con.close()  
    26   
    27     def create_tables(self):  
    28         print 'Creating structure...',  
    29         self.db_execute(  
    30             '''  
    31                 CREATE TABLE locations  
    32                 (  
    33                   id bigint NOT NULL,  
    34                   country character(2) NOT NULL,  
    35                   region character(2),  
    36                   city character varying(75),  
    37                   postal_code character varying(15),  
    38                   latitude numeric(6,4) NOT NULL,  
    39                   longitude numeric(7,4),  
    40                   metro_code integer,  
    41                   area_code integer,  
    42                   CONSTRAINT locations_pkey PRIMARY KEY (id)  
    43                 );  
    44                  
    45                 CREATE TABLE blocks  
    46                 (  
    47                   start_ip bigint NOT NULL,  
    48                   end_ip bigint NOT NULL,  
    49                   location_id bigint NOT NULL  
    50                 );  
    51                  
    52             '''  
    53             )  
    54         print '\033[1;32mDone\033[1;m'  
    55                  
    56     def create_indexes(self, ip4=False):  
    57         print 'Adding Indexes...',  
    58         sys.stdout.flush()  
    59         if not ip4:  
    60             self.db_execute('''  
    61              CREATE INDEX ix_start_end_ip ON blocks  
    62                 USING btree (start_ip, end_ip) WITH (FILLFACTOR=100);  
    63              CREATE INDEX ix_end_start_ip ON blocks  
    64                 USING btree (end_ip, start_ip) WITH (FILLFACTOR=100);  
    65                 ''')  
    66         else:  
    67             self.db_execute('''  
    68                  CREATE INDEX ix_ip_range ON blocks  
    69                    USING gist (ip_range) WITH (FILLFACTOR=100);  
    70                 ''')  
    71         print '\033[1;32mDone\033[1;m'  
    72          
    73     def create_functions(self, ip4=False):  
    74         print 'Adding utility functions...',  
    75         sys.stdout.flush()  
    76         if ip4:  
    77             self.db_execute('''  
    78                 CREATE OR REPLACE FUNCTION get_location(inet) RETURNS bigint AS $$  
    79                   SELECT location_id FROM %s.blocks  
    80                   WHERE ip_range >>= ip4($1)  
    81                 $$ LANGUAGE SQL;  
    82                 ''' % self.schema)  
    83         else:  
    84             self.db_execute('''  
    85                 CREATE OR REPLACE FUNCTION inet_to_bigint(inet) RETURNS bigint AS $$  
    86                     SELECT $1 - inet '0.0.0.0'  
    87                 $$ LANGUAGE SQL;  
    88                 ''' % self.schema)  
    89         print '\033[1;32mDone\033[1;m'  
    90      
    91     def create_schema(self):  
    92         try:  
    93             self.db_execute('''CREATE SCHEMA %s;''' % self.schema)  
    94         except psycopg2.ProgrammingError:  
    95           pass     
    96   
    97         self.db_execute('SET search_path TO %s,public;' % self.schema)  
    98          
    99     def db_execute(self, ddl):  
    100         self.cur.execute(ddl)  
    101 #        self.con.commit()  
    102      
    103     def load_data(self):  
    104         # Load Locations  
    105         self.load_table(self.location_csv, 'locations')  
    106         # Load Blocks  
    107         self.load_table(self.blocks_csv, 'blocks')  
    108      
    109     def load_table(self, file_name, table_name):  
    110         print 'Loading table \033[1;34m%s\033[1;m from file \033[1;34m%s\033[1;m...' % (table_name, file_name),  
    111         sys.stdout.flush()  
    112         geo_file = open(file_name)  
    113         # Skip the copyright header  
    114         geo_file.readline()  
    115         geo_file.readline()  
    116         #Remove quotes... psycopg2's `copy` errors on them  
    117         string_data = geo_file.read().replace('"', '')  
    118         self.cur.copy_from(StringIO(string_data), table_name,  sep=',', null='')  
    119         print '\033[1;32mDone\033[1;m'  
    120      
    121     def migrate_to_ip4(self):  
    122         print 'Adding ip_range column'         
    123         self.db_execute('''  
    124                         ALTER TABLE blocks ADD COLUMN ip_range ip4r;  
    125                         ALTER TABLE blocks ALTER COLUMN ip_range SET STORAGE PLAIN;  
    126                         ''')  
    127          
    128         print 'Migrating data to ip4...',  
    129         sys.stdout.flush()  
    130         self.db_execute('''UPDATE blocks SET ip_range = ip4r(start_ip::ip4, end_ip::ip4)''')  
    131         print '\033[1;32mDone\033[1;m'  
    132   
    133         print 'Dropping unneeded columns'  
    134         self.db_execute('''  
    135                         ALTER TABLE blocks DROP COLUMN start_ip;  
    136                         ALTER TABLE blocks DROP COLUMN end_ip;  
    137                         ''')  
    138     def vacuum(self):  
    139         print 'Vaccuming database...',  
    140         sys.stdout.flush()  
    141         self.db_execute('VACUUM FULL ANALYZE')  
    142         print '\033[1;32mDone\033[1;m'  
    143   
    144 def main():  
    145     DSN = "dbname='%s' user='%s' host='%s'"  
    146   
    147     parser = OptionParser()  
    148     # Operational options  
    149     parser.add_option('-c', '--load-ddl', dest='load_ddl', default=False,  
    150                       action='store_true', help='Create database structure')  
    151      
    152     parser.add_option('-g', '--load-data', dest='load', default=False,  
    153                       action='store_true', help='Load the GeoIP data')  
    154   
    155     parser.add_option('-b', '--blocks-file', dest='blocks_csv', default='GeoLiteCity-Blocks.csv',  
    156                       action='store', help='GeoIP Blocks CSV file [default: %default]', metavar='BLOCKS_FILE')  
    157     parser.add_option('-l', '--locations-file', dest='locations_csv', default='GeoLiteCity-Location.csv',  
    158                       action='store', help='GoeIP Locations CSV file [default: %default]', metavar='LOCATIONS_FILE')  
    159   
    160     db_group = OptionGroup(parser, 'Database Options')  
    161     # Database options  
    162     db_group.add_option('-H', '--host', dest='db_host', default='localhost',  
    163                       action='store', help='Database host [default: %default]', metavar='DB_HOST')  
    164     db_group.add_option('-d', '--database', dest='db_name', default='geoip_db',  
    165                       action='store', help='Database name [default: %default]', metavar='DATABASE_NAME')  
    166     db_group.add_option('-U', '--user', dest='db_user', default='geoip',  
    167                       action='store', help='User [default: %default]', metavar='USER_NAME')  
    168     db_group.add_option('-s', '--schema', dest='schema', default='public',  
    169                       action='store', help='Database Schema [default: %default]', metavar='SCHEMA')  
    170   
    171     db_group.add_option('--ip4r', dest='ip4', default=False,  
    172                       action='store_true', help='Use IP4r module [default: %default]')  
    173   
    174     parser.add_option_group(db_group)  
    175      
    176     (options, args) = parser.parse_args()  
    177   
    178     data_loader = GeoIPDataLoader("dbname='%s' user='%s' host='%s'" % (options.db_name, options.db_user, options.db_host),  
    179                                   blocks=options.blocks_csv, locations=options.locations_csv, schema=options.schema)  
    180   
    181     if not options.load_ddl and not options.load:  
    182         parser.print_help()  
    183         return  
    184   
    185     if options.load_ddl:  
    186         if options.schema != 'public':  
    187             data_loader.create_schema()  
    188         data_loader.create_tables()  
    189    
    190     if options.load:  
    191         data_loader.load_data()  
    192   
    193     if options.ip4:  
    194         data_loader.migrate_to_ip4()  
    195   
    196     if options.load:  
    197         data_loader.create_indexes(options.ip4 is True)  
    198   
    199     if options.load_ddl:  
    200         data_loader.create_functions(options.ip4 is True)  
    201   
    202     data_loader.vacuum()  
    203   
    204 if __name__ == "__main__":  
    205     main()  
    206  
     1#!/usr/bin/python 
     2 
     3''' 
     4Script for loading GeoIP CSV data into a postgresql database 
     5''' 
     6 
     7import logging, psycopg2, psycopg2.extensions, sys 
     8 
     9from optparse import OptionGroup, OptionParser 
     10from StringIO import StringIO 
     11 
     12class GeoIPDataLoader(object): 
     13 
     14    def __init__(self, dsn, blocks='GeoLiteCity-Blocks.csv', locations='GeoLiteCity-Location.csv', schema='public', zip=None): 
     15        self.con = psycopg2.connect(dsn) 
     16        # We don't need transactions... right? 
     17        self.con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 
     18        # The data is in ISO8859_15 encoding 
     19        self.con.set_client_encoding('iso8859_15') 
     20        self.cur = self.con.cursor() 
     21 
     22        self.blocks_csv = blocks 
     23        self.location_csv = locations 
     24        self.zip = zip 
     25        self.schema = schema 
     26  
     27    def close(self): 
     28        self.con.close() 
     29 
     30    def create_tables(self): 
     31        print 'Creating structure...', 
     32        self.db_execute( 
     33            ''' 
     34                CREATE TABLE locations 
     35                ( 
     36                  id bigint NOT NULL, 
     37                  country character(2) NOT NULL, 
     38                  region character(2), 
     39                  city character varying(75), 
     40                  postal_code character varying(15), 
     41                  latitude numeric(6,4) NOT NULL, 
     42                  longitude numeric(7,4), 
     43                  metro_code integer, 
     44                  area_code integer, 
     45                  CONSTRAINT locations_pkey PRIMARY KEY (id) 
     46                ); 
     47                 
     48                CREATE TABLE blocks 
     49                ( 
     50                  start_ip bigint NOT NULL, 
     51                  end_ip bigint NOT NULL, 
     52                  location_id bigint NOT NULL 
     53                ); 
     54                
     55            ''' 
     56            ) 
     57        print '\033[1;32mDone\033[1;m' 
     58                 
     59    def create_indexes(self, ip4=False): 
     60        print 'Adding Indexes...', 
     61        sys.stdout.flush() 
     62        if not ip4: 
     63            self.db_execute(''' 
     64             CREATE INDEX ix_start_end_ip ON blocks  
     65                USING btree (start_ip, end_ip) WITH (FILLFACTOR=100); 
     66             CREATE INDEX ix_end_start_ip ON blocks  
     67                USING btree (end_ip, start_ip) WITH (FILLFACTOR=100);  
     68                ''') 
     69        else: 
     70            self.db_execute(''' 
     71                 CREATE INDEX ix_ip_range ON blocks 
     72                   USING gist (ip_range) WITH (FILLFACTOR=100); 
     73                ''') 
     74        print '\033[1;32mDone\033[1;m' 
     75         
     76    def create_functions(self, ip4=False): 
     77        print 'Adding utility functions...', 
     78        sys.stdout.flush() 
     79        if ip4: 
     80            self.db_execute(''' 
     81                CREATE OR REPLACE FUNCTION get_location(inet) RETURNS bigint AS $$ 
     82                  SELECT location_id FROM %s.blocks 
     83                  WHERE ip_range >>= ip4($1) 
     84                  LIMIT 1 
     85                $$ LANGUAGE SQL; 
     86                ''' % self.schema) 
     87        else: 
     88            self.db_execute(''' 
     89                CREATE OR REPLACE FUNCTION inet_to_bigint(inet) RETURNS bigint AS $$ 
     90                    SELECT $1 - inet '0.0.0.0' 
     91                $$ LANGUAGE SQL; 
     92                ''') 
     93        print '\033[1;32mDone\033[1;m' 
     94     
     95    def create_schema(self): 
     96        try: 
     97            self.db_execute('''CREATE SCHEMA %s;''' % self.schema) 
     98        except psycopg2.ProgrammingError: 
     99          pass    
     100 
     101        self.db_execute('SET search_path TO %s,public;' % self.schema) 
     102         
     103    def db_execute(self, ddl): 
     104        self.cur.execute(ddl) 
     105#        self.con.commit() 
     106     
     107    def load_data(self): 
     108         
     109        if self.zip: 
     110            # Something more clever can be done here... but maybe... later 
     111            from zipfile import ZipFile 
     112             
     113            zip = ZipFile(self.zip) 
     114             
     115            for z in zip.infolist(): 
     116                if z.filename.endswith(self.location_csv): 
     117                    self.load_table(z.filename, 'locations', data_file=StringIO(zip.read(z.filename))) 
     118                elif z.filename.endswith(self.blocks_csv): 
     119                    self.load_table(z.filename, 'blocks', data_file=StringIO(zip.read(z.filename))) 
     120        else: 
     121            # Load Locations 
     122            self.load_table(self.location_csv, 'locations') 
     123            # Load Blocks 
     124            self.load_table(self.blocks_csv, 'blocks') 
     125     
     126    def load_table(self, file_name, table_name, data_file=None): 
     127        print 'Loading table \033[1;34m%s\033[1;m from file \033[1;34m%s\033[1;m...' % (table_name, file_name), 
     128        sys.stdout.flush() 
     129 
     130        if not data_file: 
     131            data_file = open(file_name) 
     132         
     133        # Skip the copyright header 
     134        data_file.readline() 
     135        data_file.readline() 
     136        #Remove quotes... psycopg2's `copy` errors on them 
     137        string_data = data_file.read().replace('"', '') 
     138         
     139        self.cur.copy_from(StringIO(string_data), table_name,  sep=',', null='') 
     140        print '\033[1;32mDone\033[1;m' 
     141     
     142    def migrate_to_ip4(self): 
     143        print 'Adding ip_range column'         
     144        self.db_execute(''' 
     145                        ALTER TABLE blocks ADD COLUMN ip_range ip4r; 
     146                        ALTER TABLE blocks ALTER COLUMN ip_range SET STORAGE PLAIN; 
     147                        ''') 
     148         
     149        print 'Migrating data to ip4...', 
     150        sys.stdout.flush() 
     151        self.db_execute('''UPDATE blocks SET ip_range = ip4r(start_ip::ip4, end_ip::ip4)''') 
     152        print '\033[1;32mDone\033[1;m' 
     153 
     154        print 'Dropping unneeded columns' 
     155        self.db_execute(''' 
     156                        ALTER TABLE blocks DROP COLUMN start_ip; 
     157                        ALTER TABLE blocks DROP COLUMN end_ip; 
     158                        ''') 
     159    def vacuum(self): 
     160        print 'Vaccuming database...', 
     161        sys.stdout.flush() 
     162        self.db_execute('VACUUM FULL ANALYZE') 
     163        print '\033[1;32mDone\033[1;m' 
     164 
     165def main(): 
     166    DSN = "dbname='%s' user='%s' host='%s'" 
     167 
     168    parser = OptionParser() 
     169    # Operational options 
     170    parser.add_option('-c', '--load-ddl', dest='load_ddl', default=False, 
     171                      action='store_true', help='Create database structure') 
     172    
     173    parser.add_option('-g', '--load-data', dest='load', default=False, 
     174                      action='store_true', help='Load the GeoIP data') 
     175 
     176    parser.add_option('-b', '--blocks-file', dest='blocks_csv', default='GeoLiteCity-Blocks.csv', 
     177                      action='store', help='GeoIP Blocks CSV file [default: %default]', metavar='BLOCKS_FILE') 
     178    parser.add_option('-l', '--locations-file', dest='locations_csv', default='GeoLiteCity-Location.csv', 
     179                      action='store', help='GoeIP Locations CSV file [default: %default]', metavar='LOCATIONS_FILE') 
     180 
     181    parser.add_option('-z', '--zip', dest='zip', 
     182                      action='store', help='GoeIP Locations ZIP file [default: %default]', metavar='ZIP_FILE') 
     183 
     184    db_group = OptionGroup(parser, 'Database Options') 
     185    # Database options 
     186    db_group.add_option('-H', '--host', dest='db_host', default='localhost', 
     187                      action='store', help='Database host [default: %default]', metavar='DB_HOST') 
     188    db_group.add_option('-d', '--database', dest='db_name', default='geoip_db', 
     189                      action='store', help='Database name [default: %default]', metavar='DATABASE_NAME') 
     190    db_group.add_option('-U', '--user', dest='db_user', default='geoip', 
     191                      action='store', help='User [default: %default]', metavar='USER_NAME') 
     192    db_group.add_option('-s', '--schema', dest='schema', default='public', 
     193                      action='store', help='Database Schema [default: %default]', metavar='SCHEMA') 
     194 
     195    db_group.add_option('--ip4r', dest='ip4', default=False, 
     196                      action='store_true', help='Use IP4r module [default: %default]') 
     197 
     198    parser.add_option_group(db_group) 
     199     
     200    (options, args) = parser.parse_args() 
     201 
     202    data_loader = GeoIPDataLoader("dbname='%s' user='%s' host='%s'" % (options.db_name, options.db_user, options.db_host), 
     203                                  blocks=options.blocks_csv, locations=options.locations_csv, zip=options.zip, schema=options.schema) 
     204 
     205    if options.ip4 and not options.load and options.load_ddl: 
     206        print '\033[1;31mERROR\033[1;m Creating a raw IP4 schema breaks data loading.  Use --ip4r switch ONLY during or after loading data' 
     207        return 
     208 
     209    if not options.load_ddl and not options.load: 
     210        parser.print_help() 
     211        return 
     212 
     213    if options.load_ddl: 
     214        if options.schema != 'public': 
     215            data_loader.create_schema() 
     216        data_loader.create_tables() 
     217  
     218    if options.load: 
     219        data_loader.load_data() 
     220 
     221    if options.ip4: 
     222        data_loader.migrate_to_ip4() 
     223 
     224    if options.load: 
     225        data_loader.create_indexes(options.ip4 is True) 
     226 
     227    if options.load_ddl: 
     228        data_loader.create_functions(options.ip4 is True) 
     229 
     230    data_loader.vacuum() 
     231 
     232if __name__ == "__main__": 
     233    main() 
     234