Date: Friday, Apr 25, 2025 at 9:30 AM US/Eastern
When working with data in Python, choosing the right storage format can make a huge difference in speed, efficiency, and usability. Should you prioritize human readability or fast loading times? How do different formats handle complex data structures? And what’s the best way to ensure data integrity when round-tripping between memory and disk?
In this seminar, we’ll explore the trade-offs between popular data persistence formats, including CSV, Pickle, and Parquet, along with a few others that might surprise you. We’ll discuss their strengths, weaknesses, and ideal use cases—whether you’re working with small datasets, large-scale analytics, or machine learning pipelines. Special attention will be given to Parquet, a powerful columnar format that can dramatically improve storage efficiency and query performance for tabular data.
By the end of this session, you’ll have a solid understanding of when to use each format, how to avoid common pitfalls, and how to make informed, practical decisions about storing and retrieving data efficiently in your projects.
python -m pip install fastapi uvicorn pandas numpy pyarrow pyyaml requests httpx orjson ujson duckdb aiosqlite zarr httpie
print("Let's take a look!")
Let’s say we have a fancy LLM agent interaction that we want to share with our colleagues to help develop research ideas.
At its simplest, we’re just using the OpenAI REST API to run requests against a given model, with a sequence of prompts directing the model to give us useful output.
from os import environ
from pprint import pprint
from textwrap import dedent
from requests import post
root_url='https://api.openai.com/v1'
openai_api_key=environ['OPENAI_API_KEY']
headers = {
'Authorization': f'Bearer {openai_api_key}',
'Content-Type': 'application/json',
}
json = {
'model': 'gpt-4o',
'input': dedent('''
珍珠控注意!迎接/30國際珍奶日 85℃黑糖珍珠鮮奶等2飲品「第二杯25元」
每年4月30日是全球珍珠奶茶愛好者的盛大節日——國際珍奶日。為了慶祝這個特別的日子,知名連鎖飲品品牌85℃推出了令人心動的限時優惠活動,讓珍珠控們大呼過癮。
''').strip(),
'instructions': dedent('''
Translate and summarize the top three points of this headline.
Reply only in English. Reply only with bullet points.
Determine if this will have market impact.
Reply with a JSON payload with the market tickers that will be
affected as an array of strings. Specify only TPEX traded
companies.
'''),
}
resp = post(f'{root_url}/responses', headers=headers, json=json)
pprint(resp.json()['output'])
Let’s extend this code to formulate a simple conversation.
from asyncio import run, gather, sleep as aio_sleep
from collections import namedtuple
from collections.abc import MutableMapping
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass
from functools import cached_property
from itertools import batched, chain, count
from os import environ
from pathlib import Path
from pickle import load, dump
from textwrap import dedent
from httpx import AsyncClient
@asynccontextmanager
async def openai_session(
api_key,
*,
root_url='https://api.openai.com/v1',
):
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
}
async def executor():
reqs = yield ...
while True:
resps = await(gather(*(
client.post(
f'{root_url}/responses',
headers=headers,
json=req._asdict(),
) for req in reqs
)))
reqs = yield [x.json()['output'][0]['content'][0]['text'] for x in resps]
async with AsyncClient() as client:
ex = executor()
await anext(ex) # pump
yield lambda *reqs: ex.asend(reqs)
Request = namedtuple('Request', 'input instructions model', defaults=['gpt-4o'])
def conversation(article_text):
partial_text = article_text[0]
full_text = ''.join(article_text)
instructions = {
'summary': dedent('''
Translate and summarize the top three points of this headline.
Reply only in English. Reply only with bullet points.
''').strip(),
'market impact': dedent('''
Determine if this will have market impact. Reply with only true or false.
''').strip(),
'tickers': dedent('''
Reply with a JSON payload with the market tickers that will be
affected as an array of strings. Specify only TPEX traded
companies.
''').strip(),
}
_ = yield Request(partial_text, instructions['summary'])
resp = yield Request(partial_text, instructions['market impact'])
if 'true'.casefold() in resp.casefold():
_ = yield Request(full_text, instructions['tickers'])
async def main():
data_dir = Path('data')
with open(data_dir / 'article.txt') as f:
article_text = f.readlines()
convo = conversation(article_text)
async with openai_session(api_key=environ['OPENAI_API_KEY']) as sess:
resp = None
while True:
try:
req = convo.send(resp)
resp, = await sess(req)
except StopIteration:
break
print(
f'Query: {req.instructions}',
resp,
sep='\n', end='\n\n',
)
if __name__ == '__main__':
run(main())
Now, let’s say we have a FastAPI frontend sitting in front of the agent we wrote.
from asyncio import gather
from collections import namedtuple
from contextlib import asynccontextmanager
from dataclasses import dataclass
from os import environ
from textwrap import dedent
from fastapi import FastAPI
from httpx import AsyncClient
from pydantic import BaseModel
from uvicorn import run
@asynccontextmanager
async def openai_session(
api_key,
*,
root_url='https://api.openai.com/v1',
):
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
}
async def executor():
reqs = yield ...
while True:
resps = await(gather(*(
client.post(
f'{root_url}/responses',
headers=headers,
json=req._asdict(),
) for req in reqs
)))
reqs = yield [x.json()['output'][0]['content'][0]['text'] for x in resps]
async with AsyncClient() as client:
ex = executor()
await anext(ex) # pump
yield lambda *reqs: ex.asend(reqs)
@asynccontextmanager
async def lifespan(app):
global executor
async with openai_session(api_key=environ['OPENAI_API_KEY']) as executor:
yield
app = FastAPI(lifespan=lifespan)
Request = namedtuple('Request', 'input instructions model', defaults=['gpt-4o'])
def conversation(full_text):
partial_text = full_text.splitlines()[0]
instructions = {
'summary': dedent('''
Translate and summarize the top three points of this headline.
Reply only in English. Reply only with bullet points.
''').strip(),
'market impact': dedent('''
Determine if this will have market impact. Reply with only true or false.
''').strip(),
'tickers': dedent('''
Reply with a JSON payload with the market tickers that will be
affected as an array of strings. Specify only TPEX traded
companies.
''').strip(),
}
_ = yield Request(partial_text, instructions['summary'])
resp = yield Request(partial_text, instructions['market impact'])
if 'true'.casefold() in resp.casefold():
_ = yield Request(full_text, instructions['tickers'])
class SummaryRequest(BaseModel):
text : str
@app.post('/summarize')
async def summarize(request : SummaryRequest):
convo = conversation(request.text)
resps = [None]
while True:
try:
req = convo.send(resps[-1])
resps.extend(await executor(req))
except StopIteration:
break
return {'responses': resps[1:]}
if __name__ == '__main__':
from multiprocessing import Process
@lambda f: Process(target=f).start()
def test():
from subprocess import run
from time import sleep
sleep(.25)
run(['http', 'post', 'http://127.0.0.1:8000/summarize', 'text=@data/article.txt'])
run(app)
Where might the following data formats come up in such work?
Let’s consider:
json, pickle
pickle
print("Let's take a look!")
pickle is the easiest one to identify.
from pickle import dumps
print(
f'{dumps([1, 2, 3]) = }',
# f'{dumps(x for x in range(3)) = }',
# f'{dumps(open(__file__)) = }',
sep='\n'
)
pickle offers almost arbitrary fidelity, but in exchange it allows arbitrary code execution on load.
from io import BytesIO
from numpy.random import default_rng
from pandas import Series, IntervalIndex, Categorical, MultiIndex, read_pickle
rng = default_rng(0)
s0 = Series(
index=(idx := MultiIndex.from_product([
Categorical([*'abcd']),
IntervalIndex.from_breaks([0, 1, 2, 3]),
])),
data=rng.normal(size=len(idx)),
name='value',
)
print(s0)
s0.to_pickle(buf := BytesIO())
# buf.seek(0)
# s1 = read_pickle(buf)
# print(s1)
from collections import namedtuple
from dataclasses import dataclass
from pickle import dumps, loads
T0 = namedtuple('T0', 'x y')
@dataclass
class T1:
x : int
y : int
def f(): pass
print(
f'{loads(dumps(T0(123, 456))) = }',
f'{loads(dumps(T1(123, 456))) = }',
f'{loads(dumps(f)) = }',
sep='\n'
)
Never use pickle to move data anywhere further than from your left
hand to your right hand! Never send anyone a pickle file. Never accept
a pickle file from anyone. Never store one to disk in a location you
are not guaranteed to control completely!
from pickle import loads
data = b'c__builtin__\neval\n(V__import__("subprocess").run(["whoami"])\ntR.'
loads(data)
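If you absolutely must load a pickle from a source you only partially trust (again: don't), the pickle documentation suggests restricting which globals an Unpickler may resolve by overriding find_class. A minimal sketch, reusing the malicious payload above; the allow-list here is purely illustrative, and this is not a substitute for the advice above.
from io import BytesIO
from pickle import Unpickler, UnpicklingError, dumps
class RestrictedUnpickler(Unpickler):
    # resolve only an explicit allow-list of globals; everything else
    # (e.g. builtins.eval) is rejected before it can run
    ALLOWED = {('builtins', 'list'), ('builtins', 'dict')}
    def find_class(self, module, name):
        if (module, name) in self.ALLOWED:
            return super().find_class(module, name)
        raise UnpicklingError(f'forbidden global: {module}.{name}')
print(RestrictedUnpickler(BytesIO(dumps([1, 2, 3]))).load())
try:
    RestrictedUnpickler(BytesIO(data)).load()
except UnpicklingError as e:
    print(e)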
from contextlib import contextmanager
from collections.abc import MutableMapping
from dataclasses import dataclass
from pathlib import Path
from pickle import load, dump
@dataclass(frozen=True)
class Cache(MutableMapping):
filename : Path
storage : dict[object, object]
def __getitem__(self, key):
return self.storage[key]
def __setitem__(self, key, value):
self.storage[key] = value
def __delitem__(self, key):
del self.storage[key]
def __iter__(self):
return iter(self.storage)
def __len__(self):
return len(self.storage)
@classmethod
@contextmanager
def from_pickle(cls, filename):
try:
with open(filename, mode='rb') as f:
storage = load(f)
except FileNotFoundError:
storage = {}
try:
yield cls(filename=filename, storage=storage)
finally:
with open(filename, mode='wb') as f:
dump(storage, f)
if __name__ == '__main__':
from tempfile import TemporaryDirectory
with TemporaryDirectory() as d:
d = Path(d)
with Cache.from_pickle(d / 'cache.pkl') as c:
c[...] = ...
with Cache.from_pickle(d / 'cache.pkl') as c:
assert ... in c and c[...] == ...
json
print("Let's take a look!")
json is a necessary tool for our API, but it can be very limiting.
from collections import namedtuple
from datetime import datetime
from json import dumps
def default(obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, set):
return [*obj]
T = namedtuple('T', 'x y')
print(
f'{dumps([1, 2, 3]) = }',
# f'{dumps({1, 2, 3}) = }',
# f'{dumps(datetime.now()) = }',
# f'{dumps({1, 2, 3}, default=default) = }',
# f'{dumps(datetime.now(), default=default) = }',
# f'{dumps(T(123, 456)) = }',
# f'{dumps({"a": 1, "b": 2, "c": 3}) = }',
# f'{dumps({1: "a", 2: "b", 3: "c"}) = }',
sep='\n',
)
from decimal import Decimal
from itertools import product
from math import pi, exp
from json import dumps as json_dumps, loads as json_loads
from ujson import loads as ujson_loads, dumps as ujson_dumps
from orjson import loads as orjson_loads, dumps as orjson_dumps
print(
f'{json_dumps(.1) = }',
# f'{json_dumps(.1 + .2) = }',
# *(f'{loads(dumps(.1 + .2)) == .1 + .2 = }'
# for loads, dumps in product(
# [json_loads, ujson_loads, orjson_loads],
# [json_dumps, ujson_dumps, orjson_dumps],
# )
# ),
# f'{json_loads('0.3', parse_float=Decimal) = }',
sep='\n',
)
from dataclasses import dataclass, is_dataclass, asdict
from json import dumps, loads
@dataclass
class T:
x : int
y : int
def default(obj):
if is_dataclass(obj):
return {**asdict(obj), 'type:dataclass': type(obj).__name__}
def object_hook(obj):
if (cls := obj.pop('type:dataclass', None)):
return globals()[cls](**obj)
obj = T(123, 456)
print(
f'{dumps(obj, default=default) = }',
f'{loads(dumps(obj, default=default), object_hook=object_hook) = }',
sep='\n',
)
We may prefer to use a third-party module such as orjson.
from collections import namedtuple
from datetime import datetime
from orjson import dumps, OPT_SERIALIZE_NUMPY
from numpy import array
from pandas import Series
T = namedtuple('T', 'x y')
def default(obj):
if isinstance(obj, set):
return [*obj]
if isinstance(obj, T):
return obj._asdict()
if isinstance(obj, Series):
return obj.to_dict()
print(
f'{dumps([1, 2, 3]) = }',
# f'{dumps({1, 2, 3}) = }',
# f'{dumps(datetime.now()) = }',
# f'{dumps({1, 2, 3}, default=default) = }',
# f'{dumps(T(123, 456), default=default) = }',
# f'{dumps(array([1, 2, 3])) = }',
# f'{dumps(array([1, 2, 3]), option=OPT_SERIALIZE_NUMPY) = }',
# f'{dumps(Series(index=[], data=[], dtype="float64")) = }',
# f'{dumps(Series(index=[], data=[], dtype="float64"), default=default) = }',
# f'{dumps(Series(index=["a"], data=[1], dtype="float64"), default=default) = }',
# f'{Series(index=[1], data=[1], dtype="float64").to_json() = }',
# f'{Series(index=[1], data=[1], dtype="float64").to_json(orient="split") = }',
sep='\n',
)
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from orjson import loads, dumps, OPT_SERIALIZE_NUMPY
from pandas import Series
from uvicorn import run
class CustomResponse(ORJSONResponse):
media_type = 'application/json'
def render(self, content):
def default(obj):
if isinstance(obj, Series):
return loads(obj.to_json(orient='split'))
return dumps(content, default=default, option=OPT_SERIALIZE_NUMPY)
app = FastAPI(default_response_class=CustomResponse)
@app.get('/endpoint', response_class=CustomResponse)
async def endpoint():
return CustomResponse({'series': Series(index=[1], data=[1])})
if __name__ == '__main__':
from multiprocessing import Process
@lambda f: Process(target=f).start()
def test():
from subprocess import run
from time import sleep
sleep(.25)
run(['http', 'get', 'http://127.0.0.1:8000/endpoint'])
run(app)
yaml, toml
print("Let's take a look!")
If you can use toml, consider it (or just json) before reaching for yaml.
from collections import namedtuple
from datetime import datetime
from yaml import dump, load, Loader, add_representer
T = namedtuple('T', 'x y')
# add_representer(T, lambda dumper, data: dumper.represent_mapping('tag:yaml.org,2002:map', data._asdict()))
print(
f'{dump([1, 2, 3]) = }',
# f'{dump({1, 2, 3}) = }',
# f'{load(dump({1, 2, 3}), Loader=Loader) = }',
# f'{dump(datetime.now()) = }',
# f'{dump(T(123, 456)) = }',
# f'{dump({"a": 1, "b": 2, "c": 3}) = }',
# f'{dump({1: "a", 2: "b", 3: "c"}) = }',
# f'{load(dump({1: "a", 2: "b", 3: "c"}), Loader=Loader) = }',
# f'{dump({T(0, 1): "a", T(1, 2): "b", T(2, 3): "c"}) = }',
sep='\n',
)
from collections import namedtuple
from datetime import datetime
from yaml import dump, load, Loader, add_representer, add_constructor
T = namedtuple('T', 'timestamp value')
def from_yaml(loader, node):
payload = loader.construct_mapping(node)
if (cls := payload.pop('type:dataclass', None)):
return globals()[cls](**payload)
return payload
add_representer(T, lambda dumper, data: dumper.represent_mapping(
'tag:yaml.org,2002:map', {**data._asdict(), 'type:dataclass': type(data).__name__})
)
add_constructor('tag:yaml.org,2002:map', from_yaml)
objs = {
'abc': T(timestamp=datetime(2020, 1, 1), value=123),
'xyz': T(timestamp=datetime(2020, 1, 1), value=123),
}
print(
# f'{dump(objs) = }',
f'{load(dump(objs), Loader=Loader) = }',
sep='\n',
)
from collections import namedtuple
from dataclasses import dataclass, asdict
from datetime import datetime
from yaml import dump, load, Loader, add_representer, add_constructor
class Node:
SUBCLASSES = {}
def __init_subclass__(cls, yaml_tag):
cls.SUBCLASSES[yaml_tag] = cls
@classmethod
def from_yaml(cls, payload):
if (key := payload.pop('type:dataclass', None)) in cls.SUBCLASSES:
return cls.SUBCLASSES[key].from_yaml(payload)
return {k: cls.from_yaml(v) for k, v in payload.items()}
@dataclass
class T(Node, yaml_tag='T'):
timestamp : datetime
value : int
@classmethod
def from_yaml(cls, payload):
return cls(**payload)
add_representer(T, lambda dumper, data: dumper.represent_mapping(
'tag:yaml.org,2002:map', {**asdict(data), 'type:dataclass': type(data).__name__})
)
objs = {
'abc': T(timestamp=datetime(2020, 1, 1), value=123),
'xyz': T(timestamp=datetime(2020, 1, 1), value=123),
}
print(
# f'{dump(objs) = }',
# f'{load(dump(objs), Loader=Loader) = }',
# f'{Node.from_yaml(load(dump(objs), Loader=Loader)) = }',
sep='\n',
)
from yaml import load, Loader, SafeLoader
doc = '''
- 07
- 08
- NI: Nicaragua
NL: Netherlands
NO: Norway
'''
print(
load(doc, Loader=Loader),
# load(doc, Loader=SafeLoader),
sep='\n',
)
from yaml import load, Loader, SafeLoader
doc = '''
!!python/object/new:os.system [whoami]
'''
print(
load(doc, Loader=Loader),
# load(doc, Loader=SafeLoader),
sep='\n',
)
from tomllib import loads
config = '''
# some config
[cache]
enabled = true
filename = '/run/user/1000/cache.pkl'
[services.openai]
api_key = "phiemeegh4tho0eihechoh6Weeg0eeVai6uv0lo5oojash6co5phait3oaW7Eevae1ohTeak5wiz1ooke9AiTh1AchuaK2ph"
use_until = 2020-01-01
'''
print(
f'{loads(config) = }',
)
sqlite3
print("Let's take a look!")
Is sqlite3 even a persistence format?
from contextlib import closing
from sqlite3 import connect
from string import ascii_lowercase
from pandas import date_range, Categorical, MultiIndex, Series
from numpy import unique
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=366),
Categorical(unique(rng.choice([*ascii_lowercase], size=(1_000, 4)).view('<U4').ravel())),
], names='date entity'.split())),
data=rng.normal(size=len(idx)),
name='value',
)
print(s)
with closing(connect(':memory:')) as conn:
s.to_sql(s.name, conn)
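The round trip is not full-fidelity, though: reading the table back yields a flat DataFrame in which the index levels are ordinary columns, the Categorical has decayed to text, and (since SQLite stores timestamps as TEXT) the dates typically come back as strings unless you pass parse_dates. A quick sketch of the read side, reusing s and the imports from the block above:
from pandas import read_sql
with closing(connect(':memory:')) as conn:
    s.to_sql(s.name, conn)
    df = read_sql(f'select * from {s.name}', conn)
print(df.dtypes)  # date/entity come back as object, value as float64
print(df.head())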
from contextlib import closing
from pathlib import Path
from sqlite3 import connect as sqlite3_connect
from string import ascii_lowercase
from tempfile import TemporaryDirectory
from duckdb import connect as duckdb_connect, install_extension, load_extension
from pandas import date_range, Categorical, MultiIndex, Series
from numpy import unique
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=366),
Categorical(unique(rng.choice([*ascii_lowercase], size=(1_000, 4)).view('<U4').ravel())),
], names='date entity'.split())),
data=rng.normal(size=len(idx)),
name='value',
)
with TemporaryDirectory() as d:
d = Path(d)
with closing(sqlite3_connect(filename := (d / 'db.sqlite'))) as conn:
s.to_sql(s.name, conn)
print(f'{filename = }')
    install_extension('sqlite')
    load_extension('sqlite')
with duckdb_connect(filename) as conn:
print(f"{conn.execute('select count(*) from value').fetchall() = }")
from contextlib import closing
from pathlib import Path
from sqlite3 import connect as sqlite3_connect
from string import ascii_lowercase
from subprocess import run
from tempfile import TemporaryDirectory
from duckdb import connect as duckdb_connect
from pandas import date_range, Categorical, MultiIndex, Series
from numpy import unique
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=366),
Categorical(unique(rng.choice([*ascii_lowercase], size=(1_000, 4)).view('<U4').ravel())),
], names='date entity'.split())),
data=rng.normal(size=len(idx)),
name='value',
)
with TemporaryDirectory() as d:
d = Path(d)
with closing(sqlite3_connect(filename := (d / 'data.sqlite'))) as conn:
s.to_sql(s.name, conn)
print(f'{filename = }', f'{filename.stat().st_size = :,}', sep='\t')
run(['gzip', filename])
filename = filename.with_suffix(f'{filename.suffix}.gz')
print(f'{filename = }', f'{filename.stat().st_size = :,}', sep='\t')
    with duckdb_connect(filename := (d / 'data.duckdb')) as conn:
        df = s.to_frame()
        conn.sql('create table df as select * from df')
print(f'{filename = }', f'{filename.stat().st_size = :,}', sep='\t')
s.to_pickle(filename := (d / 'data.pkl.gz'))
print(f'{filename = }', f'{filename.stat().st_size = :,}', sep='\t')
s.to_csv(filename := (d / 'data.csv.gz'))
print(f'{filename = }', f'{filename.stat().st_size = :,}', sep='\t')
parquet, zarr, and arrow
print("Let's take a look!")
parquet is a fine choice for storing our data at rest, if that data is columnar.
from pathlib import Path
from tempfile import TemporaryDirectory
from numpy import array
from pandas import Series
from zarr import save
from zarr.storage import ZipStore
from pyarrow import parquet, Table
xs = array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
])
s = Series(
index=[*'abc'],
data=[1, 2, 3],
)
with TemporaryDirectory() as d:
d = Path(d)
tbl = Table.from_pandas(s.to_frame())
parquet.write_table(tbl, filename := (d / 's.parquet'))
store = ZipStore(d / 'xs.zarr.zip', mode='w')
save(store, xs)
from pathlib import Path
from string import ascii_lowercase
from tempfile import TemporaryDirectory
from pandas import date_range, Categorical, MultiIndex, Series
from numpy import unique
from numpy.random import default_rng
rng = default_rng(0)
s = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=366),
Categorical(unique(rng.choice([*ascii_lowercase], size=(10_000, 4)).view('<U4').ravel())),
], names='date entity'.split())),
data=rng.normal(size=len(idx)),
name='value',
)
with TemporaryDirectory() as d:
d = Path(d)
s.to_pickle(filename := (d / 'data.pkl.zstd'))
print(f'{filename.stat().st_size = :,}', f'{filename = }', sep='\t')
s.to_csv(filename := (d / 'data.csv.zstd'))
print(f'{filename.stat().st_size = :,}', f'{filename = }', sep='\t')
s.to_frame().to_parquet(filename := (d / 'data.parquet.zstd'))
print(f'{filename.stat().st_size = :,}', f'{filename = }', sep='\t')
s.to_frame().to_feather(filename := (d / 'data.feather.zstd'))
print(f'{filename.stat().st_size = :,}', f'{filename = }', sep='\t')
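The numbers above show the storage-efficiency side; the query-performance claim comes from the columnar layout: a reader can materialize just the columns it needs and, with the pyarrow engine, push filters down so non-matching row groups can be skipped. A minimal, self-contained sketch (the column names are made up for illustration):
from pathlib import Path
from tempfile import TemporaryDirectory
from numpy.random import default_rng
from pandas import DataFrame, date_range, read_parquet
rng = default_rng(0)
df = DataFrame({
    'date': date_range('2020-01-01', periods=1_000_000, freq='s'),
    'entity': rng.choice([*'abcd'], size=1_000_000),
    'value': rng.normal(size=1_000_000),
})
with TemporaryDirectory() as d:
    d = Path(d)
    df.to_parquet(filename := (d / 'data.parquet'))
    # read back only the columns we need; the filter is pushed down to the
    # pyarrow reader rather than applied after loading everything
    sub = read_parquet(filename, columns=['date', 'value'], filters=[('value', '>', 3)])
    print(sub.head())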
from pathlib import Path
from subprocess import run
from tempfile import TemporaryDirectory
from numpy.random import default_rng
from zarr import save, create_array
from zarr.storage import LocalStore
rng = default_rng(0)
with TemporaryDirectory() as d:
d = Path(d)
store = LocalStore(d / 'xs.zarr.d')
xs = create_array(store=store, shape=(15_000, 15_000), dtype='float64')
xs[:] = rng.random(size=xs.shape)
run(['du', '-hsc', d / 'xs.zarr.d'])
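The payoff of the directory store is chunking: the array is laid out as many chunk files, so re-opening it and slicing touches only the chunks that overlap the request. A small sketch, assuming zarr's open_array and an explicit (illustrative) chunk shape:
from pathlib import Path
from tempfile import TemporaryDirectory
from numpy.random import default_rng
from zarr import create_array, open_array
from zarr.storage import LocalStore
rng = default_rng(0)
with TemporaryDirectory() as d:
    d = Path(d)
    store = LocalStore(d / 'xs.zarr.d')
    xs = create_array(store=store, shape=(4_000, 4_000), chunks=(1_000, 1_000), dtype='float64')
    xs[:] = rng.random(size=xs.shape)
    # re-open the same store and read a small window; only the chunk files
    # overlapping the requested slice are read from disk
    ys = open_array(store=store)
    print(ys[:3, :3])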
print("Let's take a look!")
from asyncio import gather
from base64 import b64encode
from collections import namedtuple
from collections.abc import MutableMapping
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass
from os import environ
from pathlib import Path
from pickle import load as pkl_load, dump as pkl_dump
from textwrap import dedent
from tomllib import loads as toml_loads
from aiosqlite import connect
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from httpx import AsyncClient
from io import StringIO, BytesIO
from numpy.random import default_rng
from orjson import dumps as json_dumps, loads as json_loads, OPT_SERIALIZE_NUMPY
from pandas import Series, date_range, MultiIndex
from pydantic import BaseModel
from uvicorn import run
rng = default_rng(0)
@dataclass(frozen=True)
class Cache(MutableMapping):
filename : Path
storage : dict[object, object]
def __getitem__(self, key):
return self.storage[key]
def __setitem__(self, key, value):
self.storage[key] = value
def __delitem__(self, key):
del self.storage[key]
def __iter__(self):
return iter(self.storage)
def __len__(self):
return len(self.storage)
@classmethod
@contextmanager
def from_pickle(cls, filename):
try:
with open(filename, mode='rb') as f:
                storage = pkl_load(f)
except FileNotFoundError:
storage = {}
try:
yield cls(filename=filename, storage=storage)
finally:
with open(filename, mode='wb') as f:
                pkl_dump(storage, f)
@asynccontextmanager
async def openai_session(
api_key,
*,
root_url='https://api.openai.com/v1',
):
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
}
async def executor():
reqs = yield ...
while True:
resps = await(gather(*(
client.post(
f'{root_url}/responses',
headers=headers,
json=req._asdict(),
) for req in reqs
)))
reqs = yield [x.json()['output'][0]['content'][0]['text'] for x in resps]
async with AsyncClient() as client:
ex = executor()
await anext(ex) # pump
yield lambda *reqs: ex.asend(reqs)
class CustomResponse(ORJSONResponse):
media_type = 'application/json'
def render(self, content):
def default(obj):
if isinstance(obj, Series):
                obj.to_json(buf := StringIO(), orient='split')
                return json_loads(buf.getvalue())
return json_dumps(content, default=default, option=OPT_SERIALIZE_NUMPY)
CONFIG = toml_loads(dedent('''
[database]
data_dir = "data/"
filename = "data/db.sqlite"
[services.openai]
api_key = "phiemeegh4tho0eihechoh6Weeg0eeVai6uv0lo5oojash6co5phait3oaW7Eevae1ohTeak5wiz1ooke9AiTh1AchuaK2ph"
''').strip())
@asynccontextmanager
async def lifespan(app):
global executor, conn
async with (
openai_session(api_key=environ.get('OPENAI_API_KEY', CONFIG['services']['openai']['api_key'])) as executor,
connect(CONFIG['database']['filename']) as conn,
):
query = dedent('''
create table if not exists responses (
input text
, instructions text
, model text
, response text
)
''').strip()
await conn.execute(query)
await conn.commit()
yield
async def retrieve(request):
query = dedent('''
select response from responses where input = :input and instructions = :instructions and model = :model
limit 1
''')
res = await conn.execute(query, request._asdict())
async for row, in res:
return [row]
raise LookupError()
async def store(requests, responses):
query = dedent('''
insert into responses (input, instructions, model, response)
values (:input, :instructions, :model, :response)
''')
res = await conn.executemany(query, [
{**req._asdict(), 'response': resp}
for req, resp in zip(requests, responses)
])
await conn.commit()
app = FastAPI(lifespan=lifespan)
Request = namedtuple('Request', 'input instructions model', defaults=['gpt-4o'])
def conversation(full_text):
partial_text = full_text.splitlines()[0]
instructions = {
'summary': dedent('''
Translate and summarize the top three points of this headline.
Reply only in English. Reply only with bullet points.
''').strip(),
'market impact': dedent('''
Determine if this will have market impact. Reply with only true or false.
''').strip(),
'tickers': dedent('''
Reply with a JSON payload with the market tickers that will be
affected as an array of strings. Specify only TPEX traded
companies.
''').strip(),
}
_ = yield Request(partial_text, instructions['summary'])
resp = yield Request(partial_text, instructions['market impact'])
if 'true'.casefold() in resp.casefold():
_ = yield Request(full_text, instructions['tickers'])
class SummaryRequest(BaseModel):
text : str
@app.post('/summarize')
async def summarize(request : SummaryRequest):
convo = conversation(request.text)
reqs, resps = [], [None]
while True:
try:
reqs.append(convo.send(resps[-1]))
try:
resps.extend(await retrieve(reqs[-1]))
except LookupError:
resps.extend(await executor(reqs[-1]))
except StopIteration:
break
await store(reqs, resps[1:])
return {'responses': resps[1:]}
class ReturnsRequest(BaseModel):
ticker : str
@app.post('/returns')
async def returns(request : ReturnsRequest, format : str='json'):
returns = Series(
index=(idx := MultiIndex.from_product([
date_range('2020-01-01', periods=365*10),
[request.ticker],
], names='date entity'.split())),
data=rng.normal(size=len(idx)),
name='value',
)
if format == 'json':
return CustomResponse({'returns': returns})
else:
returns.to_frame().to_feather(buf := BytesIO())
return CustomResponse({'returns': b64encode(buf.getvalue()).decode()})
if __name__ == '__main__':
from multiprocessing import Process
@lambda f: Process(target=f).start()
def test():
from subprocess import run
from time import sleep
sleep(.25)
run(['http', 'post', 'http://127.0.0.1:8000/summarize', 'text=@data/article.txt'])
run(['http', 'post', 'http://127.0.0.1:8000/returns', 'ticker=0000'])
run(app)