LYLE LANLEY: “So, then, ‘mono’ means ‘one,’ and ‘rail’ means ‘rail.’ And that concludes our intensive three week course.”
— “Marge vs the Monorail” (S04E12)
Do generators have any part in API design? Are generators just some advanced Python novelty feature? And, importantly, how do I actually use generators in my day-to-day work?
Do you…
Then come join us for a session on generators!
Generators are considered an advanced Python feature and are often viewed only as lazily evaluated iterators. While this is mechanically true, they have a much broader application in program structure and can be used to simplify step-wise processing pipelines.
In this episode, we will move beyond the superficial mechanics of generators in Python and explore their implementation and use-cases. We will develop a better sense for how generators can be leveraged in your every day scripting as a structuring mechanism to reduce your code churn and produce less error-prone results.
Keywords
Iterators and Iterables
print("Let's take a look!")
# A tour of common iterables: list, set, dict (iterates its keys), an open
# file (iterates its lines), and any object that implements __iter__.
xs = [1, 2, 3]
for x in xs:
    pass
xs = {1, 2, 3}
xs = {'a': 1, 'b': 2, 'c': 3}
xs = open(__file__)  # file objects are iterables of lines
class T:
    # minimal hand-rolled iterable: __iter__ returning an (empty) iterator
    __iter__ = lambda _: iter(())
xs = T()
for x in xs:
    pass
# What a for-loop desugars to: obtain an iterator with iter(), call next()
# until StopIteration signals exhaustion.
xs = [1, 2, 3]
for x in xs:
    pass
# … is the same as…
xi = iter(xs)
while True:
    try:
        x = next(xi)
    except StopIteration:
        # fix: the original `pass` swallowed StopIteration and spun forever;
        # the desugaring must *break* once the iterator is exhausted
        break
