RabbitMQ:在 Cloud Foundry 上启用 Grails 全文搜索

工程 | Peter Ledbrook | 2011 年 8 月 29 日 | ...

在我的 关于 Grails 和 Cloud Foundry 的第二篇博客 中,我介绍了 Grails Twitter 示例 的一个变种,可以托管在 CloudFoundry.com 上。当时我提到,使用 Searchable 插件进行全文搜索会将你限制在单个应用程序实例上,因为搜索索引对于每个实例来说是唯一的。换句话说,根据浏览器路由到的应用程序实例不同,你很可能会得到不同的搜索结果。

我还说过,解决这个问题的一个选项是在实例之间同步搜索索引。但这听起来并不是特别容易,对吧?巧合的是,Cloud Foundry 引入 RabbitMQ 服务意味着所需的代码更改比你预期的要小得多。因此,让我们看看我是如何为 Grails Twitter 状态消息添加全文搜索功能的。

使状态消息可搜索

Searchable 插件强烈假设您希望索引标准的 GORM 领域类。这意味着使用 Hibernate/SQL。但 Grails Twitter 的状态消息存储在 MongoDB 中,而不是 MySQL 中。我们能否使其可搜索?是的,我们可以,但会牺牲一些功能。

与普通领域类一样,搜索Status实例的第一步是添加一个searchable属性

package org.grails.twitter

import org.grails.twitter.auth.Person

class Status {
    static mapWith = "mongo"
    static transients = ["author"]

    static searchable = {
        only = ["message", "dateCreated"]
        authorId index: "no", store: "yes"
    }
	
    String message
    Long authorId
    List<String> tags = []
    Date dateCreated
	
    Person getAuthor() {
        return Person.get(authorId)
    }

    static constraints = {
        message maxSize: 160
    }
}

在本例中,我希望能够根据消息的创建日期和内容进行搜索,而不是其他。我还希望从搜索结果链接到消息的作者。但是,如果 authorId 没有被索引,那么搜索结果将不包含发布者的 ID。因此,我将authorId存储authorId在索引中,但不使其可搜索 (index: "no")。简单吧?当显示搜索结果时,现在可以包含每条消息作者的姓名了。

索引非 Hibernate 领域类的一个显著限制是镜像(mirroring)不起作用。这意味着新消息保存时不会自动被索引。幸运的是,我们这里实际上并不需要这种行为,因此我在Config.groovy:

searchable {
    ...
    mirrorChanges = false
    bulkIndexOnStartup = false
}

中禁用了镜像和“启动时批量索引”(bulk indexing on startup)。当然,我们确实希望在启动时索引状态消息,因为 Cloud Foundry 上的文件系统是短暂的,因此搜索索引需要在每次启动时重建。但自动索引对非 Hibernate 领域类也无效,所以我只好在BootStrap.groovy:

...
class BootStrap {

    def searchableService
    def springSecurityService

    def init = { servletContext ->
        ...
        // Index all Hibernate mapped domain classes.
        searchableService.reindex()

        // Index all status messages.
        def statusMessages = Status.list()
        log.info "Indexing ${statusMessages.size()} status messages"
        Status.reindex(statusMessages)
        log.info "Finished indexing"
    }
    ...
}

的末尾进行手动索引。这并不是很多代码,但足以让状态消息可搜索。剩下的就是确保新消息被索引,并且搜索索引在应用程序实例之间同步。

使用 RabbitMQ 同步

保持搜索索引同步的基本模型非常简单直观

每当保存一条状态消息时,都会向 RabbitMQ 代理发送一条消息,然后由代理将其转发给所有应用程序实例。然后,每个实例索引由该消息标识的Status实例。

在我们实现这个功能之前,需要安装 RabbitMQ 插件

    grails install-plugin rabbitmq

接下来的工作是使用适当的交换机和队列来配置代理。我之前写过关于 AMQP 协议RabbitMQ 插件 的博客,所以这里不再详细介绍交换机和队列。只需要知道我们需要一个 fanout 交换机(所有消息都路由到所有监听器)以及一个订阅该交换机的 Grails 服务即可。所以在Config.groovyConfig.groovy

