Batch Loading of Data

Let us understand how to load data in batches. We will perform the load using multiple approaches to understand which one works best.

  • Approach 1: Insert and commit each record. Every commit in the database incurs a considerable amount of overhead.

  • Approach 2: Insert one record at a time, but commit at the end.

  • Approach 3: Insert all records at once and commit at the end.

  • Approach 4: Insert records in chunks or batches and commit per chunk or batch.

We should follow the fourth approach when dealing with huge volumes of data. It gives us restartability and recoverability: if the job fails, only the uncommitted batch is lost and the load can resume from the last committed batch (a restartable variant is sketched at the end of this section).

%run 02_function_get_database_connection.ipynb
def get_cursor(connection):
    return connection.cursor()
%run 06_reading_data_from_file.ipynb
orders.head(3)
   order_id             order_date  order_customer_id     order_status
0         1  2013-07-25 00:00:00.0              11599           CLOSED
1         2  2013-07-25 00:00:00.0                256  PENDING_PAYMENT
2         3  2013-07-25 00:00:00.0              12111         COMPLETE
order_items.head(3)
   order_item_id  order_item_order_id  order_item_product_id  order_item_quantity  order_item_subtotal  order_item_product_price
0              1                    1                    957                    1               299.98                    299.98
1              2                    2                   1073                    1               199.99                    199.99
2              3                    2                    502                    5               250.00                     50.00
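
The examples below load only the orders data, but the same patterns apply to order_items. As a hypothetical sketch (this query is mine and is not run anywhere in this section), its parameterized insert would mirror the orders query defined next:

# Hypothetical parameterized insert for order_items; not part of the
# original notebook and not executed in this section.
order_items_query = ("""INSERT INTO order_items
         (order_item_id, order_item_order_id, order_item_product_id,
          order_item_quantity, order_item_subtotal, order_item_product_price)
         VALUES
         (%s, %s, %s, %s, %s, %s)""")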
query = ("""INSERT INTO orders
         (order_id, order_date, order_customer_id, order_status)
         VALUES
         (%s, %s, %s, %s)""")

Note

Inserting and committing one row in each iteration. Commits are quite expensive, as each commit forces the database to flush the transaction log to disk.

def load_orders(connection, cursor, query, data):
    # Approach 1: insert one record at a time and commit after every insert
    for rec in data:
        cursor.execute(query, rec)
        connection.commit()
cursor = get_cursor(retail_connection)
%%time
# Inserting only the first 10,000 orders, as this approach is slow
load_orders(retail_connection, cursor, query, orders.values.tolist()[:10000])
CPU times: user 460 ms, sys: 479 ms, total: 939 ms
Wall time: 7.45 s
cursor.execute('TRUNCATE TABLE orders')
retail_connection.commit()

Note

Inserting one row at a time but committing only at the end. Even though this is much faster than the previous approach, it still transfers one record at a time between the Python engine and the database engine.

We can tune this further by leveraging batch inserts.

def load_orders(connection, cursor, query, data):
    # Approach 2: insert one record at a time, commit once at the end
    for rec in data:
        cursor.execute(query, rec)
    connection.commit()
cursor = get_cursor(retail_connection)
%%time
# Inserting all orders
load_orders(retail_connection, cursor, query, orders.values.tolist())
CPU times: user 1.28 s, sys: 1.03 s, total: 2.3 s
Wall time: 5.16 s
cursor.execute('TRUNCATE TABLE orders')

Note

All the records will be inserted as part of one batch insert operation. If there is a lot of data to be inserted, this might run into issues such as running out of memory.

Also, if the job fails in the middle, all the data transferred thus far will be lost. Hence it is better to split the data into batches of manageable size, then insert and commit per batch.

def load_orders(connection, cursor, query, data):
    # Approach 3: insert all records as one batch, then commit once
    cursor.executemany(query, data)
    connection.commit()
cursor = get_cursor(retail_connection)
%%time
# Inserting all orders
load_orders(retail_connection, cursor, query, orders.values.tolist())
CPU times: user 1.42 s, sys: 916 ms, total: 2.34 s
Wall time: 5.1 s

Note

You might not see a significant difference in performance, as our database runs on the same server as the code that inserts the data, so network round trips are cheap.
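
Another likely factor: in psycopg2 (the driver suggested by the %s placeholders and the PostgreSQL URL below), executemany is implemented as a loop of individual execute calls, so it mainly saves Python-side overhead rather than round trips. As a sketch, psycopg2.extras.execute_values packs many rows into a single INSERT statement; the name load_orders_values is hypothetical, and this assumes retail_connection really is a psycopg2 connection.

from psycopg2.extras import execute_values

def load_orders_values(connection, cursor, data, page_size=10000):
    # execute_values expands the single %s below into a multi-row
    # VALUES list, sending up to page_size rows per statement.
    query = """INSERT INTO orders
               (order_id, order_date, order_customer_id, order_status)
               VALUES %s"""
    execute_values(cursor, query, data, page_size=page_size)
    connection.commit()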

cursor.execute('TRUNCATE TABLE orders')
len(orders.values.tolist())
68883
list(range(0, len(orders.values.tolist()), 10000))
[0, 10000, 20000, 30000, 40000, 50000, 60000]
def load_orders(connection, cursor, query, data, batch_size=10000):
    # Approach 4: insert and commit in batches of batch_size records
    for i in range(0, len(data), batch_size):
        cursor.executemany(query, data[i:i+batch_size])
        connection.commit()
cursor = get_cursor(retail_connection)
%%time
# Inserting all orders
load_orders(retail_connection, cursor, query, orders.values.tolist())
CPU times: user 1.27 s, sys: 1.02 s, total: 2.29 s
Wall time: 5.1 s
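Because each batch is committed atomically, a failed run loses at most the in-flight batch. Here is a minimal sketch of a restartable variant, assuming the input order is deterministic and the orders table receives rows only from this load; the name load_orders_restartable is hypothetical:

def load_orders_restartable(connection, cursor, query, data, batch_size=10000):
    # Rows committed by a previous (possibly failed) run; this is always
    # a multiple of batch_size because each batch commits atomically.
    cursor.execute('SELECT count(1) FROM orders')
    committed = cursor.fetchone()[0]
    # Resume from the first record that has not yet been committed
    for i in range(committed, len(data), batch_size):
        cursor.executemany(query, data[i:i+batch_size])
        connection.commit()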
%load_ext sql
%env DATABASE_URL=postgresql://itversity_retail_user:retail_password@localhost:5432/itversity_retail_db
env: DATABASE_URL=postgresql://itversity_retail_user:retail_password@localhost:5432/itversity_retail_db
%%sql

SELECT count(1) FROM orders
1 rows affected.
count
68883