ts-python

Squeezing More Out of pandas

Topics: pandas, memory

For those moments when you’re slightly over maximum RAM.

When your data sizes just exceed what fits in memory, pandas doesn’t have to grind to a halt. In this seminar, we’ll explore strategies to push pandas a little further without overhauling your entire workflow. We’ll cover tips to help you improve your pandas memory footprint like using memory-efficient datatypes or even incorporating tools like DuckDB and SQLite into your workflow.

We’ll also cover practical tips for profiling memory usage in pandas and understand where the biggest bottlenecks occur. By combining these insights with some additional tooling, you’ll gain the flexibility to process challenging datasets without resorting to big-data frameworks. This seminar is all about working smarter, not harder, while staying in your familiar pandas environment.

Notes

# python version → 3.11
python -m pip install pandas==2.0.3 matplotlib==3.9.* duckdb==1.1.3 numpy<2.* pyarrow==19.0.0 sqlalchemy==2.0.37 memray==1.15 numexpr==2.10.2

Squeezing More Out of pandas

print("Let's Take a Look!")

Scenario: You Can’t Load Your Data

You have some script/notebook that you regularly run, but one day you realize that the dataset you were working with has grown so large you can’t read the entire file into memory any more.

from pathlib import Path
from pandas import DataFrame, date_range, read_sql
from numpy.random import default_rng
from contextlib import closing
from sqlite3 import connect

rng = default_rng(0)

df = DataFrame(
    data={
        'timestamp': date_range('2000-01', freq='s', periods=(n := 500_000)),
        'group':     rng.choice([*'abcdefghijklmnop'], size=n),
        **{
            f'values{i}': rng.normal(1, .001, size=n).cumprod() for i in range(10)
        },
    },
)

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

df.to_csv(fpath := (data_dir / 'data.csv'), index=False)
print(f'wrote {fpath}!')

with closing(connect(fpath := data_dir / 'data.db')) as conn:
    df.to_sql('mytable', conn, if_exists='replace', index=False)
print(f'wrote {fpath}!')
Minimize What You Read In
from pathlib import Path
from pandas import read_csv, to_datetime
from utils import timed
from memray import Tracker, FileDestination

data_dir = Path('data')

# result = df.set_index('timestamp').groupby('group').rolling('timestamp')[['values1', 'values2']].mean()
# columns → timestamp, group, values1, values2
with Tracker(destination=FileDestination('original.bin', overwrite=True)):
    df = read_csv(data_dir / 'data.csv')

with Tracker(destination=FileDestination('subset.bin', overwrite=True)):
    df = read_csv(data_dir / 'data.csv', usecols=['timestamp', 'group', 'values1', 'values2'])

with Tracker(destination=FileDestination('subset_parse.bin', overwrite=True)):
    df = read_csv(
        data_dir / 'data.csv',
        usecols=['timestamp', 'group', 'values1', 'values2'],
        parse_dates=['timestamp'],
        # infer_datetime_format=True, # deprecated
        date_format={'timestap': '%Y-%m-%d %H:%M:%S'}
    )
print('done')

For reducing your memory footprint, loading in less data for your end result is going to have the largest impact. Most other parsing options exist for speed or convenience.

But what if we cannot hold the end result in memory?

Simple Out-of-Core Approahces

Chunking a File

When dealing with files on disk, chunking requires us to read and process part of the file before reading in the next portion of data.

from pandas import read_csv

print(read_csv('data/data.csv', nrows=5))

Say we want to accomplish the following tasks:

  1. take the mean of values0 by group
  2. create a histogram in values1
from pathlib import Path
from pandas import Series, read_csv, interval_range
from itertools import islice
data_dir = Path('data')

reader = read_csv(data_dir / 'data.csv', chunksize=100_000, usecols=['group', 'values0', 'values1'])
# sums, counts = Series(dtype='float64'), Series(dtype='int64')
values_hist = Series(
    0, index=interval_range(0.2, end=1.8, periods=10)
)

for df in reader:
    res = df['values1'].value_counts(bins=values_hist.index, sort=False)
    values_hist = values_hist.add(res, fill_value=0)
    # res = df.groupby('group')['values0'].agg(['sum', 'count'])
    # sums = sums.add(res['sum'], fill_value=0)
    # counts = counts.add(res['count'], fill_value=0)
# # print(sums / counts)
print(values_hist)

While the above was fairly straightforward, we can start to reason about the computations are hard to perform online (in a streamed/chunked fashion).

Chunking a Database Table

Generic naive approach: offset/limit

from contextlib import closing
from sqlite3 import connect
from itertools import count
from pandas import read_sql

