Sequence and Dataset#

Attention

在1.0.0版本发布前,当前文档的内容可能会发生变化。

TrajDL主要处理时空序列数据上的深度学习任务,因此在TrajDL里面对时空序列数据进行了抽象,分为位置序列轨迹序列两种。

这两种序列都是表示一个实体的移动轨迹,只不过前者是位置id组成的序列,后者是轨迹点组成的序列。

TrajDL针对上述两种类型的数据,按单条序列和多条序列分别定义了序列数据集的概念。

  • 前者一般表示单条序列,用户可以将自己的数据集处理成多条这样的序列。

  • 对于一个包含多条序列的数据集,我们一般使用后者数据集表示。

序列与数据集的关系图

序列有一个抽象基类是BaseSeq,数据集有一个抽象基类BaseArrowDataset。这两项都是按位置序列轨迹序列进行细分。#

Tip

未来TrajDL会考虑将两种序列的API完全合并,目前大部分接口是相同的。

1.  Sequence#

对于单条序列,TrajDL提供了两种基础的序列类型用来表示位置序列轨迹序列,分别是LocSeqTrajectory

举个例子:我们有ABCD是4个不同的位置,一个人接连到达了这四个位置,那么这个人经过的位置序列可以表示为[A, B, C, D]

TrajDL提供了一个简单的类型LocSeq可以承载这样的位置序列:

from trajdl.datasets import LocSeq

locseq = LocSeq(["A", "B", "C", "D"])
print(locseq)
LocSeq(size=4, loc_seq='A', 'B', 'C', ...)

对于轨迹数据,假设我们有一组经纬度点,表示一个人的实时GPS数据:

[-8.608833, 41.147586]
[-8.608707, 41.147685]
[-8.608473, 41.14773]
[-8.608284, 41.148054]
[-8.607708, 41.148999]
[-8.60742, 41.149719]

我们可以使用Trajectory类型来表示这个数据

from trajdl.datasets import Trajectory

traj = Trajectory([
    [-8.608833, 41.147586],
    [-8.608707, 41.147685],
    [-8.608473, 41.14773],
    [-8.608284, 41.148054],
    [-8.607708, 41.148999],
    [-8.60742, 41.149719]
])
print(traj)
Trajectory(entity_id=None, length=6)

Note

Trajectory还可以通过numpy创建:

import numpy as np
from trajdl.datasets import Trajectory

seq = np.array([
    [-8.608833, 41.147586],
    [-8.608707, 41.147685],
    [-8.608473, 41.14773],
    [-8.608284, 41.148054],
    [-8.607708, 41.148999],
    [-8.60742, 41.149719]
])

traj = Trajectory(seq)
print(traj)

效果是一样的。

这两种数据类型本质上除了输入的序列不同(一个是位置序列,一个是轨迹点序列),其他的属性和方法都是一致的。可以获取它的长度,它的id等信息

print(len(locseq))
print(len(traj))

print(locseq.entity_id)
print(traj.entity_id)
4
6
None
None

上述两种类型的数据都是单条序列的数据,可以表示一条位置序列、或者一条轨迹序列。它的优点是清晰、简单,主要是给用户提供一个数据管理的工具。

这两种类型的序列数据目前具有6种属性:

Attribute Type Description
seq List[str] or numpy.ndarray 这一项表示序列的位置信息。LocSeq的这一项是List[str],表示位置序列;Trajectory的这一项是numpy.ndarray,表示经纬度序列,numpy.ndarray一定要是2列,第一列是经度,第二列是纬度。
entity_id str id项,这一项用户可以根据自己的需求设定,可以设定为序列的id,或者实体的id。
ts_seq List[int] 这一项表示每个位置的时间戳,可以是到达时的时间戳,也可以是离开时的时间戳,暂时没有区分,时间戳用int类型,具体的单位由用户自行决定。
ts_delta List[float] 表示两个连续的位置之间的时间戳的差值。
dis_delta List[float] 表示两个连续的位置之间的位移差值。
start_ts int 表示这条序列的起始时间戳,具体的单位由用户自行决定。

其中只有第一项是要求用户一定要传入的,其他项都是可选的项,可以直接通过.entity_id这样的属性进行获取。

