Sequence and Dataset#
Attention
在1.0.0版本发布前,当前文档的内容可能会发生变化。
TrajDL主要处理时空序列数据上的深度学习任务,因此在TrajDL里面对时空序列数据进行了抽象,分为位置序列和轨迹序列两种。
这两种序列都是表示一个实体的移动轨迹,只不过前者是位置id组成的序列,后者是轨迹点组成的序列。
TrajDL针对上述两种类型的数据,按单条序列和多条序列分别定义了序列和数据集的概念。
前者一般表示单条序列,用户可以将自己的数据集处理成多条这样的序列。
对于一个包含多条序列的数据集,我们一般使用后者数据集表示。
序列有一个抽象基类是BaseSeq,数据集有一个抽象基类BaseArrowDataset。这两项都是按位置序列和轨迹序列进行细分。#
Tip
未来TrajDL会考虑将两种序列的API完全合并,目前大部分接口是相同的。
1. Sequence#
对于单条序列,TrajDL提供了两种基础的序列类型用来表示位置序列和轨迹序列,分别是LocSeq和Trajectory。
举个例子:我们有A、B、C、D是4个不同的位置,一个人接连到达了这四个位置,那么这个人经过的位置序列可以表示为[A, B, C, D]。
TrajDL提供了一个简单的类型LocSeq可以承载这样的位置序列:
from trajdl.datasets import LocSeq
locseq = LocSeq(["A", "B", "C", "D"])
print(locseq)
LocSeq(size=4, loc_seq='A', 'B', 'C', ...)
对于轨迹数据,假设我们有一组经纬度点,表示一个人的实时GPS数据:
[-8.608833, 41.147586]
[-8.608707, 41.147685]
[-8.608473, 41.14773]
[-8.608284, 41.148054]
[-8.607708, 41.148999]
[-8.60742, 41.149719]
我们可以使用Trajectory类型来表示这个数据
from trajdl.datasets import Trajectory
traj = Trajectory([
[-8.608833, 41.147586],
[-8.608707, 41.147685],
[-8.608473, 41.14773],
[-8.608284, 41.148054],
[-8.607708, 41.148999],
[-8.60742, 41.149719]
])
print(traj)
Trajectory(entity_id=None, length=6)
Note
Trajectory还可以通过numpy创建:
import numpy as np
from trajdl.datasets import Trajectory
seq = np.array([
[-8.608833, 41.147586],
[-8.608707, 41.147685],
[-8.608473, 41.14773],
[-8.608284, 41.148054],
[-8.607708, 41.148999],
[-8.60742, 41.149719]
])
traj = Trajectory(seq)
print(traj)
效果是一样的。
这两种数据类型本质上除了输入的序列不同(一个是位置序列,一个是轨迹点序列),其他的属性和方法都是一致的。可以获取它的长度,它的id等信息
print(len(locseq))
print(len(traj))
print(locseq.entity_id)
print(traj.entity_id)
4
6
None
None
上述两种类型的数据都是单条序列的数据,可以表示一条位置序列、或者一条轨迹序列。它的优点是清晰、简单,主要是给用户提供一个数据管理的工具。
这两种类型的序列数据目前具有6种属性:
| Attribute | Type | Description |
|---|---|---|
seq |
List[str] or numpy.ndarray |
这一项表示序列的位置信息。LocSeq的这一项是List[str],表示位置序列;Trajectory的这一项是numpy.ndarray,表示经纬度序列,numpy.ndarray一定要是2列,第一列是经度,第二列是纬度。 |
entity_id |
str |
id项,这一项用户可以根据自己的需求设定,可以设定为序列的id,或者实体的id。 |
ts_seq |
List[int] |
这一项表示每个位置的时间戳,可以是到达时的时间戳,也可以是离开时的时间戳,暂时没有区分,时间戳用int类型,具体的单位由用户自行决定。 |
ts_delta |
List[float] |
表示两个连续的位置之间的时间戳的差值。 |
dis_delta |
List[float] |
表示两个连续的位置之间的位移差值。 |
start_ts |
int |
表示这条序列的起始时间戳,具体的单位由用户自行决定。 |
其中只有第一项是要求用户一定要传入的,其他项都是可选的项,可以直接通过.entity_id这样的属性进行获取。
Note
目前这样的序列格式还不能满足用户灵活自定义额外的特征,比如一些连续性特征和离散型特征,会在未来版本提供支持。
2. Dataset#
对于多条序列,TrajDL提供了基于Arrow的数据管理方式,称为BaseArrowDataset,细分为LocSeqDataset和TrajectoryDataset。
这两种数据集是为了管理多条序列使用,它的主要特性是:
- 高性能
基于列存的
Arrow数据类型提供了高效的存储与查询性能。- 零拷贝
基于
Arrow的数据集在取数的时候可以避免数据拷贝,而且基于Arrow的数据集在Pytorch dataloader处于多进程时的状态下也可以避免数据拷贝。- 扩展性好
基于
Arrow的数据集可以与Polars、Pandas、Numpy等成熟工具以极低的损耗无缝转换。用户可以基于Polars快速进行数据处理,然后一键导入成BaseArrowDataset,而且BaseArrowDataset也可以快速转换为Polars DataFrame或Pandas DataFrame。而且Arrow与Parquet的交互也非常简单,很自然就可以扩展到分布式计算框架Spark上,这让大数据平台处理后的数据集无缝导入到TrajDL成为可能。
2.1. LocSeqDataset#
LocSeqDataset与LocSeq是一一对应的,也就是说LocSeq具有什么样的属性,那么LocSeqDataset就具有什么属性。我们可以通过下面的例子展示如何快速构建一个LocSeqDataset。
from trajdl.datasets import LocSeqDataset
seq1 = LocSeq(["A", "B", "C"])
seq2 = LocSeq(["C", "D"])
ds = LocSeqDataset.init_from_loc_seqs([seq1, seq2])
print(ds)
print(len(ds))
LocSeqDataset(size=2)
2
我们可以通过to_polars方法快速将其转换为Polars DataFrame进行其他操作。
df = ds.to_polars()
df.head()
| seq | entity_id | ts_seq | ts_delta | dis_delta | start_ts |
|---|---|---|---|---|---|
| list[str] | str | list[i64] | list[f32] | list[f32] | i64 |
| ["A", "B", "C"] | null | null | null | null | null |
| ["C", "D"] | null | null | null | null | null |
import polars as pl
# 计算一共有多少个不同的位置
df.select(pl.col("seq").explode().unique()).count().item()
4
由于LocSeqDataset的底层使用的是Arrow类型的数据结构,因此也是可以直接从Arrow层面进行操作的。
ds.seq
<pyarrow.lib.ChunkedArray object at 0x7fbc500f5a80>
[
[
[
"A",
"B",
"C"
],
[
"C",
"D"
]
]
]
import pyarrow as pa
# 计算每条序列的长度
pa.compute.list_value_length(ds.seq).to_numpy()
array([3, 2])
我们可以看一下LocSeqDataset的schema
ds.schema()
seq: large_list<item: large_string>
child 0, item: large_string
entity_id: large_string
ts_seq: large_list<item: int64>
child 0, item: int64
ts_delta: large_list<item: float>
child 0, item: float
dis_delta: large_list<item: float>
child 0, item: float
start_ts: int64
对于索引操作,LocSeqDataset在设计上是返回一个自身的一个视图,并不会copy一份新的数据出来。而且索引操作的返回结果仍然是一个LocSeqDataset类型。
Note
为什么索引的返回结果仍然是LocSeqDataset?
因为LocSeqDataset与TrajectoryDataset是构建在pyArrow.Table类型上的,Arrow数据的列存性质使得Arrow数据只有列的概念,没有行的概念,因此按行取数的过程实际是遍历所有的列,根据行的下标进行取数,这样得到的数据如果使用tuple或者list表示都不合理,但是是可以使用原来的schema将数据重新组织起来的,并且由于零拷贝的特性,这一点不会有什么损耗。
这样的特性也同样适用于TrajectoryDataset,因为这个特性是抽象基类BaseArrowDataset提供的。
ds[0], ds[1]
(LocSeqDataset(size=1), LocSeqDataset(size=1))
ds[0].seq, ds[1].seq
(<pyarrow.lib.ChunkedArray object at 0x7fbc445b0be0>
[
[
[
"A",
"B",
"C"
]
]
],
<pyarrow.lib.ChunkedArray object at 0x7fbc445b0b20>
[
[
[
"C",
"D"
]
]
])
2.2. TrajectoryDataset#
TrajectoryDataset和LocSeqDataset基本是一样的,唯一的区别在于seq的类型,我们可以创建一个TrajectoryDataset试试。
import numpy as np
from trajdl.datasets import Trajectory, TrajectoryDataset
traj1 = Trajectory(seq=np.random.uniform(size=(10, 2)))
traj2 = Trajectory(seq=np.random.uniform(size=(15, 2)))
traj3 = Trajectory(seq=np.random.uniform(size=(5, 2)))
ds = TrajectoryDataset.init_from_trajectories([traj1, traj2, traj3])
print(ds)
TrajectoryDataset(size=3)
# 看一下schema
print(ds.schema())
seq: large_list<item: fixed_size_list<item: double>[2]>
child 0, item: fixed_size_list<item: double>[2]
child 0, item: double
entity_id: large_string
ts_seq: large_list<item: int64>
child 0, item: int64
ts_delta: large_list<item: float>
child 0, item: float
dis_delta: large_list<item: float>
child 0, item: float
start_ts: int64
可以看到,seq的类型与LocSeqDataset的seq的类型是不同的。
下面是这两种类型的Schema的对比:
| Attribute | LocSeqDataset | TrajectoryDataset |
|---|---|---|
seq |
pa.large_list(pa.large_string()) |
pa.large_list(pa.list_(pa.float64(), 2)) |
entity_id |
pa.large_string() |
pa.large_string() |
ts_seq |
pa.large_list(pa.int64()) |
pa.large_list(pa.int64()) |
ts_delta |
pa.large_list(pa.float32()) |
pa.large_list(pa.float32()) |
dis_delta |
pa.large_list(pa.float32()) |
pa.large_list(pa.float32()) |
start_ts |
pa.int64() |
pa.int64() |
只有seq字段的类型是不同的。
2.3. 数据集的持久化#
BaseArrowDataset是支持存储与加载的,通过save方法和init_from_parquet方法。
调用save方法后数据集会以parquet的格式存储到磁盘上。
调用init_from_parquet之后,数据集会从parquet文件重新加载。
help(LocSeqDataset.save)
Help on function save in module trajdl.datasets.arrow.abstract:
save(self, path: Union[str, pathlib.Path]) -> None
Save the dataset to a Parquet file.
Parameters
----------
path : Union[str, Path]
The file path to save the dataset.
Notes
-----
If the provided path does not end with '.parquet', it will be appended automatically.
help(LocSeqDataset.init_from_parquet)
Help on method init_from_parquet in module trajdl.datasets.arrow.abstract:
init_from_parquet(path: str) -> 'BaseArrowDataset' class method of trajdl.datasets.arrow.locseq.LocSeqDataset
Initialize the dataset from a Parquet file.
Parameters
----------
path : str
The file path to the Parquet file.
Returns
-------
BaseArrowDataset
An instance of the dataset initialized from the Parquet file.
Notes
-----
Due to differences in handling List[List[Float32]] across different frameworks,
it is recommended to read the file using PyArrow and try to convert types where necessary.
我们演示一下如何实现数据的存储与加载:
import os
import tempfile
with tempfile.TemporaryDirectory() as tmp_folder:
path = os.path.join(tmp_folder, "test_ds.parquet")
print("dataset will be saved as:", path)
ds.save(path)
new_ds = TrajectoryDataset.init_from_parquet(path)
print(new_ds)
dataset will be saved as: /tmp/tmp9378dmin/test_ds.parquet
TrajectoryDataset(size=3)
再数据集处理好后,使用BaseArrowDataset存储起来对后续的训练过程很有帮助,如何在训练的时候使用BaseArrowDataset需要看后续文档。
Tip
本文主要介绍TrajDL里面的序列与数据集的概念。前者用于表示单条序列,后者用于表示多条序列。
TrajDL内的大量操作会围绕这些序列和数据集展开,如轨迹切分、数据扰动,Tokenization等。
数据集在TrajDL的模型训练中发挥很大的作用。
基于Arrow的BaseArrowDataset还有很多用法会在后续的高级文档内展开。