MongoDB深度解析实战指南:从原理到生产部署以及主流语言实践

MongoDB深度解析实战指南:从原理到生产部署以及主流语言实践


一、MongoDB核心原理与数据结构深度解析

本文涉及Demo代码示例以常用Debian 12 LTS版系统环境为例,此也是个人常用开发测试系统版本,推荐:

适用版本:MongoDB 7.0 LTS (最新稳定版)
操作系统:Debian 12 (Bookworm)

1.1 文档数据库的本质

MongoDB作为面向文档的NoSQL数据库,其核心设计理念是灵活的模式(Schema-less)与JSON-like文档存储
与传统关系型数据库的行/列结构不同,MongoDB以BSON(Binary JSON)格式存储数据,每个文档都是自包含的、具有动态结构的独立单元。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 传统关系型 vs MongoDB文档结构对比
// 关系型(多表关联):
users表: {id: 1, name: "张三", dept_id: 101}
departments表: {id: 101, name: "技术部", manager: "李四"}

// MongoDB(嵌套文档):
{
"_id": ObjectId("[Object ID]"),
"name": "张三",
"department": {
"name": "技术部",
"manager": "李四",
"location": "北京"
},
"skills": ["Go", "Python", "MongoDB"],
"projects": [
{
"name": "业务平台重构",
"role": "后端开发",
"duration": "6个月"
}
],
"created_at": ISODate("2026-01-03T23:30:01Z")
}

1.2 BSON数据类型详解

MongoDB使用BSON(Binary Serialized Object Notation)作为存储格式,支持丰富的数据类型:

类型描述示例重要性
ObjectId12字节唯一标识符_id: ObjectId("507f1f77bcf86cd799439011")⭐⭐⭐⭐⭐
StringUTF-8字符串"name": "张三"⭐⭐⭐⭐⭐
Date64位整数时间戳"created_at": ISODate("2026-01-21T10:30:00Z")⭐⭐⭐⭐
Array有序数组"skills": ["Go", "Python"]⭐⭐⭐⭐⭐
Object嵌套文档"department": { "name": "技术部" }⭐⭐⭐⭐⭐
Decimal128高精度小数"price": NumberDecimal("19.99")⭐⭐⭐⭐
Binary二进制数据"avatar": BinData(0, "base64data")⭐⭐⭐
Timestamp内部操作时间戳由MongoDB内部使用⭐⭐

关键特性

  • 动态模式:同一集合中的文档可以有不同的字段结构
  • 原子性:单文档操作是原子的(多文档事务在4.0+支持)
  • 索引支持:所有字段都可建立索引,包括嵌套字段和数组元素

1.3 存储引擎:WiredTiger深度剖析

自MongoDB 3.2起,WiredTiger成为默认存储引擎,其核心特性包括:

graph TD
    A[WiredTiger存储引擎] --> B[文档级并发控制]
    A --> C[压缩存储]
    A --> D[检查点机制]
    A --> E[内存缓存]
    
    B --> B1[乐观锁]
    B --> B2[无锁快照读]
    
    C --> C1[Snappy压缩]
    C --> C2[Zstandard压缩]
    
    D --> D1[每60秒检查点]
    D --> D2[崩溃恢复]
    
    E --> E1[默认50%可用内存]
    E --> E2[LRU缓存淘汰]

内存配置公式

1
WiredTiger缓存大小 = min( (系统内存 - 1GB) * 0.5, 10GB )

生产环境建议

  • 32GB内存服务器:配置15GB缓存
  • 64GB内存服务器:配置30GB缓存
  • 128GB+内存服务器:配置50-60GB缓存

1.4 复制集(Replica Set)工作原理

复制集是MongoDB实现高可用的核心机制,采用Raft一致性算法变种,包含以下关键组件:

1
2
3
4
5
6
7
8
9
10
11
+-------------+     +-------------+     +-------------+
| Primary |<--->| Secondary |<--->| Secondary |
| (读写节点) | | (只读节点) | | (只读节点) |
+-------------+ +-------------+ +-------------+
^ ^ ^
| | |
v v v
+-------------+ +-------------+ +-------------+
| Oplog | | Oplog | | Oplog |
| (操作日志) | | (操作日志) | | (操作日志) |
+-------------+ +-------------+ +-------------+

Oplog(操作日志)特性

  • 固定集合(Capped Collection),默认占用5%磁盘空间
  • 记录所有数据变更操作(insert/update/delete)
  • 采用幂等操作设计,确保重放一致性
  • 默认保留时间:当磁盘空间不足时自动覆盖旧记录

选举机制

  1. Primary节点宕机或网络分区
  2. 剩余节点发起选举(需获得多数票)
  3. 新Primary节点接管写入操作
  4. 客户端自动重定向到新Primary

二、MongoDB版本演进与特性全景(2020-2026)

2.1 版本路线图与关键特性

版本发布时间核心特性适用场景升级注意事项
4.42020.07- $unionWith聚合
- 流式复制
- 客户端字段级加密(CSFLE)
传统企业应用FLE需要额外密钥管理服务
5.02021.07- 原生时间序列集合
- 分布式事务增强
- Live Resharding
IoT、时序数据时间序列集合需要5.0+驱动
6.02022.07- Atlas Search集成
- Queryable Encryption
- 改进的聚合性能
搜索密集型应用QE功能需要Atlas或企业版
7.02023.08- 向量搜索($vectorSearch)
- 自动分片均衡优化
- 严格的默认安全策略
AI/ML应用向量索引需要额外资源
7.0 LTS2024.03- 长期支持(3年)
- 稳定API
- 企业级安全增强
生产环境推荐最佳LTS版本

2.2 重要架构变更(5.0+)

时间序列集合优化

1
2
3
4
5
6
7
8
9
10
11
12
// 创建时间序列集合
db.createCollection("sensor_data", {
timeseries: {
timeField: "timestamp",
metaField: "sensor_id",
granularity: "minutes" // 可选: seconds/minutes/hours
}
})

// 自动优化存储结构
// 传统集合:每个文档独立存储
// 时间序列:按时间窗口聚合存储,节省50-90%空间

向量搜索集成(7.0+):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建向量索引
db.products.createIndex({
"embedding": "vector",
"dimensions": 1536,
"similarity": "cosine"
})

// 向量相似度搜索
db.products.aggregate([
{
$vectorSearch: {
index: "vector_index",
path: "embedding",
queryVector: [0.1, 0.2, 0.3, ...], // 1536维向量
numCandidates: 100,
limit: 10
}
}
])

2.3 版本选择建议

场景推荐版本理由
新项目启动7.0 LTS长期支持、最新特性、最佳性能
遗留系统升级6.0平稳过渡、兼容性好
云原生部署Atlas (最新版)免运维、自动扩展
资源受限环境5.0内存占用较低
合规性要求高7.0+企业版完整审计、加密功能

升级路径

1
4.4 → 5.0 → 6.0 → 7.0 LTS

关键限制

  • 不能跨主要版本直接升级(如4.4→6.0)
  • 升级前必须运行兼容性检查:db.adminCommand({ checkMetadataConsistency: 1 })
  • 分片集群升级需要按特定顺序:配置服务器→分片→mongos

三、Debian 12环境下的完整安装与深度配置

3.1 系统准备与依赖安装

1
2
3
4
5
6
7
8
9
10
11
12
# 更新系统
sudo apt update && sudo apt upgrade -y

# 安装必要依赖
sudo apt install -y curl wget gnupg2 apt-transport-https lsb-release ca-certificates

# 配置时区
sudo timedatectl set-timezone Asia/Shanghai

# 创建专用用户(安全最佳实践)
sudo adduser --system --group --no-create-home mongodb
sudo usermod -aG sudo mongodb

3.2 官方APT仓库配置(7.0 LTS版)

1
2
3
4
5
6
7
8
9
10
11
# 导入MongoDB官方GPG密钥
curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc | sudo gpg --dearmor -o /usr/share/keyrings/mongodb-server-7.0.gpg

# 创建APT源列表文件
echo "deb [arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg] https://repo.mongodb.org/apt/debian $(lsb_release -cs)/mongodb-org/7.0 main" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list

# 更新包索引
sudo apt update

# 验证可用版本
apt policy mongodb-org

Debian 12兼容性说明

  • MongoDB官方不直接支持Debian 12,但使用Debian 11 (bullseye)仓库完全兼容
  • 如果遇到依赖问题,可手动下载.deb包安装:
    1
    2
    3
    wget https://repo.mongodb.org/apt/debian/dists/bullseye/mongodb-org/7.0/main/binary-amd64/mongodb-org-server_7.0.11_amd64.deb
    sudo dpkg -i mongodb-org-server_7.0.11_amd64.deb
    sudo apt --fix-broken install -y

3.3 深度配置详解(/etc/mongod.conf)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 系统日志配置
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod.log
logRotate: reopen
verbosity: 0 # 0-5,生产环境建议0-1

