本文是以下两篇引用的博客的从新整理修改而成

零、引用

MongoDB聚合

MongoDB聚合

一、Aggregate 简介

db.collection.aggregate()是基于数据处理的聚合管道,
每个文档通过一个由多个阶段组成的管道,
可以对每个阶段的管道进行分组、过滤等功能,
然后经过一系列的处理,输出相应的结果

二、处理示例图

示例

三、特点:

  1. db.collection.aggregate() 可以多个管道,能方便地进行数据的处理
  2. db.collection.aggregate() 使用了MongoDB内置的原生操作,聚合效率非常高,支持类似SQL Group By的操作,而不再需要用户编写自定义的JavaScript例程
  3. 每个阶段管道限制为100M的内存。如果一个节点管道超过这个极限,MongoDB将产生一个错误。可以设置allowDiskUse=true 将管道数据写入临时文件,解决这个限制
  4. db.collection.aggregate() 可以作用在分片集合,但结果不能输出在分片集合,MapReduce可以作用在分片集合,结果也可以输出在分片集合
  5. db.collection.aggregate() 方法可以返回一个指针(cursor),数据放在内存中,直接操作。跟Mongo shell 一样指针操作
  6. db.collection.aggregate() 输出结果只能保存在一个文档中,BSON Document大小限制为16M。可以通过返回指针解决,版本2.6中后面: db.collection.aggregate()方法返回一个指针,可以返回任何结果集的大小

四、aggregate语法:

语法模板: db.collection.aggregate(pipeline,options)

  1. pipeline 类型是Array 语法: db.collection.aggregate([{},…])

  2. $group语法: {$group: {_id:,:{:},…}}

    • _id 是要进行的分组的key
    • $group可以执行的表达式如下:
    • $sum 计算总和
    • $avg 计算平均值
    • $min 根据分组,获取集合中所有文档对应值的最小值
    • $max 根据分组,获取集合中所有文档对应值的最大值
    • $push 将指定的表达式的值添加到一个数组中
    • $addToSet 将表达式的值添加到一个集合中(无重复值)
    • $first 返回每组第一个文档,如果排序,按照排序,如果没有,按照默认的存储的顺序的第一个
    • $last 返回每组最后一个文档,如果排序,按照排序,如果没有,按照默认的存储的顺序的最后一个
  3. $project语法:

    • 可以对输入文档进行添加新字段或删除现有的字段,可以自定哪些字段显示与不显示。
  4. $match

    • 根据条件用于过滤数据,只输出符合条件的文档,如果放在pipeline前面,根据条件过滤数据,传输到下一个阶段管道,可以提高后续的数据处理效率。还可以放在out之前,对结果进行再一次过滤。
  5. $redact

    • 字段所处的document结构的级别
  6. $limit

    • 用来限制MongoDB聚合管道返回的文档数
  7. $skip

    • 在聚合管道中跳过指定数量的文档,并返回余下的文档。
  8. $unwind

    • 将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值。
  9. $sample

    • 随机选择从其输入指定数量的文档。如果是大于或等于5%的collection的文档,$sample进行收集扫描,进行排序,然后选择顶部的文件。因此,$sample 在收集阶段是受排序的内存限制。
    • 语法: { $sample: { size: <positive integer> } }
  10. $sort

    • 将输入文档排序后输出。
  11. $geoNear

    • 用于地理位置数据分析。
  12. $out

    • 必须为pipeline最后一个阶段管道,因为是将最后计算结果写入到指定的collection中。
  13. $indexStats

    • 返回数据集合的每个索引的使用情况。
    • 语法: { $indexStats: { } }
  14. group

    • 将集合中的文档分组,可用于统计结果,$group首先将数据根据key进行分组。
  15. options参数

    • explain:返回指定aggregate各个阶段管道的执行计划信息。
    • 他操作返回一个游标,包含aggregate执行计划详细信息。

五、代码示例:

