ts-python

Seminar VII: “Patrick Swayze” (serialising data and using pickle)

Kevin Scott (Patrick Swayze): I was discharged for striking a radio operator who fell asleep at his post. You’re worried that I have no combat experience. You’re right. There’s no way of proving that I won’t fail in combat. But then again, you can’t prove that I will, either.

Uncommon Valor (1983)

‘Blaster’ (Reb Brown): Most human problems can be solved by an appropriate charge of high explosives.

Uncommon Valor (1983)

Johnny Castle (Patrick Swayze): You just put your pickle on everybody’s plate, college boy, and leave the hard stuff to me.

Dirty Dancing (1987)

James Dalton (Patrick Swayze): All you have to do is follow three simple rules. One: never underestimate your opponent. Expect the unexpected. Two: take it outside. Never start anything inside the bar unless it’s absolutely necessary. And three: be nice.

Road House (1989)

Date Time Track Meeting Link
Fri Oct 1, 2021 9:30 AM EDT Persisting and Serialising Data Seminar VII: “Patrick Swayze”

Audience

These sessions are designed for a broad audience of modelers and software programmers of all backgrounds and skill levels.

Our expected audience should comprise attendees with a…

During this session, we will endeavour to guide our audience to developing…

Abstract

In previous episodes, we have worked with data sets that we constructed, usually from random data. In practice, our data sets are likely to be stored in on-disk formats (e.g., CSV, Excel, HDF5, Parquet) or are retrieved over the network from remote servers (e.g., SQL databases, Hadoop/Hive stores.) Once we load this data, we may have intermediary or temporary results we want to store, and we may want flexible, fast, and easy ways to store this data.

In this episode, we’ll discuss data serialisation packages and technologies. We’ll discuss this in the context of permanent or long-term storage, as well in the context of transient, temporary, or short-term storage. We’ll take a close look at simple, common approaches provided by pandas (e.g., to_csv or to_pickle) and discuss the benefits as well as the significant limitations of these approaches. In particular, we will look closely at pickle and discuss why it is a “fantastic tool that you should never use.”

To Be Continued…

Did you enjoy this episode? Did you learn something new that will help you manage complex analyses that may involve transient or intermediary results that you want to persist between analyses?

If so, stay tuned for future episodes, which may…

If there are other related topics you’d like to see covered, please reach out to Diego Torres Quintanilla.

Notes

print("Let's get started!")

Is the pandas.DataFrame data?

from pandas import DataFrame, date_range
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import repeat, tile

rng = default_rng(0)

tickers = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
dates = date_range('2020-01-01', periods=15)
prices = (
    100 * rng.random(size=len(tickers))
  * rng.normal(loc=1, scale=0.01, size=(len(dates), len(tickers))).cumprod(axis=0)
).ravel()
volumes = rng.integers(-50_000, +50_000, size=len(dates) * len(tickers)).round(-2)

df = DataFrame({
    'date':    tile(dates, len(tickers)),
    'ticker':  repeat(tickers, len(dates)),
    'price':   prices,
    'volumes': volumes,
})

print(
    df.sample(3),
)

When “serialising” we are looking to generate a stream of bytes for our given data, in order to facilitate transmitting it or storing it in some fashion.

We may do this in order to, e.g.,

from pandas import DataFrame, date_range
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import repeat, tile
from pathlib import Path
from itertools import islice
from tempfile import TemporaryDirectory

rng = default_rng(0)

tickers = rng.choice([*ascii_lowercase], size=(10, 4)).view('<U4').ravel()
dates = date_range('2020-01-01', periods=15)
prices = (
    100 * rng.random(size=len(tickers))
  * rng.normal(loc=1, scale=0.01, size=(len(dates), len(tickers))).cumprod(axis=0)
).ravel()
volumes = rng.integers(-50_000, +50_000, size=len(dates) * len(tickers)).round(-2)

df = DataFrame({
    'date':    tile(dates, len(tickers)),
    'ticker':  repeat(tickers, len(dates)),
    'price':   prices,
    'volumes': volumes,
})

