“Kids, you tried your best, and you failed miserably. The lesson is: never try.”
— Burns’ Heir (S05E18)
How do you make use of concurrency mechanisms in Python? How do you decide which of the various approaches to take, and how can each of them help you improve your code?
Do you…
Then join us for a session on concurrency and parallelism in Python.
The Global Interpreter Lock (“GIL”) is a specter that haunts concurrency approaches in Python. It eliminates threading as a simple concurrency primitive and forces us to develop a much more sophisticated view of concurrency and parallelism.
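As a quick illustration (a minimal sketch; exact timings vary by machine and interpreter, and cpu_bound is just a stand-in workload), CPU-bound pure-Python work gains nothing from threads, because only one thread can execute bytecode at a time:
from threading import Thread
from time import perf_counter

def cpu_bound(n=10_000_000):
    total = 0
    for x in range(n):
        total += x
    return total

start = perf_counter()
for _ in range(2):
    cpu_bound()
print(f'sequential: {perf_counter() - start:.2f}s')

start = perf_counter()
pool = [Thread(target=cpu_bound) for _ in range(2)]
for x in pool: x.start()
for x in pool: x.join()
print(f'threaded:   {perf_counter() - start:.2f}s')  # about the same, or worse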
In this episode, we will discuss concurrency approaches in Python, touching
upon standard library modules such as threading, multiprocessing,
multiprocessing.shared_memory, and concurrent.futures. We’ll discuss the
motivation behind using these tools, their limitations, their relationship to
the Global Interpreter Lock (“GIL”), existing workarounds to avoid the GIL, and
other related approaches for getting more work done in less overall time.
(Note: this session will not cover asyncio.)
Keywords: threading, multiprocessing, concurrency, parallelism, GIL, Global Interpreter Lock
print("Let's take a look!")
We’re looking for a decomposable operation, one whose work splits into independent pieces, that we can structure in Python.
from pandas import Series, date_range, MultiIndex
from numpy.random import default_rng
from string import ascii_lowercase
rng = default_rng(0)
s = Series(
    index=(idx :=
        MultiIndex.from_product([
            date_range('2020-01-01', periods=10_000, freq='T'),
            rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
        ], names=['date', 'entity'])
    ),
    data=rng.random(size=len(idx)),
)
print(
    # s.head(3),
    # s.cumsum(),
    s.groupby('entity').mean(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from scipy.spatial import KDTree
from numpy.random import default_rng
points = default_rng(0).random((1_000, 3))
KDTree(points).query_ball_point(points[0], r=.25, workers=2)  # workers=2 runs the query in parallel
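Note where this parallelism comes from: SciPy releases the GIL inside the extension and fans the query out across threads for us. Passing workers=-1 uses every available CPU, all without leaving a single Python process.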
from string import ascii_lowercase
from tempfile import TemporaryDirectory
from pathlib import Path
from time import sleep
from itertools import cycle, count
from uuid import uuid4
from contextlib import contextmanager
from pandas import Series, date_range, MultiIndex
from numpy.random import default_rng
rng = default_rng(0)
def generate():
    start_date = '2020-01-01'
    while True:
        s = Series(
            index=(idx :=
                MultiIndex.from_product([
                    (dt := date_range(start_date, periods=10_000, freq='s')),
                    rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
                ], names=['date', 'entity'])
            ),
            data=rng.random(size=len(idx)),
        )
        yield s
        start_date = max(dt)
@contextmanager
def temp_paths(base_dir=Path('/nfs/shared/data')):
    base_dir.mkdir(exist_ok=True)
    with TemporaryDirectory(dir=base_dir) as d:
        temp_dir = Path(d)
        try:
            yield temp_dir, (temp_dir / f'{uuid4()}.pkl' for _ in count())
        finally:
            for p in temp_dir.iterdir():
                p.unlink()
if __name__ == '__main__':
    with temp_paths() as (temp_dir, paths):
        print(f'Writing to {temp_dir = }')
        for end, s in zip(cycle([''] * 39 + ['\n']), generate()):
            s.to_pickle(next(paths))
            print('.', end=end)
            sleep(2)
threading
print("Let's take a look!")
from threading import Thread, get_ident, get_native_id
from os import getppid, getpid
def target(name):
    print(
        f'{name = }',
        f'{getpid() = }',
        f'{getppid() = }',
        f'{get_ident() = :#_x}',
        f'{get_native_id() = }',
        sep='\n'
    )
t = Thread(target=target, kwargs={'name': 'thread'})
t.start()
t.join()
from threading import Thread
from itertools import count
from random import random
from time import sleep
def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())
pool = [
    Thread(target=target, kwargs={'name': f'thread#{x}'})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
from concurrent.futures import ThreadPoolExecutor
from itertools import count
from random import random
from time import sleep
def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.map(target, (f'thread#{x}' for x in range(3)))
    print([*future])
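executor.map hands results back in input order; when we’d rather consume results as they finish, submit returns Future objects we can drain with as_completed. A minimal sketch (work is a stand-in function, not part of the demo above):
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import random
from time import sleep

def work(x):
    sleep(random())
    return x ** 2

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(work, x) for x in range(5)]
    for fut in as_completed(futures):  # yields futures in completion order
        print(f'{fut.result() = }')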
from threading import Thread
from random import random
from time import sleep
from string import ascii_lowercase
from pandas import Series, date_range, MultiIndex
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
    index=(idx :=
        MultiIndex.from_product([
            date_range('2020-01-01', periods=100_000, freq='T'),
            rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
        ], names=['date', 'entity'])
    ),
    data=rng.random(size=len(idx)),
)
def target(k, g):
    return k, g.mean()
pool = [
    Thread(target=target, args=[k, g])
    for k, g in s.groupby('entity')
]
for x in pool: x.start()
for x in pool: x.join()
from concurrent.futures import ThreadPoolExecutor
from string import ascii_lowercase
from pandas import Series, date_range, MultiIndex, DataFrame
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
    index=(idx :=
        MultiIndex.from_product([
            date_range('2020-01-01', periods=100_000, freq='T'),
            rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
        ], names=['date', 'entity'])
    ),
    data=rng.random(size=len(idx)),
)
def target(k, g):
    return k, g.mean()
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.map(target, *zip(*s.groupby('entity')))
    result = DataFrame(future)
print(result)
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from time import perf_counter
from numpy import mean, array
from numpy.random import default_rng
@contextmanager
def timed(msg):
    try:
        start = perf_counter()
        yield
    finally:
        stop = perf_counter()
        print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {stop - start:.6f}')
rng = default_rng(0)
xs = rng.random(size=(10_000, 1_000))
with timed('raw numpy'):
    xs.mean(axis=0)
with ThreadPoolExecutor(max_workers=10) as executor:
    with timed('with threading'):
        array([*executor.map(mean, xs.T)])
with timed('with loop'):
    array([mean(row) for row in xs.T])
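Why doesn’t the thread pool win here? NumPy releases the GIL inside many of its vectorised kernels, so the threaded version really can use multiple cores; but each per-column mean is far too small a task to amortise the overhead of dispatching it to the pool, so raw NumPy (one call, one traversal) comes out ahead.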
from random import random
d = {}
def target():
    d[round(random(), 2)] = random()
    return None
from dis import dis
dis(target)
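The disassembly shows why this particular update is thread-safe: the store compiles to a single STORE_SUBSCR instruction, and the GIL guarantees each bytecode instruction completes atomically. A read-modify-write such as d[k] += 1 spans several instructions, and another thread can be scheduled in between them; that is the race the next example manufactures.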
from random import Random
from threading import Thread, Lock
from string import ascii_lowercase
from time import sleep
rnd = Random(0)
def f():
    sleep(1e-12)
    return 1
def target(owned, shared):
    while True:
        sleep(.1)
        # shared[key := rnd.choice(ascii_lowercase)] += (value := 1)
        with lock:
            shared[key] = shared[key := rnd.choice(ascii_lowercase)] + (value := f())
        owned[key] += value
def watch(owned, shared):
    while True:
        sleep(1)
        shared_total = sum(shared.values())
        owned_total = sum(sum(d.values()) for d in owned.values())
        print(f'{shared_total:>10,} {owned_total:>10,} {shared_total == owned_total}')
d = {x: 0 for x in ascii_lowercase}
ds = {idx: {x: 0 for x in ascii_lowercase} for idx in range(20)}
lock = Lock()
pool = [
    *(Thread(target=target, kwargs={'owned': ds[idx], 'shared': d}) for idx in range(20)),
    Thread(target=watch, kwargs={'owned': ds, 'shared': d}),
]
for x in pool: x.start()
for x in pool: x.join()
from threading import Thread
from queue import Queue, Empty
from pathlib import Path
from time import sleep
def producer(queue, data_dir):
    seen_files = {*()}
    while True:
        all_files = {*data_dir.iterdir()}
        new_files = all_files - seen_files
        # print(f'{new_files = }')
        for x in new_files:
            queue.put(x)
        seen_files.update(new_files)
        sleep(.1)
def consumer(queue):
    while True:
        try:
            p = queue.get(block=True, timeout=1)
        except Empty:
            continue
        print(f'{p = }')
if __name__ == '__main__':
    q = Queue()
    pool = [
        Thread(target=producer, kwargs={
            'queue': q,
            'data_dir': Path('/nfs/shared/data/tmpp59fl2bz'),
        }),
        *(
            Thread(target=consumer, kwargs={'queue': q})
            for _ in range(1)
        ),
    ]
    for x in pool: x.start()
    for x in pool: x.join()
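These consumers loop forever; one common refinement (a sketch, not part of the demo above; SENTINEL is a name we introduce here) is to shut them down with a sentinel object instead:
from queue import Queue
from threading import Thread

SENTINEL = object()  # hypothetical shutdown marker

def consumer(queue):
    while (item := queue.get()) is not SENTINEL:
        print(f'{item = }')
    queue.put(SENTINEL)  # propagate so sibling consumers also stop

q = Queue()
pool = [Thread(target=consumer, kwargs={'queue': q}) for _ in range(2)]
for x in pool: x.start()
for x in range(5): q.put(x)
q.put(SENTINEL)
for x in pool: x.join()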
multiprocessing
print("Let's take a look!")
from multiprocessing import Process
from os import getppid, getpid
def target(name):
    print(
        f'{name = }',
        f'{getpid() = }',
        f'{getppid() = }',
        sep='\n'
    )
p = Process(target=target, kwargs={'name': 'process'})
p.start()
p.join()
from time import sleep
sleep(100)  # keep the interpreter alive so we can inspect the processes from another terminal (e.g., with ps)
from multiprocessing import Process
from itertools import count
from random import random
from time import sleep
def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())
pool = [
    Process(target=target, kwargs={'name': f'process#{x}'})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
from concurrent.futures import ProcessPoolExecutor
from itertools import count
from random import random
from time import sleep
def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())
with ProcessPoolExecutor(max_workers=3) as executor:
    future = executor.map(target, (f'process#{x}' for x in range(3)))
    print([*future])
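Unlike threads, process pools must pickle the callable and its arguments to ship them to the workers, so anything unpicklable, such as a lambda, fails. A minimal sketch of what that looks like:
from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as executor:
        fut = executor.submit(lambda: None)
        try:
            fut.result()
        except Exception as e:
            print(f'{type(e).__name__}: {e}')  # lambdas cannot be pickled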
from threading import Thread
from queue import Queue
from time import sleep
def target(name, data):
    while True:
        print(f'{name = } {data = }')
        sleep(1)
from pandas import Series
class T: pass
# threads share one address space: any object works here, a generator,
# a Series, an instance of T, …
xs = (_ for _ in range(10))
data = xs
pool = [
    Thread(target=target, kwargs={'name': f'thread#{x}', 'data': data})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
from multiprocessing import Process, Queue
from time import sleep
from random import random
# from pandas import DataFrame
def target(name, data):
    while True:
        print(f'{name = } {data = }')
        data.put(...)
        # data.put((_ for _ in range(10)))  # fails: generators cannot be pickled
        print(f'{data.get(...) = }')
        sleep(1)
data = Queue()
pool = [
    Process(target=target, kwargs={'name': f'process#{x}', 'data': data})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
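Unlike queue.Queue, a multiprocessing.Queue pickles everything that passes through it so the bytes can cross the process boundary; that is why only picklable objects can ride along, and why the commented-out generator never could.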
from multiprocessing import Process, Queue
from queue import Empty  # multiprocessing queues raise the stdlib queue.Empty
from pathlib import Path
from time import sleep
def producer(queue, data_dir):
    seen_files = {*()}
    while True:
        all_files = {*data_dir.iterdir()}
        new_files = all_files - seen_files
        # print(f'{new_files = }')
        for x in new_files:
            queue.put(x)
        seen_files.update(new_files)
        sleep(.1)
def consumer(queue):
    while True:
        try:
            p = queue.get(block=True, timeout=1)
            sleep(1)
        except Empty:
            continue
        print(f'{p = }')
if __name__ == '__main__':
    q = Queue()
    pool = [
        Process(target=producer, kwargs={
            'queue': q,
            'data_dir': Path('/nfs/shared/data/tmpukq5bs_g'),
        }),
        *(
            Process(target=consumer, kwargs={'queue': q})
            for _ in range(3)
        ),
    ]
    for x in pool: x.start()
    for x in pool: x.join()
from contextlib import contextmanager
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from time import perf_counter
from math import prod
from numpy import ndarray, dtype as np_dtype, zeros
from numpy.random import default_rng
rng = default_rng(0)
@contextmanager
def timed(msg):
    try:
        start = perf_counter()
        yield
    finally:
        stop = perf_counter()
        print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {stop - start:.6f}')
@contextmanager
def shared_ndarray(name=None, dtype=np_dtype('int8'), shape=(1,)):
    dtype = np_dtype(dtype)
    size = prod(shape) * dtype.itemsize
    try:
        if name is None:
            shm = SharedMemory(create=True, size=size)
        else:
            shm = SharedMemory(name=name)
        arr = ndarray(shape=shape, dtype=dtype, buffer=shm.buf)
        yield arr
    finally:
        shm.close()
        if name is None:
            shm.unlink()
data = rng.uniform(-10, +10, (100, 1_000, 1_000)).astype('float64')
res = zeros(data.shape[0], dtype=data.dtype)
with timed('without multiprocessing'):
    res[:] = data.mean(axis=(1, 2))
def target(arr, res, idx):
    res[idx] = arr[idx].mean(axis=(1, 2))  # per-matrix means for the slice of work we own
with shared_ndarray(dtype=data.dtype, shape=data.shape) as arr:
    arr[:] = data
    with shared_ndarray(dtype=data.dtype, shape=(data.shape[0],)) as res:
        res[:] = 0
        pool = [
            Process(target=target, kwargs={
                'arr': arr,
                'res': res,
                'idx': slice(idx * (data.shape[0] // 10), (idx + 1) * (data.shape[0] // 10)),
            })
            for idx in range(10)
        ]
        # passing the shared views straight to Process relies on the 'fork'
        # start method (the default on Linux); under 'spawn' the children
        # would attach to the block by name instead
        with timed('with multiprocessing'):
            for x in pool: x.start()
            for x in pool: x.join()
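A process that did not inherit the mapping via fork can attach to the same block by name. A minimal sketch using the raw API (attach_and_sum is a hypothetical helper; the creating side would read the generated name from shm.name and pass it along, e.g., over a Queue):
from multiprocessing.shared_memory import SharedMemory
from numpy import ndarray, dtype as np_dtype

def attach_and_sum(name, shape, dtype='float64'):
    shm = SharedMemory(name=name)  # attach to an existing block by name
    try:
        view = ndarray(shape=shape, dtype=np_dtype(dtype), buffer=shm.buf)
        return view.sum()
    finally:
        shm.close()  # close our mapping; only the creator should unlink()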
asyncio
print("Let's take a look!")
from itertools import islice
from time import sleep
def compute(x):
    sleep(10e-6)
    return x ** 2
def process(dataset):
    rv = []
    for x in dataset:
        res = compute(x)
        rv.append(res)
    return rv
for x in islice(process(range(100_000)), 3):
    print(f'{x = }')
from itertools import islice
from time import sleep
def compute(x):
    sleep(10e-6)
    return x ** 2
def process(dataset):
    for x in dataset:
        res = compute(x)
        yield res
for x in islice(process(range(100_000)), 3):
    print(f'{x = }')
T(decision) ≪ T(computation) → lazy
T(decision) ≫ T(computation) → eager
from scipy import optimize
from numpy import polyval
from numpy.random import default_rng
rng = default_rng(0)
def f(x, coeffs):
    return polyval(coeffs, x)
coeffs = rng.random(size=5+1).round(2)
print(
    f'{coeffs = }',
    # f'{(root := optimize.newton(f, 0, args=(coeffs,))) = :>.2f}',
    # f'{(root := optimize.newton(f, 0, args=(coeffs,), maxiter=100)) = :>.2f}',
    # f'{(root := optimize.newton(f, 0, args=(coeffs,), tol=1e-3)) = :>.2f}',
    # f'{(root := optimize.newton(f, 0, args=(coeffs,), rtol=1e-2)) = :>.2f}',
    # f'{polyval(coeffs, root) = :>.2f}',  # uncomment together with one of the newton lines above
    sep='\n',
)
from itertools import islice
from collections import namedtuple
from random import random
class State(namedtuple('State', 'iterations value')):
    def __new__(cls, iterations=1, value=0):
        return super().__new__(cls, iterations, value)
def transition(state):
    return state._replace(iterations=state.iterations + 1, value=round(random(), 2))
def optimize(state):
    while True:
        yield state
        state = transition(state)
# for st in islice(optimize(State()), 10):
#     print(f'{st = }')
for st in optimize(State()):
    print(f'{st = }')
    if st.value > .75:
        break
from time import sleep
def task(name):
    while True:
        print(f'{name = }')
        yield
def gather(*tasks):
    active = {*tasks}
    while active:                 # keep scheduling until every task has finished
        completed = {*()}
        for t in active:
            try:
                next(t)           # resume the task until its next yield
                sleep(.1)
            except StopIteration:
                completed.add(t)
        active -= completed
def main():
    tasks = [task(f'task#{x}') for x in range(3)]
    gather(*tasks)
main()
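This hand-rolled gather is, in miniature, what an event loop does: keep a set of live tasks, resume each one until it yields, and retire the ones that finish. asyncio gives us the same shape with dedicated syntax and a real scheduler: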
from asyncio import run, gather, sleep as aio_sleep
async def task(name):
    while True:
        # if name == 'task#0':
        #     print(f'{name = }')
        print(f'{name = }')
        await aio_sleep(.1)
async def main():
    tasks = [task(f'task#{x}') for x in range(3)]
    await gather(*tasks)
run(main())
“Function Colouring”
def sync_f():
    sync_f()          # a sync function may call sync functions…
    await async_f()   # …but not await: SyntaxError: 'await' outside async function
async def async_f():
    sync_f()          # an async function may call sync functions…
    await async_f()   # …and may await async ones
from asyncio import run, gather, sleep as aio_sleep
from time import sleep as tm_sleep
async def task(name):
    while True:
        print(f'{name = }')
        tm_sleep(1)
        await aio_sleep(.1)
async def main():
    tasks = [task(f'task#{x}') for x in range(3)]
    await gather(*tasks)
run(main())
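The blocking time.sleep(1) stalls the entire event loop: while one task sleeps, no other task runs, so the output no longer interleaves every 0.1 seconds. Only awaiting (e.g., asyncio.sleep) hands control back to the loop.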
from asyncio import run
class context:
    def __enter__(self):
        pass
    def __exit__(self, *_):
        pass
async def main():
    with context():
        pass
run(main())
from asyncio import run
class context:
    async def __aenter__(self):
        pass
    async def __aexit__(self, *_):
        pass
async def main():
    async with context():
        pass
run(main())
from asyncio import run
class iter:
    def __iter__(self):
        return self
    def __next__(self):
        raise StopIteration  # end immediately so the demo loop terminates
async def main():
    for _ in iter():
        pass
run(main())
from asyncio import run
class iter:
    def __aiter__(self):
        return self
    async def __anext__(self):
        raise StopAsyncIteration  # end immediately so the demo loop terminates
async def main():
    async for _ in iter():
        pass
run(main())
from asyncio import sleep as aio_sleep, run, gather
from aiofiles import open as aio_open             # async-native file I/O (shown for reference; unused below)
from aiosqlite import connect as aiosql_connect   # async-native sqlite I/O (shown for reference; unused below)
from pathlib import Path
async def producer(queue, data_dir):
    seen_files = {*()}
    while True:
        all_files = {*data_dir.iterdir()}
        new_files = all_files - seen_files
        # print(f'{new_files = }')
        for x in new_files:
            queue.append(x)
        seen_files.update(new_files)
        await aio_sleep(.1)
async def consumer(queue):
    while True:
        if queue:
            p = queue.pop()
            print(f'{p = }')
        await aio_sleep(0)
async def main():
    queue = []
    tasks = [
        producer(
            queue=queue,
            data_dir=Path('/nfs/shared/data/tmphgv55fcz'),
        ),
        *(consumer(queue=queue) for _ in range(3))
    ]
    await gather(*tasks)
if __name__ == '__main__':
    run(main())
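The bare list above forces the consumers to busy-poll with await aio_sleep(0). asyncio has a Queue whose get suspends until an item arrives; a minimal sketch of the same shape (with a None sentinel to end the demo):
from asyncio import Queue, gather, run, sleep as aio_sleep

async def producer(queue):
    for x in range(5):
        await queue.put(x)
        await aio_sleep(.1)
    await queue.put(None)  # sentinel: tell the consumer to stop

async def consumer(queue):
    while (p := await queue.get()) is not None:  # suspends until an item is available
        print(f'{p = }')

async def main():
    queue = Queue()
    await gather(producer(queue), consumer(queue))

if __name__ == '__main__':
    run(main())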