with closing(connect('data/data.db')) as conn:
    for offset in count(0, step=(limit := 100_000)):
        df = read_sql('''
            select * from mytable
            limit :limit
            offset :offset
        ''',
            con=conn,
            params={'offset': offset, 'limit': limit},
        )
        if df.empty:
            break
        print(df)

Chunking via Connection Cursor

from contextlib import closing
from sqlite3 import connect
from pandas import DataFrame

with closing(connect('data/data.db')) as conn:
    cursor = conn.execute('select * from mytable')
    columns = [desc[0] for desc in cursor.description]
    while (data := cursor.fetchmany(100_000)):
        df = DataFrame(data, columns=columns)
        print(df)

Chunking via SQLAlchemy & read_sql options

from contextlib import closing
from pandas import read_sql
from sqlalchemy import create_engine

engine = create_engine('sqlite:///data/data.db')

# Will work for any SQL flavor: https://github.com/pandas-dev/pandas/issues/35689
with closing(engine.connect().execution_options(stream_results=True)) as conn:
    for df in  read_sql('select * from mytable', con=conn, chunksize=100_000):
        print(df.head())

Predicate Pushdown & Streaming Engines

from pandas import read_csv

df = read_csv('data/data.csv').loc[lambda d: d['group'] == 'a']

If you have an analytical pipeline that heavily relies on pandas, try using DuckDB for the edges of the program (loading/filtering) or replace memory-intensive computations with it.

from duckdb import sql

relation = sql('''
    select timestamp, "group", values1, values2
    from 'data/data.csv'
    where "group" = 'a'
''')

print(relation.df())

Scenario: My Computation Raises out-of-memory

Notebook Users: Use %xdel

Interactive IPython holds onto all variables displayed, even if they are deleted.

Datatype Downcasting
from pandas import DataFrame, date_range, concat
from numpy import arange
from numpy.random import default_rng
from string import ascii_lowercase
rng = default_rng(0)

def metainfo(df):
    return DataFrame({
        'memory': df.memory_usage(deep=True, index=False).map('{:,}'.format),
        'dtypes': df.dtypes,
    })

df = DataFrame({
    'order':     arange(n := 10_000, dtype='float64'),
    'timestamp': date_range('2000-01-01', periods=n, freq='H').astype(str),
    'values':    rng.normal(1, .02, size=n).cumprod(),
    'group':     rng.choice([*'abcdefghijklmnop'], size=n),
    'entity':    rng.choice([*ascii_lowercase], size=(n, 10)).view('<U10').ravel(),
})

new_df = df.astype({
    'order': 'int32',
    'timestamp': 'datetime64[ns]',
    'group':  'category',
    'entity': 'string',
})

auto_df = new_df.convert_dtypes(dtype_backend='pyarrow')

print(
    concat({'orig': metainfo(df), 'astype': metainfo(new_df), 'pyarrow': metainfo(auto_df)}, axis=1)
    .sort_index(axis=1, level=1, sort_remaining=False)
)


from pandas import read_csv, concat, DataFrame
from utils import timed

def metainfo(df):
    return DataFrame({
        'memory': df.memory_usage(deep=True, index=False).map('{:,}'.format),
        'dtypes': df.dtypes,
    })


with timed('standard'):
    standard = read_csv('data/data.csv')

with timed('pyarrow engine'):
    pyarr_engine = read_csv('data/data.csv', engine='pyarrow')

with timed('pyarrow engine & pyarrow dtypes'):
    pyarr_engine_backend = read_csv('data/data.csv', engine='pyarrow', dtype_backend='pyarrow')

print(
    concat({
        'orig': metainfo(standard),
        'pyarrow engine': metainfo(pyarr_engine),
        'pyarrow eng & backend': metainfo(pyarr_engine_backend)
        },
       axis=1
    )
    .sort_index(axis=1, level=1, sort_remaining=False)
)

Smaller Approahces for Complex Problems

Pairwise Comparisons

from pathlib import Path
from numpy.random import default_rng
from string import ascii_lowercase

from pandas import DataFrame

rng = default_rng(0)
df = DataFrame(
    data={
        'entity': rng.choice([*ascii_lowercase], size=(n := 2_000, 5)).view('<U5').ravel(),
        'values': rng.normal(100, 5, size=n),
    },
)
data_dir = Path('data')
df.to_parquet(data_dir / 'pairwise.parquet', index=False)

print(df)

Cross Join & Calculate

from pandas import read_parquet
df = read_parquet('data/pairwise.parquet')

print(
    df.merge(df, how='cross')
    .assign(
        delta=lambda d: (d['values_y'] - d['values_x']).abs()
    )
    .pivot(
        index='entity_x', columns='entity_y', values='delta'
    )
)

