ts-python

Seminar XII: Seeing pandas in the window: window operations in pandas .rolling, .expanding, .ewm

OTTO MANN: “Oh, wow! Windows! I don’t think I can afford this place…”

— “You Only Move Twice” (S08E02)

Abstract

What’s the difference between a windowed operation and a grouped operation? How do I effectively leverage pandas the pandas window API? Under what circumstances will a windowed operation be fast or slow?

Do you…

Then come join us for a session on pandas window operations!

pandas has a plethora of ways to slice, dice, and analyze your data. The most common data transformations are aggregations. The ability to describe a large dataset with a just a few numbers is extremely powerful for understanding your data. But what happens is we over aggregate, mistakenly throwing out real signal for the convenience of working with aggregated values? Thankfully window operations are here to help.

In this episode, we’ll look at .rolling, .expanding, and .ewm as well as their various options. These DataFrame methods are just the beginning though, as we’ll also explore the Window, Rolling, Expanding, and ExponentialMovingWindow objects they return. We’ll discuss the uses for these objects, how you can customize them, and how you can ensure your operations are applied efficiently and remain within the Restricted Computation Domain.

Keywords

Notes

Premise

print("Let's take a look!")

The Data

from pandas import DataFrame, period_range, MultiIndex, IndexSlice, to_timedelta, Series, Categorical, to_datetime
from numpy import tile, where, sign
from numpy.random import default_rng
from string import ascii_lowercase
from pathlib import Path
from textwrap import dedent

rng = default_rng(0)

tickers = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
industries = ['retail', 'finance', 'healthcare', 'tech', 'energy', 'entertainment']
days = period_range('2020-01-01', periods=90, freq='D')
portfolios = ['Alice', 'Bob', 'Charlie', 'Dana']

infos = Series(rng.choice(industries, size=len(tickers)), index=tickers, name='industry').rename_axis('ticker').sort_index()

prices = DataFrame(
    index=(idx := MultiIndex.from_product([
        days,
        tickers,
    ], names=['date', 'ticker'])),
    data={
        'bid': (bid := (
            rng.normal(loc=250, scale=50, size=len(tickers)).clip(0, 500)
          * rng.normal(loc=1, scale=.02, size=(len(days), len(tickers))).cumprod(axis=-1).clip(0, 5)
        ).ravel().round(2)),
        'ask': (ask := (
            bid * rng.normal(loc=1.25, scale=.05, size=len(idx)).clip(.99, 2)
        ).round(2)),
    },
).sort_index()

trades = DataFrame(
    index=(idx := MultiIndex.from_product([
        portfolios,
        tile(days, 50),
        tickers,
    ], names=['portfolio', 'date', 'ticker'])),
    data={
        'volume': (volumes :=
           rng.normal(loc=0, scale=250_000, size=len(idx)).round(-2).astype('int')
        ),
        'price': (
            where(
                volumes > 0,
                (px := prices.loc[
                    MultiIndex.from_arrays([
                        idx.get_level_values('date'),
                        idx.get_level_values('ticker'),
                    ])
                ]).loc[:, 'ask'],
                px.loc[:, 'bid'],
            )
            * rng.normal(loc=1, scale=.15, size=len(idx)).clip(.5, 1.5)
        ).round(2),
    },
).loc[lambda df: df['volume'] != 0].pipe(
    lambda df: df.set_axis(
        MultiIndex.from_arrays([
            df.index.get_level_values('portfolio'),
            df.index.get_level_values('date').to_timestamp()
            + to_timedelta(rng.normal(loc=12*60*60, scale=3*60*60, size=len(df.index)).round().clip(0, 24*60*60), unit='s'),
            df.index.get_level_values('ticker'),
        ], names=df.index.names)
    )
).sample(frac=.25, random_state=rng).sort_index()