示例代码过长,可以咯过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
db.items.insert( [
{
"quantity" : 2,
"price" : 5.0,
"pnumber" : "p003",
},{
"quantity" : 2,
"price" : 8.0,
"pnumber" : "p002"
},{
"quantity" : 1,
"price" : 4.0,
"pnumber" : "p002"
},{
"quantity" : 2,
"price" : 4.0,
"pnumber" : "p001"
},{
"quantity" : 4,
"price" : 10.0,
"pnumber" : "p003"
},{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p001"
},{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p003"
},{
"quantity" : 5,
"price" : 10.0,
"pnumber" : "p002"
}
])

// group 示例
> db.items.count()
8
> db.items.aggregate([{$group:{_id:null,count:{$sum:1}}}])
{ "_id" : null, "count" : 8 }

> db.items.aggregate([{$group:{_id:null,total:{$sum:"$quantity"}}}])
{ "_id" : null, "total" : 36 }

> db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}}])
{ "_id" : "p001", "total" : 12 }
{ "_id" : "p002", "total" : 8 }
{ "_id" : "p003", "total" : 16 }

> db.items.aggregate([{$group:{_id:"$pnumber",max:{$max:"$quantity"}}}])
{ "_id" : "p001", "max" : 10 }
{ "_id" : "p002", "max" : 5 }
{ "_id" : "p003", "max" : 10 }

> db.items.aggregate([{$group:{_id:"$pnumber",min:{$min:"$quantity"}}}])
{ "_id" : "p001", "min" : 2 }
{ "_id" : "p002", "min" : 1 }
{ "_id" : "p003", "min" : 2 }

> db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}}])
{ "_id" : "p001", "total" : 12 }
{ "_id" : "p002", "total" : 8 }
{ "_id" : "p003", "total" : 16 }
> db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}},{$group:{_id:null,max:{$max:"$total"}}}])
{ "_id" : null, "max" : 16 }


> db.items.aggregate([{$group:{_id:"$pnumber",price:{$avg:"$price"}}}])
{ "_id" : "p001", "price" : 12 }
{ "_id" : "p002", "price" : 7.333333333333333 }
{ "_id" : "p003", "price" : 11.666666666666666 }

> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:"$quantity"}}}])
{ "_id" : "p001", "quantitys" : [ 2, 10 ] }
{ "_id" : "p002", "quantitys" : [ 2, 1, 5 ] }
{ "_id" : "p003", "quantitys" : [ 2, 4, 10 ] }

> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:{quantity:"$quantity",price:"$price"}}}}])
{ "_id" : "p001", "quantitys" : [ { "quantity" : 2, "price" : 4 }, { "quantity": 10, "price" : 20 } ] }
{ "_id" : "p002", "quantitys" : [ { "quantity" : 2, "price" : 8 }, { "quantity": 1, "price" : 4 }, { "quantity" : 5, "price" : 10 } ] }
{ "_id" : "p003", "quantitys" : [ { "quantity" : 2, "price" : 5 }, { "quantity": 4, "price" : 10 }, { "quantity" : 10, "price" : 20 } ] }


> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$addToSet:"$quantity"}}}])
{ "_id" : "p001", "quantitys" : [ 10, 2 ] }
{ "_id" : "p002", "quantitys" : [ 5, 1, 2 ] }
{ "_id" : "p003", "quantitys" : [ 10, 4, 2 ] }

> db.items.aggregate([{$group:{_id:"$pnumber",quantityFrist:{$first:"$quantity"}}}])
{ "_id" : "p001", "quantityFrist" : 2 }
{ "_id" : "p002", "quantityFrist" : 2 }
{ "_id" : "p003", "quantityFrist" : 2 }

// $project 示例
> db.items.aggregate([{$group:{_id:null,count:{$sum:1}}},{$project:{"_id":0,"count":1}}])
{ "count" : 8 }


// $match 示例
> db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}},{$match:{total:{$gt:8}}}])
{ "_id" : "p001", "total" : 12 }
{ "_id" : "p003", "total" : 16 }

> db.items.aggregate([{$match:{"pnumber":"p001"}},{$group:{_id:null,total:{$sum:"$quantity"}}}])
{ "_id" : null, "total" : 12 }

// $skip、$limit、$sort
// $limit、$skip、$sort、$match可以使用在阶段管道,如果使用在$group之前可以过滤掉一些数据,提高性能。

