{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Batch Loading of Data\n",
"\n",
"Let us understand how we should take care of loading data in batches. We will perform load using multiple approaches to understand which one is better.\n",
"* Approach 1: Insert and commit each record. Whenever there is a commit in database, there is considerable amount of overhead.\n",
"* Approach 2: Insert one record at a time, but commit at the end.\n",
"* Approach 3: Insert all records at once and commit at the end.\n",
"* Approach 4: Insert records in chunks or batches and commit per chunk or batch.\n",
"\n",
"We should follow the fourth approach while dealing with huge amounts of data. It will facilitate us to restartability or recoverability. "
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"%run 02_function_get_database_connection.ipynb"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def get_cursor(connection):\n",
" return connection.cursor()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"%run 06_reading_data_from_file.ipynb"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" \n",
" order_id \n",
" order_date \n",
" order_customer_id \n",
" order_status \n",
" \n",
" \n",
" \n",
" \n",
" 0 \n",
" 1 \n",
" 2013-07-25 00:00:00.0 \n",
" 11599 \n",
" CLOSED \n",
" \n",
" \n",
" 1 \n",
" 2 \n",
" 2013-07-25 00:00:00.0 \n",
" 256 \n",
" PENDING_PAYMENT \n",
" \n",
" \n",
" 2 \n",
" 3 \n",
" 2013-07-25 00:00:00.0 \n",
" 12111 \n",
" COMPLETE \n",
" \n",
" \n",
"
\n",
"
"
],
"text/plain": [
" order_id order_date order_customer_id order_status\n",
"0 1 2013-07-25 00:00:00.0 11599 CLOSED\n",
"1 2 2013-07-25 00:00:00.0 256 PENDING_PAYMENT\n",
"2 3 2013-07-25 00:00:00.0 12111 COMPLETE"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"orders.head(3)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" \n",
" order_item_id \n",
" order_item_order_id \n",
" order_item_product_id \n",
" order_item_quantity \n",
" order_item_subtotal \n",
" order_item_product_price \n",
" \n",
" \n",
" \n",
" \n",
" 0 \n",
" 1 \n",
" 1 \n",
" 957 \n",
" 1 \n",
" 299.98 \n",
" 299.98 \n",
" \n",
" \n",
" 1 \n",
" 2 \n",
" 2 \n",
" 1073 \n",
" 1 \n",
" 199.99 \n",
" 199.99 \n",
" \n",
" \n",
" 2 \n",
" 3 \n",
" 2 \n",
" 502 \n",
" 5 \n",
" 250.00 \n",
" 50.00 \n",
" \n",
" \n",
"
\n",
"
"
],
"text/plain": [
" order_item_id order_item_order_id order_item_product_id \\\n",
"0 1 1 957 \n",
"1 2 2 1073 \n",
"2 3 2 502 \n",
"\n",
" order_item_quantity order_item_subtotal order_item_product_price \n",
"0 1 299.98 299.98 \n",
"1 1 199.99 199.99 \n",
"2 5 250.00 50.00 "
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"order_items.head(3)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"query = (\"\"\"INSERT INTO orders\n",
" (order_id, order_date, order_customer_id, order_status)\n",
" VALUES\n",
" (%s, %s, %s, %s)\"\"\")"
]
},
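{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"The `%s` markers are driver-side placeholders (the style used by PostgreSQL drivers such as `psycopg2`), not Python string formatting: the driver binds each value safely, which handles quoting and avoids SQL injection. The cell below is only an illustration of how one record lines up with the placeholders; it is not part of the load itself.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustration only: one record binds positionally to the four %s placeholders\n",
"rec = orders.values.tolist()[0]  # [1, '2013-07-25 00:00:00.0', 11599, 'CLOSED'] per the preview above\n",
"# cursor.execute(query, rec) would send the statement with these values bound"
]
},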
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"Inserting and committing one row in each iteration. Commit is quite expensive as it result in database checkpoint.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def load_orders(connection, cursor, query, data):\n",
" for rec in data:\n",
" cursor.execute(query, rec)\n",
" connection.commit()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"cursor = get_cursor(retail_connection)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 460 ms, sys: 479 ms, total: 939 ms\n",
"Wall time: 7.45 s\n"
]
}
],
"source": [
"%%time\n",
"load_orders(retail_connection, cursor, query, orders.values.tolist()[:10000])"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"cursor.execute('TRUNCATE TABLE orders')"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"retail_connection.commit()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"Inserting one row at a time but committing at the end. Even though it is much faster than previous approach, it is transferring one record at a time between Python Engine and Database Engine.\n",
"\n",
"We can further tune by leveraging batch insert.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def load_orders(connection, cursor, query, data):\n",
" for rec in data:\n",
" cursor.execute(query, rec)\n",
" connection.commit()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"cursor = get_cursor(retail_connection)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.28 s, sys: 1.03 s, total: 2.3 s\n",
"Wall time: 5.16 s\n"
]
}
],
"source": [
"%%time\n",
"# Inserting all orders\n",
"load_orders(retail_connection, cursor, query, orders.values.tolist())"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"cursor.execute('TRUNCATE TABLE orders')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"All the records will be inserted as part of one batch insert operation. If there is lot of data to be inserted, then this might start running into issues such as out of memory.\n",
"\n",
"Also, if the job fails in the middle then all the data that is transferred thus far will be lost. Hence it is better to batch with manageable size and then insert as well as commit.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"def load_orders(connection, cursor, query, data):\n",
" cursor.executemany(query, data)\n",
" connection.commit()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"cursor = get_cursor(retail_connection)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.42 s, sys: 916 ms, total: 2.34 s\n",
"Wall time: 5.1 s\n"
]
}
],
"source": [
"%%time\n",
"# Inserting all orders\n",
"load_orders(retail_connection, cursor, query, orders.values.tolist())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"You might not see significant difference in performance as our database is running in the same server from where the code is running to insert the data.\n",
"```"
]
},
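{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"If the connection comes from `psycopg2` (the `%s` placeholders and the PostgreSQL URL later in this notebook suggest it does), `psycopg2.extras.execute_batch` can cut round trips further by packing many rows into each network call, since `executemany` in that driver executes statements one by one. The sketch below is a hedged variant under that assumption; `load_orders_execute_batch` is a name introduced here for illustration.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch only, assuming the driver is psycopg2\n",
"from psycopg2.extras import execute_batch\n",
"\n",
"def load_orders_execute_batch(connection, cursor, query, data, batch_size=10000):\n",
"    for i in range(0, len(data), batch_size):\n",
"        # page_size rows are grouped into each round trip to the server\n",
"        execute_batch(cursor, query, data[i:i+batch_size], page_size=1000)\n",
"        connection.commit()"
]
},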
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"cursor.execute('TRUNCATE TABLE orders')"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"68883"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(orders.values.tolist())"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[0, 10000, 20000, 30000, 40000, 50000, 60000]"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(range(0, len(orders.values.tolist()), 10000))"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"def load_orders(connection, cursor, query, data, batch_size=10000):\n",
" for i in range(0, len(data), batch_size):\n",
" cursor.executemany(query, data[i:i+batch_size])\n",
" connection.commit()"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"cursor = get_cursor(retail_connection)"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.27 s, sys: 1.02 s, total: 2.29 s\n",
"Wall time: 5.1 s\n"
]
}
],
"source": [
"%%time\n",
"# Inserting all orders\n",
"load_orders(retail_connection, cursor, query, orders.values.tolist())"
]
},
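{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"Committing per batch is what gives us restartability: every committed batch is durable, so a failed job only needs to redo the batches after the last commit. Below is a minimal sketch of that idea; the checkpoint file and the helper names are hypothetical, not part of this notebook's flow.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch of a restartable batched load; the checkpoint file is a hypothetical example\n",
"import os\n",
"\n",
"OFFSET_FILE = 'orders_load.offset'\n",
"\n",
"def read_offset():\n",
"    # 0 on a fresh run, last committed offset after a failure\n",
"    if os.path.exists(OFFSET_FILE):\n",
"        with open(OFFSET_FILE) as fp:\n",
"            return int(fp.read())\n",
"    return 0\n",
"\n",
"def save_offset(offset):\n",
"    with open(OFFSET_FILE, 'w') as fp:\n",
"        fp.write(str(offset))\n",
"\n",
"def load_orders_restartable(connection, cursor, query, data, batch_size=10000):\n",
"    for i in range(read_offset(), len(data), batch_size):\n",
"        cursor.executemany(query, data[i:i+batch_size])\n",
"        connection.commit()\n",
"        save_offset(i + batch_size)  # record progress only after the commit succeeds"
]
},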
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"%load_ext sql"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"env: DATABASE_URL=postgresql://itversity_retail_user:retail_password@localhost:5432/itversity_retail_db\n"
]
}
],
"source": [
"%env DATABASE_URL=postgresql://itversity_retail_user:retail_password@localhost:5432/itversity_retail_db"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 rows affected.\n"
]
},
{
"data": {
"text/html": [
"\n",
" \n",
" count \n",
" \n",
" \n",
" 68883 \n",
" \n",
"
"
],
"text/plain": [
"[(68883,)]"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"SELECT count(1) FROM orders"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}