MongoDB
 sql >> Database >  >> NoSQL >> MongoDB

Trasferisci Kafka Stream su MongoDB usando PySpark Structured Streaming

Ho trovato una soluzione. Dal momento che non riuscivo a trovare il driver Mongo giusto per lo streaming strutturato, ho lavorato su un'altra soluzione. Ora uso la connessione diretta a mongoDb e uso "foreach(...)" invece di foreachbatch(. ..). Il mio codice è simile a questo nel file testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()