Rails中的Action Cable(WebSocket )

1 什么是Action Cable?

Action Cable无缝集成了WebSockets和Rails应用程序的其余部分。它允许用Ruby以与Rails应用程序的其他部分相同的风格和形式编写实时特性,同时仍然具有性能和可伸缩性。它是一个全栈产品,同时提供了客户端JavaScript框架和服务器端Ruby框架。您可以访问使用Active Record或选择的ORM编写的完整域模型。

2 术语

Action Cable使用WebSockets而不是HTTP请求-响应协议。Action Cable和WebSockets都引入了一些不太熟悉的术语:

2.1 Connections
连接是客户端-服务器关系的基础。单个Action Cable服务器可以处理多个连接实例。每个WebSocket连接有一个连接实例。如果一个用户使用多个浏览器标签或设备,可能会有多个WebSockets打开到你的应用程序

2.2 Consumers
WebSocket连接的客户端称为消费者。在Action Cable中,消费者由客户端JavaScript框架创建。

2.3 Channels
每个消费者可以依次订阅多个通道。每个通道封装一个逻辑工作单元,类似于控制器在常规MVC设置中的工作。例如,您可以有一个ChatChannel和一个appearance channel,消费者可以订阅这两个channel中的一个或两个。至少,消费者应该订阅一个通道。

2.4 Subscribers
当消费者订阅一个通道时,他们充当一个订阅者。订阅者和通道之间的连接,令人惊讶的是,叫做订阅。消费者可以多次充当给定通道的订阅者。例如,消费者可以同时订阅多个聊天室。(记住,一个物理用户可能有多个消费者,每个标签/设备打开到你的连接)。

2.5 Pub-Sub
发布/订阅,指的是一个消息队列范例,信息的发送者(发布者)将数据发送到一个抽象的接收者(订阅者)类,而不指定单个接收者。Action Cable使用这种方法在服务器和许多客户机之间进行通信。

2.6 Broadcastings
广播是一种发布/订阅链接,广播器传输的任何内容都直接发送给正在流媒体的通道订阅者。每个通道可以播放零个或多个广播。

3 服务端组件

3.1 Connections
对于每个被服务器接受的WebSocket,一个连接对象被实例化。此对象成为从此处创建的所有通道订阅的父对象。连接本身不处理身份验证和授权之外的任何特定应用程序逻辑。WebSocket连接的客户端称为连接消费者。单个用户将为他们打开的每个浏览器选项卡、窗口或设备创建一个消费者连接对。
Connections是ApplicationCable::Connection的实例,它扩展了ActionCable::Connection::Base。在ApplicationCable::Connection中,您授权传入的连接,并在用户可以被识别的情况下继续建立它。

3.1.1 Connection设置

# app/channels/application_cable/connection.rb
module ApplicationCable
    class Connection < ActionCable::Connection::Base
        identified_by :current_user
        def connect
            self.current_user = find_verified_user
        end
        private
        def find_verified_user
            if verified_user = User.find_by(id: cookies.encrypted[:user_id])
                verified_user
            else
                reject_unauthorized_connection
            end
        end
     end
end

这里identified_by指定了一个连接标识符,稍后可以使用它来查找特定的连接。请注意,任何标记为标识符的东西都会在连接创建的任何通道实例上自动创建同名的委托。

此示例依赖于这样一个事实:您已经在应用程序的其他地方处理了用户的身份验证,成功的身份验证将使用用户ID设置加密的cookie。

当尝试新的连接时,cookie会自动发送到连接实例,您可以使用它来设置current_user。通过识别同一当前用户的连接,还可以确保以后可以检索给定用户的所有打开的连接(如果用户被删除或未经授权,还可以断开所有连接)。

如果你的认证方法包括使用会话,你使用cookie存储会话,你的会话cookie名为_session,用户ID密钥为user_id,你可以使用这种方法:

verified_user = User.find_by(id: cookies.encrypted['_session']['user_id'])

3.1.2 异常处理

