ts-python

Python @ Two Sigma (seminar)

Save 3 hours with 30 minutes of tests!

Theme: reproducibility

Topic: how to be effective with testing

Presenter James Powell james@dutc.io
Date Friday, December 4, 2020
Time 9:30 AM PST

Notes: https://ts-python.dutc.io

tmate: https://tmate.io/t/dutc/ts-python

print('Good morning!')
print("Let's get started!")

Poll? Poll?

Presume we have some cell in our notebook that is computing some mathematical result. Perhaps we are computing an exponential weighted moving average.

We could do this directly with pandas:

from numpy.random import normal
from pandas import Series
data = normal(size=3) # dummy data
s = Series(data)
result = s.ewm(alpha=.01).mean()
print(result)

But for the sake of argument, let’s assume that we are manually calculating this with numpy:

from numpy.random import normal
from numpy import power, arange
data = normal(size=(SIZE:=3)) # dummy data
factors = power((1 - 0.01), arange(SIZE, 0, -1))
result = (factors * data).cumsum() / factors.cumsum()
print(result)

We may have high confidence that this is correct, but, for sake of argument, let’s check that this is correct against pandas:

from numpy.random import normal
from numpy import power, arange
from pandas import Series

data = normal(size=(SIZE:=3)) # dummy data
alpha = 0.01

factors = power((1 - alpha), arange(SIZE, 0, -1))
actual = (factors * data).cumsum() / factors.cumsum()

expected = Series(data).ewm(alpha=alpha).mean()

# this will fail, because we can't compare floats exactly
#   given imprecision issues with IEEE-754
#  assert (actual == expected).all(), 'Test failed!'

# instead, we should use `numpy.isclose`
from numpy import isclose
assert isclose(actual, expected).all(), 'Test failed!'

print('Test passed!')

This is a typical “expected vs actual” test, which tends to be a poor way to “prove” the correctness of our ccode.

Poll?

Our goal in testing:

from pandas import DataFrame, Series, to_timedelta, to_datetime
from numpy.random import randint, choice, normal
from string import ascii_lowercase
from datetime import datetime

today   = to_datetime(datetime.today())
deltas  = to_timedelta(randint(1, 1000, size=(SIZE:=100_000)), unit='us').values.cumsum() 
times   = today + deltas
tickers = choice([*ascii_lowercase], size=(SIZE, 3)).view('<U3').ravel()
index   = [tickers, times]
#  index   = tickers

df = DataFrame({
    'w': normal(size=SIZE),
    'x': normal(size=SIZE),
    'y': normal(size=SIZE),
    'z': normal(size=SIZE),
}, index=index).sort_index()
print(df.head())

def process(df : DataFrame):
    return df.groupby(level=1).agg({'x': 'mean', 'y': 'sum', 'z': 'var'})

rv = process(df)
print(rv.head())
def process(df):
    ''' df is a dataframe that has columns x, y, z
        and is indexed on tickers, times '''
    return df.groupby(level=1).agg({'x': 'mean', 'y': 'sum', 'z': 'var'})

Poll?

# NOTE: comment is out of date!
def process(df):
    ''' df is a dataframe that has columns x, y, z
        and is indexed on tickers, times '''
    return df.groupby(level=1).agg({'w': 'var', 'x': 'mean', 'y': 'sum'})
def process(df):
    return df.groupby(level=1).agg({'x': 'mean', 'y': 'sum', 'zee': 'var'})

from pandas import DataFrame, to_datetime
from numpy import arange, nan, isclose
def test_process():
    tickers = ['aa', 'aa', 'aa']
    times   = to_datetime(['2020-07-04', '2020-07-04', '2020-07-04'])
    index   = [tickers, times]

    df = DataFrame({
        'x': arange(3),
        'y': arange(3),
        'z': arange(3),
    }, index=index)

    actual = process(df)

    expected = DataFrame({
        'x': [1],
        'y': [3],
        'z': [1],
    }, index=times)

    assert isclose(actual, expected).all()

if __name__ == '__main__':
    test_process()
from numpy import arange, power
def ewma1(data):
    alpha = 0.01
    factors = (1 - alpha) ** arange(data.size)
    return (factors * data).cumsum() / factors.cumsum()

def ewma2(data):
    alpha = 0.01
    factors = power(1 - alpha, arange(data.size))
    return (factors * data).cumsum() / factors.cumsum()

from numpy.random import normal
data = normal(size=3) # dummy data

