db.collection.aggregate(
[
{
"$addFields": {
"indexes": {
"$range": [
0,
{
"$size": "$time_series"
}
]
},
"reversedSeries": {
"$reverseArray": "$time_series"
}
}
},
{
"$project": {
"derivatives": {
"$reverseArray": {
"$slice": [
{
"$map": {
"input": {
"$zip": {
"inputs": [
"$reversedSeries",
"$indexes"
]
}
},
"in": {
"$subtract": [
{
"$arrayElemAt": [
"$$this",
0
]
},
{
"$arrayElemAt": [
"$reversedSeries",
{
"$add": [
{
"$arrayElemAt": [
"$$this",
1
]
},
1
]
}
]
}
]
}
}
},
{
"$subtract": [
{
"$size": "$time_series"
},
1
]
}
]
}
},
"time_series": 1
}
}
]
)
Per farlo, possiamo utilizzare la pipeline precedente nella versione 3.4+. Nella pipeline, utilizziamo il $addFields
fase della pipeline. per aggiungere l'array dell'indice degli elementi di "serie_time" da fare, abbiamo anche invertito l'array delle serie temporali e lo abbiamo aggiunto al documento utilizzando rispettivamente $range
e $reverseArray
operatori
Abbiamo invertito l'array qui perché l'elemento nella posizione p
nell'array è sempre maggiore dell'elemento nella posizione p+1
il che significa che [p] - [p+1] < 0
e non vogliamo utilizzare $multiply
qui.(vedi pipeline per la versione 3.2)
Successivamente $zipped
i dati delle serie temporali con l'array degli indici e applicato un substract
espressione all'array risultante utilizzando $map
operatore.
Quindi $slice
il risultato per eliminare null/None
valore dall'array e annullato il risultato.
In 3.2 possiamo usare $unwind
operatore per rilassarsi il nostro array e includere l'indice di ogni elemento nell'array specificando un documento come operando invece del tradizionale "percorso" preceduto da $ .
Successivamente, dobbiamo $group
i nostri documenti e utilizzare il $push
accumulatore per restituire un array di documenti secondari che assomigliano a questo:
{
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
{ "value" : 10, "index" : NumberLong(0) },
{ "value" : 20, "index" : NumberLong(1) },
{ "value" : 40, "index" : NumberLong(2) },
{ "value" : 70, "index" : NumberLong(3) },
{ "value" : 110, "index" : NumberLong(4) }
]
}
Finalmente arriva il $project
palcoscenico. In questa fase, dobbiamo utilizzare $map
per applicare una serie di espressioni a ciascun elemento nell'array appena calcolato nel $group
fase.
Ecco cosa sta succedendo all'interno di $map
(vedi $map
come ciclo for) in espressione:
Per ogni documento secondario, assegniamo il valore campo a una variabile utilizzando $let
operatore variabile. Quindi sottraiamo il valore dal valore del campo "valore" dell'elemento successivo nell'array.
Poiché l'elemento successivo nell'array è l'elemento nell'indice corrente più uno, tutto ciò di cui abbiamo bisogno è l'aiuto di $arrayElemAt
operatore e un semplice $add
izione dell'indice dell'elemento corrente e 1
.
Il $subtract
l'espressione restituisce un valore negativo, quindi dobbiamo moltiplicare il valore per -1
utilizzando $multiply
operatore.
Abbiamo anche bisogno di $filter
l'array risultante perché l'ultimo elemento è None
o null
. Il motivo è che quando l'elemento corrente è l'ultimo elemento, $subtract
return None
perché l'indice dell'elemento successivo è uguale alla dimensione dell'array.
db.collection.aggregate([
{
"$unwind": {
"path": "$time_series",
"includeArrayIndex": "index"
}
},
{
"$group": {
"_id": "$_id",
"time_series": {
"$push": {
"value": "$time_series",
"index": "$index"
}
}
}
},
{
"$project": {
"time_series": {
"$filter": {
"input": {
"$map": {
"input": "$time_series",
"as": "el",
"in": {
"$multiply": [
{
"$subtract": [
"$$el.value",
{
"$let": {
"vars": {
"nextElement": {
"$arrayElemAt": [
"$time_series",
{
"$add": [
"$$el.index",
1
]
}
]
}
},
"in": "$$nextElement.value"
}
}
]
},
-1
]
}
}
},
"as": "item",
"cond": {
"$gte": [
"$$item",
0
]
}
}
}
}
}
])
Un'altra opzione che ritengo meno efficiente è eseguire un'operazione di mappatura/riduzione sulla nostra raccolta utilizzando map_reduce
metodo.
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function() {
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++) {
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
... }
... emit(this._id, derivatives);
... }
... """)
>>> reducer = Code("""
... function(key, value) {}
... """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
... print(res) # or do something with the document.
...
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}
Puoi anche recuperare tutto il documento e utilizzare il numpy.diff
per restituire il derivato in questo modo:
import numpy as np
for document in collection.find({}, {'time_series': 1}):
result = np.diff(document['time_series'])