ts-python

Seminar VI: “Memory profiling in Python (and pandas)”

“But I predict that within 100 years, computers will be twice as powerful, 10000× larger, and so expensive that only the five richest kings of Europe will own them.”

— ‘Much Apu About Nothing’ (S07E23)

Abstract

How do you profile memory usage in your Python code? How do you find the spots in your code where you are allocating too much memory? And, critically, what do you *do about it?*

Do you…

Then come join us for a session on memory profiling in Python (and pandas)!

Python does not provide fine-grained control over memory allocation, deallocation, or layout. Tools like pandas often encourage making copies, going so far as to remove keyword arguments like inplace=… from their APIs. Though it may be the case for many of our analyses that we can simply buy a bigger computer, there are still circumstances under which we may need to control, predict, and reduce our memory usage.

In this episode, we will cover memory profilers in Python (memray) and how to effectively use them to locate, investigate, and address memory-intensive code. We’ll develop intuition around memory use, discuss how Python allocates and deallocates memory, and discuss approaches we can take in code using pandas to address memory issues and to improve the memory efficiency of our analyses.

Keywords: profiling, memory, pandas, Python

Notes

sys module, gc module, and “restricted computation domains”

print("Let's take a look!")

“What can the sys module tell us?”

from sys import getsizeof

class T:
    def __init__(self, data):
        self.data = data

obj = T(0)
obj = T([1, 2, 3])

print(
    # f'{getsizeof(0)         = }',
    # f'{getsizeof([1, 2, 3])   = }',
    f'{getsizeof(obj)       = }',
    sep='\n',
)
struct _object {
    Py_ssize_t ob_refcnt;
    PyTypeObject *ob_type;
};

typedef struct {
    PyObject ob_base;
    Py_ssize_t ob_size; /* Number of items in variable part */
} PyVarObject;

struct _longobject {
    PyObject_VAR_HEAD
    digit ob_digit[1];
};
from collections.abc import Iterable
from sys import getsizeof

def traverse(obj, *, seen={*()}):
    yield obj
    if isinstance(obj, Iterable) and not isinstance(obj, str):
        for x in obj:
            if id(x) in seen:
                continue
            yield from traverse(x, seen={*seen, id(obj)})

xs = [1, 2, 3]
ys = [4, 5, 6, xs]
zs = [7, 8, 9, ys]

print(
    f'{getsizeof(xs)                           = }',
    f'{getsizeof(ys)                           = }',
    f'{getsizeof(zs)                           = }',
    # f'{getsizeof(xs[0])                        = }',
    f'{sum(getsizeof(x) for x in traverse(xs)) = }',
    f'{sum(getsizeof(y) for y in traverse(ys)) = }',
    f'{sum(getsizeof(z) for z in traverse(zs)) = }',
    sep='\n',
)

“What can the gc module tell us?”

from gc import get_objects
from sys import getsizeof
import pandas

print(
    f'{get_objects()                            = }',
    f'{sum(getsizeof(x) for x in get_objects()) = :,}',
    sep='\n',
)
from gc import get_referents, get_referrers
from dataclasses import dataclass

@dataclass
class T:
    def __init__(self, x):
        self.x = x

@lambda f: f()
def _():
    obj0 = T(...)
    obj1 = T(obj0)

    print(
        # f'{get_referents(obj0) = }',
        f'{get_referrers(obj0) = }',
        sep='\n',
    )

“What can numpy.ndarray and pandas.Series, pandas.DataFrame tell us?”

from numpy import array, int32, int64
from numpy import shares_memory

# xs = array([1, 2, 3, 4], dtype=int32)
# xs = array([1, 2, 3, 4], dtype=int64)

print(
    f'{xs        = }',
    f'{xs.dtype  = }',
    f'{xs.nbytes = }',
    sep='\n',
)
from pandas import Series

s = Series([1, 2, 3, 4], index=[1, 2, 3, 4])
# s.attrs = {'a': ..., 'b': ...}
# s = Series('abcddddddddd abc ab a'.split(), index=[1, 2, 3, 4])

print(
    s,
    # s._data,
    # f'{s.memory_usage()          = }',
    # f'{s.memory_usage(deep=True) = }',
    sep='\n',
)

“Do we have fine-grained control over CPU use?”

class T:
    def __add__(self, other):
        return self
    __radd__ = __add__

xs = [1, 2, 3, 4]
# xs = [1, 2, 3, 4, T()]

print(f'{sum(xs) = }')
def f():
    pass