默认情况下,未处理的异常会被捕获并记录到Rails的日志程序中。例如,如果你想全局拦截这些异常,并将它们报告给外部的bug跟踪服务,你可以使用rescue_from来实现:

# app/channels/application_cable/connection.rb
module ApplicationCable
     class Connection < ActionCable::Connection::Base
         rescue_from StandardError, with: :report_error

         private
         def report_error(e)
             SomeExternalBugtrackingService.notify(e)
         end
      end
end

3.2 Channels

通道封装逻辑工作单元,类似于常规MVC中控制器的工作。默认情况下,Rails会创建一个父ApplicationCable::Channel类(扩展了ActionCable::Channel::Base)来封装通道之间的共享逻辑。

3.2.1 父通道设置

# app/channels/application_cable/channel.rb
module ApplicationCable
     class Channel < ActionCable::Channel::Base
     end
end

然后创建自己的通道类。例如,你可以有一个ChatChannel和一个AppearanceChannel:

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
end

# app/channels/appearance_channel.rb
class AppearanceChannel < ApplicationCable::Channel
end

消费者可以订阅这两个通道道中的一个或两个。

3.2.2 订阅

消费者作为订阅者订阅频道。它们的连接称为订阅。然后根据通道使用者发送的标识符将生成的消息路由到这些通道订阅。

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
     # Called when the consumer has successfully
     # become a subscriber to this channel.
     def subscribed
     end
end

3.2.3 异常处理

与ApplicationCable::Connection一样,你也可以在特定的通道上使用rescue_from来处理引发的异常:

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
     rescue_from 'MyError', with: :deliver_error_message

     private
     def deliver_error_message(e)
         broadcast_to(...)
     end
end

4、客户端组件

4.1 Connections

使用者需要连接的实例。这可以使用以下JavaScript来建立,默认情况下由Rails生成:

4.1.1 连接消费者

// app/javascript/channels/consumer.js
// Action Cable provides the framework to deal with WebSockets in Rails.
// You can generate new channels where WebSocket features live using the `bin/rails generate channel` command.

 import { createConsumer } from "@rails/actioncable"

 export default createConsumer()

这将准备一个消费者,将连接/cable在您的服务器默认。在还指定至少一个您感兴趣的订阅之前,不会建立连接。

使用者可以选择接受指定要连接的URL的参数。它可以是一个字符串,也可以是一个返回字符串的函数,当WebSocket打开时,这个字符串会被调用。

// 指定要连接的URL
createConsumer('https://ws.example.com/cable')


 // 使用函数动态生成URL
createConsumer(getWebSocketURL)

 function getWebSocketURL {
     const token = localStorage.get('auth-token')
     return `https://ws.example.com/cable?token=${token}`
}

4.1.2 订阅者

消费者通过创建对给定通道的订阅成为订阅者:

// app/javascript/channels/chat_channel.js
import consumer from "./consumer"

consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" })

// app/javascript/channels/appearance_channel.js
import consumer from "./consumer"

consumer.subscriptions.create({ channel: "AppearanceChannel" })

虽然这会创建订阅,但稍后将描述响应接收到的数据所需的功能。

消费者可以多次充当给定通道的订阅者。例如,消费者可以同时订阅多个聊天室:

// app/javascript/channels/chat_channel.js
import consumer from "./consumer"

consumer.subscriptions.create({ channel: "ChatChannel", room: "1st Room" })
consumer.subscriptions.create({ channel: "ChatChannel", room: "2nd Room" })

5 客户端与服务端的交互

5.1 Streams(l流)

流提供了一种机制,通过这种机制,频道将发布的内容(广播)发送给它们的订阅者。例如,当:Room参数的值为“Best Room”时,下面的代码使用stream_from订阅名为chat_Best的广播房间:

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
    def subscribed
        stream_from "chat_#{params[:room]}"
     end
end

然后,在你的Rails应用程序的其他地方,你可以通过调用broadcast来广播到这样一个房间:

ActionCable.server.broadcast("chat_Best Room",{body: "This Room is Best Room."})