print(
    '\n'.join(x for x in dir(df) if x.startswith('to_'))
)

with TemporaryDirectory() as d:
    path = Path(d) / 'df'

    df.to_csv(path := path.with_suffix('.csv'))
    with open(path) as f:
        for line in islice(f, 3):
            print(line, end='')

    df.to_json(path := path.with_suffix('.json'), indent=2)
    with open(path) as f:
        for line in islice(f, 5):
            print(line, end='')

    #  df.to_markdown(path := path.with_suffix('.md'))
    #  with open(path) as f:
    #      for line in islice(f, 5):
    #          print(line, end='')

    #  df.to_html(path := path.with_suffix('.html'))
    #  with open(path) as f:
    #      for line in islice(f, 5):
    #          print(line, end='')

    #  df.to_xml(path := path.with_suffix('.xml'))
    #  with open(path) as f:
    #      for line in islice(f, 5):
    #          print(line, end='')

    #  df.to_excel(path := path.with_suffix('.xlsx'))
    #  with open(path, 'rb') as f:
    #      print(f.read(20))

    df.to_hdf(path := path.with_suffix('.hdf'), key='df')
    with open(path, 'rb') as f:
        print(f.read(20))

    #  df.to_parquet(path := path.with_suffix('.parquet'))
    #  with open(path, 'rb') as f:
    #      print(f.read(20))

    df.to_pickle(path := path.with_suffix('.pickle'))
    with open(path, 'rb') as f:
        print(f.read(20))
from pandas import DataFrame, date_range, Categorical
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import complex128
from decimal import Decimal
from enum import Enum
from pathlib import Path

rng = default_rng(0)

df = DataFrame({
    'date':    date_range('2021-01-01', periods=(size := 10)),
    'ticker':  rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(),
    'price':   rng.normal(size=size).round(2),
    'volume':  rng.integers(-1_000, +1_000, size=size),
    'signal':  rng.random(size=(size, 2)).ravel().view(complex128).round(2),
    'signal':  map(Decimal, rng.random(size=size)),
    'signal':  Categorical(
        rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel()
    ),
    'data':  rng.choice([*Enum('Options', 'A B C D')], size=size),
    'data':  [Path('/tmp') / x for x in ascii_lowercase[:size]],
    'data':  [{*rng.integers(10, size=3)} for _ in range(size)],
    'data':  rng.choice([list, tuple, dict, set, frozenset], size=size),
    #  'data':  rng.choice([min, max, abs], size=size),
})

print(
    df.sample(3),
)
from pandas import DataFrame, date_range, period_range, timedelta_range, interval_range
from numpy.random import default_rng
from string import ascii_lowercase

rng = default_rng(0)

df = DataFrame({
    'date':    date_range('2021-01-01', periods=(size := 10)),
    'ticker':  rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(),
    'price':   rng.normal(size=size),
    'volume':  rng.integers(-1_000, +1_000, size=size),
})

#  df = df.set_index('date')
#  df.index = period_range('2021-01-01', periods=len(df), freq='Q')
#  df.index = timedelta_range(0, periods=len(df), freq='D')
df.index = interval_range(0, periods=len(df), freq=2)
df.index = interval_range(0, periods=len(df), freq=2, closed='left')

print(
    #  df,
    df.index,
)

Why pickle?

from pickle import loads, dumps
from datetime import datetime
from pathlib import Path

data = [1, 2, 3]
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

data = 1, 2, 3
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

data = {1, 2, 3}
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

data = frozenset({1, 2, 3})
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

data = datetime.now()
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

data = Path('/tmp')
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

class T: pass
data = T()
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

def f(): pass
data = f
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)

#  with open(__file__) as f:
#      data = f
#      print(
#          f'{data               = }',
#          f'{dumps(data)        = }',
#          f'{loads(dumps(data)) = }',
#          sep='\n', end='\n\n',
#      )