from collections.abc import Callable
from itertools import chain
from json import dumps

print(
    f"{isinstance(chain, Callable) and hasattr(chain, '__get__') = }",
    f"{isinstance(dumps, Callable) and hasattr(dumps, '__get__') = }",
    sep='\n'
)
def g(xs):
    for x in xs:
        yield x ** 2

gi = g([1, 2, 3, 4, 5])
print(
    f'{next(gi) = }',
    sep='\n',
)

“Do we have fine-grained control over memory use?”

from numpy import array, int32, int64

# xs = [1, 2, 3, 4]
xs = array([1, 2, 3, 4], dtype=int32)
from collections import deque

def f(xs):
    rv = []
    for x in xs:
        rv.append(x ** 2)
    return rv

f([1, 2, 3, 4, 5])

def g(xs):
    for x in xs:
        yield x ** 2

# for x in g([1, 2, 3, 4, 5]): pass

deque(g([1, 2, 3, 4, 5]), maxlen=1)

“Iteration helpers.”

from random import Random
from itertools import islice, tee, repeat, chain, repeat, zip_longest

nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
nwise_longest = lambda g, n=2, fv=object(): zip_longest(*(islice(g, i, None) for i, g in enumerate(tee(g, n))), fillvalue=fv)
first = lambda g, n=1: zip(chain(repeat(True, n), repeat(False)), g)
last = lambda g, m=1, s=object(): ((y[-1] is s, x) for x, *y in nwise_longest(g, m+1, s))

rnd = Random(0)

events = {
    'a': rnd.randint(-10, +10),
    'b': rnd.randint(-10, +10),
    'c': rnd.randint(-10, +10),
    'd': rnd.randint(-10, +10),
}

if __name__ == '__main__':
    for is_first, (is_last, (prev, curr)) in first(last(nwise(events.items()))):
        if is_first:
            ...
        ...
        if is_last:
            ...

“Value disintermediation.”

def f(): pass
def g(): pass
def h(): pass

if __name__ == '__main__':
    mode = 'f'
    if mode == 'f':
        f()
    elif mode == 'g':
        g()
    elif mode == 'h':
        h()
def f(): pass
def g(): pass
def h(): pass

if __name__ == '__main__':
    modes = {'f': f, 'g': g, 'h': h}
    modes['f']()
from dataclasses import dataclass

@dataclass
class proposer:
    nodes : dict

@dataclass
class acceptor:
    nodes : dict

@dataclass
class learner:
    nodes : dict

if __name__ == '__main__':
    nodes = {}
    nodes.update({
        'proposers': [proposer(nodes)],
        'acceptors': [acceptor(nodes) for _ in range(3)],
        'learners':  [learner(nodes) for _ in range(2)],
    })
from dataclasses import dataclass

@dataclass
class proposer:
    nodes : dict

@dataclass
class acceptor:
    nodes : dict

@dataclass
class learner:
    nodes : dict

if __name__ == '__main__':
    nodes = {}
    nodes.update({
        proposer: [proposer(nodes)],
        acceptor: [acceptor(nodes) for _ in range(3)],
        learner:  [learner(nodes) for _ in range(2)],
    })

“Restricted computation domains.”

from dataclasses import dataclass
from random import Random

rnd = Random(0)

@dataclass
class Datum:
    value : int

def var(xs):
    avg = sum(x.value for x in xs) / len(xs)
    return sum((x.value - avg) ** 2 for x in xs) / len(xs)

xs = [Datum(rnd.randint(-10, +10)) for _ in range(10)]
print(f'{var(xs) = :.2f}')
from dataclasses import dataclass
from random import Random

rnd = Random(0)

@dataclass
class Data:
    def __init__(self, values):
        self.values = values
    __iter__ = lambda self: iter(self.values)
    __len__ = lambda self: len(self.values)

    def var(self):
        avg = sum(x for x in self) / len(self)
        return sum((x - avg) ** 2 for x in self) / len(self)

xs = Data([rnd.randint(-10, +10) for _ in range(10)])

print(f'{xs.var() = :.2f}')
from numpy import array
from random import Random

rnd = Random(0)

xs = array([rnd.randint(-10, +10) for _ in range(10)])

print(f'{xs.var() = :.2f}')
from numpy import array
from numpy.random import default_rng

rng = default_rng(0)

xs = rng.integers(-10, +10, size=10)

print(f'{xs.var() = :.2f}')

“How do Python and NumPy/pandas fit together?”

from itertools import chain, combinations