如果您有一个与模型相关的流,则可以从通道和模型生成广播名称。例如,下面的代码使用stream_for订阅类似于广播comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE,其中Z2lkOi8vVGVzdEFwcC9Qb3N0LzE是Post模型的GlobalID。

class CommentsChannel < ApplicationCable::Channel
    def subscribed
        post = Post.find(params[:id])
        stream_for post
     end
end

然后你可以通过调用broadcast_to广播到这个频道:

CommentsChannel.broadcast_to(@post, @comment)

5.2 Broadcasting(广播)

广播是一个发布/订阅链接,发布者传输的任何内容都被直接路由到被命名为广播的流媒体通道订阅者。每个通道道可以播放零个或多个广播。

广播纯粹是一个在线队列,并且依赖于时间。如果一个消费者没有收到广播(订阅一个给定的通道),他们他们以后连接也不会得到广播。

5.3 Subscriptions

当消费者订阅一个通道时,他们充当一个订阅者。此连接称为订阅。然后根据Cable消费者发送的标识符将传入的消息路由到这些频道订阅。

// app/javascript/channels/chat_channel.js
// Assumes you've already requested the right to send web notifications
import consumer from "./consumer" 
consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" },
    {
         received(data) { this.appendLine(data) },
         appendLine(data) {
             const html = this.createLine(data)
             const element = document.querySelector("[data-chat-room='Best Room']")
             element.insertAdjacentHTML("beforeend", html)},
         createLine(data) {
             return `
                         <article class="chat-line">
                             <span class="speaker">${data["sent_by"]}</span>
                             <span class="body">${data["body"]}</span>
                         </article>
                         `
            }
        }
)

5.4 给通道传递参数

创建订阅时,可以将参数从客户端传递到服务器端。例如:

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
  def subscribed
     stream_from "chat_#{params[:room]}"
   end
end

作为第一个参数传递给订阅的对象。create成为cable通道中的params散列。关键字channel是必需的:

// app/javascript/channels/chat_channel.js
import consumer from "./consumer"
consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" }, {
    received(data) {
         this.appendLine(data)
     },
    
      appendLine(data) {
         const html = this.createLine(data)
         const element = document.querySelector("[data-chat-room='Best Room']")
         element.insertAdjacentHTML("beforeend", html)
      },

       createLine(data) {
             return `
                         <article class="chat-line">
                             <span class="speaker">${data["sent_by"]}</span>
                             <span class="body">${data["body"]}</span>
                         </article> `
         }
})

# Somewhere in your app this is called, perhaps
# from a NewCommentJob.

ActionCable.server.broadcast(
     "chat_#{room}",
     {
         sent_by: 'Paul',
         body: 'This is a cool chat app.'
     }
)

5.5 转播一条消息

一个常见的用例是将一个客户端发送的消息重新广播到任何其他连接的客户端。

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
    def subscribed
        stream_from "chat_#{params[:room]}"
    end

    def receive(data)
        ActionCable.server.broadcast("chat_#{params[:room]}", data)
    end
end

// app/javascript/channels/chat_channel.js
import consumer from "./consumer"
const chatChannel = consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" }, {
    received(data) {
        // data => { sent_by: "Paul", body: "This is a cool chat app." }
    }
}

chatChannel.send({ sent_by: "Paul", body: "This is a cool chat app." })

转发的广播将被所有连接的客户端接收,包括发送消息的客户端。

6 完整的例子

下面的设置步骤对于这两个例子都是通用的:

6.1 例1 用户appearance

这里有一个简单的通道例子,它可以跟踪用户是否appearance以及他们在哪个页面上。(这对于创建appearance功能很有用,比如如果用户在线,在用户名旁边显示一个绿点)。

创建服务器端appearance通道:

# app/channels/appearance_channel.rb
class AppearanceChannel < ApplicationCable::Channel
    def subscribed
        current_user.appear
    end

    def unsubscribed
        current_user.disappear
    end

    def appear(data)
        current_user.appear(on: data['appearing_on'])
    end

    def away
        current_user.away
    end
end

