pandas I: pandas is well designed, actually!

Keywords: understanding datatypes in pandas, including pandas.array, pandas.Series, pandas.DataFrame; nullable ints, pandas.Categorical; resampling, masking
| Presenter | James Powell james@dutc.io |
| Date | Wednesday, November 18, 2020 |
| Time | 3:30 PM EST |
print('Good afternoon!')
# now, look at some "traces": captured data from a
# running system used to gain operational insight.
# we have a file of JSON-lines that encode
# captured metrics from some multi-node cluster,
# including memory use, CPU load averages, &c.
# (note: for the purposes of this exercise, we will
# use random data, so there's no value in reading
# too much into the data itself)
from datetime import datetime
traces = '''
{ "timestamp": "2020-07-04T09:00:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.9, 0.7, 0.6 ] }
{ "timestamp": "2020-07-04T09:00:30.000000", "node": "b", "mem": { "total": 1024, "used": 256 }, "swap": { "total": 32, "used": 0 }, "cpu": [ 0.1, 0.1, 0.1 ] }
{ "timestamp": "2020-07-04T09:01:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.2, 0.7, 0.6 ] }
'''
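# write the sample traces above out to disk so that the file-reading
# code below has a traces.jsonl to work with (assumed setup step; in
# the original session the file presumably already existed on disk)
with open('traces.jsonl', 'w') as f:
    f.write(traces.strip() + '\n')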
traces.jsonl: a file of JSON-lines data, where each object has fields timestamp, node, mem, swap, and cpu:
- timestamp: a datetime in datetime.isoformat() format
- node: a string node name
- mem: an object with fields total and used, giving memory usage
- swap: an object with fields total and used, giving swap usage
- cpu: an array of three numbers (the 1-min, 5-min, 10-min load averages)
# TASK: read in traces.jsonl as a stream and identify nodes with the
# following potential operational issues
# - 1-min load average >.8 for more than one hour
# - swap usage >80% for more than five minutes
# - memory free <10% for more than 2 minutes, two times in one hour
from collections import namedtuple
from json import loads
from datetime import datetime
Cpu = namedtuple('Cpu', 'avg_1m avg_5m avg_10m')
Mem = namedtuple('Mem', 'total used')
Swap = namedtuple('Swap','total used')
class Trace(namedtuple('TraceBase', 'timestamp node cpu mem swap')):
    @classmethod
    def from_json(cls, json):
        timestamp = datetime.fromisoformat(json['timestamp'])
        node = json['node']
        cpu = Cpu(*json['cpu'])
        mem = Mem(**json['mem'])
        swap = Swap(**json['swap'])
        return cls(timestamp, node, cpu, mem, swap)
with open('traces.jsonl') as f:
    traces = [Trace.from_json(loads(line)) for line in f]
from collections import defaultdict
per_node = defaultdict(list)
for t in traces:
    per_node[t.node].append(t)
from itertools import islice, tee
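# nwise: length-n sliding windows over an iterable, built by tee-ing the
# iterable n times and staggering each copy with islice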
nwise = lambda g, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))
keyfunc = lambda t: t.timestamp.replace(second=0, microsecond=0)
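# sort each node's traces by minute-truncated timestamp; samples arrive
# about once per minute per node, so 60 consecutive above-threshold
# readings approximate "for more than one hour"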
for group in nwise(sorted(per_node["a"], key=keyfunc), 60):
    if all(x.cpu.avg_1m > .80 for x in group):
        print(f'{group = }')
        break
from collections import namedtuple
from json import loads
from datetime import datetime
Cpu = namedtuple('Cpu', 'avg_1m avg_5m avg_10m')
Mem = namedtuple('Mem', 'total used')
class Swap(namedtuple('Swap','total used')):
    @property
    def free(self):
        return self.total - self.used
class Trace(namedtuple('TraceBase', 'timestamp node cpu mem swap')):
    @classmethod
    def from_json(cls, json):
        timestamp = datetime.fromisoformat(json['timestamp'])
        node = json['node']
        cpu = Cpu(*json['cpu'])
        mem = Mem(**json['mem'])
        swap = Swap(**json['swap'])
        return cls(timestamp, node, cpu, mem, swap)
with open('traces.jsonl') as f:
    traces = [Trace.from_json(loads(line)) for line in f]
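# second pass: the same traces restated as a pandas DataFrame, indexed by
# (node, timestamp), so the windowing can be expressed with resampling and
# rolling aggregations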
from pandas import DataFrame, MultiIndex, concat
df = DataFrame(traces)
df.set_index('node', inplace=True)
df.set_index('timestamp', inplace=True, append=True)
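# split the nested Cpu/Mem/Swap namedtuples into separate columns, keyed by
# (group, field) tuples that will later become a column MultiIndex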
for idx, col in enumerate(['1T', '5T', '10T']):
    df['cpu', col] = df['cpu'].str[idx]
del df['cpu']
for stat in {'mem', 'swap'}:
    for idx, col in enumerate(['total', 'usage']):
        df[stat, col] = df[stat].str[idx]
    df[stat, 'free'] = df[stat, 'total'] - df[stat, 'usage']
    df[stat, 'pct'] = df[stat, 'usage'] / df[stat, 'total']
    del df[stat]
df.columns = MultiIndex.from_tuples(df.columns)
df['warning', 'cpu'] = df['cpu', '1T'] > .80
df['warning', 'swap'] = df['swap', 'pct'] > .80
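# resample each node's traces onto a regular 1-minute grid so that a
# fixed-length rolling window corresponds to a fixed span of wall-clock time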
per_node = (df.loc[n].resample('1T').first().reset_index().assign(node=n)
            for n in df.index.get_level_values(0).unique())
df = concat(per_node)
df.set_index(['node', 'timestamp'], inplace=True)
df.sort_index(inplace=True)
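# a warning that has been True for 60 consecutive 1-minute samples (~1 hour)
# becomes a flag; note that this rolling window runs over the concatenated
# frame, so a stricter version would compute it per node (e.g. via a groupby)
# to keep a window from spanning two nodes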
for col in (w := df['warning'].rolling(60, min_periods=1).sum() == 60).columns:
    df['flag', col] = w[col]
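# a node is flagged if any of its rolling windows tripped; collect the
# per-node flags so the criteria can be combined across warning types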
from collections import namedtuple
class Flagged(namedtuple('Flagged', 'df cpu swap')):
    @classmethod
    def from_df(cls, df):
        cpu = df['flag', 'cpu'].groupby(level=0).any()
        swap = df['flag', 'swap'].groupby(level=0).any()
        return cls(df, cpu, swap)
flagged = Flagged.from_df(df)
criteria = flagged.cpu & flagged.swap
for node in flagged.cpu[criteria].index:
    print(flagged.df.loc[node][['cpu', 'swap', 'flag']].head())
df[lambda df: ...]
xs = [1, 2, 3]
xs.sort()
sorted(xs, key=...)
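These fragments draw an analogy: just as sorted(xs, key=...) takes a callable and returns a new result without mutating xs, pandas indexing accepts a callable that is applied to the object being indexed, which lets a filter be written without binding an intermediate name. A minimal sketch of that callable-based selection, using a toy DataFrame (not the traces above):
from pandas import DataFrame

toy = DataFrame({'x': [1, 2, 3], 'y': [9, 8, 7]})

# equivalent selections: an explicit boolean mask vs. a callable that
# receives the (possibly intermediate, unnamed) frame being indexed
print(toy[toy['x'] > 1])
print(toy[lambda d: d['x'] > 1])
print(toy.loc[lambda d: d['x'] > 1, ['y']])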
from numpy import add, array
xs = array([1, 2, 3], dtype='int8')
print(f'{xs + 1000 = }')
print(f'{add(xs, 1000, out=xs) = }')
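# note: with a fixed-width dtype like int8 the two spellings can differ:
# depending on the NumPy version and its casting rules, xs + 1000 may be
# promoted to a wider dtype, while forcing the output back into int8 with
# out=xs can silently wrap around (or raise a casting/overflow error)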
from pandas import DataFrame, MultiIndex
index = MultiIndex.from_arrays([[*'aab'], ['aa', 'bb', 'aa']])
df = DataFrame({
    'x': range(3),
    'y': range(3),
}, index=index)
print(df[df.index.get_level_values(1) == 'bb'])
print(df.xs('bb', level=1, drop_level=False))
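# both lines select the rows whose second index level equals 'bb';
# drop_level=False keeps that level in the result so the two outputs match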