Rails:如何从服务或队列中监听/拉取?

2024-03-02

大多数 Rails 应用程序的工作方式都是等待来自客户端的请求,然后发挥其作用。 但是,如果我想将 Rails 应用程序用作微服务架构的一部分(例如),并进行一些异步通信(服务 A 将事件发送到 Kafka 或 RabbitMQ 队列,而服务 B(我的 Rails 应用程序)应该侦听该队列),我如何调整/启动 Rails 应用程序以立即侦听队列并由那里的事件触发? (这意味着初始触发器不是来自客户端,而是来自应用程序本身。)

谢谢你的建议!


我刚刚在我的应用程序中设置了 RabbitMQ 消息传递,并将在第二天左右实现解耦(多个分布式)应用程序。我发现this http://codetunes.com/2014/event-sourcing-on-rails-with-rabbitmq/文章非常有帮助(并且RabbitMQ 教程 https://www.rabbitmq.com/getstarted.html, 也)。下面的所有代码均适用于 RabbitMQ,并假设您在本地计算机上启动并运行了 RabbitMQ 服务器。

这是我到目前为止所拥有的 - 这对我有用:

  #Gemfile
  gem 'bunny'
  gem 'sneakers'

我有一个Publisher发送到队列:

  # app/agents/messaging/publisher.rb
  module Messaging
    class Publisher
      class << self

        def publish(args)
          connection = Bunny.new
          connection.start
          channel = connection.create_channel
          queue_name = "#{args.keys.first.to_s.pluralize}_queue"
          queue = channel.queue(queue_name, durable: true)
          channel.default_exchange.publish(args[args.keys.first].to_json, :routing_key => queue.name)
          puts "in #{self}.#{__method__}, [x] Sent #{args}!"
          connection.close
        end

      end
    end
  end

我这样使用:

  Messaging::Publisher.publish(event: {... event details...})

然后我就有了我的“听众”:

  # app/agents/messaging/events_queue_receiver.rb
  require_dependency "#{Rails.root.join('app','agents','messaging','events_agent')}"

  module Messaging
    class EventsQueueReceiver
      include Sneakers::Worker
      from_queue :events_queue, env: nil

      def work(msg)
        logger.info msg
        response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
        ack! if response[:success]
      end

    end
  end

“监听者”将消息发送至Messaging::EventsAgent.distribute,就像这样:

  # app/agents/messaging/events_agent.rb
 require_dependency  #{Rails.root.join('app','agents','fsm','state_assignment_agent')}"

  module Messaging
    class EventsAgent
      EVENT_HANDLERS = {
        enroll_in_program: ["FSM::StateAssignmentAgent"]
      }
      class << self

        def publish(event)
          Messaging::Publisher.publish(event: event)
        end

        def distribute(event)
          puts "in #{self}.#{__method__}, message"
          if event[:handler]
            puts "in #{self}.#{__method__}, event[:handler: #{event[:handler}"
            event[:handler].constantize.handle_event(event)
          else
            event_name = event[:event_name].to_sym
            EVENT_HANDLERS[event_name].each do |handler|
              event[:handler] = handler
              publish(event)
            end
          end
          return {success: true}
        end

      end
    end
  end

按照 Codetunes 上的说明,我有:

  # Rakefile
  # Add your own tasks in files placed in lib/tasks ending in .rake,
  # for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.

  require File.expand_path('../config/application', __FILE__)

  require 'sneakers/tasks'
  Rails.application.load_tasks

And:

  # app/config/sneakers.rb
  Sneakers.configure({})
  Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy

我打开两个控制台窗口。首先,我说(让我的监听器运行):

  $ WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
  ... a bunch of start up info
  2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2

在第二部分,我说:

  $ rails s --sandbox
  2.1.2 :001 > Messaging::Publisher.publish({:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}})
  in Messaging::Publisher.publish, [x] Sent {:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}}!
  => :closed 

