“The doctor said I wouldn’t have so many nosebleeds if I kept my finger out of there.”
— ‘I Love Lisa’ (S04E15)
How do you find bugs deep in your code? How can you use common tools in Python to ease this process? And, once you find a bug, what can you do to surgically investigate and fix it?
Do you…
print when debugging? Then join us for a session on debugging Python code!
Dynamic languages like Python allow you to rapidly prototype code without a compiler getting in your way. However, this means that bugs can sometimes slip into our code if we’re not careful.
In this episode, we’ll discuss the various ways you can debug your Python
programs. We’ll discuss various techniques for instrumenting live code to track
down bugs in large programmes, how best to use tools such as pdb and ipdb,
and common mechanisms within the sys module.
Keywords: debugging, pdb, Python debugger, ipdb, breakpoint
“A bug is when you think the code works differently than it actually does.”
Approach:
Consider:
print("Let's take a look!")
from pandas import read_csv
read_csv('does-not-exist.csv')
from pandas import Series, date_range
from numpy import log, zeros, ones
print(
# Series(zeros(3)).apply(log),
(
Series(ones(3)) * Series(ones(3), index=date_range('2000-01-01', periods=3))
).sum(),
sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
The debugging process is an iterative cycle of:
Guidance: write code to support easy isolation.
from pathlib import Path
from pandas import read_csv, concat
from dateutil.parser import parse
def debug(obj):
    """Print *obj* and return it unchanged.

    Because the argument is handed straight back, the call can wrap any
    sub-expression without altering the value that flows through it.
    """
    print(obj)
    return obj
df = concat(
read_csv(p)
.assign(date=parse(p.stem))
.set_index(['date', 'ticker'])
.sort_index()
for p in (Path('data') / 'by-date').iterdir()
if p.suffix == '.csv'
)
print(df.head(3))
from pathlib import Path
from pandas import read_csv, concat
from dateutil.parser import parse
paths = {
p
for p in (Path('data') / 'by-date').iterdir()
if p.suffix == '.csv'
}
raw_data = [
read_csv(p)
.assign(date=parse(p.stem))
.set_index(['date', 'ticker'])
.sort_index()
for p in paths
]
df = concat(raw_data)
print(df.head(3))
Guidance: generators & generator expressions allow easy decomposition.
from csv import reader
# Inline sample input: one "name,value" record per line.
raw_data = '''
abcd,123
DEFG,123
defg ,456
#SKIP,789
'''.strip().splitlines()

# Normalise each name, drop rows whose name starts with '#', and convert
# the value column to int.
data = []
for name, value in reader(raw_data):
    name = name.strip().casefold()
    if name.startswith('#'):
        # Skip before converting, so a non-numeric value on a skipped row
        # cannot raise.
        continue
    data.append((name, int(value)))
print(f'{data = }')
from csv import reader
raw_data = '''
abcd,123
DEFG,123
defg ,456
#SKIP,789
'''.strip().splitlines()
data = reader(raw_data)
data = ((name.strip(), value) for name, value in data)
data = ((name.casefold(), value) for name, value in data)
data = ((name, value) for name, value in data if not name.startswith('#'))
print(f'{[*data] = }')
Guidance: surface areas that are likely to involve errors (e.g., input/output), i.e., keep them shallow in the call stack, near the top level.
from pandas import read_csv
def load_data_from_csv(filename):
return read_csv(filename)
def load_data(filename):
filename = 'does-not-exist.csv'
return load_data_from_csv(filename)
def run_report():
load_data('does-exist.csv')
...
...
...
if __name__ == '__main__':
run_report()
from pandas import read_csv
def run_report(raw_data):
...
...
...
if __name__ == '__main__':
filename = 'does-not-exist.csv'
raw_data = read_csv(filename)
run_report(raw_data)
Guidance: actually read exception tracebacks.
def f():
    """Outermost frame of the demo call chain; returns whatever g() returns."""
    return g()


def g():
    """Middle frame of the demo call chain; returns whatever h() returns."""
    return h()


def h():
    """Attempt 0 / 0 and re-raise the failure as a chained ValueError.

    Raises:
        ValueError: always; the original ZeroDivisionError is attached as
            ``__cause__`` through ``raise ... from``, so both tracebacks
            are shown.
    """
    try:
        return 0 / 0
    # Catch only what the division can raise, not a blanket Exception.
    except ZeroDivisionError as e:
        raise ValueError('zero is not divisible') from e
f()
Guidance: only handle errors when you can do something meaningful about them.
en_zh = {
'one': '壹',
'two': '貳',
'three': '參',
'four': '肆',
}
en_word = 'five'
zh_word = en_zh.get(en_word, '')
print(f'To write {en_word!r}, you write {zh_word!r}.')
print(f'To write {en_word!r} emphatically, you write {zh_word.center(7, "=")!r}!')
groceries = {
'eggs': 12,
'tomatoes': 3,
}
for item in {'eggs', 'tomatoes', 'bagels'}:
print(f'You bought {groceries.get(item, 0)} {item}')
from collections import defaultdict
groceries = defaultdict(int, {
'eggs': 12,
'tomatoes': 3,
})
for item in {'eggs', 'tomatoes', 'bagels'}:
print(f'You bought {groceries[item]} {item}')
Try to avoid except Exception!
from random import choice
def f():
    """Outermost frame of the demo call chain; returns whatever g() returns."""
    return g()


def g():
    """Middle frame of the demo call chain; returns whatever h() returns."""
    return h()


def h():
    """Raise an exception whose type is picked at random.

    Raises:
        ValueError | KeyError | TypeError: one of these, chosen with
            ``random.choice`` on every call and instantiated with no
            arguments.
    """
    raise choice([ValueError, KeyError, TypeError])()
try:
f()
except Exception:
pass
Do not use “bare” except!
from pandas import read_csv
try:
df = read_csv('does-not-exixt')
except:
pass
try:
raise SystemError()
except:
pass
try:
raise MemoryError()
except:
pass
try:
raise ModuleNotFoundError()
except:
pass
print("Let's take a look!")
Guidance: when investigating errors, remove sources of non-determinism.
from numpy.random import randint
print(f'{1 / randint(-10, +10) = }')
from numpy.random import randint
from numpy.random import seed
seed(0)
print(f'{randint(-10, +10) = }')
from numpy.random import default_rng
rng = default_rng(0)
print(f'{rng.integers(-10, +10) = }')
from random import randint
print(f'{randint(-10, +10) = }')
from random import randint, seed
seed(0)
print(f'{randint(-10, +10) = }')
from random import Random
rnd = Random(0)
print(f'{rnd.randint(-10, +10) = }')
from pandas import Series
from numpy.random import default_rng
rng = default_rng(0)
print(
Series([1, 2, 3, 4]).sample(3, random_state=rng)
)
x = {'a', 'b', 'c'}
print(f'{x = }')
from subprocess import run
run([
'python', '-c',
'print({*"abcde"})',
])
from subprocess import run
run([
'python', '-c',
'print({*"abcde"})',
], env={'PYTHONHASHSEED': '0'})
For Jupyter, set in kernel.json.
{
"argv": [
"python",
"-m",
"ipykernel_launcher",
"-f",
"{connection_file}"
],
"display_name": "Python 3",
"language": "python",
"env": {
"PYTHONHASHSEED": "0"
}
}
Guidance: when investigating errors, remove external dependencies.
from pathlib import Path
from pandas import read_pickle
df = read_pickle(Path('data') / 'data.pkl')
print(df.head(3))
from requests import get
from pandas import read_json, Timestamp
url = 'http://data-source/data.json'
df = read_json(
get(url).text,
).pipe(lambda df: df
.set_axis(df.index.map(lambda x: eval(x, {'Timestamp': Timestamp})))
)
print(df)
from requests import get
from pandas import read_json, Timestamp
from pathlib import Path
from io import StringIO

url = 'http://data-source/data.json'

# Cache the downloaded frame on disk so repeated runs do not depend on the
# remote source.
cache_dir = Path('/tmp/cache')
cache_dir.mkdir(parents=True, exist_ok=True)

if (cache_file := cache_dir / 'data.pkl').exists():
    from pandas import read_pickle
    # NOTE(security): unpickling can execute arbitrary code; this is only
    # acceptable if nothing untrusted can write to the cache directory.
    df = read_pickle(cache_file)
else:
    df = read_json(
        # Wrap the payload in a file-like object: passing a literal JSON
        # string to read_json is deprecated in recent pandas.
        StringIO(get(url).text),
    ).pipe(lambda df: df
        # NOTE(security): eval() runs on index labels received from the
        # remote source -- flagged for review, deliberately not replaced.
        .set_axis(df.index.map(lambda x: eval(x, {'Timestamp': Timestamp})))
    )
    df.to_pickle(cache_file)
print(df)
from functools import wraps
from pathlib import Path
from pickle import load, dump
def cached(filename=None):
    """Decorator factory that memoises a function's results in a pickle file.

    Args:
        filename: name of the cache file inside ``/tmp/cache``. When None,
            each decorated function uses ``<function name>.pkl``.

    Returns:
        A decorator. The wrapped function looks up
        ``(args, frozenset(kwargs.items()))`` in the on-disk cache, calls
        the original function only on a miss, and then rewrites the cache
        file. Arguments must therefore be hashable, and arguments and
        results picklable.
    """
    def dec(f):
        # Resolve the default name per decorated function instead of
        # rebinding the factory's ``filename``: otherwise reusing one
        # ``cached()`` result on several functions would make them all
        # share the first function's cache file (and cached values).
        cache_name = f'{f.__name__}.pkl' if filename is None else filename

        @wraps(f)
        def inner(*args, **kwargs):
            cache_dir = Path('/tmp/cache')
            cache_dir.mkdir(parents=True, exist_ok=True)
            if (cache_file := cache_dir / cache_name).exists():
                # NOTE(security): pickle.load can execute arbitrary code;
                # only safe while the cache directory is trusted.
                with open(cache_file, 'rb') as cf:
                    cache = load(cf)
            else:
                cache = {}
            key = args, frozenset(kwargs.items())
            if key not in cache:
                cache[key] = f(*args, **kwargs)
                # Persist only when something new was computed.
                with open(cache_file, 'wb') as cf:
                    dump(cache, cf)
            return cache[key]
        return inner
    return dec
@cached()
def f(a, b, c):
...
print(f'{f(1, 2, 3) = }')
Guidance: be careful with float operations.
from pytest import main, skip
from hypothesis import given
from hypothesis.strategies import integers, floats
@given(
a=integers(),
b=integers(),
c=integers(),
)
def test_int_math(a, b, c):
assert a + b == b + a
assert (a + b) + c == a + (b + c)
@given(
a=floats(allow_nan=False, allow_infinity=False),
b=floats(allow_nan=False, allow_infinity=False),
c=floats(allow_nan=False, allow_infinity=False),
)
def test_float_math(a, b, c):
assert a + b == b + a
assert (a + b) + c == a + (b + c)
main(['-q', __file__])
Guidance: use docker, clean venv, &c. for system-level isolation.
print("Let's take a look!")
Guidance: print-style debugging is not the worst starting point.
def f(x):
    """First link of the demo chain: return g(x - 1)."""
    return g(x - 1)


def g(x):
    """Second link of the demo chain: return h(x - 1)."""
    return h(x - 1)


def h(x):
    """Return 1 / x.

    Raises:
        ZeroDivisionError: when x is 0, i.e. when f was called with 2.
    """
    return 1 / x
for x in range(10):
f(x)
def f(x):
return g(x - 1)
def g(x):
return h(x - 1)
def h(x):
return 1 / x
for x in range(10):
# print(f'{x}')
y = x
print(f'{x = }')
# print(f'{y = }')
f(x)
Guidance: the inspect module can improve simple print style debugging.
from inspect import currentframe
def f(x):
    """First link of the demo chain: return g(x - 1)."""
    return g(x - 1)


def g(x):
    """Second link of the demo chain: return h(x - 1)."""
    return h(x - 1)


def h(x):
    """Print this frame's local variables, then return 1 / x.

    The frame handle ``fr`` itself is filtered out of the printed mapping.

    Raises:
        ZeroDivisionError: when x is 0.
    """
    fr = currentframe()
    print(f'{({k: v for k, v in fr.f_locals.items() if k not in {"fr"}})}')
    return 1 / x
for x in range(10):
f(x)
from inspect import currentframe
def f(x):
return g(x - 1)
def g(x):
print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
print(f'{currentframe().f_lineno = } {currentframe().f_locals = }')
return h(x - 1)
def h(x):
return 1 / x
for x in range(10):
f(x)
from contextlib import contextmanager
from inspect import currentframe, getouterframes
@contextmanager
def debug():
    """Context manager that prints the caller's locals before and after the block.

    On entry it prints the caller's line number and local variables; on
    exit it prints the locals again. The exit report is emitted even when
    the body raises, since that is when the final state matters most.
    """
    # Index 2 skips this generator's own frame and one intermediate frame
    # (presumably contextlib's __enter__, which drives the generator) to
    # reach the frame containing the ``with`` statement.
    fr = getouterframes(currentframe())[2]
    print(f'{fr.lineno = }')
    print(f'Before: {fr.frame.f_locals = }')
    try:
        yield
    finally:
        print(f'After: {fr.frame.f_locals = }')
def f(x):
with debug():
return g(x - 1)
def g(x):
with debug():
x -= 1
return h(x - 1)
def h(x):
return 1 / x
for x in range(10):
f(x)
Guidance: use pdb or ipdb.
from pdb import post_mortem
def f(x):
return g(x - 1)
def g(x):
return h(x - 1)
def h(x):
return 1 / x
if __name__ == '__main__':
try:
for x in range(10):
f(x)
except Exception:
post_mortem()
from pdb import set_trace
def f(x):
return g(x - 1)
def g(x):
return h(x - 1)
def h(x):
set_trace()
return 1 / x
if __name__ == '__main__':
for x in range(10):
f(x)
from pdb import set_trace
def f(x):
return g(x - 1)
def g(x):
return h(x - 1)
def h(x):
try:
return 1 / x
except Exception:
set_trace()
if __name__ == '__main__':
for x in range(10):
f(x)
def f(x):
return g(x - 1)
def g(x):
return h(x - 1)
def h(x):
breakpoint()
return 1 / x
if __name__ == '__main__':
for x in range(10):
f(x)
Guidance: consider customising your sys.breakpointhook.
import sys
# The decorator installs the function as sys.breakpointhook. setattr()
# returns None, so the module-level name ``breakpointhook`` ends up bound
# to None; the function is reachable only through sys.breakpointhook.
@lambda f: setattr(sys, 'breakpointhook', f)
def breakpointhook(*args, **kwargs):
    """Replacement breakpoint hook that only prints a marker.

    Accepts and ignores any arguments, because ``breakpoint(*args, **kws)``
    forwards them to the hook unchanged.
    """
    print('breakpoint')
def f(x):
breakpoint()
return g(x - 1)
def g(x):
breakpoint()
return h(x - 1)
def h(x):
breakpoint()
return 1 / x
if __name__ == '__main__':
for x in range(10):
f(x)
from inspect import currentframe, getouterframes
import sys
from pdb import set_trace
# The decorator installs the function as sys.breakpointhook. setattr()
# returns None, so the module-level name ``breakpointhook`` ends up bound
# to None; the function is reachable only through sys.breakpointhook.
@lambda f: setattr(sys, 'breakpointhook', f)
def breakpointhook(*args, **kwargs):
    """Conditional breakpoint hook: print the caller's locals, debug only when x == 0.

    Accepts and ignores any arguments, because ``breakpoint(*args, **kws)``
    forwards them to the hook unchanged.
    """
    # Index 1 is the frame that called the hook.
    fr = getouterframes(currentframe())[1]
    print(f'{fr.frame.f_locals = }')
    # .get() so that a caller without a local named ``x`` does not raise
    # KeyError from inside the hook.
    if fr.frame.f_locals.get('x') == 0:
        set_trace()
def f(x):
breakpoint()
return g(x - 1)
def g(x):
breakpoint()
return h(x - 1)
def h(x):
breakpoint()
return 1 / x
if __name__ == '__main__':
for x in range(10):
f(x)
Guidance: consider non-interactive tracing.
from inspect import currentframe, getouterframes
from atexit import register
from pathlib import Path
from datetime import datetime
from pickle import dump
import sys
# Accumulates one snapshot of the caller's locals per breakpoint() call.
trace = []


# The decorator installs the function as sys.breakpointhook. setattr()
# returns None, so the name ``breakpointhook`` itself ends up bound to None.
@lambda f: setattr(sys, 'breakpointhook', f)
def breakpointhook(*args, **kwargs):
    """Non-interactive hook: record the caller's locals instead of stopping.

    Accepts and ignores any arguments, because ``breakpoint(*args, **kws)``
    forwards them to the hook unchanged.
    """
    # Index 1 is the frame that called the hook.
    fr = getouterframes(currentframe())[1]
    # Store a (shallow) copy: f_locals is a live view of the frame, so
    # repeated breakpoints in one frame would otherwise all show the final
    # state, and on Python 3.13+ the view is a proxy that cannot be pickled.
    trace.append(dict(fr.frame.f_locals))


@register
def write_trace():
    """At interpreter exit, pickle the collected trace under /tmp/trace."""
    trace_dir = Path('/tmp/trace')
    trace_dir.mkdir(parents=True, exist_ok=True)
    # The timestamp in the file name keeps runs from overwriting each other.
    with open(trace_dir / f'trace.{datetime.now()}.pkl', 'wb') as f:
        dump(trace, f)
def f(x):
return g(x - 1)
def g(x):
return h(x - 1)
def h(x):
breakpoint()
return 1 / x
if __name__ == '__main__':
for x in range(10):
f(x)
Guidance: use code.InteractiveConsole for simple interactive investigation
(but try to avoid writing code that requires this…)
from code import InteractiveConsole
def f(x):
state = ...
return g(x - 1, state)
def g(x, state):
return h(x - 1, state)
def h(x, state):
InteractiveConsole(locals=locals()).interact('')
return 1 / x
if __name__ == '__main__':
for x in range(10):
f(x)
Guidance: use other profiling mechanisms from the sys module.
from sys import settrace
@settrace
def tracer(frame, event, arg):
def scope_tracer(frame, event, arg):
if event == 'line':
print(f'{frame.f_lineno = } {frame.f_locals = }')
if not frame.f_code.co_filename.startswith('/usr'):
return scope_tracer
def f(x):
return g(x - 1)
def g(x):
x -= 1
return h(x - 1)
def h(x):
return 1 / x
if __name__ == '__main__':
for x in range(10):
f(x)
print("Let's take a look!")
from sys import stderr
def f(x):
    """Report x on stderr, then return g(x - 1)."""
    print(f'{x = }', file=stderr)
    return g(x - 1)


def g(x):
    """Report x on stderr, decrement it, then return h(x - 1)."""
    print(f'{x = }', file=stderr)
    x -= 1
    return h(x - 1)


def h(x):
    """Report x on stderr, then return 1 / x.

    Raises:
        ZeroDivisionError: when x is 0, i.e. when f was called with 3.
    """
    print(f'{x = }', file=stderr)
    return 1 / x
if __name__ == '__main__':
f(10)
from inspect import currentframe, getouterframes
import sys
def breakpointhook():
fr = getouterframes(currentframe())[1]
print(f'{fr.frame.f_locals = }')
# sys.breakpointhook = breakpointhook
sys.breakpointhook = lambda: None
def f(x):
breakpoint()
return g(x - 1)
def g(x):
breakpoint()
x -= 1
return h(x - 1)
def h(x):
breakpoint()
return 1 / x
if __name__ == '__main__':
for x in range(5, 10):
f(x)
from logging import getLogger, basicConfig, INFO, DEBUG
logger = getLogger(__name__)
# Uncomment to make the debug messages below visible:
# basicConfig(level=DEBUG)


def f(x):
    """Log x at DEBUG level, then return g(x - 1)."""
    logger.debug('x: %d', x)
    return g(x - 1)


def g(x):
    """Log x at DEBUG level, decrement it, then return h(x - 1)."""
    logger.debug('x: %d', x)
    x -= 1
    return h(x - 1)


def h(x):
    """Log x at DEBUG level, then return 1 / x.

    Raises:
        ZeroDivisionError: when x is 0, i.e. when f was called with 3.
    """
    logger.debug('x: %d', x)
    return 1 / x
if __name__ == '__main__':
for x in range(5, 10):
f(x)
def f():
# assert True
# assert False
assert x
from dis import dis
dis(f)
from pandas import read_csv
from pathlib import Path
def run_report(df):
    """Return the per-ticker sum of volume * price.

    Args:
        df: frame with 'volume' and 'price' columns, groupable by the key
            'ticker' (the caller in this snippet supplies 'ticker' as an
            index level).

    Returns:
        A Series with one entry per ticker.
    """
    # Row-wise product of the two columns, then summed within each ticker.
    per_row = df[['volume', 'price']].product(axis='columns')
    return per_row.groupby('ticker').sum()
if __name__ == '__main__':
df = (
read_csv(Path('data') / 'data.csv', index_col=['date', 'ticker'], parse_dates=['date'])
.sort_index()
)
print(
run_report(df).head(3)
)
from pandas import read_csv
from pathlib import Path
def run_report(df):
    """Return the per-ticker sum of volume * price, validating input first.

    Args:
        df: frame with 'volume' and 'price' columns, groupable by the key
            'ticker' (the caller in this snippet supplies 'ticker' as an
            index level).

    Returns:
        A Series with one entry per ticker.

    Raises:
        ValueError: if the 'price' column is missing.
    """
    # Explicit check that always runs, unlike an assert, which is removed
    # under ``python -O``.
    if 'price' not in df.columns:
        raise ValueError('must have column price')
    per_row = df[['volume', 'price']].product(axis='columns')
    return per_row.groupby('ticker').sum()
if __name__ == '__main__':
df = (
read_csv(Path('data') / 'data.csv', index_col=['date', 'ticker'], parse_dates=['date'])
.sort_index()
)
print(
run_report(df).head(3)
)
from pandas import read_csv
from pathlib import Path
def run_report(df):
assert 'price' in df.columns
return df[['volume', 'price']].product(axis='columns').groupby('ticker').sum()
if __name__ == '__main__':
df = (
read_csv(Path('data') / 'data.csv', index_col=['date', 'ticker'], parse_dates=['date'])
.sort_index()
)
print(
run_report(df).head(3)
)
def f():
assert x
from dis import dis
dis(f)
def f(x):
return g(x - 1)
def g(x):
x -= 1
assert x > 1
return h(x - 1)
def h(x):
return 1 / x
if __name__ == '__main__':
for x in range(5, 10):
f(x)
print(f'{__debug__ = }')
def f():
if __debug__:
x.y
assert x
from dis import dis
dis(f)
from itertools import pairwise
def nondecreasing(xs):
return all(x <= y for x, y in pairwise(xs))
def f(seq):
assert nondecreasing(seq)
...
...
if __name__ == '__main__':
data = [1, 2, 3, 4, 5]
f(data)
from itertools import pairwise, islice, tee
def nondecreasing(xs):
return all(x <= y for x, y in pairwise(xs))
def f(seq):
if __debug__:
seq, seq_copy = tee(seq, 2)
seq_copy = islice(seq_copy, 100)
assert nondecreasing(seq_copy)
return sum(seq)
if __name__ == '__main__':
data = [1, 2, 3, 4, 5]
f(data)
def f():
breakpoint()
from dis import dis
dis(f)