Content
- Documentation
- Code
1. Documentation
csv2sqliteDB.py:
1. Creates a SQLite database (at SQLITE_FILEPATH), then
2. Creates and loads TABLE_NAME (adding id INTEGER PRIMARY KEY)
– FROM CSV_FILEPATH (with header, NO id field necessary)
– INTO SQLITE_FILEPATH
– CREATING SQLITE_FILEPATH if it doesn’t exist.
Usage: csv2sqliteDB.py TABLE_NAME CSV_FILEPATH SQLITE_FILEPATH
Example: csv2sqliteDB.py contacts contacts2.csv contactsDB2id.db
2. Code
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""csv2sqliteDB.py

Creates and loads TABLE_NAME (adding id INTEGER PRIMARY KEY)
  from CSV_FILEPATH (with header, NO id field necessary)
  into SQLITE_FILEPATH,
  creating SQLITE_FILEPATH if it doesn't exist.

Usage:   csv2sqliteDB.py TABLE_NAME CSV_FILEPATH SQLITE_FILEPATH
Example: csv2sqliteDB.py contacts contacts2.csv contactsDB2id.db
"""
# Requires Python 3 (the CSV file is opened with ``newline=''``).
#
# see:
#   Importing a CSV file into a sqlite3 - http://stackoverflow.com/a/2888042/601770
#   csv file using python with headers intact - http://stackoverflow.com/a/3428633/601770

import csv
import os
import sqlite3
import sys

# Number of positional command-line arguments main() expects.
NUM_ARGS = 3


def _quote_identifier(name):
    """Return *name* wrapped in double quotes for use as an SQL identifier.

    Embedded double quotes are doubled, so names containing spaces,
    punctuation or SQL keywords are taken literally instead of breaking
    (or altering) the generated statement.
    """
    return '"{}"'.format(str(name).replace('"', '""'))


def _quoted_columns(headerL):
    """Return the header names, whitespace-stripped and quoted.

    createTable() and loadTable() both go through this helper so the column
    names in CREATE TABLE and INSERT always agree.
    """
    return [_quote_identifier(col.strip()) for col in headerL]


def _count_data_rows(csv_filepath):
    """Return the number of non-blank CSV records that follow the header."""
    # newline='' lets the csv module do its own newline handling, so a quoted
    # field containing a line break still counts as a single record.
    with open(csv_filepath, 'r', newline='') as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row, if there is one
        return sum(1 for row in reader if row)


def csv2sqliteDB(table_name, csv_filepath, database_filepath):
    """Load a CSV file (with header row) into a fresh SQLite table.

    The database file is created if it does not exist. Any existing table
    called *table_name* is dropped and recreated with an ``id INTEGER
    PRIMARY KEY`` column followed by one TEXT column per CSV header field.

    :param table_name: name of the table to (re)create; used literally.
    :param csv_filepath: path of the CSV file to read.
    :param database_filepath: path of the SQLite database file.
    :raises ValueError: if the CSV file has no header row; the existing
        table, if any, is left untouched in that case.
    """
    conn, cursor = createDatabaseIfNotExist(database_filepath)
    try:
        # Total used only for the progress messages printed by loadTable().
        nlines = _count_data_rows(csv_filepath)
        with open(csv_filepath, 'r', newline='') as f:
            reader = csv.reader(f)
            headerL = next(reader, None)
            if not headerL:
                raise ValueError(
                    'CSV file %r has no header row' % (csv_filepath,))
            createTable(table_name, conn, cursor, headerL)
            loadTable(table_name, conn, cursor, headerL, reader, nlines)
    finally:
        # Always release the database handle, even when loading fails.
        conn.close()


def createDatabaseIfNotExist(database_filepath):
    """Open (creating if necessary) the database and print the SQLite version.

    :param database_filepath: path of the SQLite database file.
    :return: ``(conn, cursor)``; the caller is responsible for closing *conn*.
    """
    # sqlite3's connection context manager only commits or rolls back, it
    # never closes, so the connection is managed explicitly here.
    conn = sqlite3.connect(database_filepath)
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT SQLITE_VERSION()')
        data = cursor.fetchone()
        print('SQLite version:', data)
    except Exception:
        conn.close()
        raise
    return conn, cursor


def createTable(table_name, conn, cursor, headerL):
    """Drop *table_name* if it exists and recreate it from the CSV header.

    :param table_name: name of the table to create.
    :param conn: open sqlite3 connection (unused; kept for the interface).
    :param cursor: cursor on *conn* used to run the DDL script.
    :param headerL: list of CSV header names; each becomes a TEXT column.
    """
    # http://stackoverflow.com/a/12432311/601770
    # http://stackoverflow.com/a/19730169/601770
    column_defs = ['id INTEGER PRIMARY KEY']
    column_defs.extend(
        '{} TEXT'.format(col) for col in _quoted_columns(headerL))
    quoted_table = _quote_identifier(table_name)
    # Checks to see if the table exists and makes a fresh table.
    script_str = (
        'DROP TABLE IF EXISTS {0};\n'
        'CREATE TABLE {0} ({1});\n'
    ).format(quoted_table, ', '.join(column_defs))
    cursor.executescript(script_str)


def loadTable(table_name, conn, cursor, headerL, reader, nlines):
    """Insert every remaining record of *reader* into *table_name* and commit.

    :param table_name: name of the table to insert into.
    :param conn: open sqlite3 connection; committed after the last insert.
    :param cursor: cursor on *conn* used for the inserts.
    :param headerL: list of CSV header names (the target columns).
    :param reader: iterator of CSV rows, already positioned after the header.
    :param nlines: total shown in the progress messages.
    """
    insert_str = 'INSERT INTO {table_name} ({col_names}) VALUES({q_marks});'.format(
        table_name=_quote_identifier(table_name),
        col_names=','.join(_quoted_columns(headerL)),
        q_marks=','.join(['?'] * len(headerL)),
    )

    # DO THE INSERTS
    line_count = 0
    for row in reader:
        if not row:
            # csv.reader yields [] for a blank line; there is nothing to bind.
            continue
        line_count += 1
        if line_count % 10 == 1:
            print('line %s of %s' % (line_count, nlines))
        cursor.execute(insert_str, row)
    conn.commit()


def main():
    """Command-line entry point: print usage or run the import."""
    args = sys.argv[1:]
    if len(args) != NUM_ARGS or "-h" in args or "--help" in args:
        print(__doc__)
        sys.exit(2)
    csv2sqliteDB(args[0], args[1], args[2])


if __name__ == '__main__':
    main()