# 使用 MongoDB 和 Typesense 实现全文模糊搜索

本教程将展示如何将数据从 MongoDB 导入 Typesense,然后使用 Typesense 进行支持容错、过滤、分面等功能的全文搜索。

整体流程上,我们将使用 MongoDB 的变更流(Change Streams)设置触发器,并在每次变更事件时将数据推送到 Typesense。

Typesense MongoDB 集成图表

更新

我们发布了一个 Node.js CLI 工具,您可以安装它来自动将 MongoDB 文档同步到 Typesense。

以下是设置方法: typesense/typesense-mongodb (opens new window).

# 第一步:安装并运行 Typesense

使用 Docker 安装并启动 Typesense,运行以下 Docker 命令:

现在,我们可以检查 Typesense 服务器是否已准备好接受请求。

curl http://localhost:8108/health
{"ok":true}

您也可以通过其他方式运行 Typesense。查看 Typesense 安装指南Typesense Cloud (opens new window) 获取更多详情。

# 第二步:启动 MongoDB 副本集

MongoDB 副本集提供冗余和高可用性,是所有生产部署的基础。

如果您有一个独立的 MongoDB 实例,可以按照以下步骤将其转换为副本集:

  • 关闭已运行的 MongoDB 服务器
  • 使用 --replSet 选项启动 MongoDB 服务器
mongod --port "PORT" --dbpath "YOUR_DB_DATA_PATH" --replSet "REPLICA_SET_INSTANCE_NAME"

在 mongo shell 中执行 rs.status() 命令检查副本集状态。

# 第三步:开启变更流

现在让我们开启一个变更流来监听 MongoDB 集群中的数据变化。稍后我们会将这些变更推送到 Typesense。

我们可以从任何数据承载成员上为 MongoDB 副本集开启变更流。详细说明请参阅 MongoDB 变更流 (opens new window) 文档。

以下是一个示例:

const uri = '<MongoDB-URI>'
const mongodbOptions = {
  useNewUrlParser: true,
  useUnifiedTopology: true,
}
const client = new MongoClient(uri, mongodbOptions)
await client.connect()
const collection = client.db('sample').collection('books')
const changeStream = collection.watch()
changeStream.on('change', next => {
  // 处理下一个文档
})

# 第四步:创建 Typesense 集合

要使用 Typesense,我们首先需要创建一个客户端。Typesense 支持多种 API 客户端,包括 Javascript、Python、Ruby、PHP 等。

要初始化 Javascript 客户端,你需要 Typesense 服务器的 API 密钥:

import Typesense from 'typesense'

let typesense = new Typesense.Client({
  nodes: [
    {
      host: 'localhost',
      port: '8108',
      protocol: 'http',
    },
  ],
  apiKey: '<API_KEY>',
  connectionTimeoutSeconds: 2,
})

接下来,我们将创建一个集合。集合需要一个 schema(模式),它定义了文档的结构。

let schema = {
  name: 'books',
  fields: [
    { name: 'id', type: 'string', facet: false },
    { name: 'name', type: 'string', facet: false },
    { name: 'author', type: 'string', facet: false },
    { name: 'year', type: 'int32', facet: true },
  ],
  default_sorting_field: 'year',
}
await typesense.collections().create(schema)

# 步骤 5:将文档索引到 Typesense

接下来,我们将创建一个函数来监听 MongoDB 的变更流(change streams),并将变更写入 Typesense。

以下是 MongoDB 变更流的响应示例:

