Here's the code that we started with.
import polars as pl
# First "read" in the data.
df = pl.read_csv("wowah_data.csv", parse_dates=False)
df.columns = [c.replace(" ", "") for c in df.columns]
df = df.lazy()
def set_types(dataf):
    return (dataf.with_columns([
        pl.col("guild") != -1,
        pl.col("timestamp").str.strptime(pl.Datetime, fmt="%m/%d/%y %H:%M:%S"),
    ]))
# A small "pipeline" of steps.
(df
.filter(pl.col("char") == 21)
.pipe(set_types)
.sort(["char", "timestamp"])
.collect())
Let's now add a session to this data.
Sessionize
Given that we have a sorted dataset, we can add a session by first calculating whether the
time jump between two rows is too big. We also need to check whether there's a jump between
two characters. We can again use expressions here by using the convenient .diff()
method on a column.
# 20 minutes in milliseconds (assumes the timestamp diff, cast to Int64, is in milliseconds)
threshold = 20 * 60 * 1000
(df
.filter(pl.col("char") == 21)
.pipe(set_types)
.sort(["char", "timestamp"])
.with_columns([
(pl.col("timestamp").diff().cast(pl.Int64) > threshold).fill_null(True).alias("ts_diff"),
(pl.col("char").diff() != 0).fill_null(True).alias("char_diff"),
])
.collect())
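To make the two flags concrete, here is a plain-Python sketch of the same idea on a handful of made-up rows. It does not use Polars; the `rows` data and the `diff_flags` helper are hypothetical and only illustrate what comparing each row with the previous one produces, with the first row flagged as true (mirroring `.fill_null(True)`).

```python
from datetime import datetime, timedelta

# Hypothetical toy rows (char, timestamp), already sorted by char and timestamp.
rows = [
    (21, datetime(2008, 1, 1, 12, 0)),
    (21, datetime(2008, 1, 1, 12, 10)),
    (21, datetime(2008, 1, 1, 13, 0)),
    (22, datetime(2008, 1, 1, 13, 5)),
]

threshold = timedelta(minutes=20)


def diff_flags(rows, threshold):
    """Return (ts_diff, char_diff) flags per row; the first row is flagged True."""
    ts_diff, char_diff = [], []
    prev = None
    for char, ts in rows:
        if prev is None:
            # No previous row to compare with, so treat it as a jump.
            ts_diff.append(True)
            char_diff.append(True)
        else:
            prev_char, prev_ts = prev
            ts_diff.append(ts - prev_ts > threshold)
            char_diff.append(char != prev_char)
        prev = (char, ts)
    return ts_diff, char_diff


ts_diff, char_diff = diff_flags(rows, threshold)
print(ts_diff)    # [True, False, True, False]
print(char_diff)  # [True, False, False, True]
```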
We now have two columns, ts_diff and char_diff. If either of these columns contains a true value, then we know that we've just started a new session, either because too much time has passed or because we're dealing with a new character. Let's now add a column that marks the start of a new session.
# 20 minutes in milliseconds (assumes the timestamp diff, cast to Int64, is in milliseconds)
threshold = 20 * 60 * 1000
(df
.filter(pl.col("char") == 21)
.pipe(set_types)
.sort(["char", "timestamp"])
.with_columns([
(pl.col("timestamp").diff().cast(pl.Int64) > threshold).fill_null(True).alias("ts_diff"),
(pl.col("char").diff() != 0).fill_null(True).alias("char_diff"),
])
.with_columns([
(pl.col("ts_diff") | pl.col("char_diff")).alias("new_session_mark")
])
.collect())
You'll notice that we've added a second .with_columns call here. You may wonder, "why?". The reason is that the ts_diff and char_diff columns don't exist yet while the first .with_columns call is being evaluated.
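The following toy model may help to see why. It is not how Polars is implemented; the `with_columns` function below is a hypothetical stand-in that works on a plain dict and evaluates every expression against the input frame only, which is the behavior that matters here.

```python
def with_columns(frame, exprs):
    """Toy stand-in: every expression is evaluated against the input frame only."""
    new_columns = {name: expr(frame) for name, expr in exprs.items()}
    return {**frame, **new_columns}


# Hypothetical input: minutes since the previous row, and the character id.
frame = {"gap_minutes": [None, 10, 50], "char": [21, 21, 21]}

flags = {
    "ts_diff": lambda f: [g is None or g > 20 for g in f["gap_minutes"]],
    "char_diff": lambda f: [i == 0 or c != f["char"][i - 1] for i, c in enumerate(f["char"])],
}
mark = {
    "new_session_mark": lambda f: [a or b for a, b in zip(f["ts_diff"], f["char_diff"])],
}

# One call: the mark expression looks for columns the input frame does not have.
try:
    with_columns(frame, {**flags, **mark})
    single_call_worked = True
except KeyError:
    single_call_worked = False

# Two calls: the second call sees the columns created by the first.
result = with_columns(with_columns(frame, flags), mark)

print(single_call_worked)          # False
print(result["new_session_mark"])  # [True, False, True]
```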
Given our new_session_mark column, we can run .cumsum() over it to end up with a session id!
# 20 minutes in milliseconds (assumes the timestamp diff, cast to Int64, is in milliseconds)
threshold = 20 * 60 * 1000
(df
.filter(pl.col("char") == 21)
.pipe(set_types)
.sort(["char", "timestamp"])
.with_columns([
(pl.col("timestamp").diff().cast(pl.Int64) > threshold).fill_null(True).alias("ts_diff"),
(pl.col("char").diff() != 0).fill_null(True).alias("char_diff"),
])
.with_columns([
(pl.col("ts_diff") | pl.col("char_diff")).alias("new_session_mark")
])
.with_columns([
pl.col("new_session_mark").cumsum().alias("session")
])
.collect())
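As a rough illustration of why a cumulative sum yields a session id, here is a small plain-Python sketch. The flag values are made up, and `itertools.accumulate` stands in for the cumulative sum: every true mark bumps the running total by one, so all rows up to the next mark share the same number.

```python
from itertools import accumulate

# Hypothetical flags for four sorted rows.
ts_diff = [True, False, True, False]
char_diff = [True, False, False, True]

# A row starts a new session if either flag is set.
new_session_mark = [t or c for t, c in zip(ts_diff, char_diff)]

# Running total of the marks; True counts as 1, False as 0.
session = list(accumulate(int(m) for m in new_session_mark))

print(new_session_mark)  # [True, False, True, True]
print(session)           # [1, 1, 2, 3]
```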
You can remove the filter step and explore the data to confirm that our code does what we want it to.
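One way to do that check is sketched below in plain Python, assuming the collected result has been turned into (char, timestamp, session) tuples in sorted order. The `check_sessions` helper and the toy rows are hypothetical; the idea is that every session should contain a single character and no gap larger than the threshold between consecutive rows.

```python
from datetime import datetime, timedelta
from itertools import groupby


def check_sessions(rows, threshold):
    """rows: (char, timestamp, session) tuples sorted by char, then timestamp."""
    for _, group in groupby(rows, key=lambda r: r[2]):
        group = list(group)
        # A session must belong to exactly one character.
        if len({char for char, _, _ in group}) != 1:
            return False
        # Consecutive rows within a session must not exceed the threshold.
        for (_, earlier, _), (_, later, _) in zip(group, group[1:]):
            if later - earlier > threshold:
                return False
    return True


threshold = timedelta(minutes=20)

good = [
    (21, datetime(2008, 1, 1, 12, 0), 1),
    (21, datetime(2008, 1, 1, 12, 10), 1),
    (21, datetime(2008, 1, 1, 13, 0), 2),
    (22, datetime(2008, 1, 1, 13, 5), 3),
]
# Same rows, but the 50-minute gap is wrongly kept inside session 1.
bad = [good[0], good[1], (21, datetime(2008, 1, 1, 13, 0), 1), good[3]]

print(check_sessions(good, threshold))  # True
print(check_sessions(bad, threshold))   # False
```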