ts-python

applied pandas I: pandas is well designed, actually!

Discussion (Wed Nov 18, 2020; 3:30 PM EST)

Keywords: understanding datatypes in pandas including pandas.array, pandas.Series, pandas.DataFrame; nullable ints, pandas.Categorical; resampling, masking

Presenter James Powell james@dutc.io
Date Wednesday, November 18, 2020
Time 3:30 PM EST
print('Good afternoon!')
# now, look at some "traces": captured data from a
#    running system used to gain operational insight.

# we have a file of JSON-lines that encode 
#    captured metrics from some multi-node cluster,
#    including memory use, CPU load averages, &c.

# (note: for the purposes of this exercise, we will
#    use random data, so there's no value in reading
#    too much into the data itself)

# FIX: the original line read `from datetime import` (a SyntaxError);
#   completed to import `datetime`, which the parsing code below uses
#   via `datetime.fromisoformat`.
from datetime import datetime

# sample payload: one JSON object per line (same shape as traces.jsonl)
traces = '''
{ "timestamp": "2020-07-04T09:00:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.9, 0.7, 0.6 ] }
{ "timestamp": "2020-07-04T09:00:30.000000", "node": "b", "mem": { "total": 1024, "used": 256 }, "swap": { "total": 32, "used": 0 }, "cpu": [ 0.1, 0.1, 0.1 ] }
{ "timestamp": "2020-07-04T09:01:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.2, 0.7, 0.6 ] }
'''

Data File

Traces file (traces.jsonl), containing JSON-lines trace data

Exercise

# TASK: read in traces.jsonl as a stream and identify nodes with the
#       following potential operational issues
#       - 1-min load average >.8 for more than one hour
#       - swap usage >80% for more than five minutes
#       - memory free <10% for more than 2 minutes, two times in one hour

from collections import namedtuple
from json import loads
from datetime import datetime

# Record types mirroring the shape of each decoded JSON trace.
Cpu  = namedtuple('Cpu', ['avg_1m', 'avg_5m', 'avg_10m'])  # load averages
Mem  = namedtuple('Mem', ['total', 'used'])
Swap = namedtuple('Swap', ['total', 'used'])

class Trace(namedtuple('TraceBase', 'timestamp node cpu mem swap')):
    """One captured metrics sample for a single node."""

    @classmethod
    def from_json(cls, json):
        """Alternate constructor: build a Trace from one decoded JSON record."""
        return cls(
            timestamp=datetime.fromisoformat(json['timestamp']),
            node=json['node'],
            cpu=Cpu(*json['cpu']),      # cpu arrives as a 3-element list
            mem=Mem(**json['mem']),     # mem/swap arrive as {"total", "used"}
            swap=Swap(**json['swap']),
        )

# Parse the trace file (one JSON object per line) into Trace records,
# then bucket the records by node name.
with open('traces.jsonl') as infile:
    traces = [Trace.from_json(loads(raw)) for raw in infile]

from collections import defaultdict
per_node = defaultdict(list)
for trace in traces:
    per_node[trace.node].append(trace)

from itertools import islice, tee

def nwise(iterable, n=2):
    """Yield overlapping n-tuples of consecutive items from iterable."""
    shifted = (islice(it, skip, None) for skip, it in enumerate(tee(iterable, n)))
    return zip(*shifted)

def keyfunc(t):
    """Truncate a trace's timestamp to minute resolution (sort key)."""
    return t.timestamp.replace(second=0, microsecond=0)
# Slide a 60-sample window over node "a"'s time-ordered traces and report
# the first window where the 1-min load average never drops to 0.80 or below.
for group in nwise(sorted(per_node["a"], key=keyfunc), 60):
    if not any(t.cpu.avg_1m <= .80 for t in group):
        print(f'{group = }')
        break
from collections import namedtuple
from json import loads
from datetime import datetime

# Record types for the decoded JSON trace payload (second pass).
Cpu = namedtuple('Cpu', ['avg_1m', 'avg_5m', 'avg_10m'])
Mem = namedtuple('Mem', ['total', 'used'])
class Swap(namedtuple('Swap', 'total used')):
    """Swap statistics, with a derived `free` capacity."""

    @property
    def free(self):
        # capacity not currently in use
        total, used = self
        return total - used

class Trace(namedtuple('TraceBase', 'timestamp node cpu mem swap')):
    """A single metrics sample for one node, parsed from a JSON line."""

    @classmethod
    def from_json(cls, json):
        """Build a Trace from one decoded JSON-lines record."""
        fields = (
            datetime.fromisoformat(json['timestamp']),
            json['node'],
            Cpu(*json['cpu']),       # positional: [1m, 5m, 10m]
            Mem(**json['mem']),      # keyword: {"total", "used"}
            Swap(**json['swap']),
        )
        return cls(*fields)