def powerset(xs):
    return chain.from_iterable(
        combinations(xs, r=r) for r in range(len(xs) + 1)
    )

events = [
    ('abc', +1),
    ('def', -2),
    ('xyz', +3),
]

scenarios = {frozenset(x) for x in powerset(events)}

print(
    *sorted(scenarios, key=len),
    sep='\n'
)
from itertools import chain, combinations
from pandas import MultiIndex, Series, date_range, Grouper
from numpy.random import default_rng

rng = default_rng(0)

def powerset(xs):
    return chain.from_iterable(
        combinations(xs, r=r) for r in range(len(xs) + 1)
    )

events = [
    ('abc + 1', 'abc', lambda s: s + 1),
    ('def × 2', 'def', lambda s: s * 2),
    ('xyz ^ 3', 'xyz', lambda s: s ** 3),
]

scenarios = {frozenset(x) for x in powerset(events)}

s = Series(
    index=(idx := MultiIndex.from_product([
        'abc def xyz'.split(),
        date_range('2020-01-01', periods=90),
    ], names=['entity', 'date'])),
    data=rng.integers(-10, +10, size=len(idx)),
)

for sc in sorted(scenarios, key=len):
    s_ = s.copy()
    for _, ent, dlt in sc:
        mask = s.index.get_level_values('entity') == ent
        s_.loc[mask] = dlt(s_.loc[mask])
    print(
        ' {} '.format(', '.join(sorted(nm for nm, *_ in sc))).center(40, '\N{box drawings light horizontal}'),
        s_.groupby(Grouper(level='date', freq='M')).mean().round(2),
        sep='\n',
    )

tracemalloc

print("Let's take a look!")
from tracemalloc import start, take_snapshot

start()

before = take_snapshot()

# xs = [None for _ in range(1024 ** 2)]
xs = [None for _ in range(10 * 1024 ** 2)]

after = take_snapshot()

print(
    *(
        diff
        for diff in after.compare_to(before, 'lineno')
        if diff.traceback[0].filename == __file__
    ),
    sep='\n'
)
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager
from random import random

@contextmanager
def profile_memory(*files):
    try:
        before = take_snapshot()
        yield
    finally:
        after = take_snapshot()
        diffs = (
            diff for diff in after.compare_to(before, 'lineno')
            if diff.traceback[0].filename in files
        )
        print(*diffs, sep='\n')

with profile_memory(__file__):
    xs = [random() for _ in range(100_000)]

“How do we make this useful?”

from numpy import int64, sqrt, array
from numpy.random import default_rng

rng = default_rng(0)

coeffs = rng.integers(
    -1_000, +1000,
    size=(100_000, 3),
).view(
    [('a', int64), ('b', int64), ('c', int64)],
).ravel()

# print(coeffs)

mask = (coeffs['b'] ** 2 > 4 * coeffs['a'] * coeffs['c']) & (2 * coeffs['a'] != 0)
real_coeffs = coeffs[mask]

a, b, c = real_coeffs['a'], real_coeffs['b'], real_coeffs['c']
roots = array([
    (-b + sqrt(b**2 - 4*a*c)) / (2*a),
    #                 ###        ###
    (-b - sqrt(b**2 - 4*a*c)) / (2*a),
]).T

print(roots)
from numpy import int64, sqrt, array
from numpy.random import default_rng
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager

@contextmanager
def profile_memory(*files):
    try:
        before = take_snapshot()
        yield
    finally:
        after = take_snapshot()
        diffs = (
            diff for diff in after.compare_to(before, 'lineno')
            if diff.traceback[0].filename in files
        )
        print(*sorted(diffs, key=lambda d: d.size, reverse=True), sep='\n')

rng = default_rng(0)

coeffs = rng.integers(
    -1_000, +1000,
    size=(1_000_000, 3),
).view(
    [('a', int64), ('b', int64), ('c', int64)],
).ravel()

# print(coeffs)

mask = (coeffs['b'] ** 2 > 4 * coeffs['a'] * coeffs['c']) & (2 * coeffs['a'] != 0)
real_coeffs = coeffs[mask]

a, b, c = real_coeffs['a'], real_coeffs['b'], real_coeffs['c']

with profile_memory(__file__):
    roots = array([
        (-b + sqrt(b**2 - 4*a*c)) / (2*a),
        (-b - sqrt(b**2 - 4*a*c)) / (2*a),
    ]).T

print(f'{roots.nbytes = :,}')
from numpy import int64, sqrt, array
from numpy.random import default_rng
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager
from numexpr import evaluate