然后,回到我的第一个窗口,我看到:

  2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: {"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}
  in Messaging::EventsAgent.distribute, message
  in Messaging::EventsAgent.distribute, event[:handler]: FSM::StateAssignmentAgent

在我的 RabbitMQ 服务器中,我看到:

这是一个非常简单的设置,我相信在接下来的几天里我会学到更多。

祝你好运!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rails:如何从服务或队列中监听/拉取? 的相关文章

随机推荐

  • Windows Phone 运行时没有页面转换

    我正在尝试禁用 Windows Phone 运行时的默认页面导航动画 我所能实现的就是将其更改为连续体 十字转门或幻灯片 但我希望页面能够立即更改 因此 当我调用 this Frame navigate 时 它应该导航到新页面 而无需任何动
  • 从 Windows 上下文菜单接收参数

    我以前做过这个 但我一生都不记得该怎么做 在我的资源管理器上下文菜单中 我添加了一个新条目 转到 regedit 转到 HKEY CLASSES ROOT bla bla bla 现在 当我单击我的选项时 我想传递文件路径 文件名等我的应用
  • 如何从服务 C# 捕获控制台输出?

    我们有一个部署到远程客户系统的 C 服务 应用程序将大量 诊断 信息写入控制台 即 Console WriteLine 该服务没有 做它应该做的事 我们如何捕获另一个应用程序中服务的控制台输出 WinForm 版本的应用程序可以在客户位置加
  • Gradle Jacoco 和 JUnit5

    我们刚刚将单元测试移植到 JUnit5 意识到这仍然是相当早期的采用 谷歌上几乎没有任何提示 最具挑战性的是为我们在 jenkins 上使用的 Junit5 测试获取 jacoco 代码覆盖率 因为这花了我几乎一天的时间才弄清楚 所以我想我
  • 注册为 Spring bean 时过滤器调用两次

    我想用 Autowire with a Filter 所以我在中定义我的过滤器SecurityConfig如下 Override protected void configure HttpSecurity http throws Excep
  • 如何在页面之间共享信息

    在开始之前 正如标题中所述 我正在学习 NET MAUI 而且我对此还很陌生 我的问题是我找不到从一个页面到上一个页面共享信息的方法 我想做的是 在 MainPage 中 有一个按钮 按下后会将用户发送到另一个页面 我们将其称为 Login
  • :为什么我无法设置Xamarin.Forms.ListView的SelectedItem属性?

    object lastItem null foreach object item in listView ItemsSource lastItem item if lastItem null listView SelectedItem la
  • 获取文件的最后修改日期/时间作为本地日期/时间字符串

    new File url lastModified 返回一个long等于自纪元以来的毫秒数 基于 GMT 将其转换为一个简单的方法是什么String代表系统本地日期 时间 如果你真的需要看到我的尝试 那就是 但这是一团糟 而且无论如何都是错
  • 如何从 Rails 内部重新启动 Rails?

    好的 所以我想在 Rails 中创建一个操作来重新启动自身 我做了一些搜索并发现 http snippets dzone com posts show 5002 http snippets dzone com posts show 5002
  • 无法从 Metro 风格应用程序获取可用磁盘空间

    我正在编写一个 Metro 风格的应用程序 想要确定托管用户音乐库的驱动器的可用存储容量 我想在磁盘上没有剩余空间或剩余空间很少的情况下禁用某些应用程序功能 我使用 P Invoke 调用 GetDiskFreeSpaceExW 并收到错误
  • 如何计算(A*B)%C? [复制]

    这个问题在这里已经有答案了 有人可以帮我如何计算吗 A B C where 1 lt A B C lt 10 18在C 中 没有big num 只是一种数学方法 从我的脑海中浮现出来 未经广泛测试 typedef unsigned long
  • 应如何聚合公开子实体的信息?

    从这个问题跟进实体是否应该有方法 如果有 如何防止它们在聚合之外被调用 https stackoverflow com questions 51907447 should entity have methods and if so how
  • 从 GNU Octave 中的矩阵中删除一列

    在 GNU Octave 中 我希望能够从矩阵中删除特定的列 为了一般性 我还希望能够从矩阵中删除特定行 假设我有这个 mymatrix eye 5 mymatrix Diagonal Matrix 1 0 0 0 0 0 1 0 0 0
  • 合并 Pig 中的两行

    我想为下面的查询编写一个猪脚本 输入是 ABC DEF GHI JKL MNO PQR STU VWX 输出应该是 ABC DEF GHI JKL MNO PQR STU VWX 有人可以帮我吗 使用土猪很难解决这个问题 一种选择是下载da
  • (量角器)检查单击时是否禁用输入?

    我有两个输入字段 用户名和密码以及一个微调按钮 当我单击此微调按钮时 这两个输入字段将被禁用 并且我将被重定向到另一个页面 我正在编写一个端到端测试来检查这些输入字段是否被禁用 element by model username sendK
  • 如何按不同时区的年月日进行聚合

    我有一个 MongoDB 它以 UTC 格式存储日期对象 好吧 我想按不同时区 CET 的年 月 日执行聚合 这样做对于 UTC 来说效果很好 BasicDBObject group id new BasicDBObject id new
  • 为什么 Big Query 远程函数无法激活超过 60 个云函数实例?

    我已经开始使用远程功能 https cloud google com bigquery docs reference standard sql remote functions https cloud google com bigquery
  • 有没有办法在 iOS 上自动更新(或只是清除缓存)PWA?

    我一直在 iOS 上努力尝试一些在 Android 上可以轻松运行的东西 让我的 PWA 在有新版本时自动更新 我根本不确定这在 iOS 上是否可行 我使用 vue js 和 Quasar 来构建我的应用程序 一切都可以在 Android
  • 检查用户是否处于特定半径(Flutter)

    我想使用地理位置检查用户是否在他 她的房子里 该应用程序将在前台运行 并且不会显示地图 用户的房屋纬度和经度将存储在 Firestore 中 当用户打开应用程序时 我想检查他 她是否在他的房子里 由于房屋大小可能会有所不同 我会考虑以几米为
  • Rails:如何从服务或队列中监听/拉取?

    大多数 Rails 应用程序的工作方式都是等待来自客户端的请求 然后发挥其作用 但是 如果我想将 Rails 应用程序用作微服务架构的一部分 例如 并进行一些异步通信 服务 A 将事件发送到 Kafka 或 RabbitMQ 队列 而服务