Be familiar with your NumPy ufuncs!

NumPy sits a lower abstraction level than pandas does, so it keeps you a touch “closer to the metal”. If you need a bit more management over memory, reaching into NumPy could be an approach you want to take.

from pandas import DataFrame, read_parquet
from numpy import subtract

s = (
    read_parquet('data/pairwise.parquet', engine='pyarrow')
    .set_index('entity')['values']
)


print(
    DataFrame(
        subtract.outer(s.to_numpy(), s.to_numpy()),
        columns=s.index,
        index=s.index,
    ).abs()
)

Only operate on the lower triangle

from pandas import MultiIndex, read_parquet

s = (
    read_parquet('data/pairwise.parquet', dtype_backend='pyarrow')
    .set_index('entity')['values']
)

idx = MultiIndex.from_product([s.index.sort_values()] * 2, names=['entity_x', 'entity_y'])
idx = idx[idx.codes[0] < idx.codes[1]]
res = (s.reindex(idx, level=0) - s.reindex(idx, level=1))

print(res.xs('zzhps', level=0))
print(res.xs('zzhps', level=1))
Select Pairwise Analysis

Occassionally, the problems we seek to solve are simply too large. Instead, we can answer the most interesting narrow vertical slices of that problem.

from pandas import DataFrame, MultiIndex, read_parquet, Series

s = (
    read_parquet('data/pairwise.parquet', engine='pyarrow')
    .set_index('entity')['values']
)

top_s = s.nlargest(5).index
idx = MultiIndex.from_product([top_s, s.index.sort_values()], names=['entity_x', 'entity_y'])

res = (s.reindex(idx, level=0) - s.reindex(idx, level=1))
print(res)
Complex Maths with NumExpr

Does NumExpr help with peak memory?

from pandas import read_csv
from pathlib import Path
import numexpr
from memray import Tracker, FileDestination
from utils import timed

df = read_csv('data/data.csv', usecols=['values1', 'values2', 'values3'])

# with timed('assign'):
with Tracker(destination=FileDestination('assign', overwrite=True)):
    res = df.assign(
        x=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
        y=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
        z=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
        a=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
        b=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
        c=lambda d: d['values1'] ** 4 + d['values2'] * 5 - d['values3'],
    )

# with timed('eval'):
with Tracker(destination=FileDestination('eval', overwrite=True)):
    res = df.eval('''
        x=values1 ** 4 + values2 5 - values3
        y=values1 ** 4 + values2 5 - values3
        z=values1 ** 4 + values2 5 - values3
        a=values1 ** 4 + values2 5 - values3
        b=values1 ** 4 + values2 5 - values3
        c=values1 ** 4 + values2 5 - values3
    ''')

print('done')
DataFrame Fragmentation & Copies

You don’t need to memorize pandas internals, but if there is one thing you need to be aware of when saving memory in pandas it would be the block manager.

from pandas import DataFrame, date_range
from string import ascii_lowercase
from numpy import repeat

# mean(horizontal_sum([value + (value * i for i in range(10))]))
df = DataFrame({
    'value': repeat([1, 2, 3, 4, 5], (n := 1_000_000)),
    'date': date_range('2000-01-01', periods=n * 5, freq='min')
})

# ① fragmented dataframe, consolidation can bite you later
df = df.assign( # small memory up front, but peak can be very bad upon consolidation
    **{col: lambda df, i=i: df['value'].to_numpy() * i for i, col in enumerate(ascii_lowercase[:10])}
)
print(df.sum(numeric_only=True, axis=1))

# ② force consolidation to occur earlier
DataFrame({ # large memory, but we fail early/fast
    'value': df['value'],
    **{col: lambda df, i=i: df['value'] * i for i, col in enumerate(ascii_lowercase[:10])}
})

# ③ attempt to subvert consolidation by passing a 2d array; or skipping pandas
# still allocating 1,000,000×10 (peak)
from numpy import full, empty
out = empty((len(df), 10), dtype='int64')
for i, _ in enumerate(ascii_lowercase[:10]):
    out[:, i] = df['value'] * i
print(df['value'].to_numpy() + out.sum(axis=1))

# ④ Why stress over 10 iterations at the Python level?
print( # very small memory footprint; only allocates 1,000,000×1 (peak)
    sum((df['value'] * i for i, col in enumerate(ascii_lowercase[:10])), start=df['value'])
)

Summary

When you want to reduce the memory footprint of your pandas DataFrame the simplest things you can do:

Without changing too much code

Changing a small amount of code

Changing a large amount of code