# 将数据同步到 Typesense

Typesense 提供了 RESTful API,可用于将主数据库中的数据与 Typesense 保持同步。

根据您的架构、Typesense 集群的 CPU 容量以及所需的"实时性"程度,有几种方法可以实现这一点。

# 定期批量同步变更

# 轮询主数据库

  1. 在主数据库的每条记录中添加 updated_at 时间戳(如果尚未存在),并在修改任何记录时更新该时间戳。
  2. 对于被删除的记录,可以使用 is_deleted 布尔字段进行"软删除"(设置为 true),或者将已删除记录的 ID 保存在单独的表中并添加 deleted_at 时间戳。
  3. 定期(例如每 30 秒)查询数据库中所有 updated_at 时间戳介于当前时间和上次同步运行时间之间的记录(需要在持久化存储中维护这个 last_synced_at 时间戳)。
  4. 对步骤 3 中的记录向 Typesense 发起 批量导入 API 调用,使用 action=upsert 参数。
  5. 对于步骤 2 中标记为删除的记录,向 Typesense 发起 按查询删除 API 调用,使用类似这样的过滤器包含所有记录 ID:filter_by:=[id1,id2,id3]

如果数据分布在多个表中且数据库支持视图概念,可以创建一个 JOIN 所有需要表的数据库视图,然后轮询该视图而非单个表。

# 使用变更监听器

如果您的核心数据库支持变更触发器(change triggers)或变更数据捕获(change data capture)功能:

  1. 您可以编写监听器接入这些变更流,并将变更推送到临时队列
  2. 每隔5秒(可配置),通过定时任务从队列读取所有变更,并批量导入到Typesense中。

例如,以下是几种从MongoDBDynamoDBFirestore同步数据的方法。 对于MySQL的binlog同步,可以使用Maxwell (opens new window)工具将binlog转换为JSON格式并放入Kafka主题供消费。

# 使用ORM钩子

如果使用ORM框架,可以利用其提供的回调钩子:

  1. 在ORM的on_save回调(不同ORM可能名称不同)中,将需要同步到Typesense的变更写入临时队列
  2. 每隔5秒(可配置),通过定时任务从队列读取所有变更,并批量导入到Typesense中。

# 使用Airbyte

Airbyte (opens new window)是一个开源数据集成平台,只需点击几下即可在不同数据源和目标之间同步数据。

它支持多种数据源 (opens new window),包括MySQL、Postgres、MSSQL、Redshift、Kafka甚至Google Sheets,并且支持Typesense (opens new window)作为目标数据库。

有关如何部署和设置Airbyte的更多信息,请参阅官方文档 (opens new window)

# 实时变更同步

# 使用 API

除了定期批量同步变更外,如果您有需要实时更新某些记录的用例(例如希望用户对记录的编辑能立即反映在搜索结果中,而不是等待上述流程中设置的10秒或其他同步间隔), 您还可以使用单文档索引API

需要注意的是,对于相同数量的文档,批量导入端口的性能要高得多,且CPU占用率远低于单文档索引端口。 因此,建议尽可能使用批量导入端口,即使这意味着需要将上述流程的同步间隔缩短至2秒这样的低值。

# 高吞吐量写入

如果您的应用每秒产生超过10次写入操作,建议切换到使用 批量导入API,相比单文档写入接口,它在处理高吞吐量写入时性能更优。 例如,通过10,000次独立API调用发送10,000个文档,其CPU开销和执行速度会比通过单次批量导入API调用发送这些文档慢一个数量级。

以下是一种将实时需求与批量导入效率相结合的方法:

  1. 在主数据库(或缓存系统)中创建"缓冲表",包含以下字段:

    • record_id: 原始记录的ID
    • operation_type: 操作类型(insert插入/update更新/delete删除)
    • record_data: 完整的JSON格式记录数据(用于插入/更新)
    • created_at: 记录加入缓冲表的时间戳
    • processed: 表示该记录是否已处理的布尔标志
  2. 当应用中记录实时变更时:

    • 将变更插入上述缓冲表
    • 继续执行应用逻辑,无需等待Typesense索引操作
  3. 设置高频运行的定时任务(最短可设置为每5-10秒执行一次): 该任务会查询缓冲表中未处理的记录,按操作类型分组,通过action=upsert参数以批量导入API调用发送插入/更新操作,标记缓冲表中已处理的记录,并可选地在保留期后清除旧的已处理记录。

# 工作线程并行度考量

在扩展同步流程时,关键要使并发写入操作的数量与 Typesense 集群可用的 CPU 处理能力相匹配:

  • 为实现最佳性能,并发批量导入操作数量不应超过 N-2,其中 N 是 Typesense 可用的 CPU 核心数。
  • 例如,在 8vCPU 的服务器上,应将并发批量导入限制为 6 个工作线程。

这能确保 Typesense 在处理写入时保留足够的容量来处理搜索请求。

这种基于缓冲区的方案具有以下优势:

  • 您的应用保持响应性,因为数据库写入不会被 Typesense 索引过程阻塞
  • 您可以利用更高效的批量导入 API
  • 缓冲区提供了变更的审计追踪
  • 您可以根据实时需求调整处理频率

# 使用 Sequin

Sequin (opens new window) 将数据从您的 Postgres 数据库实时流式传输到 Typesense。数据库的任何变更(无论是来自您的应用程序、内部工具还是其他进程)都会立即反映到 Typesense 中。

这种方法具有以下优势:

  • Sequin 使用逻辑复制,几乎不会给数据库带来额外开销(与轮询和触发器不同)
  • 与 Typesense 的直接集成利用批量导入 API 高效加载每个创建和更新操作,并原生支持删除
  • 您可以根据实时需求调整 Sequin 的批处理行为
  • Sequin 提供数据转换、回填、过滤和内置重试机制,确保 Postgres 表完美复制到 Typesense

