构建一个即时消息应用(五):实时消息

对于实时消息,我们将使用 服务器发送事件。

构建一个即时消息应用(五):实时消息

本文是该系列的第五篇。

对于实时消息,我们将使用 服务器发送事件 Server-Sent Events 。这是一个打开的连接,我们可以在其中传输数据流。我们会有个端点,用户会在其中订阅发送给他的所有消息。

消息户端

在 HTTP 部分之前,让我们先编写一个 映射 map ,让所有客户端都监听消息。 像这样全局初始化:

“`
type MessageClient struct {
Messages chan Message
UserID string
}

var messageClients sync.Map

“`

已创建的新消息

还记得在 上一篇文章 中,当我们创建这条消息时,我们留下了一个 “TODO” 注释。在那里,我们将使用这个函数来调度一个 goroutine。

“`
go messageCreated(message)

“`

把这行代码插入到我们留注释的位置。


func messageCreated(message Message) error {
if err := db.QueryRow(

SELECT user_id FROM participants
WHERE user_id != $1 and conversation_id = $2
`, message.UserID, message.ConversationID).
Scan(&message.ReceiverID); err != nil {
return err
}

go broadcastMessage(message)

return nil

}

func broadcastMessage(message Message) {
messageClients.Range(func(key, _ interface{}) bool {
client := key.(*MessageClient)
if client.UserID == message.ReceiverID {
client.Messages <- message
}
return true
})
}

“`

该函数查询接收者 ID(其他参与者 ID),并将消息发送给所有客户端。

订阅消息

让我们转到 main() 函数并添加以下路由:

“`
router.HandleFunc(“GET”, “/api/messages”, guard(subscribeToMessages))

“`

此端点处理 /api/messages 上的 GET 请求。请求应该是一个 EventSource 连接。它用一个事件流响应,其中的数据是 JSON 格式的。

“`
func subscribeToMessages(w http.ResponseWriter, r *http.Request) {
if a := r.Header.Get(“Accept”); !strings.Contains(a, “text/event-stream”) {
http.Error(w, “This endpoint requires an EventSource connection”, http.StatusNotAcceptable)
return
}

f, ok := w.(http.Flusher)
if !ok {
    respondError(w, errors.New("streaming unsupported"))
    return
}

ctx := r.Context()
authUserID := ctx.Value(keyAuthUserID).(string)

h := w.Header()
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "keep-alive")
h.Set("Content-Type", "text/event-stream")

messages := make(chan Message)
defer close(messages)

client := &MessageClient{Messages: messages, UserID: authUserID}
messageClients.Store(client, nil)
defer messageClients.Delete(client)

for {
    select {
    case <-ctx.Done():
        return
    case message := <-messages:
        if b, err := json.Marshal(message); err != nil {
            log.Printf("could not marshall message: %v\n", err)
            fmt.Fprintf(w, "event: error\ndata: %v\n\n", err)
        } else {
            fmt.Fprintf(w, "data: %s\n\n", b)
        }
        f.Flush()
    }
}

}

“`

首先,它检查请求头是否正确,并检查服务器是否支持流式传输。我们创建一个消息通道,用它来构建一个客户端,并将其存储在客户端映射中。每当创建新消息时,它都会进入这个通道,因此我们可以通过 for-select 循环从中读取。

服务器发送事件 Server-Sent Events 使用以下格式发送数据:

“`
data: some data here\n\n

“`

我们以 JSON 格式发送:

“`
data: {“foo”:”bar”}\n\n

“`

我们使用 fmt.Fprintf() 以这种格式写入响应 写入器 writter ,并在循环的每次迭代中刷新数据。

这个循环会一直运行,直到使用请求上下文关闭连接为止。我们延迟了通道的关闭和客户端的删除,因此,当循环结束时,通道将被关闭,客户端不会收到更多的消息。

注意, 服务器发送事件 Server-Sent Events (EventSource)的 JavaScript API 不支持设置自定义请求头?,所以我们不能设置 Authorization: Bearer <token>。这就是为什么 guard() 中间件也会从 URL 查询字符串中读取令牌的原因。

via: https://nicolasparada.netlify.com/posts/go-messenger-realtime-messages/

作者:Nicolás Parada 选题:lujun9972 译者:gxlct008 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

主题测试文章,只做测试使用。发布者:eason,转转请注明出处:https://aicodev.cn/2020/10/05/%e6%9e%84%e5%bb%ba%e4%b8%80%e4%b8%aa%e5%8d%b3%e6%97%b6%e6%b6%88%e6%81%af%e5%ba%94%e7%94%a8%ef%bc%88%e4%ba%94%ef%bc%89%ef%bc%9a%e5%ae%9e%e6%97%b6%e6%b6%88%e6%81%af/

(0)
eason的头像eason
上一篇 2020年10月5日
下一篇 2020年10月5日

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信