# 存储引擎配置
storage:
dbPath: /var/lib/mongodb
journal:
enabled: true
commitIntervalMs: 100 # 默认100ms,可调至30提升性能
engine: wiredTiger
wiredTiger:
engineConfig:
cacheSizeGB: 8 # 根据内存调整,公式:(总内存-1GB)*0.5
journalCompressor: snappy
directoryForIndexes: false
collectionConfig:
blockCompressor: zstd # zstd压缩率更高,CPU消耗略高
indexConfig:
prefixCompression: true

# 网络配置
net:
port: 27017
bindIp: 127.0.0.1,192.168.1.100 # 仅绑定内网IP,禁止0.0.0.0
maxIncomingConnections: 65536
wireObjectCheck: true
ipv6: false

# 安全配置(生产环境必须启用)
security:
authorization: enabled
# keyFile用于副本集内部认证
keyFile: /etc/mongodb-keyfile
# TLS/SSL配置
tls:
mode: requireTLS
certificateKeyFile: /etc/ssl/mongodb.pem
CAFile: /etc/ssl/ca.pem
allowConnectionsWithoutCertificates: false
allowInvalidCertificates: false
allowInvalidHostnames: false
disabledProtocols: TLS1_0,TLS1_1

# 复制集配置
replication:
replSetName: "rs0"
enableMajorityReadConcern: true
oplogSizeMB: 10240 # 10GB oplog,根据写入量调整

# 分片配置(分片节点使用)
sharding:
clusterRole: shardsvr
archiveMovedChunks: true

# 进程管理
processManagement:
fork: false
pidFilePath: /var/run/mongodb/mongod.pid
timeZoneInfo: /usr/share/zoneinfo

关键配置项详解

  1. WiredTiger缓存配置

    • cacheSizeGB: 必须根据物理内存调整
    • 32GB内存服务器:建议15GB
    • 64GB内存服务器:建议30GB
    • 128GB+内存服务器:建议50-60GB
  2. 安全加固配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 生成keyFile(600权限)
    sudo openssl rand -base64 756 > /etc/mongodb-keyfile
    sudo chmod 400 /etc/mongodb-keyfile
    sudo chown mongodb:mongodb /etc/mongodb-keyfile

    # 生成TLS证书
    sudo openssl req -newkey rsa:2048 -new -x509 -days 365 -nodes \
    -out /etc/ssl/mongodb.crt -keyout /etc/ssl/mongodb.key
    sudo cat /etc/ssl/mongodb.crt /etc/ssl/mongodb.key > /etc/ssl/mongodb.pem
    sudo chmod 600 /etc/ssl/mongodb.pem
    sudo chown mongodb:mongodb /etc/ssl/mongodb.pem
  3. 目录权限设置

    1
    2
    3
    sudo mkdir -p /var/lib/mongodb /var/log/mongodb /var/run/mongodb
    sudo chown -R mongodb:mongodb /var/lib/mongodb /var/log/mongodb /var/run/mongodb
    sudo chmod 755 /var/lib/mongodb /var/log/mongodb

3.4 服务管理与状态监控

1
2
3
4
5
6
7
8
9
10
11
12
13
# 启动服务
sudo systemctl daemon-reload
sudo systemctl start mongod
sudo systemctl enable mongod

# 检查服务状态
sudo systemctl status mongod

# 查看日志(实时)
sudo tail -f /var/log/mongodb/mongod.log

# 基本连接测试
mongosh --eval "db.version()"

常见启动问题排查

1
2
3
4
5
6
7
8
9
10
11
12
# 检查端口监听
sudo netstat -tuln | grep 27017

# 检查文件权限
sudo ls -la /var/lib/mongodb
sudo ls -la /var/log/mongodb

# 检查内存限制
sudo ulimit -a

# 调试模式启动
sudo -u mongodb /usr/bin/mongod --config /etc/mongod.conf --verbose

四、从单体到高可用:副本集与分片集群实战

4.1 单机部署最佳实践

最小化安装(开发环境)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 仅启用基础功能
sudo tee /etc/mongod.conf > /dev/null <<'EOF'
storage:
dbPath: /var/lib/mongodb
journal:
enabled: true

systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod.log

net:
port: 27017
bindIp: 127.0.0.1

security:
authorization: disabled # 开发环境可关闭
EOF

生产环境单机加固

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 启用认证
mongosh <<EOF
use admin
db.createUser({
user: "admin",
pwd: "StrongAdminPass123!",
roles: ["root"]
})
EOF

# 重启服务
sudo systemctl restart mongod

# 验证认证
mongosh -u admin -p "StrongAdminPass123!" --authenticationDatabase admin --eval "db.runCommand({connectionStatus:1})"

4.2 三节点副本集完整部署

环境规划

节点IP地址角色硬件配置
node1192.168.1.101Primary/Secondary4C8G, 100GB SSD
node2192.168.1.102Secondary/Arbiter2C4G, 50GB SSD
node3192.168.1.103Secondary4C8G, 100GB SSD

步骤1:所有节点配置(/etc/mongod.conf):

1
2
3
4
5
6
7
8
9
10
11
12
replication:
replSetName: "prod-rs0"
enableMajorityReadConcern: true
oplogSizeMB: 20480 # 20GB oplog

security:
keyFile: /etc/mongodb-keyfile
authorization: enabled

net:
bindIp: 0.0.0.0 # 允许内网通信
port: 27017

步骤2:分发keyFile到所有节点

1
2
3
4
5
# 在node1生成keyFile
sudo openssl rand -base64 756 > /etc/mongodb-keyfile
sudo chmod 400 /etc/mongodb-keyfile
sudo scp /etc/mongodb-keyfile node2:/etc/
sudo scp /etc/mongodb-keyfile node3:/etc/

步骤3:启动所有节点服务

1
sudo systemctl restart mongod

