“But I predict that within 100 years, computers will be twice as powerful, 10000× larger, and so expensive that only the five richest kings of Europe will own them.”
— Professor Frink, ‘Much Apu About Nothing’ (The Simpsons, S07E23)
How do you profile memory usage in your Python code? How do you find the spots in your code where you are allocating too much memory? And, critically, what do you *do about it?*
Do you…
Then come join us for a session on memory profiling in Python (and pandas)!
Python does not provide fine-grained control over memory allocation,
deallocation, or layout. Tools like pandas often encourage making copies, going
so far as to remove keyword arguments like inplace=… from their APIs. Though
for many of our analyses we can simply buy a bigger
computer, there are still circumstances under which we need to control,
predict, and reduce our memory usage.
In this episode, we will cover memory profilers in Python (memray) and how to
effectively use them to locate, investigate, and address memory-intensive code.
We’ll develop intuition around memory use, examine how Python allocates and
deallocates memory, and discuss approaches we can take in pandas code to
address memory issues and improve the memory efficiency of our analyses.
Keywords: profiling, memory, pandas, Python
sys module, gc module, and “restricted computation domains”
print("Let's take a look!")
“What can the sys module tell us?”
from sys import getsizeof
class T:
def __init__(self, data):
self.data = data
obj = T(0)
obj = T([1, 2, 3])
print(
# f'{getsizeof(0) = }',
# f'{getsizeof([1, 2, 3]) = }',
f'{getsizeof(obj) = }',
sep='\n',
)
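Note that getsizeof reports only the shallow size of obj: the object header plus its attribute storage, not the list bound to obj.data. The C layouts below, lightly abridged from CPython’s source, show what that header contains; note in particular that an int (a “long object”) ends in a variable-length array of digits.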
struct _object {
Py_ssize_t ob_refcnt;
PyTypeObject *ob_type;
};
typedef struct {
PyObject ob_base;
Py_ssize_t ob_size; /* Number of items in variable part */
} PyVarObject;
struct _longobject {
PyObject_VAR_HEAD
digit ob_digit[1];
};
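A minimal sketch to confirm the variable-width layout above: on a typical 64-bit CPython build, each additional 30-bit “digit” adds a few bytes to an int’s reported size (exact figures vary by version and platform).
from sys import getsizeof
# each step up in magnitude forces another 30-bit digit into ob_digit
for value in (0, 1, 2**30, 2**60, 2**90):
    print(f'{value:>30_} → {getsizeof(value)} bytes')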
from collections.abc import Iterable
from sys import getsizeof
def traverse(obj, *, seen={*()}):
    # yield obj and everything shallowly reachable from it; `seen` carries
    # ancestor ids to guard against reference cycles
    yield obj
if isinstance(obj, Iterable) and not isinstance(obj, str):
for x in obj:
if id(x) in seen:
continue
yield from traverse(x, seen={*seen, id(obj)})
xs = [1, 2, 3]
ys = [4, 5, 6, xs]
zs = [7, 8, 9, ys]
print(
f'{getsizeof(xs) = }',
f'{getsizeof(ys) = }',
f'{getsizeof(zs) = }',
# f'{getsizeof(xs[0]) = }',
f'{sum(getsizeof(x) for x in traverse(xs)) = }',
f'{sum(getsizeof(y) for y in traverse(ys)) = }',
f'{sum(getsizeof(z) for z in traverse(zs)) = }',
sep='\n',
)
“What can the gc module tell us?”
from gc import get_objects
from sys import getsizeof
import pandas
print(
f'{get_objects() = }',
f'{sum(getsizeof(x) for x in get_objects()) = :,}',
sep='\n',
)
from gc import get_referents, get_referrers
from dataclasses import dataclass
@dataclass
class T:
    x: object
@lambda f: f()
def _():
obj0 = T(...)
obj1 = T(obj0)
print(
# f'{get_referents(obj0) = }',
f'{get_referrers(obj0) = }',
sep='\n',
)
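get_referents also gives us a less ad hoc “deep” sizeof than the traverse above, since it follows the same references the garbage collector sees (including instance __dict__s). A sketch, with the caveat that excluding types, modules, and functions is a judgment call:
from gc import get_referents
from sys import getsizeof
from types import FunctionType, ModuleType
def deep_getsizeof(*objs):
    # walk the gc-visible reference graph, deduplicating shared objects by id
    seen, total, stack = set(), 0, [*objs]
    while stack:
        obj = stack.pop()
        if id(obj) in seen or isinstance(obj, (type, ModuleType, FunctionType)):
            continue
        seen.add(id(obj))
        total += getsizeof(obj)
        stack.extend(get_referents(obj))
    return total
xs = [1, 2, 3]
ys = [4, 5, 6, xs]
print(f'{deep_getsizeof(ys) = :,}')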
“What can numpy.ndarray and pandas.Series, pandas.DataFrame tell us?”
from numpy import array, int32, int64
from numpy import shares_memory
xs = array([1, 2, 3, 4], dtype=int32)
# xs = array([1, 2, 3, 4], dtype=int64)
print(
f'{xs = }',
f'{xs.dtype = }',
f'{xs.nbytes = }',
sep='\n',
)
from pandas import Series
s = Series([1, 2, 3, 4], index=[1, 2, 3, 4])
# s.attrs = {'a': ..., 'b': ...}
# s = Series('abcddddddddd abc ab a'.split(), index=[1, 2, 3, 4])
print(
s,
# s._data,
# f'{s.memory_usage() = }',
# f'{s.memory_usage(deep=True) = }',
sep='\n',
)
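Where memory_usage(deep=True) reveals heavy object-dtype storage, one pandas-level remedy is the category dtype, which stores each distinct value once plus compact integer codes. A sketch, assuming reasonably repetitive data:
from pandas import Series
s = Series('abcddddddddd abc ab a'.split() * 10_000)
print(
    f'{s.memory_usage(deep=True) = :,}',
    f"{s.astype('category').memory_usage(deep=True) = :,}",
    sep='\n',
)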
“Do we have fine-grained control over CPU use?”
class T:
def __add__(self, other):
return self
__radd__ = __add__
xs = [1, 2, 3, 4]
# xs = [1, 2, 3, 4, T()]
print(f'{sum(xs) = }')
def f():
pass
from collections.abc import Callable
from itertools import chain
from json import dumps
print(
f"{isinstance(chain, Callable) and hasattr(chain, '__get__') = }",
f"{isinstance(dumps, Callable) and hasattr(dumps, '__get__') = }",
sep='\n'
)
def g(xs):
for x in xs:
yield x ** 2
gi = g([1, 2, 3, 4, 5])
print(
f'{next(gi) = }',
sep='\n',
)
“Do we have fine-grained control over memory use?”
from numpy import array, int32, int64
# xs = [1, 2, 3, 4]
xs = array([1, 2, 3, 4], dtype=int32)
from collections import deque
def f(xs):
rv = []
for x in xs:
rv.append(x ** 2)
return rv
f([1, 2, 3, 4, 5])
def g(xs):
for x in xs:
yield x ** 2
# for x in g([1, 2, 3, 4, 5]): pass
deque(g([1, 2, 3, 4, 5]), maxlen=1)
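We can make the eager/lazy difference measurable with tracemalloc (reset_peak requires Python ≥ 3.9): the list-building version should show a peak proportional to the input, while draining the generator through a bounded deque should not. A self-contained sketch:
from collections import deque
from tracemalloc import start, get_traced_memory, reset_peak
def f(xs):                 # eager: builds the whole list
    return [x ** 2 for x in xs]
def g(xs):                 # lazy: one element at a time
    for x in xs:
        yield x ** 2
start()
f(range(100_000))
_, peak_eager = get_traced_memory()
reset_peak()
deque(g(range(100_000)), maxlen=1)
_, peak_lazy = get_traced_memory()
print(f'{peak_eager = :,}', f'{peak_lazy = :,}', sep='\n')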
“Iteration helpers.”
from random import Random
from itertools import islice, tee, repeat, chain, zip_longest
nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
nwise_longest = lambda g, n=2, fv=object(): zip_longest(*(islice(g, i, None) for i, g in enumerate(tee(g, n))), fillvalue=fv)
first = lambda g, n=1: zip(chain(repeat(True, n), repeat(False)), g)
last = lambda g, m=1, s=object(): ((y[-1] is s, x) for x, *y in nwise_longest(g, m+1, s))
rnd = Random(0)
events = {
'a': rnd.randint(-10, +10),
'b': rnd.randint(-10, +10),
'c': rnd.randint(-10, +10),
'd': rnd.randint(-10, +10),
}
if __name__ == '__main__':
for is_first, (is_last, (prev, curr)) in first(last(nwise(events.items()))):
if is_first:
...
...
if is_last:
...
“Value disintermediation.”
def f(): pass
def g(): pass
def h(): pass
if __name__ == '__main__':
mode = 'f'
if mode == 'f':
f()
elif mode == 'g':
g()
elif mode == 'h':
h()
def f(): pass
def g(): pass
def h(): pass
if __name__ == '__main__':
modes = {'f': f, 'g': g, 'h': h}
modes['f']()
from dataclasses import dataclass
@dataclass
class proposer:
nodes : dict
@dataclass
class acceptor:
nodes : dict
@dataclass
class learner:
nodes : dict
if __name__ == '__main__':
nodes = {}
nodes.update({
'proposers': [proposer(nodes)],
'acceptors': [acceptor(nodes) for _ in range(3)],
'learners': [learner(nodes) for _ in range(2)],
})
from dataclasses import dataclass
@dataclass
class proposer:
nodes : dict
@dataclass
class acceptor:
nodes : dict
@dataclass
class learner:
nodes : dict
if __name__ == '__main__':
nodes = {}
nodes.update({
proposer: [proposer(nodes)],
acceptor: [acceptor(nodes) for _ in range(3)],
learner: [learner(nodes) for _ in range(2)],
})
“Restricted computation domains.”
from dataclasses import dataclass
from random import Random
rnd = Random(0)
@dataclass
class Datum:
value : int
def var(xs):
avg = sum(x.value for x in xs) / len(xs)
return sum((x.value - avg) ** 2 for x in xs) / len(xs)
xs = [Datum(rnd.randint(-10, +10)) for _ in range(10)]
print(f'{var(xs) = :.2f}')
from random import Random
rnd = Random(0)
class Data:
def __init__(self, values):
self.values = values
__iter__ = lambda self: iter(self.values)
__len__ = lambda self: len(self.values)
def var(self):
avg = sum(x for x in self) / len(self)
return sum((x - avg) ** 2 for x in self) / len(self)
xs = Data([rnd.randint(-10, +10) for _ in range(10)])
print(f'{xs.var() = :.2f}')
from numpy import array
from random import Random
rnd = Random(0)
xs = array([rnd.randint(-10, +10) for _ in range(10)])
print(f'{xs.var() = :.2f}')
from numpy import array
from numpy.random import default_rng
rng = default_rng(0)
xs = rng.integers(-10, +10, size=10)
print(f'{xs.var() = :.2f}')
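The memory side of this “restricted computation domain”: the ndarray holds one contiguous buffer of machine integers, where the list-of-objects form pays per-element Python object overhead. A rough comparison (shallow sizes; exact numbers vary by platform and version):
from sys import getsizeof
from numpy.random import default_rng
rng = default_rng(0)
xs = rng.integers(-10, +10, size=10_000)
ys = [int(x) for x in xs]
print(
    f'{xs.nbytes = :,}',
    f'{getsizeof(ys) + sum(getsizeof(y) for y in ys) = :,}',
    sep='\n',
)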
“How do Python and NumPy/pandas fit together?”
from itertools import chain, combinations
def powerset(xs):
return chain.from_iterable(
combinations(xs, r=r) for r in range(len(xs) + 1)
)
events = [
('abc', +1),
('def', -2),
('xyz', +3),
]
scenarios = {frozenset(x) for x in powerset(events)}
print(
*sorted(scenarios, key=len),
sep='\n'
)
from itertools import chain, combinations
from pandas import MultiIndex, Series, date_range, Grouper
from numpy.random import default_rng
rng = default_rng(0)
def powerset(xs):
return chain.from_iterable(
combinations(xs, r=r) for r in range(len(xs) + 1)
)
events = [
('abc + 1', 'abc', lambda s: s + 1),
('def × 2', 'def', lambda s: s * 2),
('xyz ^ 3', 'xyz', lambda s: s ** 3),
]
scenarios = {frozenset(x) for x in powerset(events)}
s = Series(
index=(idx := MultiIndex.from_product([
'abc def xyz'.split(),
date_range('2020-01-01', periods=90),
], names=['entity', 'date'])),
data=rng.integers(-10, +10, size=len(idx)),
)
for sc in sorted(scenarios, key=len):
s_ = s.copy()
for _, ent, dlt in sc:
mask = s.index.get_level_values('entity') == ent
s_.loc[mask] = dlt(s_.loc[mask])
print(
' {} '.format(', '.join(sorted(nm for nm, *_ in sc))).center(40, '\N{box drawings light horizontal}'),
s_.groupby(Grouper(level='date', freq='M')).mean().round(2),
sep='\n',
)
tracemalloc
print("Let's take a look!")
from tracemalloc import start, take_snapshot
start()
before = take_snapshot()
# xs = [None for _ in range(1024 ** 2)]
xs = [None for _ in range(10 * 1024 ** 2)]
after = take_snapshot()
print(
*(
diff
for diff in after.compare_to(before, 'lineno')
if diff.traceback[0].filename == __file__
),
sep='\n'
)
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager
from random import random
@contextmanager
def profile_memory(*files):
try:
before = take_snapshot()
yield
finally:
after = take_snapshot()
diffs = (
diff for diff in after.compare_to(before, 'lineno')
if diff.traceback[0].filename in files
)
print(*diffs, sep='\n')
with profile_memory(__file__):
xs = [random() for _ in range(100_000)]
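A variant worth keeping alongside profile_memory: snapshot diffs show net growth by line, but tracemalloc can also report the transient peak inside the block (reset_peak requires Python ≥ 3.9). A minimal sketch:
from contextlib import contextmanager
from random import random
from tracemalloc import start, get_traced_memory, reset_peak
start()
@contextmanager
def profile_peak():
    # report the high-water mark of traced allocations within the block
    reset_peak()
    try:
        yield
    finally:
        _, peak = get_traced_memory()
        print(f'{peak = :,} bytes')
with profile_peak():
    xs = [random() for _ in range(100_000)]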
“How do we make this useful?”
from numpy import int64, sqrt, array
from numpy.random import default_rng
rng = default_rng(0)
coeffs = rng.integers(
    -1_000, +1_000,
size=(100_000, 3),
).view(
[('a', int64), ('b', int64), ('c', int64)],
).ravel()
# print(coeffs)
mask = (coeffs['b'] ** 2 > 4 * coeffs['a'] * coeffs['c']) & (2 * coeffs['a'] != 0)
real_coeffs = coeffs[mask]
a, b, c = real_coeffs['a'], real_coeffs['b'], real_coeffs['c']
roots = array([
(-b + sqrt(b**2 - 4*a*c)) / (2*a),
    # NB: each op above allocates a full-length temporary; the discriminant is computed twice
(-b - sqrt(b**2 - 4*a*c)) / (2*a),
]).T
print(roots)
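Every binary operation in the expression above (b**2, 4*a*c, the subtraction, the sqrt, the division) allocates a full-length temporary array, and the discriminant is computed twice; the instrumented version below makes those temporaries visible.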
from numpy import int64, sqrt, array
from numpy.random import default_rng
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager
@contextmanager
def profile_memory(*files):
try:
before = take_snapshot()
yield
finally:
after = take_snapshot()
diffs = (
diff for diff in after.compare_to(before, 'lineno')
if diff.traceback[0].filename in files
)
print(*sorted(diffs, key=lambda d: d.size, reverse=True), sep='\n')
rng = default_rng(0)
coeffs = rng.integers(
    -1_000, +1_000,
size=(1_000_000, 3),
).view(
[('a', int64), ('b', int64), ('c', int64)],
).ravel()
# print(coeffs)
mask = (coeffs['b'] ** 2 > 4 * coeffs['a'] * coeffs['c']) & (2 * coeffs['a'] != 0)
real_coeffs = coeffs[mask]
a, b, c = real_coeffs['a'], real_coeffs['b'], real_coeffs['c']
with profile_memory(__file__):
roots = array([
(-b + sqrt(b**2 - 4*a*c)) / (2*a),
(-b - sqrt(b**2 - 4*a*c)) / (2*a),
]).T
print(f'{roots.nbytes = :,}')
from numpy import int64, sqrt, array
from numpy.random import default_rng
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager
from numexpr import evaluate
@contextmanager
def profile_memory(*files):
try:
before = take_snapshot()
yield
finally:
after = take_snapshot()
diffs = (
diff for diff in after.compare_to(before, 'lineno')
if diff.traceback[0].filename in files
)
print(*sorted(diffs, key=lambda d: d.size, reverse=True), sep='\n')
rng = default_rng(0)
coeffs = rng.integers(
    -1_000, +1_000,
size=(1_000_000, 3),
).view(
[('a', int64), ('b', int64), ('c', int64)],
).ravel()
# print(coeffs)
mask = (coeffs['b'] ** 2 > 4 * coeffs['a'] * coeffs['c']) & (2 * coeffs['a'] != 0)
real_coeffs = coeffs[mask]
a, b, c = real_coeffs['a'], real_coeffs['b'], real_coeffs['c']
with profile_memory(__file__):
roots = array([
(-b + sqrt(b**2 - 4*a*c)) / (2*a),
(-b - sqrt(b**2 - 4*a*c)) / (2*a),
]).T
print(f'\N{box drawings light horizontal}' * 40)
with profile_memory(__file__):
roots = array([
evaluate('(-b + sqrt(b**2 - 4*a*c)) / (2*a)'),
evaluate('(-b - sqrt(b**2 - 4*a*c)) / (2*a)'),
]).T
print(f'{roots.nbytes = :,}')
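The improvement comes from how numexpr works: it compiles the expression and evaluates it over small, cache-sized blocks, so full-length temporaries for each intermediate step never materialize; only the final result arrays are allocated at full size.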
from datetime import timedelta
from random import Random
from enum import Enum
from string import ascii_lowercase, digits
from pandas import DataFrame, to_datetime, concat, MultiIndex
from numpy.random import default_rng
rng = default_rng(0)
rnd = Random(0)
State = Enum('State', 'A B C D E')
def simulate():
td = lambda days: timedelta(seconds=rnd.randrange(24 * 60 * 60 * days))
yield State.A, td(days=90)
if rnd.choices([True, False], weights=[.90, .10])[0]:
yield State.B, td(days=30)
if rnd.choices([True, False], weights=[.95, .05])[0]:
for _ in range(max(0, int(rnd.gauss(mu=2, sigma=1)))):
yield State.C, td(days=14)
yield State.D, td(days=14)
if rnd.choices([True, False], weights=[.95, .05])[0]:
yield State.C, td(days=14)
if rnd.choices([True, False], weights=[.50, .50])[0]:
yield State.E, td(days=30)
if __name__ == '__main__':
entities = rng.choice([*ascii_lowercase, *digits], size=(10, 4))
entities = entities.view('<U4').ravel()
lifecycles = (
DataFrame(simulate(), columns='state time'.split())
.assign(entity=ent)
.set_index(['entity', 'state'])
.cumsum()
.pipe(lambda df: df + to_datetime('2020-01-01'))
.squeeze(axis='columns')
for ent in entities
)
lifecycles = concat(lifecycles, axis='index')
print(lifecycles)
(
lifecycles
.pipe(lambda s: s
.set_axis(
MultiIndex.from_arrays([
s.index.get_level_values('entity'),
s.index.get_level_values('state').map(lambda x: x.name),
])
)
)
.to_pickle('lifecycles.pkl')
)
from pandas import read_pickle, concat, DataFrame, date_range, MultiIndex, Series, IndexSlice
s = read_pickle('lifecycles.pkl')
idx = s.dt.to_period('D').groupby(['entity']).agg(['min', 'max']).pipe(
lambda df: concat(DataFrame({
'time': date_range(row['min'].iloc[0].start_time, row['max'].iloc[0].end_time),
}).assign(entity=idx).squeeze(axis='columns') for idx, row in df.groupby('entity')),
)
idx = MultiIndex.from_arrays([idx['entity'], idx['time']])
print(f'{len(idx) = }')
states = Series(
data=None,
index=idx,
dtype=str,
)
data = s.dt.floor('D').reset_index('state', drop=False).set_index('time', append=True).squeeze(axis='columns')
states.loc[data.index] = data.values
states = states.groupby('entity').ffill()
print(
    states.groupby(states.index.get_level_values('time').to_period('M')).value_counts().loc[IndexSlice[:, 'B']],
sep='\n',
)
from tracemalloc import start, take_snapshot; start()
from contextlib import contextmanager
from pandas import read_pickle, concat, DataFrame, date_range, MultiIndex, Series, IndexSlice
@contextmanager
def profile_memory(*files):
try:
before = take_snapshot()
yield
finally:
after = take_snapshot()
diffs = (
diff for diff in after.compare_to(before, 'lineno')
if diff.traceback[0].filename in files
)
print(*sorted(diffs, key=lambda d: d.size, reverse=True), sep='\n')
s = read_pickle('lifecycles.pkl')
idx = s.dt.to_period('D').groupby(['entity']).agg(['min', 'max']).pipe(
lambda df: concat(DataFrame({
'time': date_range(row['min'].iloc[0].start_time, row['max'].iloc[0].end_time),
}).assign(entity=idx).squeeze(axis='columns') for idx, row in df.groupby('entity')),
)
with profile_memory(__file__):
idx = MultiIndex.from_arrays([idx['entity'], idx['time']])
states = Series(
data=None,
index=idx,
dtype=str,
)
data = s.dt.floor('D').reset_index('state', drop=False).set_index('time', append=True).squeeze(axis='columns')
states.loc[data.index] = data.values
states = states.groupby('entity').ffill()
print(
        states.groupby(states.index.get_level_values('time').to_period('M')).value_counts().loc[IndexSlice[:, 'B']],
sep='\n',
)
memray
print("Let's take a look!")
from pandas import period_range, MultiIndex, CategoricalIndex, DataFrame, date_range
from numpy import tile, where
from numpy.random import default_rng
from string import ascii_lowercase
rng = default_rng(0)
entities = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
periods = period_range('2020-01-01', periods=90, freq='D')
idx = MultiIndex.from_product([
periods,
CategoricalIndex(entities),
], names=['date', 'entity'])
prices = DataFrame(
data={
'buy': (buy := (
rng.normal(loc=100, scale=20, size=len(entities)).clip(0)
*
rng.normal(loc=1, scale=0.05, size=(len(periods), len(entities))).clip(.80, 1.2).cumprod(axis=0)
).ravel()),
'sell': buy * rng.normal(loc=1.2, scale=.05, size=len(idx)).clip(1, 1.5),
},
index=idx,
).round(2)
dates = date_range('2020-01-01', periods=90)
idx = MultiIndex.from_product([
tile(dates, 10),
CategoricalIndex(entities),
], names=['date', 'entity'])
trades = DataFrame({
'volume': (volume := rng.integers(-100_000, +100_000, size=len(idx)).round(-2)),
'price': (prices.stack().loc[
MultiIndex.from_arrays([
idx.get_level_values('date').to_period('D'),
idx.get_level_values('entity'),
where(volume > 0, 'buy', 'sell'),
])
].values * rng.normal(loc=1, scale=.1, size=len(idx))).round(2),
},
index=idx,
).sample(frac=.50, random_state=rng).sort_index()
print(
(trades['volume'] * trades['price']).groupby('entity').sum()
+
(
trades['volume'].groupby('entity').sum().pipe(
lambda s: s.set_axis(
MultiIndex.from_arrays([s.index, where(s > 0, 'sell', 'buy')])
)
) * prices.loc[prices.index.get_level_values('date')[-1]].stack()
).dropna().droplevel(level=-1),
)
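memray itself is usually driven from the shell (memray run script.py, then memray flamegraph on the capture file), but it also exposes a Tracker context manager for scoping a capture to a region of code. A minimal sketch, assuming memray is installed; the output filename is arbitrary:
from memray import Tracker
# allocations made inside the block are written to trades.bin;
# render afterwards with: memray flamegraph trades.bin
with Tracker('trades.bin'):
    ...  # e.g., construct prices and trades as above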
from pandas import period_range, MultiIndex, CategoricalIndex, DataFrame, date_range, to_timedelta, read_pickle
from numpy import tile, where
from numpy.random import default_rng
from string import ascii_lowercase
from tempfile import TemporaryDirectory
from collections import defaultdict
from pathlib import Path
from logging import getLogger, basicConfig, INFO
from asyncio import run, gather, sleep as aio_sleep
logger = getLogger(__name__)
basicConfig(level=INFO)
rng = default_rng(0)
def generate_prices(entities, periods):
idx = MultiIndex.from_product([
periods,
CategoricalIndex(entities),
], names=['date', 'entity'])
return DataFrame(
data={
'buy': (buy := (
rng.normal(loc=100, scale=20, size=len(entities)).clip(0)
*
rng.normal(loc=1, scale=0.05, size=(len(periods), len(entities))).clip(.80, 1.2).cumprod(axis=0)
).ravel()),
'sell': buy * rng.normal(loc=1.2, scale=.05, size=len(idx)).clip(1, 1.5),
},
index=idx,
).round(2)
def generate_trades(prices):
dates = date_range(
prices.index.get_level_values('date').min().to_timestamp(),
prices.index.get_level_values('date').max().to_timestamp(),
)
entities = prices.index.get_level_values('entity').unique()
idx = MultiIndex.from_product([
tile(dates, 100),
CategoricalIndex(entities),
], names=['date', 'entity'])
idx = MultiIndex.from_arrays([
idx.get_level_values('date') + to_timedelta(rng.integers(24 * 60 * 60, size=len(idx)), unit='s'),
idx.get_level_values('entity'),
], names=idx.names)
return DataFrame({
'volume': (volume := rng.integers(-100_000, +100_000, size=len(idx)).round(-2)),
'price': (prices.stack().loc[
MultiIndex.from_arrays([
idx.get_level_values('date').to_period('D'),
idx.get_level_values('entity'),
where(volume > 0, 'buy', 'sell'),
])
].values * rng.normal(loc=1, scale=.1, size=len(idx))).round(2),
},
index=idx,
).sample(frac=.50, random_state=rng).sort_index()
async def producer(data_dir):
entities = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
periods = period_range('2020-01-01', periods=52, freq='W')
for w in periods:
prices = generate_prices(entities, period_range(w.start_time, w.end_time, freq='D'))
trades = generate_trades(prices)
prices_path = data_dir / f"prices.{prices.index.get_level_values('date').max().to_timestamp():%Y%m%d}.pkl"
prices.to_pickle(prices_path)
logger.info('Wrote prices = %s', prices_path)
await aio_sleep(1)
trades_path = data_dir / f"trades.{trades.index.get_level_values('date').max():%Y%m%d}.pkl"
trades.to_pickle(trades_path)
logger.info('Wrote trades = %s', trades_path)
await aio_sleep(1)
async def consumer(data_dir, queue):
seen_files = {*()}
while True:
all_files = {*data_dir.iterdir()}
new_files = all_files - seen_files
to_process = defaultdict(dict)
for p in new_files:
ftype, date = p.stem.split('.')
to_process[date][ftype] = p
for ps in to_process.values():
if len(ps) == 2 and ps.keys() == {'prices', 'trades'}:
logger.info('Process = %s', ps)
queue.append(ps)
                seen_files.update(ps.values())
await aio_sleep(3)
async def worker(queue):
while True:
if queue:
ps = queue.pop()
prices, trades = read_pickle(ps['prices']), read_pickle(ps['trades'])
print(
(trades['volume'] * trades['price']).groupby('entity').sum()
+
(
trades['volume'].groupby('entity').sum().pipe(
lambda s: s.set_axis(
MultiIndex.from_arrays([s.index, where(s > 0, 'sell', 'buy')])
)
) * prices.loc[prices.index.get_level_values('date')[-1]].stack()
).dropna().droplevel(level=-1),
)
await aio_sleep(0)
async def main():
queue = []
with TemporaryDirectory() as d:
data_dir = Path(d)
logger.info('data_dir = %s', data_dir)
await gather(
producer(data_dir),
consumer(data_dir, queue),
worker(queue),
worker(queue),
worker(queue),
)
if __name__ == '__main__':
run(main())
ulimit
print("Let's take a look!")
from collections import namedtuple
from functools import wraps
from inspect import signature
from itertools import islice, tee
from math import isclose
from shlex import quote
from subprocess import check_call, DEVNULL, CalledProcessError
from textwrap import dedent
from numpy import arange
nwise = lambda g, *, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
def binary_search(values):
lidx, ridx = 0, len(values) - 1
while True:
if (ridx - lidx) <= 1:
break
if (yield values[(midx := (ridx - lidx) // 2 + lidx)]):
lidx = midx
else:
ridx = midx
class BisectedRun(namedtuple('RunBase', 'signature result exception')):
sig = property(lambda s: s.signature)
res = property(lambda s: s.result)
exc = property(lambda s: s.exception)
@lambda cls: cls()
class args:
def __get__(self, instance, owner):
return namedtuple('Arguments', instance.sig.arguments.keys())(**instance.sig.arguments)
@classmethod
def support(cls, arguments, exceptions):
def dec(f):
sig = signature(f)
@lambda bisected: setattr(f, 'bisected', bisected)
@wraps(arguments)
def bisected(*arg_args, **arg_kwargs):
ci = binary_search(arguments(*arg_args, **arg_kwargs))
@wraps(f)
def inner(*f_fixed_args, **f_fixed_kwargs):
(f_args, f_kwargs), exc = next(ci), None
while True:
f_bound_args = sig.bind(
*f_fixed_args, *f_args,
**f_fixed_kwargs, **f_kwargs
)
                        try:
                            res, exc = f(*f_bound_args.args, **f_bound_args.kwargs), None
                        except (*exceptions,) as e:
                            res, exc = None, e
                        yield cls(f_bound_args, res, exc)
try:
f_args, f_kwargs = ci.send(exc is not None)
except StopIteration:
break
return inner
return f
return dec
@BisectedRun.support(
arguments=lambda min_vmem, max_vmem:
[((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 10 * 1024, dtype=int)],
exceptions={CalledProcessError},
)
def run_python_ulimit(code, vmem):
return check_call([
'sh', '-c', f'''(
ulimit -v {vmem}
python -c {quote(code)}
)'''
], stderr=DEVNULL, stdout=DEVNULL)
if __name__ == '__main__':
code = dedent('''
import numpy
''').strip()
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
pass
bad_mem, ok_mem = sorted([prev_run.args.vmem, curr_run.args.vmem])
print(
f'{ok_mem = :_}',
f'{bad_mem = :_}',
sep='\n'
)
# run_python_ulimit(code, ok_mem)
# run_python_ulimit(code, bad_mem)
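One unit caveat when reproducing this: ulimit -v is expressed in KiB in most shells (bash documents 1024-byte increments), so bisecting over 0 to 2**20 here searches roughly the 0–1 GiB range of virtual memory.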
from collections import namedtuple
from functools import wraps
from inspect import signature
from itertools import islice, tee
from math import isclose
from shlex import quote
from subprocess import check_call, DEVNULL, CalledProcessError
from textwrap import dedent
from numpy import arange
nwise = lambda g, *, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
def binary_search(values):
lidx, ridx = 0, len(values) - 1
while True:
if (ridx - lidx) <= 1:
break
if (yield values[(midx := (ridx - lidx) // 2 + lidx)]):
lidx = midx
else:
ridx = midx
class BisectedRun(namedtuple('RunBase', 'signature result exception')):
sig = property(lambda s: s.signature)
res = property(lambda s: s.result)
exc = property(lambda s: s.exception)
@lambda cls: cls()
class args:
def __get__(self, instance, owner):
return namedtuple('Arguments', instance.sig.arguments.keys())(**instance.sig.arguments)
@classmethod
def support(cls, arguments, exceptions):
def dec(f):
sig = signature(f)
@lambda bisected: setattr(f, 'bisected', bisected)
@wraps(arguments)
def bisected(*arg_args, **arg_kwargs):
ci = binary_search(arguments(*arg_args, **arg_kwargs))
@wraps(f)
def inner(*f_fixed_args, **f_fixed_kwargs):
(f_args, f_kwargs), exc = next(ci), None
while True:
f_bound_args = sig.bind(
*f_fixed_args, *f_args,
**f_fixed_kwargs, **f_kwargs
)
                        try:
                            res, exc = f(*f_bound_args.args, **f_bound_args.kwargs), None
                        except (*exceptions,) as e:
                            res, exc = None, e
                        yield cls(f_bound_args, res, exc)
try:
f_args, f_kwargs = ci.send(exc is not None)
except StopIteration:
break
return inner
return f
return dec
@BisectedRun.support(
arguments=lambda min_vmem, max_vmem:
[((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 1024, dtype=int)],
exceptions={CalledProcessError},
)
def run_python_ulimit(code, vmem):
return check_call([
'sh', '-c', f'''(
ulimit -v {vmem}
python -c {quote(code)}
)'''
], stderr=DEVNULL, stdout=DEVNULL)
if __name__ == '__main__':
attrdict_protocols = dedent('''
# attrdict via dispatch to `dict.__*item__` protocols
class attrdict(dict):
__getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
''')
attrdict_circular = dedent('''
# attrdict via circular reference on `self.__dict__`
class attrdict(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__dict__ = self
''')
simple_test = dedent('''
for _ in range(2):
d = attrdict({idx: None for idx in range(100_000)})
''')
sustained_test = dedent('''
for _ in range(100):
d = attrdict({idx: None for idx in range(100_000)})
''')
code = '\n'.join([attrdict_protocols, simple_test])
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
pass
attrdict_protocols_ok_mem = max([prev_run.args.vmem, curr_run.args.vmem])
code = '\n'.join([attrdict_circular, simple_test])
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
pass
attrdict_circular_ok_mem = max([prev_run.args.vmem, curr_run.args.vmem])
assert isclose(attrdict_protocols_ok_mem, attrdict_circular_ok_mem, abs_tol=1024), 'memory usage in simple test should be close (≤1MiB)'
code = '\n'.join([attrdict_protocols, sustained_test])
# run_python_ulimit(code, attrdict_protocols_ok_mem)
code = '\n'.join([attrdict_circular, sustained_test])
# run_python_ulimit(code, attrdict_circular_ok_mem)
from collections import namedtuple
from functools import wraps
from inspect import signature
from itertools import islice, tee
from math import isclose
from shlex import quote
from subprocess import check_call, DEVNULL, CalledProcessError
from textwrap import dedent
from numpy import arange
nwise = lambda g, *, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
def binary_search(values):
lidx, ridx = 0, len(values) - 1
while True:
if (ridx - lidx) <= 1:
break
if (yield values[(midx := (ridx - lidx) // 2 + lidx)]):
lidx = midx
else:
ridx = midx
class BisectedRun(namedtuple('RunBase', 'signature result exception')):
sig = property(lambda s: s.signature)
res = property(lambda s: s.result)
exc = property(lambda s: s.exception)
@lambda cls: cls()
class args:
def __get__(self, instance, owner):
return namedtuple('Arguments', instance.sig.arguments.keys())(**instance.sig.arguments)
@classmethod
def support(cls, arguments, exceptions):
def dec(f):
sig = signature(f)
@lambda bisected: setattr(f, 'bisected', bisected)
@wraps(arguments)
def bisected(*arg_args, **arg_kwargs):
ci = binary_search(arguments(*arg_args, **arg_kwargs))
@wraps(f)
def inner(*f_fixed_args, **f_fixed_kwargs):
(f_args, f_kwargs), exc = next(ci), None
while True:
f_bound_args = sig.bind(
*f_fixed_args, *f_args,
**f_fixed_kwargs, **f_kwargs
)
                        try:
                            res, exc = f(*f_bound_args.args, **f_bound_args.kwargs), None
                        except (*exceptions,) as e:
                            res, exc = None, e
                        yield cls(f_bound_args, res, exc)
try:
f_args, f_kwargs = ci.send(exc is not None)
except StopIteration:
break
return inner
return f
return dec
@BisectedRun.support(
arguments=lambda min_vmem, max_vmem:
[((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 1024, dtype=int)],
exceptions={CalledProcessError},
)
def run_python_ulimit(code, vmem):
return check_call([
'sh', '-c', f'''(
ulimit -v {vmem}
python -c {quote(code)}
)'''
], stderr=DEVNULL, stdout=DEVNULL)
if __name__ == '__main__':
core_code = dedent('''
from numpy import array
def create_normal_array(size=100_000):
xs = array([None for _ in range(size)], dtype=object)
ys = array([None for _ in range(size)], dtype=object)
return xs, ys
def create_circular_array(size=100_000):
xs = array([None for _ in range(size)], dtype=object)
ys = array([None for _ in range(size)], dtype=object)
xs[0], ys[0] = ys, xs # circular reference
return xs, ys
''')
baseline_test = dedent('''
for _ in range(2):
xs, ys = create_normal_array()
''')
sustained_test = dedent('''
for _ in range(100):
xs, ys = create_circular_array()
''')
verification_test = dedent('''
for _ in range(100):
xs, ys = create_normal_array()
''')
code = '\n'.join([core_code, baseline_test])
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
pass
ok_mem = max([prev_run.args.vmem, curr_run.args.vmem])
code = '\n'.join([core_code, verification_test])
# run_python_ulimit(code, ok_mem)
code = '\n'.join([core_code, sustained_test])
# run_python_ulimit(code, ok_mem)