@contextmanager
def profile_memory(*files):
    try:
        before = take_snapshot()
        yield
    finally:
        after = take_snapshot()
        diffs = (
            diff for diff in after.compare_to(before, 'lineno')
            if diff.traceback[0].filename in files
        )
        print(*sorted(diffs, key=lambda d: d.size, reverse=True), sep='\n')

rng = default_rng(0)

coeffs = rng.integers(
    -1_000, +1000,
    size=(1_000_000, 3),
).view(
    [('a', int64), ('b', int64), ('c', int64)],
).ravel()

# print(coeffs)

mask = (coeffs['b'] ** 2 > 4 * coeffs['a'] * coeffs['c']) & (2 * coeffs['a'] != 0)
real_coeffs = coeffs[mask]

a, b, c = real_coeffs['a'], real_coeffs['b'], real_coeffs['c']

with profile_memory(__file__):
    roots = array([
        (-b + sqrt(b**2 - 4*a*c)) / (2*a),
        (-b - sqrt(b**2 - 4*a*c)) / (2*a),
    ]).T
print(f'\N{box drawings light horizontal}' * 40)
with profile_memory(__file__):
    roots = array([
        evaluate('(-b + sqrt(b**2 - 4*a*c)) / (2*a)'),
        evaluate('(-b - sqrt(b**2 - 4*a*c)) / (2*a)'),
    ])

print(f'{roots.nbytes = :,}')
from datetime import timedelta
from random import Random
from enum import Enum
from string import ascii_lowercase, digits

from pandas import DataFrame, to_datetime, concat, MultiIndex
from numpy.random import default_rng

rng = default_rng(0)
rnd = Random(0)

State = Enum('State', 'A B C D E')

def simulate():
    td = lambda days: timedelta(seconds=rnd.randrange(24 * 60 * 60 * days))

    yield State.A, td(days=90)

    if rnd.choices([True, False], weights=[.90, .10])[0]:
        yield State.B, td(days=30)

        if rnd.choices([True, False], weights=[.95, .05])[0]:
            for _ in range(max(0, int(rnd.gauss(mu=2, sigma=1)))):
                yield State.C, td(days=14)
                yield State.D, td(days=14)
            if rnd.choices([True, False], weights=[.95, .05])[0]:
                yield State.C, td(days=14)

        if rnd.choices([True, False], weights=[.50, .50])[0]:
            yield State.E, td(days=30)

if __name__ == '__main__':
    entities = rng.choice([*ascii_lowercase, *digits], size=(10, 4))
    entities = entities.view('<U4').ravel()

    lifecycles = (
        DataFrame(simulate(), columns='state time'.split())
            .assign(entity=ent)
            .set_index(['entity', 'state'])
            .cumsum()
            .pipe(lambda df: df + to_datetime('2020-01-01'))
            .squeeze(axis='columns')
        for ent in entities
    )
    lifecycles = concat(lifecycles, axis='index')

    print(lifecycles)

    (
        lifecycles
            .pipe(lambda s: s
                .set_axis(
                    MultiIndex.from_arrays([
                        s.index.get_level_values('entity'),
                        s.index.get_level_values('state').map(lambda x: x.name),
                    ])
                )
            )
            .to_pickle('lifecycles.pkl')
    )
from pandas import read_pickle, concat, DataFrame, date_range, MultiIndex, Series, IndexSlice

s = read_pickle('lifecycles.pkl')

idx = s.dt.to_period('D').groupby(['entity']).agg(['min', 'max']).pipe(
     lambda df: concat(DataFrame({
         'time': date_range(row['min'].iloc[0].start_time, row['max'].iloc[0].end_time),
     }).assign(entity=idx).squeeze(axis='columns') for idx, row in df.groupby('entity')),
)

idx = MultiIndex.from_arrays([idx['entity'], idx['time']])
print(f'{len(idx) = }')

states = Series(
    data=None,
    index=idx,
    dtype=str,
)
data = s.dt.floor('D').reset_index('state', drop=False).set_index('time', append=True).squeeze(axis='columns')
states.loc[data.index] = data.values
states = states.groupby('entity').ffill()

print(
    states.groupby(states.index.get_level_values('time').to_period('M')).value_counts().loc[IndexSlice[:, 'B', :]],
    sep='\n',
)
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager
from pandas import read_pickle, concat, DataFrame, date_range, MultiIndex, Series, IndexSlice