步骤4:初始化副本集(在node1执行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 连接node1
mongosh -u admin -p "StrongAdminPass123!" --authenticationDatabase admin

// 初始化副本集
rs.initiate({
_id: "prod-rs0",
version: 1,
members: [
{ _id: 0, host: "192.168.1.101:27017", priority: 2, tags: { dc: "beijing", rack: "a1" } },
{ _id: 1, host: "192.168.1.102:27017", priority: 1, tags: { dc: "beijing", rack: "a2" } },
{ _id: 2, host: "192.168.1.103:27017", priority: 1, tags: { dc: "shanghai", rack: "b1" } }
]
})

// 创建应用用户
use myapp
db.createUser({
user: "appuser",
pwd: "AppUserPass123!",
roles: [
{ role: "readWrite", db: "myapp" },
{ role: "read", db: "analytics" }
]
})

// 验证状态
rs.status()

副本集状态解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
rs.status()
{
"set": "prod-rs0",
"date": ISODate("2026-01-21T11:30:00Z"),
"myState": 1, // 1=PRIMARY, 2=SECONDARY
"term": 5,
"syncingTo": "",
"members": [
{
"_id": 0,
"name": "192.168.1.101:27017",
"health": 1, // 1=healthy, 0=unhealthy
"state": 1, // 1=PRIMARY
"stateStr": "PRIMARY",
"uptime": 300,
"optime": { "ts": Timestamp(1705837800, 1), "t": 5 },
"optimeDate": ISODate("2026-01-21T11:30:00Z"),
"syncingTo": "",
"syncSourceHost": "",
"syncSourceId": -1,
"infoMessage": "could not find member to sync from",
"electionTime": Timestamp(1705837800, 1),
"electionDate": ISODate("2026-01-21T11:30:00Z"),
"configVersion": 1,
"self": true
},
{
"_id": 1,
"name": "192.168.1.102:27017",
"health": 1,
"state": 2, // 2=SECONDARY
"stateStr": "SECONDARY",
"uptime": 295,
"optime": { "ts": Timestamp(1705837800, 1), "t": 5 },
"optimeDurable": { "ts": Timestamp(1705837800, 1), "t": 5 },
"optimeDate": ISODate("2026-01-21T11:30:00Z"),
"optimeDurableDate": ISODate("2026-01-21T11:30:00Z"),
"lastHeartbeat": ISODate("2026-01-21T11:30:05Z"),
"lastHeartbeatRecv": ISODate("2026-01-21T11:30:04Z"),
"pingMs": 0,
"syncingTo": "192.168.1.101:27017",
"syncSourceHost": "192.168.1.101:27017",
"syncSourceId": 0,
"infoMessage": "",
"configVersion": 1
}
],
"ok": 1
}

读写分离配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// PHP应用连接字符串
mongodb://appuser:AppUserPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017/myapp?replicaSet=prod-rs0&readPreference=secondaryPreferred

// Go驱动配置
opts := options.Client().ApplyURI("mongodb://appuser:AppUserPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017").
SetReplicaSet("prod-rs0").
SetReadPreference(readpref.SecondaryPreferred())

// Python配置
client = MongoClient(
'mongodb://appuser:AppUserPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017',
replicaSet='prod-rs0',
readPreference='secondaryPreferred'
)

4.3 分片集群架构设计

分片集群组件

1
2
3
4
5
6
7
8
9
10
11
+----------------+    +----------------+    +----------------+
| mongos | | mongos | | mongos |
| (查询路由器) | | (查询路由器) | | (查询路由器) |
+-------+--------+ +-------+--------+ +-------+--------+
| | |
| | |
+-------v--------+ +-------v--------+ +-------v--------+
| Config Servers | | Shard 1 | | Shard 2 |
| (配置服务器) | | (分片节点) | | (分片节点) |
| 3节点副本集 | | 3节点副本集 | | 3节点副本集 |
+----------------+ +----------------+ +----------------+

部署步骤概览

  1. 部署3节点Config Server副本集
  2. 部署多个Shard副本集(每个Shard 3节点)
  3. 部署多个mongos路由进程
  4. 初始化分片集群
  5. 启用数据库分片
  6. 配置集合分片键

完整部署脚本(简化版):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 1. 配置Config Server
cat > /etc/mongod-config.conf <<EOF
sharding:
clusterRole: configsvr
replication:
replSetName: "configReplSet"
net:
bindIp: 0.0.0.0
port: 27019
storage:
dbPath: /var/lib/mongodb-config
systemLog:
destination: file
path: /var/log/mongodb/config.log
logAppend: true
EOF

# 2. 配置Shard节点
cat > /etc/mongod-shard1.conf <<EOF
sharding:
clusterRole: shardsvr
replication:
replSetName: "shard1ReplSet"
net:
bindIp: 0.0.0.0
port: 27018
storage:
dbPath: /var/lib/mongodb-shard1
systemLog:
destination: file
path: /var/log/mongodb/shard1.log
logAppend: true
EOF

# 3. 配置mongos
cat > /etc/mongos.conf <<EOF
sharding:
configDB: configReplSet/192.168.1.101:27019,192.168.1.102:27019,192.168.1.103:27019
net:
bindIp: 0.0.0.0
port: 27017
systemLog:
destination: file
path: /var/log/mongodb/mongos.log
logAppend: true
processManagement:
fork: true
EOF

五、Docker容器化部署全攻略

5.1 单机Docker部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 拉取官方镜像
docker pull mongo:7.0

# 运行容器(开发环境)
docker run -d \
--name mongodb-dev \
-p 27017:27017 \
-v mongodb-data:/data/db \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=DevPass123! \
--restart unless-stopped \
mongo:7.0

# 生产环境安全配置
docker run -d \
--name mongodb-prod \
-p 27017:27017 \
-v /mnt/data/mongodb:/data/db \
-v /etc/mongodb-keyfile:/etc/mongodb-keyfile:ro \
-v /etc/ssl/mongodb.pem:/etc/ssl/mongodb.pem:ro \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=StrongProdPass123! \
-e MONGO_INITDB_DATABASE=myapp \
--ulimit nofile=64000:64000 \
--memory=8g \
--cpus=4 \
--restart unless-stopped \
mongo:7.0 \
--auth \
--keyFile /etc/mongodb-keyfile \
--tlsMode requireTLS \
--tlsCertificateKeyFile /etc/ssl/mongodb.pem \
--bind_ip 0.0.0.0 \
--replSet rs0

5.2 Docker Compose副本集部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# docker-compose.yml
version: '3.8'

networks:
mongo-net:
driver: bridge

volumes:
mongo1_data:
mongo2_data:
mongo3_data:
mongo-keyfile:

services:
mongo1:
image: mongo:7.0
container_name: mongo1
restart: unless-stopped
ports:
- "27017:27017"
volumes:
- mongo1_data:/data/db
- ./mongo-keyfile:/etc/mongodb-keyfile:ro
- ./tls:/etc/ssl:ro
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: ClusterPass123!
command: >
--replSet rs0
--auth
--keyFile /etc/mongodb-keyfile
--tlsMode requireTLS
--tlsCertificateKeyFile /etc/ssl/mongodb.pem
--bind_ip_all
--oplogSize 1024
networks:
- mongo-net
healthcheck:
test: echo 'db.runCommand({ping:1})' | mongosh localhost:27017/test --quiet
interval: 10s
timeout: 5s
retries: 5

mongo2:
image: mongo:7.0
container_name: mongo2
restart: unless-stopped
ports:
- "27018:27017"
volumes:
- mongo2_data:/data/db
- ./mongo-keyfile:/etc/mongodb-keyfile:ro
- ./tls:/etc/ssl:ro
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: ClusterPass123!
command: >
--replSet rs0
--auth
--keyFile /etc/mongodb-keyfile
--tlsMode requireTLS
--tlsCertificateKeyFile /etc/ssl/mongodb.pem
--bind_ip_all
--oplogSize 1024
networks:
- mongo-net
healthcheck:
test: echo 'db.runCommand({ping:1})' | mongosh localhost:27017/test --quiet
interval: 10s
timeout: 5s
retries: 5

mongo3:
image: mongo:7.0
container_name: mongo3
restart: unless-stopped
ports:
- "27019:27017"
volumes:
- mongo3_data:/data/db
- ./mongo-keyfile:/etc/mongodb-keyfile:ro
- ./tls:/etc/ssl:ro
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: ClusterPass123!
command: >
--replSet rs0
--auth
--keyFile /etc/mongodb-keyfile
--tlsMode requireTLS
--tlsCertificateKeyFile /etc/ssl/mongodb.pem
--bind_ip_all
--oplogSize 1024
networks:
- mongo-net
healthcheck:
test: echo 'db.runCommand({ping:1})' | mongosh localhost:27017/test --quiet
interval: 10s
timeout: 5s
retries: 5

mongo-init:
image: mongo:7.0
container_name: mongo-init
depends_on:
mongo1:
condition: service_healthy
mongo2:
condition: service_healthy
mongo3:
condition: service_healthy
command: >
mongosh --host mongo1:27017
-u admin
-p ClusterPass123!
--eval "
rs.initiate({
_id: 'rs0',
members: [
{ _id: 0, host: 'mongo1:27017', priority: 2 },
{ _id: 1, host: 'mongo2:27017', priority: 1 },
{ _id: 2, host: 'mongo3:27017', priority: 1 }
]
});
setTimeout(function() {
db.createUser({
user: 'appuser',
pwd: 'AppUserPass123!',
roles: [{ role: 'readWrite', db: 'myapp' }]
});
}, 5000);
"
networks:
- mongo-net

启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1. 生成keyFile
openssl rand -base64 756 > mongo-keyfile
chmod 400 mongo-keyfile

# 2. 生成TLS证书(简化版)
mkdir tls
openssl req -x509 -newkey rsa:4096 -days 365 -nodes \
-keyout tls/mongodb.key -out tls/mongodb.crt \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/OU=IT/CN=localhost"
cat tls/mongodb.crt tls/mongodb.key > tls/mongodb.pem
chmod 600 tls/mongodb.pem

# 3. 启动集群
docker-compose up -d

# 4. 验证状态
docker-compose logs -f mongo-init
docker exec mongo1 mongosh -u admin -p ClusterPass123! --eval "rs.status()"

5.3 Kubernetes部署(生产环境)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# mongodb-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: mongodb
spec:
serviceName: mongodb
replicas: 3
selector:
matchLabels:
app: mongodb
template:
metadata:
labels:
app: mongodb
spec:
containers:
- name: mongodb
image: mongo:7.0
ports:
- containerPort: 27017
volumeMounts:
- name: mongodb-data
mountPath: /data/db
- name: config
mountPath: /etc/mongod.conf
subPath: mongod.conf
- name: secrets
mountPath: /etc/secrets
readOnly: true
env:
- name: MONGODB_REPLICA_SET_NAME
value: "rs0"
- name: MONGODB_INITIAL_PRIMARY_HOST
value: "mongodb-0.mongodb"
- name: MONGODB_INITIAL_PRIMARY_PORT_NUMBER
value: "27017"
- name: MONGODB_ROOT_USERNAME
valueFrom:
secretKeyRef:
name: mongodb-secrets
key: root-username
- name: MONGODB_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: mongodb-secrets
key: root-password
resources:
requests:
memory: "4Gi"
cpu: "1"
limits:
memory: "8Gi"
cpu: "2"
livenessProbe:
exec:
command:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
exec:
command:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 5
periodSeconds: 5
volumeClaimTemplates:
- metadata:
name: mongodb-data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "ssd"
resources:
requests:
storage: 100Gi

---
# Service配置
apiVersion: v1
kind: Service
metadata:
name: mongodb
labels:
app: mongodb
spec:
ports:
- port: 27017
targetPort: 27017
clusterIP: None
selector:
app: mongodb

六、多语言操作实战:完整代码示例

6.1 PHP 8.2 + MongoDB Driver 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
<?php
/**
* MongoDB PHP 8.2 完整操作示例
* 需要安装: pecl install mongodb
* composer require mongodb/mongodb
*/

require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\Client;
use MongoDB\Driver\Manager;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\WriteConcern;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\AuthenticationException;
use MongoDB\Operation\FindOneAndUpdate;

class MongoDBExample {
private Manager $manager;
private Client $client;
private string $uri;
private string $databaseName;
private string $collectionName;

public function __construct() {
$this->uri = 'mongodb://appuser:AppUserPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017/?replicaSet=prod-rs0&readPreference=secondaryPreferred&retryWrites=true&w=majority';
$this->databaseName = 'myapp';
$this->collectionName = 'users';

$this->connect();
}

private function connect(): void {
try {
$options = [
'connectTimeoutMS' => 5000,
'socketTimeoutMS' => 10000,
'serverSelectionTimeoutMS' => 5000,
'heartbeatFrequencyMS' => 10000,
'retryWrites' => true,
'w' => 'majority',
'journal' => true
];

$this->manager = new Manager($this->uri, $options);
$this->client = new Client($this->uri, $options);

echo "✅ 连接成功\n";
} catch (ConnectionException $e) {
die("❌ 连接失败: " . $e->getMessage());
} catch (AuthenticationException $e) {
die("❌ 认证失败: " . $e->getMessage());
}
}

/**
* 创建用户集合(带索引)
*/
public function createCollection(): void {
$database = $this->client->selectDatabase($this->databaseName);

// 删除已存在的集合(重新创建)
if (in_array($this->collectionName, $database->listCollections()->toArray())) {
$database->dropCollection($this->collectionName);
}

// 创建集合
$collection = $database->createCollection($this->collectionName, [
'validator' => [
'$jsonSchema' => [
'bsonType' => 'object',
'required' => ['username', 'email', 'created_at'],
'properties' => [
'username' => [
'bsonType' => 'string',
'description' => '用户名必须为字符串'
],
'email' => [
'bsonType' => 'string',
'pattern' => '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'description' => '邮箱格式不正确'
],
'age' => [
'bsonType' => 'int',
'minimum' => 0,
'maximum' => 120,
'description' => '年龄必须在0-120之间'
],
'skills' => [
'bsonType' => 'array',
'items' => [
'bsonType' => 'string'
]
]
]
]
],
'validationLevel' => 'moderate',
'validationAction' => 'error'
]);

// 创建索引
$collection->createIndex(['username' => 1], ['unique' => true]);
$collection->createIndex(['email' => 1], ['unique' => true]);
$collection->createIndex(['age' => 1]);
$collection->createIndex(['skills' => 1]);
$collection->createIndex(['created_at' => -1]);

echo "✅ 集合 {$this->collectionName} 创建成功,索引已建立\n";
}

/**
* 批量插入用户数据
*/
public function bulkInsertUsers(): void {
$collection = $this->client->selectCollection($this->databaseName, $this->collectionName);

$users = [
[
'username' => 'zhangsan',
'email' => 'zhangsan@example.com',
'age' => 28,
'skills' => ['PHP', 'MySQL', 'Redis'],
'created_at' => new MongoDB\BSON\UTCDateTime()
],
[
'username' => 'lisi',
'email' => 'lisi@example.com',
'age' => 32,
'skills' => ['Go', 'MongoDB', 'Docker'],
'created_at' => new MongoDB\BSON\UTCDateTime()
],
[
'username' => 'wangwu',
'email' => 'wangwu@example.com',
'age' => 25,
'skills' => ['Python', 'TensorFlow', 'MongoDB'],
'created_at' => new MongoDB\BSON\UTCDateTime()
],
[
'username' => 'zhaoliu',
'email' => 'zhaoliu@example.com',
'age' => 35,
'skills' => ['Java', 'Spring', 'MongoDB'],
'created_at' => new MongoDB\BSON\UTCDateTime()
]
];

try {
$result = $collection->insertMany($users);
echo "✅ 批量插入成功,插入 {$result->getInsertedCount()} 条记录\n";
print_r($result->getInsertedIds());
} catch (\MongoDB\Exception\BulkWriteException $e) {
echo "❌ 批量插入失败: " . $e->getMessage() . "\n";
}
}

/**
* 高级查询示例
*/
public function advancedQueries(): void {
$collection = $this->client->selectCollection($this->databaseName, $this->collectionName);

echo "\n🔍 高级查询示例:\n";

// 1. 查找年龄大于30的用户
$cursor = $collection->find(
['age' => ['$gt' => 30]],
['projection' => ['username' => 1, 'age' => 1, 'skills' => 1, '_id' => 0]]
);

echo "1. 年龄 > 30 的用户:\n";
foreach ($cursor as $user) {
echo " - {$user['username']} ({$user['age']}岁): " . implode(', ', $user['skills']) . "\n";
}

// 2. 查找包含MongoDB技能的用户
$cursor = $collection->find(
['skills' => 'MongoDB'],
['sort' => ['created_at' => -1]]
);

echo "\n2. 掌握 MongoDB 技能的用户:\n";
foreach ($cursor as $user) {
echo " - {$user['username']}, 技能: " . implode(', ', $user['skills']) . "\n";
}

// 3. 聚合查询:按技能分组统计
$pipeline = [
['$unwind' => '$skills'],
['$group' => [
'_id' => '$skills',
'count' => ['$sum' => 1],
'avg_age' => ['$avg' => '$age']
]],
['$sort' => ['count' => -1, 'avg_age' => 1]],
['$limit' => 5]
];

$cursor = $collection->aggregate($pipeline);

echo "\n3. 技能统计(聚合查询):\n";
foreach ($cursor as $skill) {
echo " - {$skill['_id']}: {$skill['count']}人, 平均年龄: " . round($skill['avg_age'], 1) . "岁\n";
}
}

/**
* 事务操作示例
*/
public function transactionExample(): void {
$session = $this->client->startSession();

try {
$session->startTransaction([
'readConcern' => new \MongoDB\Driver\ReadConcern(\MongoDB\Driver\ReadConcern::MAJORITY),
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 10000),
'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY)
]);

$collection = $this->client->selectCollection($this->databaseName, $this->collectionName);

// 插入新用户
$newUser = [
'username' => 'transaction_user_' . time(),
'email' => 'tx_' . time() . '@example.com',
'age' => 30,
'skills' => ['Transaction', 'MongoDB'],
'created_at' => new MongoDB\BSON\UTCDateTime()
];

$insertResult = $collection->insertOne($newUser, ['session' => $session]);
$userId = $insertResult->getInsertedId();

echo "✅ 事务: 插入用户成功, ID: {$userId}\n";

// 更新统计信息
$analyticsCollection = $this->client->selectCollection($this->databaseName, 'analytics');
$analyticsCollection->updateOne(
['type' => 'user_count'],
['$inc' => ['count' => 1]],
['upsert' => true, 'session' => $session]
);

echo "✅ 事务: 更新统计信息成功\n";

// 提交事务
$session->commitTransaction();
echo "✅ 事务提交成功\n";

} catch (\Exception $e) {
// 回滚事务
$session->abortTransaction();
echo "❌ 事务失败,已回滚: " . $e->getMessage() . "\n";
} finally {
$session->endSession();
}
}

/**
* 性能优化:批量操作
*/
public function bulkOperations(): void {
$collection = $this->client->selectCollection($this->databaseName, $this->collectionName);

// 准备1000条测试数据
$bulkData = [];
for ($i = 0; $i < 1000; $i++) {
$bulkData[] = [
'username' => 'bulk_user_' . ($i + 1),
'email' => 'bulk_' . ($i + 1) . '@example.com',
'age' => rand(18, 60),
'skills' => ['Bulk', 'MongoDB', 'PHP'],
'created_at' => new MongoDB\BSON\UTCDateTime()
];
}

// 方法1: insertMany (推荐)
$startTime = microtime(true);
$result = $collection->insertMany($bulkData, [
'ordered' => false, // 无序插入,部分失败不影响其他
'bypassDocumentValidation' => true // 跳过验证提升性能
]);
$endTime = microtime(true);

echo "✅ insertMany 插入 1000 条记录,耗时: " . round($endTime - $startTime, 3) . "秒\n";
echo " 插入数量: {$result->getInsertedCount()}, 失败数量: " . count($result->getWriteErrors()) . "\n";

// 方法2: BulkWrite (更细粒度控制)
$bulk = new BulkWrite(['ordered' => false]);

for ($i = 0; $i < 500; $i++) {
$bulk->insert([
'username' => 'bulkwrite_user_' . ($i + 1),
'email' => 'bulkwrite_' . ($i + 1) . '@example.com',
'age' => rand(18, 60),
'skills' => ['BulkWrite', 'MongoDB'],
'created_at' => new MongoDB\BSON\UTCDateTime()
]);
}

$startTime = microtime(true);
$writeResult = $this->manager->executeBulkWrite(
"{$this->databaseName}.{$this->collectionName}",
$bulk,
['writeConcern' => new WriteConcern(WriteConcern::MAJORITY, 10000)]
);
$endTime = microtime(true);

echo "✅ BulkWrite 插入 500 条记录,耗时: " . round($endTime - $startTime, 3) . "秒\n";
echo " 插入数量: {$writeResult->getInsertedCount()}\n";
}

/**
* 监控与诊断
*/
public function monitoringExample(): void {
$serverStatus = $this->manager->executeCommand(
'admin',
new MongoDB\Driver\Command(['serverStatus' => 1])
)->toArray()[0];

echo "\n📊 服务器状态监控:\n";
echo " - 版本: {$serverStatus->version}\n";
echo " - 连接数: {$serverStatus->connections->current} / {$serverStatus->connections->available}\n";
echo " - 内存使用: " . round($serverStatus->mem->resident / 1024, 2) . "GB (驻留)\n";
echo " - 操作计数: \n";
echo " * 插入: {$serverStatus->opcounters->insert}\n";
echo " * 查询: {$serverStatus->opcounters->query}\n";
echo " * 更新: {$serverStatus->opcounters->update}\n";
echo " * 删除: {$serverStatus->opcounters->delete}\n";

// 慢查询日志
$currentOp = $this->manager->executeCommand(
'admin',
new MongoDB\Driver\Command([
'currentOp' => [
'active' => true,
'secs_running' => ['$gt' => 1] // 运行超过1秒的操作
]
])
)->toArray()[0];

echo "\n⚠️ 慢查询检测:\n";
if (isset($currentOp->inprog) && count($currentOp->inprog) > 0) {
foreach ($currentOp->inprog as $op) {
echo " - OPID: {$op->opid}, 操作: {$op->op}, 耗时: {$op->secs_running}秒\n";
echo " 查询: " . json_encode($op->query) . "\n";
}
} else {
echo " - 无慢查询\n";
}
}

/**
* 关闭连接
*/
public function close(): void {
unset($this->manager, $this->client);
echo "✅ 连接已关闭\n";
}
}