> db.items.aggregate([{ $skip: 2 },{ $limit: 4 }])
{ "_id" : ObjectId("574d937cfafef57ee4427ac4"), "quantity" : 1, "price" : 4, "pnumber" : "p002" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac5"), "quantity" : 2, "price" : 4, "pnumber" : "p001" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac6"), "quantity" : 4, "price" : 10, "pnumber" : "p003" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac7"), "quantity" : 10, "price" : 20, "pnumber" : "p001" }
> db.items.aggregate([{ $limit: 4 },{ $skip: 2 }])
{ "_id" : ObjectId("574d937cfafef57ee4427ac4"), "quantity" : 1, "price" : 4, "pnumber" : "p002" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac5"), "quantity" : 2, "price" : 4, "pnumber" : "p001" }

// $unwind
> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:"$quantity"}}}])
{ "_id" : "p001", "quantitys" : [ 2, 10 ] }
{ "_id" : "p002", "quantitys" : [ 2, 1, 5 ] }
{ "_id" : "p003", "quantitys" : [ 2, 4, 10 ] }
> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:"$quantity"}}},{$unwind:"$quantitys"}])
{ "_id" : "p001", "quantitys" : 2 }
{ "_id" : "p001", "quantitys" : 10 }
{ "_id" : "p002", "quantitys" : 2 }
{ "_id" : "p002", "quantitys" : 1 }
{ "_id" : "p002", "quantitys" : 5 }
{ "_id" : "p003", "quantitys" : 2 }
{ "_id" : "p003", "quantitys" : 4 }
{ "_id" : "p003", "quantitys" : 10 }

// $out
//必须为pipeline最后一个阶段管道,因为是将最后计算结果写入到指定的collection中。
{ "_id" : "p001", "quantitys" : 2 }
{ "_id" : "p001", "quantitys" : 10 }
{ "_id" : "p002", "quantitys" : 2 }
{ "_id" : "p002", "quantitys" : 1 }
{ "_id" : "p002", "quantitys" : 5 }
{ "_id" : "p003", "quantitys" : 2 }
{ "_id" : "p003", "quantitys" : 4 }
{ "_id" : "p003", "quantitys" : 10 }

> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:"$quantity"}}},{$unwind:"$quantitys"},{$project:{"_id":0,"quantitys":1}},{$out:"result"}])
> db.result.find()
{ "_id" : ObjectId("57529143746e15e8aa207a29"), "quantitys" : 2 }
{ "_id" : ObjectId("57529143746e15e8aa207a2a"), "quantitys" : 10 }
{ "_id" : ObjectId("57529143746e15e8aa207a2b"), "quantitys" : 2 }
{ "_id" : ObjectId("57529143746e15e8aa207a2c"), "quantitys" : 1 }
{ "_id" : ObjectId("57529143746e15e8aa207a2d"), "quantitys" : 5 }
{ "_id" : ObjectId("57529143746e15e8aa207a2e"), "quantitys" : 2 }
{ "_id" : ObjectId("57529143746e15e8aa207a2f"), "quantitys" : 4 }
{ "_id" : ObjectId("57529143746e15e8aa207a30"), "quantitys" : 10 }

// $redact
/*语法:{ $redact: <expression> }
$redact 跟$cond结合使用,并在$cond里面使用了if 、then、else表达式,if-else缺一不可,$redact还有三个重要的参数:
1)$$DESCEND: 返回包含当前document级别的所有字段,并且会继续判字段包含内嵌文档,内嵌文档的字段也会去判断是否符合条件。
2)$$PRUNE:返回不包含当前文档或者内嵌文档级别的所有字段,不会继续检测此级别的其他字段,即使这些字段的内嵌文档持有相同的访问级别。
3)$$KEEP:返回包含当前文档或内嵌文档级别的所有字段,不再继续检测此级别的其他字段,即使这些字段的内嵌文档中持有不同的访问级别。
**/

// level=1则值为为$$DESCEND,否则为$$PRUNE,从顶部开始扫描下去,执行$$DESCEND包含当前document级别的所有fields。当前级别字段的内嵌文档将会被继续检测。
db.redact.insert(
{
_id: 1,
level: 1,
status: "A",
acct_id: "xyz123",
cc: [{
level: 1,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level: 5,
addr1: "123 ABC Street",
city: "Some City"
}
},{
level: 3,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level: 1,
addr1: "123 ABC Street",
city: "Some City"
}
}]
})