{
  _id: {
    _data: '826062978E000000012B022C0100296E5'
  },
  operationType: 'insert',
    clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1617074062 },
  fullDocument: {
    _id: 6062978e06e4444ef0c7f16a,
      name: 'Davinci Code',
      author: 'Dan Brown',
      year: 2003
  },
  ns: { db: 'sample', coll: 'books' },
  documentKey: { _id: 6062978e06e4444ef0c7f16a }
}
{
  _id: {
    _data: '826062978E000000032B022C0100296E5'
  },
  operationType: 'update',
    clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 3, high_: 1617074062 },
  ns: { db: 'sample', coll: 'books' },
  documentKey: { _id: 6062978e06e4444ef0c7f16a },
  updateDescription: { updatedFields: { year: 2000 }, removedFields: [] }
}
{
  _id: {
    _data: '826062978E000000072B022C0100296E5'
  },
  operationType: 'delete',
    clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 7, high_: 1617074062 },
  ns: { db: 'sample', coll: 'books' },
  documentKey: { _id: 6062978e06e4444ef0c7f16c }
}
async function index(next, typesense) {
  if (next.operationType == 'delete') {
    await typesense.collections('books').documents(next.documentKey._id).delete()
  } else if (next.operationType == 'update') {
    let data = JSON.stringify(next.updateDescription.updatedFields)
    await typesense.collections('books').documents(next.documentKey._id).update(data)
  } else {
    next.fullDocument.id = next.fullDocument['_id']
    delete next.fullDocument._id
    let data = JSON.stringify(next.fullDocument)
    await typesense.collections('books').documents().upsert(data)
  }
}

# 完整示例

以下是完整的代码示例:

const { MongoClient } = require('mongodb')
const Typesense = require('typesense')

async function listDatabases(client) {
  databasesList = await client.db().admin().listDatabases()

  console.log('数据库列表:')
  databasesList.databases.forEach(db => console.log(` - ${db.name}`))
}

function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise(resolve => {
    setTimeout(() => {
      console.log('正在关闭变更流')
      changeStream.close()
      resolve()
    }, timeInMs)
  })
}

async function index(next, typesense) {
  console.log(next)
  if (next.operationType == 'delete') {
    await typesense.collections('books').documents(next.documentKey._id).delete()
    console.log(next.documentKey._id)
  } else if (next.operationType == 'update') {
    let data = JSON.stringify(next.updateDescription.updatedFields)
    await typesense.collections('books').documents(next.documentKey._id).update(data)
    console.log(data)
  } else {
    next.fullDocument.id = next.fullDocument['_id']
    delete next.fullDocument._id
    let data = JSON.stringify(next.fullDocument)
    await typesense.collections('books').documents().upsert(data)
    console.log(data)
  }
}

async function monitorListingsUsingEventEmitter(client, typesense, timeInMs = 60000) {
  const collection = client.db('sample').collection('books')
  const changeStream = collection.watch()
  changeStream.on('change', next => {
    index(next, typesense)
  })
  await closeChangeStream(timeInMs, changeStream)
}

async function createSchema(schema, typesense) {
  const collectionsList = await typesense.collections().retrieve()
  var toCreate = collectionsList.find((value, index, array) => {
    return value['name'] == schema['name']
  })

  if (!toCreate) {
    await typesense.collections().create(schema)
  }
}

async function main() {
  const typesense = new Typesense.Client({
    nodes: [
      {
        host: 'localhost',
        port: '8108',
        protocol: 'http',
      },
    ],
    apiKey: '<API_KEY>',
    connectionTimeoutSeconds: 2,
  })
  let schema = {
    name: 'books',
    fields: [
      {
        name: 'id',
        type: 'string',
        facet: false,
      },
      {
        name: 'name',
        type: 'string',
        facet: false,
      },
      {
        name: 'author',
        type: 'string',
        facet: false,
      },
      {
        name: 'year',
        type: 'int32',
        facet: true,
      },
    ],
    default_sorting_field: 'year',
  }
  createSchema(schema, typesense)
  const mongodbOptions = {
    useNewUrlParser: true,
    useUnifiedTopology: true,
  }
  const uri = '<Mongo-URI>'
  const client = new MongoClient(uri, mongodbOptions)
  try {
    await client.connect()
    await listDatabases(client)
    await monitorListingsUsingEventEmitter(client, typesense)
  } catch (e) {
    console.error(e)
  } finally {
    await client.close()
  }
}

main().catch(console.error)

就是这样 😊!现在你可以轻松地使用 Typesense 搜索 MongoDB 文档。你甚至可以使用 Typesense Cloud (opens new window)MongoDB Atlas (opens new window) 来获取托管版本的 Typesense 和 MongoDB。

# 相关参考