# collections.abc.Iterable is a *structural* check: defining __iter__ is
# enough to pass isinstance, regardless of what __iter__ actually returns.
from collections.abc import Iterable
print(
    f'{isinstance([1, 2, 3], Iterable) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from collections.abc import Iterable
class T:
    # __iter__ returns None — not an iterator! — yet isinstance still says
    # True: the ABC only checks for the method's presence, not its behavior.
    __iter__ = lambda _: None
x = T()
print(
    f'{isinstance(x, Iterable) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
# next() works on iterators, not iterables: a list has no __next__ …
xs = [1, 2, 3]
print(f'{next(xs) = }')  # NOTE: raises TypeError — that's the point of the demo
# … but the iterator obtained via iter() advances one element per call.
xs = [1, 2, 3]
xi = iter(xs)
print(f'{next(xi) = }')
print(f'{next(xi) = }')
print(f'{next(xi) = }')
xs = [1, 2, 3] # the data
xi = iter(xs) # reference to the data + some state (“where you last left off”)
print(f'{next(xi) = }')
# Independent iterators over the same data each carry their own position.
xs = [1, 2, 3] # the data
xi1 = iter(xs) # reference to the data + some state
xi2 = iter(xs)
print(
    f'{next(xi1) = } {next(xi2) = }',
    f'{next(xi1) = } {next(xi2) = }',
    f'{next(xi1) = } {next(xi2) = }',
    sep='\n',
)
from dataclasses import dataclass, field
from random import shuffle
@dataclass(frozen=True)
class RandomListIterator:
    """An iterator yielding the elements of `data` in a random order.

    `state` holds the not-yet-consumed indices, shuffled once up front.
    The dataclass is frozen, but mutating the list's *contents* is allowed.
    """
    data : list
    state : list = field(default_factory=list)
    def __post_init__(self):
        self.state.extend(range(len(self.data)))
        shuffle(self.state)
    def __iter__(self):
        # iterators return themselves, so the for-protocol works directly
        return self
    def __next__(self):
        try:
            position = self.state.pop()
        except IndexError as exc:
            # empty state ⇒ every element has been produced
            raise StopIteration() from exc
        return self.data[position]
# RandomListIterator supports both manual next() calls and the for-protocol
# (because its __iter__ returns self).
xs = [1, 2, 3]
xr = RandomListIterator(xs)
print(
    f'{next(xr) = }',
    f'{next(xr) = }',
    f'{next(xr) = }',
    sep='\n',
)
xr = RandomListIterator(xs)  # a fresh iterator — the previous one is exhausted
for x in xr:
    print(f'{x = }')
# Iterator (like Iterable) is a structural ABC: defining __iter__ and
# __next__ — however bogus their return values — satisfies isinstance.
from collections.abc import Iterator
class T:
    __iter__ = lambda _: None
    __next__ = lambda _: None
x = T()
print(
    f'{isinstance(x, Iterator) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
def g():
    """Minimal generator: lazily produce 1, then 2, then 3."""
    yield from (1, 2, 3)
for x in g():
    print(f'{x = }')
def transform(xs):
    """Lazily yield the square of each element of the iterable xs."""
    yield from (element ** 2 for element in xs)
xs = [1, 2, 3]
for x in transform(xs):
    print(f'{x = }')
def map(func, *all_args):
    """Reimplementation of the builtin map (deliberately shadows it — demo):
    lazily apply func across the zipped input iterables."""
    yield from (func(*bundle) for bundle in zip(*all_args))
xs = [1, 2, 3]
for y in map(lambda x: x**2, xs):
    print(f'{y = :.2f}')
xs = [1, 2, 3]
ys = [4, 5, 6]
for z in map(lambda x, y: x**2 * y**3, xs, ys):
    print(f'{z = :.2f}')
# 'a,b,c' is an *in-band* encoding of a list inside a str: the separator
# lives in the same channel as the data …
s = 'a,b,c'
xs = ['a', 'b', 'c']
print(
    f'{len(s) = }',
    f'{len(xs) = }',
    f'{len(s.split(",")) = }',
    sep='\n',
)
# … which is ambiguous: a comma *inside* a field is indistinguishable from
# a separator, so split() reports 4 fields where the data has 3.
s = 'a,b,c,d'
xs = ['a', 'b', 'c,d']
print(
    f'{len(s) = }',
    f'{len(xs) = }',
    f'{len(s.split(",")) = }',
    sep='\n',
)
Resolving ambiguities in ‘in-band’ encodings: out-of-band signalling (e.g., float('nan'), find -print0)
from pandas import Series, date_range
# pandas: plain int64 has no in-band way to say "missing"; the nullable
# 'Int64' extension dtype keeps an explicit *out-of-band* boolean mask
# (s.array._mask) alongside the raw values (s.array._data).
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
    index=(idx := date_range('2020-01-01', periods=4)),
    data=rng.integers(-10, +10, size=len(idx)),
)
# In-band sentinel alternatives and their failure modes (uncomment to explore):
# s.iloc[1] = -99_999
# s.iloc[1] = None
# s.iloc[-1] = 2**53 + 1
s = s.astype('Int64')
s.iloc[-1] = None  # recorded in the mask, not as a magic value
print(
    s,
    f'{s.iloc[-1] = }',
    s.array._mask,
    s.array._data,
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
Resolving ambiguities in ‘in-band’ encodings: out-of-band metadata (e.g., sha256sum, git objects, network protocols)
Resolving ambiguities in ‘in-band’ encodings: quoting and escaping
# CSV resolves the in-band separator ambiguity by quoting: "ghi,xyz" is a
# single field even though it contains the delimiter comma.
from pandas import read_csv
from io import StringIO
from textwrap import dedent
raw_data = StringIO(dedent('''
name,value
abc,123
def,456
"ghi,xyz",789
''').strip())
print(
    read_csv(raw_data)
    .set_index('name')
    .sort_index()
    .squeeze(axis='columns')
    ,
)
# XML resolves the in-band '<' ambiguity with CDATA sections: the literal
# "<node>" inside CDATA is character data, not markup.
from lxml.etree import fromstring
from textwrap import dedent
raw_data = dedent('''
<root>
<node>
<![CDATA[
<node>
]]>
</node>
</root>
''').strip()
print(
    f'{fromstring(raw_data) = }',
    f'{fromstring(raw_data).find("node").text.strip() = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
# … but CDATA sections cannot nest: the first ']]>' terminates the section,
# so the "nested" variant below does not parse the way it visually suggests.
from lxml.etree import fromstring
from textwrap import dedent
raw_data = dedent('''
<root>
<node>
<![CDATA[
<![CDATA[
<node>
]]>
]]>
</node>
</root>
''').strip()
print(
    f'{fromstring(raw_data) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
Also: quoting vs escaping is grammatically significant (context-free vs context-sensitive).
# non-nested, linear, ordered
s = 'a,b,c'
xs = ['a', 'b', 'c']
# A computation as a fixed sequence of steps …
def run_computation():
    # NOTE(review): step1..step4 are illustrative / defined elsewhere
    step1()
    step2()
    step3()
    step4()
# … or as a linear dataflow pipeline: each line feeds the next.
def generate_report():
    raw_data = load_data()
    data = clean_data(raw_data)
    data = remove_outliers(data)
    results = compute_results(data)
    table = generate_table(results)
    return table
# (repeated so the disassembly below refers to this definition)
def generate_report():
    raw_data = load_data()
    data = clean_data(raw_data)
    data = remove_outliers(data)
    results = compute_results(data)
    table = generate_table(results)
    return table
# Disassemble to see the straight-line bytecode of the eager version.
from dis import dis
dis(generate_report)
# The same pipeline as a generator: each `yield` marks a phase boundary,
# handing control (and the intermediate value) back to the caller.
def generate_report():
    raw_data = load_data()
    yield raw_data
    data = clean_data(raw_data)
    data = remove_outliers(data)
    yield data
    results = compute_results(data)
    yield results
    table = generate_table(results)
    yield table
# from dis import dis
# dis(generate_report)
# A generator object satisfies both the Iterable and Iterator protocols.
from collections.abc import Iterable, Iterator
def g(): yield
gi = g()
print(
    f'{isinstance(gi, Iterable) = }',
    f'{isinstance(gi, Iterator) = }',
    sep='\n',
)
from collections.abc import Iterable, Iterator
def g():
    yield 1
    yield 2
    yield 3
gi = g()
# gi_frame.f_lasti is the bytecode offset where the frame is suspended —
# the generator's saved “where you last left off” state.
print(
    f'{next(gi) = }\t{gi.gi_frame.f_lasti = :>2}',
    f'{next(gi) = }\t{gi.gi_frame.f_lasti = :>2}',
    f'{next(gi) = }\t{gi.gi_frame.f_lasti = :>2}',
    sep='\n',
)
from dis import dis
dis(g)
But… there’s much more to it…!
print("Let's take a look.")
Two general regimes of computation:
‘lazy’
def f(): # eager
    # runs to completion as soon as it is called
    return
def g(): # lazy
    # calling it merely creates a generator; nothing runs until next()
    yield
Where does “eagerness” and “laziness” show up in the tools we use?
# numpy evaluates eagerly: each sub-expression below materialises a full
# intermediate array before the next operation runs.
from numpy.random import default_rng
rng = default_rng(0)
xs = rng.normal(size=10).round(2)
print(
    (xs + 1) * (xs - 2) / (xs ** 3),
    xs.sum(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
# pandas GroupBy is deferred: .groupby(...) only builds a grouper object;
# the actual work happens when an aggregation (.mean, .agg, …) is requested.
from numpy.random import default_rng
from pandas import Series, date_range, Grouper
rng = default_rng(0)
s = Series(
    index=(idx := date_range('2020-01-01', periods=10, name='date')),
    data=rng.normal(size=len(idx)).round(2),
)
print(
    # uncomment one at a time to compare eager vs deferred evaluation:
    # s,
    # s + 1,
    # s.sum(),
    # s.groupby(Grouper(freq='3D')),
    # s.groupby(Grouper(freq='3D')).mean(),
    # s.groupby(Grouper(freq='3D')).agg(lambda g: g.sum()),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
# Illustrative pseudo-code (not a real package): dask-style frameworks chain
# lazy transforms and only execute on .compute().
from some_dask_like_framework import Computation
c = Computation()
c.transform(...).filter(...).filter(...).transform(...).compute()
Any multi-step or multi-stage computation can be thought of as comprised of:
from time import sleep
def compute(x):
    """Simulate one expensive unit of work (~0.2 s) and return x squared."""
    sleep(.2)
    return x**2
def process(dataset, *, num_steps=float('inf')):
    """Eagerly compute results for at most `num_steps` elements of `dataset`.

    Fix: the original guard `idx > num_steps` was off by one and processed
    num_steps + 1 elements; `idx >= num_steps` processes exactly num_steps.
    """
    rv = []
    for idx, x in enumerate(dataset):
        if idx >= num_steps:
            break
        rv.append(compute(x))
    return rv
dataset = range(10_000_000)
print(f'{process(dataset, num_steps=3) = }')
Two regimes of computations:
T(compute) « T(decide): much faster to perform one computational step than to decide to keep going
T(compute) » T(decide): much faster to decide to keep going than to perform one computational step
from matplotlib.pyplot import subplots, show
from numpy import linspace
def total_time(units, overhead, compute_time, decide_time):
    # total = per-unit (compute + decide) cost scaled by size, plus fixed overhead
    return units * (compute_time + decide_time) + overhead
xs = linspace(0, 100, 1_000)
fig, ax = subplots()
ax.set_title('Two Regimes of Computation')
ax.set_xlabel('units (i.e., dataset size) (# elements)')
ax.set_ylabel('total time (s)')
# cheap compute relative to decision cost …
ax.plot(
    xs,
    total_time(xs, overhead=10e-3, compute_time=1e-9, decide_time=1e-6),
    color='r', label='T(compute) << T(decide)',
)
# … vs expensive compute relative to decision cost
ax.plot(
    xs,
    total_time(xs, overhead=10e-3, compute_time=1e-3, decide_time=1e-6),
    color='b', label='T(compute) >> T(decide)',
)
ax.legend()
show()
from matplotlib.pyplot import subplots, show
from numpy import linspace
def total_time(units, overhead, compute_time, decide_time):
    # total = per-unit (compute + decide) cost scaled by size, plus fixed overhead
    return units * (compute_time + decide_time) + overhead
# Wildly different dataset scales for the two regimes …
xs1 = linspace(0, 10e9, 1_000)
xs2 = linspace(0, 10e3, 1_000)
fig, ax = subplots()
ax.set_title('Two Regimes of Computation')
ax.set_xlabel('units (i.e., dataset size) (# elements)')
ax.set_ylabel('total time (s)')
ax.set_xlim(xs1[len(xs1)//4], xs1[len(xs1)*3//4])
ax.plot(
    xs1,
    total_time(xs1, overhead=10e-3, compute_time=1e-9, decide_time=1e-6),
    color='r', label='T(compute) << T(decide)',
)
# … so the second curve is drawn on independent (twinned) x and y axes.
ax.twinx().twiny().plot(
    xs2,
    total_time(xs2, overhead=10e-3, compute_time=1e-3, decide_time=1e-6),
    color='b', label='T(compute) >> T(decide)',
)
fig.legend()
show()
Two regimes of computations:
T(compute) » T(decide) → unnecessary computations are expensive ∴ prefer “lazy” computation
T(compute) « T(decide) → unnecessary computations are cheap ∴ prefer “eager” computation
from contextlib import contextmanager
from time import perf_counter
@contextmanager
def timed(msg):
    """Print `msg` and the elapsed wall-clock time (in ms) of the with-block."""
    start = perf_counter()
    yield
    elapsed_ms = (perf_counter() - start) * 1e3
    print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {elapsed_ms:>8,.2f}ms')
from numpy.random import default_rng
from numpy import where, allclose
from itertools import combinations
# Three equivalent ways to apply a piecewise transform (x>0 → x², else x³);
# the three results are asserted numerically identical at the end.
results = []
rng = default_rng(0)
xs = rng.normal(size=10_000_000).round(2)
results.append(res := xs.copy())
with timed('xs.copy()[xs > 0] = xs**2'):
    # masked in-place assignment: touches only the needed elements
    mask = res > 0
    res[mask] = res[mask]**2
    res[~mask] = res[~mask]**3
results.append(res := xs.copy())
with timed('(xs**2) * (xs > 0) + (xs**3) * (xs < 0)'):
    # compute both branches everywhere, then blend by the mask
    mask = res > 0
    res[:] = res**2 * mask + res**3 * ~mask
results.append(res := xs.copy())
with timed('where(xs > 0, xs**2, xs**3)'):  # fix: label was missing its ')'
    res[:] = where(res > 0, res**2, res**3)
assert all(allclose(x, y) for x, y in combinations(results, r=2))
from contextlib import contextmanager
from time import perf_counter
@contextmanager
def timed(msg):
    """Print `msg` and the elapsed wall-clock time (in s) of the with-block."""
    start = perf_counter()
    yield
    elapsed = perf_counter() - start
    print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {elapsed:>4,.4f}s')
from time import sleep
def computation(time):
    """Return a compute(x) callable that sleeps `time` seconds, then squares x."""
    def compute(x):
        sleep(time)
        return x**2
    return compute
def eager(dataset, *, compute):
    """Apply compute to every element up front; returns a fully-built list."""
    return [compute(element) for element in dataset]
def lazy(dataset, *, compute):
    """Apply compute on demand: one result per request from the consumer."""
    for element in dataset:
        yield compute(element)
# Consume only the first 3 results under each strategy:
# - expensive per-item compute: lazy wins (eager pays for all 100 up front)
# - near-free per-item compute: eager's bulk pass is just as good
dataset = range(100)
with timed('eager(…, compute=…(time=1e-3))—only first 3'):
    for idx, _ in enumerate(eager(dataset, compute=computation(time=10e-3)), start=1):
        if idx >= 3: break
with timed('lazy(…, compute=…(time=1e-3))—only first 3'):
    for idx, _ in enumerate(lazy(dataset, compute=computation(time=10e-3)), start=1):
        if idx >= 3: break
with timed('eager(…, compute=…(time=.1e-9))—only first 3'):
    for idx, _ in enumerate(eager(dataset, compute=computation(time=.1e-9)), start=1):
        if idx >= 3: break
with timed('lazy(…, compute=…(time=.1e-9))—only first 3'):
    for idx, _ in enumerate(lazy(dataset, compute=computation(time=.1e-9)), start=1):
        if idx >= 3: break
# Build a 1M-row frame of (value, weight) pairs keyed by a categorical
# 4-letter name index, then persist it for the groupby demo that follows.
from numpy.random import default_rng
from pandas import DataFrame, CategoricalIndex
from string import ascii_lowercase
rng = default_rng(0)
# 20_000 × 4 random letters viewed as 20_000 4-character strings
names = rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()
df = DataFrame(
    index=(idx := CategoricalIndex(
        rng.choice(names, size=1_000_000),
        name='name',
    )),
    data={
        'value': rng.integers(10, size=len(idx)),
        'weight': abs(rng.normal(size=len(idx))).round(2),
    },
).sort_index()
print(
    df.head(3),
    df.tail(3),
    f'{len(df) = :,}',
    f'{df.groupby("name").ngroups = :,}',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
df.to_pickle('/tmp/data.pkl')
from contextlib import contextmanager
from time import perf_counter
@contextmanager
def timed(msg):
    """Print `msg` and the elapsed wall-clock time (in s) of the with-block."""
    start = perf_counter()
    yield
    elapsed = perf_counter() - start
    print(f'{msg:<30} elapsed \N{mathematical bold capital delta}t: {elapsed:>4,.2f}s')
# Weighted mean per group two ways: a Python-level groupby.apply (one Python
# call per group — slow) vs a decomposition into two vectorised groupby sums.
# Results are asserted identical (values and index) at the end.
from pandas import read_pickle, Series
from itertools import combinations
from numpy import allclose
df = read_pickle('/tmp/data.pkl')
results = []
with timed(f'groupby.apply'):
    results.append(
        df.groupby('name').apply(lambda g: (g['value'] * g['weight']).sum() / g['weight'].sum())
    )
with timed(f'decomposition to groupby.agg'):
    grouped_weight = df['weight'].groupby('name').sum()
    grouped_weighted_value = (
        Series(df['value'].values * df['weight'].values, index=df.index)
    ).groupby('name').sum()
    results.append(
        Series(grouped_weighted_value.values / grouped_weight.values, index=grouped_weight.index)
    )
assert all(allclose(x, y) for x, y in combinations(results, r=2))
assert all((x.index == y.index).all() for x, y in combinations(results, r=2))
from contextlib import contextmanager
from time import perf_counter
@contextmanager
def timed(msg):
    """Print `msg` and the elapsed wall-clock time (in s) of the with-block."""
    start = perf_counter()
    yield
    elapsed = perf_counter() - start
    print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {elapsed:>4,.2f}s')
# Incremental (9 small groupbys) vs batched (one wide groupby) computation of
# weighted means over every (value, weight) column pairing.
from numpy.random import default_rng
from pandas import DataFrame, CategoricalIndex, Series, IndexSlice
from string import ascii_lowercase
from itertools import product
rng = default_rng(0)
names = rng.choice([*ascii_lowercase], size=(80_000, 3)).view('<U3').ravel()
df = DataFrame(
    index=(idx := CategoricalIndex(
        rng.choice(names, size=2_000_000),
        name='name',
    )),
    data={
        ('value', 'a'): rng.integers(10, size=len(idx)),
        ('value', 'b'): rng.integers(10, size=len(idx)),
        ('value', 'c'): rng.integers(10, size=len(idx)),
        ('weight', 'a'): abs(rng.normal(size=len(idx))).round(2),
        ('weight', 'b'): abs(rng.normal(size=len(idx))).round(2),
        ('weight', 'c'): abs(rng.normal(size=len(idx))).round(2),
    },
).sort_index()
common_groups = df['value'].columns.intersection(df['weight'].columns)
with timed(f'→ incremental computation (overall)'):  # fix: was 'commputation'
    for val_grp, wgt_grp in product(common_groups, repeat=2):
        with timed(f'incremental (`.groupby` on {val_grp, wgt_grp})'):
            grouped_weight = df['weight', wgt_grp].groupby('name').sum()
            grouped_weighted_value = (
                Series(df['value', val_grp].values * df['weight', wgt_grp].values, index=df.index)
            ).groupby('name').sum()
            Series(grouped_weighted_value.values / grouped_weight.values, index=grouped_weight.index)
common_groups = df['value'].columns.intersection(df['weight'].columns)
with timed('→ full computation (overall)'):
    with timed('full computation (construct)'):
        # Fixes vs the original dict comprehension:
        #  - the weight column for pairing (val_grp, wgt_grp) must be
        #    df['weight', wgt_grp]; the original took df[col, val_grp] for
        #    both, pairing every value with the *wrong* weight column and
        #    silently diverging from the incremental computation above
        #  - iterate a tuple, not the set literal {'weight', 'value'}, so
        #    column order is deterministic across runs (str hashing is salted)
        df2 = DataFrame({
            **{('value', (val_grp, wgt_grp)): df['value', val_grp]
               for val_grp, wgt_grp in product(common_groups, repeat=2)},
            **{('weight', (val_grp, wgt_grp)): df['weight', wgt_grp]
               for val_grp, wgt_grp in product(common_groups, repeat=2)},
        })
    with timed('full computation (`.groupby`)'):
        grouped_weight = df2['weight'].groupby('name').sum()
        grouped_weighted_value = (
            DataFrame(
                df2['value'].values * df2['weight'].values,
                index=df2.index, columns=df2['value'].columns,
            )
        ).groupby('name').sum()
        DataFrame(
            grouped_weighted_value.values / grouped_weight.values,
            index=grouped_weighted_value.index,
            columns=grouped_weighted_value.columns,
        )
But… there’s much more to it…!
print("Let's take a look!")
Generators are an important part of API design.
They externalise control over a computation.
# scipy's newton exposes its stopping modalities as a fixed set of keyword
# arguments — inspect its signature via help().
from scipy.optimize import newton
help(newton)
# Build a polynomial with known roots (-3, 4, -5) and its derivative.
from sympy.abc import x
from sympy import diff, expand, lambdify
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
print(
    f'{f = }',
    f'{fprime = }',
    f'{expand(f) = }',
    f'{expand(fprime) = }',
    f'{lambdify(x, f, "numpy")(-3) = }',
    f'{lambdify(x, f, "numpy")(+4) = }',
    f'{lambdify(x, f, "numpy")(-5) = }',
    sep='\n',
)
# Different starting points converge to different roots.
from scipy.optimize import newton
from sympy.abc import x
from sympy import diff, lambdify
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
print(
    f'{newton(f, -10, fprime) = :>5.2f}',
    f'{newton(f, 0, fprime) = :>5.2f}',
    f'{newton(f, +10, fprime) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
def newton(f, x0, fprime):
    """Newton–Raphson: run a fixed 50 iterations from x0 and return the estimate."""
    estimate = x0
    for _ in range(50):
        estimate -= f(estimate) / fprime(estimate)
    return estimate
# Three starting points converge to the three known roots (-3, 4, -5).
print(
    f'{newton(f, -10, fprime) = :>5.2f}',
    f'{newton(f, 0, fprime) = :>5.2f}',
    f'{newton(f, +10, fprime) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
def newton(f, x0, fprime, *, num_steps=50):
    """Newton–Raphson with the iteration budget exposed as a keyword argument."""
    estimate = x0
    for _ in range(num_steps):
        estimate -= f(estimate) / fprime(estimate)
    return estimate
# The caller now controls the iteration budget from the outside.
print(
    f'{newton(f, -10, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, 0, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, +10, fprime, num_steps=10) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify
from math import isclose
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
def newton(f, x0, fprime, *, abs_tol=None, rel_tol=None):
    """Newton–Raphson that stops once f(x) is close enough to zero
    (by absolute and/or relative tolerance).

    NOTE(review): with both tolerances left as None this loops forever —
    callers are expected to supply at least one.
    """
    estimate = x0
    while True:
        residual = f(estimate)
        close_abs = abs_tol is not None and isclose(0, residual, abs_tol=abs_tol)
        close_rel = rel_tol is not None and isclose(0, residual, rel_tol=rel_tol)
        if close_abs or close_rel:
            break
        estimate -= residual / fprime(estimate)
    return estimate
# Stop on absolute or relative closeness of f(x) to 0.
print(
    f'{newton(f, -10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, 0, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, 0, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_tol=0.01) = :>5.2f}',
    sep='\n',
)
from scipy.optimize import newton
help(newton)
The number of modalities is unbounded…
from sympy.abc import x
from sympy import diff, lambdify
from math import isclose
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
def newton(f, x0, fprime, *, abs_imp=None, rel_imp=None):
    """Newton–Raphson that stops once successive f-values stop improving
    (by absolute and/or relative improvement).

    NOTE(review): with both thresholds left as None this loops forever —
    callers are expected to supply at least one.
    """
    previous, estimate = None, x0
    while True:
        estimate -= f(estimate) / fprime(estimate)
        if previous is not None:
            done_abs = abs_imp is not None and isclose(f(previous), f(estimate), abs_tol=abs_imp)
            done_rel = rel_imp is not None and isclose(f(previous), f(estimate), rel_tol=rel_imp)
            if done_abs or done_rel:
                break
        previous = estimate
    return estimate
# Stop when successive f-values stop improving (absolutely or relatively).
print(
    f'{newton(f, -10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, 0, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f, 0, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_imp=0.01) = :>5.2f}',
    sep='\n',
)
We are incapable of encoding all modalities for controlling this computation (even if we artificially limit them!)
from sympy.abc import x
from sympy import diff, lambdify
from math import isclose
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
def newton(f, x0, fprime, *, num_steps=500, abs_tol=None, rel_tol=None, abs_imp=None, rel_imp=None):
    """Newton–Raphson with every stopping modality bolted on: an iteration
    budget, absolute/relative closeness of f(x) to 0, and absolute/relative
    improvement between successive f-values.

    Fix: the original assigned x = x0 twice — the tuple unpacking already
    initialises it; the redundant second assignment is removed.
    """
    prev_x, x = None, x0
    for _ in range(num_steps):
        if abs_tol is not None and isclose(0, f(x), abs_tol=abs_tol):
            break
        if rel_tol is not None and isclose(0, f(x), rel_tol=rel_tol):
            break
        x -= f(x) / fprime(x)
        if prev_x is not None:
            if abs_imp is not None and isclose(f(prev_x), f(x), abs_tol=abs_imp):
                break
            if rel_imp is not None and isclose(f(prev_x), f(x), rel_tol=rel_imp):
                break
        prev_x = x
    return x
from sympy.abc import x
from sympy import diff, lambdify
from math import isclose
from itertools import count
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
def newton(f, x0, fprime, *, num_steps=None, abs_tol=None, rel_tol=None, abs_imp=None, rel_imp=None):
    """Newton–Raphson with independent, optional stopping modalities
    (iteration budget, f(x)≈0 tolerance, successive-improvement threshold).

    Fix: the original assigned x = x0 twice — the tuple unpacking already
    initialises it; the redundant second assignment is removed.
    NOTE(review): with *no* criterion supplied at all, count(1) never
    terminates — callers must pass at least one keyword.
    """
    prev_x, x = None, x0
    for step in count(1):
        if num_steps is not None and num_steps < step:
            break
        if abs_tol is not None and isclose(0, f(x), abs_tol=abs_tol):
            break
        if rel_tol is not None and isclose(0, f(x), rel_tol=rel_tol):
            break
        x -= f(x) / fprime(x)
        if prev_x is not None:
            if abs_imp is not None and isclose(f(prev_x), f(x), abs_tol=abs_imp):
                break
            if rel_imp is not None and isclose(f(prev_x), f(x), rel_tol=rel_imp):
                break
        prev_x = x
    return x
# One unified newton; each modality exercised independently.
print(
    f'{newton(f, -10, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, 0, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, +10, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, -10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, 0, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, 0, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, -10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, 0, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f, 0, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_imp=0.01) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify
from hypothesis import given
from hypothesis.strategies import integers
@given(
    a=integers(),
    b=integers(),
    c=integers(),
)
def test_newton(a, b, c):
    """Property test: every stopping modality should land on a root of f.

    Fixes: the @given-supplied strategies must appear as parameters of the
    test function (the original `def test_newton():` raised a TypeError),
    and a root of f = (x+a)(x-b)(x+c) is -a, b, or -c — it is the *result*
    that should be compared against the roots, not f(result) against the
    raw coefficients.
    NOTE(review): `newton` and `isclose` must be in scope from earlier
    cells; a = b = c = 0 makes fprime(0) = 0 (division by zero) — consider
    bounding the strategies.
    """
    f = (x + a) * (x - b) * (x + c)
    fprime = diff(f)
    f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
    for result in {
        newton(f, 0, fprime, num_steps=10),
        newton(f, 0, fprime, abs_tol=1e-3),
        newton(f, 0, fprime, rel_tol=0.01),
        newton(f, 0, fprime, abs_imp=1e-9),
        newton(f, 0, fprime, rel_imp=0.01),
    }:
        assert any(isclose(result, root) for root in {-a, b, -c})
from sympy.abc import x
from sympy import diff, lambdify
from hypothesis import given
from hypothesis.strategies import integers
@given(
    a=integers(),
    b=integers(),
    c=integers(),
)
def test_newton(a, b, c):
    """Property test over every hand-written combination of stopping criteria
    (the combinatorial explosion this slide deliberately demonstrates).

    Fixes: the @given-supplied strategies must appear as parameters of the
    test function (the original `def test_newton():` raised a TypeError),
    and the assertion compares the result against the actual roots
    -a, b, -c rather than f(result) against the raw coefficients.
    NOTE(review): `newton` and `isclose` must be in scope from earlier cells.
    """
    f = (x + a) * (x - b) * (x + c)
    fprime = diff(f)
    f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
    for result in {
        newton(f, 0, fprime),
        newton(f, 0, fprime, num_steps=10),
        newton(f, 0, fprime, abs_tol=1e-3),
        newton(f, 0, fprime, rel_tol=0.01),
        newton(f, 0, fprime, abs_imp=1e-9),
        newton(f, 0, fprime, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01),
        newton(f, 0, fprime, num_steps=10, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, abs_imp=1e-9),
        newton(f, 0, fprime, abs_tol=1e-3, rel_imp=0.01),
        newton(f, 0, fprime, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
    }:
        assert any(isclose(result, root) for root in {-a, b, -c})
from sympy.abc import x
from sympy import diff, lambdify
from hypothesis import given
from hypothesis.strategies import integers
from itertools import combinations, chain
@given(
    a=integers(),
    b=integers(),
    c=integers(),
)
def test_newton(a, b, c):
    """Same property test with the kwargs powerset generated, not hand-written.

    Fixes: @given parameters added to the signature; `len(args)` referenced
    an undefined name — the mapping is called `kwargs`; the assertion now
    compares the result against the actual roots -a, b, -c.
    NOTE(review): the empty combination (r=0) calls newton() with no stopping
    criterion at all, which never terminates in the count()-based variant.
    """
    f = (x + a) * (x - b) * (x + c)
    fprime = diff(f)
    f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
    kwargs = dict(
        num_steps=10,
        abs_tol=1e-3,
        rel_tol=0.01,
        abs_imp=1e-9,
        rel_imp=0.01,
    )
    for kw in chain.from_iterable(
        combinations(kwargs.items(), r=r)
        for r in range(0, len(kwargs) + 1)
    ):
        result = newton(f, 0, fprime, **dict(kw))
        assert any(isclose(result, root) for root in {-a, b, -c})
A generator, by virtue of structuring the computation, can externalise the modalities of how the computation is performed.
from sympy.abc import x
from sympy import diff, lambdify
from math import isclose
from itertools import islice, takewhile, pairwise
from collections import deque
from textwrap import dedent
f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)
f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])
def newton(f, x0, fprime):
    """Yield successive Newton–Raphson estimates forever — the *caller*
    decides when to stop (islice, takewhile, timing, …)."""
    estimate = x0
    while True:
        estimate -= f(estimate) / fprime(estimate)
        yield estimate
from time import perf_counter
def timed(g, t):
    """Yield items from iterable g until elapsed wall-clock time exceeds t seconds.

    Fix: perf_counter must be *called* — the original compared the function
    object itself (`perf_counter - start`), a TypeError on the first item.
    """
    start = perf_counter()
    for x in g:
        if perf_counter() - start > t:
            break
        yield x
# The generator externalises *all* stopping policies:
#   islice              → fixed step count
#   takewhile + isclose → absolute / relative tolerance on f(x)
#   pairwise + takewhile→ improvement between successive estimates
# deque(..., maxlen=1) retains only the final yielded estimate.
print(
    f'{deque(islice(newton(f, 0, fprime), 10), maxlen=1)[0] = :>5.2f}',
    dedent(f'''{
deque(takewhile(
lambda x: not isclose(f(x), 0, abs_tol=1e-6),
newton(f, 0, fprime),
), maxlen=1)[0] = :>5.2f}
''').strip(),
    dedent(f'''{
deque(takewhile(
lambda x: not isclose(f(x), 0, rel_tol=0.01),
newton(f, 0, fprime),
), maxlen=1)[0] = :>5.2f}
''').strip(),
    dedent(f'''{
deque(takewhile(
lambda xs: not isclose(f(xs[0]), f(xs[-1]), abs_tol=1e-6),
pairwise(newton(f, 0, fprime)),
), maxlen=1)[0][-1] = :>5.2f}
''').strip(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
# scipy's newton is itself one big parameterised loop — inspect the source
# to see every stopping modality baked into its signature.
from scipy.optimize import newton
from inspect import getsource
print(
    getsource(newton)
)
And… there’s much more to it…!
from itertools import tee, pairwise, islice
# NOTE(review): scratch/demo-in-progress — g(10) hands an int to tee/pairwise,
# which expect an iterable (TypeError), and g returns a float, not a
# generator, so tee(gi, 2) below would fail too. Shown as a sketch of using
# tee to peek at a stream for debugging without consuming it.
def g(xs):
    # under -O (__debug__ False) the debug copy is skipped entirely
    if __debug__: xs, xs_copy = tee(xs, 2)
    return 1 / sum((x - y) for x, y in pairwise(xs))
def target():
    pass
gi = g(10)
gi, gi_copy = tee(gi, 2)
# print(f'Debug: {next(gi_copy) = }')
for x in gi:
    print(f'Inside loop: {x = }')
# __debug__ is a compile-time constant (True unless running under -O);
# the disassembly shows whether the guarded branch survives compilation.
print(f'{__debug__ = }')
def f():
    if __debug__:
        print()
from dis import dis
dis(f)