Topics: decorators, generators, context managers, object orientation, asyncio
Modelers, you probably know pandas and NumPy like the back of your hand, and your code is solid. But even with that expertise, inefficiencies can creep into your code when you are not making use of some important features available in Python.
In this session, we’ll dive into a few of those features that you might have seen, heard of, or read about but haven’t yet integrated into your analytical code. We’ll cover foundational elements like context managers, decorators, and generators and show how they can appear in your analytical code.
We’ll also explore newer additions to Python, including type annotations, the match statement, breakpoint, dataclasses, keyword-only arguments, and some useful third-party libraries like PyArrow (when paired with pandas) and Hypothesis for unit testing. Join us to boost the efficiency and readability of your code!
python -m pip install fastapi uvicorn pyarrow pytest hypothesis pandas scipy mypy
print("Let's take a look!")
Context managers express the idea of “wrapping” some block of code with a “before” and an “after” action, such that we are guaranteed that if the “before” action runs, then the “after” action will necessarily run (even if errors occur in the contained block of code).
Context managers are commonly used for resource management—the “before” action does whatever “setup” is necessary to make the resource available, and the “after” action does whatever “teardown” is necessary to clean up or release the resource.
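For instance, here is a minimal sketch of the protocol behind that guarantee (the class and names are illustrative):

class Managed:
    def __enter__(self):                   # the “before” action
        print('setup')
        return 'resource'                  # bound to the name after “as”
    def __exit__(self, typ, val, tb):      # the “after” action; runs even if the body raises
        print('teardown')
        return False                       # do not suppress exceptions

with Managed() as r:
    print(f'{r = }')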
We should use context managers whenever we need to perform multiple operations on an open file.
For example, let’s say we have a single .csv file that contains TWO tables.
from pathlib import Path
from textwrap import dedent
data_dir = Path('data')
data_dir.mkdir(exist_ok=True, parents=True)
(data_dir / 'tables.csv').write_text(dedent('''
# table 1
name,value
abc,123
xyz,456

# table 2
date,name,value
2020-01-01,abc,123
2020-01-01,xyz,456
''').strip())
We cannot use pandas.read_csv to read it in a single pass.
from pathlib import Path
from pandas import read_csv
data_dir = Path('data')
df = read_csv(data_dir / 'tables.csv')  # raises ParserError: the file is not one rectangular table
We should use a context manager to open the file, then iterate over each table.
Instead of…
from itertools import takewhile, islice
from io import StringIO
from pathlib import Path
from pandas import read_csv
data_dir = Path('data')
f = open(data_dir / 'tables.csv')
data = {
    'table 1':
        read_csv(
            StringIO(''.join(islice(takewhile(lambda ln: ln.strip(), f), 1, None))),
            index_col=['name'],
        )
        .squeeze(axis='columns')
        .sort_index(),
    'table 2':
        read_csv(
            StringIO(''.join(islice(takewhile(lambda ln: ln.strip(), f), 1, None))),
            parse_dates=['date'],
            index_col=['date', 'name'],
        )
        .squeeze(axis='columns')
        .sort_index(),
}
f.close()  # if read_csv raises, this line never runs and the file handle leaks

print(
    *data.values(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
… we want to write…
from itertools import takewhile, islice
from io import StringIO
from pathlib import Path
from pandas import read_csv
data_dir = Path('data')
with open(data_dir / 'tables.csv') as f:
    data = {
        'table 1':
            read_csv(
                StringIO(''.join(islice(takewhile(lambda ln: ln.strip(), f), 1, None))),
                index_col=['name'],
            )
            .squeeze(axis='columns')
            .sort_index(),
        'table 2':
            read_csv(
                StringIO(''.join(islice(takewhile(lambda ln: ln.strip(), f), 1, None))),
                parse_dates=['date'],
                index_col=['date', 'name'],
            )
            .squeeze(axis='columns')
            .sort_index(),
    }

print(
    *data.values(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
There are many ready-made context managers in the Python standard library and in the common libraries we already use.
Some are used for managing connections.
from contextlib import closing
from sqlite3 import connect
query = 'select 1 + 1'
with closing(connect(':memory:')) as conn:
    print(f'{[*conn.execute(query)]}')
Some are used for managing resources.
from tempfile import TemporaryDirectory
from pathlib import Path
with TemporaryDirectory() as d:
    d = Path(d)
    print(f'{d = } {d.exists() = }')
print(f'{d = } {d.exists() = }')
Some are used for controlling global state.
from decimal import Decimal, localcontext
print(f'{Decimal("1") / Decimal("3") = }')
with localcontext() as ctx:
    ctx.prec = 10
    print(f'{Decimal("1") / Decimal("3") = }')
print(f'{Decimal("1") / Decimal("3") = }')
Some are used for managing lifespans.
from contextlib import asynccontextmanager
from logging import getLogger
from fastapi import FastAPI
from uvicorn import run
logger = getLogger(f'uvicorn.{__name__}')
@asynccontextmanager
async def lifespan(app : FastAPI):
    logger.info('lifespan startup, app = %r', app)
    yield
    logger.info('lifespan teardown, app = %r', app)

app = FastAPI(lifespan=lifespan)

if __name__ == '__main__':
    run(app)
It is very easy to write your own context managers. Any time you see a “before” → middle → “after” pattern, you should consider writing a context manager!
You may use contextlib.contextmanager or contextlib.asynccontextmanager to make this very easy!
from collections import namedtuple
from contextlib import contextmanager, closing
from datetime import datetime, timedelta
from random import Random
from sqlite3 import connect, register_adapter
from string import ascii_lowercase
from textwrap import dedent
class Datum(namedtuple('Datum', 'date name value')):
    @classmethod
    def from_random(cls, *, random_state=None):
        rnd = Random() if random_state is None else random_state
        return cls(
            date=datetime(2020, 1, 1) + timedelta(days=rnd.randrange(90)),
            name=''.join(rnd.choices(ascii_lowercase, k=4)),
            value=rnd.randint(-1_000, +1_000),
        )

@contextmanager
def temporary_table(conn, table_name):
    if not table_name.isalpha():
        raise ValueError(f'bad table name {table_name = }')
    queries = {
        'create': dedent(f'''
            create table {table_name} (
                  "date"  date
                , "name"  text
                , "value" number
            )
        ''').strip(),
        'drop': f'drop table {table_name}',
    }
    try:
        conn.execute(queries['create'])
        conn.commit()
        yield
    finally:
        conn.execute(queries['drop'])
        conn.commit()

@contextmanager
def sample_data(conn, table_name, *, random_state=None):
    if not table_name.isalpha():
        raise ValueError(f'bad table name {table_name = }')
    queries = {
        'insert': f'insert into {table_name} (date, name, value) values (:date, :name, :value)',
        'delete': f'delete from {table_name} where date=:date and name=:name and value=:value',
    }
    rnd = Random() if random_state is None else random_state
    data = [Datum.from_random(random_state=rnd) for _ in range(100)]
    try:
        conn.executemany(queries['insert'], (x._asdict() for x in data))
        yield
    finally:
        conn.executemany(queries['delete'], (x._asdict() for x in data))

if __name__ == '__main__':
    rnd = Random(0)
    register_adapter(datetime, lambda dt: dt.isoformat())
    with (
        closing(connect(':memory:')) as conn,
        temporary_table(conn, 'data'),
        sample_data(conn, 'data', random_state=rnd),
    ):
        query = 'select count(*), min(value), max(value), min(date), max(date) from data'
        for row in conn.execute(query):
            print(f'{row = }')
print("Let's take a look!")
Python supports functional programming approaches by virtue of having first-class functions and allowing dynamic definition of functions (with support for closures).
@dec
def f():
    pass

# … is equivalent to …

def f():
    pass
f = dec(f)

@higher_dec(...)
def f():
    pass

# … is equivalent to …

def f():
    pass
f = higher_dec(...)(f)

# where a “higher-order decorator” is typically shaped like:

def higher_dec(*args, **kwargs):
    def dec(f):
        def inner(*args, **kwargs):
            return f(*args, **kwargs)
        return inner
    return dec
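For a concrete (illustrative) example of the above equivalence, here is a decorator that logs each call to the function it wraps:

from functools import wraps

def logged(f):
    @wraps(f)  # preserve f's name and docstring on the wrapper
    def inner(*args, **kwargs):
        print(f'calling {f.__name__} with {args = } {kwargs = }')
        return f(*args, **kwargs)
    return inner

@logged
def add(x, y):
    return x + y

print(f'{add(123, 456) = }')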
We can treat functions like any other data type:
from random import Random
def add(x, y):
    return x + y

def mul(x, y):
    return x * y

rnd = Random(0)
result = initial = 10
operations = [rnd.choice([add, mul]) for _ in range(10)]
for val, op in zip(range(1, 10), operations):
    result = op(result, val)

print(
    f'{operations = }',
    f'{initial = :>10,}',
    f'{result = :>10,}',
    sep='\n',
)
We can dynamically define functions which “close” over their defining environment.
def create_function(state):
    def increment():
        nonlocal state
        state += 1
        return state
    return increment

inc0 = create_function(  0)
inc1 = create_function( 10)
inc2 = create_function(100)

print(
    f'{inc0.__closure__[0].cell_contents = }',
    f'{inc1.__closure__[0].cell_contents = }',
    f'{inc2.__closure__[0].cell_contents = }',
    f'{inc0() = :>5,} · {inc1() = :>5,} · {inc2() = :>5,}',
    f'{inc0() = :>5,} · {inc1() = :>5,} · {inc2() = :>5,}',
    f'{inc0() = :>5,} · {inc1() = :>5,} · {inc2() = :>5,}',
    f'{inc0() = :>5,} · {inc1() = :>5,} · {inc2() = :>5,}',
    sep='\n',
)
This can be an alternative to standard object orientation approaches:
class T:
    def __init__(self, state):
        self.state = state
    def __call__(self):
        self.state += 1
        return self.state

def create_function(state):
    def increment():
        nonlocal state
        state += 1
        return state
    return increment

inc0 = T(123)
inc1 = create_function(456)

print(
    f'{inc0() = :>5,} · {inc1() = :>5,}',
    f'{inc0() = :>5,} · {inc1() = :>5,}',
    f'{inc0() = :>5,} · {inc1() = :>5,}',
    f'{inc0() = :>5,} · {inc1() = :>5,}',
    sep='\n',
)
Decorators in Python allow us to perform an action associated with the definition of functions or classes—e.g., wrapping, registering, or modifying the function or registering, modifying, or checking the class.
We may write decorators to add cross-cutting functionality to existing functions.
from collections import namedtuple
from datetime import datetime, timedelta
from functools import wraps
from itertools import islice
from pathlib import Path
from pickle import load, dump
from random import Random
from string import ascii_lowercase
from time import sleep
class Datum(namedtuple('Datum', 'date name value')):
    @classmethod
    def from_random(cls, *, random_state=None):
        rnd = Random() if random_state is None else random_state
        return cls(
            date=datetime(2020, 1, 1) + timedelta(days=rnd.randrange(90)),
            name=''.join(rnd.choices(ascii_lowercase, k=4)),
            value=rnd.randint(-1_000, +1_000),
        )

def cache_to_pickle(path):
    if __debug__:
        def dec(func):
            @wraps(func)
            def inner(*args, **kwargs):
                # note: the cache key is just the path; the arguments are ignored
                if path.exists() and path.stat().st_size > 0:
                    with open(path, 'rb') as f:
                        return load(f)
                rv = func(*args, **kwargs)
                with open(path, 'wb') as f:
                    dump(rv, f)
                return rv
            return inner
    else:
        # under `python -O`, decorate with a no-op
        def dec(func):
            return func
    return dec

@cache_to_pickle(Path('/tmp/load_data_slowly.pickle'))
def load_data_slowly(*, random_state=None):
    rnd = Random() if random_state is None else random_state
    rv = []
    for _ in range(10):
        sleep(.1)
        rv.append(Datum.from_random(random_state=rnd))
    return rv

@cache_to_pickle(Path('/tmp/process_data_slowly.pickle'))
def process_data_slowly(data):
    rv = []
    for x in data:
        sleep(.1)
        rv.append(x._replace(value=abs(x.value)))
    return rv

if __name__ == '__main__':
    rnd = Random(0)
    raw_data = load_data_slowly(random_state=rnd)
    results = process_data_slowly(raw_data)
    print(f'{[*islice(results, 10)] = }')
Tools like fastapi allow you to “register routes” using decorator syntax.
from fastapi import FastAPI
from uvicorn import run
app = FastAPI()

@app.get('/status')
async def test():
    return {'success': True}

if __name__ == '__main__':
    run(app)
pandas allows you to use decorator syntax to extend the behavior available on Series, DataFrame, and Index objects.
from collections.abc import Callable
from dataclasses import dataclass
from pandas import Index, Series, MultiIndex, date_range
from pandas.api.extensions import register_index_accessor
@register_index_accessor('_ext')
@dataclass
class _ext:
    obj : Index
    def addlevel(self, **levels):
        levels = {k: v if not isinstance(v, Callable) else v(self.obj) for k, v in levels.items()}
        new_obj = self.obj.copy(deep=False)
        if not isinstance(new_obj, MultiIndex):
            new_obj = MultiIndex.from_arrays([new_obj])
        names = new_obj.names
        new_obj.names = [None] * len(names)
        return MultiIndex.from_arrays([
            *(new_obj.get_level_values(idx) for idx in range(len(names))),
            *levels.values(),
        ], names=[*names, *levels.keys()])
    def updatelevel(self, **levels):
        levels = {k: v if not isinstance(v, Callable) else v(self.obj) for k, v in levels.items()}
        new_obj = self.obj.copy(deep=False)
        if not isinstance(new_obj, MultiIndex):
            new_obj = MultiIndex.from_arrays([new_obj])
        names = new_obj.names
        new_obj.names = [None] * len(names)
        return MultiIndex.from_arrays([
            levels[n] if n in levels else new_obj.get_level_values(idx) for idx, n in enumerate(names)
        ], names=names)

s = Series(
    data=0,
    index=MultiIndex.from_product([
        [*'abc'],
        date_range('2020-01-01', periods=3),
    ], names='entity date'.split()),
)

print(
    s.head(3),
    s.pipe(lambda s: s.set_axis(s.index.swaplevel())).head(3),
    s.pipe(lambda s: s.set_axis(s.index.droplevel('date'))).head(3),
    s.pipe(lambda s: s.set_axis(s.index._ext.addlevel(x=[0] * len(s)))).head(3),
    s.pipe(lambda s: s.set_axis(s.index._ext.updatelevel(entity=['...'] * len(s)))).head(3),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
print("Let's take a look!")
Python allows you to write generators and generator coroutines, which represent single computations that have been decomposed into parts.
This decomposition lets us implement lazy computations and can dramatically simplify the APIs we write. Any time an API grows multiple modalities for specifying where a (potentially infinite) computation should end, we should consider writing it as a generator.
Here is an implementation mimicking scipy.optimize.newton.
Where we would use scipy.optimize.newton as follows…
from scipy.optimize import newton
f = lambda x: (x + 4) * (x - 5) + 1 # x² - x - 19
fprime = lambda x: 2 * x - 1
print(f'{newton(f, 0, fprime, rtol=1e-6) = :.2f}')
… we might naïvely implement it as follows…
from math import isclose
def newton(f, x0, fprime, tol=1.48e-08, maxiter=50, rtol=0.0, imp=None, rimp=None):
    prev_x = None
    x = x0
    for _ in range(maxiter):
        if isclose(f(x), 0, abs_tol=tol): break
        if isclose(f(x), 0, rel_tol=rtol): break
        if prev_x is not None and imp is not None:
            if isclose(f(prev_x), f(x), abs_tol=imp):
                break
        if prev_x is not None and rimp is not None:
            if isclose(f(prev_x), f(x), rel_tol=rimp):
                break
        prev_x = x
        x -= f(x) / fprime(x)
    return x
f = lambda x: (x + 4) * (x - 5) # x² - x - 20
fprime = lambda x: 2 * x - 1
print(f'{newton(f, 0, fprime, rtol=1e-6) = :.2f}')
… but the generator formulation provides us with a superior option:
def newton(f, x, fprime):
    while True:
        x -= f(x) / fprime(x)
        yield x

f = lambda x: (x + 4) * (x - 5) + 1 # x² - x - 19
fprime = lambda x: 2 * x - 1

from collections import deque
from itertools import islice

print(f'{deque(islice(newton(f, 0, fprime), 50), maxlen=1)[0] = :.2f}')
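Because the stopping rule now belongs to the caller, other termination modalities compose for free. Here is a sketch (using itertools.pairwise, Python ≥ 3.10; the tolerance is illustrative) that stops when successive iterates agree:

from itertools import pairwise
from math import isclose

def newton(f, x, fprime):
    while True:
        x -= f(x) / fprime(x)
        yield x

f = lambda x: (x + 4) * (x - 5) + 1 # x² - x - 19
fprime = lambda x: 2 * x - 1

for prev, curr in pairwise(newton(f, 0, fprime)):
    if isclose(prev, curr, rel_tol=1e-6):  # a caller-chosen stopping rule
        break
print(f'{curr = :.2f}')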
In fact, generator coroutines provide us with an extremely powerful way to represent arbitrarily complex state machines.
Where the canonical modelling might look like…
from collections import defaultdict, deque
from collections.abc import Callable
from dataclasses import dataclass
from random import Random
@dataclass(frozen=True, unsafe_hash=True)
class State:
    name : str

@dataclass(frozen=True, unsafe_hash=True)
class Transition:
    src : State
    dst : State
    pred : Callable
    def __call__(self, *args, **kwargs):
        return self.pred(*args, **kwargs)

#      a
#     / \
#    b   c
#    |   | \
#    d   e  f
#     \  |  /
#      → a

states = {st.name: st for st in {
    State(name='a'),
    State(name='b'),
    State(name='c'),
    State(name='d'),
    State(name='e'),
    State(name='f'),
}}

transitions = defaultdict(set)
for tr in {
    Transition(src=states['a'], dst=states['b'], pred=lambda x: x >= 0),
    Transition(src=states['a'], dst=states['c'], pred=lambda x: x < 0),
    Transition(src=states['b'], dst=states['d'], pred=lambda _: True),
    Transition(src=states['c'], dst=states['e'], pred=lambda x: x >= 0),
    Transition(src=states['c'], dst=states['f'], pred=lambda x: x < 0),
    Transition(src=states['d'], dst=states['a'], pred=lambda _: True),  # where would per-machine state (e.g., visit counts, an extra state “g”) live?
    Transition(src=states['e'], dst=states['a'], pred=lambda _: True),
    Transition(src=states['f'], dst=states['a'], pred=lambda _: True),
}:
    transitions[tr.src].add(tr)

def execute(init_state, transitions, inputs):
    state = init_state
    while inputs:
        inp = inputs.popleft()
        for tr in transitions[state]:
            if tr(inp):
                print(f'{state} → {inp:>3} → {tr.dst}')
                state = tr.dst
                break

if __name__ == '__main__':
    rnd = Random(0)
    inputs = deque([rnd.randint(-10, +10) for _ in range(10)])
    execute(states['a'], transitions, inputs)
… a generator coroutine modelling might look like…
from collections import deque
from dataclasses import dataclass
from random import Random
# a
# / \
# b c
# | | \
# d e f
# \ | /
# → a
@dataclass(frozen=True, unsafe_hash=True)
class State:
    name : str

def machine():
    num_visits_e = 0
    while True:
        if (inp := (yield State('a'))) >= 0:
            inp = yield State('b')
            inp = yield State('d')
        else:
            if (inp := (yield State('c'))) >= 0:
                num_visits_e += 1
                inp = yield State('e')
            else:
                inp = yield State('f')
                # per-machine state is just a local variable here; e.g., escalate
                # to an extra state after too many visits to “e”:
                # if num_visits_e > ...:
                #     inp = yield State('g')

if __name__ == '__main__':
    rnd = Random(0)
    inputs = deque([rnd.randint(-10, +10) for _ in range(10)])
    m = machine()
    src = next(m)
    for inp in inputs:
        dst = m.send(inp)
        print(f'{src} → {inp:>3} → {dst}')
        src = dst
print("Let's take a look!")
PEP-484 introduced type hints to Python: a way to document the types that our functions take and return.
The code below will fail, but only at runtime:
from time import sleep
def add(x, y):
    return x + y

if __name__ == '__main__':
    sleep(10_000)     # hours later…
    add('abc', 123)   # …TypeError: can only concatenate str (not "int") to str
With type hints, we can make it fail at checking time, before the program ever runs:
def add(x : int, y : int) -> int:
    return x + y

if __name__ == '__main__':
    add('abc', 123)  # mypy: Argument 1 to "add" has incompatible type "str"; expected "int"
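To surface the error without ever running the program, run mypy over the file (the file name here is illustrative):

python -m mypy add.py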
Type hints can help us find problems that would otherwise be fairly difficult to spot:
from random import Random
def f(*, random_state : Random | None = None) -> bool:
    rnd = Random() if random_state is None else random_state
    return rnd.choice([True, False])

if __name__ == '__main__':
    rnd = Random(0)
    if f:  # forgot the ()! mypy reports: Function "f" could always be true in boolean context
        ...
Of course, for functions that operate on pandas.DataFrame and pandas.Series objects, these hints may not buy us much checking. They may still be useful for documentation purposes, though.
from collections import namedtuple
from numpy import where
from pandas import DataFrame, Series, MultiIndex, Timestamp

# an illustrative result container; the functions below rely on the `_ext` index accessor registered earlier
Liquidation = namedtuple('Liquidation', 'trades cashflows')

def execute(volumes : Series | list[Series], prices : DataFrame) -> list[Series]:
    volumes = [volumes] if isinstance(volumes, Series) else volumes
    traded_prices = [
        prices.loc[
            MultiIndex.from_arrays([
                vol.index.get_level_values('date').floor('min'),
                vol.index.get_level_values('asset'),
            ])
        ].pipe(lambda df: where(vol > 0, df['buy'], df['sell']))
        for vol in volumes
    ]
    return [
        (px * -vol)
        .pipe(lambda s: s.set_axis(s.index._ext.updatelevel(asset=['USD'] * len(s))))
        for vol, px in zip(volumes, traded_prices)
    ]

def liquidate(volumes : Series | list[Series], prices : DataFrame, *, date : Timestamp | None = None) -> Liquidation:
    volumes = [volumes] if isinstance(volumes, Series) else volumes
    dates = [
        [prices.index.get_level_values('date').max() if date is None else date] * len(vol)
        for vol in volumes
    ]
    trades = [
        (-vol).pipe(lambda s: s.set_axis(s.index._ext.updatelevel(date=dt)))
        for dt, vol in zip(dates, volumes)
    ]
    return Liquidation(trades=trades, cashflows=[execute(tr, prices) for tr in trades])
typing.Literal lets us constrain an argument to a fixed set of values:

from typing import Literal

def f(mode : Literal['dryrun', 'prod']):
    pass  # a checker rejects any call whose argument is not one of the two literal strings
print("Let's take a look!")
There is quite a bit of other syntax that you should know.
PEP-3102 added keyword-only arguments to Python 3.0, and PEP-570 added positional-only arguments to Python 3.8.
For example, f can be called with any mix of positional and keyword arguments.
def f(a, b, c):
    ...

print(
    f'{f( 123, 456, 789) = }',
    f'{f(a=123, b=456, c=789) = }',
    f'{f( 123, b=456, c=789) = }',
    sep='\n',
)
However, g requires that b and c be passed by keyword…
def g(a, *, b, c):
    ...

print(
    # f'{g( 123, 456, 789) = }',    # INVALID: b and c must be passed by keyword
    f'{g(a=123, b=456, c=789) = }',
    f'{g( 123, b=456, c=789) = }',
    sep='\n',
)
… and h requires that a be passed positionally.
def h(a, /, b, c):
    ...

print(
    f'{h( 123, 456, 789) = }',
    # f'{h(a=123, b=456, c=789) = }',    # INVALID: a must be passed positionally
    f'{h( 123, b=456, c=789) = }',
    sep='\n',
)
Keyword-only arguments can be used to make APIs easier to read.
from pandas import read_csv
help(read_csv)
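For a sketch with an invented signature of our own: once the flags are keyword-only, call sites document themselves.

def read_table(path, *, header=True, sep=',', skip_blank_lines=True):
    ...

# read_table('data.csv', False, ';')           # TypeError; unreadable anyway
read_table('data.csv', header=False, sep=';')  # each flag is named at the call site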
Positional-only arguments can be used to eliminate potential ambiguities in APIs.
def do_n_times(f, n, /, *args, **kwargs):
    for _ in range(n):
        yield f(*args, **kwargs)

def f(x, y, n):
    return (x + y) / n

if __name__ == '__main__':
    # since f and n are positional-only, **kwargs is free to contain its own ‘f’ and ‘n’ keys
    for rv in do_n_times(f, 3, x=123, y=456, n=789):
        print(f'{rv = :.2f}')
PEP-448 added additional unpacking generalizations, which make for nicer literal syntax.
For example, if we want to combine two pieces of Python data…
xs = [1, 2, 3]
ys = {4, 5, 6}

print(
    # f'{ xs + ys = }',    # TypeError: can only concatenate list (not "set") to list
    f'{(*xs, *ys) = }',
    sep='\n',
)
Unfortunately, this means we have far too many ways to merge dictionaries…
from itertools import chain
from collections import ChainMap

d0 = {'a': 1, 'b': 20, }
d1 = {        'b': 22, 'c': 333}

print(
    f'{dict(chain(d0.items(), d1.items())) = }',
    f'{({**d0, **d1}) = }',
    f'{d0 | d1 = }',
    f'{({**ChainMap(d1, d0)}) = }',
    sep='\n',
)
PEP-634 added a match statement to Python 3.10 with some advanced destructuring features.
from collections import namedtuple

T0 = namedtuple('T0', 'a b')
T1 = namedtuple('T1', 'a b c')

if __name__ == '__main__':
    values = [
        T0(123, 456),
        T1(123, 456, 789),
    ]
    for val in values:
        match val:
            case T0(a=x, b=y) if y > 0:
                print(f'{x = } {y = }')
            case T1(a=x, b=y, c=z) if y < z:
                print(f'{x = } {y = } {z = }')
This works nicely with PEP-484 type hinting.
from enum import Enum
from typing import TypeVar

InterpolationMode = Enum('InterpolationMode', 'Linear Cubic Spline')
T = TypeVar('T')

def interpolate(xs : T, ys : T, target_xs : T, mode : InterpolationMode) -> T:
    match mode:
        case InterpolationMode.Linear:
            ...
            return xs
        case InterpolationMode.Cubic:
            ...
            return xs
    # no case handles InterpolationMode.Spline; mypy reports “Missing return statement”

if __name__ == '__main__':
    interpolate(..., ..., ..., mode=InterpolationMode.Spline)
print("Let's take a look!")
How do you debug your buggy code?
from collections import namedtuple
from datetime import datetime, timedelta
from math import log
from random import Random
from string import ascii_lowercase
class Datum(namedtuple('Datum', 'date name value')):
    @classmethod
    def from_random(cls, *, random_state=None):
        rnd = Random() if random_state is None else random_state
        return cls(
            date=datetime(2020, 1, 1) + timedelta(days=rnd.randrange(90)),
            name=''.join(rnd.choices(ascii_lowercase, k=4)),
            value=rnd.randint(-100, +100),
        )
    def __call__(self):
        return self._replace(value=log(abs(self.value)))  # ValueError (“math domain error”) when value == 0

def process(data):
    total = 0
    for x in data:
        total += x().value
    return total

if __name__ == '__main__':
    rnd = Random(0)
    data = [Datum.from_random(random_state=rnd) for _ in range(1_000)]
    print(f'{process(data) = }')
We can use pdb.post_mortem, which is very useful for inspecting the state at the point an uncaught exception was raised.
from collections import namedtuple
from datetime import datetime, timedelta
from math import log
from random import Random
from string import ascii_lowercase
class Datum(namedtuple('Datum', 'date name value')):
    @classmethod
    def from_random(cls, *, random_state=None):
        rnd = Random() if random_state is None else random_state
        return cls(
            date=datetime(2020, 1, 1) + timedelta(days=rnd.randrange(90)),
            name=''.join(rnd.choices(ascii_lowercase, k=4)),
            value=rnd.randint(-100, +100),
        )
    def __call__(self):
        return self._replace(value=log(abs(self.value)))

def process(data):
    total = 0
    for x in data:
        total += x().value
    return total

if __name__ == '__main__':
    from pdb import post_mortem
    import sys; sys.excepthook = lambda typ, val, tb: post_mortem(tb)  # drop into pdb on any uncaught exception
    rnd = Random(0)
    data = [Datum.from_random(random_state=rnd) for _ in range(1_000)]
    print(f'{process(data) = }')
We can try adding print statements, but it’s easy to get overwhelmed by the output.
from collections import namedtuple
from datetime import datetime, timedelta
from math import log
from random import Random
from string import ascii_lowercase
class Datum(namedtuple('Datum', 'date name value')):
    @classmethod
    def from_random(cls, *, random_state=None):
        rnd = Random() if random_state is None else random_state
        return cls(
            date=datetime(2020, 1, 1) + timedelta(days=rnd.randrange(90)),
            name=''.join(rnd.choices(ascii_lowercase, k=4)),
            value=rnd.randint(-100, +100),
        )
    def __call__(self):
        return self._replace(value=log(abs(self.value)))

def process(data):
    total = 0
    for x in data:
        print(f'{x = }')
        print(f'{x().value = }')
        total += x().value
    return total

if __name__ == '__main__':
    rnd = Random(0)
    data = [Datum.from_random(random_state=rnd) for _ in range(1_000)]
    print(f'{process(data) = }')
We can use the breakpoint builtin added in Python 3.7, and we can easily put conditional guards around it.
from collections import namedtuple
from datetime import datetime, timedelta
from math import log
from random import Random
from string import ascii_lowercase
class Datum(namedtuple('Datum', 'date name value')):
    @classmethod
    def from_random(cls, *, random_state=None):
        rnd = Random() if random_state is None else random_state
        return cls(
            date=datetime(2020, 1, 1) + timedelta(days=rnd.randrange(90)),
            name=''.join(rnd.choices(ascii_lowercase, k=4)),
            value=rnd.randint(-100, +100),
        )
    def __call__(self):
        return self._replace(value=log(abs(self.value)))

def process(data):
    total = 0
    for x in data:
        try:
            total += x().value
        except Exception:
            breakpoint()  # only drop into the debugger on the failing datum
    return total

if __name__ == '__main__':
    rnd = Random(0)
    data = [Datum.from_random(random_state=rnd) for _ in range(1_000)]
    print(f'{process(data) = }')
By default, breakpoint calls pdb.set_trace, but we can customize this via sys.breakpointhook.
from atexit import register
from collections import namedtuple
from datetime import datetime, timedelta
from inspect import currentframe, getouterframes
from math import log
from random import Random
from string import ascii_lowercase
from pandas import Series
class Datum(namedtuple('Datum', 'date name value')):
    @classmethod
    def from_random(cls, *, random_state=None):
        rnd = Random() if random_state is None else random_state
        return cls(
            date=datetime(2020, 1, 1) + timedelta(days=rnd.randrange(90)),
            name=''.join(rnd.choices(ascii_lowercase, k=4)),
            value=rnd.randint(-100, +100),
        )
    def __call__(self):
        return self._replace(value=log(abs(self.value)))

def process(data):
    total = 0
    for x in data:
        breakpoint()
        total += x().value
    return total

if __name__ == '__main__':
    import sys
    trace = []
    @lambda f: setattr(sys, 'breakpointhook', f)  # define the hook and install it in one step (PEP 614 allows any decorator expression)
    def breakpointhook():
        trace.append(getouterframes(currentframe())[1].frame.f_locals["x"])
    @register
    def atexit():
        print(Series([tr.value for tr in trace]))
    rnd = Random(0)
    data = [Datum.from_random(random_state=rnd) for _ in range(1_000)]
    print(f'{process(data) = }')
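Relatedly, the PYTHONBREAKPOINT environment variable selects the hook without touching the code (the script name here is illustrative):

PYTHONBREAKPOINT=0 python script.py              # skip every breakpoint()
PYTHONBREAKPOINT=pdb.set_trace python script.py  # the default behaviour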
print("Let's take a look!")
We know that we ought to write tests for our code.
def split_bill(total, num_guests):
    total *= 100
    charges = [total // num_guests] * num_guests
    charges[0] += total - sum(charges)
    return [x / 100 for x in charges]

if __name__ == '__main__':
    print(f'{split_bill(123.45, 4) = }')
We can write some trivial “oracular tests” as follows:
def split_bill(total, num_guests):
    total *= 100
    charges = [total // num_guests] * num_guests
    charges[0] += total - sum(charges)
    return [x / 100 for x in charges]

def test_split_bill():
    assert split_bill(123.45, 3) == [41.15, 41.15, 41.15]
    assert split_bill(123.45, 4) == [30.87, 30.86, 30.86, 30.86]
We can use pytest fixtures to randomize some of the inputs, which will improve coverage:
from pytest import fixture
from random import randrange, uniform
def split_bill(total, num_guests):
    total *= 100
    charges = [total // num_guests] * num_guests
    charges[0] += total - sum(charges)
    return [x / 100 for x in charges]

@fixture
def total():
    return round(uniform(10, 1_000), 2)

@fixture
def num_guests():
    return randrange(1, 10)

def test_split_bill(total, num_guests):
    assert split_bill(123.45, 3) == [41.15, 41.15, 41.15]
    assert split_bill(123.45, 4) == [30.87, 30.86, 30.86, 30.86]
    assert sum(split_bill(total, num_guests)) == total
But we should instead consider Hypothesis, which performs a directed search through the space of inputs and encourages us to discover and test properties of our code.
from hypothesis import given
from hypothesis.strategies import floats, integers
def split_bill(total, num_guests):
    total *= 100
    charges = [total // num_guests] * num_guests
    charges[0] += total - sum(charges)
    return [x / 100 for x in charges]

@given(
    total=floats(min_value=0.01, allow_infinity=False, allow_nan=False),
    num_guests=integers(min_value=1, max_value=1_000),
)
def test_split_bill(total, num_guests):
    assert split_bill(123.45, 3) == [41.15, 41.15, 41.15]
    assert split_bill(123.45, 4) == [30.87, 30.86, 30.86, 30.86]
    assert sum(split_bill(total, num_guests)) == total
Good testing leads us to meaningful, material improvements in our code.
from hypothesis import given, note
from hypothesis.strategies import integers

def split_bill(total, num_guests):  # totals are now integer cents, so there is no float rounding
    charges = [total // num_guests] * num_guests
    charges[0] += total - sum(charges)
    return [x for x in charges]

@given(
    total=integers(min_value=1_00, max_value=100_000),
    num_guests=integers(min_value=2, max_value=1_000),
)
def test_split_bill(total, num_guests):
    assert split_bill(123_45, 3) == [41_15, 41_15, 41_15]
    assert split_bill(123_45, 4) == [30_87, 30_86, 30_86, 30_86]
    assert split_bill(total, 1) == [total]
    assert split_bill(total, total) == [1] * total
    assert sum(split_bill(total, num_guests), 0) == total
    lhs = split_bill(total, num_guests - 1)
    rhs = split_bill(total, num_guests + 1)
    note(f'{lhs = }')
    note(f'{rhs = }')
    assert max(lhs) > min(rhs)
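These property-based tests run under pytest like any other test (assuming they live in, say, test_split_bill.py):

python -m pytest test_split_bill.py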