db.redact.aggregate(
[
{ $match: { status: "A" } },
{
$redact: {
$cond: {
if: { $eq: [ "$level", 1] },
then: "$$DESCEND",
else: "$$PRUNE"
}
}
}
]
);

{
"_id" : 1,
"level" : 1,
"status" : "A",
"acct_id" : "xyz123",
"cc" : [
{ "level" : 1,
"type" : "yy",
"num" : 0,
"exp_date" : ISODate("2015-11-01T00:00:00Z")
}
]
}

// 2. $$PRUNE:不包含当前文档或者内嵌文档级别的所有字段,不会继续检测此级别的其他字段,即使这些字段的内嵌文档持有相同的访问级别。连等级的字段都不显示,也不会去扫描等级字段包含下级。
db.redact.insert(
{
_id: 1,
level: 1,
status: "A",
acct_id: "xyz123",
cc: {
level: 3,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level: 1,
addr1: "123 ABC Street",
city: "Some City"
}
}
}
)
db.redact.aggregate(
[
{ $match: { status: "A" } },
{
$redact: {
$cond: {
if: { $eq: [ "$level", 3] },
then: "$$PRUNE",
else: "$$DESCEND"
}
}
}
]
);
{ "_id" : 1, "level" : 1, "status" : "A", "acct_id" : "xyz123" }

// 3、$$KEEP:返回包含当前文档或内嵌文档级别的所有字段,不再继续检测此级别的其他字段,即使这些字段的内嵌文档中持有不同的访问级别。
db.redact.insert(
{
_id: 1,
level: 1,
status: "A",
acct_id: "xyz123",
cc: {
level: 2,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level:3,
addr1: "123 ABC Street",
city: "Some City"
}
}
}
)

db.redact.aggregate(
[
{ $match: { status: "A" } },
{
$redact: {
$cond: {
if: { $eq: [ "$level", 1] },
then: "$$KEEP",
else: "$$PRUNE"
}
}
}
]
);

{ "_id" : 1, "level" : 1, "status" : "A", "acct_id" : "xyz123", "cc" : { "level" : 2, "type" : "yy", "num" : 0, "exp_date" : ISODate("2015-11-01T00:00:00Z"), "billing_addr" : { "level" : 3, "addr1" : "123 ABC Street", "city" : "Some City" } } }

// opiton 示例
db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}},{$group:{_id:null,max:{$max:"$total"}}}],{explain:true})

{
"stages": [
{
"$cursor": {
"query": {},
"fields": {
"pnumber": 1,
"quantity": 1,
"_id": 0
},
"plan": {
"cursor": "BasicCursor",
"isMultiKey": false,
"scanAndOrder": false,
"allPlans": [
{
"cursor": "BasicCursor",
"isMultiKey": false,
"scanAndOrder": false
}
]
}
}
},
{
"$group": {
"_id": "$pnumber",
"total": {
"$sum": "$quantity"
}
}
},
{
"$group": {
"_id": {
"$const": null
},
"max": {
"$max": "$total"
}
}
}
],
"ok": 1
}

六、其他

  • allowDiskUse:每个阶段管道限制为100MB的内存,如果大于100MB的数据可以先写入临时文件。设置为true时,aggregate操作可时可以先将数据写入对应数据目录的子目录中的唯一并以_tmp结尾的文档中。
  • cursor:指定游标的初始批批大小。光标的字段的值是一个与场batchSize文件。
    • 语法: cursor: { batchSize: <int> }
      1
      var cursor=db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}},{ $limit: 2 }],{cursor: { batchSize: 1 }})
  • mongodb shell 设置游标大小cursor.batchSize(size) 一次返回多少条,游标提供了很多方法:
    • cursor.hasNext()
    • cursor.next()
    • cursor.toArray()
    • cursor.forEach()
    • cursor.map()
    • cursor.objsLeftInBatch()
    • cursor.itcount()
    • cursor.pretty()
  • bypassDocumentValidation:只有当你指定了$out操作符,使db.collection.aggregate绕过文档验证操作过程中。这让您插入不符合验证要求的文档