当订阅被启动时,订阅的回调被触发,我们借此机会说“当前用户确实出现了”。出现/消失API可以由Redis,一个数据库,或其他任何东西支持。

创建客户端appearance通道订阅:

// app/javascript/channels/appearance_channel.js
import consumer from "./consumer"
consumer.subscriptions.create("AppearanceChannel", {
    // Called once when the subscription is created.
    initialized() { this.update = this.update.bind(this) },

    // Called when the subscription is ready for use on the server.
    connected() { this.install() this.update() },

    // Called when the WebSocket connection is closed.
    disconnected() { this.uninstall() },

    // Called when the subscription is rejected by the server.
    rejected() { this.uninstall() },

    update() {
        this.documentIsActive ? this.appear() : this.away()
     },

    appear() {
         // Calls `AppearanceChannel#appear(data)` on the server.
         this.perform(
            "appear",
            {
                appearing_on: this.appearingOn   
            }
        ) },

     away() {
         // Calls `AppearanceChannel#away` on the server.
         this.perform("away")
     },

     install() {
         window.addEventListener("focus", this.update)
         window.addEventListener("blur", this.update)
         document.addEventListener("turbolinks:load", this.update)
         document.addEventListener("visibilitychange", this.update)
     },

     uninstall() {
         window.removeEventListener("focus", this.update)
         window.removeEventListener("blur", this.update)
         document.removeEventListener("turbolinks:load", this.update)
         document.removeEventListener("visibilitychange", this.update)
     },

     get documentIsActive() {
         return document.visibilityState == "visible" && document.hasFocus()
     },

     get appearingOn() {
         const element = document.querySelector("[data-appearing-on]")
         return element ? element.getAttribute("data-appearing-on") : null
     }
})

6.1.1 客户端与服务端交互

    1 客户端通过App.cable = ActionCable.createConsumer("ws://cable.example.com") 连接服务器,服务器通过current_user标识此连接。

    2 客户端通过consumer.subscriptions.create({ channel: "AppearanceChannel" })订阅appearance通道。

    3 服务器识别出为appearance通道发起的新订阅,并运行其订阅的回调,调用current_user的appear方法。

    4 客户端得知订阅已经建立,调用connected,按顺序调用install和appear方法,appear调用服务器的AppearanceChannel#appear(data),同时提供一组散列数据{ appearing_on: this.appearingOn }。这是可能的,因为服务器端通道实例自动公开类上声明的所有公共方法(出去callback),以便可以通过订阅的perform方法作为远程过程调用访问这些方法。

    5 服务器在连接的appearance通道上接收appear请求,该连接由current_user标识。Server从数据散列中检索具有:appearance _on键的数据,并将其设置为传递给current_user.appear的:on键的值。

6.2 例2 接收通知

appearance示例是关于通过WebSocket连接向客户端调用服务器公开功能的。但WebSockets的优点在于它是双向的。现在让我们展示一个示例,其中服务器调用客户机上的一个操作。

这是一个web通知通道,当你广播到正确的流时,允许你触发客户端web通知:

创建服务器端web通知通道:

# app/channels/web_notifications_channel.rb
class WebNotificationsChannel < ApplicationCable::Channel
    def subscribed
        stream_for current_user
    end
end

创建客户端web通知通道订阅:

// app/javascript/channels/web_notifications_channel.js
// Client-side which assumes you've already requested
// the right to send web notifications.
import consumer from "./consumer"
consumer.subscriptions.create("WebNotificationsChannel", {
    received(data) {
        new Notification(data["title"], body: data["body"])
    }
})

从应用程序的其他地方广播内容到web notification channel实例:

# Somewhere in your app this is called, perhaps from a NewCommentJob
WebNotificationsChannel.broadcast_to(
    current_user,
    title: 'New things!',
    body: 'All the news fit to print'
)

WebNotificationsChannel.broadcast_to的调用将消息放置在当前订阅适配器的pubsub队列中,并为每个用户使用单独的广播名称。对于ID为1的用户,广播名称将是web_notifications:1。

