T2VEC

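In this document we implement the T2VEC algorithm with TrajDL. We walk through the following steps in code:

  • Loading and preprocessing the Porto trajectory dataset

  • Converting the raw dataset into a TrajDL standard dataset

  • Building the Tokenizer

  • Training the T2VEC model and running inference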

1.  Loading and preprocessing the Porto trajectory data

The [Porto dataset] contains taxi trajectories from the city of Porto, Portugal; see Open Source Datasets for details. Here we load it through TrajDL's PortoDataset interface.

import polars as pl
from trajdl.datasets.open_source import PortoDataset

# Load the data as a polars.DataFrame; to keep the demo small, only 10,000 trajectories are kept
porto = PortoDataset()

original_trajs = (
    porto.load(return_as="pl")
    .filter(pl.col("MISSING_DATA") == False)
    .sort("TIMESTAMP")["POLYLINE"]
    .limit(10000)
)

original_trajs.head(1)
load dataset: porto
shape: (1,)
POLYLINE
list[array[f64, 2]]
[[-8.610291, 41.140746], [-8.6103, 41.140755], … [-8.60589, 41.145345]]

Tip

TrajDL also provides an extensible OpenSourceDataset interface so that users can configure and load other open-source datasets themselves.

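2.  Building the TrajectoryDataset

Once loaded, the Porto data cannot be fed to the model directly. It first has to be converted into Trajectory, TrajDL's standard type for a sequence of trajectory points. The code below shows how to do the conversion with the TrajDL API.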

from tqdm.contrib import tenumerate
from trajdl.datasets import Trajectory

all_trajs = [
    Trajectory(traj_pl.to_numpy(), entity_id=str(idx))
    for idx, traj_pl in tenumerate(original_trajs, desc="transform trajectories")
]
print(all_trajs[0], all_trajs[0].seq[:10])
Trajectory(entity_id=0, length=25) [[-8.610291 41.140746]
 [-8.6103   41.140755]
 [-8.610309 41.14089 ]
 [-8.613657 41.141358]
 [-8.614602 41.141484]
 [-8.614242 41.142618]
 [-8.61363  41.143239]
 [-8.612883 41.143761]
 [-8.612208 41.144238]
 [-8.611542 41.144724]]

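Next we split the Trajectory objects into training, validation, and test sets. The first 800 trajectories are candidates for the training set, the next 100 for the validation set, and the last 100 for the test set. Training and validation trajectories must contain between 20 and 100 points, test trajectories between 60 and 200. Trajectories outside these ranges are dropped, so the final sets are smaller than the candidate counts.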

from tqdm.notebook import trange

# Hyperparameters
NUM_TRAIN, NUM_VAL, TEST_START_IDX, NUM_TEST = 800, 100, 900, 100
MIN_LENGTH, MAX_LENGTH = 20, 100
MIN_LENGTH_TEST, MAX_LENGTH_TEST = 60, 200

all_trajs = all_trajs[:1000]

train_traj, val_traj = [], []
for idx in trange(NUM_TRAIN + NUM_VAL, desc="construct train and val set"):
    traj = all_trajs[idx]
    if MIN_LENGTH <= len(traj) <= MAX_LENGTH:
        if idx < NUM_TRAIN:
            train_traj.append(traj)
        else:
            val_traj.append(traj)

test_traj = []
for idx in trange(TEST_START_IDX, len(all_trajs), desc="construct test set"):
    traj = all_trajs[idx]
    if len(test_traj) >= NUM_TEST:
        break
    if MIN_LENGTH_TEST <= len(traj) <= MAX_LENGTH_TEST:
        test_traj.append(traj)

print(len(train_traj), len(val_traj), len(test_traj))
661 83 29
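The split above boils down to "take a slice, keep the items whose length is in range, optionally stop at a limit". A minimal pure-Python sketch of that idea follows; the helper name `split_by_length` and the toy lists are hypothetical and stand in for Trajectory objects, they are not part of TrajDL.

```python
def split_by_length(items, start, stop, min_len, max_len, limit=None):
    """Keep items[start:stop] whose length lies in [min_len, max_len]."""
    kept = []
    for item in items[start:stop]:
        if limit is not None and len(kept) >= limit:
            break
        if min_len <= len(item) <= max_len:
            kept.append(item)
    return kept


# Toy "trajectories": item i is a list with i points
toy = [[0] * n for n in range(10)]

train = split_by_length(toy, 0, 6, min_len=2, max_len=4)
val = split_by_length(toy, 6, 8, min_len=2, max_len=7)
test = split_by_length(toy, 8, 10, min_len=2, max_len=9, limit=1)
print(len(train), len(val), len(test))
```

Using half-open slices (`items[start:stop]`) keeps the three candidate ranges disjoint without any index comparison inside the loop.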

Tip

If the dataset is very large, you can also convert it into a TrajectoryDataset and use its save function to store it on disk.

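2.1.  Grid system

The data obtained above consists of sequences of longitude/latitude points. These points need to be discretized into tokens, as in natural language processing, so that they can later be fed to an Embedding Layer to obtain embedding vectors.

For this purpose TrajDL provides the GridSystem API. With it, readers can directly set up a system that maps longitude/latitude points to grid cells, discretizing continuous coordinates into cells (each cell is a token). Under the hood, GridSystem wraps trajdl_cpp, a tool written in C++, to speed up the computation; see Grid System for details.

Figure 1 shows a map of Porto. We partition the map into cells of \(100\,\mathrm{m} \times 100\,\mathrm{m}\) to build the grid system.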

Porto Map

Figure 1. Map of Porto (source: Google Maps)

First, create the boundary of the grid system over Porto:

from trajdl import trajdl_cpp

# Region boundary in longitude/latitude coordinates
boundary_original = trajdl_cpp.RectangleBoundary(
    min_x=-8.690261,
    min_y=41.140092,
    max_x=-8.549155,
    max_y=41.185969,
)
# Convert to a region boundary in planar (Web Mercator) coordinates
boundary = boundary_original.to_web_mercator()
print(f"boundary_original: {boundary_original}")
print(f"boundary: {boundary}")
boundary_original: RectangleBoundary(min_x=-8.690261, min_y=41.140092, max_x=-8.549155, max_y=41.185969)
boundary: RectangleBoundary(min_x=-967395.429381, min_y=5033027.213480, max_x=-951687.581313, max_y=5039810.867670)

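Printing boundary_original and boundary shows that two coordinate systems are involved. One is the original longitude/latitude system; the other is the Web Mercator projection, which maps the three-dimensional surface of the Earth onto a two-dimensional plane. T2VEC uses the Web Mercator system, whose values are in meters. The scale is \(1:1\) at the equator (that is, 1 meter in the Mercator system corresponds to 1 meter of real distance there) and grows with latitude. See Grid for a more detailed introduction.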
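Because the \(1:1\) scale only holds at the equator, it can help to estimate how large a Mercator meter is at Porto's latitude. The sketch below uses the standard Mercator scale factor \(1/\cos(\text{latitude})\); the latitude constant is simply the midpoint of the boundary above and is chosen here for illustration.

```python
import math

PORTO_LAT_DEG = 41.163  # roughly the middle of the boundary used above

# Web Mercator stretches distances by 1 / cos(latitude)
scale = 1.0 / math.cos(math.radians(PORTO_LAT_DEG))
ground_m = 100.0 / scale  # ground length of a 100 m Mercator cell edge

print(f"scale factor: {scale:.3f}, ground size of a 100 m cell edge: {ground_m:.1f} m")
```

Under this estimate, a cell that is 100 m wide in Mercator units covers roughly three quarters of that distance on the ground in Porto.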

TrajDL provides an API that converts longitude/latitude coordinates to planar coordinates:

print(trajdl_cpp.convert_gps_to_webmercator(-8.690261, 41.140092))
print(trajdl_cpp.convert_gps_to_webmercator(0, 0))
WebMercatorCoord(x=-967395.4293806, y=5033027.2134798)
WebMercatorCoord(x=0.0000000, y=-0.0000000)
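For readers who want to see what such a conversion involves, here is a minimal sketch of the standard spherical Web Mercator (EPSG:3857) forward projection. It is written from the public formula, not taken from trajdl_cpp, and the function name `gps_to_web_mercator` is hypothetical; its results should come out close to the values printed above.

```python
import math

EARTH_RADIUS_M = 6378137.0  # WGS84 semi-major axis, used by spherical Web Mercator


def gps_to_web_mercator(lon_deg, lat_deg):
    """Spherical Web Mercator (EPSG:3857) forward projection."""
    x = EARTH_RADIUS_M * math.radians(lon_deg)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat_deg) / 2))
    return x, y


x, y = gps_to_web_mercator(-8.690261, 41.140092)
print(f"{x:.4f} {y:.4f}")
print(gps_to_web_mercator(0, 0))
```

The tiny negative y that floating-point arithmetic can produce for latitude 0 is a rounding effect of `tan(pi / 4)` being evaluated slightly below 1.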

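The longitude/latitude coordinate \((0,0)\) maps to the origin \((0,0)\) of the Web Mercator system. Next we build the grid system over the map of Porto in the Mercator system, with a cell size of \(100\,\mathrm{m} \times 100\,\mathrm{m}\):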

from trajdl.grid import SimpleGridSystem

# Cell size: 100 m in each direction
grid_width, grid_height = 100, 100

# Create the grid system
grid = SimpleGridSystem(
    # Partition the area between Porto's lower-left and upper-right corners
    boundary,
    step_x=grid_width,
    step_y=grid_height,
)
print(len(grid), grid.num_x_grids, grid.num_y_grids)
10744 158 68
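The three printed numbers can be cross-checked by hand. The sketch below assumes that the number of cells per axis is the extent divided by the step, rounded up; that rule is an assumption about SimpleGridSystem, although it does reproduce the figures above.

```python
import math

# Web Mercator boundary printed earlier in this document
min_x, min_y = -967395.429381, 5033027.213480
max_x, max_y = -951687.581313, 5039810.867670
step_x, step_y = 100, 100

num_x_grids = math.ceil((max_x - min_x) / step_x)  # about 15.7 km wide
num_y_grids = math.ceil((max_y - min_y) / step_y)  # about 6.8 km tall
print(num_x_grids * num_y_grids, num_x_grids, num_y_grids)
```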

Any longitude/latitude point in Porto can now be mapped to a cell token in SimpleGridSystem:

# Convert to Web Mercator coordinates
web_mercator_location = trajdl_cpp.convert_gps_to_webmercator(-8.610291, 41.140746)
# Convert to a grid id
x, y = web_mercator_location.x, web_mercator_location.y
grid_id = grid.locate_unsafe(x, y)

print(web_mercator_location, grid_id)
WebMercatorCoord(x=-958493.2097019, y=5033123.8845711) 89
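Locating a point in a uniform grid is a floor division along each axis. The sketch below assumes row-major numbering (`row * num_x_grids + col`), which is a guess at the id scheme rather than documented behavior of locate_unsafe; the function name `locate` is hypothetical. With the coordinates printed above it yields the same id.

```python
import math

MIN_X, MIN_Y = -967395.429381, 5033027.213480  # lower-left corner of the boundary
STEP_X, STEP_Y = 100, 100
NUM_X_GRIDS = 158  # number of columns, from the grid printed above


def locate(x, y):
    """Map a Web Mercator point to a cell id (no bounds checking)."""
    col = math.floor((x - MIN_X) / STEP_X)
    row = math.floor((y - MIN_Y) / STEP_Y)
    return row * NUM_X_GRIDS + col  # assumed row-major numbering


print(locate(-958493.2097019, 5033123.8845711))
```

The point lies about 8.9 km east and under 100 m north of the lower-left corner, so it falls in column 89 of row 0.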

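3.  Building the Tokenizer

With the GridSystem in place, we can build the Tokenizer. Here the Tokenizer takes a longitude/latitude point (the counterpart of a word in natural language processing) and converts it into a token. The token is an int, which is fed to the Embedding Layer to obtain the corresponding word embedding.

TrajDL provides the T2VECTokenizer API for building the Tokenizer: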

import os

from trajdl.tokenizers.t2vec import T2VECTokenizer

output_folder = "./output/t2vec"
os.makedirs(output_folder, exist_ok=True)

tokenizer = T2VECTokenizer.build(
    grid=grid,
    boundary=boundary_original,
    trajectories=all_trajs,
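    max_vocab_size=40000,  # upper bound on the vocabulary size; tokens are ranked by their frequency in the dataset
    min_freq=100,  # cells hit at least min_freq times are called `hot cell`s; the tokenizer keeps only `hot cell`s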
    with_kd_tree=True,
)
tokenizer.save_pretrained(os.path.join(output_folder, "tokenizer.pkl"))
print("num vocab: ", len(tokenizer))
num vocab:  23

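The two parameters min_freq and with_kd_tree work as follows:

  • The larger the vocabulary built by the Tokenizer, the higher the cost of all later computation, so we want the smallest vocabulary that still represents the whole dataset. min_freq controls this: the Tokenizer drops every token that occurs fewer than min_freq times, which balances the size of the vocabulary against the information it carries.

  • As a consequence, when a trajectory point falls into a cell that was dropped, the Tokenizer cannot convert it into a token directly. For this case we provide a KDTree search: all hot cells are indexed in a KDTree, and a point that misses the hot cells is assigned the nearest hot cell found through the KDTree as its approximate token.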
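The interplay of the two parameters can be illustrated with a toy example. The sketch below is not the T2VECTokenizer implementation: it replaces the KDTree with a brute-force nearest-neighbor search (same result, slower on large vocabularies), and the cell coordinates, threshold, and the function name `to_token` are made up for illustration.

```python
import math
from collections import Counter

# Toy cell hits: (col, row) of the cell each trajectory point fell into
hits = [(0, 0)] * 5 + [(3, 0)] * 4 + [(1, 0)] * 1 + [(9, 9)] * 1
MIN_FREQ = 3

# Keep only cells hit at least MIN_FREQ times ("hot cells")
counts = Counter(hits)
hot_cells = sorted(cell for cell, n in counts.items() if n >= MIN_FREQ)
vocab = {cell: idx for idx, cell in enumerate(hot_cells)}


def to_token(cell):
    """Return the token of `cell`, falling back to the nearest hot cell."""
    if cell in vocab:
        return vocab[cell]
    nearest = min(hot_cells, key=lambda hot: math.dist(hot, cell))
    return vocab[nearest]


print(vocab)
print(to_token((1, 0)), to_token((9, 9)))
```

Cells `(1, 0)` and `(9, 9)` are too rare to enter the vocabulary, so each is mapped to whichever hot cell is closest.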

Note

The boundary argument passed to T2VECTokenizer is boundary_original, which is expressed in the original longitude/latitude coordinates.

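4.  K-nearest-neighbor cells

To compute a loss weighted by spatial distance, the T2VEC authors also build two matrices: \(V\) ( \(N \times 10\) ), which holds the indices of each cell's 10 nearest cells, and \(D\) ( \(N \times 10\) ), which holds the corresponding distances. The tokenizer provides the function k_nearest_hot_loc for building both matrices.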

import numpy as np
from trajdl.common.enum import TokenEnum

k = 10  # number of nearest cells per token
SPECIAL_TOKENS = TokenEnum.values()
vocab_list = tokenizer.vocab.keys()  # every token in the vocabulary

# Pair each regular location with its vocabulary index, skipping special tokens
loc_list, idx_list = zip(
    *((loc, tokenizer.loc2idx(loc)) for loc in vocab_list if loc not in SPECIAL_TOKENS)
)

# The k nearest hot cells of each location and the corresponding distances
dists, locations = tokenizer.k_nearest_hot_loc(loc_list, k=k)

# Index matrix, shape (num_locations, k)
V = np.zeros(shape=(len(vocab_list), k), dtype=np.int64)

# Distance matrix, shape (num_locations, k)
D = np.zeros_like(V, dtype=np.float32)
D[idx_list, :] = dists

# For special tokens, the nearest token is the token itself
for token in SPECIAL_TOKENS:
    idx = tokenizer.loc2idx(token)
    V[idx] = idx

# Fill in the neighbor indices of every regular location
for line_idx, neighbor_locs in zip(idx_list, locations):
    V[line_idx] = [tokenizer.loc2idx(loc) for loc in neighbor_locs]

np.save(os.path.join(output_folder, "knn_indices.npy"), V)  # save the indices of the k nearest cells
np.save(os.path.join(output_folder, "knn_distances.npy"), D)  # save the distances to the k nearest cells
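To see why the distances are stored, recall that the T2VEC paper weights the neighbors of a target cell with an exponentially decaying function of distance, normalized over the K nearest cells. The sketch below illustrates that weighting for one row of \(D\). How TrajDL consumes the two matrices internally is not shown in this document, and the value of `THETA` as well as the function name `neighbor_weights` are hypothetical.

```python
import math

THETA = 100.0  # hypothetical distance scale in meters


def neighbor_weights(distances, theta=THETA):
    """Normalized exp(-d / theta) weights for one row of the distance matrix."""
    raw = [math.exp(-d / theta) for d in distances]
    total = sum(raw)
    return [w / total for w in raw]


# Toy row of D: the cell itself (distance 0) and two neighbors
weights = neighbor_weights([0.0, 100.0, 200.0])
print([round(w, 3) for w in weights])
```

The weights sum to 1, so a row can serve as a target distribution: the true cell receives the largest share and nearby cells receive progressively less.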

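5.  Training and inference

Training in TrajDL is built on the Lightning framework. For example, the data module T2VECDataModule inherits from LightningDataModule, and the model module T2VEC is built on LightningModule. Users can train models and run inference with a few lines of code through the TrajDL API.

Tip

Lightning also supports training and validating models through the command line and configuration files. Readers who want to learn more about these can read LightningCLI.

Here we only show how to train the T2VEC model and run inference through the API.

First, we import the data module T2VECDataModuleV2 from TrajDL. Its collate_function implements two sample augmentation methods, downsampling and distortion.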

from trajdl.datasets import TrajectoryDataset
from trajdl.datasets.modules.t2vec import T2VECDataModuleV2

# Convert the Trajectory lists into TrajectoryDataset objects
train_dataset = TrajectoryDataset.init_from_trajectories(train_traj)
val_dataset = TrajectoryDataset.init_from_trajectories(val_traj)
test_dataset = TrajectoryDataset.init_from_trajectories(test_traj)

data_module = T2VECDataModuleV2(
    tokenizer=tokenizer,
    train_table=train_dataset,
    val_table=val_dataset,
    test_table=test_dataset,
    train_batch_size=4,
    val_batch_size=4,
    num_train_batches=10,
    num_val_batches=10,
    num_cpus=-1,
    k=2,
)

Note

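  1. Above we used T2VECDataModuleV2 to build the data module. Its collate_function implements downsampling and distortion, so contrastive samples are generated online during training. TrajDL also supports offline sample construction: the samples are built and stored on disk before training and loaded into memory at training time. The src_path and trg_path arguments accepted by the T2VECDataModule initializer are the on-disk paths of these offline contrastive samples.

  2. To demonstrate the TrajDL API, this document uses T2VECDataModuleV2 with online sample construction. The Benchmark, however, uses T2VECDataModule with offline sample construction to reproduce the results of the paper; see the T2VEC Benchmark code for details.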
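As a rough illustration of the two augmentations named above, the following sketch drops interior points at random (downsampling) and perturbs a random subset of points with Gaussian noise (distortion). It is not the T2VECDataModuleV2 collate function; the function names, rates, noise radius, and the choice to always keep the endpoints are assumptions made for this example.

```python
import random


def downsample(points, drop_rate, rng):
    """Randomly drop interior points; the first and last points are kept."""
    if len(points) <= 2:
        return list(points)
    middle = [p for p in points[1:-1] if rng.random() >= drop_rate]
    return [points[0], *middle, points[-1]]


def distort(points, distort_rate, radius, rng):
    """Add Gaussian noise to a random subset of points."""
    return [
        (x + rng.gauss(0, radius), y + rng.gauss(0, radius))
        if rng.random() < distort_rate
        else (x, y)
        for x, y in points
    ]


rng = random.Random(0)  # fixed seed so the sketch is repeatable
traj = [(float(i), float(i)) for i in range(10)]
noisy = distort(downsample(traj, drop_rate=0.4, rng=rng), distort_rate=0.5, radius=30.0, rng=rng)
print(len(traj), len(noisy))
```

In a training pair, the degraded trajectory would play the role of the source and the original trajectory the role of the target.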

Run the following code to get the first batch of the training dataset:

data_module.setup("fit")
train_dataloader = data_module.train_dataloader()
next(iter(train_dataloader))
/home/chaosong/miniconda3/lib/python3.12/multiprocessing/popen_fork.py:66: RuntimeWarning: Using fork() can cause Polars to deadlock in the child process.
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

If you really know what your doing, you can silence this warning with the warning module
or by setting POLARS_ALLOW_FORKING_THREAD=1.

  self.pid = os.fork()
T2VECSample(src=tensor([[ 9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,
          9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  8,  8, 10, 10, 10,
         14, 14, 14, 14, 14, 14,  2],
        [ 9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,
          9,  9,  9,  8, 10, 10, 10, 14, 14, 14, 22, 22, 22, 22, 22, 22, 22, 22,
         22, 22, 22, 22, 22, 22, 22],
        [ 9,  9,  9,  9,  9, 13, 13, 10, 10, 14, 14, 14,  2,  7,  7, 17, 17, 17,
         17, 17,  1,  1, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22,
         22, 22, 22, 22, 22, 22, 22],
        [ 9,  9,  9,  9, 13, 14,  2,  2,  2,  4,  4,  7, 17, 17, 17, 17,  1, 22,
         22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22,
         22, 22, 22, 22, 22, 22, 22]]), lengths=(43, 28, 22, 17), target=tensor([[18,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,
          9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,
          9,  8,  8, 10, 10, 10, 14, 14, 14, 14, 14, 14, 19],
        [18,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,
          9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,  9,
          9,  8,  8, 10, 10, 10, 14, 14, 14, 14, 14, 14, 19],
        [18,  9,  9,  9,  9,  9,  9,  9,  9,  9,  8, 13, 13, 10, 10, 10, 10, 10,
         14, 14, 14, 14, 14,  2,  2,  2,  4,  4,  4,  4,  7,  7, 17, 17, 17, 17,
         17, 17, 17, 17,  1,  1,  1, 19, 22, 22, 22, 22, 22],
        [18,  9,  9,  9,  9,  9,  9,  9,  9,  9,  8, 13, 13, 10, 10, 10, 10, 10,
         14, 14, 14, 14, 14,  2,  2,  2,  4,  4,  4,  4,  7,  7, 17, 17, 17, 17,
         17, 17, 17, 17,  1,  1,  1, 19, 22, 22, 22, 22, 22]]))
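Reading the printed batch, id 22 appears to be the padding token, and the target rows appear to be wrapped in a start marker (18) and an end marker (19) before padding. These roles are inferred from the output, not taken from the TrajDL documentation. Under that reading, the padding step can be sketched as follows, with a hypothetical helper `pad_batch`:

```python
PAD, BOS, EOS = 22, 18, 19  # ids read off the batch printed above (inferred roles)


def pad_batch(seqs, pad=PAD):
    """Right-pad every sequence to the length of the longest one."""
    width = max(len(s) for s in seqs)
    return [s + [pad] * (width - len(s)) for s in seqs]


src = [[9, 9, 8, 10], [9, 13]]
target = [[BOS, *s, EOS] for s in [[9, 9, 9, 8, 10], [9, 8, 13]]]

print(pad_batch(src))
print(pad_batch(target))
```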

Note

T2VECDataModuleV2 inherits from BaseTrajectoryDataModule, which accepts a TrajectoryDataset directly as its initial dataset.

The model can be imported directly through the TrajDL API:

from trajdl.algorithms.t2vec import T2VEC

# Build the model with default parameters; users can change the model type according to the documentation, for example to use a GRU or LSTM encoder
model = T2VEC(
    embedding_dim=256,
    hidden_size=256,
    tokenizer=tokenizer,
    knn_indices_path=os.path.join(output_folder, "knn_indices.npy"),
    knn_distances_path=os.path.join(output_folder, "knn_distances.npy"),
)
model
T2VEC(
  (embedding): SimpleEmbedding(
    (embedding): Embedding(23, 256, padding_idx=22)
  )
  (encoder): T2VECEncoder(
    (emb): SimpleEmbedding(
      (embedding): Embedding(23, 256, padding_idx=22)
    )
    (encoder): GRU(256, 256, batch_first=True)
  )
  (decoder): DecoderWithAttention(
    (embedding_layer): SimpleEmbedding(
      (embedding): Embedding(23, 256, padding_idx=22)
    )
    (rnn): StackingGRU(
      (grus): ModuleList(
        (0): GRUCell(256, 256)
      )
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (attention): GlobalAttention(
      (L1): Linear(in_features=256, out_features=256, bias=False)
      (L2): Linear(in_features=512, out_features=256, bias=False)
      (softmax): Softmax(dim=1)
      (tanh): Tanh()
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (projector): Sequential(
    (0): Linear(in_features=256, out_features=23, bias=True)
    (1): LogSoftmax(dim=1)
  )
  (loss_fn): KLDivLoss()
)

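With the Trainer from PyTorch Lightning, two lines of code are enough to start training. By default, training results are stored in the lightning_logs folder (the call below turns logging and checkpointing off with logger=False and enable_checkpointing=False). To demonstrate the workflow, the code here trains for only one epoch; when the epoch ends, the model is validated on val_dataset.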

import lightning as L

trainer = L.Trainer(max_epochs=1, logger=False, enable_checkpointing=False)
trainer.fit(model, data_module)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 4060 Ti') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
  | Name      | Type                 | Params | Mode 
-----------------------------------------------------------
0 | embedding | SimpleEmbedding      | 5.9 K  | train
1 | encoder   | T2VECEncoder         | 400 K  | train
2 | decoder   | DecoderWithAttention | 597 K  | train
3 | projector | Sequential           | 5.9 K  | train
4 | loss_fn   | KLDivLoss            | 0      | train
-----------------------------------------------------------
997 K     Trainable params
460       Non-trainable params
998 K     Total params
3.993     Total estimated model params size (MB)
19        Modules in train mode
0         Modules in eval mode
`Trainer.fit` stopped: `max_epochs=1` reached.
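The parameter counts in the summary can be reproduced from the layer shapes in the printed model. The sketch below makes two assumptions that are not stated in this document but are consistent with the totals: the three Embedding entries share one weight matrix (so it is counted once in the total), and the 460 non-trainable values are the \(V\) and \(D\) matrices.

```python
VOCAB, EMB, HID, K = 23, 256, 256, 10

embedding = VOCAB * EMB
gru = 3 * (EMB * HID + HID * HID + 2 * HID)  # 3 gates: two weight matrices, two bias vectors
attention = HID * HID + 2 * HID * HID  # L1 and L2, both without bias
projector = HID * VOCAB + VOCAB

encoder = embedding + gru
decoder = embedding + gru + attention
trainable = embedding + 2 * gru + attention + projector  # shared embedding counted once
non_trainable = 2 * VOCAB * K  # assumed: V and D stored with the model

print(encoder, decoder, trainable, non_trainable)
print(f"{(trainable + non_trainable) * 4 / 1e6:.3f} MB at 4 bytes per value")
```

The results line up with the rounded figures in the table: about 400 K for the encoder, 597 K for the decoder, 997 K trainable parameters, and 3.993 MB in total.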

Once training is complete, we can run inference on test_dataset:

import torch

# Get the test_dataloader first
data_module.setup("test")
test_dataloader = data_module.test_dataloader()
test_sample_1 = next(iter(test_dataloader))
test_sample_2 = next(iter(test_dataloader))

model.eval()

with torch.inference_mode():
    vec_1 = model(test_sample_1)
    vec_2 = model(test_sample_2)

batch_vec_1, batch_vec_2 = vec_1.detach().cpu().numpy(), vec_2.detach().cpu().numpy()
print(batch_vec_1.shape, batch_vec_2.shape)
(4, 256) (4, 256)

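Compute the pairwise Euclidean distances between the trajectory embeddings of the two batches (a smaller distance means more similar trajectories):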

from sklearn.metrics.pairwise import euclidean_distances

print(euclidean_distances(batch_vec_1, batch_vec_2))
[[ 9.431693    9.25107     9.196043    9.048125  ]
 [ 9.406215    9.225336    9.184401    9.036469  ]
 [ 3.198747    2.6682477  10.128413   10.006379  ]
 [ 0.7574723   0.27536792 10.713933   10.591955  ]]
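A distance matrix like this is typically used for retrieval: for each trajectory in the first batch, pick the column with the smallest distance. The sketch below does this in plain Python on the matrix printed above; it only illustrates how to read the matrix and is not part of the TrajDL API.

```python
# Distance matrix printed above: rows are batch 1, columns are batch 2
distances = [
    [9.431693, 9.25107, 9.196043, 9.048125],
    [9.406215, 9.225336, 9.184401, 9.036469],
    [3.198747, 2.6682477, 10.128413, 10.006379],
    [0.7574723, 0.27536792, 10.713933, 10.591955],
]

# Index of the closest batch-2 trajectory for every batch-1 trajectory
nearest = [min(range(len(row)), key=row.__getitem__) for row in distances]
print(nearest)
```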

Note

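  1. The API calls here only show how to quickly train T2VEC and run inference with TrajDL. The ablation experiments in the paper are not designed this way, that is, as a direct inference on the test data followed by a comparison of embedding similarities.

  2. For the complete experiments, and for training and deploying models with the command line and YAML configuration files, see the Benchmark. It provides the full configuration files and scripts for reproducing T2VEC, covering data preprocessing, pre-training of the word embedding model, model training and deployment, and reproduction of the paper's experiments (the code also contains many examples of flexible PyTorch Lightning usage for reference).

Tip

  1. We showed how to use the TrajDL API to build the dataset, the tokenizer, and the augmented samples for T2VEC.

  2. We showed how to quickly train T2VEC and run inference on top of TrajDL.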