def g(): yield
gi = g()
data = gi
print(
    f'{data               = }',
    f'{dumps(data)        = }',
    f'{loads(dumps(data)) = }',
    sep='\n', end='\n\n',
)
from pandas import (
    DataFrame, date_range, Categorical, read_pickle,
    period_range, timedelta_range, interval_range,
)
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import complex128
from decimal import Decimal
from enum import Enum
from pathlib import Path
from tempfile import NamedTemporaryFile

rng = default_rng(0)
Options = Enum('Options', 'A B C D')
df = DataFrame({
    'datetime':    date_range('2021-01-01', periods=(size := 10)),
    'str':         rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(),
    'float64':     rng.normal(size=size).round(2),
    'int64':       rng.integers(-1_000, +1_000, size=size),
    'complex128':  rng.random(size=(size, 2)).ravel().view(complex128).round(2),
    'Decimal':     map(Decimal, rng.random(size=size)),
    'Categorical':  Categorical(
        rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel()
    ),
    'Enum':     rng.choice([*Options], size=size),
    'Path':     [Path('/tmp') / x for x in ascii_lowercase[:size]],
    'set':      [{*rng.integers(10, size=3)} for _ in range(size)],
    'type':     rng.choice([list, tuple, dict, set, frozenset], size=size),
    'function': rng.choice([min, max, abs], size=size),
})

df.index = date_range('2021-01-01', periods=len(df))
df.index = period_range('2021-01-01', periods=len(df), freq='Q')
df.index = timedelta_range(0, periods=len(df), freq='D')
df.index = interval_range(0, periods=len(df), freq=2)
df.index = interval_range(0, periods=len(df), freq=2, closed='left')

with NamedTemporaryFile() as f:
    before = df
    df.to_pickle(f.name)
    after = read_pickle(f.name)

print(
    before.sample(2),
    after.sample(2),
    (before == after).all(),
    (before.index == after.index).all(),
    sep='\n',
)
from sklearn.svm import SVC
from sklearn.datasets import load_iris
from pickle import dumps

clf = SVC()
clf.fit(*load_iris(return_X_y=True))
print(
    dumps(clf)[:5]
)
from pandas import (
    DataFrame, date_range, Categorical, read_pickle,
    period_range, timedelta_range, interval_range,
)
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import complex128
from decimal import Decimal
from enum import Enum
from pathlib import Path
from multiprocessing import Process, Queue
from time import sleep
from logging import getLogger, basicConfig, INFO

logger = getLogger(__name__)
basicConfig(level=INFO)

class Custom:
    __repr__ = lambda s: f'{type(s).__name__}()'
    def __getstate__(self):
        logger.info(f'{type(self).__name__}.__getstate__()')
        return {}

rng = default_rng(0)
Options = Enum('Options', 'A B C D')
df = DataFrame({
    'datetime':    date_range('2021-01-01', periods=(size := 10)),
    'str':         rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(),
    'float64':     rng.normal(size=size).round(2),
    'int64':       rng.integers(-1_000, +1_000, size=size),
    'complex128':  rng.random(size=(size, 2)).ravel().view(complex128).round(2),
    'Decimal':     map(Decimal, rng.random(size=size)),
    'Categorical':  Categorical(
        rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel()
    ),
    'Enum':     rng.choice([*Options], size=size),
    'Path':     [Path('/tmp') / x for x in ascii_lowercase[:size]],
    'set':      [{*rng.integers(10, size=3)} for _ in range(size)],
    'type':     rng.choice([list, tuple, dict, set, frozenset], size=size),
    'function': rng.choice([min, max, abs], size=size),
    'custom':   Custom(),
})

def producer(q):
    while True:
        q.put(df)
        sleep(1)

def consumer(q):
    while True:
        df = q.get()
        print(
            df.sample(3),
        )
        sleep(1)

q = Queue()
pool = [
    Process(target=producer, kwargs={'q': q}),
    Process(target=consumer, kwargs={'q': q}),
]
for p in pool:
    p.start()
for p in pool:
    p.join()

Why not pickle?

