pandas Window Functions (.rolling, .expanding, .ewm)
Lisa (Grace Kelly): I wish I were creative…
Jeff (Jimmy Stewart): You are! You’re great at creating difficult situations.
— Rear Window (1954)
| Date | Time | Track | Meeting Link |
|---|---|---|---|
| May 7, 2021 | 9:30 AM EST | In Depth with pandas | Seminar I: “Rear Window” Meeting ID: 900845368 Passcode: 2953 |
These sessions are designed for a broad audience of modelers and software programmers of all backgrounds and skill levels.
Our expected audience should comprise attendees with a…
- pandas
- pandas.{Series,DataFrame}.groupby and grouping operations

… or greater!
During this session, we will endeavour to guide our audience to developing…
- .expanding, .rolling, and .ewm, including use with time series data and use of the corresponding Rolling, Window, Expanding, and ExponentialMovingWindow objects
- .groupby(…).transform(lambda df: df.rolling(…))

… and we will share additional tips, tricks, and in-depth guidance on all of these topics!
In a previous episode, we looked at .groupby in depth, as well as the various reduction operations it supports (.apply, .transform, .aggregate.)
Let’s turn our attention to other window functions in pandas—functions which operate on “windows” of multiple rows or multiple columns to perform aggregations or other transformations.
In this episode, we’ll look at .rolling, .expanding, and .ewm, their various options and modalities, as well as the operations available on the Window, Rolling, Expanding, and ExponentialMovingWindow objects they return. We’ll discuss these operations in the context of time series analysis and discuss performance considerations related to the use of each.
Did you enjoy this episode? Did you learn something new that will help you as you continue or begin to use window methods in your work?
If so, stay tuned for future episodes, which may…
- .rolling with .tseries.offsets and other time series considerations in greater depth.
- numba, cython or other tools to address performance issues with non-native window operations.

If there are other related topics you’d like to see covered, please reach out to Diego Torres Quintanilla.
print("Let's go!")
Let’s start with a pandas.DataFrame with some random data:
- time is a pandas.date_range from May 7, 2021 9:00 AM ~ 1:00 PM, with some random number of minutes added to put noise on each measurement.
- ticker is a randomly selected four-character string.
- price is a random starting value on [0, 100), scaled by a small per-market factor, plus cumulative normally distributed noise.

# data.py
from pandas import DataFrame, date_range, to_timedelta, Categorical
from numpy import repeat, array, tile
from random import randrange
from string import ascii_lowercase
from matplotlib.pyplot import plot, show
from pandas import Timestamp
from numpy.random import default_rng
rng = default_rng(Timestamp('2021-05-07').asm8.astype('uint32'))
markets = ['ABC', 'WXYZ']
tickers = rng.choice([*ascii_lowercase], size=(num_tickers := 5, 4)).view('<U4').ravel()
time = date_range('2021-05-07 9:00', periods=(num_periods := 5), freq='1H')
df = DataFrame({
    'ticker': repeat(tickers, len(markets) * len(time)),
    'market': tile(repeat(markets, len(time)), len(tickers)),
    'time':   tile(tile(time, len(markets)), len(tickers)) + to_timedelta(rng.integers(0, 59, size=(len(tickers)*len(time)*len(markets))), unit='T'),
    'price':
        repeat(rng.random(len(tickers)) * 100, len(markets) * len(time)) * repeat(rng.normal(loc=1, scale=0.01, size=len(markets) * len(tickers)), len(time))
        + rng.normal(size=(len(tickers), len(markets), len(time))).cumsum(axis=-1).ravel(),
    'volume': rng.integers(100, 100_000, size=len(tickers)*len(markets)*len(time)),
})
print(
# df,
df.head(6),
)
.groupby and the .index
Let’s quickly refresh what we know about the pandas .index and the .groupby operation.
A pandas.array is a 1-D dataset containing homogeneous data structured to
allow for efficient storage and processing (using the “restricted computation
domain” idea.) The elements of this dataset can be accessed by their position
(their “integer location.”)
A pandas.Series is a pandas.array combined with a pandas.Index. The
elements of this object can be accessed by their position using the .iloc[]
API or by their “label” using the .loc[] API.
A pandas.Index is an object which provides a mechanism for converting “label”
values into “position” values. It implements the .get_loc interface. It is
used by the .loc[] API in a pandas.Series and pandas.DataFrame to allow
the contents of these datasets to be accessed by “label.”
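As a small illustration of the label-to-position mapping described above, the following sketch checks that `.loc[]` on a label reaches the same element as `.iloc[]` on the position reported by `Index.get_loc`. The labels and values are made up for this example.

```python
from pandas import Index, Series

# Hypothetical labels and values, chosen only for illustration.
idx = Index(['a', 'b', 'c'])
s = Series([10, 20, 30], index=idx)

# .get_loc converts a "label" into an integer "position"...
pos = idx.get_loc('b')

# ...so label-based and position-based access reach the same element.
by_label = s.loc['b']
by_position = s.iloc[pos]
print(pos, by_label, by_position)
```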
A pandas.DataFrame is a 2-D “flat” (typically taller-than-wide) dataset, with
multiple 1- or 2-D homogeneous datasets (accessible via ._data) along with two indices:
- .index, which allows selection of rows
- .columns, which allows selection of columns

Note that all operations performed between pandas.Series or
pandas.DataFrame objects are “index-aligned”; e.g., when performing s1 + s2
on two pandas.Series objects, we first align the two datasets using their
.indexes. Note that a pandas.DataFrame can be thought of as a collection of
multiple 1-D datasets with a common .index.
from pandas import Series
s1 = Series({'a': 1, 'b': 2, 'c': 3})
s2 = Series({ 'b': 2, 'c': 3, 'd': 4})
print(
# s1,
# s2,
# s1.iloc[0], s2.iloc[0],
# s1.loc['a'], s2.loc['b'],
# s1 + s2,
s1.add(s2, fill_value=0),
)
from pandas import DataFrame, date_range
df1 = DataFrame({
'a': [0, 1, 2],
'b': [3, 4, 5],
}, index=date_range('2021-05-06', periods=3))
df2 = DataFrame({
'b': [6, 7, 8],
'c': [9, 10, 11],
}, index=date_range('2021-05-07', periods=3))
print(
# df1,
# df2,
# df1['a'],
# df1.loc[0],
# df1.loc[[0, 1]],
# df1.index,
# df1.columns,
# df1 + df2,
df1.add(df2, fill_value=0),
)
Here’s a minimalistic (and incomplete) example of a custom index:
from pandas import DataFrame
from dataclasses import dataclass
from string import ascii_lowercase
from itertools import product, chain, count, islice
@dataclass
class CustomIndex:
    size : int
    _labels = {
        letter: number for letter, number in
        zip((''.join(x) for x in chain.from_iterable(product(ascii_lowercase, repeat=n) for n in count(1))), range(16_384))
    }
    __iter__ = lambda s: islice(s._labels, s.size)
    __len__  = lambda s: s.size
    get_loc  = _labels.get
df = DataFrame({
'value': range(size := 30),
}, index=CustomIndex(size=size))
print(
# df,
# df.tail(),
df.loc['x':'ab'],
)
The .groupby operation allows us to perform computations on either horizontal
or vertical groupings of data.
from data import df
print(
# df.head(5),
# df.groupby('ticker').mean(),
# df.groupby(['ticker', 'market']).min(),
# df.groupby('ticker').max(),
# df.groupby(['ticker', 'market']).mean(),
df.groupby(['ticker', 'market']).mean().unstack(),
)
With .groupby, we can also use the .apply, .aggregate, .transform, and
.filter methods. These largely fit into the following categories:
Let’s first look at .filter:
from data import df
print(
# df.groupby('ticker'),
# df.groupby('ticker').mean(),
# df.groupby('ticker').filter(
# lambda g: (g['price'].max() - g['price'].min()) / g['price'].max() > 0.1
# # lambda g: print(g)
# ),
# df.set_index('ticker').sort_index(),
# df.set_index(['ticker', 'market', 'time']).sort_index(),
# df.groupby(['ticker', 'time']).filter(
# lambda g: (g['price'].max() - g['price'].min()) / g['price'].max() > 0.1
# ),
# df.assign(time=df['time'].dt.floor('1H')).groupby(['ticker', 'time']).filter(
# lambda g: (g['price'].max() - g['price'].min()) / g['price'].max() > 0.1
# ),
)
Let’s next look at .transform:
In a transformation, the result must be “like-indexed.” The function used for
.transform is also restricted in the following ways:
from data import df
print(
df.groupby(['ticker', 'market'])['price'].transform(
lambda s: s / s.iloc[0]
),
# df.set_index(['ticker', 'market'])
# .groupby(['ticker', 'market'])['price']
# .transform(
# lambda s: s / s.iloc[0]
# ),
)
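To make the “like-indexed” requirement concrete, here is a minimal sketch on made-up data (the `toy` frame is not part of data.py). It contrasts .transform, which returns one value per input row on the original index, with .agg, which returns one value per group.

```python
from pandas import DataFrame

# Made-up data: two groups of three prices each.
toy = DataFrame({
    'ticker': ['aaaa'] * 3 + ['bbbb'] * 3,
    'price': [10.0, 11.0, 12.0, 50.0, 55.0, 45.0],
})

# .transform: one output row per input row, carrying the input's index.
normalized = toy.groupby('ticker')['price'].transform(lambda s: s / s.iloc[0])

# .agg: one output row per group, indexed by the group key.
first = toy.groupby('ticker')['price'].agg('first')

print(normalized.index.equals(toy.index), len(first))
```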
from pandas import Series, Categorical
from data import df
s = Series({'ABC': 1, 'WXYZ': 100})
s.index = Categorical(s.index)
s.index.name = 'market'
df['market'] = Categorical(df['market'], categories=s.index.categories)
print(
# df.head(),
# df.set_index(['ticker', 'market']).pipe(
# lambda df: df.assign(price=df['price'] * s)
# ),
)
If we want to perform an aggregation, we can use .aggregate or .agg.
.aggregate works on a column-by-column basis.
from data import df
from scipy.stats import zscore
print(
# df.groupby('ticker').mean(),
# df.groupby('ticker').agg('mean'),
# df.groupby('ticker').agg({'price': 'mean', 'volume': 'max'}),
# df.groupby(['ticker', 'market']).agg(
# lambda s: s.mean()
# ),
# df.groupby(['ticker', 'market']).agg({'price':
# lambda s: s.mean()
# }),
df.groupby(['ticker', 'market']).agg({'price':
lambda s: zscore(s)[-1]
}),
)
Finally, we have another means by which we can do aggregation operations: .apply.
.apply is extremely flexible, but it has the downside of being much slower in
practice than .aggregate or .transform. .apply takes a function which
accepts a DataFrame as its argument, returning a new DataFrame; the
.apply machinery determines how to combine the result DataFrames into a new
structure.
.apply operates on a window-by-window basis.
from data import df
print(
df.groupby(['ticker', 'market'])[['volume', 'price']].apply(
lambda df: (df['volume'] * df['price']).sum() / df['volume'].sum()
),
)
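The volume-weighted average price above needs two columns at once, which is why it is written with .apply. As a sketch on made-up numbers (the `toy` frame is an assumption for illustration), the same quantity can also be obtained from two built-in group sums, which avoids calling a Python function once per group.

```python
from pandas import DataFrame

# Made-up data for illustration only.
toy = DataFrame({
    'ticker': ['aaaa', 'aaaa', 'bbbb', 'bbbb'],
    'price':  [10.0, 20.0, 5.0, 7.0],
    'volume': [100, 300, 50, 50],
})

# .apply hands each group to the function as a whole DataFrame.
vwap_apply = toy.groupby('ticker')[['volume', 'price']].apply(
    lambda g: (g['volume'] * g['price']).sum() / g['volume'].sum()
)

# The same quantity, computed from two built-in group sums.
notional = (toy['price'] * toy['volume']).groupby(toy['ticker']).sum()
vwap_sums = notional / toy.groupby('ticker')['volume'].sum()

print(vwap_apply, vwap_sums, sep='\n')
```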
.rolling and .expanding and .ewm
Let’s take a look at a computation involving an expanding window:
from data import df
from itertools import islice
from scipy.stats import zscore
print(
# df.expanding(),
# df.expanding(min_periods=1),
# df.groupby(['ticker', 'market'])['price'].transform(
# lambda s: s.expanding(min_periods=1).agg(
# lambda s: zscore(s)[-1]
# )
# ),
# df.set_index(['ticker', 'market']).groupby(['ticker', 'market'])['price'].transform(
# lambda s: s.expanding(min_periods=1).agg(
# lambda s: zscore(s)[-1]
# )
#.dropna(),
)
for g in islice(
    df.expanding(min_periods=1),
    3):
    print(g)
# for g in islice(
#     df.groupby(['ticker', 'market']).expanding(min_periods=1),
#     None):
#     print(g)
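One way to think about an expanding window is that each output row sees every row up to and including itself. The sketch below, on a made-up series, checks that an expanding mean matches a cumulative sum divided by the running count.

```python
from pandas import Series

# Made-up values for illustration.
s = Series([2.0, 4.0, 6.0, 8.0])

# Built-in expanding mean: row i aggregates rows 0..i.
expanding_mean = s.expanding(min_periods=1).mean()

# The same thing by hand: cumulative sum over the number of rows seen so far.
manual = s.cumsum() / Series(range(1, len(s) + 1), index=s.index)

print(expanding_mean.tolist(), manual.tolist())
```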
As an aside, let’s consider the sampling rate of our data:
from data import df
from pandas import date_range, concat
print(
# df,
# df.resample('30T'),
# df.set_index('time').resample('30T'),
# df.set_index('time').resample('30T').first(),
# df.set_index(['ticker', 'market', 'time']).resample('30T').first(),
# df.set_index(['ticker', 'market', 'time']).groupby(['ticker', 'market']).apply(
# lambda df: df.reset_index(['ticker', 'market'], drop=True).resample('30T').first()
# ).interpolate('linear'),
df.set_index(['ticker', 'market', 'time']).groupby(['ticker', 'market']).apply(
# lambda df: df.reset_index(['ticker', 'market'], drop=True).resample('30T').apply(
# lambda s: concat([s, s])
# )
lambda df: df.reset_index(['ticker', 'market'], drop=True).resample('30T').aggregate(
lambda s: s.mean()
)
),
)
Let’s take a look at a computation involving a rolling window:
from data import big_df as df
from pandas import Grouper, to_datetime
from pandas.tseries.offsets import Day, CustomBusinessDay
cbd = CustomBusinessDay(holidays=to_datetime(['2021-05-31']))
print(
# df['time'].min(), df['time'].max(),
# df.sample(5),
# df.set_index(['ticker', 'market', 'time'])
# .groupby(['ticker', 'market', Grouper(level='time', freq='D')])['price'].last()
# .groupby(['ticker', 'market']).transform(
# lambda df: df.rolling(7, min_periods=1).mean()
# ),
# df[
# (df['time'].dt.date != to_datetime('2021-05-31').date()) # Memorial Day
# & (df['time'].dt.weekday < 5) # Mon ~ Fri
# ],
# df[
# (df['time'].dt.date != to_datetime('2021-05-31').date()) # Memorial Day
# & (df['time'].dt.weekday < 5) # Mon ~ Fri
# ].set_index(['ticker', 'market', 'time'])
# .groupby(['ticker', 'market', Grouper(level='time', freq='D')])['price'].last()
# .groupby(['ticker', 'market']).transform(
# lambda df: df.reset_index(['ticker', 'market'], drop=True).rolling(Day(3), min_periods=1).mean()
# ),
df.set_index(['ticker', 'market', 'time'])
.groupby(['ticker', 'market', Grouper(level='time', freq='D')])['price'].last()
.groupby(['ticker', 'market']).transform(
lambda df: df.reset_index(['ticker', 'market'], drop=True).rolling(cbd, min_periods=1).mean()
),
)
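The difference between a count-based window and a time-based window is easiest to see on irregularly spaced data. This sketch uses three made-up observations with a gap between them; the string offset '2D' stands in for the offset objects used above.

```python
from pandas import Series, to_datetime

# Made-up observations: two consecutive days, then a gap.
index = to_datetime(['2021-05-03', '2021-05-04', '2021-05-07'])
s = Series([1.0, 2.0, 4.0], index=index)

# Count-based: always the 2 most recent rows, however far apart in time.
by_count = s.rolling(2).sum()

# Time-based: only rows within the trailing 2 calendar days of each row.
by_time = s.rolling('2D').sum()

print(by_count, by_time, sep='\n')
```

Note that the time-based window requires a datetime-like index (or the `on=` parameter) and, unlike the count-based window, defaults to `min_periods=1`.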
Consider that a rolling-window sum or mean can be conceived of as a discrete convolution with a square (rectangular) “kernel.”
from data import df
from numpy import ones, convolve
for _, grp in df.groupby(['ticker', 'market']):
    break
print(
# grp['price'],
# grp['price'].rolling(3).mean().dropna(),
# convolve(ones(3) / 3, grp['price'])[2:-2],
)
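For a version of this comparison that does not depend on data.py, here is a small sketch on made-up prices. It uses `mode='valid'` so that numpy returns only the positions where the kernel fully overlaps the data, which correspond to the non-NaN rows of the rolling mean.

```python
from numpy import allclose, convolve, ones
from pandas import Series

# Made-up prices for illustration.
prices = Series([10.0, 11.0, 13.0, 12.0, 15.0])

# Rolling mean over 3 rows; the first two rows are NaN and are dropped.
rolled = prices.rolling(3).mean().dropna().to_numpy()

# Convolution with a square kernel whose weights sum to 1.
convolved = convolve(prices.to_numpy(), ones(3) / 3, mode='valid')

print(rolled, convolved)
```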
What about non-square “kernels”?
from scipy.signal.windows import triang, gaussian
from numpy import convolve
from data import df
for _, grp in df.groupby(['ticker', 'market']):
    break
print(
# triang(3),
# gaussian(3, std=1),
convolve(grp['price'], triang(3)),
)
from data import df
for _, grp in df.groupby(['ticker', 'market']):
    break
print(
grp['price'].rolling(3, win_type='triang', min_periods=1).mean(),
)
from pandas import Series
from pandas.api.indexers import FixedForwardWindowIndexer
s = Series(range(10))
window = FixedForwardWindowIndexer(window_size=3)
print(
# window.get_window_bounds(s.size),
# s.rolling(3).mean(),
s.rolling(window).mean(),
)
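A forward-looking window can be checked against a trailing window applied to the reversed data. The sketch below does this on a made-up series; `min_periods=1` is set explicitly so that the truncated windows at the end of the series still produce values.

```python
from numpy import allclose
from pandas import Series
from pandas.api.indexers import FixedForwardWindowIndexer

s = Series(range(10), dtype=float)

# Forward window: row i aggregates rows i, i+1, i+2 (fewer near the end).
forward = s.rolling(
    FixedForwardWindowIndexer(window_size=3), min_periods=1
).mean()

# Equivalent: reverse, apply an ordinary trailing window, reverse back.
reversed_trailing = s[::-1].rolling(3, min_periods=1).mean()[::-1]

print(forward.tolist())
```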
from pandas.api.indexers import BaseIndexer
from functools import wraps
from numpy import arange
from pandas import Series
class Window(BaseIndexer):
    @wraps(BaseIndexer.get_window_bounds)
    def get_window_bounds(self, num_values, *_, **__):
        start = arange(num_values)
        return start, (2 ** start.astype('O')).clip(0, num_values).astype(int)
s = Series(range(100))
window = Window()
print(
# window.get_window_bounds(s.size),
s.rolling(window).mean(),
)
from pandas import to_datetime, date_range, Series, Grouper
from numpy import arange, array
from pandas.tseries.offsets import CustomBusinessDay
from pandas.api.indexers import VariableOffsetWindowIndexer
from dataclasses import dataclass
from functools import wraps
from data import big_df as df
@dataclass
class Window(VariableOffsetWindowIndexer):
    index_array : None
    window_size : None
    cbd = CustomBusinessDay(holidays=to_datetime(['2021-05-31']))
    @wraps(VariableOffsetWindowIndexer.get_window_bounds)
    def get_window_bounds(self, num_values, min_periods=None, *_, **__):
        indices = Series(arange(num_values), index=self.index_array)
        # use .iloc for positional access; plain s[0] on a datetime-indexed Series is ambiguous
        start = (
            indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s.iloc[0])
                   .rolling(self.window_size, min_periods=min_periods).agg(lambda s: s.iloc[0]).fillna(0).astype(int)
        )
        stop = (
            indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s.iloc[-1])
                   .rolling(self.window_size, min_periods=min_periods).agg(lambda s: s.iloc[-1]).fillna(0).astype(int)
        )
        return start.values, stop.values
for _, grp in df.groupby(['ticker', 'market']):
    break
window = Window(index_array=grp['time'], window_size=1)
print(
# window.get_window_bounds(grp['price'].size),
# grp['price'].rolling(window, min_periods=1).mean(),
)
from pandas import Series
from numpy import arange
s = Series(range(100))
alpha = .1
print(
# (s * (factors := (1 - alpha)**arange(s.size, 0, -1))).cumsum()
# / factors.cumsum(),
# s.ewm(alpha=alpha).mean(),
# s.expanding(min_periods=1).agg(
# lambda s: (s * (factors := (1 - alpha)**arange(s.size, 0, -1))).sum()
# / factors.sum()
# ),
)
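The commented-out expressions above compare .ewm against explicitly constructed weights. Here is a self-contained sketch of the same check, assuming the default `adjust=True` behaviour, in which the observation i steps in the past receives weight (1 - alpha)**i. The helper `ewm_by_hand` is a name introduced only for this example.

```python
from numpy import allclose, arange, array
from pandas import Series

alpha = 0.1
s = Series(range(20), dtype=float)

def ewm_by_hand(values, alpha):
    """Weighted mean of values[:t+1] with weights (1 - alpha)**age, for each t."""
    out = []
    for t in range(len(values)):
        # The oldest observation has age t, the newest has age 0.
        weights = (1 - alpha) ** arange(t, -1, -1)
        out.append((weights * values[:t + 1]).sum() / weights.sum())
    return array(out)

by_hand = ewm_by_hand(s.to_numpy(), alpha)
built_in = s.ewm(alpha=alpha).mean().to_numpy()

print(allclose(by_hand, built_in))
```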
from data import big_df as df
for _, grp in df.groupby(['ticker', 'market']):
    break
# alpha = 1 - exp(-ln(2) / halflife)
print(
# grp['price'].ewm(halflife='2 days', times=grp['time']).mean(),
# grp.set_index('time')['price'].pipe(
# lambda df: df.ewm(halflife='2 days', times=df.index).mean()
# ),
# df.set_index(['ticker', 'market', 'time'])['price']
# .pipe(lambda df: df.ewm(halflife='3 days',
# times=df.index.get_level_values('time')).mean())
# .reset_index()#.iloc[2 * 24 * 31 - 3: 2 * 24 * 31 + 3],
# df.iloc[2 * 24 * 31 - 3: 2 * 24 * 31 + 3],
# df.set_index(['ticker', 'market', 'time'])['price']
# .groupby(['ticker', 'market', 'time']).transform(
# lambda s: s.ewm(halflife='3 days', times=s.index.get_level_values('time')).mean()
# ).reset_index()#.iloc[2 * 24 * 31 - 3: 2 * 24 * 31 + 3],
)
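The relationship noted in the comment above, alpha = 1 - exp(-ln(2) / halflife), can be checked directly when the half-life is expressed as a number of observations. This sketch uses made-up values and does not cover the time-based `times=` variant used above, where the decay depends on the elapsed time between rows.

```python
from math import exp, log

from numpy import allclose
from pandas import Series

halflife = 3  # in observations, an assumption for this example
alpha = 1 - exp(-log(2) / halflife)

# Made-up values for illustration.
s = Series([1.0, 4.0, 2.0, 8.0, 5.0])

via_halflife = s.ewm(halflife=halflife).mean()
via_alpha = s.ewm(alpha=alpha).mean()

# After `halflife` steps, a weight has decayed to one half.
decay_after_halflife = (1 - alpha) ** halflife

print(alpha, decay_after_halflife)
```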
Let’s talk a bit about performance.
from data import big_df as df, timed
from data import huge_df as df
with timed('df[column].apply(…)'):
    print(
        df['price'].apply(lambda x: x * 100),
        # df['price'] * 100,
    )
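The `timed` helper is imported from data.py, but its definition is not shown in these notes. The sketch below uses a hypothetical stand-in built on `contextlib.contextmanager` and `time.perf_counter`, and compares an element-wise `.apply` with the equivalent vectorized expression on made-up data. Actual timings will vary by machine, so none are quoted here.

```python
from contextlib import contextmanager
from time import perf_counter

from numpy.random import default_rng
from pandas import Series

@contextmanager
def timed(label):
    # Hypothetical stand-in for the `timed` helper from data.py;
    # the real implementation may differ.
    start = perf_counter()
    try:
        yield
    finally:
        print(f'{label}: {perf_counter() - start:.4f}s')

# Made-up data: 100,000 random prices.
prices = Series(default_rng(0).random(100_000))

with timed('Series.apply'):
    slow = prices.apply(lambda x: x * 100)   # one Python call per element

with timed('vectorized'):
    fast = prices * 100                      # one call over the whole array
```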
from data import big_df as df, timed
from data import huge_df as df
from pandas import Grouper
df = df.set_index(['ticker', 'market', 'time']).sort_index()['price']
with timed('df.groupby(…).apply(…)'):
    print(
        # df.groupby(['ticker', 'market']).mean(),
        # df.groupby(['ticker', 'market']).agg('mean'),
        # df.groupby(['ticker', 'market']).apply(lambda df: df.mean()),
        df.groupby(['ticker', 'market', Grouper(level='time', freq='1D')]).apply(lambda df: df.mean()),
        # df.groupby(['ticker', 'market', Grouper(level='time', freq='1D')]).mean()
    )
from data import big_df as df, timed
# from data import huge_df as df
from pandas import Series
from xarray import DataArray
from numpy import arange
df = df.set_index(['ticker', 'market'])[['price']]
with timed('rolling & expanding'):
    print(
        # df.groupby(level=['ticker', 'market']).transform(
        #     lambda s: s.rolling(3, min_periods=1).mean()
        # ),
        # df.groupby(level=['ticker', 'market']).transform(
        #     lambda s: s.rolling(3, min_periods=1).apply(lambda g: g.mean())
        # ),
        # df.groupby(level=['ticker', 'market']).transform(
        #     lambda s: s.expanding().apply(lambda g: g.mean())
        # ),
        # df.groupby(level=['ticker', 'market']).transform(
        #     lambda s: s.expanding().mean()
        # ),
        # df.groupby(level=['ticker', 'market']).transform(
        #     lambda s: s.expanding().apply(lambda g: g.mean(), raw=True)
        # ),
        # ((da := DataArray(df['price'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        #                   dims=[*df.index.names, 'time']))
        #     .cumsum(dim='time') / arange(1, len(da.coords['time']) + 1)),
        Series(((da := DataArray(df['price'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
                                 dims=[*df.index.names, 'time']))
                .cumsum(dim='time') / arange(1, len(da.coords['time']) + 1)).data.ravel(),
               index=df.index).sort_index(),
    )
from numpy import arange
from numpy.lib.stride_tricks import as_strided
def rolling(arr, size):
    return as_strided(arr, shape=(arr.shape[0] - size + 1, size, *arr.shape[1:]),
                      strides=(arr.strides[0], *arr.strides))
xs = arange(10)
ys = rolling(xs, size=3)
print(
# xs,
ys,
)
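As a point of comparison for the hand-written `as_strided` helper, numpy 1.20 and later provide `sliding_window_view`, which builds the same kind of zero-copy windowed view while checking the shape for us. The sketch below compares the two on a small 1-D array.

```python
from numpy import arange, array_equal
from numpy.lib.stride_tricks import as_strided, sliding_window_view

xs = arange(10)
size = 3

# Hand-rolled view: each row starts one element after the previous row.
manual = as_strided(
    xs,
    shape=(xs.shape[0] - size + 1, size),
    strides=(xs.strides[0], xs.strides[0]),
)

# Library helper (numpy >= 1.20) producing the same windows as a read-only view.
windows = sliding_window_view(xs, size)

print(windows.mean(axis=1))
```

Because `as_strided` performs no bounds checking, a wrong shape or stride can silently read unrelated memory; the library helper is the safer choice when it is available.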
from data import big_df as df, timed
# from data import huge_df as df
from xarray import DataArray
from numpy.lib.stride_tricks import as_strided
def rolling(arr, size):
    return as_strided(arr, shape=(*arr.shape[:-1], arr.shape[-1] - size + 1, size),
                      strides=(*arr.strides, arr.strides[-1]))
df = df.set_index(['ticker', 'market'])[['price']]
with timed('rolling'):
    print(
        # df.groupby(['ticker', 'market']).transform(
        #     lambda s: s.rolling(3).mean()
        # ).dropna(),
        # (da := DataArray(
        #     rolling(
        #         df['price'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        #         size=3,
        #     ),
        #     dims=[*df.index.names, 'time', 'win'],
        #     coords={k: v for k, v in zip(df.index.names, df.index.levels)},
        # ))#.sel(market='ABC', ticker='oywj')
        # (da := DataArray(
        #     rolling(
        #         df['price'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        #         size=3,
        #     ),
        #     dims=[*df.index.names, 'time', 'win'],
        #     coords={k: v for k, v in zip(df.index.names, df.index.levels)},
        # )).mean(dim='win')
        # df.groupby(['ticker', 'market']).transform(
        #     lambda s: s.rolling(3).apply(lambda s: s.mean())
        # ).dropna(),
    )
from numpy.ma import masked_array
from numpy import arange, eye
from numpy.lib.stride_tricks import as_strided
def expanding(arr):
    mask = eye(arr.shape[0]+1).cumsum(axis=1).astype(bool)[1:, :arr.shape[0]]
    return masked_array(as_strided(arr, shape=(*arr.shape, arr.shape[0]),
                                   strides=(0, *arr.strides)),
                        mask=mask)
xs = arange(10)
ys = expanding(xs)
print(
# xs,
ys,
# ys.sum(axis=1),
)
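The masking idea above can also be written with a lower-triangular boolean matrix, which some readers may find easier to follow. This is a sketch of an alternative formulation for the 1-D case only, not the helper used above: row i keeps elements 0..i and masks the rest, so row-wise reductions behave like expanding-window reductions.

```python
from numpy import arange, array_equal, broadcast_to, ones, tril
from numpy.ma import masked_array

xs = arange(10)
n = xs.shape[0]

# keep[i, j] is True when element j belongs to the expanding window ending at i.
keep = tril(ones((n, n), dtype=bool))

# Every row is a zero-copy view of xs; the mask hides the "future" elements.
ys = masked_array(broadcast_to(xs, (n, n)), mask=~keep)

# Row-wise sum over the unmasked entries is an expanding sum.
sums = ys.sum(axis=1)

print(sums)
```

Note that the mask itself is an n-by-n array, so this approach trades memory for convenience and is best suited to short series.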
from data import df, timed
from data import big_df as df
from xarray import DataArray
from numpy.lib.stride_tricks import as_strided
from numpy.ma import masked_array
from numpy import arange, eye, tile
def expanding(arr):
    mask = tile(
        eye(arr.shape[-1]+1).cumsum(axis=1).astype(bool)[1:, :-1],
        arr.shape[:-1],
    ).reshape(*arr.shape, arr.shape[-1])
    return masked_array(as_strided(arr, shape=(*arr.shape, arr.shape[-1]),
                                   strides=(*arr.strides[:-1], 0, arr.strides[-1])),
                        mask=mask)
df = df.set_index(['ticker', 'market'])[['price']]
with timed('expanding'):
    print(
        # df.groupby(['ticker', 'market']).transform(
        #     lambda s: s.expanding().agg(lambda s: s.mean())
        # ).tail(),
        # expanding(
        #     df['price'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        # ).mean(axis=-1)
    )