通道被指示将所有到达web_notifications:1的内容通过received callback直接发送到客户端。作为参数传递的数据是作为第二个参数发送到服务器端广播调用的散列,JSON为传输进行了编码,并为接收到的数据参数解包。

7 配置

Action Cable有两个必需的配置:订阅适配器和允许的请求来源。

7.1 订阅适配器

默认情况下,Action Cable在config/cable.yml中查找配置。

development:
    adapter: async

test:
    adapter: async

production:
    adapter: redis
    url: redis://10.10.3.153:6381
    channel_prefix: appname_production

7.1.1 适配器的配置

下面是可供最终用户使用的订阅适配器的列表。

7.1.1.1 Async Adapter

异步适配器用于开发/测试,不应该在生产中使用。

7.1.1.2 Redis Adapter

Redis适配器要求用户提供指向Redis服务器的URL。此外,一个channel_prefix可能会被提供,以避免在多个应用使用相同的Redis服务器时通道名称冲突。

7.1.1.3 PostgreSQL Adapter

PostgreSQL适配器使用Active Record的连接池,因此应用程序的config/database.yml数据库配置,用于其连接。

7.2 允许的请求源

Action Cable将只接受来自指定来源的请求,这些请求将作为数组传递给服务器配置。起源可以是字符串或正则表达式的实例,将对其执行匹配检查。

config.action_cable.allowed_request_origins = ['https://rubyonrails.com', %r{http://ruby.*}]

默认情况下,当在开发环境中运行时,Action Cable允许来自localhost:3000的所有请求。

7.3 consumers配置

要配置URL,请在HTML布局头部添加对action_cable_meta_tag的调用。这使用的URL或路径通常通过config.action_cable设置。环境配置文件中的url。

7.4 Worker Pool 配置

工作线程池用于在与服务器主线程隔离的情况下运行连接回调和通道操作。Action Cable允许应用程序配置worker池中同时处理的线程数。

config.action_cable.worker_pool_size = 4

另外,请注意,您的服务器必须提供至少与您的worker相同数量的数据库连接。默认的worker池大小设置为4,这意味着您必须至少建立4个可用的数据库连接。你可以更改config/database.yml的pool属性实现。

7.5 客户端日志

默认情况下禁用客户端日志记录。你可以通过设置ActionCable.logger.enabled为true来启用。

import * as ActionCable from '@rails/actioncable'

 ActionCable.logger.enabled = true

7.6 其他配置

要配置的另一个常见选项是应用于每个连接日志记录器的日志标记。下面是一个使用用户账号id(如果可用)的例子,否则在标记时使用“no-account”:

config.action_cable.log_tags = [ -> request { request.env['user_account_id'] || "no-account" }, :action_cable, -> request { request.uuid }]

要了解所有配置选项的完整列表,请参阅ActionCable::Server::configuration类。

8 运行独立的Cable服务器

8.1 in App

Action Cable可以与Rails应用程序一起运行。例如,要监听/websocket 请求,请指定config.action_cable.mount_path路径:

# config/application.rb
class Application < Rails::Application
    config.action_cable.mount_path = '/websocket'
end

如果在layout中已经放置了action_cable_meta_ta, 那么你可以使用ActionCable.createConsumer()连接cable服务器。否则,需要给createConsumer第一个参数指定路径,比如ActionCable.createConsumer("/websocket")。

对于你创建的每一个服务器实例和你的服务器产生的每一个worker,你也会有一个新的Action Cable实例,但是使用Redis可以保持消息在连接之间同步。

8.2 独立

cable服务器可以与普通的应用服务器分开。它仍然是一个rack应用程序,但它是自己的rack应用程序。推荐的基本设置如下:

# cable/config.ru
require_relative "../config/environment"Rails.application.eager_load!

run ActionCable.server

然后启动服务

#!/bin/bash
bundle exec puma -p 28080 cable/config.ru

上面的操作将在端口28080上启动一个cable服务器。

8.3 注意

WebSocket服务器不能访问会话,但是它可以访问cookie。当您需要处理身份验证时,可以使用此功能。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容