Warning

The pickle module is not secure. Only unpickle data you trust. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling. Never unpickle data that could have come from an untrusted source, or that could have been tampered with.

Consider signing data with hmac if you need to ensure that it has not been tampered with.

Safer serialization formats such as json may be more appropriate if you are processing untrusted data. See Comparison with json.

from dataclasses import dataclass
from pickle import dumps
from pickletools import dis

@dataclass
class Custom:
    a : int
    b : int

data = [1, 2, 3]
data = Custom(1, 2)
print(f'{dumps(data) = }')
dis(dumps(data))
from pickle import loads

data = b'c__builtin__\neval\n(V__import__("subprocess").run("free")\ntR.'
loads(data)
  1. How dangerous is this really?
  2. What can we do about it?
from hashlib import sha256
from hmac import new as hmac_new
from pickle import dumps
from dataclasses import dataclass

@dataclass
class T:
    a : int
    b : int

data = T(123, 456)
pickle_data = dumps(data)
pickle_hmac = hmac_new(b'key', pickle_data, sha256)

print(
    f'{data                    = }',
    f'{pickle_data             = }',
    f'{pickle_hmac.hexdigest() = }',
    sep='\n',
)
from fickling.pickle import Pickled
from pickle import dump as pickle_dump
from dataclasses import dataclass
from io import BytesIO
from ast import dump as ast_dump

@dataclass
class T:
    a : int
    b : int

data = T(123, 456)
pickle_dump(data, file=(file := BytesIO()))
file.seek(0)
print(
    ast_dump(
        Pickled.load(file).ast,
        indent=4,
    )
)

What else can we do?

from pandas import (
    DataFrame, date_range, Categorical, read_pickle,
    period_range, timedelta_range, interval_range,
    read_csv,
)
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import complex128
from decimal import Decimal
from enum import Enum
from pathlib import Path
from tempfile import TemporaryDirectory
from itertools import islice
from dataclasses import dataclass

@dataclass
class Custom:
    a : int
    b : int

rng = default_rng(1)
Options = Enum('Options', 'A B C D')
df = DataFrame({
    'datetime':    date_range('2021-01-01', periods=(size := 10)),
    'str':         rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(),
    'float64':     rng.normal(size=size).round(2),
    'int64':       rng.integers(-1_000, +1_000, size=size),
    'complex128':  rng.random(size=(size, 2)).ravel().view(complex128).round(2),
    'Decimal':     map(Decimal, rng.random(size=size)),
    'Categorical':  Categorical(
        rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel()
    ),
    'Enum':     rng.choice([*Options], size=size),
    'Path':     [Path('/tmp') / x for x in ascii_lowercase[:size]],
    'set':      [{*rng.integers(10, size=3)} for _ in range(size)],
    'custom':   [Custom(a, b) for a, b in rng.integers(10, size=(size, 2))],
})

with TemporaryDirectory() as d:
    path = Path(d) / 'df.csv'
    df.to_csv(path)
    with open(path) as f:
        for line in islice(f, 3):
            print(line, end='')

    print(
        df.sample(3),
        read_csv(path).sample(3).info(),
        sep='\n',
    )
from pandas import (
    DataFrame, date_range, Categorical, read_pickle,
    period_range, timedelta_range, interval_range,
    read_csv,
)
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import complex128
from decimal import Decimal
from enum import Enum
from pathlib import Path
from tempfile import TemporaryDirectory
from itertools import islice
from dataclasses import dataclass

@dataclass
class Custom:
    a : int
    b : int

rng = default_rng(1)
Options = Enum('Options', 'A B C D')
df = DataFrame({
    'datetime':    date_range('2021-01-01', periods=(size := 10)),
    'str':         rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(),
    'float64':     rng.normal(size=size).round(2),
    'int64':       rng.integers(-1_000, +1_000, size=size),
    'complex128':  rng.random(size=(size, 2)).ravel().view(complex128).round(2),
    'Decimal':     map(Decimal, rng.random(size=size)),
    'Categorical':  Categorical(
        rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel()
    ),
    'Enum':     rng.choice([*Options], size=size),
    'Path':     [Path('/tmp') / x for x in ascii_lowercase[:size]],
    'set':      [{*rng.integers(10, size=3)} for _ in range(size)],
    'Custom':   [Custom(a, b) for a, b in rng.integers(10, size=(size, 2))],
})

