Calmcode - polars: sessionize

Let's now sessionize the data with polars.

Here's the code that we started with.

import polars as pl

# First "read" in the data.
df = pl.read_csv("wowah_data.csv", parse_dates=False)
df.columns = [c.replace(" ", "") for c in df.columns]
df = df.lazy()

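def set_types(dataf):
    return (dataf.with_columns([
                # Replace guild with a boolean: True unless the value is -1.
                pl.col("guild") != -1,
                # Parse the timestamp strings into proper datetimes.
                pl.col("timestamp").str.strptime(pl.Datetime, fmt="%m/%d/%y %H:%M:%S"),
            ]))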

# A small "pipeline" of steps.
(df
  .filter(pl.col("char") == 21)
  .pipe(set_types)
  .sort(["char", "timestamp"])
  .collect())

Let's now add a session to this data.

Sessionize

Given that we have a sorted dataset, we can add a session by first checking whether the time jump between two consecutive rows is too big. We also need to check whether there's a jump between two characters. We can again use expressions here, via the convenient .diff() method on a column.

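# This represents 20 minutes, expressed in milliseconds. It assumes that the
# timestamp column has millisecond precision; adjust the factor if yours differs.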
threshold = 20 * 60 * 1000

(df
  .filter(pl.col("char") == 21)
  .pipe(set_types)
  .sort(["char", "timestamp"])
  .with_columns([
    (pl.col("timestamp").diff().cast(pl.Int64) > threshold).fill_null(True).alias("ts_diff"),
    (pl.col("char").diff() != 0).fill_null(True).alias("char_diff"),
  ])
  .collect())
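
To make the two flags concrete, here is a minimal plain-Python sketch of the same idea on a handful of made-up rows. It does not use polars; the rows and the flag_jumps helper are hypothetical and only mimic what .diff() followed by .fill_null(True) produces.

```python
from datetime import datetime, timedelta

# Hypothetical toy rows: (char, timestamp), already sorted by char and timestamp.
rows = [
    (21, datetime(2008, 1, 1, 10, 0)),
    (21, datetime(2008, 1, 1, 10, 10)),  # 10 minutes later: same session
    (21, datetime(2008, 1, 1, 11, 0)),   # 50 minutes later: time jump
    (22, datetime(2008, 1, 1, 11, 5)),   # different character
]

threshold = timedelta(minutes=20)

def flag_jumps(rows, threshold):
    """Return (ts_diff, char_diff) flag lists, one entry per row."""
    ts_diff, char_diff = [], []
    prev = None
    for char, ts in rows:
        if prev is None:
            # The first row has no predecessor, so both flags default to True.
            ts_diff.append(True)
            char_diff.append(True)
        else:
            prev_char, prev_ts = prev
            ts_diff.append(ts - prev_ts > threshold)
            char_diff.append(char != prev_char)
        prev = (char, ts)
    return ts_diff, char_diff

ts_diff, char_diff = flag_jumps(rows, threshold)
print(ts_diff)    # [True, False, True, False]
print(char_diff)  # [True, False, False, True]
```

The first row is flagged in both lists because there is nothing to compare it with, which is the role that .fill_null(True) plays in the polars version.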

We now have two columns, ts_diff and char_diff. If either of these columns contains a true value, we know that we've just started a new session: either because too much time has passed, or because we're dealing with a new character. Let's now add a column that marks the start of a new session.

# This represents 20 minutes (in milliseconds)
threshold = 20 * 60 * 1000

(df
  .filter(pl.col("char") == 21)
  .pipe(set_types)
  .sort(["char", "timestamp"])
  .with_columns([
    (pl.col("timestamp").diff().cast(pl.Int64) > threshold).fill_null(True).alias("ts_diff"),
    (pl.col("char").diff() != 0).fill_null(True).alias("char_diff"),
  ])
  .with_columns([
    (pl.col("ts_diff") | pl.col("char_diff")).alias("new_session_mark")
  ])
  .collect())

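You'll notice that we've added a second .with_columns call here. You may wonder why. The reason is that all expressions inside a single .with_columns call are evaluated against the incoming dataframe, so the ts_diff and char_diff columns don't exist yet while the first .with_columns statement runs.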

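Given our new_session_mark column, we can run .cumsum() over it to end up with a session id! Every true value adds one to the running total, so each row gets the id of the most recent session start.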

# This represents 20 minutes (in milliseconds)
threshold = 20 * 60 * 1000

(df
  .filter(pl.col("char") == 21)
  .pipe(set_types)
  .sort(["char", "timestamp"])
  .with_columns([
    (pl.col("timestamp").diff().cast(pl.Int64) > threshold).fill_null(True).alias("ts_diff"),
    (pl.col("char").diff() != 0).fill_null(True).alias("char_diff"),
  ])
  .with_columns([
    (pl.col("ts_diff") | pl.col("char_diff")).alias("new_session_mark")
  ])
  .with_columns([
    pl.col("new_session_mark").cumsum().alias("session")
  ])
  .collect())
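
As a rough illustration of why a cumulative sum yields a session id, here is a small plain-Python sketch. The flag values are made up, and itertools.accumulate merely stands in for the polars cumulative sum.

```python
from itertools import accumulate

# Hypothetical flags for four rows, in sorted order.
ts_diff = [True, False, True, False]
char_diff = [True, False, False, True]

# A row starts a new session if either flag is set.
new_session_mark = [t or c for t, c in zip(ts_diff, char_diff)]

# True counts as 1 and False as 0, so the running total only
# increases on rows that start a new session.
session = list(accumulate(int(mark) for mark in new_session_mark))

print(new_session_mark)  # [True, False, True, True]
print(session)           # [1, 1, 2, 3]
```

Rows that share a running total belong to the same session, which is why no separate counter is needed.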

You can remove the filter step and explore the data to confirm that our code does what we want it to.
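
One way to do such a check is sketched below in plain Python. It assumes that you have pulled the rows out as (char, timestamp, session) tuples, still sorted by char and timestamp; the check_sessions helper and the sample rows are hypothetical.

```python
from datetime import datetime, timedelta

def check_sessions(rows, threshold):
    """Return True if the session ids agree with the sessionize rules.

    rows: (char, timestamp, session) tuples sorted by char and timestamp.
    """
    for prev, cur in zip(rows, rows[1:]):
        same_session = prev[2] == cur[2]
        # Two consecutive rows belong together only if the character is
        # unchanged and the time gap does not exceed the threshold.
        should_be_same = prev[0] == cur[0] and cur[1] - prev[1] <= threshold
        if same_session != should_be_same:
            return False
    return True

# Hypothetical sample output of the pipeline.
rows = [
    (21, datetime(2008, 1, 1, 10, 0), 1),
    (21, datetime(2008, 1, 1, 10, 10), 1),
    (21, datetime(2008, 1, 1, 11, 0), 2),
    (22, datetime(2008, 1, 1, 11, 5), 3),
]

print(check_sessions(rows, timedelta(minutes=20)))  # True
```

The check only compares neighbouring rows, so it relies on the same sort order that the pipeline uses.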