EDIT AGGIORNATO ALLA FINE:Mostra il codice funzionante. Modulo principale non modificato ad eccezione del codice di debug. Nota:ho riscontrato il problema che ho già notato in merito alla necessità di annullare l'iscrizione prima della risoluzione.
Il codice sembra corretto. Mi piacerebbe vedere come lo stai istanziando.
In config/application.rb, probabilmente hai almeno qualcosa come:
require 'ws_communication'
config.middleware.use WsCommunication
Quindi, nel tuo client JavaScript, dovresti avere qualcosa del genere:
var ws = new WebSocket(uri);
Istanziate un'altra istanza di WsCommunication? Ciò imposterebbe @clients su un array vuoto e potrebbe mostrare i tuoi sintomi. Qualcosa del genere non sarebbe corretto:
var ws = new WsCommunication;
Ci aiuterebbe se mostrassi il client e, forse, config/application.rb se questo post non aiuta.
A proposito, sono d'accordo con il commento secondo cui @clients dovrebbe essere protetto da un mutex su qualsiasi aggiornamento, in caso contrario si legge anche. È una struttura dinamica che potrebbe cambiare in qualsiasi momento in un sistema basato sugli eventi. redis-mutex è una buona opzione. (Spero che il collegamento sia corretto poiché Github sembra generare 500 errori su tutto al momento.)
Potresti anche notare che $redis.publish restituisce un valore intero del numero di client che hanno ricevuto il messaggio.
Infine, potresti scoprire che devi assicurarti che l'iscrizione al tuo canale sia annullata prima della chiusura. Ho avuto situazioni in cui ho finito per inviare ogni messaggio più volte, anche molte, a causa di iscrizioni precedenti allo stesso canale che non erano state ripulite. Poiché ti stai iscrivendo al canale all'interno di un thread, dovrai annullare l'iscrizione all'interno dello stesso thread o il processo si "bloccherà" in attesa che appaia magicamente il thread giusto. Gestisco quella situazione impostando un flag "annulla iscrizione" e quindi inviando un messaggio. Quindi, all'interno del blocco on.message, verifico il flag di annullamento dell'iscrizione ed emetto lì l'annullamento dell'iscrizione.
Il modulo che hai fornito, con solo lievi modifiche al debug:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
Il codice dell'abbonato al test che ho fornito:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
Il codice dell'editore di prova che ho fornito. L'editore e l'abbonato possono essere facilmente combinati, poiché questi sono solo test:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Un esempio di config.ru che esegue tutto questo al livello del middleware del rack:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Questo è il principale. L'ho rimosso dalla mia versione in esecuzione, quindi potrebbe essere necessario modificarlo se lo usi:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end