GMVSAE#
在本节内容中,我们主要是介绍使用TrajDL来进行轨迹序列异常检测,并且针对GM-VSAE中的部分公式进行代码细节讲解。本节内容如下:
数据预处理
GM-VSAE训练
在线异常序列检测
1. 数据预处理#
在GM-VSAE中使用的数据集依旧是前文所介绍过的Porto数据集,该数据集是从出租车上采集到的波尔多市出租车轨迹数据集。下面我们基于TrajDL中的网格系统将原始的经纬度点序列转换为网格位置序列。
这里我们使用了SimpleGridSystem构建网格系统,使用RectangleBoundary构建区域。
from trajdl import trajdl_cpp
from trajdl.grid import SimpleGridSystem
# 网格大小(0.1km, 0.1km)
grid_height, grid_width = 0.1, 0.1
lat_size, lng_size = grid_height / 110.574, grid_width / 111.320 / 0.99974
grid = SimpleGridSystem(
# 使用波尔多市的左下角点和右上角点来构建和切分网格系统
trajdl_cpp.RectangleBoundary(
min_x=-8.690261,
min_y=41.140092,
max_x=-8.549155,
max_y=41.185969,
),
step_x=lng_size,
step_y=lat_size,
)
print("Grid Size:", len(grid))
Grid Size: 8058
Note
注意,此处使用的坐标系统为基于原始经纬度的坐标系统,并非T2VEC中使用的墨卡托坐标系统。
下面使用TrajDL中的公开数据集接口PortoDataset来加载Porto数据集,并将轨迹点序列Trajecotory转换为位置序列LocSeq。
from collections import defaultdict
import polars as pl
from tqdm.notebook import tqdm
from tqdm.contrib import tenumerate
from trajdl.datasets import LocSeq
from trajdl.datasets.open_source.conf import PortoDataset
# 定义最短和最长的序列长度
shortest, longest = 20, 1200
def generate_od_map(shortest: int, longest: int):
# 加载Porto数据集并以pl.DataFrame的数据类型返回
trajectories = (
PortoDataset()
.load(return_as="pl")
.select("POLYLINE")
.filter(
(pl.col("POLYLINE").list.len() >= shortest)
& (pl.col("POLYLINE").list.len() <= longest)
)["POLYLINE"]
.limit(100000)
)
print(len(trajectories), trajectories[0])
# 定义一个np.ndarray类型的轨迹点序列转换为LocSeq位置序列的函数
def transform_traj_into_loc_seq(traj_np, idx):
# 检测该序列是否所有点都在波尔多市的网格系统中
if grid.in_boundary_np(traj_np).all():
return LocSeq(grid.locate_unsafe_np(traj_np), entity_id=str(idx))
return None
# 基于OD构建一个dict,key是OD pair,values是位置序列组成的list
od_agg = defaultdict(list)
for idx, traj_pl in tenumerate(trajectories, desc="transform trajectories into location sequences"):
# 将pl.DataFrame类型的原始序列转化为位置序列
loc_seq = transform_traj_into_loc_seq(traj_pl.to_numpy(), idx)
if loc_seq:
od_agg[(loc_seq.o, loc_seq.d)].append(loc_seq)
return od_agg
od_agg = generate_od_map(shortest, longest)
load dataset: porto
100000 shape: (23,)
Series: '' [array[f64, 2]]
[
[-8.618643, 41.141412]
[-8.618499, 41.141376]
[-8.620326, 41.14251]
[-8.622153, 41.143815]
[-8.623953, 41.144373]
…
[-8.631144, 41.154489]
[-8.630829, 41.154507]
[-8.630829, 41.154516]
[-8.630829, 41.154498]
[-8.630838, 41.154489]
]
下面开始统计属于同一对起点和终点的轨迹序列数量,也即论文中的”固定行程“,并且划分训练集和验证集。划分的逻辑是,先统计有多少”起点-终点(OD)”的路程,然后统计每个路程中有多少条轨迹,过滤掉小于25条轨迹的路程。然后从每一条路程中,选择最后的5条序列作为验证集,其余的序列作为训练集。
# 划分训练和验证集
min_od_traj_num = 10
test_traj_num = 5
train_loc_seqs, val_loc_seqs, valid_ods = [], [], set()
for od, loc_seqs in tqdm(od_agg.items(), desc="generating dataset"):
num_loc_seqs = len(loc_seqs)
# 过滤掉小于25的OD路程
if num_loc_seqs >= min_od_traj_num:
for idx in range(num_loc_seqs - test_traj_num):
train_loc_seqs.append(loc_seqs[idx])
for idx in range(num_loc_seqs - test_traj_num, num_loc_seqs):
val_loc_seqs.append(loc_seqs[idx])
valid_ods.add(od)
print(len(train_loc_seqs), len(val_loc_seqs), len(valid_ods))
4116 1635 327
import numpy as np
from trajdl.datasets import LocSeqDataset
def generate_dataset(loc_seqs, nums):
sample_idx = np.random.choice(
list(range(len(loc_seqs))), size=nums, replace=False
)
loc_samples = [loc_seqs[i] for i in sample_idx]
return LocSeqDataset.init_from_loc_seqs(loc_samples)
train_dataset, val_dataset = (
generate_dataset(train_loc_seqs, 800),
generate_dataset(val_loc_seqs, 200),
)
print(train_dataset, val_dataset)
LocSeqDataset(size=800) LocSeqDataset(size=200)
Note
本节后续会展示GM-VASE模型在Porto数据集上训练和预训练,考虑到执行时间,此处在构建数据集的时候对训练序列train_loc_seqs采样800条数据,对验证序列val_loc_seqs采样200条序列。该参数用户在使用TrajDL中可自行设定。
在GM-VSAE中,因为没有异常轨迹的标签,所以在这里需要构建异常轨迹的序列,用于序列异常检测的下游任务。构建异常轨迹序列的方式是:从数据集中随机选取一定比例的轨迹序列,然后对轨迹序列上的每一个轨迹点进行随机扰动。扰动的方式是,随机从该点所在的位置选择其一阶邻居或二阶邻居的点作为该点的扰动进行替换。具体的实现如下:
# 先定义在GridSystem中扰动的函数
def perturb_locseq(locseq, level, prob):
loc_list = [locseq.o]
for idx in range(1, len(locseq) - 1):
loc = locseq[idx]
# 获取loc的邻域
grid_x, grid_y = grid.to_grid_coordinate(loc)
offset = [[0, 1], [1, 0], [-1, 0], [0, -1], [1, 1], [-1, -1], [-1, 1], [1, -1]]
x_offset, y_offset = offset[np.random.randint(0, len(offset))]
if grid.in_boundary_by_grid_coordinate(
grid_x + x_offset * level, grid_y + y_offset * level
):
grid_x += x_offset * level
grid_y += y_offset * level
loc_list.append(
grid.locate_by_grid_coordinate(grid_x, grid_y)
if np.random.random() < prob
else loc
)
loc_list.append(locseq.d)
return LocSeq(seq=loc_list)
在下文中,该异常数据要用于在线推理,为了便于展示,仅选取100条数据作为测试数据。
# 对数据集进行扰动
rng = np.random.default_rng(seed=42)
# 扰动的范围在网格的二阶邻域内
level = 2
ratio = 0.1
point_prob = 0.3
tmp_loc_seqs = train_loc_seqs[:100]
num_train_trajs = len(tmp_loc_seqs)
print(f"num_train_trajs: {num_train_trajs}")
train_outlier_idx = rng.choice(
num_train_trajs, int(num_train_trajs * ratio), replace=False
)
print(f"num outliers in training set: {train_outlier_idx.shape[0]}")
for outlier_idx in tqdm(train_outlier_idx):
tmp_loc_seqs[outlier_idx] = perturb_locseq(
tmp_loc_seqs[outlier_idx], level=level, prob=point_prob
)
train_outliers_ds = LocSeqDataset.init_from_loc_seqs(tmp_loc_seqs)
num_train_trajs: 100
num outliers in training set: 10
2. 构建tokenizer#
接下来,使用TrajDL中提供的LocSeqTokenizer来构建该数据集的tokenizer,后续模型的训练会使用该tokenizer。
from trajdl.tokenizers.locseq import LocSeqTokenizer
tokenizer = LocSeqTokenizer.build(
loc_seqs=train_loc_seqs, count_start_end_token=False
)
3. 构建DataModule#
TrajDL中已经封装好GMVSAEDataModule,直接导入并且实例化即可:
from trajdl.datasets.modules.gmvsae import GMVSAEDataModule
data_module = GMVSAEDataModule(
tokenizer = tokenizer,
train_table = train_dataset,
val_table = val_dataset,
test_table=train_outliers_ds,
train_batch_size = 1,
val_batch_size = 1,
num_cpus = -1,
num_train_batches = 5,
num_val_batches = 5,
)
data_module.setup("fit")
train_dataloader = data_module.train_dataloader()
next(iter(train_dataloader))
/home/chaosong/miniconda3/lib/python3.12/multiprocessing/popen_fork.py:66: RuntimeWarning: Using fork() can cause Polars to deadlock in the child process.
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.
The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.
See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.
If you really know what your doing, you can silence this warning with the warning module
or by setting POLARS_ALLOW_FORKING_THREAD=1.
self.pid = os.fork()
/home/chaosong/miniconda3/lib/python3.12/multiprocessing/popen_fork.py:66: RuntimeWarning: Using fork() can cause Polars to deadlock in the child process.
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.
The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.
See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.
If you really know what your doing, you can silence this warning with the warning module
or by setting POLARS_ALLOW_FORKING_THREAD=1.
self.pid = os.fork()
GMVSAESample(encoder_seq=tensor([[ 0, 0, 0, 5, 5, 121, 126, 159, 256, 46, 255, 400,
373, 237, 249, 315, 769, 326, 314, 351, 299, 341, 393, 391,
499, 425, 476, 482, 632, 509, 379, 1240, 389, 1291, 861, 1005,
999, 814, 1051, 117, 875, 13]]), encoder_lengths=[42], decoder_seq=tensor([[2792, 0, 0, 0, 5, 5, 121, 126, 159, 256, 46, 255,
400, 373, 237, 249, 315, 769, 326, 314, 351, 299, 341, 393,
391, 499, 425, 476, 482, 632, 509, 379, 1240, 389, 1291, 861,
1005, 999, 814, 1051, 117, 875, 13]]), decoder_lengths=[43], decoder_labels=tensor([[ 0, 0, 0, 5, 5, 121, 126, 159, 256, 46, 255, 400,
373, 237, 249, 315, 769, 326, 314, 351, 299, 341, 393, 391,
499, 425, 476, 482, 632, 509, 379, 1240, 389, 1291, 861, 1005,
999, 814, 1051, 117, 875, 13, 2793]]), mask=tensor([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1.]]))
4. GM-VASE模型#
使用TrajDL中的GMVSAE快速构建模型:
from trajdl.algorithms.gmvsae import GMVSAE
from trajdl.common.enum import Mode
# 构建GM-VASE模型
model = GMVSAE(
tokenizer=tokenizer,
embedding_dim=32,
hidden_size=256,
mem_num=10,
mode=Mode.TRAIN,
)
model
Mode.TRAIN mode.
GMVSAE(
(emb): Embedding(2797, 32)
(encoder): Encoder(
(emb): Embedding(2797, 32)
(rnn): GRU(32, 256, batch_first=True)
)
(latent): LatentSpace(
(mean_linear): Linear(in_features=256, out_features=256, bias=True)
(log_var_linear): Linear(in_features=256, out_features=256, bias=True)
)
(decoder): Decoder(
(emb): Embedding(2797, 32)
(rnn): GRU(32, 256, batch_first=True)
)
(reconstruct_loss): SampledSoftmaxLoss(
(eval_loss): CrossEntropyLoss()
)
)
5. 训练#
执行如下代码即可快速开展模型的训练:
import lightning as L
trainer = L.Trainer(max_epochs=1, logger=False, enable_checkpointing=False)
trainer.fit(model, data_module)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 4060 Ti') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
| Name | Type | Params | Mode
----------------------------------------------------------------
0 | emb | Embedding | 89.5 K | train
1 | encoder | Encoder | 312 K | train
2 | latent | LatentSpace | 136 K | train
3 | decoder | Decoder | 312 K | train
4 | reconstruct_loss | SampledSoftmaxLoss | 718 K | train
----------------------------------------------------------------
1.4 M Trainable params
2.6 K Non-trainable params
1.4 M Total params
5.562 Total estimated model params size (MB)
10 Modules in train mode
0 Modules in eval mode
`Trainer.fit` stopped: `max_epochs=1` reached.
6. 推理#
在推理部分,模型输出的是序列的异常分数。在上面我们已经介绍了我们使用扰动的方式构造了含有异常的轨迹序列数据train_outliers_ds以及对应的异常轨迹的索引train_outliers_idx,下面来对这部分数据进行推理。
import torch
# 获取train_outliers_ds的数据
data_module.setup("test")
test_loader = data_module.test_dataloader()
# 将模型设置为EVAL模式,在该模式下,forward函数输出轨迹序列的异常分数
model.set_mode(Mode.EVAL)
with torch.inference_mode():
predictions = trainer.predict(model, test_loader)
predictions = np.concatenate([i.detach().cpu().numpy() for i in predictions])
predictions
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Mode.EVAL mode.
array([0.5056756 , 0.48780894, 0.45937836, 0.51884574, 0.48738056,
0.5361278 , 0.50197923, 0.49948454, 0.48858064, 0.49966207,
0.49241906, 0.50314605, 0.48632613, 0.49210462, 0.50009894,
0.5087272 , 0.51010156, 0.49676833, 0.5033755 , 0.50435996,
0.50172037, 0.499437 , 0.5000553 , 0.49882516, 0.5046132 ,
0.5085661 , 0.4920005 , 0.5024364 , 0.4848987 , 0.5072679 ,
0.49509317, 0.51177996, 0.514622 , 0.49713686, 0.4988504 ,
0.5019136 , 0.49380106, 0.49552593, 0.49455398, 0.50194365,
0.5044374 , 0.5035478 , 0.500573 , 0.5014754 , 0.5024654 ,
0.51039124, 0.5018568 , 0.49521226, 0.5014934 , 0.5041815 ,
0.50977015, 0.50311315, 0.50553393, 0.49988326, 0.50382173,
0.49746317, 0.51104045, 0.50880027, 0.5119606 , 0.50240314,
0.50588894, 0.49681446, 0.51230514, 0.49476367, 0.50931555,
0.5072595 , 0.50708145, 0.5014838 , 0.5060284 , 0.50831366,
0.50880224, 0.5149926 , 0.49607876, 0.49567425, 0.49886242,
0.49787003, 0.506079 , 0.5001845 , 0.5151178 , 0.5077387 ,
0.51237315, 0.49668157, 0.5056278 , 0.49551168, 0.48525807,
0.5063259 , 0.50314623, 0.50889134, 0.4953721 , 0.50477344,
0.51141757, 0.51157475, 0.49359927, 0.48121113, 0.5064759 ,
0.5036067 , 0.4922097 , 0.49704704, 0.50313294, 0.49909118],
dtype=float32)
使用下面的代码来计算AUC的值:
from sklearn.metrics import auc, precision_recall_curve
# 先定义一个计算AUC的函数的函数
def auc_score(y_true, y_score):
# shape of precision and recall is (N - 1,)
# 这里用1减去,说明0是正常序列,1是异常序列,precision_recall_curve默认的pos_label是1
precision, recall, _ = precision_recall_curve(1 - y_true, 1 - y_score)
# float
return auc(recall, precision)
y_true = np.ones(shape=(len(train_outliers_ds)))
for idx in train_outlier_idx:
y_true[idx] = 0
od_agg = defaultdict(list)
for idx, loc_seq in tenumerate(
train_outliers_ds.iter_as_seqs(), total=len(train_outliers_ds)
):
od_agg[(loc_seq.o, loc_seq.d)].append(idx)
od_auc = {}
for od, traj_indices in tqdm(od_agg.items()):
od_true, od_pred = y_true[traj_indices], predictions[traj_indices]
if od_true.sum() < od_true.shape[0]:
od_auc[od] = auc_score(od_true, od_pred)
np.mean(list(od_auc.values()))
0.11849053724053724
Tip
本文介绍了GM-VSAE中数据预处理的方式。
本文介绍了使用
TrajDL搭建GM-VSAE模块并开展实验。