// 执行示例
try {
$mongoExample = new MongoDBExample();

echo "🚀 开始 MongoDB PHP 8.2 操作示例\n";
echo "=====================================\n";

$mongoExample->createCollection();
$mongoExample->bulkInsertUsers();
$mongoExample->advancedQueries();
$mongoExample->transactionExample();
$mongoExample->bulkOperations();
$mongoExample->monitoringExample();

$mongoExample->close();

echo "\n🎉 所有操作完成!\n";

} catch (\Exception $e) {
echo "❌ 程序异常: " . $e->getMessage() . "\n";
exit(1);
}

6.2 Go 1.22 + MongoDB Driver 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
package main

import (
"context"
"fmt"
"log"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
)

// User 结构体映射MongoDB文档
type User struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
Username string `bson:"username"`
Email string `bson:"email"`
Age int `bson:"age"`
Skills []string `bson:"skills"`
CreatedAt time.Time `bson:"created_at"`
UpdatedAt time.Time `bson:"updated_at"`
}

// Analytics 统计信息结构
type Analytics struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
Type string `bson:"type"`
Count int `bson:"count"`
}

type MongoDBExample struct {
client *mongo.Client
database *mongo.Database
collection *mongo.Collection
ctx context.Context
cancel context.CancelFunc
}

