ts-python

Seminar V: “Concurrency and Parallelism in Python”

“Kids, you tried your best, and you failed miserably. The lesson is: never try.”

— Burns’ Heir (S05E18)

Contents

  1. Abstract
  2. Notes

Abstract

How do you make use of concurrency mechanisms in Python? How do you decide which of the various approaches to take, and how can they help you improve your code?

Do you…

Then join us for a session on concurrency and parallelism in Python.

The Global Interpreter Lock (“GIL”) is a specter that haunts concurrency approaches in Python. It takes threading off the table as a simple route to parallelism and forces us to develop a much more sophisticated view of concurrency and parallelism.

In this episode, we will discuss concurrency approaches in Python, touching upon standard library modules such as threading, multiprocessing, multiprocessing.shared_memory, and concurrent.futures. We’ll discuss the motivation behind using these tools, their limitations, their relationship to the GIL, existing workarounds to avoid it, and other related approaches for getting more work done in less overall time. (Note: this session will not cover asyncio.)

Keywords: threading, multiprocessing, concurrency, parallelism, GIL, Global Interpreter Lock

Notes

premise

print("Let's take a look!")

We’re looking for some decomposable operation that can be structured in Python.

from pandas import Series, date_range, MultiIndex
from numpy.random import default_rng
from string import ascii_lowercase

rng = default_rng(0)

s = Series(
    index=(idx :=
        MultiIndex.from_product([
            date_range('2020-01-01', periods=10_000, freq='T'),
            rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
        ], names=['date', 'entity'])
    ),
    data=rng.random(size=len(idx)),
)

print(
    # s.head(3),
    # s.cumsum(),
    s.groupby('entity').mean(),
    sep='\n{}\n'.format('\N{box drawings light horizontal}' * 40),
)
from scipy.spatial import KDTree
from numpy.random import default_rng

# some libraries do their own threading in compiled code: query_ball_point takes workers=
points = default_rng(0).random((1_000, 3))
KDTree(points).query_ball_point(points[0], r=.1, workers=2)
from string import ascii_lowercase
from tempfile import TemporaryDirectory
from pathlib import Path
from time import sleep
from itertools import cycle, count, tee
from uuid import uuid4
from contextlib import contextmanager

from pandas import Series, date_range, MultiIndex
from numpy.random import default_rng

rng = default_rng(0)

def generate():
    start_date = '2020-01-01'
    while True:
        s = Series(
            index=(idx :=
                MultiIndex.from_product([
                    (dt := date_range(start_date, periods=10_000, freq='s')),
                    rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
                ], names=['date', 'entity'])
            ),
            data=rng.random(size=len(idx)),
        )
        yield s
        start_date = max(dt)

@contextmanager
def temp_paths(base_dir=Path('/nfs/shared/data')):
    base_dir.mkdir(exist_ok=True)
    with TemporaryDirectory(dir=base_dir) as d:
        temp_dir = Path(d)
        try:
            yield temp_dir, (temp_dir / f'{uuid4()}.pkl' for _ in count())
        finally:
            for p in temp_dir.iterdir():
                p.unlink()

if __name__ == '__main__':
    with temp_paths() as (temp_dir, paths):
        print(f'Writing to {temp_dir = }')
        for end, s in zip(cycle([''] * 39 + ['\n']), generate()):  # newline after every 40 writes
            s.to_pickle(next(paths))
            print('.', end=end)
            sleep(2)

threading

print("Let's take a look!")
from threading import Thread, get_ident, get_native_id
from os import getppid, getpid

def target(name):
    print(
        f'{name            = }',
        f'{getpid()        = }',
        f'{getppid()       = }',
        f'{get_ident()     = :#_x}',
        f'{get_native_id() = }',
        sep='\n'
    )

t = Thread(target=target, kwargs={'name': 'thread'})
t.start()
t.join()
from threading import Thread
from itertools import count
from random import random
from time import sleep

def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())

pool = [
    Thread(target=target, kwargs={'name': f'thread#{x}'})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
from concurrent.futures import ThreadPoolExecutor
from itertools import count
from random import random
from time import sleep

def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.map(target, (f'thread#{x}' for x in range(3)))
    print([*future])
from threading import Thread
from random import random
from time import sleep
from string import ascii_lowercase

from pandas import Series, date_range, MultiIndex
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx :=
        MultiIndex.from_product([
            date_range('2020-01-01', periods=100_000, freq='T'),
            rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
        ], names=['date', 'entity'])
    ),
    data=rng.random(size=len(idx)),
)

def target(k, g):
    return k, g.mean()

pool = [
    Thread(target=target, args=[k, g])
    for k, g in s.groupby('entity')
]
for x in pool: x.start()
for x in pool: x.join()
from concurrent.futures import ThreadPoolExecutor
from string import ascii_lowercase

from pandas import Series, date_range, MultiIndex, DataFrame
from numpy.random import default_rng

rng = default_rng(0)

s = Series(
    index=(idx :=
        MultiIndex.from_product([
            date_range('2020-01-01', periods=100_000, freq='T'),
            rng.choice([*ascii_lowercase], size=(100, 4)).view('<U4').ravel(),
        ], names=['date', 'entity'])
    ),
    data=rng.random(size=len(idx)),
)

def target(k, g):
    return k, g.mean()

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.map(target, *zip(*s.groupby('entity')))
    result = DataFrame(future)
    print(result)
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from time import perf_counter

from numpy import mean, array
from numpy.random import default_rng

@contextmanager
def timed(msg):
    try:
        start = perf_counter()
        yield
    finally:
        stop = perf_counter()
        print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {stop - start:.6f}')

rng = default_rng(0)

xs = rng.random(size=(10_000, 1_000))

with timed('raw numpy'):
    xs.mean(axis=0)

with ThreadPoolExecutor(max_workers=10) as executor:
    with timed('with threading'):
        array([*executor.map(mean, xs.T)])

with timed('with loop'):
    array([mean(row) for row in xs.T])
from random import random

d = {}

def target():
    d[round(random(), 2)] = random()
    return None

from dis import dis
dis(target)
from random import Random
from threading import Thread, Lock
from string import ascii_lowercase
from time import sleep

rnd = Random(0)

def f():
    sleep(1e-12)
    return 1

def target(owned, shared):
    while True:
        sleep(.1)
        # shared[key := rnd.choice(ascii_lowercase)] += (value := 1)
        with lock:
            shared[key] = shared[key := rnd.choice(ascii_lowercase)] + (value := f())
        owned[key] += value

def watch(owned, shared):
    while True:
        sleep(1)
        shared_total = sum(shared.values())
        owned_total = sum(sum(d.values()) for d in owned.values())
        print(f'{shared_total:>10,} {owned_total:>10,} {shared_total == owned_total}')

d = {x: 0 for x in ascii_lowercase}
ds = {idx: {x: 0 for x in ascii_lowercase} for idx in range(20)}
lock = Lock()
pool = [
    *(Thread(target=target, kwargs={'owned': ds[idx], 'shared': d}) for idx in range(20)),
    Thread(target=watch, kwargs={'owned': ds, 'shared': d}),
]
for x in pool: x.start()
for x in pool: x.join()
from threading import Thread
from queue import Queue, Empty
from pathlib import Path
from time import sleep

def producer(queue, data_dir):
    seen_files = {*()}
    while True:
        all_files = {*data_dir.iterdir()}
        new_files = all_files - seen_files
        # print(f'{new_files = }')
        for x in new_files:
            queue.put(x)
        seen_files.update(new_files)
        sleep(.1)

def consumer(queue):
    while True:
        try:
            p = queue.get(block=True, timeout=1)
        except Empty:
            continue
        print(f'{p = }')

if __name__ == '__main__':
    q = Queue()
    pool = [
        Thread(target=producer, kwargs={
            'queue': q,
            'data_dir': Path('/nfs/shared/data/tmpp59fl2bz'),
        }),
        *(
            Thread(target=consumer, kwargs={'queue': q})
            for _ in range(1)
        ),
    ]
    for x in pool: x.start()
    for x in pool: x.join()
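
The producer and consumer above loop forever; a minimal sketch of one way to shut such a pipeline down cleanly (the finite workload and names here are made up for illustration), using a threading.Event as a stop flag:

from threading import Thread, Event
from queue import Queue, Empty

def producer(queue, stop):
    for x in range(10):              # a finite stand-in for the directory scan
        queue.put(x)
    stop.set()                       # signal: no more work is coming

def consumer(queue, stop):
    while not (stop.is_set() and queue.empty()):
        try:
            item = queue.get(block=True, timeout=.1)
        except Empty:
            continue
        print(f'{item = }')

if __name__ == '__main__':
    q, stop = Queue(), Event()
    pool = [
        Thread(target=producer, args=(q, stop)),
        *(Thread(target=consumer, args=(q, stop)) for _ in range(2)),
    ]
    for x in pool: x.start()
    for x in pool: x.join()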

multiprocessing

print("Let's take a look!")
from multiprocessing import Process
from os import getppid, getpid

def target(name):
    print(
        f'{name            = }',
        f'{getpid()        = }',
        f'{getppid()       = }',
        sep='\n'
    )

p = Process(target=target, kwargs={'name': 'process'})
p.start()
p.join()
from time import sleep
sleep(100)
from multiprocessing import Process
from itertools import count
from random import random
from time import sleep

def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())

pool = [
    Process(target=target, kwargs={'name': f'process#{x}'})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
from concurrent.futures import ProcessPoolExecutor
from itertools import count
from random import random
from time import sleep

def target(name):
    for x in count():
        print(f'{name = } {x = }')
        sleep(random())

with ProcessPoolExecutor(max_workers=3) as executor:
    future = executor.map(target, (f'process#{x}' for x in range(3)))
    print([*future])
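
executor.map is not the only entry point; a sketch (with a made-up, finite task) of submit plus as_completed, which yields each Future as its worker finishes and re-raises any worker exception from .result():

from concurrent.futures import ProcessPoolExecutor, as_completed

def work(n):
    return n, sum(i * i for i in range(n))   # a finite, CPU-bound stand-in

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(work, n) for n in (10_000, 20_000, 30_000)]
        for fut in as_completed(futures):
            print(f'{fut.result() = }')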
from threading import Thread
from queue import Queue
from time import sleep

def target(name, data):
    while True:
        print(f'{name = } {data = }')
        sleep(1)

from pandas import Series   # a few candidate payloads we could hand to the threads...
class T: pass
xs = (_ for _ in range(10))
data = xs                    # ...here: a generator, shared directly by every thread
pool = [
    Thread(target=target, kwargs={'name': f'thread#{x}', 'data': data})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
from multiprocessing import Process, Queue
from time import sleep
from random import random
# from pandas import DataFrame

def target(name, data):
    while True:
        print(f'{name = } {data = }')
        data.put(...)
        # data.put((_ for _ in range(10)))  # fails: a generator cannot be pickled
        print(f'{data.get() = }')
        sleep(1)

data = Queue()
pool = [
    Process(target=target, kwargs={'name': f'process#{x}', 'data': data})
    for x in range(3)
]
for x in pool: x.start()
for x in pool: x.join()
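
A multiprocessing.Queue pickles everything it transports, which is why the commented-out put of a generator fails; the same limit is visible with pickle directly:

from pickle import dumps

gen = (x for x in range(10))
try:
    dumps(gen)
except TypeError as e:
    print(f'{e = }')   # cannot pickle 'generator' object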
from multiprocessing import Process, Queue
from multiprocessing.queues import Empty
from pathlib import Path
from time import sleep

def producer(queue, data_dir):
    seen_files = {*()}
    while True:
        all_files = {*data_dir.iterdir()}
        new_files = all_files - seen_files
        # print(f'{new_files = }')
        for x in new_files:
            queue.put(x)
        seen_files.update(new_files)
        sleep(.1)

def consumer(queue):
    while True:
        try:
            p = queue.get(block=True, timeout=1)
            sleep(1)
        except Empty:
            continue
        print(f'{p = }')

if __name__ == '__main__':
    q = Queue()
    pool = [
        Process(target=producer, kwargs={
            'queue': q,
            'data_dir': Path('/nfs/shared/data/tmpukq5bs_g'),
        }),
        *(
            Process(target=consumer, kwargs={'queue': q})
            for _ in range(3)
        ),
    ]
    for x in pool: x.start()
    for x in pool: x.join()
from contextlib import contextmanager
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from time import perf_counter

from numpy import ndarray, dtype as np_dtype, prod, zeros
from numpy.random import default_rng

rng = default_rng(0)

@contextmanager
def timed(msg):
    try:
        start = perf_counter()
        yield
    finally:
        stop = perf_counter()
        print(f'{msg:<48} elapsed \N{mathematical bold capital delta}t: {stop - start:.6f}')

@contextmanager
def shared_ndarray(name=None, dtype=np_dtype('int8'), shape=(1,)):
    dtype = np_dtype(dtype)
    size = prod(shape) * dtype.itemsize
    try:
        if name is None:
            shm = SharedMemory(create=True, size=size)
        else:
            shm = SharedMemory(name=name)
        arr = ndarray(shape=shape, dtype=dtype, buffer=shm.buf)
        yield arr
    finally:
        shm.close()
        if name is None:
            shm.unlink()

data = rng.uniform(-10, +10, (100, 1_000, 1_000)).astype('float64')

res = zeros(data.shape[0], dtype=data.dtype)
with timed('without multiprocessing'):
    res[:] = data.mean(axis=(1, 2))

def target(arr, res, idx):
    res[idx] = arr[idx].mean(axis=(1, 2))  # per-matrix means for this slice of the leading axis

with shared_ndarray(dtype=data.dtype, shape=data.shape) as arr:
    arr[:] = data
    with shared_ndarray(dtype=data.dtype, shape=(data.shape[0],)) as res:
        res[:] = 0
        pool = [
            Process(target=target, kwargs={
                'arr': arr,
                'res': res,
                'idx': slice(idx*(data.shape[0] // 10), (idx+1)*(data.shape[0] // 10)),
            })
            for idx in range(10)
        ]
        with timed('with multiprocessing'):
            for x in pool: x.start()
            for x in pool: x.join()
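
Note that the block above leans on the child processes inheriting the parent’s memory mappings (the default “fork” start method on Linux); under “spawn”, a worker would instead attach to the segment by name. A minimal sketch of that attach step (the helper name is made up):

from multiprocessing.shared_memory import SharedMemory
from numpy import ndarray, dtype as np_dtype

def attach(name, dtype, shape):
    shm = SharedMemory(name=name)   # attach to an existing segment; do not create
    view = ndarray(shape=shape, dtype=np_dtype(dtype), buffer=shm.buf)
    return shm, view                # keep shm referenced for as long as the view is in use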

asyncio

print("Let's take a look!")
from itertools import islice
from time import sleep

def compute(x):
    sleep(10e-6)
    return x ** 2

def process(dataset):
    rv = []
    for x in dataset:
        res = compute(x)
        rv.append(res)
    return rv

for x in islice(process(range(100_000)), 3):
    print(f'{x = }')
from itertools import islice
from time import sleep

def compute(x):
    sleep(10e-6)
    return x ** 2

def process(dataset):
    for x in dataset:
        res = compute(x)
        yield res

for x in islice(process(range(100_000)), 3):
    print(f'{x = }')

T(decision) ≪ T(computation) → lazy
T(decision) ≫ T(computation) → eager

from scipy import optimize
from numpy import polyval
from numpy.random import default_rng

rng = default_rng(0)

def f(x, coeffs):
    return polyval(coeffs, x)

coeffs = rng.random(size=5+1).round(2)
print(
    f'{coeffs = }',
    f'{(root := optimize.newton(f, 0, args=(coeffs,))) = :>.2f}',
    # f'{(root := optimize.newton(f, 0, args=(coeffs,), maxiter=100)) = :>.2f}',
    # f'{(root := optimize.newton(f, 0, args=(coeffs,), tol=1e-3)) = :>.2f}',
    # f'{(root := optimize.newton(f, 0, args=(coeffs,), rtol=1e-2)) = :>.2f}',
    f'{polyval(coeffs, root) = :>.2f}',
    sep='\n',
)
from itertools import islice
from collections import namedtuple
from random import random

class State(namedtuple('State', 'iterations value')):
    def __new__(cls, iterations=1, value=0):
        return super().__new__(cls, iterations, value)

def transition(state):
    return state._replace(iterations=state.iterations + 1, value=round(random(), 2))

def optimize(state):
    while True:
        yield state
        state = transition(state)

# for st in islice(optimize(State()), 10):
#     print(f'{st = }')

for st in optimize(State()):
    print(f'{st = }')
    if st.value > .75:
        break
from time import sleep

def task(name):
    while True:
        print(f'{name = }')
        yield

def gather(*tasks):
    active = {*tasks}
    while active:
        completed = {*()}
        for t in [*active]:
            try:
                next(t)
                sleep(.1)
            except StopIteration:
                completed.add(t)
        active -= completed

def main():
    tasks = [task(f'task#{x}') for x in range(3)]
    gather(*tasks)

main()
from asyncio import run, gather, sleep as aio_sleep

async def task(name):
    while True:
        # if name == 'task#0':
        #     print(f'{name = }')
        print(f'{name = }')
        await aio_sleep(.1)

async def main():
    tasks = [task(f'task#{x}') for x in range(3)]
    await gather(*tasks)

run(main())

“Function Colouring”

def sync_f():
    sync_f()          # a sync function can call other sync functions...
    await async_f()   # ...but cannot await: SyntaxError, 'await' outside async function

async def async_f():
    sync_f()          # an async function can call sync functions...
    await async_f()   # ...and can await other async functions
from asyncio import run, gather, sleep as aio_sleep
from time import sleep as tm_sleep

async def task(name):
    while True:
        print(f'{name = }')
        tm_sleep(1)
        await aio_sleep(.1)

async def main():
    tasks = [task(f'task#{x}') for x in range(3)]
    await gather(*tasks)

run(main())
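
The blocking time.sleep above stalls the whole event loop; one standard-library escape hatch is asyncio.to_thread (Python 3.9+), which runs the blocking call in a worker thread and hands back an awaitable:

from asyncio import run, gather, to_thread, sleep as aio_sleep
from time import sleep as tm_sleep

async def task(name):
    while True:
        print(f'{name = }')
        await to_thread(tm_sleep, 1)   # blocks a worker thread, not the event loop
        await aio_sleep(.1)

async def main():
    tasks = [task(f'task#{x}') for x in range(3)]
    await gather(*tasks)

run(main())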
from asyncio import run

class context:
    def __enter__(self):
        pass
    def __exit__(self, *_):
        pass

async def main():
    with context():
        pass

run(main())
from asyncio import run

class context:
    async def __aenter__(self):
        pass
    async def __aexit__(self, *_):
        pass

async def main():
    async with context():
        pass

run(main())
from asyncio import run

class iter:
    def __iter__(self):
        return self
    def __next__(self):
        pass

async def main():
    for _ in iter():
        pass

run(main())
from asyncio import run

class iter:
    def __aiter__(self):
        return self
    async def __anext__(self):
        pass

async def main():
    async for _ in iter():
        pass

run(main())
from asyncio import sleep as aio_sleep, run, gather
from aiofiles import open as aio_open             # async file I/O (shown here, not used below)
from aiosqlite import connect as aiosql_connect   # async sqlite driver (shown here, not used below)
from pathlib import Path

async def producer(queue, data_dir):
    seen_files = {*()}
    while True:
        all_files = {*data_dir.iterdir()}
        new_files = all_files - seen_files
        # print(f'{new_files = }')
        for x in new_files:
            queue.append(x)
        seen_files.update(new_files)
        await aio_sleep(.1)

async def consumer(queue):
    while True:
        if queue:
            p = queue.pop()
            print(f'{p = }')
        await aio_sleep(0)

async def main():
    queue = []
    tasks = [
        producer(
            queue=queue,
            data_dir=Path('/nfs/shared/data/tmphgv55fcz'),
        ),
        *(consumer(queue=queue) for _ in range(3))
    ]
    await gather(*tasks)

if __name__ == '__main__':
    run(main())
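
The bare list plus await aio_sleep(0) above is a hand-rolled queue; a sketch of the same shape with asyncio.Queue, whose get suspends the consumer until an item arrives instead of busy-polling:

from asyncio import Queue, run, gather, sleep as aio_sleep
from pathlib import Path

async def producer(queue, data_dir):
    seen_files = {*()}
    while True:
        new_files = {*data_dir.iterdir()} - seen_files
        for x in new_files:
            await queue.put(x)
        seen_files.update(new_files)
        await aio_sleep(.1)

async def consumer(queue):
    while True:
        p = await queue.get()          # suspends until an item is available
        print(f'{p = }')

async def main():
    queue = Queue()
    await gather(
        producer(queue=queue, data_dir=Path('/nfs/shared/data/tmphgv55fcz')),
        *(consumer(queue=queue) for _ in range(3)),
    )

if __name__ == '__main__':
    run(main())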