@contextmanager
def profile_memory(*files):
    try:
        before = take_snapshot()
        yield
    finally:
        after = take_snapshot()
        diffs = (
            diff for diff in after.compare_to(before, 'lineno')
            if diff.traceback[0].filename in files
        )
        print(*sorted(diffs, key=lambda d: d.size, reverse=True), sep='\n')

s = read_pickle('lifecycles.pkl')

idx = s.dt.to_period('D').groupby(['entity']).agg(['min', 'max']).pipe(
     lambda df: concat(DataFrame({
         'time': date_range(row['min'].iloc[0].start_time, row['max'].iloc[0].end_time),
     }).assign(entity=idx).squeeze(axis='columns') for idx, row in df.groupby('entity')),
)
with profile_memory(__file__):
    idx = MultiIndex.from_arrays([idx['entity'], idx['time']])

states = Series(
    data=None,
    index=idx,
    dtype=str,
)
data = s.dt.floor('D').reset_index('state', drop=False).set_index('time', append=True).squeeze(axis='columns')
states.loc[data.index] = data.values
states = states.groupby('entity').ffill()

print(
    states.groupby(states.index.get_level_values('time').to_period('M')).value_counts().loc[IndexSlice[:, 'B', :]],
    sep='\n',
)

memray

print("Let's take a look!")
from pandas import period_range, MultiIndex, CategoricalIndex, DataFrame, date_range
from numpy import tile, where
from numpy.random import default_rng
from string import ascii_lowercase

rng = default_rng(0)

entities = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
periods = period_range('2020-01-01', periods=90, freq='D')
idx = MultiIndex.from_product([
    periods,
    CategoricalIndex(entities),
], names=['date', 'entity'])

prices = DataFrame(
    data={
        'buy': (buy := (
            rng.normal(loc=100, scale=20, size=len(entities)).clip(0)
             *
            rng.normal(loc=1, scale=0.05, size=(len(periods), len(entities))).clip(.80, 1.2).cumprod(axis=0)
        ).ravel()),
        'sell': buy * rng.normal(loc=1.2, scale=.05, size=len(idx)).clip(1, 1.5),
    },
    index=idx,
).round(2)

dates = date_range('2020-01-01', periods=90)
idx = MultiIndex.from_product([
    tile(dates, 10),
    CategoricalIndex(entities),
], names=['date', 'entity'])

trades = DataFrame({
        'volume': (volume := rng.integers(-100_000, +100_000, size=len(idx)).round(-2)),
        'price': (prices.stack().loc[
            MultiIndex.from_arrays([
                idx.get_level_values('date').to_period('D'),
                idx.get_level_values('entity'),
                where(volume > 0, 'buy', 'sell'),
            ])
        ].values * rng.normal(loc=1, scale=.1, size=len(idx))).round(2),
    },
    index=idx,
).sample(frac=.50, random_state=rng).sort_index()

print(
    (trades['volume'] * trades['price']).groupby('entity').sum()
    +
    (
        trades['volume'].groupby('entity').sum().pipe(
            lambda s: s.set_axis(
                MultiIndex.from_arrays([s.index, where(s > 0, 'sell', 'buy')])
            )
        ) * prices.loc[prices.index.get_level_values('date')[-1]].stack()
    ).dropna().droplevel(level=-1),
)
from pandas import period_range, MultiIndex, CategoricalIndex, DataFrame, date_range, to_timedelta, read_pickle
from numpy import tile, where
from numpy.random import default_rng
from string import ascii_lowercase
from tempfile import TemporaryDirectory
from collections import defaultdict
from pathlib import Path
from logging import getLogger, basicConfig, INFO
from asyncio import run, gather, sleep as aio_sleep

logger = getLogger(__name__)
basicConfig(level=INFO)

rng = default_rng(0)

def generate_prices(entities, periods):
    idx = MultiIndex.from_product([
        periods,
        CategoricalIndex(entities),
    ], names=['date', 'entity'])

    return DataFrame(
        data={
            'buy': (buy := (
                rng.normal(loc=100, scale=20, size=len(entities)).clip(0)
                 *
                rng.normal(loc=1, scale=0.05, size=(len(periods), len(entities))).clip(.80, 1.2).cumprod(axis=0)
            ).ravel()),
            'sell': buy * rng.normal(loc=1.2, scale=.05, size=len(idx)).clip(1, 1.5),
        },
        index=idx,
    ).round(2)

