Redis
 sql >> Database >  >> NoSQL >> Redis

Accesso a una variabile all'interno di un thread di binari

EDIT AGGIORNATO ALLA FINE:Mostra il codice funzionante. Modulo principale non modificato ad eccezione del codice di debug. Nota:ho riscontrato il problema che ho già notato in merito alla necessità di annullare l'iscrizione prima della risoluzione.

Il codice sembra corretto. Mi piacerebbe vedere come lo stai istanziando.

In config/application.rb, probabilmente hai almeno qualcosa come:

require 'ws_communication'
config.middleware.use WsCommunication

Quindi, nel tuo client JavaScript, dovresti avere qualcosa del genere:

var ws = new WebSocket(uri);

Istanziate un'altra istanza di WsCommunication? Ciò imposterebbe @clients su un array vuoto e potrebbe mostrare i tuoi sintomi. Qualcosa del genere non sarebbe corretto:

var ws = new WsCommunication;

Ci aiuterebbe se mostrassi il client e, forse, config/application.rb se questo post non aiuta.

A proposito, sono d'accordo con il commento secondo cui @clients dovrebbe essere protetto da un mutex su qualsiasi aggiornamento, in caso contrario si legge anche. È una struttura dinamica che potrebbe cambiare in qualsiasi momento in un sistema basato sugli eventi. redis-mutex è una buona opzione. (Spero che il collegamento sia corretto poiché Github sembra generare 500 errori su tutto al momento.)

Potresti anche notare che $redis.publish restituisce un valore intero del numero di client che hanno ricevuto il messaggio.

Infine, potresti scoprire che devi assicurarti che l'iscrizione al tuo canale sia annullata prima della chiusura. Ho avuto situazioni in cui ho finito per inviare ogni messaggio più volte, anche molte, a causa di iscrizioni precedenti allo stesso canale che non erano state ripulite. Poiché ti stai iscrivendo al canale all'interno di un thread, dovrai annullare l'iscrizione all'interno dello stesso thread o il processo si "bloccherà" in attesa che appaia magicamente il thread giusto. Gestisco quella situazione impostando un flag "annulla iscrizione" e quindi inviando un messaggio. Quindi, all'interno del blocco on.message, verifico il flag di annullamento dell'iscrizione ed emetto lì l'annullamento dell'iscrizione.

Il modulo che hai fornito, con solo lievi modifiche al debug:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

Il codice dell'abbonato al test che ho fornito:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

Il codice dell'editore di prova che ho fornito. L'editore e l'abbonato possono essere facilmente combinati, poiché questi sono solo test:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

Un esempio di config.ru che esegue tutto questo al livello del middleware del rack:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

Questo è il principale. L'ho rimosso dalla mia versione in esecuzione, quindi potrebbe essere necessario modificarlo se lo usi:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end