def to_csv(df, path):
    df = df.copy()
    df['Enum'] = df['Enum'].apply(lambda x: x.name)
    df['Custom_a'] = df['Custom'].apply(lambda x: x.a)
    df['Custom_b'] = df['Custom'].apply(lambda x: x.b)
    del df['Custom']
    df.to_csv(path)

def from_csv(path):
    df = read_csv(path, index_col=0)
    df['Categorical'] = Categorical(df['Categorical'])
    df['Enum'] = df['Enum'].apply(Options.__getitem__)
    df['Custom'] = [Custom(a, b) for _, (a, b) in df[['Custom_a', 'Custom_b']].iterrows()]
    del df['Custom_a'], df['Custom_b']
    return df

with TemporaryDirectory() as d:
    path = Path(d) / 'df.csv'
    to_csv(df, path)
    with open(path) as f:
        for line in islice(f, 3):
            print(line, end='')
    print(
        df.sample(3),
        from_csv(path).sample(3),
        sep='\n',
    )
from pandas import (
    DataFrame, date_range, Categorical, read_pickle,
    period_range, timedelta_range, interval_range,
    read_csv,
)
from pandas.api.extensions import register_dataframe_accessor, register_series_accessor
from numpy.random import default_rng
from string import ascii_lowercase
from numpy import complex128
from decimal import Decimal
from enum import Enum
from pathlib import Path
from tempfile import TemporaryDirectory
from itertools import islice
from dataclasses import dataclass

@dataclass
class Custom:
    a : int
    b : int

rng = default_rng(1)
Options = Enum('Options', 'A B C D')
df = DataFrame({
    'datetime':    date_range('2021-01-01', periods=(size := 10)),
    'str':         rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel(),
    'float64':     rng.normal(size=size).round(2),
    'int64':       rng.integers(-1_000, +1_000, size=size),
    'complex128':  rng.random(size=(size, 2)).ravel().view(complex128).round(2),
    'Decimal':     map(Decimal, rng.random(size=size)),
    'Categorical':  Categorical(
        rng.choice([*ascii_lowercase], size=(size, 4)).view('<U4').ravel()
    ),
    'Enum':     rng.choice([*Options], size=size),
    'Path':     [Path('/tmp') / x for x in ascii_lowercase[:size]],
    'set':      [{*rng.integers(10, size=3)} for _ in range(size)],
    'Custom':   [Custom(a, b) for a, b in rng.integers(10, size=(size, 2))],
})

@register_dataframe_accessor('persist')
@dataclass
class Persist:
    obj : DataFrame
    def dump(self, path):
        df = self.obj.copy()
        df['Enum'] = df['Enum'].apply(lambda x: x.name)
        df['Custom_a'] = df['Custom'].apply(lambda x: x.a)
        df['Custom_b'] = df['Custom'].apply(lambda x: x.b)
        del df['Custom']
        df.to_csv(path)

    @classmethod
    def load(cls, path):
        df = read_csv(path, index_col=0)
        df['Categorical'] = Categorical(df['Categorical'])
        df['Enum'] = df['Enum'].apply(Options.__getitem__)
        df['Custom'] = [Custom(a, b) for _, (a, b) in df[['Custom_a', 'Custom_b']].iterrows()]
        del df['Custom_a'], df['Custom_b']
        return df

with TemporaryDirectory() as d:
    path = Path(d) / 'df.csv'
    df.persist.dump(path)
    with open(path) as f:
        for line in islice(f, 3):
            print(line, end='')
    print(
        df.sample(3),
        DataFrame.persist.load(path).sample(3),
        sep='\n',
    )