A livello superficiale, l'unica cosa su cui ho domande è l'ordine di aumentare il gruppo di attesa e l'affidamento del lavoro:
func (s *Scheduler) Enqueue(req interface{}) {
select {
case s.reqChan <- req:
s.wg.Add(1)
}
}
Non credo che quanto sopra causerà molti problemi in pratica con questo grande carico di lavoro, ma penso che potrebbe essere una logica condizione di gara. A livelli più bassi di simultaneità e dimensioni di lavoro più piccole, può accodare un messaggio, passare successivamente a una goroutine che inizia a lavorare su quel messaggio, POI il lavoro nel gruppo di attesa.
Quindi sei sicuro di process
il metodo è threadsafe?? Presumo che in base alla documentazione redis go , funzioni con go run -race
hai qualche output?
Ad un certo punto è del tutto ragionevole e prevedibile che le prestazioni diminuiscano. Consiglierei di avviare i test delle prestazioni per vedere dove la latenza e il throughput iniziano a diminuire:
forse un pool di 10, 100, 500, 1000, 2500, 5000, 10000 o qualsiasi altra cosa abbia senso. IMO sembra che ci siano 3 variabili importanti da regolare:
- Dimensioni del pool di lavoratori
- Dimensioni del buffer della coda di lavoro
- Redis
MaxActive
La cosa più importante che salta fuori è che sembra che redis.Pool sia configurato per consentire un numero illimitato di connessioni:
pool := &redis.Pool{
MaxIdle: 50,
IdleTimeout: 240 * time.Second,
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
Dial: func() (redis.Conn, error) {
return dial("tcp", address, password)
},
}
// Numero massimo di connessioni allocate dal pool in un dato momento.// Se zero, non c'è limite al numero di connessioni nel pool.MaxActive int
Personalmente cercherei di capire dove e quando le prestazioni iniziano a diminuire rispetto alle dimensioni del tuo pool di lavoratori. Questo potrebbe rendere più facile capire da cosa è vincolato il tuo programma.