func NewMongoDBExample() (*MongoDBExample, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

// 连接选项
clientOpts := options.Client().
ApplyURI("mongodb://appuser:AppUserPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017/?replicaSet=prod-rs0&readPreference=secondaryPreferred").
SetConnectTimeout(5 * time.Second).
SetSocketTimeout(10 * time.Second).
SetServerSelectionTimeout(5 * time.Second).
SetRetryWrites(true).
SetWriteConcern(writeconcern.New(writeconcern.WMajority(), writeconcern.J(true))).
SetReadPreference(readpref.Primary())

client, err := mongo.Connect(ctx, clientOpts)
if err != nil {
cancel()
return nil, fmt.Errorf("连接失败: %w", err)
}

// 验证连接
if err := client.Ping(ctx, readpref.Primary()); err != nil {
cancel()
client.Disconnect(ctx)
return nil, fmt.Errorf("Ping失败: %w", err)
}

example := &MongoDBExample{
client: client,
database: client.Database("myapp"),
collection: client.Database("myapp").Collection("users"),
ctx: ctx,
cancel: cancel,
}

fmt.Println("✅ MongoDB连接成功")
return example, nil
}

func (m *MongoDBExample) Close() {
defer m.cancel()
if err := m.client.Disconnect(m.ctx); err != nil {
log.Printf("断开连接失败: %v", err)
}
fmt.Println("✅ 连接已关闭")
}

