Topics: pandas, memory
For those moments when you’re slightly over maximum RAM.
When your data sizes just exceed what fits in memory, pandas doesn’t have to grind to a halt. In this seminar, we’ll explore strategies to push pandas a little further without overhauling your entire workflow. We’ll cover tips to help you reduce your pandas memory footprint, like using memory-efficient datatypes, or even incorporating tools like DuckDB and SQLite into your workflow.
We’ll also cover practical tips for profiling memory usage in pandas and for understanding where the biggest bottlenecks occur. By combining these insights with some additional tooling, you’ll gain the flexibility to process challenging datasets without resorting to big-data frameworks. This seminar is all about working smarter, not harder, while staying in your familiar pandas environment.
# python version → 3.11
python -m pip install pandas==2.0.3 "matplotlib==3.9.*" duckdb==1.1.3 "numpy<2" pyarrow==19.0.0 sqlalchemy==2.0.37 memray==1.15 numexpr==2.10.2
print("Let's Take a Look!")
You have some script/notebook that you regularly run, but one day you realize that the dataset you were working with has grown so large you can’t read the entire file into memory any more.
from pathlib import Path
from pandas import DataFrame, date_range, read_sql
from numpy.random import default_rng
from contextlib import closing
from sqlite3 import connect
rng = default_rng(0)
df = DataFrame(
data={
'timestamp': date_range('2000-01', freq='s', periods=(n := 500_000)),
'group': rng.choice([*'abcdefghijklmnop'], size=n),
**{
f'values{i}': rng.normal(1, .001, size=n).cumprod() for i in range(10)
},
},
)
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
df.to_csv(fpath := (data_dir / 'data.csv'), index=False)
print(f'wrote {fpath}!')
with closing(connect(fpath := data_dir / 'data.db')) as conn:
df.to_sql('mytable', conn, if_exists='replace', index=False)
print(f'wrote {fpath}!')
from pathlib import Path
from pandas import read_csv, to_datetime
from utils import timed
from memray import Tracker, FileDestination
data_dir = Path('data')
# result = df.set_index('timestamp').groupby('group').rolling('timestamp')[['values1', 'values2']].mean()
# columns → timestamp, group, values1, values2
with Tracker(destination=FileDestination('original.bin', overwrite=True)):
df = read_csv(data_dir / 'data.csv')
with Tracker(destination=FileDestination('subset.bin', overwrite=True)):
df = read_csv(data_dir / 'data.csv', usecols=['timestamp', 'group', 'values1', 'values2'])
with Tracker(destination=FileDestination('subset_parse.bin', overwrite=True)):
    df = read_csv(
        data_dir / 'data.csv',
        usecols=['timestamp', 'group', 'values1', 'values2'],
        parse_dates=['timestamp'],
        # infer_datetime_format=True, # deprecated
        date_format={'timestamp': '%Y-%m-%d %H:%M:%S'},
    )
print('done')
For reducing your memory footprint, loading in only the data you need for your end result will have the largest impact; most other parsing options exist for speed or convenience.
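Dtype handling gets a fuller treatment later, but you can already apply it while loading. A small sketch (assuming the same data.csv as above) that reads the group column as a category to shrink it on arrival:
from pandas import read_csv
df = read_csv(
    'data/data.csv',
    usecols=['timestamp', 'group', 'values1'],
    parse_dates=['timestamp'],
    dtype={'group': 'category'},  # store repeated labels as integer codes plus a small lookup table
)
print(df.memory_usage(deep=True))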
But what if we cannot hold the end result in memory?
Chunking a File
When dealing with files on disk, chunking requires us to read and process part of the file before reading in the next portion of data.
from pandas import read_csv
print(read_csv('data/data.csv', nrows=5))
Say we want to build a histogram of 'values1' without ever holding the full file in memory (the commented-out lines below sketch a per-group mean of 'values0' in the same streamed fashion):
from pathlib import Path
from pandas import Series, read_csv, interval_range
from itertools import islice
data_dir = Path('data')
reader = read_csv(data_dir / 'data.csv', chunksize=100_000, usecols=['group', 'values0', 'values1'])
# sums, counts = Series(dtype='float64'), Series(dtype='int64')
values_hist = Series(
0, index=interval_range(0.2, end=1.8, periods=10)
)
for df in reader:
res = df['values1'].value_counts(bins=values_hist.index, sort=False)
values_hist = values_hist.add(res, fill_value=0)
# res = df.groupby('group')['values0'].agg(['sum', 'count'])
# sums = sums.add(res['sum'], fill_value=0)
# counts = counts.add(res['count'], fill_value=0)
# # print(sums / counts)
print(values_hist)
While the above was fairly straightforward, we can start to reason about which computations are hard to perform online (in a streamed/chunked fashion).
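For example, a mean decomposes into running sums and counts, which is exactly what the commented-out lines above sketch, whereas an exact median needs all of the values at once and does not chunk cleanly. Expanding that commented-out sketch into runnable form:
from pandas import Series, read_csv
sums, counts = Series(dtype='float64'), Series(dtype='int64')
for df in read_csv('data/data.csv', chunksize=100_000, usecols=['group', 'values0']):
    res = df.groupby('group')['values0'].agg(['sum', 'count'])
    sums = sums.add(res['sum'], fill_value=0)
    counts = counts.add(res['count'], fill_value=0)
print(sums / counts)  # per-group mean of values0, computed one chunk at a time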
Chunking a Database Table
Generic naive approach: offset/limit
from contextlib import closing
from sqlite3 import connect
from itertools import count
from pandas import read_sql
with closing(connect('data/data.db')) as conn:
for offset in count(0, step=(limit := 100_000)):
df = read_sql('''
select * from mytable
limit :limit
offset :offset
''',
con=conn,
params={'offset': offset, 'limit': limit},
)
if df.empty:
break
print(df)
Chunking via Connection Cursor
from contextlib import closing
from sqlite3 import connect
from pandas import DataFrame
with closing(connect('data/data.db')) as conn:
cursor = conn.execute('select * from mytable')
columns = [desc[0] for desc in cursor.description]
while (data := cursor.fetchmany(100_000)):
df = DataFrame(data, columns=columns)
print(df)
Chunking via SQLAlchemy & read_sql options
from contextlib import closing
from pandas import read_sql
from sqlalchemy import create_engine
engine = create_engine('sqlite:///data/data.db')
# Will work for any SQL flavor: https://github.com/pandas-dev/pandas/issues/35689
with closing(engine.connect().execution_options(stream_results=True)) as conn:
for df in read_sql('select * from mytable', con=conn, chunksize=100_000):
print(df.head())
Predicate Pushdown & Streaming Engines
from pandas import read_csv
df = read_csv('data/data.csv').loc[lambda d: d['group'] == 'a']
If you have an analytical pipeline that heavily relies on pandas, try using DuckDB for the edges of the program (loading/filtering) or replace memory-intensive computations with it.
from duckdb import sql
relation = sql('''
select timestamp, "group", values1, values2
from 'data/data.csv'
where "group" = 'a'
''')
print(relation.df())
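DuckDB can also take over a memory-hungry computation outright, so only a small result ever lands in pandas. A sketch (same CSV as above; the aggregation itself is just an illustration) that computes per-group summaries inside DuckDB’s engine:
from duckdb import sql
agg = sql('''
    select "group", avg(values1) as avg_values1, count(*) as n
    from 'data/data.csv'
    group by "group"
''')
print(agg.df())  # only one row per group comes back into pandas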
Interactive IPython holds references to every value it displays (in its output cache), even after you delete the variable.
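If a big result has already been displayed, deleting your own reference is not enough, since the output cache still holds one. Two escape hatches, sketched here with the relevant IPython magics:
%xdel df       # delete a name and ask IPython to purge its cached references to it
%reset -f out  # clear the entire output-history cache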
Use .head() or .sample(..., random_state=...) when displaying.
from pandas import DataFrame, date_range, concat
from numpy import arange
from numpy.random import default_rng
from string import ascii_lowercase
rng = default_rng(0)
def metainfo(df):
return DataFrame({
'memory': df.memory_usage(deep=True, index=False).map('{:,}'.format),
'dtypes': df.dtypes,
})
df = DataFrame({
'order': arange(n := 10_000, dtype='float64'),
'timestamp': date_range('2000-01-01', periods=n, freq='H').astype(str),
'values': rng.normal(1, .02, size=n).cumprod(),
'group': rng.choice([*'abcdefghijklmnop'], size=n),
'entity': rng.choice([*ascii_lowercase], size=(n, 10)).view('<U10').ravel(),
})
new_df = df.astype({
'order': 'int32',
'timestamp': 'datetime64[ns]',
'group': 'category',
'entity': 'string',
})
auto_df = new_df.convert_dtypes(dtype_backend='pyarrow')
print(
concat({'orig': metainfo(df), 'astype': metainfo(new_df), 'pyarrow': metainfo(auto_df)}, axis=1)
.sort_index(axis=1, level=1, sort_remaining=False)
)
from pandas import read_csv, concat, DataFrame
from utils import timed
def metainfo(df):
return DataFrame({
'memory': df.memory_usage(deep=True, index=False).map('{:,}'.format),
'dtypes': df.dtypes,
})
with timed('standard'):
standard = read_csv('data/data.csv')
with timed('pyarrow engine'):
pyarr_engine = read_csv('data/data.csv', engine='pyarrow')
with timed('pyarrow engine & pyarrow dtypes'):
pyarr_engine_backend = read_csv('data/data.csv', engine='pyarrow', dtype_backend='pyarrow')
print(
concat({
'orig': metainfo(standard),
'pyarrow engine': metainfo(pyarr_engine),
'pyarrow eng & backend': metainfo(pyarr_engine_backend)
},
axis=1
)
.sort_index(axis=1, level=1, sort_remaining=False)
)
Pairwise Comparisons
from pathlib import Path
from numpy.random import default_rng
from string import ascii_lowercase
from pandas import DataFrame
rng = default_rng(0)
df = DataFrame(
data={
'entity': rng.choice([*ascii_lowercase], size=(n := 2_000, 5)).view('<U5').ravel(),
'values': rng.normal(100, 5, size=n),
},
)
data_dir = Path('data')
df.to_parquet(data_dir / 'pairwise.parquet', index=False)
print(df)
Cross Join & Calculate
from pandas import read_parquet
df = read_parquet('data/pairwise.parquet')
print(
df.merge(df, how='cross')
.assign(
delta=lambda d: (d['values_y'] - d['values_x']).abs()
)
.pivot(
index='entity_x', columns='entity_y', values='delta'
)
)
Be familiar with your NumPy ufuncs!
NumPy sits at a lower abstraction level than pandas, so it keeps you a touch “closer to the metal”. If you need a bit more control over memory, reaching into NumPy could be an approach worth taking.
from pandas import DataFrame, read_parquet
from numpy import subtract
s = (
read_parquet('data/pairwise.parquet', engine='pyarrow')
.set_index('entity')['values']
)
print(
DataFrame(
subtract.outer(s.to_numpy(), s.to_numpy()),
columns=s.index,
index=s.index,
).abs()
)
Only operate on the lower triangle
from pandas import MultiIndex, read_parquet
s = (
read_parquet('data/pairwise.parquet', dtype_backend='pyarrow')
.set_index('entity')['values']
)
idx = MultiIndex.from_product([s.index.sort_values()] * 2, names=['entity_x', 'entity_y'])
idx = idx[idx.codes[0] < idx.codes[1]]
res = (s.reindex(idx, level=0) - s.reindex(idx, level=1))
print(res.xs('zzhps', level=0))
print(res.xs('zzhps', level=1))
Occasionally, the problems we seek to solve are simply too large. Instead, we can answer the most interesting narrow vertical slices of that problem.
from pandas import DataFrame, MultiIndex, read_parquet, Series
s = (
read_parquet('data/pairwise.parquet', engine='pyarrow')
.set_index('entity')['values']
)
top_s = s.nlargest(5).index
idx = MultiIndex.from_product([top_s, s.index.sort_values()], names=['entity_x', 'entity_y'])
res = (s.reindex(idx, level=0) - s.reindex(idx, level=1))
print(res)
Does NumExpr help with peak memory?
from pandas import read_csv
from pathlib import Path
import numexpr
from memray import Tracker, FileDestination
from utils import timed
df = read_csv('data/data.csv', usecols=['values1', 'values2', 'values3'])
# with timed('assign'):
with Tracker(destination=FileDestination('assign', overwrite=True)):
res = df.assign(
x=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
y=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
z=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
a=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
b=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
c=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
)
# with timed('eval'):
with Tracker(destination=FileDestination('eval', overwrite=True)):
    res = df.eval('''
        x = values1 ** 4 + values2 * 5 - values3
        y = values1 ** 4 + values2 * 5 - values3
        z = values1 ** 4 + values2 * 5 - values3
        a = values1 ** 4 + values2 * 5 - values3
        b = values1 ** 4 + values2 * 5 - values3
        c = values1 ** 4 + values2 * 5 - values3
    ''')
print('done')
You don’t need to memorize pandas internals, but if there is one thing to be aware of when saving memory in pandas, it is the block manager. Columns of the same dtype are stored together in 2-D blocks; adding columns one at a time leaves the frame fragmented, and when pandas later consolidates those blocks it copies the data, which is where peak memory can spike.
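As a quick illustration (this pokes at the private ._mgr attribute, so details may differ across pandas versions), you can watch the block count change as columns are inserted one at a time:
from pandas import DataFrame
from numpy import arange
demo = DataFrame({f'col{i}': arange(5) for i in range(3)})
print(demo._mgr.nblocks)  # the three int64 columns are consolidated into a single block
demo['new'] = demo['col0'] * 2
print(demo._mgr.nblocks)  # the inserted column sits in its own block until consolidation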
from pandas import DataFrame, date_range
from string import ascii_lowercase
from numpy import repeat
# goal: a horizontal (row-wise) sum of value and value * i for i in range(10)
df = DataFrame({
'value': repeat([1, 2, 3, 4, 5], (n := 1_000_000)),
'date': date_range('2000-01-01', periods=n * 5, freq='min')
})
# ① fragmented dataframe, consolidation can bite you later
df = df.assign( # small memory up front, but peak can be very bad upon consolidation
**{col: lambda df, i=i: df['value'].to_numpy() * i for i, col in enumerate(ascii_lowercase[:10])}
)
print(df.sum(numeric_only=True, axis=1))
# ② force consolidation to occur earlier
DataFrame({  # large memory, but we fail early/fast
    'value': df['value'],
    **{col: df['value'] * i for i, col in enumerate(ascii_lowercase[:10])},
})
# ③ attempt to subvert consolidation by passing a 2d array; or skipping pandas
# still allocates a full len(df)×10 array at peak
from numpy import empty
out = empty((len(df), 10), dtype='int64')
for i, _ in enumerate(ascii_lowercase[:10]):
    out[:, i] = df['value'] * i
print(df['value'].to_numpy() + out.sum(axis=1))
# ④ Why stress over 10 iterations at the Python level?
print(  # very small memory footprint; only a couple of len(df)-long temporaries alive at a time (peak)
sum((df['value'] * i for i, col in enumerate(ascii_lowercase[:10])), start=df['value'])
)
When you want to reduce the memory footprint of your pandas DataFrame, these are the simplest things you can do:
Without changing too much code
Changing a small amount of code
Changing a large amount of code