In [None]:
import cudf
import cupy as cp
import numpy as np


In [32]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

In [None]:
DIR = "./data"
BIG_KM = 2000.0
MAX_GAP_HOURS = 6
MAX_GAP = np.timedelta64(MAX_GAP_HOURS, "h")

In [7]:
users = cudf.read_csv(f"{DIR}/users.csv", dtype={"id": "uint64"})
flights = cudf.read_csv(f"{DIR}/flights.csv")
user_flights = cudf.read_csv(
    f"{DIR}/user_flights.csv", dtype={"user_id": "uint64", "flight_id": "uint64"}
)
cards = cudf.read_csv(
    f"{DIR}/cards.csv", dtype={"id": "uint64", "user_id": "uint64", "number": "uint64"}
)
card_flights = cudf.read_csv(
    f"{DIR}/card_flights.csv", dtype={"card_id": "uint64", "flight_id": "uint64"}
)

In [None]:
has_time = flights["has_time"].astype("str").str.strip().str.lower()
has_time = (has_time == "true")

date = flights["dep_date"].fillna("").astype("str").str.strip()
time = flights["dep_time"].fillna("").astype("str").str.strip()

mask_date = date.str.match(r"^\d{4}-\d{2}-\d{2}$")
mask_time = time.str.match(r"^\d{2}:\d{2}(:\d{2})?$") 

mask_dt = mask_date & mask_time & has_time          
mask_d  = mask_date & (~has_time | ~mask_time)     


In [9]:
for c in ("fromlat", "fromlon", "tolat", "tolon"):
    flights[c] = flights[c].astype("float64")

R = cp.float64(6371.0088)
rad = cp.float64(cp.pi / 180.0)

lat1 = flights["fromlat"].values * rad
lon1 = flights["fromlon"].values * rad
lat2 = flights["tolat"].values  * rad
lon2 = flights["tolon"].values  * rad

dlat = lat2 - lat1
dlon = lon2 - lon1
a = cp.sin(dlat * 0.5)**2 + cp.cos(lat1) * cp.cos(lat2) * cp.sin(dlon * 0.5)**2
flights["distance_km"] = cudf.Series(R * (2.0 * cp.arcsin(cp.sqrt(a))))

In [None]:
map_user_flights = user_flights[["user_id", "flight_id"]]

cards_min = cards[["id", "user_id"]].rename(columns={"id": "card_id"})
map_card_users = card_flights.merge(cards_min, on="card_id", how="left")[["user_id", "flight_id"]]

user_flight_map = cudf.concat([map_user_flights, map_card_users], ignore_index=True)
user_flight_map = user_flight_map.dropna(subset=["user_id", "flight_id"]).drop_duplicates()

uf = user_flight_map.merge(
    flights,
    left_on="flight_id",
    right_on="id",
    how="inner",             
)
uf = uf.dropna(subset=["dep_ts"]).sort_values(["user_id", "dep_ts", "flight_id"])

In [11]:
uid = "user_id"

In [12]:
prev_dep = uf.groupby(uid)["dep_ts"].shift(1)
gap = uf["dep_ts"] - prev_dep
cond_lt6h = gap <= MAX_GAP
is_new_seg = cond_lt6h.isna() | (~cond_lt6h)
seg_id = is_new_seg.astype("int32").groupby(uf[uid]).cumsum()
streak_sizes = uf.groupby([uid, seg_id]).size().rename("streak_len")
max_streak_lt6h = streak_sizes.groupby(level=0).max().rename("max_streak_lt6h")


In [13]:
big = uf["distance_km"] >= BIG_KM
big_new = big.isna() | (~big)
big_seg = big_new.astype("int32").groupby(uf[uid]).cumsum()
big_sizes = uf[big].groupby([uid, big_seg]).size().rename("big_run_len")
count_big_streaks_ge3 = (big_sizes >= 3).groupby(level=0).sum().astype("int64").rename("count_big_streaks_ge3")


In [14]:
route = uf["from"].astype("str") + "â†’" + uf["to"].astype("str")
route_prev = route.groupby(uf[uid]).shift(1)
route_change = route_prev.isna() | (route != route_prev)
route_seg = route_change.astype("int32").groupby(uf[uid]).cumsum()
route_run_sizes = uf.groupby([uid, route_seg]).size().rename("route_run_len")
max_consec_same_route = route_run_sizes.groupby(level=0).max().rename("max_consec_same_route")

In [None]:
orig = uf["from"].astype("str")
orig_prev = orig.groupby(uf[uid]).shift(1)
orig_change = orig_prev.isna() | (orig != orig_prev)
orig_seg = orig_change.astype("int32").groupby(uf[uid]).cumsum()
orig_run_sizes = uf.groupby([uid, orig_seg]).size().rename("orig_run_len")
max_consec_same_origin = orig_run_sizes.groupby(level=0).max().rename("max_consec_same_origin")


In [None]:
longest_distance_km = uf.groupby(uid)["distance_km"].max().rename("longest_distance_km")

gap_days = (gap / np.timedelta64(1, "D")).astype("float64")
avg_days_between = gap_days.groupby(uf[uid]).mean().rename("avg_days_between_flights")

unique_airports = cudf.concat(
    [
        uf[[uid, "from"]].rename(columns={"from": "airport"}),
        uf[[uid, "to"]].rename(columns={"to": "airport"}),
    ],
    ignore_index=True,
).groupby(uid)["airport"].nunique().astype("int64").rename("unique_airports")


In [17]:
metrics = (
    max_streak_lt6h.to_frame()
    .join(count_big_streaks_ge3, how="left")
    .join(max_consec_same_route,  how="left")
    .join(max_consec_same_origin, how="left")
    .join(unique_airports,        how="left")
    .join(longest_distance_km,    how="left")
    .join(avg_days_between,       how="left")
).fillna({
    "max_streak_lt6h": 0,
    "count_big_streaks_ge3": 0,
    "max_consec_same_route": 0,
    "max_consec_same_origin": 0,
    "unique_airports": 0,
})

users_analytics = users.set_index("id").join(metrics, how="left")


In [20]:
users_analytics.to_csv(f"{DIR}/users_analytics_gpu_cudf.csv", index=True)