// 创建集合和索引
func (m *MongoDBExample) CreateCollection() error {
// 删除已存在的集合
if err := m.collection.Drop(m.ctx); err != nil && !isNotFound(err) {
return fmt.Errorf("删除集合失败: %w", err)
}

// 创建集合验证器
validator := bson.M{
"$jsonSchema": bson.M{
"bsonType": "object",
"required": []string{"username", "email", "created_at"},
"properties": bson.M{
"username": bson.M{
"bsonType": "string",
"description": "用户名必须为字符串",
},
"email": bson.M{
"bsonType": "string",
"pattern": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$",
"description": "邮箱格式不正确",
},
"age": bson.M{
"bsonType": "int",
"minimum": 0,
"maximum": 120,
"description": "年龄必须在0-120之间",
},
"skills": bson.M{
"bsonType": "array",
"items": bson.M{
"bsonType": "string",
},
},
},
},
}

opts := options.CreateCollection().
SetValidator(validator).
SetValidationLevel("moderate").
SetValidationAction("error")

if err := m.database.CreateCollection(m.ctx, "users", opts); err != nil {
return fmt.Errorf("创建集合失败: %w", err)
}

// 创建索引
indexModels := []mongo.IndexModel{
{
Keys: bson.D{{"username", 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{"email", 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{"age", 1}},
},
{
Keys: bson.D{{"skills", 1}},
},
{
Keys: bson.D{{"created_at", -1}},
},
}

if _, err := m.collection.Indexes().CreateMany(m.ctx, indexModels); err != nil {
return fmt.Errorf("创建索引失败: %w", err)
}

fmt.Println("✅ 集合和索引创建成功")
return nil
}

// 批量插入用户
func (m *MongoDBExample) BulkInsertUsers() error {
users := []interface{}{
bson.M{
"username": "zhangsan_go",
"email": "zhangsan_go@example.com",
"age": 28,
"skills": []string{"Go", "MongoDB", "Docker"},
"created_at": time.Now(),
"updated_at": time.Now(),
},
bson.M{
"username": "lisi_go",
"email": "lisi_go@example.com",
"age": 32,
"skills": []string{"Go", "gRPC", "Kubernetes"},
"created_at": time.Now(),
"updated_at": time.Now(),
},
bson.M{
"username": "wangwu_go",
"email": "wangwu_go@example.com",
"age": 25,
"skills": []string{"Go", "MongoDB", "Redis"},
"created_at": time.Now(),
"updated_at": time.Now(),
},
}

opts := options.InsertMany().SetOrdered(false)
result, err := m.collection.InsertMany(m.ctx, users, opts)
if err != nil {
return fmt.Errorf("批量插入失败: %w", err)
}

fmt.Printf("✅ 批量插入成功,插入 %d 条记录\n", len(result.InsertedIDs))
for i, id := range result.InsertedIDs {
fmt.Printf(" - 第 %d 条: %s\n", i+1, id)
}

return nil
}

// 高级查询
func (m *MongoDBExample) AdvancedQueries() error {
fmt.Println("\n🔍 高级查询示例:")

// 1. 查找年龄大于30的用户
filter := bson.M{"age": bson.M{"$gt": 30}}
projection := bson.M{"username": 1, "age": 1, "skills": 1, "_id": 0}

cursor, err := m.collection.Find(m.ctx, filter, options.Find().SetProjection(projection))
if err != nil {
return fmt.Errorf("查询失败: %w", err)
}
defer cursor.Close(m.ctx)

fmt.Println("1. 年龄 > 30 的用户:")
var results []bson.M
if err := cursor.All(m.ctx, &results); err != nil {
return fmt.Errorf("获取结果失败: %w", err)
}

for _, user := range results {
skills, _ := user["skills"].([]interface{})
skillStrs := make([]string, len(skills))
for i, s := range skills {
skillStrs[i] = s.(string)
}
fmt.Printf(" - %s (%d岁): %s\n",
user["username"],
user["age"],
stringSliceToString(skillStrs))
}

// 2. 聚合查询
pipeline := mongo.Pipeline{
{{"$unwind", "$skills"}},
{{"$group", bson.D{
{"_id", "$skills"},
{"count", bson.D{{"$sum", 1}}},
{"avg_age", bson.D{{"$avg", "$age"}}},
}}},
{{"$sort", bson.D{{"count", -1}, {"avg_age", 1}}}},
{{"$limit", 5}},
}

cursor, err = m.collection.Aggregate(m.ctx, pipeline)
if err != nil {
return fmt.Errorf("聚合查询失败: %w", err)
}
defer cursor.Close(m.ctx)

fmt.Println("\n2. 技能统计(聚合查询):")
type SkillStats struct {
Skill string `bson:"_id"`
Count int `bson:"count"`
AvgAge float64 `bson:"avg_age"`
}

var stats []SkillStats
if err := cursor.All(m.ctx, &stats); err != nil {
return fmt.Errorf("获取聚合结果失败: %w", err)
}

for _, stat := range stats {
fmt.Printf(" - %s: %d人, 平均年龄: %.1f岁\n",
stat.Skill, stat.Count, stat.AvgAge)
}

return nil
}

// 事务操作
func (m *MongoDBExample) TransactionExample() error {
session, err := m.client.StartSession()
if err != nil {
return fmt.Errorf("创建会话失败: %w", err)
}
defer session.EndSession(m.ctx)

callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
// 插入新用户
newUser := bson.M{
"username": "transaction_user_" + time.Now().Format("20060102150405"),
"email": "tx_" + time.Now().Format("20060102150405") + "@example.com",
"age": 30,
"skills": []string{"Transaction", "Go", "MongoDB"},
"created_at": time.Now(),
"updated_at": time.Now(),
}

result, err := m.collection.InsertOne(sessCtx, newUser)
if err != nil {
return nil, fmt.Errorf("插入用户失败: %w", err)
}
fmt.Printf("✅ 事务: 插入用户成功, ID: %s\n", result.InsertedID)

// 更新统计信息
analyticsColl := m.database.Collection("analytics")
filter := bson.M{"type": "user_count"}
update := bson.M{"$inc": bson.M{"count": 1}}

_, err = analyticsColl.UpdateOne(
sessCtx,
filter,
update,
options.Update().SetUpsert(true),
)
if err != nil {
return nil, fmt.Errorf("更新统计失败: %w", err)
}
fmt.Println("✅ 事务: 更新统计信息成功")

return nil, nil
}

if _, err := session.WithTransaction(m.ctx, callback); err != nil {
return fmt.Errorf("事务执行失败: %w", err)
}

fmt.Println("✅ 事务提交成功")
return nil
}

// 性能测试:批量插入
func (m *MongoDBExample) PerformanceTest() error {
// 准备10,000条测试数据
bulkData := make([]interface{}, 10000)
for i := 0; i < 10000; i++ {
bulkData[i] = bson.M{
"username": fmt.Sprintf("perf_user_%d", i+1),
"email": fmt.Sprintf("perf_%d@example.com", i+1),
"age": 18 + i%43,
"skills": []string{"Performance", "MongoDB", "Go"},
"created_at": time.Now(),
"updated_at": time.Now(),
}
}

// 测试insertMany性能
startTime := time.Now()
opts := options.InsertMany().SetOrdered(false)
_, err := m.collection.InsertMany(m.ctx, bulkData, opts)
if err != nil {
return fmt.Errorf("批量插入失败: %w", err)
}
duration := time.Since(startTime)

fmt.Printf("✅ 插入 10,000 条记录,耗时: %.3f秒\n", duration.Seconds())
fmt.Printf(" 吞吐量: %.2f 条/秒\n", 10000/duration.Seconds())

return nil
}

// 监控信息
func (m *MongoDBExample) Monitoring() error {
serverStatus := bson.M{}
err := m.database.RunCommand(m.ctx, bson.D{{"serverStatus", 1}}).Decode(&serverStatus)
if err != nil {
return fmt.Errorf("获取服务器状态失败: %w", err)
}

fmt.Println("\n📊 服务器状态监控:")
fmt.Printf(" - 版本: %s\n", serverStatus["version"])

if connections, ok := serverStatus["connections"].(bson.M); ok {
fmt.Printf(" - 连接数: %d / %d\n",
connections["current"], connections["available"])
}

if mem, ok := serverStatus["mem"].(bson.M); ok {
if resident, ok := mem["resident"].(int32); ok {
fmt.Printf(" - 内存使用: %.2fGB (驻留)\n", float64(resident)/1024)
}
}

if opcounters, ok := serverStatus["opcounters"].(bson.M); ok {
fmt.Println(" - 操作计数:")
fmt.Printf(" * 插入: %d\n", opcounters["insert"])
fmt.Printf(" * 查询: %d\n", opcounters["query"])
fmt.Printf(" * 更新: %d\n", opcounters["update"])
fmt.Printf(" * 删除: %d\n", opcounters["delete"])
}

return nil
}

// 辅助函数
func isNotFound(err error) bool {
if err == nil {
return false
}
return err.Error() == "ns not found"
}

func stringSliceToString(s []string) string {
result := "["
for i, str := range s {
if i > 0 {
result += ", "
}
result += str
}
result += "]"
return result
}

func main() {
fmt.Println("🚀 开始 MongoDB Go 1.22 操作示例")
fmt.Println("=====================================")

example, err := NewMongoDBExample()
if err != nil {
log.Fatalf("初始化失败: %v", err)
}
defer example.Close()

if err := example.CreateCollection(); err != nil {
log.Fatalf("创建集合失败: %v", err)
}

if err := example.BulkInsertUsers(); err != nil {
log.Fatalf("批量插入失败: %v", err)
}

if err := example.AdvancedQueries(); err != nil {
log.Fatalf("高级查询失败: %v", err)
}

if err := example.TransactionExample(); err != nil {
log.Fatalf("事务操作失败: %v", err)
}

if err := example.PerformanceTest(); err != nil {
log.Fatalf("性能测试失败: %v", err)
}

if err := example.Monitoring(); err != nil {
log.Fatalf("监控失败: %v", err)
}

fmt.Println("\n🎉 所有操作完成!")
}

6.3 Python 3.11 + PyMongo 异步操作示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
#!/usr/bin/env python3
"""
MongoDB Python 3.11 异步操作完整示例
需要安装: pip install pymongo motor
"""

import asyncio
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import motor.motor_asyncio
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
from bson import ObjectId
from bson.codec_options import CodecOptions
from pymongo import ReadPreference, WriteConcern
from pymongo.errors import ConnectionFailure, OperationFailure, DuplicateKeyError

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("mongodb_example.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("MongoDBExample")

class User:
"""用户模型"""
def __init__(self, username: str, email: str, age: int, skills: List[str]):
self.username = username
self.email = email
self.age = age
self.skills = skills
self.created_at = datetime.utcnow()
self.updated_at = datetime.utcnow()

def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"username": self.username,
"email": self.email,
"age": self.age,
"skills": self.skills,
"created_at": self.created_at,
"updated_at": self.updated_at
}

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "User":
"""从字典创建对象"""
user = cls(
username=data["username"],
email=data["email"],
age=data["age"],
skills=data["skills"]
)
user.created_at = data.get("created_at", datetime.utcnow())
user.updated_at = data.get("updated_at", datetime.utcnow())
return user

class MongoDBExample:
"""MongoDB异步操作示例类"""

def __init__(self):
self.client: Optional[AsyncIOMotorClient] = None
self.db: Optional[AsyncIOMotorDatabase] = None
self.collection: Optional[AsyncIOMotorCollection] = None
self.uri = "mongodb://appuser:AppUserPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017/?replicaSet=prod-rs0&readPreference=secondaryPreferred"

async def connect(self) -> None:
"""连接MongoDB"""
try:
self.client = motor.motor_asyncio.AsyncIOMotorClient(
self.uri,
serverSelectionTimeoutMS=5000,
connectTimeoutMS=5000,
socketTimeoutMS=10000,
maxPoolSize=50,
minPoolSize=10,
retryWrites=True,
w="majority",
journal=True,
readPreference=ReadPreference.PRIMARY
)

# 验证连接
await self.client.admin.command('ping')
logger.info("✅ MongoDB连接成功")

# 选择数据库和集合
self.db = self.client.get_database(
"myapp",
codec_options=CodecOptions(tz_aware=True)
)
self.collection = self.db.get_collection(
"users",
write_concern=WriteConcern(w="majority", j=True)
)

except ConnectionFailure as e:
logger.error(f"❌ 连接失败: {e}")
raise
except OperationFailure as e:
logger.error(f"❌ 操作失败: {e}")
raise

async def close(self) -> None:
"""关闭连接"""
if self.client:
self.client.close()
logger.info("✅ 连接已关闭")

async def create_collection(self) -> None:
"""创建集合和索引"""
try:
# 删除已存在的集合
if "users" in await self.db.list_collection_names():
await self.db.drop_collection("users")
logger.info("🗑️ 已删除现有集合")

# 创建集合验证器
validator = {
"$jsonSchema": {
"bsonType": "object",
"required": ["username", "email", "created_at"],
"properties": {
"username": {
"bsonType": "string",
"description": "用户名必须为字符串"
},
"email": {
"bsonType": "string",
"pattern": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$",
"description": "邮箱格式不正确"
},
"age": {
"bsonType": "int",
"minimum": 0,
"maximum": 120,
"description": "年龄必须在0-120之间"
},
"skills": {
"bsonType": "array",
"items": {
"bsonType": "string"
}
},
"created_at": {
"bsonType": "date"
},
"updated_at": {
"bsonType": "date"
}
}
}
}

# 创建集合
await self.db.create_collection(
"users",
validator=validator,
validationLevel="moderate",
validationAction="error"
)
logger.info("✅ 集合创建成功")

# 创建索引
index_models = [
{"key": {"username": 1}, "name": "username_1", "unique": True},
{"key": {"email": 1}, "name": "email_1", "unique": True},
{"key": {"age": 1}, "name": "age_1"},
{"key": {"skills": 1}, "name": "skills_1"},
{"key": {"created_at": -1}, "name": "created_at_-1"},
]

for index in index_models:
await self.collection.create_index(
index["key"],
unique=index.get("unique", False),
name=index["name"]
)

logger.info("✅ 索引创建成功")

except OperationFailure as e:
logger.error(f"❌ 集合创建失败: {e}")
raise

async def bulk_insert_users(self) -> None:
"""批量插入用户数据"""
users = [
User("zhangsan_py", "zhangsan_py@example.com", 28, ["Python", "MongoDB", "Django"]).to_dict(),
User("lisi_py", "lisi_py@example.com", 32, ["Python", "FastAPI", "Redis"]).to_dict(),
User("wangwu_py", "wangwu_py@example.com", 25, ["Python", "Pandas", "MongoDB"]).to_dict(),
User("zhaoliu_py", "zhaoliu_py@example.com", 35, ["Python", "TensorFlow", "MongoDB"]).to_dict(),
]

try:
result = await self.collection.insert_many(users, ordered=False)
logger.info(f"✅ 批量插入成功,插入 {len(result.inserted_ids)} 条记录")
for i, user_id in enumerate(result.inserted_ids):
logger.info(f" - 第 {i+1} 条: {user_id}")
except DuplicateKeyError as e:
logger.warning(f"⚠️ 部分记录重复: {e}")
except OperationFailure as e:
logger.error(f"❌ 批量插入失败: {e}")
raise

async def advanced_queries(self) -> None:
"""高级查询示例"""
logger.info("\n🔍 高级查询示例:")

# 1. 查找年龄大于30的用户
filter_query = {"age": {"$gt": 30}}
projection = {"username": 1, "age": 1, "skills": 1, "_id": 0}

cursor = self.collection.find(filter_query, projection)
users = await cursor.to_list(length=100)

logger.info("1. 年龄 > 30 的用户:")
for user in users:
skills_str = ", ".join(user["skills"])
logger.info(f" - {user['username']} ({user['age']}岁): {skills_str}")

# 2. 聚合查询:按技能分组统计
pipeline = [
{"$unwind": "$skills"},
{
"$group": {
"_id": "$skills",
"count": {"$sum": 1},
"avg_age": {"$avg": "$age"},
"users": {"$push": "$username"}
}
},
{"$sort": {"count": -1, "avg_age": 1}},
{"$limit": 5}
]

cursor = self.collection.aggregate(pipeline)
results = await cursor.to_list(length=100)

logger.info("\n2. 技能统计(聚合查询):")
for result in results:
users_str = ", ".join(result["users"][:3]) + ("..." if len(result["users"]) > 3 else "")
logger.info(f" - {result['_id']}: {result['count']}人, 平均年龄: {result['avg_age']:.1f}岁, 用户: {users_str}")

# 3. 文本搜索(需要先创建文本索引)
try:
await self.collection.create_index([("username", "text"), ("skills", "text")])
logger.info("\n3. 文本搜索示例:")

cursor = self.collection.find(
{"$text": {"$search": "MongoDB"}},
{"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])

results = await cursor.to_list(length=10)
for result in results:
logger.info(f" - {result['username']}, 技能: {', '.join(result['skills'])}, 相关度: {result['score']:.2f}")

except OperationFailure as e:
logger.warning(f"⚠️ 文本搜索失败: {e}")

async def transaction_example(self) -> None:
"""事务操作示例"""
logger.info("\n🔄 事务操作示例:")

async with await self.client.start_session() as session:
async with session.start_transaction(
read_concern={"level": "majority"},
write_concern={"w": "majority", "j": True},
read_preference=ReadPreference.PRIMARY
):
try:
# 插入新用户
new_user = User(
username=f"tx_user_{int(datetime.utcnow().timestamp())}",
email=f"tx_{int(datetime.utcnow().timestamp())}@example.com",
age=30,
skills=["Transaction", "Python", "MongoDB"]
).to_dict()

result = await self.collection.insert_one(new_user, session=session)
logger.info(f"✅ 事务: 插入用户成功, ID: {result.inserted_id}")

# 更新统计信息
analytics_coll = self.db.get_collection("analytics")
update_result = await analytics_coll.update_one(
{"type": "user_count"},
{"$inc": {"count": 1}},
upsert=True,
session=session
)
logger.info(f"✅ 事务: 更新统计信息成功, 修改: {update_result.modified_count}, 新增: {update_result.upserted_id}")

# 提交事务
await session.commit_transaction()
logger.info("✅ 事务提交成功")

except Exception as e:
# 回滚事务
await session.abort_transaction()
logger.error(f"❌ 事务失败,已回滚: {e}")
raise

async def async_operations(self) -> None:
"""异步操作示例"""
logger.info("\n⚡ 异步操作示例:")

# 1. 并发插入
async def insert_user(index: int) -> str:
user = User(
username=f"async_user_{index}",
email=f"async_{index}@example.com",
age=20 + index % 40,
skills=["Async", "Python", "MongoDB"]
).to_dict()
result = await self.collection.insert_one(user)
return str(result.inserted_id)

# 使用asyncio.gather并发执行
start_time = datetime.utcnow()
tasks = [insert_user(i) for i in range(100)]
results = await asyncio.gather(*tasks, return_exceptions=True)

successful_count = sum(1 for r in results if not isinstance(r, Exception))
error_count = len(results) - successful_count

duration = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"✅ 并发插入 100 条记录,成功: {successful_count}, 失败: {error_count}, 耗时: {duration:.3f}秒")
logger.info(f" 吞吐量: {successful_count/duration:.2f} 条/秒")

# 2. 异步监控
async def monitor_collection() -> None:
while True:
count = await self.collection.count_documents({})
logger.info(f"📊 当前用户数量: {count}")
await asyncio.sleep(5)

# 启动监控任务
monitor_task = asyncio.create_task(monitor_collection())

# 模拟持续操作
await asyncio.sleep(10)
monitor_task.cancel()

async def performance_test(self) -> None:
"""性能测试"""
logger.info("\n📈 性能测试:")

# 准备10,000条测试数据
bulk_data = []
for i in range(10000):
bulk_data.append({
"username": f"perf_user_{i+1}",
"email": f"perf_{i+1}@example.com",
"age": 18 + i % 43,
"skills": ["Performance", "MongoDB", "Python"],
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
})

# 测试批量插入性能
start_time = datetime.utcnow()
try:
result = await self.collection.insert_many(bulk_data, ordered=False)
duration = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"✅ 插入 10,000 条记录,耗时: {duration:.3f}秒")
logger.info(f" 吞吐量: {10000/duration:.2f} 条/秒")
logger.info(f" 插入数量: {len(result.inserted_ids)}")
except Exception as e:
logger.error(f"❌ 性能测试失败: {e}")
raise

# 测试查询性能
start_time = datetime.utcnow()
count = await self.collection.count_documents({"age": {"$gt": 30}})
duration = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"✅ 查询年龄>30的用户,数量: {count}, 耗时: {duration:.6f}秒")