from numpy import isclose
rv1 = ewma1(data)
rv2 = ewma2(data)
assert isclose(rv1, rv2).all()

Let’s assume we have some simulation-like or iterative code we want to test.

As a stand-in, let’s use fib which computes elements from the Fibonacci integer sequence.

def fib(n):
    if n == 0 or n == 1:
        return 1
    return fib(n-1) + fib(n-2)

print(f'{fib(10) = }')
print(f'{fib(40) = }')
from decimal import Decimal
from math import sqrt
sqrt_5 = sqrt(5)
phi = (1 + sqrt_5) / 2
psi = -1/phi

def fib(n):
    return (phi ** n - psi ** n) / sqrt_5

rv = [fib(n) for n in range(10)]
print(f'{rv = }')
from decimal import Decimal, localcontext
from math import sqrt
sqrt_5 = Decimal(5).sqrt()
phi = (1 + sqrt_5) / 2
psi = -1/phi

def fib(n):
    return (phi ** n - psi ** n) / sqrt_5

with localcontext() as ctx:
    ctx.prec = 100
    rv = [fib(n) for n in range(10)]
    print(f'{rv = }')
def fib(n):
    rv = [1, 1]
    while True:
        if len(rv) == n:
            break
        rv.append(rv[-1] + rv[-2])
    return rv

print(f'{fib(10) = }')
print(f'{fib(40) = }')
print(f'{fib(-10) = }')
def fib(n):
    rv = [0, 1]
    while True:
        if len(rv) == n:
            break
        rv.append(rv[-1] + rv[-2])
    return rv

def test_fib():
    actual = fib(10)
    expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34] 
    assert actual == expected
    try:
        fib(0)
    except MemoryError:
        pass
    else:
        raise AssertionError('test failed')

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def fib(n):
    rv = [0, 1]
    while True:
        if len(rv) == n:
            break
        rv.append(rv[-1] + rv[-2])
    return rv

from pytest import fixture
from random import randint
@fixture
def n():
    return randint(-10, 10)

def test_fib(n):
    actual = fib(n)
    expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34][:n]
    assert actual == expected

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def fib(n):
    rv = [0, 1]
    while True:
        if len(rv) == n:
            break
        rv.append(rv[-1] + rv[-2])
    return rv

from hypothesis import given
from hypothesis.strategies import integers
@given(
    n=integers(min_value=-10, max_value=10)
)
def test_fib(n):
    try:
        actual = fib(n)
    except MemoryError:
        actual = n
    expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34][:n]
    assert actual == expected

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def fib(m):
    rv = [0, 1]
    while True:
        if rv[-1] + rv[-2] > m:
            break
        rv.append(rv[-1] + rv[-2])
    return rv

print(f'{fib(90) = }')
def fib(n=None, m=None):
    rv = [0, 1]
    while True:
        if n is not None:
            if len(rv) >= n:
                break
        if m is not None:
            if rv[-1] + rv[-2] >= m:
                break
        rv.append(rv[-1] + rv[-2])
    return rv
def fib(a=1, b=1):
    while True:
        yield a
        a, b = b, a + b

from itertools import islice, takewhile
print(f'{[*islice(fib(), 10)]                  = }')
print(f'{[*takewhile(lambda x: x < 90, fib())] = }')
def fib(a=1, b=1):
    while True:
        yield a
        a, b = b, a + b

from hypothesis import given
from hypothesis.strategies import integers
from itertools import islice
@given(
    a=integers(),
    b=integers(),
)
def test_fib(a, b):
    size = 10_000
    actual = [*islice(fib(a, b), size)]
    expected = [a, b]
    for _ in range(size - 2):
        expected.append(expected[-1] + expected[-2])
    assert actual == expected

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])

Poll?

def fib(a=1, b=1):
    while True:
        yield a
        a, b = b, a + b

from hypothesis import given
from hypothesis.strategies import integers
from itertools import tee, islice
nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))

@given(
    a=integers(),
    b=integers(),
)
def test_fib(a, b):
    for x, y, z in nwise(islice(fib(a, b), 10_000), 3):
        assert x + y == z

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def add(x, y):
    return x + y

def test_add():
    assert add(1, 1) == 2 

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def add(x, y):
    return x + y

from hypothesis import given
from hypothesis.strategies import integers
@given(
    x = integers(),
    y = integers(),
)
def test_add(x, y):
    assert add(x, y) == x + y

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def add(x, y):
    return x + y

