ts-python

Seminar IV: “Debugging in Python”

“The doctor said I wouldn’t have to many nosebleeds if I kept my finger out of there.”

— ‘I Love Lisa’ (S04E15)

Abstract

How do you find bugs deep in your code? How can you use common tools in Python to ease this process? And, once you find a bug, what can you do to surgically investigate and fix it?

Do you…

Then join us for a session on debugging Python code!

Dynamic languages like Python allow you to rapidly prototype code without a compiler getting in your way. However, this means that bugs can sometimes slip into our code if we’re not careful.

In this episode, we’ll discuss the various ways you can debug your Python programs. We’ll discuss various techniques for instrumenting live code to track down bugs in large programmes, as well as how to best use tools such as pdb, and ipdb as well as common mechanisms within the sys module.

Keywords: debugging, pdb, Python debugger, ipdb, breakpoint

Notes

“What are we even doing here?”

“A bug is when you think the code works differently than it actually does.”

Approach:

Consider:

Question: “Did something go wrong?”

print("Let's take a look!")
from pandas import read_csv

read_csv('does-not-exist.csv')
from pandas import Series, date_range
from numpy import log, zeros, ones

print(
    # Series(zeros(3)).apply(log),
    (
        Series(ones(3)) * Series(ones(3), index=date_range('2000-01-01', periods=3))
    ).sum(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

The debugging process is an iterative cycle of:

Guidance: write code to support easy isolation.

from pathlib import Path
from pandas import read_csv, concat
from dateutil.parser import parse

def debug(obj):
    print(obj)
    return obj

df = concat(
    read_csv(p)
        .assign(date=parse(p.stem))
        .set_index(['date', 'ticker'])
        .sort_index()
    for p in (Path('data') / 'by-date').iterdir()
    if p.suffix == '.csv'
)

print(df.head(3))
from pathlib import Path
from pandas import read_csv, concat
from dateutil.parser import parse

paths = {
    p
    for p in (Path('data') / 'by-date').iterdir()
    if p.suffix == '.csv'
}

raw_data = [
    read_csv(p)
        .assign(date=parse(p.stem))
        .set_index(['date', 'ticker'])
        .sort_index()
    for p in paths
]

df = concat(raw_data)

print(df.head(3))

Guidance: generators & generator expressions allow easy decomposition.

from csv import reader

raw_data = '''
abcd,123
DEFG,123
 defg ,456
#SKIP,789
'''.strip().splitlines()

data = []
for name, value in reader(raw_data):
    name = name.strip().casefold()
    value = int(value)
    if not name.startswith('#'):
        data.append((name, value))

print(f'{data = }')
from csv import reader

raw_data = '''
abcd,123
DEFG,123
 defg ,456
#SKIP,789
'''.strip().splitlines()

data = reader(raw_data)
data = ((name.strip(),    value) for name, value in data)
data = ((name.casefold(), value) for name, value in data)
data = ((name,            value) for name, value in data if not name.startswith('#'))

print(f'{[*data] = }')

Guidance: superficialise areas that are likely to involve errors (e.g., input/output.)

from pandas import read_csv

def load_data_from_csv(filename):
    return read_csv(filename)

def load_data(filename):
    filename = 'does-not-exist.csv'
    return load_data_from_csv(filename)

def run_report():
    load_data('does-exist.csv')
    ...
    ...
    ...

if __name__ == '__main__':
    run_report()
from pandas import read_csv

def run_report(raw_data):
    ...
    ...
    ...

if __name__ == '__main__':
    filename = 'does-not-exist.csv'
    raw_data = read_csv(filename)
    run_report(raw_data)

Guidance: actually read exception tracebacks.

def f():
    return g()

def g():
    return h()

def h():
    try:
        return 0 / 0
    except Exception as e:
        raise ValueError('zero is not divisible') from e

f()

Guidance: only handle errors when you can do something meaningful about them.

en_zh = {
    'one':   '壹',
    'two':   '貳',
    'three': '參',
    'four':  '肆',
}

en_word = 'five'
zh_word = en_zh.get(en_word, '')

print(f'To write {en_word!r}, you write {zh_word!r}.')
print(f'To write {en_word!r} emphatically, you write {zh_word.center(7, "=")!r}!')
groceries = {
    'eggs':     12,
    'tomatoes':  3,
}

for item in {'eggs', 'tomatoes', 'bagels'}:
    print(f'You bought {groceries.get(item, 0)} {item}')
from collections import defaultdict

groceries = defaultdict(int, {
    'eggs':     12,
    'tomatoes':  3,
})

for item in {'eggs', 'tomatoes', 'bagels'}:
    print(f'You bought {groceries[item]} {item}')

Try to avoid except Exception!

from random import choice

def f():
    return g()

def g():
    return h()

def h():
    raise choice([ValueError, KeyError, TypeError])()

try:
    f()
except Exception:
    pass

Do not use “bare” except!

from pandas import read_csv

try:
    df = read_csv('does-not-exixt')
except:
    pass
try:
    raise SystemError()
except:
    pass
try:
    raise MemoryError()
except:
    pass
try:
    raise ModuleNotFoundError()
except:
    pass

Question: “Can I make this reproducibly break?”

print("Let's take a look!")

Guidance: when investigating errors, remove sources of non-determinism.

from numpy.random import randint

print(f'{1 / randint(-10, +10) = }')
from numpy.random import randint
from numpy.random import seed

seed(0)

print(f'{randint(-10, +10) = }')
from numpy.random import default_rng

rng = default_rng(0)

print(f'{rng.integers(-10, +10) = }')
from random import randint

print(f'{randint(-10, +10) = }')
from random import randint, seed

seed(0)

print(f'{randint(-10, +10) = }')
from random import Random

rnd = Random(0)

print(f'{rnd.randint(-10, +10) = }')
from pandas import Series
from numpy.random import default_rng

rng = default_rng(0)

print(
    Series([1, 2, 3, 4]).sample(3, random_state=rng)
)
x = {'a', 'b', 'c'}

print(f'{x = }')
from subprocess import run

run([
    'python', '-c',
    'print({*"abcde"})',
])
from subprocess import run

run([
    'python', '-c',
    'print({*"abcde"})',
], env={'PYTHONHASHSEED': '0'})

For Jupyter, set in kernel.json.

{
  "argv": [
    "python",
    "-m",
    "ipykernel_launcher",
    "-f",
    "{connection_file}"
  ],
  "display_name": "Python 3",
  "language": "python",
  "env": {
    "PYTHONHASHSEED": "0"
  }
}

Guidance: when investigating errors, remove external dependencies.

from pathlib import Path
from pandas import read_pickle

df = read_pickle(Path('data') / 'data.pkl')

print(df.head(3))
from requests import get
from pandas import read_json, Timestamp

url = 'http://data-source/data.json'

df = read_json(
    get(url).text,
).pipe(lambda df: df
    .set_axis(df.index.map(lambda x: eval(x, {'Timestamp': Timestamp})))
)

print(df)
from requests import get
from pandas import read_json, Timestamp
from pathlib import Path

url = 'http://data-source/data.json'

cache_dir = Path('/tmp/cache')
cache_dir.mkdir(parents=True, exist_ok=True)
if (cache_file := cache_dir / 'data.pkl').exists():
    from pandas import read_pickle
    df = read_pickle(cache_file)
else:
    df = read_json(
        get(url).text,
    ).pipe(lambda df: df
        .set_axis(df.index.map(lambda x: eval(x, {'Timestamp': Timestamp})))
    )
    df.to_pickle(cache_file)

print(df)
from functools import wraps
from pathlib import Path
from pickle import load, dump

def cached(filename=None):
    def dec(f):
        nonlocal filename
        if filename is None:
            filename = f'{f.__name__}.pkl'
        @wraps(f)
        def inner(*args, **kwargs):
            cache_dir = Path('/tmp/cache')
            cache_dir.mkdir(parents=True, exist_ok=True)
            if (cache_file := cache_dir / filename).exists():
                with open(cache_file, 'rb') as cf:
                    cache = load(cf)
            else:
                cache = {}
            key = args, frozenset(kwargs.items())
            if key not in cache:
                cache[key] = f(*args, **kwargs)
            with open(cache_file, 'wb') as cf:
                dump(cache, cf)
            return cache[key]
        return inner
    return dec

@cached()
def f(a, b, c):
    ...

print(f'{f(1, 2, 3) = }')

Guidance: be careful with float operations.

from pytest import main, skip
from hypothesis import given
from hypothesis.strategies import integers, floats

@given(
    a=integers(),
    b=integers(),
    c=integers(),
)
def test_int_math(a, b, c):
    assert a + b == b + a
    assert (a + b) + c == a + (b + c)

@given(
    a=floats(allow_nan=False, allow_infinity=False),
    b=floats(allow_nan=False, allow_infinity=False),
    c=floats(allow_nan=False, allow_infinity=False),
)
def test_float_math(a, b, c):
    assert a + b == b + a
    assert (a + b) + c == a + (b + c)

main(['-q', __file__])

Guidance: use docker, clean venv, &c. for system-level isolation.

Question: “How do I find the problem?”

print("Let's take a look!")

Guidance: print-style debugging is not the worst starting point.

def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    return 1 / x

for x in range(10):
    f(x)
def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    return 1 / x

for x in range(10):
    # print(f'{x}')
    y = x
    print(f'{x = }')
    # print(f'{y = }')
    f(x)

Guidance: the inspect module can improve simple print style debugging.

from inspect import currentframe

def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    fr = currentframe()
    print(f'{({k: v for k, v in fr.f_locals.items() if k not in {"fr"}})}')
    return 1 / x

for x in range(10):
    f(x)
from inspect import currentframe

def f(x):
    return g(x - 1)

def g(x):
    print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
    print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
    print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
    print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
    return h(x - 1)

def h(x):
    return 1 / x

for x in range(10):
    f(x)
from contextlib import contextmanager
from inspect import currentframe, getouterframes

@contextmanager
def debug():
    fr = getouterframes(currentframe())[2]
    print(f'{fr.lineno = }')
    print(f'Before: {fr.frame.f_locals = }')
    yield
    print(f'After:  {fr.frame.f_locals = }')

def f(x):
    with debug():
        return g(x - 1)

def g(x):
    with debug():
        x -= 1
        return h(x - 1)

def h(x):
    return 1 / x

for x in range(10):
    f(x)

Guidance: use pdb or ipdb.

from pdb import post_mortem

def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    return 1 / x

if __name__ == '__main__':
    try:
        for x in range(10):
            f(x)
    except Exception:
        post_mortem()
from pdb import set_trace

def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    set_trace()
    return 1 / x

if __name__ == '__main__':
    for x in range(10):
        f(x)
from pdb import set_trace

def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    try:
        return 1 / x
    except Exception:
        set_trace()

if __name__ == '__main__':
    for x in range(10):
        f(x)
def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    breakpoint()
    return 1 / x

if __name__ == '__main__':
    for x in range(10):
        f(x)

Guidance: consider customising your sys.breakpointhook.

import sys

@lambda f: setattr(sys, 'breakpointhook', f)
def breakpointhook():
    print('breakpoint')

def f(x):
    breakpoint()
    return g(x - 1)

def g(x):
    breakpoint()
    return h(x - 1)

def h(x):
    breakpoint()
    return 1 / x

if __name__ == '__main__':
    for x in range(10):
        f(x)
from inspect import currentframe, getouterframes
import sys
from pdb import set_trace

@lambda f: setattr(sys, 'breakpointhook', f)
def breakpointhook():
    fr = getouterframes(currentframe())[1]
    print(f'{fr.frame.f_locals = }')
    if fr.frame.f_locals['x'] == 0:
        set_trace()

def f(x):
    breakpoint()
    return g(x - 1)

def g(x):
    breakpoint()
    return h(x - 1)

def h(x):
    breakpoint()
    return 1 / x

if __name__ == '__main__':
    for x in range(10):
        f(x)

Guidance: consider non-interactive tracing.

from inspect import currentframe, getouterframes
from atexit import register
from pathlib import Path
from datetime import datetime
from pickle import dump
import sys

trace = []
@lambda f: setattr(sys, 'breakpointhook', f)
def breakpointhook():
    fr = getouterframes(currentframe())[1]
    trace.append(fr.frame.f_locals)

@register
def write_trace():
    trace_dir = Path('/tmp/trace')
    trace_dir.mkdir(parents=True, exist_ok=True)
    with open(trace_dir / f'trace.{datetime.now()}.pkl', 'wb') as f:
        dump(trace, f)

def f(x):
    return g(x - 1)

def g(x):
    return h(x - 1)

def h(x):
    breakpoint()
    return 1 / x

if __name__ == '__main__':
    for x in range(10):
        f(x)

Guidance: use code.InteractiveConsole for simple interactive investigation (but try to avoid writing code that requires this…)

from code import InteractiveConsole

def f(x):
    state = ...
    return g(x - 1, state)

def g(x, state):
    return h(x - 1, state)

def h(x, state):
    InteractiveConsole(locals=locals()).interact('')
    return 1 / x

if __name__ == '__main__':
    for x in range(10):
        f(x)

Guidance: use other profiling mechanisms from the sys module.

from sys import settrace

@settrace
def tracer(frame, event, arg):
    def scope_tracer(frame, event, arg):
        if event == 'line':
            print(f'{frame.f_lineno = } {frame.f_locals = }')
    if not frame.f_code.co_filename.startswith('/usr'):
        return scope_tracer

def f(x):
    return g(x - 1)

def g(x):
    x -= 1
    return h(x - 1)

def h(x):
    return 1 / x

if __name__ == '__main__':
    for x in range(10):
        f(x)

Question: “How do I keep an eye on this?”

print("Let's take a look!")
from sys import stderr

def f(x):
    print(f'{x = }', file=stderr)
    return g(x - 1)

def g(x):
    print(f'{x = }', file=stderr)
    x -= 1
    return h(x - 1)

def h(x):
    print(f'{x = }', file=stderr)
    return 1 / x

if __name__ == '__main__':
    f(10)
from inspect import currentframe, getouterframes
import sys

def breakpointhook():
    fr = getouterframes(currentframe())[1]
    print(f'{fr.frame.f_locals = }')
# sys.breakpointhook = breakpointhook
sys.breakpointhook = lambda: None

def f(x):
    breakpoint()
    return g(x - 1)

def g(x):
    breakpoint()
    x -= 1
    return h(x - 1)

def h(x):
    breakpoint()
    return 1 / x

if __name__ == '__main__':
    for x in range(5, 10):
        f(x)
from logging import getLogger, basicConfig, INFO, DEBUG

logger = getLogger(__name__)
# basicConfig(level=DEBUG)

def f(x):
    logger.debug('x: %d', x)
    return g(x - 1)

def g(x):
    logger.debug('x: %d', x)
    x -= 1
    return h(x - 1)

def h(x):
    logger.debug('x: %d', x)
    return 1 / x

if __name__ == '__main__':
    for x in range(5, 10):
        f(x)
def f():
    # assert True
    # assert False
    assert x

from dis import dis
dis(f)
from pandas import read_csv
from pathlib import Path

def run_report(df):
    return df[['volume', 'price']].product(axis='columns').groupby('ticker').sum()

if __name__ == '__main__':
    df = (
        read_csv(Path('data') / 'data.csv', index_col=['date', 'ticker'], parse_dates=['date'])
        .sort_index()
    )
    print(
        run_report(df).head(3)
    )
from pandas import read_csv
from pathlib import Path

def run_report(df):
    if 'price' not in df.columns:
        raise ValueError('must have column price')
    return df[['volume', 'price']].product(axis='columns').groupby('ticker').sum()

if __name__ == '__main__':
    df = (
        read_csv(Path('data') / 'data.csv', index_col=['date', 'ticker'], parse_dates=['date'])
        .sort_index()
    )
    print(
        run_report(df).head(3)
    )
from pandas import read_csv
from pathlib import Path

def run_report(df):
    assert 'price' in df.columns
    return df[['volume', 'price']].product(axis='columns').groupby('ticker').sum()

if __name__ == '__main__':
    df = (
        read_csv(Path('data') / 'data.csv', index_col=['date', 'ticker'], parse_dates=['date'])
        .sort_index()
    )
    print(
        run_report(df).head(3)
    )
def f():
    assert x

from dis import dis
dis(f)
def f(x):
    return g(x - 1)

def g(x):
    x -= 1
    assert x > 1
    return h(x - 1)

def h(x):
    return 1 / x

if __name__ == '__main__':
    for x in range(5, 10):
        f(x)
print(f'{__debug__ = }')
def f():
    if __debug__:
        x.y
    assert x

from dis import dis
dis(f)
from itertools import pairwise

def nondecreasing(xs):
    return all(x <= y for x, y in pairwise(xs))

def f(seq):
    assert nondecreasing(seq)
    ...

    ...

if __name__ == '__main__':
    data = [1, 2, 3, 4, 5]
    f(data)
from itertools import pairwise, islice, tee

def nondecreasing(xs):
    return all(x <= y for x, y in pairwise(xs))

def f(seq):
    if __debug__:
        seq, seq_copy = tee(seq, 2)
        seq_copy = islice(seq_copy, 100)
        assert nondecreasing(seq_copy)
    return sum(seq)

if __name__ == '__main__':
    data = [1, 2, 3, 4, 5]
    f(data)
def f():
    breakpoint()

from dis import dis
dis(f)