Note

目前这样的序列格式还不能满足用户灵活自定义额外的特征,比如一些连续性特征和离散型特征,会在未来版本提供支持。

2.  Dataset#

对于多条序列,TrajDL提供了基于Arrow的数据管理方式,称为BaseArrowDataset,细分为LocSeqDatasetTrajectoryDataset

这两种数据集是为了管理多条序列使用,它的主要特性是:

高性能

基于列存的Arrow数据类型提供了高效的存储与查询性能。

零拷贝

基于Arrow的数据集在取数的时候可以避免数据拷贝,而且基于Arrow的数据集在Pytorch dataloader处于多进程时的状态下也可以避免数据拷贝。

扩展性好

基于Arrow的数据集可以与PolarsPandasNumpy等成熟工具以极低的损耗无缝转换。用户可以基于Polars快速进行数据处理,然后一键导入成BaseArrowDataset,而且BaseArrowDataset也可以快速转换为Polars DataFramePandas DataFrame。而且ArrowParquet的交互也非常简单,很自然就可以扩展到分布式计算框架Spark上,这让大数据平台处理后的数据集无缝导入到TrajDL成为可能。

2.1.  LocSeqDataset#

LocSeqDatasetLocSeq是一一对应的,也就是说LocSeq具有什么样的属性,那么LocSeqDataset就具有什么属性。我们可以通过下面的例子展示如何快速构建一个LocSeqDataset

from trajdl.datasets import LocSeqDataset

seq1 = LocSeq(["A", "B", "C"])
seq2 = LocSeq(["C", "D"])

ds = LocSeqDataset.init_from_loc_seqs([seq1, seq2])
print(ds)
print(len(ds))
LocSeqDataset(size=2)
2

我们可以通过to_polars方法快速将其转换为Polars DataFrame进行其他操作。

df = ds.to_polars()
df.head()
shape: (2, 6)
seqentity_idts_seqts_deltadis_deltastart_ts
list[str]strlist[i64]list[f32]list[f32]i64
["A", "B", "C"]nullnullnullnullnull
["C", "D"]nullnullnullnullnull
import polars as pl

# 计算一共有多少个不同的位置
df.select(pl.col("seq").explode().unique()).count().item()
4

由于LocSeqDataset的底层使用的是Arrow类型的数据结构,因此也是可以直接从Arrow层面进行操作的。

ds.seq
<pyarrow.lib.ChunkedArray object at 0x7fbc500f5a80>
[
  [
    [
      "A",
      "B",
      "C"
    ],
    [
      "C",
      "D"
    ]
  ]
]
import pyarrow as pa

# 计算每条序列的长度
pa.compute.list_value_length(ds.seq).to_numpy()
array([3, 2])

我们可以看一下LocSeqDataset的schema

ds.schema()
seq: large_list<item: large_string>
  child 0, item: large_string
entity_id: large_string
ts_seq: large_list<item: int64>
  child 0, item: int64
ts_delta: large_list<item: float>
  child 0, item: float
dis_delta: large_list<item: float>
  child 0, item: float
start_ts: int64

对于索引操作,LocSeqDataset在设计上是返回一个自身的一个视图,并不会copy一份新的数据出来。而且索引操作的返回结果仍然是一个LocSeqDataset类型。

Note

为什么索引的返回结果仍然是LocSeqDataset

因为LocSeqDatasetTrajectoryDataset是构建在pyArrow.Table类型上的,Arrow数据的列存性质使得Arrow数据只有列的概念,没有行的概念,因此按行取数的过程实际是遍历所有的列,根据行的下标进行取数,这样得到的数据如果使用tuple或者list表示都不合理,但是是可以使用原来的schema将数据重新组织起来的,并且由于零拷贝的特性,这一点不会有什么损耗。

这样的特性也同样适用于TrajectoryDataset,因为这个特性是抽象基类BaseArrowDataset提供的。

ds[0], ds[1]
(LocSeqDataset(size=1), LocSeqDataset(size=1))
ds[0].seq, ds[1].seq
(<pyarrow.lib.ChunkedArray object at 0x7fbc445b0be0>
 [
   [
     [
       "A",
       "B",
       "C"
     ]
   ]
 ],
 <pyarrow.lib.ChunkedArray object at 0x7fbc445b0b20>
 [
   [
     [
       "C",
       "D"
     ]
   ]
 ])

