Redis
 sql >> Database >  >> NoSQL >> Redis

Flask per esempio:implementazione di una coda di attività Redis

Questa parte del tutorial descrive in dettaglio come implementare una coda di attività Redis per gestire l'elaborazione del testo.

Aggiornamenti:

  • 02/12/2020:aggiornato a Python versione 3.8.1 e alle ultime versioni di Redis, Python Redis e RQ. Vedi sotto per i dettagli. Menziona un bug nell'ultima versione di RQ e fornisci una soluzione. Risolto il bug http prima di https.
  • 22/03/2016:aggiornato a Python versione 3.5.1 e alle ultime versioni di Redis, Python Redis e RQ. Vedi sotto per i dettagli.
  • 22/02/2015:Aggiunto supporto per Python 3.

Bonus gratuito: Fai clic qui per accedere a un video tutorial gratuito di Flask + Python che ti mostra come creare l'app Web Flask, passo dopo passo.

Ricorda:ecco cosa stiamo costruendo:un'app Flask che calcola le coppie parola-frequenza in base al testo di un determinato URL.

  1. Parte uno:imposta un ambiente di sviluppo locale e quindi distribuisci sia un ambiente di staging che un ambiente di produzione su Heroku.
  2. Parte due:configurare un database PostgreSQL insieme a SQLAlchemy e Alembic per gestire le migrazioni.
  3. Parte terza:aggiungi la logica di back-end per raschiare e quindi elaborare il conteggio delle parole da una pagina web utilizzando le librerie Request, BeautifulSoup e Natural Language Toolkit (NLTK).
  4. Parte quarta:implementare una coda di attività Redis per gestire l'elaborazione del testo. (attuale )
  5. Parte cinque:imposta Angular sul front-end per eseguire continuamente il polling del back-end per vedere se l'elaborazione della richiesta è stata completata.
  6. Parte sei:push al server di staging su Heroku:configurazione di Redis e dettagli su come eseguire due processi (web e worker) su un singolo Dyno.
  7. Parte 7:aggiorna il front-end per renderlo più intuitivo.
  8. Parte otto:crea una direttiva angolare personalizzata per visualizzare un grafico di distribuzione della frequenza utilizzando JavaScript e D3.

Ti serve il codice? Prendilo dal repository.


Requisiti di installazione

Strumenti utilizzati:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - una semplice libreria per creare una coda di attività

Inizia scaricando e installando Redis dal sito ufficiale o tramite Homebrew (brew install redis ). Una volta installato, avvia il server Redis:

$ redis-server

Quindi installa Python Redis e RQ in una nuova finestra di terminale:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Imposta il lavoratore

Iniziamo creando un processo di lavoro per ascoltare le attività in coda. Crea un nuovo file worker.py e aggiungi questo codice:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Qui, abbiamo ascoltato una coda chiamata default e stabilito una connessione al server Redis su localhost:6379 .

Attivalo in un'altra finestra del terminale:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Ora dobbiamo aggiornare il nostro app.py per inviare lavori alla coda...



Aggiorna app.py

Aggiungi le seguenti importazioni a app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Quindi aggiorna la sezione di configurazione:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) impostare una connessione Redis e inizializzare una coda in base a tale connessione.

Sposta la funzionalità di elaborazione del testo fuori dal percorso dell'indice e in una nuova funzione chiamata count_and_save_words() . Questa funzione accetta un argomento, un URL, che gli passeremo quando lo chiameremo dal nostro percorso di indice.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Prendi nota del seguente codice:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Nota: Dobbiamo importare il count_and_save_words funzione nella nostra funzione index poiché il pacchetto RQ ha attualmente un bug, per cui non troverà funzioni nello stesso modulo.

Qui abbiamo usato la coda che abbiamo inizializzato in precedenza e chiamata enqueue_call() funzione. Questo ha aggiunto un nuovo lavoro alla coda e quel lavoro ha eseguito count_and_save_words() funzione con l'URL come argomento. Il result_ttl=5000 line argument dice a RQ per quanto tempo mantenere il risultato del lavoro per - 5.000 secondi, in questo caso. Quindi abbiamo inviato l'ID del lavoro al terminale. Questo ID è necessario per vedere se l'elaborazione del lavoro è terminata.

Impostiamo un nuovo percorso per quello...



Ottieni risultati

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Proviamolo.

Avvia il server, vai a http://localhost:5000/, usa l'URL https://realpython.com e prendi l'ID lavoro dal terminale. Quindi usa quell'id nell'endpoint '/results/', ad esempio http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Se sono trascorsi meno di 5.000 secondi prima di controllare lo stato, dovresti vedere un numero ID, che viene generato quando aggiungiamo i risultati al database:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Ora, riformuliamo leggermente il percorso per restituire i risultati effettivi dal database in JSON:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Assicurati di aggiungere l'importazione:

from flask import jsonify

Provalo di nuovo. Se tutto è andato bene, dovresti vedere qualcosa di simile a nel tuo browser:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Cosa c'è dopo?

Bonus gratuito: Fai clic qui per accedere a un video tutorial gratuito di Flask + Python che ti mostra come creare l'app Web Flask, passo dopo passo.

Nella parte 5 uniremo client e server aggiungendo Angular al mix per creare un poller, che invierà una richiesta ogni cinque secondi a /results/<job_key> endpoint che richiede aggiornamenti. Una volta che i dati saranno disponibili, li aggiungeremo al DOM.

Saluti!

Questo è un pezzo di collaborazione tra Cam Linke, co-fondatore di Startup Edmonton, e la gente di Real Python