# Load every JSON line of the trace file as a Trace record.
with open('traces.jsonl') as infile:
    traces = list(map(Trace.from_json, map(loads, infile)))

from pandas import DataFrame, MultiIndex, concat

# Build a (node, timestamp)-indexed frame from the Trace namedtuples.
df = DataFrame(traces)
df.set_index('node', inplace=True)
df.set_index('timestamp', inplace=True, append=True)

# Explode the Cpu namedtuple column into one column per load average.
# (`.str[idx]` performs positional element access on the tuple values.)
for idx, col in enumerate(['1T', '5T', '10T']):
    df['cpu', col] = df['cpu'].str[idx]
del df['cpu']

# Explode the mem/swap namedtuples and derive free/pct columns.
# FIX: iterate a tuple, not a set — set iteration order depends on
#   hashing, so the resulting column order was nondeterministic.
for stat in ('mem', 'swap'):
    for idx, col in enumerate(['total', 'usage']):
        df[stat, col] = df[stat].str[idx]
    df[stat, 'free'] = df[stat, 'total'] - df[stat, 'usage']
    df[stat, 'pct']  = df[stat, 'usage'] / df[stat, 'total']
    del df[stat]

# Promote the (group, field) tuple labels to a proper column MultiIndex.
df.columns = MultiIndex.from_tuples(df.columns)

# Point-in-time warning indicators.
df['warning', 'cpu']  = df['cpu', '1T'] > .80
df['warning', 'swap'] = df['swap', 'pct'] > .80

# Resample each node's traces onto a regular 1-minute grid.
# FIX: '1min' replaces '1T' — the 'T' alias is deprecated since pandas 2.2
#   and removed in 3.0; '1min' is accepted by all pandas versions.
per_node = (df.loc[n].resample('1min').first().reset_index().assign(node=n)
            for n in df.index.get_level_values(0).unique())
df = concat(per_node)
df.set_index(['node', 'timestamp'], inplace=True)
df.sort_index(inplace=True)

# Flag a column when its warning held for 60 consecutive 1-minute samples.
# NOTE(review): this rolling window runs over the whole (node, timestamp)-
#   sorted frame, so a window can straddle the boundary between two nodes —
#   confirm whether a per-node rolling (groupby level=0) is intended.
for col in (w := df['warning'].rolling(60, min_periods=1).sum() == 60).columns:
    df['flag', col] = w[col]

from collections import namedtuple

class Flagged(namedtuple('Flagged', 'df cpu swap')):
    """The source frame plus, per node, whether each flag ever fired."""

    @classmethod
    def from_df(cls, df):
        """Collapse the per-sample flag columns to one boolean per node."""
        def fired(flag):
            # True for a node iff that flag is True in any of its samples
            return df['flag', flag].groupby(level=0).any()
        return cls(df, fired('cpu'), fired('swap'))

# Report the nodes where *both* the cpu and the swap flag fired.
flagged = Flagged.from_df(df)
criteria = flagged.cpu & flagged.swap
for node in flagged.cpu.index[criteria]:
    print(flagged.df.loc[node][['cpu', 'swap', 'flag']].head())


# --- scratch fragments from the live discussion (not part of the exercise) ---

# NOTE(review): sketch only — `...` (Ellipsis) is a placeholder body;
#   presumably illustrating pandas' callable indexing, but this line
#   would not run as-is.
df[lambda df: ...]
# list.sort() mutates in place; sorted() returns a new list and takes key=.
xs = [1, 2, 3]
xs.sort()
# NOTE(review): `...` stands in for a real key function here; sketch only.
sorted(xs, key=...)
# numpy demo with a fixed-width dtype ('int8' spans -128..127).
from numpy import add, array
xs = array([1, 2, 3], dtype='int8')
# NOTE(review): the result of int8-array + Python int 1000 depends on the
#   numpy version (pre-NEP-50 promotes the dtype; numpy 2 rejects the
#   out-of-range scalar) — confirm which numpy this talk assumed.
print(f'{xs + 1000             = }')
# with out=xs the result is written back into the int8 array itself
print(f'{add(xs, 1000, out=xs) = }')
from pandas import DataFrame, MultiIndex

# Demo: two equivalent ways to select rows by a value in the *second*
# level of a row MultiIndex.
index = MultiIndex.from_arrays([list('aab'), ['aa', 'bb', 'aa']])
df = DataFrame({'x': range(3), 'y': range(3)}, index=index)

print(df[df.index.get_level_values(1) == 'bb'])  # boolean mask on level values
print(df.xs('bb', level=1, drop_level=False))    # cross-section, level kept