ts-python

Seminar VIII: “All About Generators”

LYLE LANLEY: “So, then, ‘mono’ means ‘one,’ and ‘rail’ means ‘rail.’ And that concludes our intensive three week course.”

— “Marge vs the Monorail” (S04E12)

Abstract

Do generators have any part in API design? Are generators just some advanced Python novelty feature? And, importantly, how I actually use generators in my day-to-day work?

Do you…

Then come join us for a session on generators!

Generators are considered an advanced Python feature and are often viewed only as lazily evaluated iterators. While this is mechanically true, they have a much broader application in program structure and can be used to simplify step-wise processing pipelines.

In this episode, we will move beyond the superficial mechanics of generators in Python and explore their implementation and use-cases. We will develop a better sense for how generators can be leveraged in your every day scripting as a structuring mechanism to reduce your code churn and produce less error-prone results.

Keywords

Notes

Mechanics: “Aren’t they just about Iterators and Iterables?”

print("Let's take a look!")
xs = [1, 2, 3]

for x in xs:
    pass
xs = {1, 2, 3}
xs = {'a': 1, 'b': 2, 'c': 3}
xs = open(__file__)

class T:
    __iter__ = lambda _: iter(())
xs = T()

for x in xs:
    pass
xs = [1, 2, 3]
for x in xs:
    pass

# … is the same as…

xi = iter(xs)
while True:
    try:
        x = next(xi)
    except StopIteration:
        pass
from collections.abc import Iterable

