.rolling, .expanding, .ewmOTTO MANN: “Oh, wow! Windows! I don’t think I can afford this place…”
— “You Only Move Twice” (S08E02)
What’s the difference between a windowed operation and a grouped operation? How do I effectively leverage pandas the pandas window API? Under what circumstances will a windowed operation be fast or slow?
Do you…
Then come join us for a session on pandas window operations!
pandas has a plethora of ways to slice, dice, and analyze your data. The most common data transformations are aggregations. The ability to describe a large dataset with a just a few numbers is extremely powerful for understanding your data. But what happens is we over aggregate, mistakenly throwing out real signal for the convenience of working with aggregated values? Thankfully window operations are here to help.
In this episode, we’ll look at .rolling, .expanding, and .ewm as well as their various options. These DataFrame methods are just the beginning though, as we’ll also explore the Window, Rolling, Expanding, and ExponentialMovingWindow objects they return. We’ll discuss the uses for these objects, how you can customize them, and how you can ensure your operations are applied efficiently and remain within the Restricted Computation Domain.
Keywords
print("Let's take a look!")
from pandas import DataFrame, period_range, MultiIndex, IndexSlice, to_timedelta, Series, Categorical, to_datetime
from numpy import tile, where, sign
from numpy.random import default_rng
from string import ascii_lowercase
from pathlib import Path
from textwrap import dedent
rng = default_rng(0)
tickers = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
industries = ['retail', 'finance', 'healthcare', 'tech', 'energy', 'entertainment']
days = period_range('2020-01-01', periods=90, freq='D')
portfolios = ['Alice', 'Bob', 'Charlie', 'Dana']
infos = Series(rng.choice(industries, size=len(tickers)), index=tickers, name='industry').rename_axis('ticker').sort_index()
prices = DataFrame(
index=(idx := MultiIndex.from_product([
days,
tickers,
], names=['date', 'ticker'])),
data={
'bid': (bid := (
rng.normal(loc=250, scale=50, size=len(tickers)).clip(0, 500)
* rng.normal(loc=1, scale=.02, size=(len(days), len(tickers))).cumprod(axis=-1).clip(0, 5)
).ravel().round(2)),
'ask': (ask := (
bid * rng.normal(loc=1.25, scale=.05, size=len(idx)).clip(.99, 2)
).round(2)),
},
).sort_index()
trades = DataFrame(
index=(idx := MultiIndex.from_product([
portfolios,
tile(days, 50),
tickers,
], names=['portfolio', 'date', 'ticker'])),
data={
'volume': (volumes :=
rng.normal(loc=0, scale=250_000, size=len(idx)).round(-2).astype('int')
),
'price': (
where(
volumes > 0,
(px := prices.loc[
MultiIndex.from_arrays([
idx.get_level_values('date'),
idx.get_level_values('ticker'),
])
]).loc[:, 'ask'],
px.loc[:, 'bid'],
)
* rng.normal(loc=1, scale=.15, size=len(idx)).clip(.5, 1.5)
).round(2),
},
).loc[lambda df: df['volume'] != 0].pipe(
lambda df: df.set_axis(
MultiIndex.from_arrays([
df.index.get_level_values('portfolio'),
df.index.get_level_values('date').to_timestamp()
+ to_timedelta(rng.normal(loc=12*60*60, scale=3*60*60, size=len(df.index)).round().clip(0, 24*60*60), unit='s'),
df.index.get_level_values('ticker'),
], names=df.index.names)
)
).sample(frac=.25, random_state=rng).sort_index()
positions = trades.groupby(['portfolio', 'ticker'])['volume'].cumsum().rename('position')
direction = sign(positions).map({-1: 'bid', 1: 'ask', 0: 'ask'}).rename('direction')
market_value = (
prices.loc[
prices.index.get_level_values('date').max()
].stack().rename_axis(['ticker', 'direction']).loc[
MultiIndex.from_arrays([
direction.index.get_level_values('ticker'),
direction.values,
])
].droplevel('direction')
.pipe(lambda s:
positions.to_frame().assign(price=s.values)
)
.product(axis='columns')
)
print(
# trades,
# prices,
# trades.head(),
# prices.head(),
# positions.head(),
# direction.head(),
# market_value.head(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades.to_pickle(data_dir / 'trades.pkl')
prices.to_pickle(data_dir / 'prices.pkl')
positions.to_pickle(data_dir / 'positions.pkl')
direction.to_pickle(data_dir / 'direction.pkl')
market_value.to_pickle(data_dir / 'market_value.pkl')
from pandas import read_pickle
from pathlib import Path
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
print(
prices * 10,
# trades.head(),
# prices.head(),
# positions.head(),
# direction.head(),
# market_value.head(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
print("Let's take a look!")
.groupbyfrom pandas import read_pickle
from pathlib import Path
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
print(
positions.groupby(['portfolio', 'ticker']).max(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.groupby with built-ins.groupby.sum().groupby.mean().groupby.cumsum()from pandas import read_pickle, IndexSlice, Grouper
from pathlib import Path
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
from numpy.random import default_rng
rng = default_rng(0)
print(
positions,
prices.loc[IndexSlice[:, ['ibba', 'evqx'], :], 'bid'].pipe(
lambda s: s.groupby(
['date']
# s.index.get_level_values('date').asfreq('M')
# rng.choice([True, False], size=len(s))
# Grouper(level='date', freq='M')
# lambda x: x[0].asfreq('M')
# ).cumsum()
# ).agg(max)
# ).agg('max')
# ).agg(np.max)
).agg(lambda g: max(g)/mean(g))
),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.groupby with user-defined functions.groupby.filter.groupby.agg.groupby.apply.groupby.transformfrom pandas import read_pickle
from pathlib import Path
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
print(
prices
.groupby(['ticker'])
# .agg operates on only one column a time
# .agg changes your indexing to match the grouping
# .agg will reduce (“aggregate”) your data: for every partition (grouping) → one scalar result value
# .agg(
# lambda g: g.max() - g.mean()
# )
# .transform operates on only one column at a time
# .transform will not chagne your indexing
# .transform will map (“transform”) your data: for every original row → one result row
# .transform(
# lambda g: g.cummax()
# )
# .apply operates on all columns at the same time
# .apply will change the indexing: append the grouping as the outermost level of your indexing to
# whatever the UDF returns
# .apply will perform an arbitrary operation on your data: there is no a prior relationship between
# number of input & output rows
# .apply(
# lambda g: g[g['bid'] < g['ask'] * .80]
# )
# .filter(
# lambda g: g['bid'] < g['ask'] * .80
# )
,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
print("Let's take a look!")
.rollingfrom pathlib import Path
from pandas import read_pickle, Grouper, to_datetime, IndexSlice
from pandas.tseries.offsets import Day
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
from numpy.random import default_rng
prices = prices.sample(frac=.40, random_state=default_rng(0)).sort_index()
print(
# positions,
# prices.unstack('ticker').rolling(7, min_periods=1).mean(),
# prices.unstack('ticker').rolling(Day(7), min_periods=1).count(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from pathlib import Path
from scipy.signal.windows import triang, gaussian
from numpy import ones, convolve
from pandas import read_pickle, Grouper, to_datetime, IndexSlice
from pandas.tseries.offsets import Day
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
print(
# prices.loc[IndexSlice[:, 'chmk'], 'bid'].rolling(3).mean(),
# convolve(
# prices.loc[IndexSlice[:, 'chmk'], 'bid'].values,
# # ones(3) / 3,
# triang(3) / sum(triang(3))
# ),
# win_type
# triang(3),
prices.loc[IndexSlice[:, 'chmk'], 'bid'].rolling(3, win_type='triang').mean(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from pathlib import Path
from pandas import read_pickle, IndexSlice
from pandas.api.indexers import FixedForwardWindowIndexer
from pandas.tseries.offsets import Day
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
print(
prices,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from numpy import arange
from numpy.lib.stride_tricks import as_strided
def rolling(arr, size):
return as_strided(arr, shape=(arr.shape[0] - size + 1, size, *arr.shape[1:]),
strides=(arr.strides[0], *arr.strides))
xs = arange(10)
ys = rolling(xs, size=3)
print(
# xs,
ys,
)
.expandingfrom pathlib import Path
from pandas import read_pickle, IndexSlice
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
print(
positions,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.ewmfrom pathlib import Path
from numpy import arange
from pandas import read_pickle, IndexSlice
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
alpha = .1
factors = lambda s: (1 - alpha)**arange(s.size, 0, -1)
# (s * (f := factor(s))).cumsum() / f.cumsum()
print(
prices.ewm(alpha=alpha).mean(),
(prices['bid'] * (f := factors(prices['bid']))).cumsum() / f.cumsum(),
f,
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
print("Let's take a look!")
VariableOffsetWindowIndexerfrom dataclasses import dataclass
from functools import wraps
from numpy import array
from numpy.random import default_rng
from pandas import Series, to_datetime, to_timedelta
from pandas.api.indexers import VariableOffsetWindowIndexer
@dataclass
class Window(VariableOffsetWindowIndexer):
@wraps(VariableOffsetWindowIndexer.get_window_bounds)
def get_window_bounds(self, num_values, min_periods=None, *_, **__):
start_indices = array([0, 1, 4, 3, 0, 0, 0])
stop_indices = array([1, 2, 5, 5, 0, 0, 0])
return start_indices, stop_indices
rng = default_rng(0)
s = Series(
data=(data := rng.normal(size=7)),
index=
to_datetime('2020-01-01')
+ to_timedelta(rng.choice([0, 1], size=len(data)).cumsum(), unit='d'),
).round(2)
print(
s,
win := Window(),
win.get_window_bounds(s),
s.rolling(win, min_periods=1).mean(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from dataclasses import dataclass
from functools import wraps
from pathlib import Path
from numpy import arange, array
from pandas import to_datetime, date_range, Series, Grouper, read_pickle, IndexSlice
from pandas.tseries.offsets import CustomBusinessDay
from pandas.api.indexers import VariableOffsetWindowIndexer
@dataclass
class Window(VariableOffsetWindowIndexer):
index_array : None
window_size : None
cbd = CustomBusinessDay(holidays=to_datetime(['2020-05-31']))
@wraps(VariableOffsetWindowIndexer.get_window_bounds)
def get_window_bounds(self, num_values, min_periods=None, *_, **__):
indices = Series(arange(num_values), index=self.index_array)
start = (
indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s[0])
.rolling(self.window_size, min_periods=min_periods).agg(lambda s: s[0]).fillna(0).astype(int)
)
stop = (
indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s[-1])
.rolling(self.window_size, min_periods=min_periods).agg(lambda s: s[-1]).fillna(0).astype(int)
)
return start.values, stop.values
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')
print(
pxs := prices.loc[IndexSlice[:, 'ibba', :], 'bid'].droplevel('ticker'),
win := Window(
index_array=pxs.index.get_level_values('date'),
window_size=1,
),
win.get_window_bounds(len(pxs)),
pxs.rolling(win, min_periods=1).count(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)