Theme: reproducibility
Topic: how to be effective with testing
| Presenter | James Powell james@dutc.io |
| Date | Friday, December 4, 2020 |
| Time | 9:30 AM PST |
Notes: https://ts-python.dutc.io
tmate: https://tmate.io/t/dutc/ts-python
print('Good morning!')
print("Let's get started!")
Poll? Poll?
Presume we have some cell in our notebook that is computing some mathematical result. Perhaps we are computing an exponential weighted moving average.
We could do this directly with pandas:
from numpy.random import normal
from pandas import Series
data = normal(size=3) # dummy data
s = Series(data)
result = s.ewm(alpha=.01).mean()
print(result)
But for the sake of argument, let’s assume that we are manually calculating
this with numpy:
from numpy.random import normal
from numpy import power, arange
data = normal(size=(SIZE:=3)) # dummy data
factors = power((1 - 0.01), arange(SIZE, 0, -1))
result = (factors * data).cumsum() / factors.cumsum()
print(result)
We may have high confidence that this is correct, but, for sake of argument,
let’s check that this is correct against pandas:
from numpy.random import normal
from numpy import power, arange
from pandas import Series
data = normal(size=(SIZE:=3)) # dummy data
alpha = 0.01
factors = power((1 - alpha), arange(SIZE, 0, -1))
actual = (factors * data).cumsum() / factors.cumsum()
expected = Series(data).ewm(alpha=alpha).mean()
# this will fail, because we can't compare floats exactly
# given imprecision issues with IEEE-754
# assert (actual == expected).all(), 'Test failed!'
# instead, we should use `numpy.isclose`
from numpy import isclose
assert isclose(actual, expected).all(), 'Test failed!'
print('Test passed!')
This is a typical “expected vs actual” test, which tends to be a poor way to “prove” the correctness of our ccode.
Poll?
Our goal in testing:
from pandas import DataFrame, Series, to_timedelta, to_datetime
from numpy.random import randint, choice, normal
from string import ascii_lowercase
from datetime import datetime
today = to_datetime(datetime.today())
deltas = to_timedelta(randint(1, 1000, size=(SIZE:=100_000)), unit='us').values.cumsum()
times = today + deltas
tickers = choice([*ascii_lowercase], size=(SIZE, 3)).view('<U3').ravel()
index = [tickers, times]
# index = tickers
df = DataFrame({
'w': normal(size=SIZE),
'x': normal(size=SIZE),
'y': normal(size=SIZE),
'z': normal(size=SIZE),
}, index=index).sort_index()
print(df.head())
def process(df : DataFrame):
return df.groupby(level=1).agg({'x': 'mean', 'y': 'sum', 'z': 'var'})
rv = process(df)
print(rv.head())
def process(df):
''' df is a dataframe that has columns x, y, z
and is indexed on tickers, times '''
return df.groupby(level=1).agg({'x': 'mean', 'y': 'sum', 'z': 'var'})
Poll?
# NOTE: comment is out of date!
def process(df):
''' df is a dataframe that has columns x, y, z
and is indexed on tickers, times '''
return df.groupby(level=1).agg({'w': 'var', 'x': 'mean', 'y': 'sum'})
def process(df):
return df.groupby(level=1).agg({'x': 'mean', 'y': 'sum', 'zee': 'var'})
from pandas import DataFrame, to_datetime
from numpy import arange, nan, isclose
def test_process():
tickers = ['aa', 'aa', 'aa']
times = to_datetime(['2020-07-04', '2020-07-04', '2020-07-04'])
index = [tickers, times]
df = DataFrame({
'x': arange(3),
'y': arange(3),
'z': arange(3),
}, index=index)
actual = process(df)
expected = DataFrame({
'x': [1],
'y': [3],
'z': [1],
}, index=times)
assert isclose(actual, expected).all()
if __name__ == '__main__':
test_process()
from numpy import arange, power
def ewma1(data):
alpha = 0.01
factors = (1 - alpha) ** arange(data.size)
return (factors * data).cumsum() / factors.cumsum()
def ewma2(data):
alpha = 0.01
factors = power(1 - alpha, arange(data.size))
return (factors * data).cumsum() / factors.cumsum()
from numpy.random import normal
data = normal(size=3) # dummy data
from numpy import isclose
rv1 = ewma1(data)
rv2 = ewma2(data)
assert isclose(rv1, rv2).all()
Let’s assume we have some simulation-like or iterative code we want to test.
As a stand-in, let’s use fib which computes elements from the Fibonacci
integer sequence.
def fib(n):
if n == 0 or n == 1:
return 1
return fib(n-1) + fib(n-2)
print(f'{fib(10) = }')
print(f'{fib(40) = }')
from decimal import Decimal
from math import sqrt
sqrt_5 = sqrt(5)
phi = (1 + sqrt_5) / 2
psi = -1/phi
def fib(n):
return (phi ** n - psi ** n) / sqrt_5
rv = [fib(n) for n in range(10)]
print(f'{rv = }')
from decimal import Decimal, localcontext
from math import sqrt
sqrt_5 = Decimal(5).sqrt()
phi = (1 + sqrt_5) / 2
psi = -1/phi
def fib(n):
return (phi ** n - psi ** n) / sqrt_5
with localcontext() as ctx:
ctx.prec = 100
rv = [fib(n) for n in range(10)]
print(f'{rv = }')
def fib(n):
rv = [1, 1]
while True:
if len(rv) == n:
break
rv.append(rv[-1] + rv[-2])
return rv
print(f'{fib(10) = }')
print(f'{fib(40) = }')
print(f'{fib(-10) = }')
def fib(n):
rv = [0, 1]
while True:
if len(rv) == n:
break
rv.append(rv[-1] + rv[-2])
return rv
def test_fib():
actual = fib(10)
expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
assert actual == expected
try:
fib(0)
except MemoryError:
pass
else:
raise AssertionError('test failed')
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def fib(n):
rv = [0, 1]
while True:
if len(rv) == n:
break
rv.append(rv[-1] + rv[-2])
return rv
from pytest import fixture
from random import randint
@fixture
def n():
return randint(-10, 10)
def test_fib(n):
actual = fib(n)
expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34][:n]
assert actual == expected
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def fib(n):
rv = [0, 1]
while True:
if len(rv) == n:
break
rv.append(rv[-1] + rv[-2])
return rv
from hypothesis import given
from hypothesis.strategies import integers
@given(
n=integers(min_value=-10, max_value=10)
)
def test_fib(n):
try:
actual = fib(n)
except MemoryError:
actual = n
expected = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34][:n]
assert actual == expected
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def fib(m):
rv = [0, 1]
while True:
if rv[-1] + rv[-2] > m:
break
rv.append(rv[-1] + rv[-2])
return rv
print(f'{fib(90) = }')
def fib(n=None, m=None):
rv = [0, 1]
while True:
if n is not None:
if len(rv) >= n:
break
if m is not None:
if rv[-1] + rv[-2] >= m:
break
rv.append(rv[-1] + rv[-2])
return rv
def fib(a=1, b=1):
while True:
yield a
a, b = b, a + b
from itertools import islice, takewhile
print(f'{[*islice(fib(), 10)] = }')
print(f'{[*takewhile(lambda x: x < 90, fib())] = }')
def fib(a=1, b=1):
while True:
yield a
a, b = b, a + b
from hypothesis import given
from hypothesis.strategies import integers
from itertools import islice
@given(
a=integers(),
b=integers(),
)
def test_fib(a, b):
size = 10_000
actual = [*islice(fib(a, b), size)]
expected = [a, b]
for _ in range(size - 2):
expected.append(expected[-1] + expected[-2])
assert actual == expected
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
Poll?
def fib(a=1, b=1):
while True:
yield a
a, b = b, a + b
from hypothesis import given
from hypothesis.strategies import integers
from itertools import tee, islice
nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
@given(
a=integers(),
b=integers(),
)
def test_fib(a, b):
for x, y, z in nwise(islice(fib(a, b), 10_000), 3):
assert x + y == z
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def add(x, y):
return x + y
def test_add():
assert add(1, 1) == 2
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def add(x, y):
return x + y
from hypothesis import given
from hypothesis.strategies import integers
@given(
x = integers(),
y = integers(),
)
def test_add(x, y):
assert add(x, y) == x + y
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def add(x, y):
return x + y
from hypothesis import given
from hypothesis.strategies import integers
@given(
x = integers(),
y = integers(),
z = integers(),
)
def test_add(x, y, z):
assert add(x, y) == add(y, x)
assert add(x, add(y, z)) == add(add(x, y), z)
assert add(x, 0) == x
assert add(x, -x) == 0
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def add(x, y):
return x + y
from hypothesis import given
from hypothesis.strategies import integers, decimals
@given(
x = decimals(allow_nan=False, allow_infinity=False),
y = decimals(allow_nan=False, allow_infinity=False),
z = decimals(allow_nan=False, allow_infinity=False),
)
def test_add(x, y, z):
assert add(x, y) == add(y, x)
assert add(x, add(y, z)) == add(add(x, y), z)
assert add(x, 0) == x
assert add(x, -x) == 0
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
def add(x, y):
return x + y
from hypothesis import given
from hypothesis.strategies import integers, decimals, floats
from math import isclose
@given(
x = decimals(allow_nan=False, allow_infinity=False),
y = decimals(allow_nan=False, allow_infinity=False),
z = decimals(allow_nan=False, allow_infinity=False),
)
def test_add(x, y, z):
assert isclose(add(x, y), add(y, x))
assert isclose(add(x, add(y, z)), add(add(x, y), z))
assert isclose(add(x, 0), x)
assert isclose(add(x, -x), 0)
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
Poll?
from enum import Enum
from datetime import date
from datetime import timedelta
US_HOLIDAYS = {
'independence day': date(2020, 7, 3),
'thanksgiving': date(2020, 11, 26),
'christmas': date(2020, 12, 25),
"new year's eve": date(2020, 12, 31),
}
class Dir(Enum):
nbd = timedelta(days=1)
pbd = timedelta(days=-1)
def modify_business_date(dt, n=1, *, holidays=US_HOLIDAYS, direction=Dir.nbd):
if n < 0:
raise ValueError('n must be positive')
while True:
while True:
dt = dt + direction.value
if dt.weekday() not in {5, 6} and dt not in set(holidays.values()):
break
if n == 0:
break
n -= 1
return dt
print(f'{modify_business_date(date(2020, 7, 3)) = }')
from hypothesis import given
from hypothesis.strategies import dictionaries, dates, text, integers
@given(
holidays=dictionaries(keys=text(), values=dates(), min_size=1),
day=dates(),
n=integers(min_value=0, max_value=10),
)
def test_modify_business_date(holidays, day, n):
hol = [*holidays.values()][0]
assert modify_business_date(hol, 0) > hol
assert modify_business_date(day, n) >= day
if day not in set(holidays.values()) and day.weekday() not in {5, 6}:
assert modify_business_date(modify_business_date(day, 1), 1, direction=Dir.pbd) == day
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])
from datetime import date, timedelta
from hypothesis import given
from hypothesis.strategies import dictionaries, dates, text, integers
from enum import Enum
from itertools import islice, tee, takewhile
nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
def datecount(refdate, *, direction=Dir.nbd):
while True:
yield refdate
refdate += direction
@given(
refdate=dates(),
)
def test_datecount(refdate):
dc = datecount(refdate)
dates = [next(dc), next(dc)]
assert dates[0] < dates[1]
assert dates[0] == refdate
assert (dates[1] - dates[0]).days == 1
def businessdays(refdate, *, direction=Dir.nbd, holidays=US_HOLIDAYS):
return (d for d in datecount(refdate, direction=direction)
if d.weekday() not in {5, 6} and d not in set(holidays.values()))
@given(
refdate=dates(),
holidays=dictionaries(keys=text(), values=dates(), min_size=1),
)
def test_businessdays(refdate, holidays):
for pd, cd in nwise(islice(businessdays(refdate, holidays=holidays), 1_000)):
# no weekdays
assert cd.weekday() not in {5, 6}
# no holidays
assert cd not in set(holidays.values())
# should move monotonically forward
assert pd < cd
# dates in-between must be either holidays or weekdays
for d in takewhile(lambda d: d < cd, islice(datecount(pd), 1, None)):
assert d.weekday() in {5, 6} or d in set(holidays.values())
if __name__ == '__main__':
from pytest import main
main(['-q', __file__])