2.2.  TrajectoryDataset#

TrajectoryDatasetLocSeqDataset基本是一样的,唯一的区别在于seq的类型,我们可以创建一个TrajectoryDataset试试。

import numpy as np
from trajdl.datasets import Trajectory, TrajectoryDataset

traj1 = Trajectory(seq=np.random.uniform(size=(10, 2)))
traj2 = Trajectory(seq=np.random.uniform(size=(15, 2)))
traj3 = Trajectory(seq=np.random.uniform(size=(5, 2)))

ds = TrajectoryDataset.init_from_trajectories([traj1, traj2, traj3])
print(ds)
TrajectoryDataset(size=3)
# 看一下schema
print(ds.schema())
seq: large_list<item: fixed_size_list<item: double>[2]>
  child 0, item: fixed_size_list<item: double>[2]
      child 0, item: double
entity_id: large_string
ts_seq: large_list<item: int64>
  child 0, item: int64
ts_delta: large_list<item: float>
  child 0, item: float
dis_delta: large_list<item: float>
  child 0, item: float
start_ts: int64

可以看到,seq的类型与LocSeqDatasetseq的类型是不同的。

下面是这两种类型的Schema的对比:

Attribute LocSeqDataset TrajectoryDataset
seq pa.large_list(pa.large_string()) pa.large_list(pa.list_(pa.float64(), 2))
entity_id pa.large_string() pa.large_string()
ts_seq pa.large_list(pa.int64()) pa.large_list(pa.int64())
ts_delta pa.large_list(pa.float32()) pa.large_list(pa.float32())
dis_delta pa.large_list(pa.float32()) pa.large_list(pa.float32())
start_ts pa.int64() pa.int64()

只有seq字段的类型是不同的。

2.3.  数据集的持久化#

BaseArrowDataset是支持存储加载的,通过save方法和init_from_parquet方法。

调用save方法后数据集会以parquet的格式存储到磁盘上。

调用init_from_parquet之后,数据集会从parquet文件重新加载。

help(LocSeqDataset.save)
Help on function save in module trajdl.datasets.arrow.abstract:

save(self, path: Union[str, pathlib.Path]) -> None
    Save the dataset to a Parquet file.

    Parameters
    ----------
    path : Union[str, Path]
        The file path to save the dataset.

    Notes
    -----
    If the provided path does not end with '.parquet', it will be appended automatically.
help(LocSeqDataset.init_from_parquet)
Help on method init_from_parquet in module trajdl.datasets.arrow.abstract:

init_from_parquet(path: str) -> 'BaseArrowDataset' class method of trajdl.datasets.arrow.locseq.LocSeqDataset
    Initialize the dataset from a Parquet file.

    Parameters
    ----------
    path : str
        The file path to the Parquet file.

    Returns
    -------
    BaseArrowDataset
        An instance of the dataset initialized from the Parquet file.

    Notes
    -----
    Due to differences in handling List[List[Float32]] across different frameworks,
    it is recommended to read the file using PyArrow and try to convert types where necessary.

我们演示一下如何实现数据的存储与加载:

import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_folder:
    path = os.path.join(tmp_folder, "test_ds.parquet")
    print("dataset will be saved as:", path)

    ds.save(path)

    new_ds = TrajectoryDataset.init_from_parquet(path)
    print(new_ds)
dataset will be saved as: /tmp/tmp9378dmin/test_ds.parquet
TrajectoryDataset(size=3)

再数据集处理好后,使用BaseArrowDataset存储起来对后续的训练过程很有帮助,如何在训练的时候使用BaseArrowDataset需要看后续文档。

Tip

本文主要介绍TrajDL里面的序列数据集的概念。前者用于表示单条序列,后者用于表示多条序列。

TrajDL内的大量操作会围绕这些序列和数据集展开,如轨迹切分、数据扰动,Tokenization等。

数据集TrajDL的模型训练中发挥很大的作用。

基于ArrowBaseArrowDataset还有很多用法会在后续的高级文档内展开。