print(
    f'{isinstance([1, 2, 3], Iterable) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from collections.abc import Iterable

class T:
    __iter__ = lambda _: None

x = T()

print(
    f'{isinstance(x, Iterable) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
xs = [1, 2, 3]

print(f'{next(xs) = }')
xs = [1, 2, 3]
xi = iter(xs)

print(f'{next(xi) = }')
print(f'{next(xi) = }')
print(f'{next(xi) = }')
xs = [1, 2, 3] # the data
xi = iter(xs)  # reference to the data + some state (“where you last left off”)

print(f'{next(xi) = }')
xs = [1, 2, 3] # the data

xi1 = iter(xs) # reference to the data + some state
xi2 = iter(xs)

print(
    f'{next(xi1) = } {next(xi2) = }',
    f'{next(xi1) = } {next(xi2) = }',
    f'{next(xi1) = } {next(xi2) = }',
    sep='\n',
)
from dataclasses import dataclass, field
from random import shuffle

@dataclass(frozen=True)
class RandomListIterator:
    data : list
    state : list = field(default_factory=list)

    def __post_init__(self):
        self.state.extend(range(len(self.data)))
        shuffle(self.state)

    __iter__ = lambda self: self
    def __next__(self):
        try:
            idx = self.state.pop()
        except IndexError as e:
            raise StopIteration() from e
        return self.data[idx]

xs = [1, 2, 3]
xr = RandomListIterator(xs)

print(
    f'{next(xr) = }',
    f'{next(xr) = }',
    f'{next(xr) = }',
    sep='\n',
)

xr = RandomListIterator(xs)
for x in xr:
    print(f'{x = }')
from collections.abc import Iterator

class T:
    __iter__ = lambda _: None
    __next__ = lambda _: None

x = T()

print(
    f'{isinstance(x, Iterator) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
def g():
    yield 1
    yield 2
    yield 3

for x in g():
    print(f'{x = }')
def transform(xs):
    for x in xs:
        yield x ** 2

xs = [1, 2, 3]
for x in transform(xs):
    print(f'{x = }')
def map(func, *all_args):
    for args in zip(*all_args):
        yield func(*args)

xs = [1, 2, 3]
for y in map(lambda x: x**2, xs):
    print(f'{y = :.2f}')

xs = [1, 2, 3]
ys = [4, 5, 6]
for z in map(lambda x, y: x**2 * y**3, xs, ys):
    print(f'{z = :.2f}')
s = 'a,b,c'
xs = ['a', 'b', 'c']

print(
    f'{len(s)            = }',
    f'{len(xs)           = }',
    f'{len(s.split(",")) = }',
    sep='\n',
)
s = 'a,b,c,d'
xs = ['a', 'b', 'c,d']

print(
    f'{len(s)            = }',
    f'{len(xs)           = }',
    f'{len(s.split(",")) = }',
    sep='\n',
)

Resolving ambiguities in ‘in-band’ encodings:

from pandas import Series, date_range
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx := date_range('2020-01-01', periods=4)),
    data=rng.integers(-10, +10, size=len(idx)),
)

# s.iloc[1] = -99_999

# s.iloc[1] = None
# s.iloc[-1] = 2**53 + 1

s = s.astype('Int64')
s.iloc[-1] = None

print(
    s,
    f'{s.iloc[-1] = }',
    s.array._mask,
    s.array._data,
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Resolving ambiguities in ‘in-band’ encodings:

Resolving ambiguities in ‘in-band’ encodings:

from pandas import read_csv
from io import StringIO
from textwrap import dedent

raw_data = StringIO(dedent('''
    name,value
    abc,123
    def,456
    "ghi,xyz",789
''').strip())

print(
    read_csv(raw_data)
        .set_index('name')
        .sort_index()
        .squeeze(axis='columns')
    ,
)
from lxml.etree import fromstring
from textwrap import dedent

raw_data = dedent('''
    <root>
        <node>
            <![CDATA[
                <node>
            ]]>
        </node>
    </root>
''').strip()

print(
    f'{fromstring(raw_data) = }',
    f'{fromstring(raw_data).find("node").text.strip() = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from lxml.etree import fromstring
from textwrap import dedent

raw_data = dedent('''
    <root>
        <node>
            <![CDATA[
                <![CDATA[
                    <node>
                ]]>
            ]]>
        </node>
    </root>
''').strip()

print(
    f'{fromstring(raw_data) = }',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

Also: quoting vs escaping has grammar significant (context-free vs context-sensitive.)

# non-nested, linear, ordered
s = 'a,b,c'
xs = ['a', 'b', 'c']
def run_computation():
    step1()
    step2()
    step3()
    step4()
def generate_report():
    raw_data = load_data()

    data = clean_data(raw_data)
    data = remove_outliers(data)

    results = compute_results(data)

    table = generate_table(results)

    return table
def generate_report():
    raw_data = load_data()

    data = clean_data(raw_data)
    data = remove_outliers(data)

    results = compute_results(data)

    table = generate_table(results)

    return table

from dis import dis
dis(generate_report)
def generate_report():
    raw_data = load_data()
    yield raw_data

    data = clean_data(raw_data)
    data = remove_outliers(data)
    yield data

    results = compute_results(data)
    yield results

    table = generate_table(results)
    yield table


# from dis import dis
# dis(generate_report)
from collections.abc import Iterable, Iterator

def g(): yield

gi = g()

print(
    f'{isinstance(gi, Iterable)  = }',
    f'{isinstance(gi, Iterator) = }',
    sep='\n',
)
from collections.abc import Iterable, Iterator

def g():
    yield 1
    yield 2
    yield 3

gi = g()

print(
    f'{next(gi) = }\t{gi.gi_frame.f_lasti = :>2}',
    f'{next(gi) = }\t{gi.gi_frame.f_lasti = :>2}',
    f'{next(gi) = }\t{gi.gi_frame.f_lasti = :>2}',
    sep='\n',
)

from dis import dis
dis(g)

But… there’s much more to it…!

Mechanics: “Aren’t they just about laziness?”

print("Let's take a look.")

Two general regimes of computation:

def f(): # eager
    return

def g(): # lazy
    yield

Where does “eagerness” and “laziness” show up in the tools we use?

from numpy.random import default_rng

rng = default_rng(0)

xs = rng.normal(size=10).round(2)

print(
    (xs + 1) * (xs - 2) / (xs ** 3),
    xs.sum(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from numpy.random import default_rng
from pandas import Series, date_range, Grouper

rng = default_rng(0)

s = Series(
    index=(idx := date_range('2020-01-01', periods=10, name='date')),
    data=rng.normal(size=len(idx)).round(2),
)

print(
    # s,
    # s + 1,
    # s.sum(),
    # s.groupby(Grouper(freq='3D')),
    # s.groupby(Grouper(freq='3D')).mean(),
    # s.groupby(Grouper(freq='3D')).agg(lambda g: g.sum()),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from some_dask_like_framework import Computation

c = Computation()
c.transform(...).filter(...).filter(...).transform(...).compute()

Any multi-step or multi-stage computation can be thought of as comprised of:

from time import sleep

def compute(x):
    sleep(.2)
    return x**2

def process(dataset, *, num_steps=float('inf')):
    rv = []
    for idx, x in enumerate(dataset):
        if idx > num_steps: break
        rv.append(compute(x))
    return rv

dataset = range(10_000_000)
print(f'{process(dataset, num_steps=3) = }')

Two regimes of computations:

from matplotlib.pyplot import subplots, show
from numpy import linspace

def total_time(units, overhead, compute_time, decide_time):
    return units * (compute_time + decide_time) + overhead

xs = linspace(0, 100, 1_000)

fig, ax = subplots()
ax.set_title('Two Regimes of Computation')
ax.set_xlabel('units (i.e., dataset size) (# elements)')
ax.set_ylabel('total time (s)')
ax.plot(
    xs,
    total_time(xs, overhead=10e-3, compute_time=1e-9, decide_time=1e-6),
    color='r', label='T(compute) << T(decide)',
)
ax.plot(
    xs,
    total_time(xs, overhead=10e-3, compute_time=1e-3, decide_time=1e-6),
    color='b', label='T(compute) >> T(decide)',
)
ax.legend()
show()
from matplotlib.pyplot import subplots, show
from numpy import linspace

def total_time(units, overhead, compute_time, decide_time):
    return units * (compute_time + decide_time) + overhead

xs1 = linspace(0, 10e9, 1_000)
xs2 = linspace(0, 10e3, 1_000)

fig, ax = subplots()
ax.set_title('Two Regimes of Computation')
ax.set_xlabel('units (i.e., dataset size) (# elements)')
ax.set_ylabel('total time (s)')
ax.set_xlim(xs1[len(xs1)//4], xs1[len(xs1)*3//4])
ax.plot(
    xs1,
    total_time(xs1, overhead=10e-3, compute_time=1e-9, decide_time=1e-6),
    color='r', label='T(compute) << T(decide)',
)
ax.twinx().twiny().plot(
    xs2,
    total_time(xs2, overhead=10e-3, compute_time=1e-3, decide_time=1e-6),
    color='b', label='T(compute) >> T(decide)',
)
fig.legend()
show()

Two regimes of computations:

from contextlib import contextmanager
from time import perf_counter

@contextmanager
def timed(msg):
    before = perf_counter()
    yield
    after = perf_counter()
    print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {(after - before) * 1e3:>8,.2f}ms')

from numpy.random import default_rng
from numpy import where, allclose
from itertools import combinations

results = []

rng = default_rng(0)

xs = rng.normal(size=10_000_000).round(2)

results.append(res := xs.copy())
with timed('xs.copy()[xs > 0] = xs**2'):
    mask = res > 0
    res[mask] = res[mask]**2
    res[~mask] = res[~mask]**3

results.append(res := xs.copy())
with timed('(xs**2) * (xs > 0) + (xs**3) * (xs < 0)'):
    mask = res > 0
    res[:] = res**2 * mask + res**3 * ~mask

results.append(res := xs.copy())
with timed('where(xs > 0, xs**2, xs**3'):
    res[:] = where(res > 0, res**2, res**3)

assert all(allclose(x, y) for x, y in combinations(results, r=2))
from contextlib import contextmanager
from time import perf_counter

@contextmanager
def timed(msg):
    before = perf_counter()
    yield
    after = perf_counter()
    print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {after - before:>4,.4f}s')

from time import sleep

def computation(time):
    def compute(x):
        sleep(time)
        return x**2
    return compute

def eager(dataset, *, compute):
    rv = []
    for x in dataset:
        rv.append(compute(x))
    return rv

def lazy(dataset, *, compute):
    for x in dataset:
        yield compute(x)

dataset = range(100)
with timed('eager(…, compute=…(time=1e-3))—only first 3'):
    for idx, _ in enumerate(eager(dataset, compute=computation(time=10e-3)), start=1):
        if idx >= 3: break
with timed('lazy(…, compute=…(time=1e-3))—only first 3'):
    for idx, _ in enumerate(lazy(dataset, compute=computation(time=10e-3)), start=1):
        if idx >= 3: break
with timed('eager(…, compute=…(time=.1e-9))—only first 3'):
    for idx, _ in enumerate(eager(dataset, compute=computation(time=.1e-9)), start=1):
        if idx >= 3: break
with timed('lazy(…, compute=…(time=.1e-9))—only first 3'):
    for idx, _ in enumerate(lazy(dataset, compute=computation(time=.1e-9)), start=1):
        if idx >= 3: break
from numpy.random import default_rng
from pandas import DataFrame, CategoricalIndex
from string import ascii_lowercase

rng = default_rng(0)

names = rng.choice([*ascii_lowercase], size=(20_000, 4)).view('<U4').ravel()

df = DataFrame(
    index=(idx := CategoricalIndex(
        rng.choice(names, size=1_000_000),
        name='name',
    )),
    data={
        'value': rng.integers(10, size=len(idx)),
        'weight': abs(rng.normal(size=len(idx))).round(2),
    },
).sort_index()

print(
    df.head(3),
    df.tail(3),
    f'{len(df) = :,}',
    f'{df.groupby("name").ngroups = :,}',
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)

df.to_pickle('/tmp/data.pkl')
from contextlib import contextmanager
from time import perf_counter

@contextmanager
def timed(msg):
    before = perf_counter()
    yield
    after = perf_counter()
    print(f'{msg:<30} elapsed \N{mathematical bold capital delta}t: {after - before:>4,.2f}s')

from pandas import read_pickle, Series
from itertools import combinations
from numpy import allclose

df = read_pickle('/tmp/data.pkl')

results = []

with timed(f'groupby.apply'):
    results.append(
        df.groupby('name').apply(lambda g: (g['value'] * g['weight']).sum() / g['weight'].sum())
    )

with timed(f'decomposition to groupby.agg'):
    grouped_weight = df['weight'].groupby('name').sum()
    grouped_weighted_value = (
        Series(df['value'].values * df['weight'].values, index=df.index)
    ).groupby('name').sum()
    results.append(
        Series(grouped_weighted_value.values / grouped_weight.values, index=grouped_weight.index)
    )

assert all(allclose(x, y) for x, y in combinations(results, r=2))
assert all((x.index == y.index).all() for x, y in combinations(results, r=2))
from contextlib import contextmanager
from time import perf_counter

@contextmanager
def timed(msg):
    before = perf_counter()
    yield
    after = perf_counter()
    print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {after - before:>4,.2f}s')

from numpy.random import default_rng
from pandas import DataFrame, CategoricalIndex, Series, IndexSlice
from string import ascii_lowercase
from itertools import product

rng = default_rng(0)

names = rng.choice([*ascii_lowercase], size=(80_000, 3)).view('<U3').ravel()

df = DataFrame(
    index=(idx := CategoricalIndex(
        rng.choice(names, size=2_000_000),
        name='name',
    )),
    data={
        ('value',  'a'): rng.integers(10, size=len(idx)),
        ('value',  'b'): rng.integers(10, size=len(idx)),
        ('value',  'c'): rng.integers(10, size=len(idx)),
        ('weight', 'a'): abs(rng.normal(size=len(idx))).round(2),
        ('weight', 'b'): abs(rng.normal(size=len(idx))).round(2),
        ('weight', 'c'): abs(rng.normal(size=len(idx))).round(2),
    },
).sort_index()

common_groups = df['value'].columns.intersection(df['weight'].columns)
with timed(f'→ incremental commputation (overall)'):
    for val_grp, wgt_grp in product(common_groups, repeat=2):
        with timed(f'incremental (`.groupby` on {val_grp, wgt_grp})'):
            grouped_weight = df['weight', wgt_grp].groupby('name').sum()
            grouped_weighted_value = (
                Series(df['value', val_grp].values * df['weight', wgt_grp].values, index=df.index)
            ).groupby('name').sum()
            Series(grouped_weighted_value.values / grouped_weight.values, index=grouped_weight.index)

common_groups = df['value'].columns.intersection(df['weight'].columns)
with timed('→ full computation (overall)'):
    with timed('full computation (construct)'):
        df2 = DataFrame({
            (col, (val_grp, wgt_grp)): df[col, val_grp]
            for val_grp, wgt_grp in product(common_groups, repeat=2)
            for col in {'weight', 'value'}
        })
    with timed('full computation (`.groupby`)'):
        grouped_weight = df2['weight'].groupby('name').sum()
        grouped_weighted_value = (
            DataFrame(
                df2['value'].values * df2['weight'].values,
                index=df2.index, columns=df2['value'].columns,
            )
        ).groupby('name').sum()
        DataFrame(
            grouped_weighted_value.values / grouped_weight.values,
            index=grouped_weighted_value.index,
            columns=grouped_weighted_value.columns,
        )

But… there’s much more to it…!

Motivation: “So what are they good for?”

print("Let's take a look!")

Generators are an important part of API design.

They externalise control over a computation.

from scipy.optimize import newton

help(newton)
from sympy.abc import x
from sympy import diff, expand, lambdify

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

print(
    f'{f              = }',
    f'{fprime         = }',
    f'{expand(f)      = }',
    f'{expand(fprime) = }',
    f'{lambdify(x, f, "numpy")(-3) = }',
    f'{lambdify(x, f, "numpy")(+4) = }',
    f'{lambdify(x, f, "numpy")(-5) = }',
    sep='\n',
)
from scipy.optimize import newton
from sympy.abc import x
from sympy import diff, lambdify

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

print(
    f'{newton(f, -10, fprime) = :>5.2f}',
    f'{newton(f,   0, fprime) = :>5.2f}',
    f'{newton(f, +10, fprime) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

def newton(f, x0, fprime):
    x = x0
    for _ in range(50):
        x -= f(x) / fprime(x)
    return x

print(
    f'{newton(f, -10, fprime) = :>5.2f}',
    f'{newton(f,   0, fprime) = :>5.2f}',
    f'{newton(f, +10, fprime) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

def newton(f, x0, fprime, *, num_steps=50):
    x = x0
    for _ in range(num_steps):
        x -= f(x) / fprime(x)
    return x

print(
    f'{newton(f, -10, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f,   0, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, +10, fprime, num_steps=10) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify
from math import isclose

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

def newton(f, x0, fprime, *, abs_tol=None, rel_tol=None):
    x = x0
    while True:
        if abs_tol is not None and isclose(0, f(x), abs_tol=abs_tol):
            break
        if rel_tol is not None and isclose(0, f(x), rel_tol=rel_tol):
            break
        x -= f(x) / fprime(x)
    return x

print(
    f'{newton(f, -10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f,   0, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f,   0, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_tol=0.01) = :>5.2f}',
    sep='\n',
)
from scipy.optimize import newton

help(newton)

The number of modalities is unbounded…

from sympy.abc import x
from sympy import diff, lambdify
from math import isclose

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

def newton(f, x0, fprime, *, abs_imp=None, rel_imp=None):
    prev_x, x = None, x0
    while True:
        x -= f(x) / fprime(x)
        if prev_x is not None:
            if abs_imp is not None and isclose(f(prev_x), f(x), abs_tol=abs_imp):
                break
            if rel_imp is not None and isclose(f(prev_x), f(x), rel_tol=rel_imp):
                break
        prev_x = x
    return x

print(
    f'{newton(f, -10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f,   0, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f,   0, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_imp=0.01) = :>5.2f}',
    sep='\n',
)

We are incapable of encoding all modalities for controlling this computation (even if we artificially limit them!)

from sympy.abc import x
from sympy import diff, lambdify
from math import isclose

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

def newton(f, x0, fprime, *, num_steps=500, abs_tol=None, rel_tol=None, abs_imp=None, rel_imp=None):
    prev_x, x = None, x0
    x = x0
    for _ in range(num_steps):
        if abs_tol is not None and isclose(0, f(x), abs_tol=abs_tol):
            break
        if rel_tol is not None and isclose(0, f(x), rel_tol=rel_tol):
            break
        x -= f(x) / fprime(x)
        if prev_x is not None:
            if abs_imp is not None and isclose(f(prev_x), f(x), abs_tol=abs_imp):
                break
            if rel_imp is not None and isclose(f(prev_x), f(x), rel_tol=rel_imp):
                break
        prev_x = x
    return x
from sympy.abc import x
from sympy import diff, lambdify
from math import isclose
from itertools import count

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

def newton(f, x0, fprime, *, num_steps=None, abs_tol=None, rel_tol=None, abs_imp=None, rel_imp=None):
    prev_x, x = None, x0
    x = x0
    for step in count(1):
        if num_steps is not None and num_steps < step:
            break
        if abs_tol is not None and isclose(0, f(x), abs_tol=abs_tol):
            break
        if rel_tol is not None and isclose(0, f(x), rel_tol=rel_tol):
            break
        x -= f(x) / fprime(x)
        if prev_x is not None:
            if abs_imp is not None and isclose(f(prev_x), f(x), abs_tol=abs_imp):
                break
            if rel_imp is not None and isclose(f(prev_x), f(x), rel_tol=rel_imp):
                break
        prev_x = x
    return x

print(
    f'{newton(f, -10, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f,   0, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, +10, fprime, num_steps=10) = :>5.2f}',
    f'{newton(f, -10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f,   0, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_tol=1e-3) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f,   0, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_tol=0.01) = :>5.2f}',
    f'{newton(f, -10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f,   0, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, +10, fprime, abs_imp=1e-9) = :>5.2f}',
    f'{newton(f, -10, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f,   0, fprime, rel_imp=0.01) = :>5.2f}',
    f'{newton(f, +10, fprime, rel_imp=0.01) = :>5.2f}',
    sep='\n',
)
from sympy.abc import x
from sympy import diff, lambdify
from hypothesis import given
from hypothesis.strategies import integers

@given(
    a=integers(),
    b=integers(),
    c=integers(),
)
def test_newton():
    f = (x + a) * (x - b) * (x + c)
    fprime = diff(f)
    f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

    for result in {
        newton(f, 0, fprime, num_steps=10),
        newton(f, 0, fprime, abs_tol=1e-3),
        newton(f, 0, fprime, rel_tol=0.01),
        newton(f, 0, fprime, abs_imp=1e-9),
        newton(f, 0, fprime, rel_imp=0.01),
    }:
        assert any(isclose(f(result), coeff) for coeff in {a, b, c})
from sympy.abc import x
from sympy import diff, lambdify
from hypothesis import given
from hypothesis.strategies import integers

@given(
    a=integers(),
    b=integers(),
    c=integers(),
)
def test_newton():
    f = (x + a) * (x - b) * (x + c)
    fprime = diff(f)
    f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

    for result in {
        newton(f, 0, fprime),
        newton(f, 0, fprime, num_steps=10),
        newton(f, 0, fprime, abs_tol=1e-3),
        newton(f, 0, fprime, rel_tol=0.01),
        newton(f, 0, fprime, abs_imp=1e-9),
        newton(f, 0, fprime, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01),
        newton(f, 0, fprime, num_steps=10, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, abs_imp=1e-9),
        newton(f, 0, fprime, abs_tol=1e-3, rel_imp=0.01),
        newton(f, 0, fprime, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
        newton(f, 0, fprime, num_steps=10, abs_tol=1e-3, rel_tol=0.01, abs_imp=1e-9, rel_imp=0.01),
    }:
        assert any(isclose(f(result), coeff) for coeff in {a, b, c})
from sympy.abc import x
from sympy import diff, lambdify
from hypothesis import given
from hypothesis.strategies import integers
from itertools import combinations, chain

@given(
    a=integers(),
    b=integers(),
    c=integers(),
)
def test_newton():
    f = (x + a) * (x - b) * (x + c)
    fprime = diff(f)
    f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

    kwargs = dict(
        num_steps=10,
        abs_tol=1e-3,
        rel_tol=0.01,
        abs_imp=1e-9,
        rel_imp=0.01,
    )

    for kw in chain.from_iterable(
        combinations(kwargs.items(), r=r)
        for r in range(0, len(args)+1)
    ):
        result = newton(f, 0, fprime, **dict(kw))
        assert any(isclose(f(result), coeff) for coeff in {a, b, c})

A generator, by virtue of structuring the computation, can externalise the modalities of how the computation is performed.

from sympy.abc import x
from sympy import diff, lambdify
from math import isclose
from itertools import islice, takewhile, pairwise
from collections import deque
from textwrap import dedent

f = (x + 3) * (x - 4) * (x + 5)
fprime = diff(f)

f, fprime = (lambdify(x, func, 'numpy') for func in [f, fprime])

def newton(f, x0, fprime):
    x = x0
    while True:
        x -= f(x) / fprime(x)
        yield x

from time import perf_counter
def timed(g, t):
    start = perf_counter()
    for x in g:
        if perf_counter - start > t:
            break
        yield x

print(
    f'{deque(islice(newton(f, 0, fprime), 10), maxlen=1)[0] = :>5.2f}',

    dedent(f'''{
        deque(takewhile(
            lambda x: not isclose(f(x), 0, abs_tol=1e-6),
            newton(f, 0, fprime),
        ), maxlen=1)[0] = :>5.2f}
    ''').strip(),

    dedent(f'''{
        deque(takewhile(
            lambda x: not isclose(f(x), 0, rel_tol=0.01),
            newton(f, 0, fprime),
        ), maxlen=1)[0] = :>5.2f}
    ''').strip(),

    dedent(f'''{
        deque(takewhile(
            lambda xs: not isclose(f(xs[0]), f(xs[-1]), abs_tol=1e-6),
            pairwise(newton(f, 0, fprime)),
        ), maxlen=1)[0][-1] = :>5.2f}
    ''').strip(),

    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from scipy.optimize import newton
from inspect import getsource

print(
    getsource(newton)
)

And… there’s much more to it…!

from itertools import tee, pairwise, islice

def g(xs):
    if __debug__: xs, xs_copy = tee(xs, 2)
    return 1 / sum((x - y) for x, y in pairwise(xs))

def target():
    pass

gi = g(10)
gi, gi_copy = tee(gi, 2)
# print(f'Debug: {next(gi_copy) = }')

for x in gi:
    print(f'Inside loop: {x = }')
print(f'{__debug__ = }')

def f():
    if __debug__:
        print()

from dis import dis
dis(f)