def generate_trades(prices):
    dates = date_range(
        prices.index.get_level_values('date').min().to_timestamp(),
        prices.index.get_level_values('date').max().to_timestamp(),
    )
    entities = prices.index.get_level_values('entity').unique()
    idx = MultiIndex.from_product([
        tile(dates, 100),
        CategoricalIndex(entities),
    ], names=['date', 'entity'])
    idx = MultiIndex.from_arrays([
        idx.get_level_values('date') + to_timedelta(rng.integers(24 * 60 * 60, size=len(idx)), unit='s'),
        idx.get_level_values('entity'),
    ], names=idx.names)

    return DataFrame({
            'volume': (volume := rng.integers(-100_000, +100_000, size=len(idx)).round(-2)),
            'price': (prices.stack().loc[
                MultiIndex.from_arrays([
                    idx.get_level_values('date').to_period('D'),
                    idx.get_level_values('entity'),
                    where(volume > 0, 'buy', 'sell'),
                ])
            ].values * rng.normal(loc=1, scale=.1, size=len(idx))).round(2),
        },
        index=idx,
    ).sample(frac=.50, random_state=rng).sort_index()

async def producer(data_dir):
    entities = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
    periods = period_range('2020-01-01', periods=52, freq='W')
    for w in periods:
        prices = generate_prices(entities, period_range(w.start_time, w.end_time, freq='D'))
        trades = generate_trades(prices)

        prices_path = data_dir / f"prices.{prices.index.get_level_values('date').max().to_timestamp():%Y%m%d}.pkl"
        prices.to_pickle(prices_path)
        logger.info('Wrote prices = %s', prices_path)
        await aio_sleep(1)

        trades_path = data_dir / f"trades.{trades.index.get_level_values('date').max():%Y%m%d}.pkl"
        trades.to_pickle(trades_path)
        logger.info('Wrote trades = %s', trades_path)
        await aio_sleep(1)

async def consumer(data_dir, queue):
    seen_files = {*()}

    while True:
        all_files = {*data_dir.iterdir()}
        new_files = all_files - seen_files

        to_process = defaultdict(dict)
        for p in new_files:
            ftype, date = p.stem.split('.')
            to_process[date][ftype] = p

        for ps in to_process.values():
            if len(ps) == 2 and ps.keys() == {'prices', 'trades'}:
                logger.info('Process = %s', ps)
                queue.append(ps)
                seen_files.update(ps)

        await aio_sleep(3)

async def worker(queue):
    while True:
        if queue:
            ps = queue.pop()
            prices, trades = read_pickle(ps['prices']), read_pickle(ps['trades'])
            print(
                (trades['volume'] * trades['price']).groupby('entity').sum()
                +
                (
                    trades['volume'].groupby('entity').sum().pipe(
                        lambda s: s.set_axis(
                            MultiIndex.from_arrays([s.index, where(s > 0, 'sell', 'buy')])
                        )
                    ) * prices.loc[prices.index.get_level_values('date')[-1]].stack()
                ).dropna().droplevel(level=-1),
            )
        await aio_sleep(0)

async def main():
    queue = []
    with TemporaryDirectory() as d:
        data_dir = Path(d)
        logger.info('data_dir = %s', data_dir)
        await gather(
            producer(data_dir),
            consumer(data_dir, queue),
            worker(queue),
            worker(queue),
            worker(queue),
        )

if __name__ == '__main__':
    run(main())

ulimit

print("Let's take a look!")
from collections import namedtuple
from functools import wraps
from inspect import signature
from itertools import islice, tee
from math import isclose
from shlex import quote
from subprocess import check_call, DEVNULL, CalledProcessError
from textwrap import dedent

from numpy import arange

nwise = lambda g, *, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))

