We've turned all the code that we need into pipeline functions, so the code-base now looks like this:
import polars as pl
# First, "read" the data.
df = pl.read_csv("wowah_data.csv", parse_dates=False)
df.columns = [c.replace(" ", "") for c in df.columns]
df = df.lazy()
def set_types(dataf):
    return (dataf.with_columns([
        pl.col("guild") != -1,
        pl.col("timestamp").str.strptime(pl.Datetime, fmt="%m/%d/%y %H:%M:%S"),
    ]))
def sessionize(dataf, threshold=20 * 60 * 1000):
    return (dataf
            .sort(["char", "timestamp"])
            .with_columns([
                (pl.col("timestamp").diff().cast(pl.Int64) > threshold).fill_null(True).alias("ts_diff"),
                (pl.col("char").diff() != 0).fill_null(True).alias("char_diff"),
            ])
            .with_columns([
                (pl.col("ts_diff") | pl.col("char_diff")).alias("new_session_mark")
            ])
            .with_columns([
                pl.col("new_session_mark").cumsum().alias("session")
            ])
            .drop(["char_diff", "ts_diff", "new_session_mark"]))
def add_features(dataf):
    return (dataf
            .with_columns([
                pl.col("char").count().over("session").alias("session_length"),
                pl.col("session").n_unique().over("char").alias("n_sessions_per_char")
            ]))
def remove_bots(dataf, max_session_hours=24):
    # We're using some domain knowledge. The logger of our dataset should log
    # data every 10 minutes. That's what this line is based on.
    n_rows = max_session_hours * 6
    return (dataf
            .filter(pl.col("session_length").max().over("char") < n_rows))
(df
 .pipe(set_types)
 .pipe(sessionize, threshold=20 * 60 * 1000)
 .pipe(add_features)
 .pipe(remove_bots, max_session_hours=10)
 .collect())
Speedups by Caching
The query we have runs in about 7 seconds, which is pretty fast. But it's still a bummer to wait for the entire pipeline to run when we're only interested in changing the max_session_hours
setting at the end. There's a trick we can apply here though: we can collect an intermediate result!
df_intermediate = (df
    .pipe(set_types)
    .pipe(sessionize, threshold=20 * 60 * 1000)
    .pipe(add_features)
    .collect())
We now have an intermediate dataframe that's collected. This is no longer a lazy dataframe: instead of representing actions to be run in the future, it now holds actual data. That means we don't need to re-compute everything when we run:
df_intermediate.pipe(remove_bots, max_session_hours=10)
You can confirm that the two types of dataframes are indeed different:
type(df_intermediate)
# polars.eager.frame.DataFrame