from hypothesis import given
from hypothesis.strategies import integers
@given(
    x = integers(),
    y = integers(),
    z = integers(),
)
def test_add(x, y, z):
    assert add(x, y) == add(y, x)
    assert add(x, add(y, z)) == add(add(x, y), z)
    assert add(x, 0) == x
    assert add(x, -x) == 0

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def add(x, y):
    return x + y

from hypothesis import given
from hypothesis.strategies import integers, decimals
@given(
    x = decimals(allow_nan=False, allow_infinity=False),
    y = decimals(allow_nan=False, allow_infinity=False),
    z = decimals(allow_nan=False, allow_infinity=False),
)
def test_add(x, y, z):
    assert add(x, y) == add(y, x)
    assert add(x, add(y, z)) == add(add(x, y), z)
    assert add(x, 0) == x
    assert add(x, -x) == 0

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
def add(x, y):
    return x + y

from hypothesis import given
from hypothesis.strategies import integers, decimals, floats
from math import isclose
@given(
    x = decimals(allow_nan=False, allow_infinity=False),
    y = decimals(allow_nan=False, allow_infinity=False),
    z = decimals(allow_nan=False, allow_infinity=False),
)
def test_add(x, y, z):
    assert isclose(add(x, y), add(y, x))
    assert isclose(add(x, add(y, z)), add(add(x, y), z))
    assert isclose(add(x, 0), x)
    assert isclose(add(x, -x), 0)

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])

Poll?

from enum import Enum
from datetime import date
from datetime import timedelta

US_HOLIDAYS = {
    'independence day': date(2020,  7,  3),
    'thanksgiving':     date(2020, 11, 26),
    'christmas':        date(2020, 12, 25),
    "new year's eve":   date(2020, 12, 31),
}

class Dir(Enum):
    nbd = timedelta(days=1)
    pbd = timedelta(days=-1)

def modify_business_date(dt, n=1, *, holidays=US_HOLIDAYS, direction=Dir.nbd):
    if n < 0:
        raise ValueError('n must be positive')
    while True:
        while True:
            dt = dt + direction.value
            if dt.weekday() not in {5, 6} and dt not in set(holidays.values()):
                break
        if n == 0:
            break
        n -= 1
    return dt

print(f'{modify_business_date(date(2020, 7, 3)) = }')

from hypothesis import given
from hypothesis.strategies import dictionaries, dates, text, integers
@given(
    holidays=dictionaries(keys=text(), values=dates(), min_size=1),
    day=dates(),
    n=integers(min_value=0, max_value=10),
)
def test_modify_business_date(holidays, day, n):
    hol = [*holidays.values()][0]
    assert modify_business_date(hol,  0) > hol
    assert modify_business_date(day,  n) >= day
    if day not in set(holidays.values()) and day.weekday() not in {5, 6}:
        assert modify_business_date(modify_business_date(day, 1), 1, direction=Dir.pbd) == day

if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])
from datetime import date, timedelta
from hypothesis import given
from hypothesis.strategies import dictionaries, dates, text, integers
from enum import Enum
from itertools import islice, tee, takewhile

nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))

def datecount(refdate, *, direction=Dir.nbd):
    while True:
        yield refdate
        refdate += direction

@given(
    refdate=dates(),
)
def test_datecount(refdate):
    dc = datecount(refdate)
    dates = [next(dc), next(dc)]
    assert dates[0] < dates[1]
    assert dates[0] == refdate
    assert (dates[1] - dates[0]).days == 1

def businessdays(refdate, *, direction=Dir.nbd, holidays=US_HOLIDAYS):
    return (d for d in datecount(refdate, direction=direction)
            if d.weekday() not in {5, 6} and d not in set(holidays.values()))

@given(
    refdate=dates(),
    holidays=dictionaries(keys=text(), values=dates(), min_size=1),
)
def test_businessdays(refdate, holidays):
    for pd, cd in nwise(islice(businessdays(refdate, holidays=holidays), 1_000)):
        # no weekdays
        assert cd.weekday() not in {5, 6}
        # no holidays
        assert cd not in set(holidays.values())
        # should move monotonically forward
        assert pd < cd
        # dates in-between must be either holidays or weekdays
        for d in takewhile(lambda d: d < cd, islice(datecount(pd), 1, None)):
            assert d.weekday() in {5, 6} or d in set(holidays.values())


if __name__ == '__main__':
    from pytest import main
    main(['-q', __file__])