async def run_example(self) -> None:
"""运行完整示例"""
try:
logger.info("🚀 开始 MongoDB Python 3.11 异步操作示例")
logger.info("=====================================")

await self.connect()
await self.create_collection()
await self.bulk_insert_users()
await self.advanced_queries()
await self.transaction_example()
await self.performance_test()
await self.async_operations()

logger.info("\n🎉 所有操作完成!")

except Exception as e:
logger.exception(f"❌ 程序异常: {e}")
raise
finally:
await self.close()

async def main() -> None:
"""主函数"""
example = MongoDBExample()
await example.run_example()

if __name__ == "__main__":
asyncio.run(main())

七、生产环境最佳实践与注意事项 ⚠️

7.1 安全加固清单

网络层安全

1
2
3
4
5
6
# 防火墙配置(仅允许应用服务器访问)
sudo iptables -A INPUT -p tcp --dport 27017 -s 192.168.1.0/24 -j ACCEPT
sudo iptables -A INPUT -p tcp --dport 27017 -j DROP

# 使用VPC/VNet隔离
# 云环境:配置安全组,仅允许特定IP访问

认证与授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 创建最小权限用户
use myapp
db.createUser({
user: "readonly_user",
pwd: "ReadOnlyPass123!",
roles: [
{ role: "read", db: "myapp" },
{ role: "read", db: "analytics" }
]
})

// 应用用户
db.createUser({
user: "app_user",
pwd: "AppUserPass123!",
roles: [
{ role: "readWrite", db: "myapp" },
{ role: "read", db: "reference_data" }
]
})

// 监控用户
db.createUser({
user: "monitor_user",
pwd: "MonitorPass123!",
roles: [
{ role: "clusterMonitor", db: "admin" },
{ role: "read", db: "local" }
]
})

加密传输

1
2
3
4
5
6
# TLS/SSL配置检查
mongosh --tls --tlsCAFile /etc/ssl/ca.pem \
--eval "db.serverStatus().security"

