ts-python

applied pandas II: Let’s “get in shape” with groupby!

Seminar (Fri Jan 22, 2021; 9:30 AM EST)

Keywords: pandas.DataFrame.groupby, GroupBy.apply, GroupBy.aggregate, GroupBy.transform

Presenter James Powell james@dutc.io
Date Friday, January 22, 2021
Time 9:30 AM EST
print("Let's go!")
print("Let's go!")
print("Let's go!")

Part I: All About .groupby

Let’s start with a pandas.DataFrame with some random data:

from pandas import DataFrame, date_range, to_timedelta
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':   repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
            + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker': tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':  tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
})
print(df.head(3))

We can use the .groupby method on this to perform operations on groups defined by the ticker column.

In this case, the .groupby method returns a DataFrameGroupBy object on which we can call further methods to operate on the groups.

_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':   repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
            + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker': tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':  tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
})

print(f'{df.groupby("ticker") = }')
_print('df.groupby("ticker").sum().head(2)')

The pandas.DataFrameGroupBy object defines a set of operations that can be performed across all of the groups, some of which are implemented as high-performance vectorised operations.

Some simple operations supported are .count, .min, .max, .sum, .var, .std, and .size.

These perform the specified reduction/aggregation operation on all relevant columns. If there are non-numeric columns, some operations may not be performed: e.g., .sum() will sum within the groups for all numeric columns and skip non-numeric columns; .size will return a single column with the size of each group; .count will count non-NaN values for all columns; .min and .max will find the minimum and maximum values for all comparable columns.

The return value will be a new pandas.Series or pandas.DataFrame where the data is the result of the desired operation and the index is the group values (potentially a MultiIndex when grouping by multiple values.) The result is a pandas.DataFrame if the operation yields multiple columns and a pandas.Series if it yields only one.
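
For example, a minimal, self-contained sketch with toy data: selecting a single column before reducing yields a pandas.Series, while reducing the whole frame yields a pandas.DataFrame.

from pandas import DataFrame

df = DataFrame({'k': ['a', 'a', 'b'], 'x': [1, 2, 3], 'y': [4, 5, 6]})
print(type(df.groupby('k').sum()))       # DataFrame: multiple result columns
print(type(df.groupby('k')['x'].sum()))  # Series: a single result column
print(df.groupby('k').size())            # .size likewise returns a Series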

_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

from decimal import Decimal
df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
    'signal2': [Decimal(0)] * (size*rpt)
})

print(f'{[x for x in dir(df.groupby("ticker")) if not x.startswith("_")] = }')
for meth in 'count min max sum var std size'.split():
    _print(f'df.groupby("ticker").{meth}().head(2)')

With the as_index=False flag, you can suppress the assignment of the group labels to the index.

_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame
from numpy.random import randint, choice
from string import ascii_lowercase, ascii_uppercase

index = choice([*ascii_lowercase],
               size=(size:=10, 2)).view('<U2').ravel()
df = DataFrame({
    'x': randint(0, 10, size=size),
    'y': randint(0, 10, size=size),
}, index=index).sort_index()

_print('df.sample(2)')
_print("df.groupby(level=0, as_index=True).sum().head(2)")
_print("df.groupby(level=0, as_index=False).sum().head(2)")

Note that .groupby has an axis= parameter. While we typically group down the rows, it is also possible to group across the columns! (This might be useful for joining similarly named columns.)

For the rest of the notes, we’ll ignore this possibility and assume we only want grouping of rows (i.e., axis=0.)

_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'sig1':    randint(10, size=size*rpt),
    'sig2':    randint(10, size=size*rpt),
    'sig3':    randint(10, size=size*rpt),
})
df.columns = *df.columns[:-3], 'sig', 'sig', 'sig'

_print('df.head(3)')
_print('df.groupby(level=0, axis=1).sum().head(3)')
_print('df["sig"].T.iloc[0].T')

We can perform a .groupby grouping by a single column, by multiple columns, or on the index! (A quick sketch of the three forms follows.)
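
A minimal, self-contained sketch of the three forms (the toy column names are illustrative only):

from pandas import DataFrame

df = DataFrame({'a': ['x', 'x', 'y'], 'b': [1, 1, 2], 'v': [10, 20, 30]})
print(df.groupby('a').sum())                     # by a single column
print(df.groupby(['a', 'b']).sum())              # by multiple columns (MultiIndex result)
print(df.set_index('a').groupby(level=0).sum())  # by an index level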

A considerable amount of the time for a .groupby operation is spent sorting the data. If the data is pre-sorted, we can disable this sorting with the sort=False flag for a significant performance gain.

Note where the time is spent in each of the below:

from itertools import islice, tee
from time import sleep, perf_counter
from contextlib import contextmanager
from dataclasses import dataclass

nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))

@dataclass
class Timing:
    message  : str
    start    : int  = None
    stop     : int  = None
    sections : list = None
    def __post_init__(self):
        if self.sections is None:
            self.sections = []
    def traverse(self):
        yield self
        for x in self.sections:
            yield from x.traverse()
    @property
    def elapsed(self):
        if self.start is not None and self.stop is not None:
            return self.stop - self.start
            
@contextmanager
def timed(msg, timing):
    timing.sections.append(Timing(msg))
    try:
        timing.sections[-1].start = perf_counter()
        yield timing.sections[-1]
    finally:
        timing.sections[-1].stop = perf_counter()
            
overall_timing = t = Timing('<overall>')
all_results = []

from pandas import DataFrame
from string import ascii_lowercase
from numpy.random import normal, choice

raw = DataFrame({
    'a': choice([*ascii_lowercase], size=((SIZE := 250_000), 4))
            .view('<U4')
            .ravel(),
    'b': choice([*ascii_lowercase], size=(SIZE, 4))
            .view('<U4')
            .ravel(),
    'x': normal(size=SIZE),
    'y': normal(size=SIZE),
})

df = raw.copy(deep=True)
with timed('groupby on data', t) as t:
    rv = df.groupby(['a', 'b']).sum()
all_results.append(rv)

df = raw.copy(deep=True)
with timed('groupby on data (w/ sort=False)', t) as t:
    with timed('  .sort_values', t) as t:
        df  = df.sort_values(['a', 'b'])
    with timed('  .groupby', t) as t:
        rv = df.groupby(['a', 'b'], sort=False).sum()
all_results.append(rv)

df = raw.copy(deep=True)
with timed('groupby on index', t) as t:
    with timed('  .set_index', t) as t:
        df = df.set_index(['a', 'b'])
    with timed('  .sort_index', t) as t:
        df = df.sort_index()
    with timed('  .groupby', t) as t:
        rv = df.groupby(level=[0, 1]).sum()
all_results.append(rv)

for t in overall_timing.traverse():
    if t.elapsed:
        print(f'{t.message:<32} \N{mathematical bold capital delta}t: {t.elapsed:.4f}s')
assert all((x == y).all().all() for x, y in nwise(all_results))
print('All results identical.')

There is also a pandas SeriesGroupBy object in addition to the DataFrameGroupBy. This represents a .groupby operation performed on a single pandas.Series of data:

from pandas import DataFrame
from numpy.random import normal, choice
from string import ascii_lowercase

index = choice([*ascii_lowercase], size=(size:=10, 2)).view('<U2').ravel()
df = DataFrame({
    'x': normal(size=size),
    'y': normal(size=size),
}, index=index).sort_index()

print(f'{df["x"].groupby(level=0) = }')

.groupby can also group by a piece of data that is not contained in the DataFrame or Series:

_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from pandas import DataFrame
from numpy.random import randint, choice
from string import ascii_lowercase

index = choice([*ascii_lowercase], size=(size:=10, 2)).view('<U2').ravel()
df = DataFrame({
    'x': randint(0, 10, size=size),
    'y': randint(0, 10, size=size),
}, index=index).sort_index()

_print('df.head(3)')
_print("df.groupby(df['x'] < df['y']).sum()")
_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame
from numpy.random import randint, choice
from string import ascii_lowercase, ascii_uppercase

index = choice([*ascii_lowercase, *ascii_uppercase],
               size=(size:=10, 4)).view('<U4').ravel()
df = DataFrame({
    'x': randint(0, 10, size=size),
    'y': randint(0, 10, size=size),
}, index=index).sort_index()

_print('df.sample(2)')
_print("df.groupby(lambda idx_val: idx_val.lower()[:2]).sum().head(2)")
_print("df.groupby(df.index.str.lower().str[:2]).sum().head(2)")

If we .groupby two fields and want the result to be non-“flat” (in other words, one field becomes the row index, the other becomes the column index, and the grouped function values sit at their intersections), we can do a .groupby(…).unstack().

There is also a shorthand for this operation, .pivot_table(), which has the advantage of not converting the data values to floating point when there are missing intersections of the two groups:

_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame, MultiIndex
from numpy.random import randint
from sys import maxsize

index = MultiIndex.from_tuples([
    ('a', 'aa', 'aaa'),
    ('a', 'bb', 'aaa'),
    ('a', 'bb', 'abb'),
    ('b', 'aa', 'aaa'),
    ('b', 'bb', 'aaa'),
    ('b', 'cc', 'aaa'),
], names='A B C'.split())
df = DataFrame({
    'x': randint(0, 10, size=index.size),
    'y': 2**53 + 1,
}, index=index)

_print('df.head(3)')
#  _print('df.groupby(level=(1, 2)).count()')
#  _print('df.groupby(level=(1, 2)).count().unstack()')
_print('df.groupby(level=(1, 2)).max().unstack().fillna(0).convert_dtypes()')
_print('''(
    df.pivot_table(
        index      = 'B',
        columns    = 'C',
        values     = ['x', 'y'],
        aggfunc    = {'x': ['max', 'min'], 'y': 'max'},
        fill_value = 0
    )
)''')

Part II: All About .groupby(…).{aggregate,apply,filter,transform}

Going back to our data from before, we may want to perform a more complex operation on the groups than a simple .count or .sum.

We can iterate over the groups directly, and perform operations in Python (though this should be reserved for cases where the number of groups is on the order of the programme structure, not on the order of the programme computation.)

_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

_print('df.head(3)')
for lbl, grp in df.groupby('ticker'):
    print(f'{lbl = } {type(grp) = }')
    print(grp.head(3))
    break

We can also use the .apply, .aggregate, .transform, and .filter methods. These largely fit into the following categories: filtration (.filter, which drops entire groups that fail some predicate); transformation (.transform, which returns a like-indexed result); aggregation (.aggregate, which reduces each group to a single row); and flexible, general-purpose application (.apply). A compact sketch of all four follows; each is covered in detail below.
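
A minimal, self-contained sketch of the four families on toy data (the names and values are illustrative only):

from pandas import DataFrame

df = DataFrame({'k': ['a', 'a', 'b', 'b'], 'v': [1, 2, 3, 40]})
g = df.groupby('k')
print(g.filter(lambda grp: grp['v'].sum() > 10))  # filtration: keep or drop whole groups
print(g.transform(lambda col: col - col.mean()))  # transformation: like-indexed result
print(g.aggregate('sum'))                         # aggregation: one row per group
print(g.apply(lambda grp: grp.nlargest(1, 'v')))  # flexible apply: combine arbitrary results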

Let’s start with filtration, using .filter:

_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

_print('df.head(10)')
_print('''(
    df.groupby("ticker").filter(
        lambda g: (g['signal'].max() - g['signal'].min()) < 6
    ).head(2)
)''')

In a transformation, the result must be “like-indexed.” Let’s take a look at using .transform.

The function used for .transform is restricted in the following ways: it must return a result that is either the same size as the group chunk or broadcastable to that size (e.g., a scalar); it must operate column-by-column on the group chunk; and it must not perform in-place operations on the group chunk.

_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

_print('df.head(2)')
_print('''(
    df.groupby("ticker").transform(
        lambda g: g.round(-1)
    ).head(2)
)''')
_print('''(
    df.groupby("ticker").transform(
        lambda g: g.round(-1).iloc[0]
    ).head(10)
)''')
#  _print('''(
#      df.groupby("ticker").transform(
#          lambda g: g.round(-1).iloc[:-1]
#      ).head(2)
#  )''')
_print('''(
    df.groupby("ticker").transform(
        lambda g: round(g, -1 if g.name == 'signal' else -2)
    ).head(2)
)''')

If we want to perform an aggregation, we can use .aggregate or .agg:

_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta, NamedAgg
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
         + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

_print('df.head(2)')
_print('''(
    df.groupby("ticker").aggregate(
        'sum'
    ).head(2)
)''')
_print('''(
    df.groupby("ticker").aggregate({
        'price':  'max',
        'signal': ['min', 'max'],
    }).head(2)
)''')
_print('''(
    df.groupby("ticker").aggregate(
        max_price  = NamedAgg(column='price', aggfunc='max'),
        min_signal = ('signal', 'min'),
        max_signal = ('signal', 'max'),
    ).head(2)
)''')
_print('''(
    df.groupby("ticker").aggregate(
        lambda g: g.sum()
    ).head(2)
)''')

from pandas import Series, DataFrame
_print('''(
    df.groupby("ticker").aggregate(
        lambda g: print(type(g))
    ).head(2)
)''')

Finally, we have another means by which we can do aggregation operations: .apply.

.apply is extremely flexible, but it has the downside of being much slower in practice than .aggregate or .transform. .apply takes a function which accepts a DataFrame as its argument and returns a new DataFrame (or a Series, or a scalar); the .apply machinery then determines how to combine the results into a new structure.

_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')
from time import sleep, perf_counter
from contextlib import contextmanager
@contextmanager
def timed(msg):
    try:
        start = perf_counter()
        yield
    finally:
        stop = perf_counter()
        print(f'{msg:<24} \N{mathematical bold capital delta}t: {stop - start:.4f}s')

from pandas import DataFrame, date_range, to_timedelta, NamedAgg, concat
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5_000), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

with timed('.apply(sum)'):
    _print('''(
        df.groupby("ticker").apply(
            sum
        ).head(2)
    )''')

with timed('.aggregate(sum)'):
    _print('''(
        df.groupby("ticker").aggregate(
            sum
        ).head(2)
    )''')

#  with timed('.sum'):
#      _print('''df.groupby("ticker").sum().head(2)''')
_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta, NamedAgg, concat
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5_000), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

from time import sleep, perf_counter
from contextlib import contextmanager
@contextmanager
def timed(msg):
    try:
        start = perf_counter()
        yield
    finally:
        stop = perf_counter()
        print(f'{msg:<24} \N{mathematical bold capital delta}t: {stop - start:.4f}s')

_print('df.head(2)')
_print('''(
    df.groupby("ticker").apply(
        lambda df: df
    ).head(2)
)''')
_print('''(
    df.groupby("ticker").apply(
        lambda df: concat([df, df]).sort_index()
    ).head(10)
)''')
with timed('.apply'):
    _print('''(
        df.groupby("ticker").apply(
            lambda df: (df['signal'] * df['price']).sum()
        ).head(2)
    )''')
with timed('.aggregate'):
    _print('''(
        df.assign(result=df['signal'] * df['price']).groupby("ticker").aggregate(
            sum
        )[['result']].head(2)
    )''')

Part III: .rolling

The .groupby method is not the only way to perform grouped or windowed computations on a pandas.DataFrame. The .rolling and .expanding methods provide similar functionality, but over rolling and expanding windows:

_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta, NamedAgg, concat
from pandas.tseries.offsets import Hour
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

_print('df.head(2)')
_print('''(
    df.rolling(10, min_periods=1).sum().head(2)
)''')
_print('''(
    df.groupby('ticker').transform(
        lambda df: df.rolling(10, min_periods=1).sum()
    ).head(2)
)''')
_print('''(
    df.groupby('ticker')['price'].transform(
        lambda s: s.reset_index(level=0, drop=True).rolling(Hour(1)).sum()
    ).head(2)
)''')
_print = lambda expr: print('-' * 50, f' {expr} '.center(50, '-'), '-' * 50, eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta, NamedAgg, concat
from pandas.tseries.offsets import Hour
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

_print('df.head(2)')
_print('''(
    df.expanding(min_periods=1).sum()
)''')
_print('''(
    df.expanding(min_periods=1).count()
)''')
#  _print('''(
#      df.expanding(min_periods=1).aggregate(
#          lambda s: s[-1] - s[0]
#      )
#  )''')
#  _print('''(
#      df.expanding(min_periods=1).aggregate(
#          lambda s: s.max() - s.min()
#      )
#  )''')
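
There is also .ewm for exponentially weighted window operations, e.g., an exponentially weighted moving average:
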
_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from pandas import DataFrame, date_range, to_timedelta, NamedAgg, concat
from pandas.tseries.offsets import Hour
from numpy import repeat, array, tile
from numpy.random import choice, random, randint
from random import randrange
from string import ascii_lowercase
from datetime import timedelta

df = DataFrame({
    'time':    repeat(date_range('2021-07-04 9:00', periods=(size := 5), freq='1H'), (rpt := 4))
             + to_timedelta(randint(0, 59, size=(size*rpt)), unit='T'),
    'ticker':  tile(choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(), rpt),
    'price':   tile(random(size) * 100, rpt) + ((random(size*rpt) - .5) * 10),
    'signal':  randint(10, size=size*rpt),
}).set_index(['ticker', 'time']).sort_index()

_print('df.head(2)')
_print('df.ewm(alpha=0.1).mean().head(2)')
#  _print('df.ewm(alpha=0.1).std().head(2)')
#  _print('df.ewm(alpha=0.1).var().head(2)')

Part IV: xarray.DataArray.groupby and xarray.DataArray.groupby_bins

The xarray.DataArray also features a .groupby operation:

_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from xarray import DataArray
from numpy.random import normal, choice
from string import ascii_lowercase
from pandas import date_range

da = DataArray(
    normal(size=(30_000, 30, 10, 10)),
    dims = 'ticker t x y'.split(),
    coords = {
        't':      date_range('2021-07-04', periods=30),
        'ticker': choice([*ascii_lowercase], size=(30_000, 2)).view('<U2').ravel(),
    },
)

_print('da.groupby("ticker")')
#  _print('da.groupby("ticker").sum()')
#  _print('''(
#      da.groupby("ticker").sum()
#        .sum(dim=["x", "y"])
#        .sel(ticker=da.coords["ticker"].values[0])
#  )''')

The xarray.DataArray also provides a way to perform a groupby over discretised values (binned using pandas.cut): .groupby_bins:

_print = lambda expr: print(f' {expr} '.center(50, '-'), eval(expr), sep='\n')

from xarray import DataArray
from numpy.random import normal, choice
from string import ascii_lowercase
from pandas import date_range, to_datetime

da = DataArray(
    normal(size=(300, 30, 10, 10)),
    dims = 'ticker t x y'.split(),
    coords = {
        't':      date_range('2021-07-04', periods=30),
        'ticker': choice([*ascii_lowercase], size=(300, 2)).view('<U2').ravel(),
    },
)

bins = to_datetime(['2021-07-01', '2021-07-04'])
#  _print('da.groupby_bins("t", bins=bins)')
#  _print('da.groupby_bins("t", bins=bins).sum().sum(dim=["x", "y"])')
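
With bin edges that actually cover the data’s date range, the commented-out lines above run; here is a minimal, self-contained sketch (the bin edges are chosen arbitrarily):

from xarray import DataArray
from numpy.random import normal, choice
from string import ascii_lowercase
from pandas import date_range

da = DataArray(
    normal(size=(300, 30, 10, 10)),
    dims = 'ticker t x y'.split(),
    coords = {
        't':      date_range('2021-07-04', periods=30),
        'ticker': choice([*ascii_lowercase], size=(300, 2)).view('<U2').ravel(),
    },
)

edges = date_range('2021-07-01', periods=5, freq='10D')  # four 10-day bins
print(da.groupby_bins('t', bins=edges).sum().sum(dim=['x', 'y']))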

Part V: Queen’s Gambit

Data

We have two data files in “Portable Game Notation” (PGN) representing a series of chess games played in recent years.

We will use a third-party toolkit python-chess to load these and generate simpler files for our consumption.

NOTE: you do not need to install python-chess for this exercise!

from chess import WHITE, BLACK, PAWN, ROOK, KNIGHT, BISHOP, QUEEN, KING
from chess.pgn import read_game
from itertools import product
from collections import namedtuple
from enum import Enum
from pathlib import Path

class Colors(Enum):
    Black = BLACK
    White = WHITE

class Pieces(Enum):
    Pawn   = PAWN
    Rook   = ROOK
    Knight = KNIGHT
    Bishop = BISHOP
    Queen  = QUEEN
    King   = KING

if __name__ == '__main__':
    for input_filename in (Path(x) for x in {'carlsen.pgn', 'croatian.pgn'}):

        games = []
        with open(input_filename) as f:
            while (g := read_game(f)):
                games.append(g)

        class Row(namedtuple('RowBase', 'game move color piece positions')):
            def to_csv(self):
                return f'{self.game}, {self.move}, {self.color.name}, {self.piece.name}, {", ".join(f"{x}" for x in self.positions)}\n'
            def to_fwf(self):
                return f'{self.game:>4}{self.move:>4}{self.color.name:>8}{self.piece.name:>8}{", ".join(f"{x:2}" for x in self.positions):<39}\n'

        rows = []
        for game_no, g in enumerate(games):
            b = g.board()
            for move_no, m in enumerate(g.mainline_moves()):
                b.push(m)
                for col, pc in product(Colors, Pieces):
                    if b.pieces(pc.value, col.value):
                        r = Row(game_no, move_no, col, pc, [*b.pieces(pc.value, col.value)])
                        rows.append(r)

        output_filename = input_filename.with_suffix('.csv')
        with open(output_filename, 'w') as f:
            for r in rows:
                if r.positions:
                    f.write(r.to_csv())
        output_filename = input_filename.with_suffix('.fw')
        with open(output_filename, 'w') as f:
            for r in rows:
                if r.positions:
                    f.write(r.to_fwf())
        print(f'Wrote: {output_filename}.')

Actual Data

These files contain information about these games in the following CSV format (with no header): the game number, the move number, the side (“White” or “Black”), the piece name, and then up to eight board-square numbers (0–63) giving the positions of that side’s pieces of that type.

Problem

Read in the above data using pandas, numpy, or xarray (or some combination thereof) and determine the following things:

from pandas import read_csv
df = read_csv('carlsen.csv',
              header=None,
              names='game move side piece pos0 pos1 pos2 pos3 pos4 pos5 pos6 pos7'.split())

print(df.sample(3))

Sample Problem & Solution

Determine which window of moves in the game had the maximum number of pieces lost.

from pandas import read_csv, Categorical, to_datetime, IndexSlice
from collections import namedtuple

df = read_csv('carlsen.csv',
              header=None,
              names='game move side piece pos0 pos1 pos2 pos3 pos4 pos5 pos6 pos7'.split())
index_vars = ['game', 'move', 'side', 'piece']
df = df.melt(id_vars=index_vars, value_vars=set(df.columns) - set(index_vars),
             var_name='num', value_name='pos').dropna()

df['pos'] = df['pos'].astype(int)
df['num'] = Categorical(df['num'])._ndarray
for col, cats in {'piece': 'Pawn Rook Knight Bishop Queen King'.split(),
                  'side':  'Black White'.split()}.items():
    df[col] = Categorical(df[col].str.strip(), categories=cats)
df = df.set_index([*index_vars, 'num']).sort_index()

game_0 = df.loc[IndexSlice[0, :]]

class Diffs(namedtuple('DiffsBase', 'size white black raw')):
    @classmethod
    def from_game(cls, game, *, size=5):
        windows = game.groupby(['move', 'side']).count().dropna()
        wh = windows.loc[IndexSlice[:, 'White'], :].diff().fillna(0).rolling(size, min_periods=1).sum()
        bl = windows.loc[IndexSlice[:, 'Black'], :].diff().fillna(0).rolling(size, min_periods=1).sum()
        return cls(size, wh, bl, game)

diffs = Diffs.from_game(game_0)
print(diffs.white.values)
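
We can also inspect the board state at a particular move; here, we count the surviving pieces at move 83 of game 0, first via .groupby(…).unstack() and then via the equivalent .pivot_table():
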
from pandas import read_csv, Categorical, to_datetime, IndexSlice
from collections import namedtuple

df = read_csv('carlsen.csv',
              header=None,
              names='game move side piece pos0 pos1 pos2 pos3 pos4 pos5 pos6 pos7'.split())
index_vars = ['game', 'move', 'side', 'piece']
df = df.melt(id_vars=index_vars, value_vars=set(df.columns) - set(index_vars),
             var_name='num', value_name='pos').dropna()

df['pos'] = df['pos'].astype(int)
df['num'] = Categorical(df['num'])._ndarray
for col, cats in {'piece': 'Pawn Rook Knight Bishop Queen King'.split(),
                  'side':  'Black White'.split()}.items():
    df[col] = Categorical(df[col].str.strip(), categories=cats)
df = df.set_index([*index_vars, 'num']).sort_index()

game_0 = df.loc[IndexSlice[0, :]]
end = game_0.loc[IndexSlice[83, :]]
print(end.groupby(['side', 'piece']).count().unstack().fillna(0).convert_dtypes())
print(end.pivot_table(index=['side'],
                      columns=['piece'],
                      values=['pos'],
                      aggfunc='count',
                      fill_value=0))
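
Another approach: count the pieces each side still has alive at every move, then use a grouped .transform with .diff() and a rolling sum to locate the 25-move window in which White lost the most pieces:
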
from pandas import read_csv, Categorical, to_datetime, IndexSlice
from xarray import DataArray
from numpy import array, zeros

df = read_csv('carlsen.csv',
              header=None,
              names='game move side piece pos0 pos1 pos2 pos3 pos4 pos5 pos6 pos7'.split())
index_vars = ['game', 'move', 'side', 'piece']
#  df = df.melt(id_vars=index_vars, value_vars=set(df.columns) - set(index_vars),
#               var_name='num', value_name='pos').dropna()
for col, cats in {'piece': 'Pawn Rook Knight Bishop Queen King'.split(),
                  'side':  'Black White'.split()}.items():
    df[col] = Categorical(df[col].str.strip(), categories=cats)#._ndarray
df = df.set_index('game')
df['alive'] = sum(~df[f'pos{idx}'].isnull() for idx in range(8))

game_0 = df.loc[0]
state = game_0.groupby(['move', 'side'])['alive'].sum()
by_side = state.groupby(['side']).transform(
    lambda df: -df.diff().rolling(25, min_periods=1).sum().fillna(0)
)
worst_white = by_side.xs('White', level='side').idxmax()
print(state.loc[IndexSlice[worst_white-25:worst_white, 'White']])
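
Finally, we can normalise each game onto a common “progress” axis (move number divided by the game’s total number of moves) and examine piece attrition over the course of the game, optionally as an area plot:
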
from pandas import read_csv, Categorical, to_datetime, IndexSlice, Series, DataFrame
from xarray import DataArray
from numpy import array, zeros, log

df = read_csv('carlsen.csv',
              header=None,
              names='game move side piece pos0 pos1 pos2 pos3 pos4 pos5 pos6 pos7'.split())
index_vars = ['game', 'move', 'side', 'piece']
df = df.melt(id_vars=index_vars, value_vars=set(df.columns) - set(index_vars),
             var_name='num', value_name='pos').dropna()
for col, cats in {'piece': 'Pawn Rook Knight Bishop Queen King'.split(),
                  'side':  'Black White'.split()}.items():
    df[col] = Categorical(df[col].str.strip(), categories=cats)#._ndarray

df['pos'] = df['pos'].astype(int)
df['num'] = Categorical(df['num'])._ndarray
df['progress'] = df.groupby(['game'])['move'].transform(
    lambda s: s / s.max()
)
df = df.drop('move', axis=1)
df = df.set_index(['game', 'progress', 'side', 'piece', 'num']).sort_index()

game_0 = df.loc[IndexSlice[0, :]]
att = game_0.groupby(['progress', 'side', 'piece']).count().unstack().unstack()['pos'].fillna(0).swaplevel(axis=1).sort_index(axis=1).fillna(0).convert_dtypes()

piece_count = Series({
    'Pawn':   8,
    'Rook':   2,
    'Knight': 2,
    'Bishop': 2,
    'Queen':  1,
    'King':   1,
})
piece_count.index = df.index.levels[-2]
res = (att['Black'] / piece_count) #.plot(kind='area')
res = res / DataFrame({
    'Pawn':   res.index,
    'Rook':   res.index,
    'Knight': res.index,
    'Bishop': res.index,
    'Queen':  res.index,
    'King':   res.index,
}, index=res.index)

if (show := False):
    from matplotlib.pyplot import show
    res.plot(kind='area')
    show()