def binary_search(values):
    lidx, ridx = 0, len(values) - 1
    while True:
        if (ridx - lidx) <= 1:
            break
        if (yield values[(midx := (ridx - lidx) // 2 + lidx)]):
            lidx = midx
        else:
            ridx = midx

class BisectedRun(namedtuple('RunBase', 'signature result exception')):
    sig = property(lambda s: s.signature)
    res = property(lambda s: s.result)
    exc = property(lambda s: s.exception)

    @lambda cls: cls()
    class args:
        def __get__(self, instance, owner):
            return namedtuple('Arguments', instance.sig.arguments.keys())(**instance.sig.arguments)

    @classmethod
    def support(cls, arguments, exceptions):
        def dec(f):
            sig = signature(f)
            @lambda bisected: setattr(f, 'bisected', bisected)
            @wraps(arguments)
            def bisected(*arg_args, **arg_kwargs):
                ci = binary_search(arguments(*arg_args, **arg_kwargs))
                @wraps(f)
                def inner(*f_fixed_args, **f_fixed_kwargs):
                    (f_args, f_kwargs), exc = next(ci), None
                    while True:
                        f_bound_args = sig.bind(
                            *f_fixed_args, *f_args,
                            **f_fixed_kwargs, **f_kwargs
                        )
                        try:
                            res = f(*f_bound_args.args, **f_bound_args.kwargs)
                        except (*exceptions,) as e:
                            exc = e
                        else:
                            exc = None
                        yield cls(f_bound_args, exc, res)
                        try:
                            f_args, f_kwargs = ci.send(exc is not None)
                        except StopIteration:
                            break
                return inner
            return f
        return dec

@BisectedRun.support(
    arguments=lambda min_vmem, max_vmem:
        [((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 10 * 1024, dtype=int)],
    exceptions={CalledProcessError},
)
def run_python_ulimit(code, vmem):
    return check_call([
        'sh', '-c', f'''(
            ulimit -v {vmem}
            python -c {quote(code)}
        )'''
    ], stderr=DEVNULL, stdout=DEVNULL)

if __name__ == '__main__':
    code = dedent('''
        import numpy
    ''').strip()

    for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
        pass
    bad_mem, ok_mem = sorted([prev_run.args.vmem, curr_run.args.vmem])

    print(
        f'{ok_mem  = :_}',
        f'{bad_mem = :_}',
        sep='\n'
    )

    # run_python_ulimit(code, ok_mem)
    # run_python_ulimit(code, bad_mem)
from collections import namedtuple
from functools import wraps
from inspect import signature
from itertools import islice, tee
from math import isclose
from shlex import quote
from subprocess import check_call, DEVNULL, CalledProcessError
from textwrap import dedent

from numpy import arange

nwise = lambda g, *, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))

def binary_search(values):
    lidx, ridx = 0, len(values) - 1
    while True:
        if (ridx - lidx) <= 1:
            break
        if (yield values[(midx := (ridx - lidx) // 2 + lidx)]):
            lidx = midx
        else:
            ridx = midx

class BisectedRun(namedtuple('RunBase', 'signature result exception')):
    sig = property(lambda s: s.signature)
    res = property(lambda s: s.result)
    exc = property(lambda s: s.exception)

    @lambda cls: cls()
    class args:
        def __get__(self, instance, owner):
            return namedtuple('Arguments', instance.sig.arguments.keys())(**instance.sig.arguments)

    @classmethod
    def support(cls, arguments, exceptions):
        def dec(f):
            sig = signature(f)
            @lambda bisected: setattr(f, 'bisected', bisected)
            @wraps(arguments)
            def bisected(*arg_args, **arg_kwargs):
                ci = binary_search(arguments(*arg_args, **arg_kwargs))
                @wraps(f)
                def inner(*f_fixed_args, **f_fixed_kwargs):
                    (f_args, f_kwargs), exc = next(ci), None
                    while True:
                        f_bound_args = sig.bind(
                            *f_fixed_args, *f_args,
                            **f_fixed_kwargs, **f_kwargs
                        )
                        try:
                            res = f(*f_bound_args.args, **f_bound_args.kwargs)
                        except (*exceptions,) as e:
                            exc = e
                        else:
                            exc = None
                        yield cls(f_bound_args, exc, res)
                        try:
                            f_args, f_kwargs = ci.send(exc is not None)
                        except StopIteration:
                            break
                return inner
            return f
        return dec

@BisectedRun.support(
    arguments=lambda min_vmem, max_vmem:
        [((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 1024, dtype=int)],
    exceptions={CalledProcessError},
)
def run_python_ulimit(code, vmem):
    return check_call([
        'sh', '-c', f'''(
            ulimit -v {vmem}
            python -c {quote(code)}
        )'''
    ], stderr=DEVNULL, stdout=DEVNULL)

if __name__ == '__main__':
    attrdict_protocols = dedent('''
        # attrdict via dispatch to `dict.__*item__` protocols
        class attrdict(dict):
            __getattr__ = dict.__getitem__
            __setattr__ = dict.__setitem__
            __delattr__ = dict.__delitem__
    ''')
    attrdict_circular = dedent('''
        # attrdict via circular reference on `self.__dict__`
        class attrdict(dict):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.__dict__ = self
    ''')
    simple_test = dedent('''
        for _ in range(2):
            d = attrdict({idx: None for idx in range(100_000)})
    ''')
    sustained_test = dedent('''
        for _ in range(100):
            d = attrdict({idx: None for idx in range(100_000)})
    ''')

    code = '\n'.join([attrdict_protocols, simple_test])
    for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
        pass
    attrdict_protocols_ok_mem = max([prev_run.args.vmem, curr_run.args.vmem])

    code = '\n'.join([attrdict_circular, simple_test])
    for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
        pass
    attrdict_circular_ok_mem = max([prev_run.args.vmem, curr_run.args.vmem])

    assert isclose(attrdict_protocols_ok_mem, attrdict_circular_ok_mem, abs_tol=1024), 'memory usage in simple test should be close (≤1MiB)'

    code = '\n'.join([attrdict_protocols, sustained_test])
    # run_python_ulimit(code, attrdict_protocols_ok_mem)

    code = '\n'.join([attrdict_circular, sustained_test])
    # run_python_ulimit(code, attrdict_circular_ok_mem)
from collections import namedtuple
from functools import wraps
from inspect import signature
from itertools import islice, tee
from math import isclose
from shlex import quote
from subprocess import check_call, DEVNULL, CalledProcessError
from textwrap import dedent

from numpy import arange

nwise = lambda g, *, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))

def binary_search(values):
    lidx, ridx = 0, len(values) - 1
    while True:
        if (ridx - lidx) <= 1:
            break
        if (yield values[(midx := (ridx - lidx) // 2 + lidx)]):
            lidx = midx
        else:
            ridx = midx

class BisectedRun(namedtuple('RunBase', 'signature result exception')):
    sig = property(lambda s: s.signature)
    res = property(lambda s: s.result)
    exc = property(lambda s: s.exception)

    @lambda cls: cls()
    class args:
        def __get__(self, instance, owner):
            return namedtuple('Arguments', instance.sig.arguments.keys())(**instance.sig.arguments)

    @classmethod
    def support(cls, arguments, exceptions):
        def dec(f):
            sig = signature(f)
            @lambda bisected: setattr(f, 'bisected', bisected)
            @wraps(arguments)
            def bisected(*arg_args, **arg_kwargs):
                ci = binary_search(arguments(*arg_args, **arg_kwargs))
                @wraps(f)
                def inner(*f_fixed_args, **f_fixed_kwargs):
                    (f_args, f_kwargs), exc = next(ci), None
                    while True:
                        f_bound_args = sig.bind(
                            *f_fixed_args, *f_args,
                            **f_fixed_kwargs, **f_kwargs
                        )
                        try:
                            res = f(*f_bound_args.args, **f_bound_args.kwargs)
                        except (*exceptions,) as e:
                            exc = e
                        else:
                            exc = None
                        yield cls(f_bound_args, exc, res)
                        try:
                            f_args, f_kwargs = ci.send(exc is not None)
                        except StopIteration:
                            break
                return inner
            return f
        return dec

@BisectedRun.support(
    arguments=lambda min_vmem, max_vmem:
        [((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 1024, dtype=int)],
    exceptions={CalledProcessError},
)
def run_python_ulimit(code, vmem):
    return check_call([
        'sh', '-c', f'''(
            ulimit -v {vmem}
            python -c {quote(code)}
        )'''
    ], stderr=DEVNULL, stdout=DEVNULL)

if __name__ == '__main__':
    core_code = dedent('''
        from numpy import array
        def create_normal_array(size=100_000):
            xs = array([None for _ in range(size)], dtype=object)
            ys = array([None for _ in range(size)], dtype=object)
            return xs, ys
        def create_circular_array(size=100_000):
            xs = array([None for _ in range(size)], dtype=object)
            ys = array([None for _ in range(size)], dtype=object)
            xs[0], ys[0] = ys, xs # circular reference
            return xs, ys
    ''')
    baseline_test = dedent('''
        for _ in range(2):
            xs, ys = create_normal_array()
    ''')
    sustained_test = dedent('''
        for _ in range(100):
            xs, ys = create_circular_array()
    ''')
    verification_test = dedent('''
        for _ in range(100):
            xs, ys = create_normal_array()
    ''')

    code = '\n'.join([core_code, baseline_test])
    for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
        pass
    ok_mem = max([prev_run.args.vmem, curr_run.args.vmem])

    code = '\n'.join([core_code, verification_test])
    # run_python_ulimit(code, ok_mem)

    code = '\n'.join([core_code, sustained_test])
    # run_python_ulimit(code, ok_mem)