# 客户端强制TLS
mongodb://user:pass@host:27017/db?tls=true&tlsCAFile=/path/to/ca.pem

7.2 备份与恢复策略

完整备份方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/bin/bash
# MongoDB备份脚本(生产环境)

BACKUP_DIR="/backup/mongodb"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7

# 创建备份目录
mkdir -p ${BACKUP_DIR}/${DATE}

# 1. 逻辑备份(mongodump)
mongodump --uri="mongodb://backup_user:BackupPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017/?replicaSet=prod-rs0" \
--gzip \
--archive=${BACKUP_DIR}/${DATE}/full_backup.gz \
--oplog

# 2. 配置文件备份
cp /etc/mongod.conf ${BACKUP_DIR}/${DATE}/

# 3. 计算备份大小
BACKUP_SIZE=$(du -sh ${BACKUP_DIR}/${DATE} | cut -f1)
echo "✅ 备份完成,大小: ${BACKUP_SIZE}"

# 4. 清理旧备份
find ${BACKUP_DIR} -maxdepth 1 -type d -mtime +${RETENTION_DAYS} -exec rm -rf {} \;

# 5. 上传到对象存储
aws s3 cp ${BACKUP_DIR}/${DATE} s3://mongodb-backups/prod/${DATE}/ --recursive

恢复流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1. 停止应用
sudo systemctl stop myapp

# 2. 恢复数据
mongorestore --uri="mongodb://admin:AdminPass123!@localhost:27017" \
--gzip \
--archive=/backup/mongodb/20260121_120000/full_backup.gz \
--oplogReplay \
--drop

# 3. 验证数据
mongosh -u admin -p AdminPass123! --eval "
db.getSiblingDB('myapp').users.countDocuments();
db.getSiblingDB('myapp').users.findOne();
"

# 4. 重启应用
sudo systemctl start myapp

7.3 监控与告警体系

监控指标清单

指标类别关键指标告警阈值采集方式
连接connections.current>80% max connectionsserverStatus
性能opcounters.*突增50%serverStatus
复制replication.lag>30秒replSetGetStatus
存储wiredTiger.cache.*使用率>90%serverStatus
操作globalLock.currentQueue>10serverStatus
资源CPU/Memory/DiskCPU>80%, 内存>90%系统监控

Prometheus + Grafana 配置

1
2
3
4
5
6
7
8
9
10
11
# prometheus.yml
scrape_configs:
- job_name: 'mongodb'
static_configs:
- targets: ['192.168.1.101:9216', '192.168.1.102:9216', '192.168.1.103:9216']
metrics_path: /metrics
relabel_configs:
- source_labels: [__address__]
target_label: instance
regex: (.*):.*
replacement: $1

MongoDB Exporter 启动

1
2
3
4
5
6
7
8
9
10
docker run -d \
--name mongodb_exporter \
-p 9216:9216 \
-e MONGODB_URI="mongodb://monitor_user:MonitorPass123!@192.168.1.101:27017,192.168.1.102:27017,192.168.1.103:27017/?replicaSet=prod-rs0" \
-e MONGODB_COLLECT_DATABASE_STATUS=true \
-e MONGODB_COLLECT_COLLECTION_STATUS=true \
-e MONGODB_COLLECT_TOP_METRICS=true \
-e MONGODB_COLLECT_INDEX_USAGE=true \
--restart unless-stopped \
percona/mongodb_exporter:0.30

7.4 性能优化关键点

索引优化原则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 避免过度索引(每个索引增加写入开销)
db.collection.getIndexes().forEach(printjson)

// 2. 复合索引顺序:等值查询字段 -> 排序字段 -> 范围查询字段
// 好:{ status: 1, created_at: -1, user_id: 1 }
// 差:{ user_id: 1, status: 1, created_at: -1 }

// 3. 覆盖索引查询
db.orders.find(
{ status: "completed", created_at: { $gt: ISODate("2026-01-01") } },
{ _id: 0, order_id: 1, amount: 1 }
).hint({ status: 1, created_at: -1, amount: 1 })

// 4. 文本索引优化
db.products.createIndex(
{ product_name: "text", description: "text" },
{
weights: { product_name: 10, description: 5 },
default_language: "english",
language_override: "language"
}
)

查询优化技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 避免$or操作符,改用$in
// 差:{ $or: [{ status: "active" }, { status: "pending" }] }
// 好:{ status: { $in: ["active", "pending"] } }

// 2. 限制返回字段
db.users.find(
{ age: { $gt: 30 } },
{ username: 1, email: 1, _id: 0 }
).limit(100)

// 3. 使用$project和$match在聚合管道中尽早过滤
db.orders.aggregate([
{ $match: { created_at: { $gt: ISODate("2026-01-01") } } },
{ $lookup: { from: "users", localField: "user_id", foreignField: "_id", as: "user" } },
{ $unwind: "$user" },
{ $project: { total: 1, user.username: 1, user.email: 1 } },
{ $sort: { total: -1 } },
{ $limit: 100 }
])

// 4. 避免全集合扫描
db.collection.explain("executionStats").find({ non_indexed_field: "value" })

硬件优化建议

资源类型开发环境生产环境(小)生产环境(中)生产环境(大)
CPU2核4-8核16-32核32-64核
内存4GB16GB64GB128GB+
磁盘50GB HDD500GB SSD2TB NVMe10TB+ NVMe
IOPS10030001000050000+
网络1Gbps1Gbps10Gbps25Gbps+

八、常见问题排查与解决方案

8.1 连接问题排查

症状:应用无法连接MongoDB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1. 检查服务状态
sudo systemctl status mongod

# 2. 检查端口监听
sudo netstat -tuln | grep 27017
# 正常输出:tcp 0 0 192.168.1.101:27017 0.0.0.0:* LISTEN

# 3. 检查防火墙
sudo iptables -L -n | grep 27017

# 4. 检查绑定IP配置
grep bindIp /etc/mongod.conf

# 5. 测试本地连接
mongosh --eval "db.ping()"

# 6. 测试远程连接
telnet 192.168.1.101 27017

解决方案

  • 修改bindIp为具体IP或0.0.0.0(配合防火墙)
  • 检查SELinux/AppArmor限制
  • 云环境检查安全组规则

8.2 复制集问题排查

症状:副本集状态异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1. 检查副本集状态
rs.status()

// 2. 检查复制延迟
rs.printSecondaryReplicationInfo()

// 3. 检查oplog窗口
rs.printReplicationInfo()

// 4. 检查网络连接
db.adminCommand({ replSetGetStatus: 1 }).members.forEach(member => {
print(member.name + ": " + member.health + ", " + member.stateStr)
})

// 5. 强制重新配置(谨慎使用!⚠️)
cfg = rs.conf()
cfg.members[0].priority = 3
rs.reconfig(cfg, { force: true })

常见问题

  • 网络分区:确保节点间网络延迟<10ms
  • oplog不足:增大oplogSizeMB
  • 时钟不同步:配置NTP服务
  • 磁盘空间不足:监控磁盘使用率

8.3 性能问题排查&跟踪常用方法

症状:查询变慢,CPU使用率高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 1. 检查当前操作
db.currentOp({ secs_running: { $gt: 1 } })

// 2. 检查锁状态
db.serverStatus().globalLock

// 3. 检查慢查询日志
db.setProfilingLevel(1, { slowms: 50 }) // 记录>50ms的查询
db.system.profile.find().sort({ ts: -1 }).limit(10)

// 4. 检查索引使用
db.collection.explain("executionStats").find({ query })

// 5. 检查内存使用
db.serverStatus().wiredTiger.cache

优化步骤

  1. 识别慢查询
  2. 分析执行计划
  3. 添加/优化索引
  4. 重构查询逻辑
  5. 考虑分片扩展

九、小结未来预估方向

9.1 技术选型建议

MongoDB适用场景

  • ✅ 灵活模式需求(快速迭代的产品)
  • ✅ 高写入负载(IoT、实时分析)
  • ✅ 层次化数据(嵌套文档结构)
  • ✅ 全球分布式部署(多区域副本集)
  • ✅ JSON原生应用(Web API、移动后端)

慎用场景

  • ❌ 强事务需求(银行核心系统)
  • ❌ 复杂JOIN查询(传统ERP系统)
  • ❌ 严格ACID要求(财务记账)
  • ❌ 超低延迟要求(HFT高频交易)

9.2 2026年技术趋势

MongoDB发展方向

  1. AI/ML集成:向量搜索成为标配,内置ML模型部署
  2. Serverless演进:Atlas Serverless成为主流部署模式
  3. 多云支持:无缝跨云数据同步,统一管理界面
  4. 实时分析:增强时序数据处理能力,与流处理平台深度集成
  5. 安全增强:零信任架构,细粒度数据访问控制

9.3 推荐官方资料

官方文档

MongoDB深度解析实战指南:从原理到生产部署以及主流语言实践

https://www.wdft.com/46b9fbaf.html

Author

Jaco Liu

Posted on

2026-01-03

Updated on

2026-01-21

Licensed under