ts-python

Everything about .groupby

Date: Friday, Sep 6, 2024 at 09:30 US/Eastern

Topics: pandas

Confidently groupby .aggregate, .apply, and .transform your data!

Grouped functions are one of the most common operations performed on tabular data. Due to their analytical usefulness, the pandas .groupby operations need to be both performant and flexible—two coding concepts that are often at odds with one another. So how does the most popular Python DataFrame library address this problem? Join us to find out the answer.

In this upcoming seminar, we will discuss the concepts behind pandas .groupby operations—so that you will be able to confidently choose the best method for the problems you work on. Not only will this understanding help make your code more declarative & readable, but you will also develop intuition for fast and slow .groupby operations. This session is a great opportunity to further your understanding and use of pandas to write more maintainable and performant code than before!

pip install pandas numpy scipy matplotlib

Notes

Sample Data & Helpers

print("Let's take a look!")
python -m pip install pandas numpy scipy numba

Helper Code: Timer

Here is some helper code for a very simple (low-fidelity) timer. We’ll use it to perform some simple performance analyses.

from contextlib import contextmanager
from pathlib import Path
from time import perf_counter
from sys import stderr

@contextmanager
def timed(msg):
    before = perf_counter()
    try:
        yield
    finally:
        after = perf_counter()
        print(f'{msg:<36} \N{mathematical bold capital delta}t: {after - before:>9.4f}s', file=stderr)

__all__ = 'timed',

if __name__ == '__main__':
    from logging import getLogger, basicConfig, INFO

    logger = getLogger(__name__); basicConfig(level=INFO)

    lib_dir = Path('_lib')
    lib_dir.mkdir(exist_ok=True, parents=True)
    with open(filename := (lib_dir / 'timer.py'), mode='w') as f:
        print(Path(__file__).read_text(), file=f)
    logger.info('Wrote to %s', filename)
from time import sleep
from _lib import timed

if __name__ == '__main__':
    with timed('test'):
        sleep(1)

Helper Code: MultiIndex

Here is some helper code for working with MultiIndex. We’ll use it to improve the “fluency” of our code.

from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path

from pandas import Index, MultiIndex
from pandas.api.extensions import register_index_accessor

@register_index_accessor('_ext')
@dataclass
class _ext:
    obj : Index

    def addlevel(self, **levels):
        # Append new level(s) to the (Multi)Index; a callable value is
        # invoked with the original Index to compute that level's values.
        levels = {
            k: v if not isinstance(v, Callable) else v(self.obj)
            for k, v in levels.items()
        }
        new_obj = self.obj.copy(deep=False)
        if not isinstance(new_obj, MultiIndex):
            new_obj = MultiIndex.from_arrays([
                new_obj
            ])
        names = new_obj.names
        new_obj.names = [None] * len(names)
        return MultiIndex.from_arrays([
            *(
                new_obj.get_level_values(idx)
                for idx in range(len(names))
            ),
            *levels.values(),
        ], names=[*names, *levels.keys()])

    def updatelevel(self, **levels):
        # Replace the values of existing named level(s); a callable value is
        # invoked with the original Index.
        levels = {
            k: v if not isinstance(v, Callable) else v(self.obj)
            for k, v in levels.items()
        }
        new_obj = self.obj.copy(deep=False)
        if not isinstance(new_obj, MultiIndex):
            new_obj = MultiIndex.from_arrays([
                new_obj
            ])
        names = new_obj.names
        new_obj.names = [None] * len(names)
        return MultiIndex.from_arrays([
            levels[n]
            if n in levels else
            new_obj.get_level_values(idx)
            for idx, n in enumerate(names)
        ], names=names)

__all__ = ()

if __name__ == '__main__':
    from logging import getLogger, basicConfig, INFO

    logger = getLogger(__name__); basicConfig(level=INFO)

    lib_dir = Path('_lib')
    lib_dir.mkdir(exist_ok=True, parents=True)
    with open(filename := (lib_dir / 'pandas.py'), mode='w') as f:
        print(Path(__file__).read_text(), file=f)
    logger.info('Wrote %s', filename)
from pandas import Series, period_range
from numpy.random import default_rng
import _lib  # imported for its side effect: registering the ._ext Index accessor

if __name__ == '__main__':
    rng = default_rng(0)

    s = Series(
        index=(idx := period_range('2020Q1', periods=4)),
        data=rng.integers(-10, +10, size=len(idx)),
    ).rename_axis('date')

    print(
        s.head(2),

        s.pipe(lambda s: s
            .set_axis(s.index._ext.addlevel(num=range(len(s))))
        ).head(2),

        sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40)
    )

Pricing Data

Let’s start with some simple pricing data.

from logging import getLogger, basicConfig, INFO
from pathlib import Path
from pickle import dump
from string import ascii_lowercase

from pandas import CategoricalIndex, date_range, DataFrame, MultiIndex, IndexSlice
from numpy.random import default_rng
from numpy import unique

logger = getLogger(__name__); basicConfig(level=INFO)

rng = default_rng(0)

dates = date_range('2010', '2020', freq='min', name='date')
dates = dates[dates.indexer_between_time('09:30', '16:00')]

currencies = {'USD', 'EUR'}
tickers = unique(
    rng.choice([*ascii_lowercase], size=(20, 4)).view('<U4').ravel()
)
assets = CategoricalIndex([*tickers, *sorted(currencies)], name='asset')
currencies = CategoricalIndex([*sorted(currencies)], dtype=assets.dtype, name='currency')
tickers = CategoricalIndex([*sorted(tickers)], dtype=assets.dtype, name='ticker')

prices = (
    DataFrame(
        index=(idx := MultiIndex.from_product([
            dates,
            tickers,
        ])),
        data={
            'buy': (
                rng.normal(loc=100, scale=20, size=len(tickers)).clip(10, 200)
              * rng.normal(loc=1, scale=.0001, size=(len(dates), len(tickers))).cumprod(axis=0)
            ).ravel()
        },
    )
    .assign(
        sell=lambda df: df['buy'] * (1 - abs(rng.normal(loc=0, scale=.05, size=len(df))))
    )
    .round(4)
    .rename_axis(columns='direction')
)

print(
    prices.head(3),
    # prices.loc[IndexSlice[:, 'arnq', :]],
    sep='\n',
)

data_dir = Path('data')
data_dir.mkdir(exist_ok=True, parents=True)

prices.to_pickle(filename := (data_dir / 'prices.pkl'))
logger.info('Wrote %s', filename)

with open(filename := (data_dir / 'dtypes.pkl'), mode='wb') as f:
    dump({'currencies': currencies, 'tickers': tickers, 'assets': assets, 'dates': dates}, f)
logger.info('Wrote %s', filename)

We’ll store the pricing data in a Pickle file (which is fine, because we are not distributing this data—we’re merely moving it from our “left hand” to our “right hand”).

Trading Data

Let’s construct some simple trading data.

The ._ext helper above dramatically simplifies working with MultiIndex. Let’s complete our trade data by adding noise to the time measurements and adding a trade identifier.

from logging import getLogger, basicConfig, INFO
from pathlib import Path
from pickle import load

from numpy import arange
from numpy.random import default_rng
from pandas import read_pickle, Series, MultiIndex, to_timedelta

import _lib  # imported for its side effect: registering the ._ext Index accessor

logger = getLogger(__name__); basicConfig(level=INFO)

rng = default_rng(0)

data_dir = Path('data')
prices = read_pickle(data_dir / 'prices.pkl')
with open(data_dir / 'dtypes.pkl', mode='rb') as f:
    dtypes = load(f)
currencies, tickers, assets, dates = dtypes['currencies'], dtypes['tickers'], dtypes['assets'], dtypes['dates']

idx = MultiIndex.from_product([
    dates,
    tickers,
    range(10),
], names=[dates.name, 'asset', 'trade'])
idx = idx[rng.choice(arange(len(idx)), size=int(len(idx) * .005), replace=True)]
idx = idx.droplevel('trade')

trades = (
    Series(
        index=idx,
        data=(
            rng.choice([-1, +1], size=len(idx))
          * abs(rng.normal(loc=10_000, scale=20_000, size=len(idx)).round(-2).astype(int))
        ),
        name='volume'
    )
    .round(-2)
    .pipe(lambda s: s.set_axis(s.index
        ._ext.updatelevel(date=lambda idx:
            idx.get_level_values('date')
          + to_timedelta(abs(rng.normal(loc=0, scale=10, size=len(idx)).round()), unit='s')
        )
    ))
    .sort_index()
    .loc[
        prices.index.get_level_values('date').min()
       :prices.index.get_level_values('date').max()
    ]
    .pipe(lambda s: s.set_axis(s.index
        ._ext.addlevel(trade=range(len(s)))
    ))
)

print(
    trades,
    trades.groupby('asset', observed=True).count(),
    sep='\n',
)

data_dir = Path('data')
data_dir.mkdir(exist_ok=True, parents=True)

trades.to_pickle(filename := (data_dir / 'trades.pkl'))
logger.info('Wrote %s', filename)

Here is a helper module for loading all the data.

from pathlib import Path
from pickle import load

from pandas import read_pickle

def load_data(*, data_dir=Path('data')):
    prices = read_pickle(data_dir / 'prices.pkl')
    trades = read_pickle(data_dir / 'trades.pkl')
    with open(data_dir / 'dtypes.pkl', mode='rb') as f:
        dtypes = load(f)
    return {
        'prices': prices,
        'trades': trades,
        **dtypes,
    }

__all__ = 'load_data',

if __name__ == '__main__':
    from logging import getLogger, basicConfig, INFO

    logger = getLogger(__name__); basicConfig(level=INFO)

    lib_dir = Path('_lib')
    lib_dir.mkdir(exist_ok=True, parents=True)
    with open(filename := (lib_dir / 'data.py'), mode='w') as f:
        print(Path(__file__).read_text(), file=f)
    logger.info('Wrote to %s', filename)

.groupby Overview

print("Let's take a look!")

Let’s start with some basic .groupby operations.

from _lib import load_data
from numpy import sum as np_sum

globals().update(load_data())

print(
    # trades,

    # trades.groupby('asset', observed=True).count(),
    # trades.groupby('asset', observed=True).sum(),

    # trades.groupby('asset', observed=True).agg('count'),
    # trades.groupby('asset', observed=True).agg('sum'),
    # trades.groupby('asset', observed=True).agg(sum),
    # trades.groupby('asset', observed=True).agg(np_sum),
    # trades.groupby('asset', observed=True).agg(lambda g: sum(g)),

    trades.groupby('asset', observed=True).sum() / trades.groupby('asset', observed=True).count(),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Let’s look into the underlying performance of these operations.

from statistics import mean
from string import ascii_lowercase

from pandas import Series, MultiIndex, date_range
from numpy import unique, sum as np_sum, mean as np_mean
from numpy.random import default_rng

from _lib import timed

rng = default_rng(0)

entities = unique(
    rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()
)
s = Series(
    index=(idx := MultiIndex.from_product([
        date_range('2020-01-01', periods=365),
        entities,
    ], names='date entity'.split())),
    data=rng.normal(size=len(idx)),
)

print(
    s.head(),
    f'{len(s) = :,}',
    f'{len(entities) = :,}',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

if False:
    with timed('.sum()'):                 s.groupby('entity').sum()
    with timed(".agg('sum')"):            s.groupby('entity').agg('sum')
    with timed('.agg(sum)'):              s.groupby('entity').agg(sum)
    with timed('.agg(np_sum)'):           s.groupby('entity').agg(np_sum)
    with timed('.agg(lambda g: sum(g))'): s.groupby('entity').agg(lambda g: sum(g))

if True:
    with timed('.mean()'):                 s.groupby('entity').mean()
    with timed(".agg('mean')"):            s.groupby('entity').agg('mean')
    with timed('.agg(mean)'):              s.groupby('entity').agg(mean)
    with timed('.agg(np_mean)'):           s.groupby('entity').agg(np_mean)
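
Takeaway: string aggregations like 'sum' and 'mean' dispatch to fast compiled code paths, and pandas recognizes certain builtins and NumPy functions (e.g., sum, numpy.sum) and routes them to those same named paths; an arbitrary lambda, by contrast, is called once per group in Python.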

.groupby.filter

The .groupby.filter modality allows us to select elements on a group-by-group basis: a predicate is evaluated once per group, and entire groups are kept or dropped accordingly.
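
As a toy illustration (hypothetical values), note that .filter keeps or drops whole groups rather than individual elements:

from pandas import Series

# hypothetical toy data: two groups, 'a' and 'b'
s = Series([1, 2, 3, -4], index=['a', 'a', 'b', 'b'])

# keep only groups whose sum is positive: 'a' (sum 3) survives, 'b' (sum -1) is dropped
print(s.groupby(level=0).filter(lambda g: g.sum() > 0))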

Let’s find all the tickers we bought.

from _lib import load_data

globals().update(load_data())

print(
    trades.loc[trades > 0],
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Let’s find all the tickers where our net position life-to-date was long.

from _lib import load_data

globals().update(load_data())

print(
    # trades.groupby('asset', observed=True).filter(lambda g: g.sum() > 0),

    trades
        .groupby('asset', observed=True).filter(lambda g: g.sum() > 0)
        .index.get_level_values('asset').unique()
        .values
    ,

    trades
        .groupby('asset', observed=True).filter(lambda g: g.sum() > 0)
        .pipe(lambda s: trades
            .index.get_level_values('asset').unique().difference(
                s.index.get_level_values('asset').unique()
            )
            .values
        )
    ,

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Let’s find all the tickers where we ever held a long position.

...

We’ll visit this question momentarily.
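
In the meantime, here is a minimal sketch of one possible approach: track the running (life-to-date) position with a cumulative sum, and keep the groups where it was ever positive.

from _lib import load_data

globals().update(load_data())

# one possible approach: keep an asset if its running position was ever long
print(
    trades
        .groupby('asset', observed=True).filter(lambda g: (g.cumsum() > 0).any())
        .index.get_level_values('asset').unique()
        .values
)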

.groupby.agg

The .groupby.agg modality is about summarizing rows based on non-overlapping, group-level calculations. This is equivalent to the SQL GROUP BY clause.
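
For instance, the grouped sum below (a minimal sketch against our trades data) corresponds to SELECT asset, SUM(volume) FROM trades GROUP BY asset:

from _lib import load_data

globals().update(load_data())

# pandas equivalent of: SELECT asset, SUM(volume) FROM trades GROUP BY asset
print(trades.groupby('asset', observed=True).sum())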

from _lib import load_data

globals().update(load_data())

print(
    # trades.head(),

    # trades.groupby('asset', observed=True).sum(),

    # prices.head(),

    # prices.groupby('ticker', observed=True).mean(),

    # trades.groupby('asset', observed=True).agg(lambda g: g.max() - g.min()),
    # trades.groupby('asset', observed=True).agg(lambda g: g.idxmax() - g.idxmin()),
    # trades.groupby('asset', observed=True).agg(
    #     lambda g: g.droplevel(['asset', 'trade']).idxmax() - g.droplevel(['asset', 'trade']).idxmin()
    # ),

    # trades.groupby('asset', observed=True).pipe(lambda gb: gb.max() - gb.min()),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Pro-tip: iterate over .groupby to see the groups!

from pandas import Series, date_range

s = Series(
    index=date_range('2020-01-01', periods=365),
    data=0,
)

for k, g in s.groupby(s.index.to_period('Q')):
    print(f'{k = }')
    print(f'{g = }')
    break

Let’s consider performance of various .agg-related syntaxes on high-cardinality data.

from itertools import pairwise
from string import ascii_lowercase

from pandas import Series, MultiIndex, date_range
from numpy import unique
from numpy.random import default_rng

from _lib import timed

rng = default_rng(0)

entities = unique(
    rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()
)
s = Series(
    index=(idx := MultiIndex.from_product([
        date_range('2020-01-01', periods=365),
        entities,
    ], names='date entity'.split())),
    data=rng.normal(size=len(idx)),
)

print(
    s.head(),
    f'{len(s) = :,}',
    f'{len(entities) = :,}',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

if False:
    results = []
    with timed('.agg(lambda g: …)'):
        results.append(
            s.groupby('entity').agg(lambda g: g.max() - g.min())
        )
    with timed('.pipe(lambda gb: …)'):
        results.append(
            s.groupby('entity').pipe(lambda gb: gb.max() - gb.min())
        )
    with timed(".agg([…]).pipe(…)"):
        results.append(
            s.groupby('entity').agg(['min', 'max']).pipe(lambda df: df['max'] - df['min'])
        )
    assert all((x == y).all() for x, y in pairwise(results))

if True:
    results = []
    with timed('.agg(lambda g: …)'):
        results.append(
            s.groupby('entity').agg(lambda g:
                g.droplevel('entity').idxmax() - g.droplevel('entity').idxmin()
            )
        )
    with timed(".pipe(lambda gb: …)"):
        results.append(
            s.groupby('entity').pipe(lambda gb:
                gb.agg(lambda g: g.droplevel('entity').idxmax())
              - gb.agg(lambda g: g.droplevel('entity').idxmin())
            )
        )
    with timed(".pipe(lambda gb: …)"):
        results.append(
            s.pipe(lambda s: s
                .reset_index('entity', drop=True)
                .groupby(s.index.get_level_values('entity'))
                .pipe(lambda gb: gb.idxmax() - gb.idxmin())
            )
        )
    assert all((x == y).all() for x, y in pairwise(results))
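
The pattern to notice: formulations built from named grouped reductions (e.g., gb.max() - gb.min(), or gb.idxmax() - gb.idxmin()) stay on fast compiled paths, while anything containing .agg(lambda g: …) pays a Python function call (and a Series construction) per group.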

Let’s find the average traded price per asset (volume-weighted).

from pandas import merge_asof
from numpy import where

from _lib import load_data

globals().update(load_data())

# merged = (
#     merge_asof(
#         trades,
#         prices,
#         # left_on='date',
#         # right_on='date',
#         # left_on=('date', 'asset'),
#         # right_on=('date', 'ticker'),
#     )
#     .drop(['date'], axis='columns')
#     .set_axis(trades.index)
#     .assign(
#         price=lambda df: where(df['volume'] > 0, df['buy'], df['sell'])
#     )
# )

merged = (
    trades
    .pipe(lambda s: s
        .set_axis(s.index._ext.updatelevel(date=s.index.get_level_values('date').floor('30min')))
    )
    .to_frame()
    .join(prices)
    .assign(
        price=lambda df: where(df['volume'] > 0, df['buy'], df['sell'])
    )
)

print(
    prices.head(3),
    trades.head(3),
    # merged.head(3),

    # merged
    # .groupby('asset', observed=True).agg(lambda df:
    #     (df['volume'] * df['price']).sum() / df['volume'].sum()
    # )
    # ,

    merged
    .pipe(lambda df:
        (df['volume'] * df['price']).groupby('asset', observed=True).sum()
      / (df['volume']).groupby('asset', observed=True).sum()
    )
    ,

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

pandas.Grouper

The pandas.Grouper is an occasionally useful helper for performing grouping operations on time-series-indexed data.

Let’s look at some of the things we can .groupby(…) on!

from pandas import Series, date_range, Grouper
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx := date_range('2020-01-01', periods=90, name='date')),
    data=rng.normal(size=len(idx)),
)

print(
    s.groupby('date').mean().head(),

    # s.groupby(lambda idx: idx - idx.to_period('M').start_time).mean().head(),
    # s.groupby(s.index.get_level_values('date') - s.index.get_level_values('date').to_period('M')).mean().head(),

    # s.groupby(rng.choice([True, False], size=len(s))).mean(),

    # s.groupby(Grouper(level='date', freq='M')).mean(),

    # s.pipe(lambda s: s.groupby(s.index.to_period('M'))).mean(),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

pandas.NamedAgg

The pandas.NamedAgg is an occasionally useful helper for performing grouping operations on a DataFrame when we want to control the names of the result columns.

from pandas import DataFrame, date_range, NamedAgg, MultiIndex
from numpy.random import default_rng

rng = default_rng(0)

df = DataFrame(
    index=(idx := date_range('2020-01-01', periods=90, name='date')),
    data={
        'a': rng.normal(size=len(idx)),
        'b': rng.normal(size=len(idx)),
    },
)

print(
    # df.groupby('date').agg('min').head(),

    # df.groupby('date').agg(['min', 'max']).head(),

    # df.groupby('date').agg({'a': 'min', 'b': 'max'}).head(),
    # df.groupby('date').agg({'a': 'min', 'b': ['min', 'max']}).head(),

    # df.groupby('date').agg(
    #     a_min=NamedAgg('a', 'min'),
    #     b_max=NamedAgg('b', 'max'),
    # )
    # .head(),

    # df.groupby('date').agg({'a': 'min', 'b': 'max'})
    # .set_axis(MultiIndex.from_tuples([('a', 'min'), ('b', 'max')]), axis='columns')
    # .head(),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

.groupby.transform

The .groupby.transform modality is about transforming the data based on non-overlapping, group-level calculations whose results are broadcast back to the original shape. This is analogous to an SQL aggregate window function (an aggregate OVER a PARTITION BY).
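
As a minimal sketch, a grouped transform of our trades data corresponds to an aggregate computed over a partition:

from _lib import load_data

globals().update(load_data())

# pandas analogue of: SELECT SUM(volume) OVER (PARTITION BY asset) FROM trades
print(trades.groupby('asset', observed=True).transform('sum').head())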

from pandas import NamedAgg
from numpy import sign, where

from _lib import load_data

globals().update(load_data())

print(
    # trades.groupby('asset', observed=True).sum().head(),
    # trades.groupby('asset', observed=True).cumsum().head(),

    # trades.groupby('asset', observed=True).cummax(),

    # trades.groupby('asset', observed=True).expanding().max()
    # .droplevel(0).sort_index()
    # ,

    # prices.groupby('ticker', observed=True).idxmax(),
    # prices.groupby('ticker', observed=True).cummax(),
    # prices.groupby('ticker', observed=True).cumidxmax(),

    trades
    .pipe(lambda s:
        s.set_axis(
            s.index._ext.addlevel(
                direction=where(
                    d := s.groupby('asset', observed=True).cummax().pipe(sign) > 0,
                    'long',
                    'short',
                ),
                position=d.groupby('asset', observed=True).transform(lambda g:
                    (g != g.shift()).cumsum()
                ),
            )
        )
    )
    # .groupby(['asset', 'position', 'direction'], observed=True).agg([
    #     'count',
    #     NamedAgg(
    #         'trades',
    #         lambda g: {*g.index.get_level_values('trade').unique()},
    #     ),
    # ])
    # .groupby('asset', observed=True).agg(lambda g:
    #     g.index.get_level_values('position').nunique()
    # )
    # .head()
    ,

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

There is an interesting relationship between .groupby and .unstack: for operations over a single index level, grouping by that level is equivalent to unstacking it into columns and applying the operation along the remaining axis.

from string import ascii_lowercase

from pandas import MultiIndex, date_range, Series
from numpy.random import default_rng

rng = default_rng(0)

s = (
    Series(
        index=(idx := MultiIndex.from_product([
            date_range('2020-01-01', periods=14, name='date'),
            rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel(),
        ], names=['date', 'entity'])),
        data=rng.normal(size=len(idx)),
    )
    .round(2)
    .sort_index()
)

print(
    # s,
    # s.groupby('entity').cumsum().head(),
    # s.groupby('entity').transform('cumsum').head(),
    # s.unstack('entity').cumsum().stack('entity').head(),

    s.groupby('entity').sum().head(),
    s.groupby('entity').agg('sum').head(),
    s.unstack('entity').sum().head(),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

.groupby.apply

The .groupby.apply modality is about arbitrary manipulations of the data based on non-overlapping, group-level calculations. There is no direct equivalent in SQL.

It is the only modality that operates on the DataFrame level.
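
Here is a minimal sketch (toy data) of what each modality receives per group: .agg sees one column (a Series) at a time, while .apply sees each group’s entire sub-DataFrame.

from pandas import DataFrame

# hypothetical toy frame: one grouping column, two value columns
df = DataFrame({'g': ['a', 'a', 'b'], 'x': [1, 2, 3], 'y': [4, 5, 6]})

print(df.groupby('g').agg(lambda s: type(s).__name__))    # a Series per column, per group
print(df.groupby('g').apply(lambda g: type(g).__name__))  # the whole sub-DataFrame per group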

Let’s find the average traded price per asset (volume-weighted).

from numpy import where
from pandas import Series, DataFrame
from _lib import load_data

globals().update(load_data())

traded_prices = (
    Series(
        index=trades.index,
        data=prices.loc[
            trades.index._ext.updatelevel(date=lambda idx: idx.get_level_values('date').floor('30min'))
        ].pipe(lambda df: where(trades > 0, df['buy'], df['sell']))
    )
)

print(
    # trades.head(),
    # prices.head(),
    # traded_prices.head(),

    # DataFrame({'volume': trades, 'price':  traded_prices})
    # .pipe(lambda df:
    #     (df['volume'] * df['price']).groupby('asset', observed=True).sum()
    #   / (df['volume']).groupby('asset', observed=True).sum()
    # )
    # .head()
    # ,

    (
        (trades * traded_prices).groupby('asset', observed=True).sum()
      / trades.groupby('asset', observed=True).sum()
    )
    .head()
    ,

    DataFrame({'volume': trades, 'price':  traded_prices})
    .groupby('asset', observed=True).apply(lambda df:
        (df['volume'] * df['price']).sum() / df['volume'].sum()
    )
    .head()
    ,

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

How slow is .groupby.apply really?

from string import ascii_lowercase

from pandas import DataFrame, MultiIndex, date_range, Series
from numpy import unique
from numpy.random import default_rng

from _lib import timed

rng = default_rng(0)

entities = unique(
    rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()
)

df = (
    DataFrame(
        index=(idx := MultiIndex.from_product([
            date_range('2020-01-01', periods=365),
            entities,
        ], names='date entity'.split())),
        data={
            'value':  rng.integers(-10, +10, size=len(idx)),
            'weight': abs(rng.normal(size=len(idx))),
        }
    )
    .swaplevel()
    .sort_index()
)

if True:
    with timed('.apply'):
        df.groupby('entity').apply(lambda g:
            (g['value'] * g['weight']).sum() / g['weight'].sum()
        )
    with timed('.agg'):
        df.pipe(lambda df:
            (df['value'] * df['weight']).groupby('entity').sum()
          / df['weight'].groupby('entity').sum()
        )

if True:
    from _lib.cgroupcalcs import grouped_weighted_mean
    with timed('cython'):
        Series(*grouped_weighted_mean(
            grouping=df.index.get_level_values('entity'),
            values=df['value'],
            weights=df['weight'],
        ))
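
Here is the Cython source behind _lib.cgroupcalcs; note that it assumes the rows are already sorted by the grouping key.
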
# cython: language_level=3, boundscheck=False, initializedcheck=False, cdivision=True

from numpy import empty, nonzero, hstack

def grouped_weighted_mean(grouping, values, weights):
    grouping = grouping.to_numpy()
    values = values.to_numpy()
    weights = weights.to_numpy()

    # assumes rows are sorted by `grouping`: group boundaries are the
    # positions where adjacent group labels differ
    boundaries = nonzero(grouping[1:] != grouping[:-1])[0] + 1
    unique = hstack([grouping[0], grouping[boundaries]])

    result = empty(len(boundaries) + 1)

    gwmean_calc(hstack([boundaries, [len(values)]]), values, weights, result)

    return result, unique

cdef gwmean_calc(long[:] boundaries, long[:] values, double[:] weights, double[:] result):
    cdef int bnd_idx
    cdef int prev_idx
    cdef int curr_idx
    cdef int idx
    cdef double numer
    cdef double denom
    cdef int num_boundaries = len(boundaries)
    cdef int num_values = len(values)

    prev_idx = 0
    for bnd_idx in range(num_boundaries):
        curr_idx = boundaries[bnd_idx]
        numer, denom = 0, 0
        for idx in range(prev_idx, curr_idx):
            numer += values[idx] * weights[idx]
            denom += weights[idx]
        result[bnd_idx] = numer / denom
        prev_idx = curr_idx

def grouped_weighted_var(grouping, values, weights):
    grouping = grouping.to_numpy()
    values = values.to_numpy()
    weights = weights.to_numpy()

    boundaries = nonzero(grouping[1:] != grouping[:-1])[0] + 1
    unique = hstack([grouping[0], grouping[boundaries]])

    result = empty(len(boundaries) + 1)

    gwvar_calc(hstack([boundaries, [len(values)]]), values, weights, result)

    return result, unique

cdef gwvar_calc(long[:] boundaries, long[:] values, double[:] weights, double[:] result):
    cdef int bnd_idx
    cdef int prev_idx
    cdef int curr_idx
    cdef int idx
    cdef double numer
    cdef double denom
    cdef double mean
    cdef int num_boundaries = len(boundaries)
    cdef int num_values = len(values)

    prev_idx = 0
    for bnd_idx in range(num_boundaries):
        curr_idx = boundaries[bnd_idx]
        numer, denom = 0, 0
        for idx in range(prev_idx, curr_idx):
            numer += values[idx] * weights[idx]
            denom += weights[idx]
        mean = numer / denom
        numer, denom = 0, 0
        for idx in range(prev_idx, curr_idx):
            numer += weights[idx] * (values[idx] - mean)**2
            denom += weights[idx]
        result[bnd_idx] = numer / denom
        prev_idx = curr_idx

Let’s convert “deltas” to “states”: turn sparse, event-style observations into a dense per-date state for each entity by filling values forward.

from itertools import pairwise
from string import ascii_lowercase

from pandas import MultiIndex, date_range, Series, Categorical
from numpy import unique
from numpy.random import default_rng

from _lib import timed

rng = default_rng(0)

states = Categorical('A B C D'.split())

s = (
    Series(
        index=(idx := MultiIndex.from_product([
            date_range('2020-01-01', periods=90, name='date'),
            unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
        ], names='date entity'.split())),
        data=rng.choice(states, size=len(idx)),
    )
    .sample(frac=.05, random_state=rng)
    .sort_index()
)

all_dates = date_range(
    s.index.get_level_values('date').min(),
    s.index.get_level_values('date').max(),
    name='date'
)

print(
    # s,
    # s.groupby('entity').apply(lambda g:
    #     g.droplevel('entity').reindex(all_dates).ffill().bfill()
    # ),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

if True:
    results = []
    with timed('.groupby.apply'):
        results.append(
            s.groupby('entity').apply(lambda g:
                g.droplevel('entity').reindex(all_dates).ffill().bfill()
            )
        )

    with timed('.loc[:] = '):
        _ = Series(index=MultiIndex.from_product([
            all_dates,
            s.index.get_level_values('entity').unique(),
        ], names=s.index.names), dtype=states.dtype)
        _.loc[:] = s
        _ = (
            _
            .groupby('entity').ffill().groupby('entity').bfill()
            .swaplevel()
            .sort_index()
        )
        results.append(_)

    assert all((x == y).all() for x, y in pairwise(results))

.rolling

.rolling operations are about performing overlapping grouped operations. This is similar to an SQL window function with an explicit frame (e.g., ROWS BETWEEN 2 PRECEDING AND CURRENT ROW).

What is the three day moving average price per asset?

from _lib import load_data

globals().update(load_data())

print(
    prices,

    # prices.rolling(3, min_periods=1).mean(),
    # prices.groupby('ticker', observed=True).rolling('3d', min_periods=1).mean(),

    prices.groupby('ticker', observed=True).transform(
        lambda g: g.droplevel('ticker').rolling('3d', min_periods=1).mean()
    ),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

How slow is .groupby(…).transform(lambda g: g.rolling(…)) really?

from itertools import pairwise
from string import ascii_lowercase

from pandas import MultiIndex, Series, date_range
from numpy import unique
from numpy.random import default_rng

from _lib import timed

rng = default_rng(0)

s = Series(
    index=(idx := MultiIndex.from_product([
        date_range('2020-01-01', periods=30, name='date'),
        unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
    ], names='date entity'.split())),
    data=rng.normal(size=len(idx)).round(2),
)

print(
    s.head(),
    f'{len(s) = :,}',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)


if True:
    results = []

    with timed('.groupby(…).transform(.rolling(…))'):
        results.append(
            s.groupby('entity', observed=True).transform(
                lambda g: g.droplevel('entity').rolling('3d', min_periods=1).mean().values
            )
        )

    with timed('.unstack(…).rolling(…).stack(…)'):
        results.append(
            s.unstack('entity').rolling('3d', min_periods=1).mean().stack('entity')
        )

    assert all((x == y).all() for x, y in pairwise(results))

.rolling in NumPy with numpy.lib.stride_tricks.sliding_window_view

Let’s compare against NumPy…

from itertools import pairwise
from string import ascii_lowercase

from pandas import Series, MultiIndex, date_range, DataFrame
from numpy import unique, hstack, allclose
from numpy.random import default_rng
from numpy.lib.stride_tricks import sliding_window_view

from _lib import timed

rng = default_rng(0)

s = Series(
    index=(idx := MultiIndex.from_product([
        date_range('2020-01-01', periods=30, name='date'),
        unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
    ], names='date entity'.split())),
    data=rng.normal(size=len(idx)).round(2),
)

if True:
    results = []

    with timed('.groupby(…).rolling(…)'):
        results.append((
            s.groupby('entity', observed=True).rolling(3).mean(),
            lambda x: x.droplevel(-1).dropna(),
        ))

    with timed('.unstack(…).rolling(…).stack(…)'):
        results.append((
            s.unstack('entity').rolling(3).mean().stack('entity'),
            lambda x: x.dropna().swaplevel().sort_index(),
        ))

    with timed('.groupby(…).xform(sliding_win_view)'):
        results.append((
            s.groupby('entity', observed=True).transform(lambda g:
                hstack([
                    [float('nan')]*(3-1), sliding_window_view(g, 3).mean(axis=-1)
                ])
            ),
            lambda x: x.dropna().swaplevel().sort_index()
        ))

    with timed('.unstack(…).xform(sliding_win_view)…'):
        results.append((
            s.unstack('entity').pipe(lambda df: DataFrame(
                    index=df.index[3-1:],
                    columns=df.columns,
                    data=sliding_window_view(df, 3, axis=0).mean(axis=-1),
                )
            ).stack('entity'),
            lambda x: x.swaplevel().sort_index(),
        ))

    assert all(allclose(fx(x), fy(y)) for (x, fx), (y, fy) in pairwise(results))

.rolling(closed=…, center=…)

.rolling allows us to control the centering of the window as well as the inclusion of the endpoints of the window.

from pandas import Series, date_range
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx := date_range('2020', periods=90)),
    data=rng.integers(-10, +10, size=len(idx)),
)

print(
    s.head(),
    s.rolling('3d').sum().head(),
    # s.rolling('3d', center=True).sum().head(),
    s.rolling('3d', closed='both').sum().head(),
    s.rolling('3d', closed='right').sum().head(), # XXX: default
    s.rolling('3d', closed='left').sum().head(),
    s.rolling('3d', closed='neither').sum().head(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

.rolling(…).apply(…)

We can provide a UDF to .rolling, but we’re relatively limited in what we can do: the function receives one window at a time and must reduce it to a single scalar value.

from pandas import Series, date_range
from numpy import fabs
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx := date_range('2020', periods=90)),
    data=rng.integers(-10, +10, size=len(idx)),
)

print(
    s.rolling('3d').apply(lambda g: abs(g - g.mean()).mean()),

    # s.rolling('3d').apply(lambda g: abs(g - g.mean()).mean(), raw=True),

    # s.rolling('3d').apply(lambda g: abs(g - g.mean()).mean(), engine='cython'),

    # s.rolling('3d').apply(lambda g: fabs(g - g.mean()).mean(), raw=True, engine='numba'),

    s.rolling('3d').apply(
        lambda g: fabs(g - g.mean()).mean(),
        raw=True,
        engine='numba',
        engine_kwargs={'nopython': True, 'nogil': True, 'parallel': True},
    ),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

.rolling(…).cov(…, pairwise=True)

We can do rolling pairwise operations like covariances and correlations.

from pandas import DataFrame, date_range
from numpy.random import default_rng

rng = default_rng(0)

df0 = DataFrame(
    index=(idx := date_range('2020', periods=90)),
    data={
        'A': rng.integers(-10, +10, size=len(idx)),
        'B': rng.integers(-10, +10, size=len(idx)),
        'C': rng.integers(-10, +10, size=len(idx)),
    },
)

df1 = DataFrame(
    index=(idx := date_range('2020', periods=90)),
    data={
        'A': rng.integers(-10, +10, size=len(idx)),
        'B': rng.integers(-10, +10, size=len(idx)),
        'C': rng.integers(-10, +10, size=len(idx)),
    },
)

print(
    df0.rolling('2d').cov(df1, pairwise=True),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

.rolling(win_type=…)

We can customize the window weighting with win_type=.

from pandas import Series, date_range
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx := date_range('2020', periods=90)),
    data=rng.integers(-10, +10, size=len(idx)),
)

print(
    s.rolling(3).mean(),

    s.rolling(3, win_type='triang').mean(),

    s.rolling(3, win_type='gaussian').mean(std=0.1),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

pandas.api.indexers.BaseIndexer

Occasionally, we may need our own custom windows. We can accomplish this with a subclass of pandas.api.indexers.BaseIndexer, whose get_window_bounds method returns the integer start and end bounds of the window at each position.

from itertools import pairwise, chain
from string import ascii_lowercase

from pandas import Series, date_range, MultiIndex
from pandas.api.indexers import BaseIndexer
from numpy import unique, nonzero, empty
from numpy.random import default_rng

from _lib import timed

class CustomIndexer(BaseIndexer):
    def __init__(self, grouping, timeseries, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.grouping = grouping
        self.timeseries = timeseries
    def get_window_bounds(self, num_values, min_periods, center, closed, step):
        starts, stops = empty(num_values, dtype=int), empty(num_values, dtype=int)
        for grplft, grprgt in pairwise(chain([0], nonzero(self.grouping[1:] != self.grouping[:-1])[0] + 1, [len(self.grouping)])):
            lft, rgt = (
                Series(index=self.timeseries[grplft:grprgt], data=range(grprgt-grplft))
                .pipe(lambda s: (
                    s.rolling('3d').min().values,
                    s.rolling('3d').max().values,
                ))
            )
            for idx, (lftidx, rgtidx) in enumerate(zip(lft, rgt)):
                starts[grplft + idx] = grplft + lftidx
                stops[grplft + idx] = grplft + rgtidx + 1
        return starts, stops


rng = default_rng(0)

s = (
    Series(
        index=(idx := MultiIndex.from_product([
            date_range('2020-01-01', periods=30, name='date'),
            unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
        ], names='date entity'.split())),
        data=rng.integers(-10, +10, size=len(idx)),
    )
    .swaplevel()
    .sort_index()
)

if True:
    results = []

    with timed('CustomIndexer'):
        results.append(
            s.rolling(
                CustomIndexer(
                    grouping=s.index.get_level_values('entity'),
                    timeseries=s.index.get_level_values('date'),
                )
            ).sum()
        )

    with timed('.groupby.transform(.rolling)'):
        results.append(
            s.groupby('entity').transform(lambda g: g
                .droplevel('entity').rolling('3d').sum().values
            )
        )

    assert all((x == y).all() for x, y in pairwise(results))

.expanding and .ewm

.expanding allows us to perform expanding window calculations.

.ewm allows us to perform exponentially weighted (expanding) calculations, where older observations receive geometrically decaying weights.
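
A minimal sketch (hypothetical values) of what halflife means: an observation t periods old receives weight (1/2)**(t/h), and with the default adjust=True, .ewm(…).mean() is exactly that weighted average (checked below at the final point).

from pandas import Series

s = Series([1.0, 2.0, 4.0])
h = 1  # halflife, in observations

# weight for the observation t steps before the end: (1/2) ** (t / h)
w = [(1 / 2) ** ((len(s) - 1 - i) / h) for i in range(len(s))]
manual = sum(wi * xi for wi, xi in zip(w, s)) / sum(w)

assert abs(s.ewm(halflife=h).mean().iloc[-1] - manual) < 1e-12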

from pandas import Series, date_range
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx := date_range('2020', periods=90)),
    data=rng.integers(-10, +10, size=len(idx)),
)

print(
    # s.expanding().sum(),
    # s.cumsum(),

    s.ewm(halflife='2 days', times=s.index).sum(),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

When was the last time we saw the highest price?

from _lib import load_data

globals().update(load_data())

print(
    # prices.head(),

    prices
        ['buy']
        .groupby('ticker', observed=True)
        .transform(lambda g: g
            .droplevel('ticker').expanding().apply(lambda g: g.idxmax())
        )
    ,

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)