positions = trades.groupby(['portfolio', 'ticker'])['volume'].cumsum().rename('position')
direction = sign(positions).map({-1: 'bid', 1: 'ask', 0: 'ask'}).rename('direction')
market_value = (
    prices.loc[
        prices.index.get_level_values('date').max()
    ].stack().rename_axis(['ticker', 'direction']).loc[
        MultiIndex.from_arrays([
            direction.index.get_level_values('ticker'),
            direction.values,
        ])
    ].droplevel('direction')
    .pipe(lambda s:
          positions.to_frame().assign(price=s.values)
    )
    .product(axis='columns')
)

print(
    # trades,
    # prices,
    # trades.head(),
    # prices.head(),
    # positions.head(),
    # direction.head(),
    # market_value.head(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
trades.to_pickle(data_dir / 'trades.pkl')
prices.to_pickle(data_dir / 'prices.pkl')
positions.to_pickle(data_dir / 'positions.pkl')
direction.to_pickle(data_dir / 'direction.pkl')
market_value.to_pickle(data_dir / 'market_value.pkl')
from pandas import read_pickle
from pathlib import Path

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

print(
    prices * 10,
    # trades.head(),
    # prices.head(),
    # positions.head(),
    # direction.head(),
    # market_value.head(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Non-Overlapping Entries

print("Let's take a look!")

The Essence of .groupby

from pandas import read_pickle
from pathlib import Path

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

print(
    positions.groupby(['portfolio', 'ticker']).max(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.groupby with built-ins
from pandas import read_pickle, IndexSlice, Grouper
from pathlib import Path

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

from numpy.random import default_rng

rng = default_rng(0)

print(
    positions,
    prices.loc[IndexSlice[:, ['ibba', 'evqx'], :], 'bid'].pipe(
        lambda s: s.groupby(
            ['date']
            # s.index.get_level_values('date').asfreq('M')
            # rng.choice([True, False], size=len(s))
            # Grouper(level='date', freq='M')
            # lambda x: x[0].asfreq('M')
        # ).cumsum()
        # ).agg(max)
        # ).agg('max')
        # ).agg(np.max)
        ).agg(lambda g: max(g)/mean(g))
    ),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
.groupby with user-defined functions
from pandas import read_pickle
from pathlib import Path

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

print(
    prices
        .groupby(['ticker'])

        # .agg operates on only one column a time
        # .agg changes your indexing to match the grouping
        # .agg will reduce (“aggregate”) your data: for every partition (grouping) → one scalar result value
        # .agg(
        #     lambda g: g.max() - g.mean()
        # )

        # .transform operates on only one column at a time
        # .transform will not chagne your indexing
        # .transform will map (“transform”) your data: for every original row → one result row
        # .transform(
        #     lambda g: g.cummax()
        # )

        # .apply operates on all columns at the same time
        # .apply will change the indexing: append the grouping as the outermost level of your indexing to
        #                                  whatever the UDF returns
        # .apply will perform an arbitrary operation on your data: there is no a prior relationship between
        #                                                          number of input & output rows
        # .apply(
        #     lambda g: g[g['bid'] < g['ask'] * .80]
        # )

        # .filter(
        #     lambda g: g['bid'] < g['ask'] * .80
        # )
    ,
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Overlapping Entries

print("Let's take a look!")

The Essence of .rolling

from pathlib import Path

from pandas import read_pickle, Grouper, to_datetime, IndexSlice
from pandas.tseries.offsets import Day

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

from numpy.random import default_rng
prices = prices.sample(frac=.40, random_state=default_rng(0)).sort_index()

print(
    # positions,
    # prices.unstack('ticker').rolling(7, min_periods=1).mean(),
    # prices.unstack('ticker').rolling(Day(7), min_periods=1).count(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from pathlib import Path

from scipy.signal.windows import triang, gaussian
from numpy import ones, convolve

from pandas import read_pickle, Grouper, to_datetime, IndexSlice
from pandas.tseries.offsets import Day

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

print(
    # prices.loc[IndexSlice[:, 'chmk'], 'bid'].rolling(3).mean(),
    # convolve(
    #     prices.loc[IndexSlice[:, 'chmk'], 'bid'].values,
    #     # ones(3) / 3,
    #     triang(3) / sum(triang(3))
    # ),
    # win_type
    # triang(3),
    prices.loc[IndexSlice[:, 'chmk'], 'bid'].rolling(3, win_type='triang').mean(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from pathlib import Path

from pandas import read_pickle, IndexSlice
from pandas.api.indexers import FixedForwardWindowIndexer
from pandas.tseries.offsets import Day

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

print(
    prices,
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from numpy import arange
from numpy.lib.stride_tricks import as_strided

def rolling(arr, size):
    return as_strided(arr, shape=(arr.shape[0] - size + 1, size, *arr.shape[1:]),
                          strides=(arr.strides[0], *arr.strides))

xs = arange(10)
ys = rolling(xs, size=3)
print(
    #  xs,
    ys,
)

The Essence of .expanding

from pathlib import Path

from pandas import read_pickle, IndexSlice

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

print(
    positions,
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

The Essence of .ewm

from pathlib import Path

from numpy import arange
from pandas import read_pickle, IndexSlice

data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

alpha = .1
factors = lambda s: (1 - alpha)**arange(s.size, 0, -1)
# (s * (f := factor(s))).cumsum() / f.cumsum()

print(
    prices.ewm(alpha=alpha).mean(),
    (prices['bid'] * (f := factors(prices['bid']))).cumsum() / f.cumsum(),
    f,
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Arbitrary (Contiguous) Entries

print("Let's take a look!")

VariableOffsetWindowIndexer

from dataclasses import dataclass
from functools import wraps

from numpy import array
from numpy.random import default_rng
from pandas import Series, to_datetime, to_timedelta
from pandas.api.indexers import VariableOffsetWindowIndexer

@dataclass
class Window(VariableOffsetWindowIndexer):
    @wraps(VariableOffsetWindowIndexer.get_window_bounds)
    def get_window_bounds(self, num_values, min_periods=None, *_, **__):
        start_indices = array([0, 1, 4, 3, 0, 0, 0])
        stop_indices  = array([1, 2, 5, 5, 0, 0, 0])
        return start_indices, stop_indices

rng = default_rng(0)
s = Series(
    data=(data := rng.normal(size=7)),
    index=
        to_datetime('2020-01-01')
        + to_timedelta(rng.choice([0, 1], size=len(data)).cumsum(), unit='d'),
).round(2)

print(
    s,
    win := Window(),
    win.get_window_bounds(s),
    s.rolling(win, min_periods=1).mean(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from dataclasses import dataclass
from functools import wraps
from pathlib import Path

from numpy import arange, array
from pandas import to_datetime, date_range, Series, Grouper, read_pickle, IndexSlice
from pandas.tseries.offsets import CustomBusinessDay
from pandas.api.indexers import VariableOffsetWindowIndexer

@dataclass
class Window(VariableOffsetWindowIndexer):
    index_array : None
    window_size : None
    cbd = CustomBusinessDay(holidays=to_datetime(['2020-05-31']))

    @wraps(VariableOffsetWindowIndexer.get_window_bounds)
    def get_window_bounds(self, num_values, min_periods=None, *_, **__):
        indices = Series(arange(num_values), index=self.index_array)
        start = (
            indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s[0])
                .rolling(self.window_size, min_periods=min_periods).agg(lambda s: s[0]).fillna(0).astype(int)
        )
        stop = (
            indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s[-1])
                .rolling(self.window_size, min_periods=min_periods).agg(lambda s: s[-1]).fillna(0).astype(int)
        )
        return start.values, stop.values


data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

trades = read_pickle(data_dir / 'trades.pkl')
prices = read_pickle(data_dir / 'prices.pkl')
positions = read_pickle(data_dir / 'positions.pkl')
direction = read_pickle(data_dir / 'direction.pkl')
market_value = read_pickle(data_dir / 'market_value.pkl')

print(
    pxs := prices.loc[IndexSlice[:, 'ibba', :], 'bid'].droplevel('ticker'),
    win := Window(
        index_array=pxs.index.get_level_values('date'),
        window_size=1,
    ),
    win.get_window_bounds(len(pxs)),
    pxs.rolling(win, min_periods=1).count(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)