.groupby
Date: Friday, Sep 6, 2024 at 09:30 US/Eastern
Topics: pandas
Confidently groupby .aggregate, .apply, and .transform your data!
Grouped functions are among the most common operations performed on
tabular data. Because of their analytical usefulness, the pandas .groupby
operations need to be both performant and flexible, two coding goals that are
often at odds with one another. So how does the most popular Python
DataFrame library address this problem? Join us to find out.
In this upcoming seminar, we will discuss the concepts behind pandas .groupby
operations so that you will be able to confidently choose the best method for
the problems you work on. Not only will this understanding help make your code
more declarative and readable, but you will also develop intuition for which
.groupby operations are fast and which are slow. This session is a great
opportunity to deepen your understanding of pandas and to write more
maintainable and performant code than before!
pip install pandas numpy scipy matplotlib
print("Let's take a look!")
python -m pip install pandas numpy scipy numba
Here is some helper code for a very simple (low-fidelity) timer. We’ll use it to perform some simple performance analyses.
from contextlib import contextmanager
from pathlib import Path
from time import perf_counter
from sys import stderr
@contextmanager
def timed(msg):
before = perf_counter()
try:
yield
finally:
after = perf_counter()
print(f'{msg:<36} \N{mathematical bold capital delta}t: {after - before:>9.4f}s', file=stderr)
__all__ = 'timed',
if __name__ == '__main__':
from logging import getLogger, basicConfig, INFO
logger = getLogger(__name__); basicConfig(level=INFO)
lib_dir = Path('_lib')
lib_dir.mkdir(exist_ok=True, parents=True)
with open(filename := (lib_dir / 'timer.py'), mode='w') as f:
print(Path(__file__).read_text(), file=f)
logger.info('Wrote to %s', filename)
from time import sleep
from _lib import timed
if __name__ == '__main__':
with timed('test'):
sleep(1)
Here is some helper code for working with MultiIndex. We’ll use it to
improve the “fluency” of our code.
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from pandas import Index, MultiIndex
from pandas.api.extensions import register_index_accessor
@register_index_accessor('_ext')
@dataclass
class _ext:
obj : Index
def addlevel(self, **levels):
levels = {
k: v if not isinstance(v, Callable) else v(self.obj)
for k, v in levels.items()
}
new_obj = self.obj.copy(deep=False)
if not isinstance(new_obj, MultiIndex):
new_obj = MultiIndex.from_arrays([
new_obj
])
names = new_obj.names
new_obj.names = [None] * len(names)
return MultiIndex.from_arrays([
*(
new_obj.get_level_values(idx)
for idx in range(len(names))
),
*levels.values(),
], names=[*names, *levels.keys()])
def updatelevel(self, **levels):
levels = {
k: v if not isinstance(v, Callable) else v(self.obj)
for k, v in levels.items()
}
new_obj = self.obj.copy(deep=False)
if not isinstance(new_obj, MultiIndex):
new_obj = MultiIndex.from_arrays([
new_obj
])
names = new_obj.names
new_obj.names = [None] * len(names)
return MultiIndex.from_arrays([
levels[n]
if n in levels else
new_obj.get_level_values(idx)
for idx, n in enumerate(names)
], names=names)
__all__ = ()
if __name__ == '__main__':
from logging import getLogger, basicConfig, INFO
logger = getLogger(__name__); basicConfig(level=INFO)
lib_dir = Path('_lib')
lib_dir.mkdir(exist_ok=True, parents=True)
with open(filename := (lib_dir / 'pandas.py'), mode='w') as f:
print(Path(__file__).read_text(), file=f)
logger.info('Wrote %s', filename)
from pandas import Series, period_range
from numpy.random import default_rng
import _lib
if __name__ == '__main__':
rng = default_rng(0)
s = Series(
index=(idx := period_range('2020Q1', periods=4)),
data=rng.integers(-10, +10, size=len(idx)),
).rename_axis('date')
print(
s.head(2),
s.pipe(lambda s: s
.set_axis(s.index._ext.addlevel(num=range(len(s))))
).head(2),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40)
)
Let’s start with some simple pricing data.
from logging import getLogger, basicConfig, INFO
from pathlib import Path
from pickle import dump
from string import ascii_lowercase
from pandas import CategoricalIndex, date_range, DataFrame, MultiIndex, IndexSlice
from numpy.random import default_rng
from numpy import unique
logger = getLogger(__name__); basicConfig(level=INFO)
rng = default_rng(0)
dates = date_range('2010', '2020', freq='min', name='date')
dates = dates[dates.indexer_between_time('09:30', '16:00')]
currencies = {'USD', 'EUR'}
tickers = unique(
rng.choice([*ascii_lowercase], size=(20, 4)).view('<U4').ravel()
)
assets = CategoricalIndex([*tickers, *sorted(currencies)], name='asset')
currencies = CategoricalIndex([*sorted(currencies)], dtype=assets.dtype, name='currency')
tickers = CategoricalIndex([*sorted(tickers)], dtype=assets.dtype, name='ticker')
prices = (
DataFrame(
index=(idx := MultiIndex.from_product([
dates,
tickers,
])),
data={
'buy': (
rng.normal(loc=100, scale=20, size=len(tickers)).clip(10, 200)
* rng.normal(loc=1, scale=.0001, size=(len(dates), len(tickers))).cumprod(axis=0)
).ravel()
},
)
.assign(
sell=lambda df: df['buy'] * (1 - abs(rng.normal(loc=0, scale=.05, size=len(df))))
)
.round(4)
.rename_axis(columns='direction')
)
print(
prices.head(3),
# prices.loc[IndexSlice[:, 'arnq', :]],
sep='\n',
)
data_dir = Path('data')
data_dir.mkdir(exist_ok=True, parents=True)
prices.to_pickle(filename := (data_dir / 'prices.pkl'))
logger.info('Wrote %s', filename)
with open(filename := (data_dir / 'dtypes.pkl'), mode='wb') as f:
dump({'currencies': currencies, 'tickers': tickers, 'assets': assets, 'dates': dates}, f)
logger.info('Wrote %s', filename)
We’ll store the pricing data to a Pickle file (which is fine, because we are not distributing this data—we’re moving it merely from our “left-hand” to our “right-hand.”)
Let’s construct some simple trading data.
The _ext helper above dramatically simplifies working with a MultiIndex. We’ll complete our
trade data by adding noise to the time measurements and attaching a trade identifier.
from logging import getLogger, basicConfig, INFO
from pathlib import Path
from pickle import load
from numpy import arange
from numpy.random import default_rng
from pandas import read_pickle, Series, MultiIndex, to_timedelta, IndexSlice
import _lib
logger = getLogger(__name__); basicConfig(level=INFO)
rng = default_rng(0)
data_dir = Path('data')
prices = read_pickle(data_dir / 'prices.pkl')
with open(data_dir / 'dtypes.pkl', mode='rb') as f:
dtypes = load(f)
currencies, tickers, assets, dates = dtypes['currencies'], dtypes['tickers'], dtypes['assets'], dtypes['dates']
idx = MultiIndex.from_product([
dates,
tickers,
range(10),
], names=[dates.name, 'asset', 'trade'])
idx = idx[rng.choice(arange(len(idx)), size=int(len(idx) * .005), replace=True)]
idx = idx.droplevel('trade')
trades = (
Series(
index=idx,
data=(
rng.choice([-1, +1], size=len(idx))
* abs(rng.normal(loc=10_000, scale=20_000, size=len(idx)).round(-2).astype(int))
),
name='volume'
)
.round(-2)
.pipe(lambda s: s.set_axis(s.index
._ext.updatelevel(date=lambda idx:
idx.get_level_values('date')
+ to_timedelta(abs(rng.normal(loc=0, scale=10, size=len(idx)).round()), unit='s')
)
))
.sort_index()
.loc[
prices.index.get_level_values('date').min()
:prices.index.get_level_values('date').max()
]
.pipe(lambda s: s.set_axis(s.index
._ext.addlevel(trade=range(len(s)))
))
)
print(
trades,
trades.groupby('asset', observed=True).count(),
sep='\n',
)
data_dir = Path('data')
data_dir.mkdir(exist_ok=True, parents=True)
trades.to_pickle(filename := (data_dir / 'trades.pkl'))
logger.info('Wrote %s', filename)
Here is a helper module for loading all the data.
from pathlib import Path
from pickle import load
from pandas import read_pickle
def load_data(*, data_dir=Path('data')):
prices = read_pickle(data_dir / 'prices.pkl')
trades = read_pickle(data_dir / 'trades.pkl')
with open(data_dir / 'dtypes.pkl', mode='rb') as f:
dtypes = load(f)
return {
'prices': prices,
'trades': trades,
**dtypes,
}
__all__ = 'load_data',
if __name__ == '__main__':
from logging import getLogger, basicConfig, INFO
logger = getLogger(__name__); basicConfig(level=INFO)
lib_dir = Path('_lib')
lib_dir.mkdir(exist_ok=True, parents=True)
with open(filename := (lib_dir / 'data.py'), mode='w') as f:
print(Path(__file__).read_text(), file=f)
logger.info('Wrote to %s', filename)
.groupby Overview
print("Let's take a look!")
Let’s start with some basic .groupby operations.
from _lib import load_data
from numpy import sum as np_sum
globals().update(load_data())
print(
# trades,
# trades.groupby('asset', observed=True).count(),
# trades.groupby('asset', observed=True).sum(),
# trades.groupby('asset', observed=True).agg('count'),
# trades.groupby('asset', observed=True).agg('sum'),
# trades.groupby('asset', observed=True).agg(sum),
# trades.groupby('asset', observed=True).agg(np_sum),
# trades.groupby('asset', observed=True).agg(lambda g: sum(g)),
trades.groupby('asset', observed=True).sum() / trades.groupby('asset', observed=True).count(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
Let’s look into the underlying performance of these operations.
from statistics import mean
from string import ascii_lowercase
from textwrap import indent
from pandas import Series, MultiIndex, date_range
from numpy import unique, sum as np_sum, mean as np_mean
from numpy.random import default_rng
from _lib import timed
rng = default_rng(0)
entities = unique(
rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()
)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=365),
entities,
], names='date entity'.split())),
data=rng.normal(size=len(idx)),
)
print(
s.head(),
f'{len(s) = :,}',
f'{len(entities) = :,}',
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
if False:
with timed('.sum()'): s.groupby('entity').sum()
with timed(".agg('sum')"): s.groupby('entity').agg('sum')
with timed('.agg(sum)'): s.groupby('entity').agg(sum)
with timed('.agg(np_sum)'): s.groupby('entity').agg(np_sum)
with timed('.agg(lambda g: sum(g))'): s.groupby('entity').agg(lambda g: sum(g))
if True:
with timed('.mean()'): s.groupby('entity').mean()
with timed(".agg('mean')"): s.groupby('entity').agg('mean')
with timed('.agg(mean)'): s.groupby('entity').agg(mean)
with timed('.agg(np_mean)'): s.groupby('entity').agg(np_mean)
.groupby.filter
The .groupby.filter modality allows us to select elements on a group-by-group basis.
Let’s find all the tickers we bought.
from _lib import load_data
globals().update(load_data())
print(
trades.loc[trades > 0],
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
Let’s find all the tickers where our net position life-to-date was long.
from _lib import load_data
globals().update(load_data())
print(
# trades.groupby('asset', observed=True).filter(lambda g: g.sum() > 0),
trades
.groupby('asset', observed=True).filter(lambda g: g.sum() > 0)
.index.get_level_values('asset').unique()
.values
,
trades
.groupby('asset', observed=True).filter(lambda g: g.sum() > 0)
.pipe(lambda s: trades
.index.get_level_values('asset').unique().difference(
s.index.get_level_values('asset').unique()
)
.values
)
,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
Let’s find all the tickers where we ever held a long position.
...
We’ll visit this question momentarily.
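Before we get there, here is a minimal sketch of one possible approach (not the solution we will develop later), interpreting “ever held a long position” as “the running net position was positive at some point”:
from _lib import load_data
globals().update(load_data())
print(
    trades
    # keep only the groups whose cumulative net position was ever positive
    .groupby('asset', observed=True).filter(lambda g: (g.cumsum() > 0).any())
    .index.get_level_values('asset').unique()
    .values
)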
.groupby.filter modality is about filtering rows based on group-level calculations.
.groupby.filter modality takes a UDF (“user defined function”) that operates on the entire DataFrame.
.groupby.filter modality returns a result that is the same size as or smaller than the original object.
.groupby.filter modality preserves the indexing of the original object.
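A minimal sketch of these guarantees on toy data (not the trades above):
from pandas import Series

s = Series([1, -5, 3, 4], index=['a', 'a', 'b', 'b'])

# only groups whose sum is positive survive; the surviving rows keep their original index labels
print(s.groupby(level=0).filter(lambda g: g.sum() > 0))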
.groupby.agg
The .groupby.agg modality is about summarizing rows based on non-overlapping, group-level calculations. This is equivalent to the SQL GROUP BY operator.
from _lib import load_data
globals().update(load_data())
print(
# trades.head(),
# trades.groupby('asset', observed=True).sum(),
# prices.head(),
# prices.groupby('ticker', observed=True).mean(),
# trades.groupby('asset', observed=True).agg(lambda g: g.max() - g.min()),
# trades.groupby('asset', observed=True).agg(lambda g: g.idxmax() - g.idxmin()),
# trades.groupby('asset', observed=True).agg(
# lambda g: g.droplevel(['asset', 'trade']).idxmax() - g.droplevel(['asset', 'trade']).idxmin()
# ),
# trades.groupby('asset', observed=True).pipe(lambda gb: gb.max() - gb.min()),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
Pro-tip: iterate over .groupby to see the groups!
from pandas import Series, date_range
s = Series(
index=date_range('2020-01-01', periods=365),
data=0,
)
for k, g in s.groupby(s.index.to_period('Q')):
print(f'{k = }')
print(f'{g = }')
break
Let’s consider performance of various .agg-related syntaxes on high-cardinality data.
from itertools import pairwise
from statistics import mean
from string import ascii_lowercase
from textwrap import indent
from pandas import Series, MultiIndex, date_range
from numpy import unique, sum as np_sum, mean as np_mean
from numpy.random import default_rng
from _lib import timed
rng = default_rng(0)
entities = unique(
rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()
)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=365),
entities,
], names='date entity'.split())),
data=rng.normal(size=len(idx)),
)
print(
s.head(),
f'{len(s) = :,}',
f'{len(entities) = :,}',
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
if False:
results = []
with timed('.agg(lambda g: …)'):
results.append(
s.groupby('entity').agg(lambda g: g.max() - g.min())
)
with timed('.pipe(lambda gb: …)'):
results.append(
s.groupby('entity').pipe(lambda gb: gb.max() - gb.min())
)
with timed(".agg([…]).pipe(…)"):
results.append(
s.groupby('entity').agg(['min', 'max']).pipe(lambda df: df['max'] - df['min'])
)
assert all((x == y).all() for x, y in pairwise(results))
if True:
results = []
with timed('.agg(lambda g: …)'):
results.append(
s.groupby('entity').agg(lambda g:
g.droplevel('entity').idxmax() - g.droplevel('entity').idxmin()
)
)
with timed(".pipe(lambda gb: …)"):
results.append(
s.groupby('entity').pipe(lambda gb:
gb.agg(lambda g: g.droplevel('entity').idxmax())
- gb.agg(lambda g: g.droplevel('entity').idxmin())
)
)
with timed(".pipe(lambda gb: …)"):
results.append(
s.pipe(lambda s: s
.reset_index('entity', drop=True)
.groupby(s.index.get_level_values('entity'))
.pipe(lambda gb: gb.idxmax() - gb.idxmin())
)
)
assert all((x == y).all() for x, y in pairwise(results))
Let’s find the average traded price per asset (volume-weighted.)
from pandas import merge_asof
from numpy import where
from _lib import load_data
globals().update(load_data())
# merged = (
# merge_asof(
# trades,
# prices,
# # left_on='date',
# # right_on='date',
# # left_on=('date', 'asset'),
# # right_on=('date', 'ticker'),
# )
# .drop(['date'], axis='columns')
# .set_axis(trades.index)
# .assign(
# price=lambda df: where(df['volume'] > 0, df['buy'], df['sell'])
# )
# )
merged = (
trades
.pipe(lambda s: s
.set_axis(s.index._ext.updatelevel(date=s.index.get_level_values('date').floor('30min')))
)
.to_frame()
.join(prices)
.assign(
price=lambda df: where(df['volume'] > 0, df['buy'], df['sell'])
)
)
print(
prices.head(3),
trades.head(3),
# merged.head(3),
# merged
# .groupby('asset', observed=True).agg(lambda df:
# (df['volume'] * df['price']).sum() / df['volume'].sum()
# )
# ,
merged
.pipe(lambda df:
(df['volume'] * df['price']).groupby('asset', observed=True).sum()
/ (df['volume']).groupby('asset', observed=True).sum()
)
,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.groupby.agg modality is about producing group-level summaries (“aggregations”).
.groupby.agg modality can take a UDF (“user defined function”) that operates on one column at a time (a Series).
.groupby.agg modality returns a result that has strictly one row per group.
.groupby.agg produces a result that is indexed on the group keys.
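A minimal sketch of these properties on toy data:
from pandas import Series

s = Series([1, -5, 3, 4], index=['a', 'a', 'b', 'b'])

# exactly one row per group, indexed by the group keys: a → -4, b → 7
print(s.groupby(level=0).agg('sum'))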
pandas.Grouper
The pandas.Grouper is an occasionally useful helper for performing grouping operations on data with a time-series index.
Let’s look at some of the things we can .groupby(…) on!
from pandas import Series, date_range, Grouper
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := date_range('2020-01-01', periods=90, name='date')),
data=rng.normal(size=len(idx)),
)
print(
s.groupby('date').mean().head(),
# s.groupby(lambda idx: idx - idx.to_period('M').start_time).mean().head(),
# s.groupby(s.index.get_level_values('date') - s.index.get_level_values('date').to_period('M')).mean().head(),
# s.groupby(rng.choice([True, False], size=len(s))).mean(),
# s.groupby(Grouper(level='date', freq='M')).mean(),
# s.pipe(lambda s: s.groupby(s.index.to_period('M'))).mean(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
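For reference, here is the Grouper line from the cell above run on its own (same synthetic series):
from pandas import Series, date_range, Grouper
from numpy.random import default_rng

rng = default_rng(0)
s = Series(
    index=(idx := date_range('2020-01-01', periods=90, name='date')),
    data=rng.normal(size=len(idx)),
)

# one mean per calendar month, labelled by the month-end timestamp
# (recent pandas versions may warn and prefer freq='ME')
print(s.groupby(Grouper(level='date', freq='M')).mean())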
pandas.NamedAgg
The pandas.NamedAgg is an occasionally useful helper for performing grouping operations on a DataFrame where we want to control the names of the result columns.
from pandas import DataFrame, date_range, NamedAgg, MultiIndex
from numpy.random import default_rng
rng = default_rng(0)
df = DataFrame(
index=(idx := date_range('2020-01-01', periods=90, name='date')),
data={
'a': rng.normal(size=len(idx)),
'b': rng.normal(size=len(idx)),
},
)
print(
# df.groupby('date').agg('min').head(),
# df.groupby('date').agg(['min', 'max']).head(),
# df.groupby('date').agg({'a': 'min', 'b': 'max'}).head(),
# df.groupby('date').agg({'a': 'min', 'b': ['min', 'max']}).head(),
# df.groupby('date').agg(
# a_min=NamedAgg('a', 'min'),
# b_max=NamedAgg('b', 'max'),
# )
# .head(),
# df.groupby('date').agg({'a': 'min', 'b': 'max'})
# .set_axis(MultiIndex.from_tuples([('a', 'min'), ('b', 'max')]), axis='columns')
# .head(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
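For reference, here is the NamedAgg variant from the cell above run on its own (same synthetic frame):
from pandas import DataFrame, date_range, NamedAgg
from numpy.random import default_rng

rng = default_rng(0)
df = DataFrame(
    index=(idx := date_range('2020-01-01', periods=90, name='date')),
    data={
        'a': rng.normal(size=len(idx)),
        'b': rng.normal(size=len(idx)),
    },
)

# NamedAgg lets us name the result columns directly: a_min, b_max
print(
    df.groupby('date').agg(
        a_min=NamedAgg('a', 'min'),
        b_max=NamedAgg('b', 'max'),
    )
    .head()
)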
.groupby.transform
The .groupby.transform modality is about transforming the data based on non-overlapping, group-level calculations. This is equivalent to the SQL “window functions” feature.
from pandas import NamedAgg
from numpy import sign, where
from _lib import load_data
globals().update(load_data())
print(
# trades.groupby('asset', observed=True).sum().head(),
# trades.groupby('asset', observed=True).cumsum().head(),
# trades.groupby('asset', observed=True).cummax(),
# trades.groupby('asset', observed=True).expanding().max()
# .droplevel(0).sort_index()
# ,
# prices.groupby('ticker', observed=True).idxmax(),
# prices.groupby('ticker', observed=True).cummax(),
# prices.groupby('ticker', observed=True).cumidxmax(),
trades
.pipe(lambda s:
s.set_axis(
s.index._ext.addlevel(
direction=where(
d := s.groupby('asset', observed=True).cummax().pipe(sign) > 0,
'long',
'short',
),
position=d.groupby('asset', observed=True).transform(lambda g:
(g != g.shift()).cumsum()
),
)
)
)
# .groupby(['asset', 'position', 'direction'], observed=True).agg([
# 'count',
# NamedAgg(
# 'trades',
# lambda g: {*g.index.get_level_values('trade').unique()},
# ),
# ])
# .groupby('asset', observed=True).agg(lambda g:
# g.index.get_level_values('position').nunique()
# )
# .head()
,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
There is an interesting relationship between .groupby and .unstack.
from string import ascii_lowercase
from pandas import MultiIndex, date_range, Series
from numpy.random import default_rng
rng = default_rng(0)
s = (
Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=14, name='date'),
rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel(),
], names=['date', 'entity'])),
data=rng.normal(size=len(idx)),
)
.round(2)
.sort_index()
)
print(
# s,
# s.groupby('entity').cumsum().head(),
# s.groupby('entity').transform('cumsum').head(),
# s.unstack('entity').cumsum().stack('entity').head(),
s.groupby('entity').sum().head(),
s.groupby('entity').agg('sum').head(),
s.unstack('entity').sum().head(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.groupby.transform modality is about transforming the entire data using per-group analyses.
.groupby.transform modality can take a UDF (“user defined function”) that operates on one column at a time (a Series).
.groupby.transform modality returns a result that has strictly the same shape as the original data.
.groupby.transform produces a result that is indexed identically to the original data.
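A minimal sketch of these shape and index guarantees on toy data:
from pandas import Series

s = Series([1, 2, 3, 4], index=['a', 'a', 'b', 'b'])

# same length and same index as the input; each element is replaced by its group’s sum
print(s.groupby(level=0).transform('sum'))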
.groupby.apply
The .groupby.apply modality is about arbitrary manipulations of the data based on non-overlapping, group-level calculations. There is no direct equivalent in SQL. It is the only modality that operates at the DataFrame level.
Let’s find the average traded price per asset (volume-weighted.)
from numpy import where
from pandas import Series, DataFrame
from _lib import load_data
globals().update(load_data())
traded_prices = (
Series(
index=trades.index,
data=prices.loc[
trades.index._ext.updatelevel(date=lambda idx: idx.get_level_values('date').floor('30min'))
].pipe(lambda df: where(trades > 0, df['buy'], df['sell']))
)
)
print(
# trades.head(),
# prices.head(),
# traded_prices.head(),
# DataFrame({'volume': trades, 'price': traded_prices})
# .pipe(lambda df:
# (df['volume'] * df['price']).groupby('asset', observed=True).sum()
# / (df['volume']).groupby('asset', observed=True).sum()
# )
# .head()
# ,
(
(trades * traded_prices).groupby('asset', observed=True).sum()
/ trades.groupby('asset', observed=True).sum()
)
.head()
,
DataFrame({'volume': trades, 'price': traded_prices})
.groupby('asset', observed=True).apply(lambda df:
(df['volume'] * df['price']).sum() / df['volume'].sum()
)
.head()
,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
How slow is .groupby.apply really?
from string import ascii_lowercase
from pandas import DataFrame, MultiIndex, date_range, Series
from numpy import unique
from numpy.random import default_rng
from _lib import timed
rng = default_rng(0)
entities = unique(
rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()
)
df = (
DataFrame(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=365),
entities,
], names='date entity'.split())),
data={
'value': rng.integers(-10, +10, size=len(idx)),
'weight': abs(rng.normal(size=len(idx))),
}
)
.swaplevel()
.sort_index()
)
if True:
with timed('.apply'):
df.groupby('entity').apply(lambda g:
(g['value'] * g['weight']).sum() / g['weight'].sum()
)
with timed('.agg'):
df.pipe(lambda df:
(df['value'] * df['weight']).groupby('entity').sum()
/ df['weight'].groupby('entity').sum()
)
if True:
from _lib.cgroupcalcs import grouped_weighted_mean
with timed('cython'):
Series(*grouped_weighted_mean(
grouping=df.index.get_level_values('entity'),
values=df['value'],
weights=df['weight'],
))
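Here is the Cython source behind _lib.cgroupcalcs used above. (It must be compiled before the import works, e.g. with cythonize or pyximport; the boundary-finding logic assumes the input is sorted by the grouping key, as our data is.)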
# cython: language_level=3, boundscheck=False, initializedcheck=False, cdivision=True
from numpy import unique, empty, nonzero, hstack
def grouped_weighted_mean(grouping, values, weights):
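    # assumes the rows are pre-sorted by `grouping`, so each group occupies one contiguous block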
grouping = grouping.to_numpy()
values = values.to_numpy()
weights = weights.to_numpy()
boundaries = nonzero(grouping[1:] != grouping[:-1])[0] + 1
unique = hstack([grouping[0], grouping[boundaries]])
result = empty(len(boundaries) + 1)
gwmean_calc(hstack([boundaries, [len(values)]]), values, weights, result)
return result, unique
cdef gwmean_calc(long[:] boundaries, long[:] values, double[:] weights, double[:] result):
cdef int bnd_idx
cdef int prev_idx
cdef int curr_idx
cdef int idx
cdef double numer
cdef double denom
cdef int num_boundaries = len(boundaries)
cdef int num_values = len(values)
prev_idx = 0
for bnd_idx in range(num_boundaries):
curr_idx = boundaries[bnd_idx]
numer, denom = 0, 0
for idx in range(prev_idx, curr_idx):
numer += values[idx] * weights[idx]
denom += weights[idx]
result[bnd_idx] = numer / denom
prev_idx = curr_idx
def grouped_weighted_var(grouping, values, weights):
grouping = grouping.to_numpy()
values = values.to_numpy()
weights = weights.to_numpy()
boundaries = nonzero(grouping[1:] != grouping[:-1])[0] + 1
unique = hstack([grouping[0], grouping[boundaries]])
result = empty(len(boundaries) + 1)
gwvar_calc(hstack([boundaries, [len(values)]]), values, weights, result)
return result, unique
cdef gwvar_calc(long[:] boundaries, long[:] values, double[:] weights, double[:] result):
cdef int bnd_idx
cdef int prev_idx
cdef int curr_idx
cdef int idx
cdef double numer
cdef double denom
cdef double mean
cdef int num_boundaries = len(boundaries)
cdef int num_values = len(values)
prev_idx = 0
for bnd_idx in range(num_boundaries):
curr_idx = boundaries[bnd_idx]
numer, denom = 0, 0
for idx in range(prev_idx, curr_idx):
numer += values[idx] * weights[idx]
denom += weights[idx]
mean = numer / denom
numer, denom = 0, 0
for idx in range(prev_idx, curr_idx):
numer += weights[idx] * (values[idx] - mean)**2
denom += weights[idx]
result[bnd_idx] = numer / denom
prev_idx = curr_idx
Let’s convert “deltas” to “states.”
from itertools import pairwise
from string import ascii_lowercase
from pandas import MultiIndex, date_range, Series, Categorical
from numpy import unique
from numpy.random import default_rng
from _lib import timed
rng = default_rng(0)
states = Categorical('A B C D'.split())
s = (
Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=90, name='date'),
unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
], names='date entity'.split())),
data=rng.choice(states, size=len(idx)),
)
.sample(frac=.05, random_state=rng)
.sort_index()
)
all_dates = date_range(
s.index.get_level_values('date').min(),
s.index.get_level_values('date').max(),
name='date'
)
print(
# s,
# s.groupby('entity').apply(lambda g:
# g.droplevel('entity').reindex(all_dates).ffill().bfill()
# ),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
if True:
results = []
with timed('.groupby.apply'):
results.append(
s.groupby('entity').apply(lambda g:
g.droplevel('entity').reindex(all_dates).ffill().bfill()
)
)
with timed('.loc[:] = '):
_ = Series(index=MultiIndex.from_product([
all_dates,
s.index.get_level_values('entity').unique(),
], names=s.index.names), dtype=states.dtype)
_.loc[:] = s
_ = (
_
.groupby('entity').ffill().groupby('entity').bfill()
.swaplevel()
.sort_index()
)
results.append(_)
assert all((x == y).all() for x, y in pairwise(results))
.groupby.apply modality is about manipulating the data in arbitrary ways using per-group analyses. “Arbitrary” means there is no prior-known relationship between the shape of the input and the shape of the output.
.groupby.apply modality can take a UDF (“user defined function”) that operates on the entire data (a DataFrame).
.groupby.apply modality returns a result that can have any possible shape.
.groupby.apply produces a result that is indexed on each grouped calculation’s result, with (optionally) the group keys prepended.
.groupby.apply modality is best avoided: it is very slow!
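A minimal sketch of the index behaviour on toy data:
from pandas import Series

s = Series([1, 2, 3, 4], index=['a', 'a', 'b', 'b'])

# each group’s result keeps its own index labels, with the group key prepended as an outer level
print(s.groupby(level=0).apply(lambda g: g.head(1)))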
.rolling
.rolling operations are about performing overlapping grouped operations. This is similar to SQL’s window functions.
What is the three day moving average price per asset?
from _lib import load_data
globals().update(load_data())
print(
prices,
# prices.rolling(3, min_periods=1).mean(),
# prices.groupby('ticker', observed=True).rolling('3d', min_periods=1).mean(),
prices.groupby('ticker', observed=True).transform(
lambda g: g.droplevel('ticker').rolling('3d', min_periods=1).mean()
),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
How slow is .groupby.rolling(lambda g: …) really?
from itertools import pairwise
from string import ascii_lowercase
from pandas import MultiIndex, Series, date_range
from numpy import unique
from numpy.random import default_rng
from _lib import timed
rng = default_rng(0)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=30, name='date'),
unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
], names='date entity'.split())),
data=rng.normal(size=len(idx)).round(2),
)
print(
s.head(),
f'{len(s) = :,}',
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
if True:
results = []
with timed('.groupby(…).transform(.rolling(…))'):
results.append(
s.groupby('entity', observed=True).transform(
lambda g: g.droplevel('entity').rolling('3d', min_periods=1).mean().values
)
)
with timed('.unstack(…).rolling(…).stack(…)'):
results.append(
s.unstack('entity').rolling('3d', min_periods=1).mean().stack('entity')
)
assert all((x == y).all() for x, y in pairwise(results))
.rolling in NumPy with numpy.lib.stride_tricks.sliding_window_view
Let’s compare against NumPy…
from itertools import pairwise
from string import ascii_lowercase
from pandas import Series, MultiIndex, date_range, DataFrame
from numpy import unique, hstack, allclose
from numpy.random import default_rng
from numpy.lib.stride_tricks import sliding_window_view
from _lib import timed
rng = default_rng(0)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=30, name='date'),
unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
], names='date entity'.split())),
data=rng.normal(size=len(idx)).round(2),
)
if True:
results = []
with timed('.groupby(…).rolling(…)'):
results.append((
s.groupby('entity', observed=True).rolling(3).mean(),
lambda x: x.droplevel(-1).dropna(),
))
with timed('.unstack(…).rolling(…).stack(…)'):
results.append((
s.unstack('entity').rolling(3).mean().stack('entity'),
lambda x: x.dropna().swaplevel().sort_index(),
))
with timed('.groupby(…).xform(sliding_win_view)'):
results.append((
s.groupby('entity', observed=True).transform(lambda g:
hstack([
[float('nan')]*(3-1), sliding_window_view(g, 3).mean(axis=-1)
])
),
lambda x: x.dropna().swaplevel().sort_index()
))
with timed('.unstack(…).xform(sliding_win_view)…'):
results.append((
s.unstack('entity').pipe(lambda df: DataFrame(
index=df.index[3-1:],
columns=df.columns,
data=sliding_window_view(df, 3, axis=0).mean(axis=-1),
)
).stack('entity'),
lambda x: x.swaplevel().sort_index(),
))
assert all(allclose(fx(x), fy(y)) for (x, fx), (y, fy) in pairwise(results))
.rolling(closed=…, center=…)
.rolling allows us to control the centering of the window as well as the inclusion of the endpoints of the window.
from pandas import Series, date_range
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := date_range('2020', periods=90)),
data=rng.integers(-10, +10, size=len(idx)),
)
print(
s.head(),
s.rolling('3d').sum().head(),
# s.rolling('3d', center=True).sum().head(),
s.rolling('3d', closed='both').sum().head(),
s.rolling('3d', closed='right').sum().head(), # XXX: default
s.rolling('3d', closed='left').sum().head(),
s.rolling('3d', closed='neither').sum().head(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.rolling(…).apply(…)
We can provide a UDF to .rolling, but we’re relatively limited in what we can do.
from pandas import Series, date_range
from numpy import fabs
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := date_range('2020', periods=90)),
data=rng.integers(-10, +10, size=len(idx)),
)
print(
s.rolling('3d').apply(lambda g: abs(g - g.mean()).mean()),
# s.rolling('3d').apply(lambda g: abs(g - g.mean()).mean(), raw=True),
# s.rolling('3d').apply(lambda g: abs(g - g.mean()).mean(), engine='cython'),
# s.rolling('3d').apply(lambda g: fabs(g - g.mean()).mean(), raw=True, engine='numba'),
s.rolling('3d').apply(
lambda g: fabs(g - g.mean()).mean(),
raw=True,
engine='numba',
engine_kwargs={'nopython': True, 'nogil': True, 'parallel': True},
),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.rolling(…).cov(…, pairwise=True)
We can do rolling pairwise operations like covariances and correlations.
from pandas import DataFrame, date_range
from numpy.random import default_rng
rng = default_rng(0)
df0 = DataFrame(
index=(idx := date_range('2020', periods=90)),
data={
'A': rng.integers(-10, +10, size=len(idx)),
'B': rng.integers(-10, +10, size=len(idx)),
'C': rng.integers(-10, +10, size=len(idx)),
},
)
df1 = DataFrame(
index=(idx := date_range('2020', periods=90)),
data={
'A': rng.integers(-10, +10, size=len(idx)),
'B': rng.integers(-10, +10, size=len(idx)),
'C': rng.integers(-10, +10, size=len(idx)),
},
)
print(
df0.rolling('2d').cov(df1, pairwise=True),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.rolling(win_type=…)
We can customize the window weighting with win_type=.
from pandas import Series, date_range
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := date_range('2020', periods=90)),
data=rng.integers(-10, +10, size=len(idx)),
)
print(
s.rolling(3).mean(),
s.rolling(3, win_type='triang').mean(),
s.rolling(3, win_type='gaussian').mean(std=0.1),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
pandas.api.indexers.BaseIndexer
Occasionally, we may need our own custom windows. We can accomplish this with a subclass of pandas.api.indexers.BaseIndexer.
from itertools import pairwise, chain
from string import ascii_lowercase
from pandas import Series, date_range, MultiIndex, to_timedelta
from pandas.api.indexers import BaseIndexer
from numpy import unique, nonzero, array, empty
from numpy.random import default_rng
from _lib import timed
class CustomIndexer(BaseIndexer):
def __init__(self, grouping, timeseries, *args, **kwargs):
super().__init__(*args, **kwargs)
self.grouping = grouping
self.timeseries = timeseries
def get_window_bounds(self, num_values, min_periods, center, closed, step):
starts, stops = empty(num_values, dtype=int), empty(num_values, dtype=int)
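        # walk each contiguous run of identical group labels (assumes the data is sorted by group),
        # then compute positional '3d' window bounds for every row within that run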
for grplft, grprgt in pairwise(chain([0], nonzero(self.grouping[1:] != self.grouping[:-1])[0] + 1, [len(self.grouping)])):
lft, rgt = (
Series(index=self.timeseries[grplft:grprgt], data=range(grprgt-grplft))
.pipe(lambda s: (
s.rolling('3d').min().values,
s.rolling('3d').max().values,
))
)
for idx, (lftidx, rgtidx) in enumerate(zip(lft, rgt)):
starts[grplft + idx] = grplft + lftidx
stops[grplft + idx] = grplft + rgtidx + 1
return starts, stops
rng = default_rng(0)
s = (
Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=30, name='date'),
unique(rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()),
], names='date entity'.split())),
data=rng.integers(-10, +10, size=len(idx)),
)
.swaplevel()
.sort_index()
)
if True:
results = []
with timed('CustomIndexer'):
results.append(
s.rolling(
CustomIndexer(
grouping=s.index.get_level_values('entity'),
timeseries=s.index.get_level_values('date'),
)
).sum()
)
with timed('.groupby.transform(.rolling)'):
results.append(
s.groupby('entity').transform(lambda g: g
.droplevel('entity').rolling('3d').sum().values
)
)
assert all((x == y).all() for x, y in pairwise(results))
.expanding and .ewm
.expanding allows us to perform expanding window calculations.
.ewm allows us to perform expanding window calculations with exponential weighting.
from scipy.stats import zscore
from pandas import Series, date_range
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := date_range('2020', periods=90)),
data=rng.integers(-10, +10, size=len(idx)),
)
print(
# s.expanding().sum(),
# s.cumsum(),
s.ewm(halflife='2 days', times=s.index).sum(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
When was the last time we saw the highest price?
from _lib import load_data
globals().update(load_data())
print(
# prices.head(),
prices
['buy']
.groupby('ticker', observed=True)
.transform(lambda g: g
.droplevel('ticker').expanding().apply(lambda g: g.idxmax())
)
,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)