rabbitmq {
    connectionfactory {
        username = 'guest'
        password = 'guest'
        hostname = 'localhost'
    }

    queues = {
        exchange name: 'search.sync', type: fanout, durable: false
    }
}

中,我添加了以下内容:重要的是交换机的声明:当应用程序部署到 Cloud Foundry 时,连接工厂设置会被忽略,因为 RabbitMQ 服务在运行时绑定到应用程序。

发送消息只需一行代码

...
class StatusService {
    def springSecurityService
    def tagService
    
    void updateStatus(long userId, String message) {
        def status = new Status(message: message, authorId: userId).save(flush: true, failOnError: true)
        rabbitSend 'search.sync', '', "${status.id}:${status.class.name}"
        
        runAsync {
            tagService.extractTagsFromMessage(status)
        }
    }
    ...
}

而用于索引状态消息的服务也没有复杂多少

package org.grails.twitter

class SyncService {
    static rabbitSubscribe = "search.sync"
    static transactional = false

    def grailsApplication
    def searchableService

    void handleMessage(String message) {
        def parts = message.split(/:/)
        if (parts.size() != 2) {
            log.error "Invalid message: $message"
            return
        }

        def domainClass = grailsApplication.getDomainClass(parts[1])
        log.debug "Reindexing instance ${parts[0]} of ${parts[1]}"
        try {
            searchableService.reindex(domainClass.clazz.get(parts[0]))
        }
        catch (Exception ex) {
            log.error "Failed to index instance ${parts[0]} of ${parts[1]}", ex
        }
    }
}

所以rabbitSend()方法用于发送一个简单的字符串,其中包含Status实例 ID 和类名。在这种情况下,我们只处理Status实例,但让服务对所有潜在的可搜索领域类通用是很有用的。此外,使用 Groovy 意味着我们不必做任何糟糕的反射:我们只需获取类,然后直接调用我们想要的方法!

SyncService的重要部分是rabbitSubscribe属性和handleMessage()方法。前者声明该服务应订阅 "search.sync" 交换机,也就是我发送消息的交换机。handleMessage()方法。前者声明该服务应订阅 "search.sync" 交换机,也就是我发送消息的交换机。方法在每次从该交换机接收到消息时被调用,消息内容作为其参数。因此,该方法提取类名和实例 ID,并使用 Grails 的DomainClass.get()方法从数据存储(对于我们的Status消息来说是 MongoDB)中检索相关实例。最后,searchableService.reindex()方法将状态消息添加到本地搜索索引中。当然,这一切都在每个应用程序实例上发生。

现在应用程序已准备好部署到 Cloud Foundry 并扩展到允许的最大实例数!您可以在 CloudFoundry.com 上看到结果。请注意,在 GitHub 项目中,我做了一些 UI 工作来支持全文搜索,但这些更改与当前主题并不十分相关。

总结

不得不说,我自己也很惊讶,要实现搜索索引同步所需的代码竟然如此之少。不仅如此,我还能专注于如何解决问题,而不是如何编写代码,因为编码过程非常简单直观。最重要的是,使用 Cloud Foundry 意味着部署包括创建和绑定 RabbitMQ 服务,然后运行grails prod cf-update命令将更改推送到服务器。真是简单。

如您所见,RabbitMQ 可以为与云相关的问题提供创新的解决方案,而 Grails 插件通过其约定使它非常易于使用。您可以在同一应用程序的不同实例之间、不同的 Grails 应用程序之间,甚至使用不同语言和框架编写的应用程序之间进行通信。例如,我们可以部署一个简单的 Node.js 或 Sinatra 应用程序来记录和显示“search.sync”消息,以便您跟踪它们。基本上,RabbitMQ 是您的云工具箱中必不可少的一项。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

抢先一步

VMware 提供培训和认证,助您快速前进。

了解更多

获取支持

Tanzu Spring 通过一项简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部