# 安装概述

TIP

请阅读 Sequin 的 Typesense 快速入门 (opens new window) 获取分步指南。

  1. 在本地设置 Sequin 或创建云端账户
  2. 将您的 Postgres 数据库连接到 Sequin
  3. 为每个需要同步到 Typesense 集合的表创建一个 Typesense sink

以下是一个示例 Sequin.yaml (opens new window),展示如何将 products 表同步到 Typesense:

databases:
  - name: "prod-db"
    username: "postgres"
    password: "postgres"
    hostname: "my-database-instance.abcd1234wxyz.us-east-1.rds.amazonaws.com"
    database: "prod"
    port: 5432
    slot_name: "sequin_slot"
    publication_name: "sequin_pub"
sinks:
  - name: "typesense-sink"
    database: "prod-db"
    table: "public.products"
    destination:
        type: "typesense"
        endpoint_url: "https://your-typesense-server:8108"
        collection_name: "products"
        api_key: "your-api-key"
        batch_size: 1000
        timeout_seconds: 5

# 全量重建索引

除了上述策略外,您还可以选择在夜间进行全量数据重建索引,以确保因以下原因导致的数据同步间隙得到修复:

  • 模式验证错误
  • 网络问题
  • 重试失败等情况

您可以使用 集合别名 功能,将数据重新索引到新集合,然后将别名切换到新集合。 或者使用 批量导入 API 配合 action=upsert 参数,结合 按查询删除 端点,在现有集合上重新导入数据集。

# 数据导入时的注意事项

以下是将数据批量导入 Typesense 时的一些实用建议:

# 文档 ID

为了后续能够更新推送到 Typesense 的记录,建议在发送给 Typesense 的每个文档中设置 id 字段。 这是一个特殊字段,Typesense 在内部用它来引用文档。

如果没有显式设置 id 字段,Typesense 会自动为文档生成一个 ID。如果你在导入接口中设置参数 return_ids=true,Typesense 会返回这些自动生成的 ID。 之后你需要将这些 id 字段保存到你的数据库中,以便未来更新相同的记录。

# 客户端超时设置

当导入大批量数据时,请确保在实例化客户端库时已将默认的客户端超时时间增加到最高 60 分钟。

Typesense 的写入 API 调用是同步的,因此你不希望客户端因超时而过早终止连接,然后又重新尝试相同的写入操作。

# 处理 HTTP 503 错误

当向 Typesense 发送大量写入请求时,系统有时会返回 HTTP 503 错误(Not Ready 或 Lagging)。

这是 Typesense 内置的一种背压机制,用于确保高负载写入不会耗尽 CPU 资源而影响读取性能。

如果遇到 HTTP 503 错误,您可以采取以下一种或多种措施:

  1. 增加 CPU 核心数:通过增加 CPU 核心来并行处理写入操作。对于高吞吐量写入场景,我们建议至少配置 4 个 CPU 核心。
  2. 延长客户端超时时间:在初始化客户端库时,将超时设置为较高值(例如 60 分钟),确保客户端不会中断正在进行的写入操作。 短超时设置往往会导致相同数据的频繁重试写入,引发"惊群效应"问题。
  3. 调整重试间隔:在客户端库中增加重试间隔时间,或设置为 10 至 60 秒之间的随机值(特别是在无服务器环境中),为重试操作引入随机抖动。 这为 Typesense 进程提供了更多时间来处理之前的写入请求。
  4. 使用批量导入 API:如果存在大量单文档写入 API 调用,建议改用性能更高的批量导入 API 端点。详见专门介绍高吞吐量写入的章节。
  5. 优化字段索引:对于仅用于展示而不参与搜索/过滤/分面/排序/分组的字段,建议将其排除在 schema 定义之外以避免索引开销。 这些字段仍可随文档发送至 Typesense - 它们会被存储在磁盘上并在文档匹配时返回,而不会占用内存索引资源。 这有助于避免不必要的 CPU 索引开销。
  6. 提升磁盘 I/O 性能:当单个文档体积较大时,磁盘 I/O 可能成为瓶颈。此时建议在 Typesense Cloud 中启用高性能磁盘,或使用 nVME SSD 磁盘提升性能。
  7. 调整服务端配置参数:可修改 服务端配置参数 中的 healthy-read-laghealthy-write-lag 值。 通常完成上述优化后无需此操作。在 Typesense Cloud 环境中,如需调整这些参数值,请联系 support@typesense.org 由我们协助处理。

# 处理"rejecting write: running out of resource type"错误

当内存不足(OUT_OF_MEMORY)或磁盘空间不足(OUT_OF_DISK)时,你可能会看到"running out of resource type"错误。

Typesense使用的内存量与节点中索引的数据量成正比。 因此,如果看到OUT_OF_MEMORY错误,你需要增加更多内存来容纳数据集。

当出现OUT_OF_DISK错误时,你需要增加磁盘容量并重启Typesense。 在Typesense Cloud上,我们会配置5倍于内存的磁盘空间(或至少8GB)。因此要增加磁盘空间,你需要升级到更高的内存规格。

# 客户端批处理与服务器端批处理

在导入API调用中,你会注意到一个名为batch_size的参数。 它控制服务器端批处理(在服务搜索队列之前,处理导入API调用中的文档数量),你几乎不需要更改这个默认值。

相反,你应该通过控制单个导入API调用中的文档数量来实现客户端批处理,并可以并行执行多个API调用。