活动公告

系统通知
05-18 21:22
系统通知
通知:本站资源由网友上传分享,如有违规等问题请到版务模块进行投诉,资源失效请在帖子内回复要求补档,会尽快处理!
10-23 09:31

MongoDB数据库输出JSON数据的完整指南与最佳实践帮助开发者轻松实现数据导出与交换

SunJu_FaceMall

3万

主题

2860

科技点

3万

积分

白金月票

碾压王

积分
32872

塔罗立华奏

<font color=白金月票" /> 发表于 2025-9-6 12:50:02 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1. 引言:MongoDB与JSON的关系

MongoDB是一种流行的NoSQL数据库,它使用文档存储模型,而JSON(JavaScript Object Notation)是一种轻量级的数据交换格式。这两者之间有着天然的联系,因为MongoDB中的文档本质上是BSON(Binary JSON)格式存储的,这使得JSON成为MongoDB数据导出的理想选择。

本文将全面介绍如何从MongoDB数据库中导出JSON数据,包括各种方法、工具、最佳实践以及实际应用场景,帮助开发者轻松实现数据导出与交换。

2. 为什么需要从MongoDB导出JSON数据

在实际开发中,将MongoDB数据导出为JSON格式有多种用途:

• 数据备份与迁移:将数据导出为JSON格式可以方便地进行备份或迁移到其他系统
• 数据分析:导出的JSON数据可以被各种数据分析工具处理
• 系统集成:不同系统之间通过JSON格式进行数据交换
• 开发测试:创建测试数据或模拟生产环境
• 报告生成:将数据导出后用于生成各种报告

3. MongoDB导出JSON的方法

3.1 使用mongoexport命令行工具

mongoexport是MongoDB提供的一个命令行工具,专门用于将集合中的数据导出为JSON或CSV格式。
  1. mongoexport --host=localhost --port=27017 --db=mydb --collection=mycollection --out=data.json
复制代码

• --host:MongoDB服务器地址
• --port:MongoDB服务器端口
• --db:要导出的数据库名称
• --collection:要导出的集合名称
• --out:输出文件路径
• --query:导出数据的查询条件(JSON格式)
• --fields:指定要导出的字段
• --type:输出文件类型(json或csv)
• --jsonArray:将输出作为JSON数组而不是每行一个JSON对象
• --pretty:格式化JSON输出,使其更易读

1. 使用查询条件导出特定数据
  1. mongoexport --db=sales --collection=orders --query='{"status": "completed", "date": {"$gte": {"$date": "2023-01-01T00:00:00Z"}}}' --out=completed_orders_2023.json
复制代码

2. 导出特定字段并格式化输出
  1. mongoexport --db=users --collection=profiles --fields=name,email,age --jsonArray --pretty --out=user_profiles.json
复制代码

3. 从认证的MongoDB实例导出数据
  1. mongoexport --host=cluster.example.com --port=27017 --db=app --collection=logs --username=admin --password=secret --authenticationDatabase=admin --out=logs.json
复制代码

3.2 使用MongoDB Compass图形界面

MongoDB Compass是MongoDB的官方图形界面工具,它提供了直观的方式来导出数据。

1. 连接到MongoDB服务器
2. 选择要导出的数据库和集合
3. 点击集合标签页中的”Export”按钮
4. 在弹出的对话框中选择”JSON”作为导出格式
5. 选择要导出的字段(可选)
6. 设置查询条件(可选)
7. 点击”Export”按钮并选择保存位置

• 导出整个集合:导出集合中的所有文档
• 导出查询结果:先执行查询,然后导出结果
• 字段选择:只导出选定的字段
• 输出格式:标准JSON或CSV

3.3 使用编程语言驱动程序

通过编程语言的MongoDB驱动程序,可以更灵活地控制数据导出过程。
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. const path = require('path');
  4. async function exportToJson() {
  5.   const uri = "mongodb://localhost:27017";
  6.   const client = new MongoClient(uri);
  7.   
  8.   try {
  9.     await client.connect();
  10.     const database = client.db("mydb");
  11.     const collection = database.collection("mycollection");
  12.    
  13.     // 查询条件
  14.     const query = { status: "active" };
  15.    
  16.     // 投影(选择字段)
  17.     const projection = { _id: 0, name: 1, email: 1 };
  18.    
  19.     // 执行查询
  20.     const cursor = collection.find(query).project(projection);
  21.    
  22.     // 转换为数组
  23.     const results = await cursor.toArray();
  24.    
  25.     // 转换为JSON字符串并格式化
  26.     const json = JSON.stringify(results, null, 2);
  27.    
  28.     // 写入文件
  29.     fs.writeFileSync(path.join(__dirname, 'output.json'), json);
  30.    
  31.     console.log(`成功导出 ${results.length} 条记录到 output.json`);
  32.   } finally {
  33.     await client.close();
  34.   }
  35. }
  36. exportToJson().catch(console.error);
复制代码
  1. from pymongo import MongoClient
  2. import json
  3. from datetime import datetime
  4. def export_to_json():
  5.     # 连接到MongoDB
  6.     client = MongoClient('mongodb://localhost:27017/')
  7.     db = client['mydb']
  8.     collection = db['mycollection']
  9.    
  10.     # 查询条件
  11.     query = {
  12.         "created_at": {
  13.             "$gte": datetime(2023, 1, 1),
  14.             "$lt": datetime(2023, 12, 31)
  15.         }
  16.     }
  17.    
  18.     # 投影(选择字段)
  19.     projection = {
  20.         "_id": 0,
  21.         "name": 1,
  22.         "email": 1,
  23.         "created_at": 1
  24.     }
  25.    
  26.     # 执行查询
  27.     cursor = collection.find(query, projection)
  28.    
  29.     # 转换为列表
  30.     results = list(cursor)
  31.    
  32.     # 处理日期对象,使其可JSON序列化
  33.     for doc in results:
  34.         if 'created_at' in doc and isinstance(doc['created_at'], datetime):
  35.             doc['created_at'] = doc['created_at'].isoformat()
  36.    
  37.     # 写入JSON文件
  38.     with open('output.json', 'w', encoding='utf-8') as f:
  39.         json.dump(results, f, ensure_ascii=False, indent=2)
  40.    
  41.     print(f"成功导出 {len(results)} 条记录到 output.json")
  42.    
  43.     # 关闭连接
  44.     client.close()
  45. if __name__ == "__main__":
  46.     export_to_json()
复制代码
  1. import com.mongodb.client.*;
  2. import com.mongodb.client.model.Filters;
  3. import com.mongodb.client.model.Projections;
  4. import org.bson.Document;
  5. import org.bson.conversions.Bson;
  6. import java.io.FileWriter;
  7. import java.io.IOException;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. public class MongoToJsonExporter {
  11.     public static void main(String[] args) {
  12.         // 连接字符串
  13.         String connectionString = "mongodb://localhost:27017";
  14.         
  15.         try (MongoClient mongoClient = MongoClients.create(connectionString)) {
  16.             // 获取数据库和集合
  17.             MongoDatabase database = mongoClient.getDatabase("mydb");
  18.             MongoCollection<Document> collection = database.getCollection("mycollection");
  19.             
  20.             // 查询条件
  21.             Bson query = Filters.eq("status", "active");
  22.             
  23.             // 投影(选择字段)
  24.             Bson projection = Projections.fields(
  25.                 Projections.excludeId(),
  26.                 Projections.include("name", "email", "age")
  27.             );
  28.             
  29.             // 执行查询
  30.             FindIterable<Document> documents = collection.find(query)
  31.                 .projection(projection);
  32.             
  33.             // 转换为List
  34.             List<Document> results = new ArrayList<>();
  35.             for (Document doc : documents) {
  36.                 results.add(doc);
  37.             }
  38.             
  39.             // 转换为JSON并写入文件
  40.             try (FileWriter writer = new FileWriter("output.json")) {
  41.                 writer.write("[\n");
  42.                 for (int i = 0; i < results.size(); i++) {
  43.                     writer.write("  " + results.get(i).toJson());
  44.                     if (i < results.size() - 1) {
  45.                         writer.write(",");
  46.                     }
  47.                     writer.write("\n");
  48.                 }
  49.                 writer.write("]");
  50.             }
  51.             
  52.             System.out.println("成功导出 " + results.size() + " 条记录到 output.json");
  53.         } catch (IOException e) {
  54.             e.printStackTrace();
  55.         }
  56.     }
  57. }
复制代码

3.4 使用聚合管道导出JSON

MongoDB的聚合管道提供了强大的数据处理能力,可以用于在导出前对数据进行转换和处理。
  1. db.orders.aggregate([
  2.   // 第一阶段:过滤条件
  3.   { $match: { status: "completed", date: { $gte: new Date("2023-01-01") } } },
  4.   
  5.   // 第二阶段:选择和重命名字段
  6.   { $project: {
  7.       _id: 0,
  8.       orderId: "$_id",
  9.       customer: "$customer.name",
  10.       amount: "$total",
  11.       items: "$products"
  12.   }},
  13.   
  14.   // 第三阶段:排序
  15.   { $sort: { amount: -1 } }
  16. ])
复制代码
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. async function exportWithAggregation() {
  4.   const uri = "mongodb://localhost:27017";
  5.   const client = new MongoClient(uri);
  6.   
  7.   try {
  8.     await client.connect();
  9.     const database = client.db("sales");
  10.     const collection = database.collection("orders");
  11.    
  12.     // 定义聚合管道
  13.     const pipeline = [
  14.       // 过滤条件
  15.       { $match: {
  16.           status: "completed",
  17.           date: { $gte: new Date("2023-01-01") }
  18.       }},
  19.       
  20.       // 计算每个订单的项目数量
  21.       { $addFields: {
  22.           itemCount: { $size: "$items" }
  23.       }},
  24.       
  25.       // 选择和重命名字段
  26.       { $project: {
  27.           _id: 0,
  28.           orderId: "$_id",
  29.           customer: 1,
  30.           date: 1,
  31.           total: 1,
  32.           itemCount: 1
  33.       }},
  34.       
  35.       // 排序
  36.       { $sort: { total: -1 } }
  37.     ];
  38.    
  39.     // 执行聚合
  40.     const cursor = collection.aggregate(pipeline);
  41.    
  42.     // 转换为数组
  43.     const results = await cursor.toArray();
  44.    
  45.     // 转换为JSON字符串并格式化
  46.     const json = JSON.stringify(results, null, 2);
  47.    
  48.     // 写入文件
  49.     fs.writeFileSync('orders_aggregated.json', json);
  50.    
  51.     console.log(`成功导出 ${results.length} 条记录到 orders_aggregated.json`);
  52.   } finally {
  53.     await client.close();
  54.   }
  55. }
  56. exportWithAggregation().catch(console.error);
复制代码

4. 导出JSON的最佳实践

4.1 性能优化

当导出大量数据时,一次性加载所有数据可能会导致内存问题。以下是几种处理大数据集的方法:

Node.js示例:使用流式处理
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. const { Transform } = require('stream');
  4. async function exportLargeCollection() {
  5.   const uri = "mongodb://localhost:27017";
  6.   const client = new MongoClient(uri);
  7.   
  8.   try {
  9.     await client.connect();
  10.     const database = client.db("large_db");
  11.     const collection = database.collection("large_collection");
  12.    
  13.     // 创建可写流
  14.     const outputStream = fs.createWriteStream('large_data.json');
  15.    
  16.     // 写入JSON数组的开始标记
  17.     outputStream.write('[\n');
  18.    
  19.     let first = true;
  20.     let count = 0;
  21.    
  22.     // 创建游标并设置批处理大小
  23.     const cursor = collection.find().batchSize(1000);
  24.    
  25.     // 使用流式处理
  26.     const transformStream = new Transform({
  27.       objectMode: true,
  28.       transform(doc, encoding, callback) {
  29.         if (!first) {
  30.           this.push(',\n');
  31.         } else {
  32.           first = false;
  33.         }
  34.         
  35.         // 处理文档并转换为JSON
  36.         const jsonDoc = JSON.stringify(doc, null, 2);
  37.         this.push(jsonDoc);
  38.         
  39.         count++;
  40.         if (count % 1000 === 0) {
  41.           console.log(`已处理 ${count} 条记录`);
  42.         }
  43.         
  44.         callback();
  45.       }
  46.     });
  47.    
  48.     // 管道处理
  49.     cursor.pipe(transformStream).pipe(outputStream, { end: false });
  50.    
  51.     // 等待流完成
  52.     await new Promise((resolve, reject) => {
  53.       outputStream.on('finish', resolve);
  54.       outputStream.on('error', reject);
  55.     });
  56.    
  57.     // 写入JSON数组的结束标记
  58.     outputStream.write('\n]');
  59.     outputStream.end();
  60.    
  61.     console.log(`成功导出 ${count} 条记录到 large_data.json`);
  62.   } finally {
  63.     await client.close();
  64.   }
  65. }
  66. exportLargeCollection().catch(console.error);
复制代码

Python示例:使用游标分批处理
  1. from pymongo import MongoClient
  2. import json
  3. import time
  4. def export_large_collection():
  5.     client = MongoClient('mongodb://localhost:27017/')
  6.     db = client['large_db']
  7.     collection = db['large_collection']
  8.    
  9.     # 查询条件(无条件表示全部)
  10.     query = {}
  11.    
  12.     # 投影(选择字段)
  13.     projection = {"_id": 0}
  14.    
  15.     # 设置批处理大小
  16.     batch_size = 1000
  17.    
  18.     # 打开输出文件
  19.     with open('large_data.json', 'w', encoding='utf-8') as f:
  20.         # 写入JSON数组的开始标记
  21.         f.write('[\n')
  22.         
  23.         first = True
  24.         count = 0
  25.         
  26.         # 使用游标和批处理
  27.         cursor = collection.find(query, projection).batch_size(batch_size)
  28.         
  29.         for doc in cursor:
  30.             if not first:
  31.                 f.write(',\n')
  32.             else:
  33.                 first = False
  34.             
  35.             # 写入文档
  36.             json.dump(doc, f, ensure_ascii=False)
  37.             
  38.             count += 1
  39.             if count % batch_size == 0:
  40.                 print(f"已处理 {count} 条记录")
  41.         
  42.         # 写入JSON数组的结束标记
  43.         f.write('\n]')
  44.    
  45.     print(f"成功导出 {count} 条记录到 large_data.json")
  46.     client.close()
  47. if __name__ == "__main__":
  48.     start_time = time.time()
  49.     export_large_collection()
  50.     end_time = time.time()
  51.     print(f"导出完成,耗时: {end_time - start_time:.2f} 秒")
复制代码

在导出数据前,确保查询条件使用的字段有适当的索引,可以显著提高导出速度。
  1. // 创建复合索引
  2. db.orders.createIndex({ status: 1, date: -1 })
  3. // 使用索引友好的查询
  4. db.orders.find({
  5.   status: "completed",
  6.   date: { $gte: new Date("2023-01-01") }
  7. }).sort({ date: -1 })
复制代码

4.2 数据安全

在导出数据时,可能需要处理敏感信息,如密码、个人身份信息等。

使用聚合管道屏蔽敏感字段
  1. db.users.aggregate([
  2.   {
  3.     $project: {
  4.       name: 1,
  5.       email: 1,
  6.       // 屏蔽密码字段
  7.       password: 0,
  8.       // 对电话号码进行部分屏蔽
  9.       phone: {
  10.         $concat: [
  11.           { $substrCP: ["$phone", 0, 3] },
  12.           "****",
  13.           { $substrCP: ["$phone", 7, 4] }
  14.         ]
  15.       }
  16.     }
  17.   }
  18. ])
复制代码

Node.js示例:在导出前处理敏感数据
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. async function exportWithSensitiveDataHandling() {
  4.   const uri = "mongodb://localhost:27017";
  5.   const client = new MongoClient(uri);
  6.   
  7.   try {
  8.     await client.connect();
  9.     const database = client.db("mydb");
  10.     const collection = database.collection("users");
  11.    
  12.     const cursor = collection.find({});
  13.    
  14.     const results = [];
  15.     for await (const doc of cursor) {
  16.       // 创建一个新对象,只包含需要的字段
  17.       const sanitizedDoc = {
  18.         id: doc._id,
  19.         name: doc.name,
  20.         email: doc.email,
  21.         // 屏蔽部分电话号码
  22.         phone: doc.phone ? doc.phone.replace(/(\d{3})\d{4}(\d{4})/, '$1****$2') : null,
  23.         // 屏蔽信用卡号
  24.         creditCard: doc.creditCard ? '****-****-****-' + doc.creditCard.slice(-4) : null
  25.       };
  26.       
  27.       results.push(sanitizedDoc);
  28.     }
  29.    
  30.     // 转换为JSON字符串并格式化
  31.     const json = JSON.stringify(results, null, 2);
  32.    
  33.     // 写入文件
  34.     fs.writeFileSync('sanitized_users.json', json);
  35.    
  36.     console.log(`成功导出 ${results.length} 条已处理敏感信息的记录到 sanitized_users.json`);
  37.   } finally {
  38.     await client.close();
  39.   }
  40. }
  41. exportWithSensitiveDataHandling().catch(console.error);
复制代码

确保只有授权用户才能导出数据,并且只导出他们有权访问的数据。
  1. // 在MongoDB中设置基于角色的访问控制
  2. db.createRole({
  3.   role: "exportOperator",
  4.   privileges: [
  5.     { resource: { db: "sales", collection: "orders" }, actions: ["find"] }
  6.   ],
  7.   roles: []
  8. })
  9. // 创建用户并分配角色
  10. db.createUser({
  11.   user: "export_user",
  12.   pwd: "secure_password",
  13.   roles: [ { role: "exportOperator", db: "admin" } ]
  14. })
复制代码

4.3 大数据集处理

对于分片集合,可以从每个分片并行导出数据,然后合并结果。
  1. from pymongo import MongoClient
  2. import json
  3. import concurrent.futures
  4. import os
  5. def export_shard_collection():
  6.     client = MongoClient('mongodb://localhost:27017/')
  7.     db = client['sharded_db']
  8.     collection = db['sharded_collection']
  9.    
  10.     # 获取分片信息
  11.     shard_info = db.command("collstats", "sharded_collection")
  12.     shards = client.config.shards.find()
  13.    
  14.     # 创建输出目录
  15.     os.makedirs("sharded_exports", exist_ok=True)
  16.    
  17.     # 为每个分片创建导出任务
  18.     def export_shard(shard):
  19.         shard_host = shard['host']
  20.         shard_client = MongoClient(shard_host)
  21.         shard_db = shard_client['sharded_db']
  22.         shard_collection = shard_db['sharded_collection']
  23.         
  24.         # 查询条件(可以根据分片键进行优化)
  25.         query = {}
  26.         
  27.         # 执行查询
  28.         cursor = shard_collection.find(query)
  29.         
  30.         # 转换为列表
  31.         results = list(cursor)
  32.         
  33.         # 写入文件
  34.         shard_name = shard['_id']
  35.         output_file = f"sharded_exports/{shard_name}.json"
  36.         
  37.         with open(output_file, 'w', encoding='utf-8') as f:
  38.             json.dump(results, f, ensure_ascii=False, indent=2)
  39.         
  40.         print(f"成功导出分片 {shard_name} 的 {len(results)} 条记录到 {output_file}")
  41.         
  42.         shard_client.close()
  43.         return len(results)
  44.    
  45.     # 并行执行导出任务
  46.     total_records = 0
  47.     with concurrent.futures.ThreadPoolExecutor() as executor:
  48.         futures = [executor.submit(export_shard, shard) for shard in shards]
  49.         
  50.         for future in concurrent.futures.as_completed(futures):
  51.             count = future.result()
  52.             total_records += count
  53.    
  54.     # 合并所有分片文件
  55.     merge_shard_files("sharded_exports", "merged_output.json")
  56.    
  57.     print(f"所有分片导出完成,共导出 {total_records} 条记录")
  58.     client.close()
  59. def merge_shard_files(input_dir, output_file):
  60.     import glob
  61.    
  62.     # 获取所有分片文件
  63.     files = glob.glob(f"{input_dir}/*.json")
  64.    
  65.     # 创建合并后的输出文件
  66.     with open(output_file, 'w', encoding='utf-8') as outfile:
  67.         outfile.write('[\n')
  68.         
  69.         first_file = True
  70.         for i, filename in enumerate(files):
  71.             with open(filename, 'r', encoding='utf-8') as infile:
  72.                 data = json.load(infile)
  73.                
  74.                 if not first_file:
  75.                     outfile.write(',\n')
  76.                 else:
  77.                     first_file = False
  78.                
  79.                 # 写入每个文档
  80.                 for j, doc in enumerate(data):
  81.                     if j > 0:
  82.                         outfile.write(',\n')
  83.                     json.dump(doc, outfile, ensure_ascii=False, indent=2)
  84.         
  85.         outfile.write('\n]')
  86.    
  87.     print(f"已合并所有分片文件到 {output_file}")
  88. if __name__ == "__main__":
  89.     export_shard_collection()
复制代码

对于频繁变化的大型集合,可以使用增量导出策略,只导出新增或修改的数据。
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. const path = require('path');
  4. async function incrementalExport() {
  5.   const uri = "mongodb://localhost:27017";
  6.   const client = new MongoClient(uri);
  7.   
  8.   try {
  9.     await client.connect();
  10.     const database = client.db("mydb");
  11.     const collection = database.collection("mycollection");
  12.    
  13.     // 读取上次导出的时间戳
  14.     let lastExportTime;
  15.     try {
  16.       const stats = JSON.parse(fs.readFileSync('export_stats.json', 'utf8'));
  17.       lastExportTime = new Date(stats.lastExportTime);
  18.     } catch (err) {
  19.       // 如果不存在,使用一个较早的日期
  20.       lastExportTime = new Date('2000-01-01T00:00:00Z');
  21.     }
  22.    
  23.     console.log(`上次导出时间: ${lastExportTime.toISOString()}`);
  24.    
  25.     // 查询自上次导出以来新增或修改的文档
  26.     const query = {
  27.       $or: [
  28.         { createdAt: { $gt: lastExportTime } },
  29.         { updatedAt: { $gt: lastExportTime } }
  30.       ]
  31.     };
  32.    
  33.     // 执行查询
  34.     const cursor = collection.find(query);
  35.    
  36.     // 转换为数组
  37.     const results = await cursor.toArray();
  38.    
  39.     // 如果有新数据,追加到现有文件或创建新文件
  40.     if (results.length > 0) {
  41.       const now = new Date();
  42.       const exportFileName = `incremental_export_${now.toISOString().replace(/:/g, '-')}.json`;
  43.       const exportFilePath = path.join(__dirname, exportFileName);
  44.       
  45.       // 转换为JSON字符串并格式化
  46.       const json = JSON.stringify(results, null, 2);
  47.       
  48.       // 写入文件
  49.       fs.writeFileSync(exportFilePath, json);
  50.       
  51.       // 更新导出统计信息
  52.       const exportStats = {
  53.         lastExportTime: now.toISOString(),
  54.         exportedCount: results.length,
  55.         exportFile: exportFileName
  56.       };
  57.       
  58.       fs.writeFileSync('export_stats.json', JSON.stringify(exportStats, null, 2));
  59.       
  60.       console.log(`成功导出 ${results.length} 条记录到 ${exportFileName}`);
  61.     } else {
  62.       console.log("没有新数据需要导出");
  63.     }
  64.   } finally {
  65.     await client.close();
  66.   }
  67. }
  68. incrementalExport().catch(console.error);
复制代码

5. 常见问题和解决方案

5.1 处理特殊数据类型

MongoDB支持一些JSON中不直接支持的数据类型,如ObjectId、Date、BinaryData等。在导出时需要特殊处理。
  1. // 在Node.js中处理ObjectId
  2. const { MongoClient, ObjectId } = require('mongodb');
  3. const fs = require('fs');
  4. async function exportWithObjectIdHandling() {
  5.   const uri = "mongodb://localhost:27017";
  6.   const client = new MongoClient(uri);
  7.   
  8.   try {
  9.     await client.connect();
  10.     const database = client.db("mydb");
  11.     const collection = database.collection("mycollection");
  12.    
  13.     const cursor = collection.find({});
  14.    
  15.     const results = [];
  16.     for await (const doc of cursor) {
  17.       // 转换ObjectId为字符串
  18.       const processedDoc = {
  19.         id: doc._id.toString(),
  20.         // 其他字段...
  21.         data: doc.data
  22.       };
  23.       
  24.       results.push(processedDoc);
  25.     }
  26.    
  27.     // 转换为JSON字符串并格式化
  28.     const json = JSON.stringify(results, null, 2);
  29.    
  30.     // 写入文件
  31.     fs.writeFileSync('data_with_string_ids.json', json);
  32.    
  33.     console.log(`成功导出 ${results.length} 条记录,ObjectId已转换为字符串`);
  34.   } finally {
  35.     await client.close();
  36.   }
  37. }
  38. exportWithObjectIdHandling().catch(console.error);
复制代码
  1. from pymongo import MongoClient
  2. import json
  3. from datetime import datetime
  4. def export_with_date_handling():
  5.     client = MongoClient('mongodb://localhost:27017/')
  6.     db = client['mydb']
  7.     collection = db['mycollection']
  8.    
  9.     # 自定义JSON编码器处理日期
  10.     class DateTimeEncoder(json.JSONEncoder):
  11.         def default(self, obj):
  12.             if isinstance(obj, datetime):
  13.                 return obj.isoformat()
  14.             return super().default(obj)
  15.    
  16.     # 执行查询
  17.     cursor = collection.find({})
  18.    
  19.     # 转换为列表
  20.     results = list(cursor)
  21.    
  22.     # 使用自定义编码器写入JSON文件
  23.     with open('data_with_dates.json', 'w', encoding='utf-8') as f:
  24.         json.dump(results, f, ensure_ascii=False, indent=2, cls=DateTimeEncoder)
  25.    
  26.     print(f"成功导出 {len(results)} 条记录,日期已转换为ISO格式")
  27.     client.close()
  28. if __name__ == "__main__":
  29.     export_with_date_handling()
复制代码

5.2 处理大数据量和内存限制

当导出的数据量很大时,可能会遇到内存限制问题。以下是几种解决方案:
  1. import com.mongodb.client.*;
  2. import com.mongodb.client.model.Filters;
  3. import org.bson.Document;
  4. import org.bson.conversions.Bson;
  5. import java.io.FileWriter;
  6. import java.io.IOException;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. public class BatchExportExample {
  10.     public static void main(String[] args) {
  11.         String connectionString = "mongodb://localhost:27017";
  12.         
  13.         try (MongoClient mongoClient = MongoClients.create(connectionString)) {
  14.             MongoDatabase database = mongoClient.getDatabase("mydb");
  15.             MongoCollection<Document> collection = database.getCollection("mycollection");
  16.             
  17.             // 设置批处理大小
  18.             int batchSize = 1000;
  19.             int totalExported = 0;
  20.             
  21.             try (FileWriter writer = new FileWriter("batch_export.json")) {
  22.                 writer.write("[\n");
  23.                
  24.                 boolean firstDocument = true;
  25.                
  26.                 // 使用游标和批处理
  27.                 MongoCursor<Document> cursor = collection.find().batchSize(batchSize).iterator();
  28.                
  29.                 List<Document> batch = new ArrayList<>(batchSize);
  30.                
  31.                 while (cursor.hasNext()) {
  32.                     batch.add(cursor.next());
  33.                     
  34.                     if (batch.size() >= batchSize) {
  35.                         // 处理当前批次
  36.                         if (!firstDocument) {
  37.                             writer.write(",\n");
  38.                         } else {
  39.                             firstDocument = false;
  40.                         }
  41.                         
  42.                         for (int i = 0; i < batch.size(); i++) {
  43.                             if (i > 0) {
  44.                                 writer.write(",\n");
  45.                             }
  46.                             writer.write("  " + batch.get(i).toJson());
  47.                         }
  48.                         
  49.                         totalExported += batch.size();
  50.                         System.out.println("已导出 " + totalExported + " 条记录");
  51.                         
  52.                         // 清空批次
  53.                         batch.clear();
  54.                     }
  55.                 }
  56.                
  57.                 // 处理最后一批
  58.                 if (!batch.isEmpty()) {
  59.                     if (!firstDocument) {
  60.                         writer.write(",\n");
  61.                     }
  62.                     
  63.                     for (int i = 0; i < batch.size(); i++) {
  64.                         if (i > 0) {
  65.                             writer.write(",\n");
  66.                         }
  67.                         writer.write("  " + batch.get(i).toJson());
  68.                     }
  69.                     
  70.                     totalExported += batch.size();
  71.                     System.out.println("已导出 " + totalExported + " 条记录");
  72.                 }
  73.                
  74.                 writer.write("\n]");
  75.             }
  76.             
  77.             System.out.println("导出完成,总计导出 " + totalExported + " 条记录");
  78.         } catch (IOException e) {
  79.             e.printStackTrace();
  80.         }
  81.     }
  82. }
复制代码
  1. from pymongo import MongoClient
  2. import json
  3. import concurrent.futures
  4. import math
  5. import time
  6. from datetime import datetime
  7. import os
  8. def parallel_export():
  9.     client = MongoClient('mongodb://localhost:27017/')
  10.     db = client['mydb']
  11.     collection = db['mycollection']
  12.    
  13.     # 获取集合中的文档总数
  14.     total_docs = collection.count_documents({})
  15.     print(f"集合中共有 {total_docs} 条文档")
  16.    
  17.     # 设置每个线程处理的文档数量
  18.     docs_per_thread = 5000
  19.    
  20.     # 计算需要的线程数
  21.     num_threads = math.ceil(total_docs / docs_per_thread)
  22.     print(f"将使用 {num_threads} 个线程进行并行导出")
  23.    
  24.     # 创建输出目录
  25.     os.makedirs("parallel_exports", exist_ok=True)
  26.    
  27.     # 自定义JSON编码器处理日期
  28.     class DateTimeEncoder(json.JSONEncoder):
  29.         def default(self, obj):
  30.             if isinstance(obj, datetime):
  31.                 return obj.isoformat()
  32.             return super().default(obj)
  33.    
  34.     # 定义导出函数
  35.     def export_range(start, end, thread_id):
  36.         thread_client = MongoClient('mongodb://localhost:27017/')
  37.         thread_db = thread_client['mydb']
  38.         thread_collection = thread_db['mycollection']
  39.         
  40.         # 查询指定范围的文档
  41.         cursor = thread_collection.find().skip(start).limit(end - start)
  42.         
  43.         # 转换为列表
  44.         results = list(cursor)
  45.         
  46.         # 写入文件
  47.         output_file = f"parallel_exports/thread_{thread_id}.json"
  48.         with open(output_file, 'w', encoding='utf-8') as f:
  49.             json.dump(results, f, ensure_ascii=False, indent=2, cls=DateTimeEncoder)
  50.         
  51.         thread_client.close()
  52.         return len(results)
  53.    
  54.     # 使用线程池并行导出
  55.     start_time = time.time()
  56.     total_exported = 0
  57.    
  58.     with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
  59.         futures = []
  60.         
  61.         for i in range(num_threads):
  62.             start = i * docs_per_thread
  63.             end = min((i + 1) * docs_per_thread, total_docs)
  64.             
  65.             future = executor.submit(export_range, start, end, i)
  66.             futures.append(future)
  67.         
  68.         for future in concurrent.futures.as_completed(futures):
  69.             count = future.result()
  70.             total_exported += count
  71.             print(f"线程完成,已导出 {count} 条记录,总计 {total_exported}/{total_docs}")
  72.    
  73.     # 合并所有线程导出的文件
  74.     merge_json_files("parallel_exports", "merged_export.json")
  75.    
  76.     end_time = time.time()
  77.     print(f"并行导出完成,共导出 {total_exported} 条记录,耗时: {end_time - start_time:.2f} 秒")
  78.    
  79.     client.close()
  80. def merge_json_files(input_dir, output_file):
  81.     import glob
  82.    
  83.     # 获取所有JSON文件
  84.     files = glob.glob(f"{input_dir}/*.json")
  85.    
  86.     # 创建合并后的输出文件
  87.     with open(output_file, 'w', encoding='utf-8') as outfile:
  88.         outfile.write('[\n')
  89.         
  90.         first_file = True
  91.         for i, filename in enumerate(files):
  92.             with open(filename, 'r', encoding='utf-8') as infile:
  93.                 data = json.load(infile)
  94.                
  95.                 if not first_file:
  96.                     outfile.write(',\n')
  97.                 else:
  98.                     first_file = False
  99.                
  100.                 # 写入每个文档
  101.                 for j, doc in enumerate(data):
  102.                     if j > 0:
  103.                         outfile.write(',\n')
  104.                     json.dump(doc, outfile, ensure_ascii=False, indent=2)
  105.         
  106.         outfile.write('\n]')
  107.    
  108.     print(f"已合并所有文件到 {output_file}")
  109. if __name__ == "__main__":
  110.     parallel_export()
复制代码

5.3 处理连接和认证问题
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. async function exportWithAuth() {
  4.   // 使用连接字符串处理认证
  5.   const uri = "mongodb://username:password@cluster.example.com:27017,cluster.example.com:27018,cluster.example.com:27019/mydb?replicaSet=myReplicaSet&authSource=admin";
  6.   
  7.   const client = new MongoClient(uri, {
  8.     // 连接池选项
  9.     maxPoolSize: 10,
  10.     minPoolSize: 1,
  11.     maxIdleTimeMS: 30000,
  12.     waitQueueTimeoutMS: 5000,
  13.    
  14.     // 重试选项
  15.     connectTimeoutMS: 10000,
  16.     socketTimeoutMS: 45000,
  17.     retryWrites: true,
  18.    
  19.     // SSL选项(如果需要)
  20.     ssl: true,
  21.     sslValidate: true,
  22.     sslCA: process.env.SSL_CA_FILE
  23.   });
  24.   
  25.   try {
  26.     await client.connect();
  27.     const database = client.db("mydb");
  28.     const collection = database.collection("mycollection");
  29.    
  30.     // 执行查询
  31.     const cursor = collection.find({});
  32.    
  33.     // 转换为数组
  34.     const results = await cursor.toArray();
  35.    
  36.     // 转换为JSON字符串并格式化
  37.     const json = JSON.stringify(results, null, 2);
  38.    
  39.     // 写入文件
  40.     fs.writeFileSync('auth_export.json', json);
  41.    
  42.     console.log(`成功导出 ${results.length} 条记录`);
  43.   } catch (err) {
  44.     console.error("导出过程中出错:", err);
  45.   } finally {
  46.     await client.close();
  47.   }
  48. }
  49. exportWithAuth().catch(console.error);
复制代码
  1. from pymongo import MongoClient
  2. import paramiko
  3. import json
  4. import sshtunnel
  5. from datetime import datetime
  6. def export_via_ssh_tunnel():
  7.     # SSH隧道配置
  8.     ssh_host = 'ssh.example.com'
  9.     ssh_port = 22
  10.     ssh_username = 'ssh_user'
  11.     ssh_pkey = paramiko.RSAKey.from_private_key_file('/path/to/private/key')
  12.    
  13.     # MongoDB配置
  14.     mongo_host = 'localhost'  # 相对于SSH服务器
  15.     mongo_port = 27017
  16.     mongo_db = 'mydb'
  17.     mongo_collection = 'mycollection'
  18.    
  19.     # 创建SSH隧道
  20.     server = sshtunnel.SSHTunnelForwarder(
  21.         (ssh_host, ssh_port),
  22.         ssh_username=ssh_username,
  23.         ssh_pkey=ssh_pkey,
  24.         remote_bind_address=(mongo_host, mongo_port)
  25.     )
  26.    
  27.     try:
  28.         # 启动SSH隧道
  29.         server.start()
  30.         
  31.         # 通过隧道连接到MongoDB
  32.         client = MongoClient(
  33.             host='localhost',
  34.             port=server.local_bind_port,
  35.             username='mongo_user',
  36.             password='mongo_password',
  37.             authSource='admin'
  38.         )
  39.         
  40.         db = client[mongo_db]
  41.         collection = db[mongo_collection]
  42.         
  43.         # 自定义JSON编码器处理日期
  44.         class DateTimeEncoder(json.JSONEncoder):
  45.             def default(self, obj):
  46.                 if isinstance(obj, datetime):
  47.                     return obj.isoformat()
  48.                 return super().default(obj)
  49.         
  50.         # 执行查询
  51.         cursor = collection.find({})
  52.         
  53.         # 转换为列表
  54.         results = list(cursor)
  55.         
  56.         # 写入JSON文件
  57.         with open('ssh_tunnel_export.json', 'w', encoding='utf-8') as f:
  58.             json.dump(results, f, ensure_ascii=False, indent=2, cls=DateTimeEncoder)
  59.         
  60.         print(f"成功导出 {len(results)} 条记录")
  61.         
  62.     except Exception as e:
  63.         print(f"导出过程中出错: {e}")
  64.     finally:
  65.         # 关闭连接和隧道
  66.         if 'client' in locals():
  67.             client.close()
  68.         server.stop()
  69. if __name__ == "__main__":
  70.     export_via_ssh_tunnel()
复制代码

6. 实际应用场景和案例研究

6.1 数据备份与恢复
  1. #!/bin/bash
  2. # MongoDB备份脚本
  3. # 配置变量
  4. MONGO_HOST="localhost"
  5. MONGO_PORT="27017"
  6. MONGO_USER="backup_user"
  7. MONGO_PASS="backup_password"
  8. MONGO_AUTH_DB="admin"
  9. BACKUP_DIR="/var/backups/mongodb"
  10. DATE=$(date +%Y%m%d_%H%M%S)
  11. RETENTION_DAYS=7
  12. # 创建备份目录
  13. mkdir -p "$BACKUP_DIR/$DATE"
  14. # 获取所有数据库列表
  15. DATABASES=$(mongo --host $MONGO_HOST --port $MONGO_PORT -u $MONGO_USER -p $MONGO_PASS --authenticationDatabase $MONGO_AUTH_DB --quiet --eval "db.getMongo().getDBNames().join(' ')")
  16. # 备份每个数据库
  17. for DB in $DATABASES; do
  18.     echo "备份数据库: $DB"
  19.    
  20.     # 获取数据库中的所有集合
  21.     COLLECTIONS=$(mongo --host $MONGO_HOST --port $MONGO_PORT -u $MONGO_USER -p $MONGO_PASS --authenticationDatabase $MONGO_AUTH_DB $DB --quiet --eval "db.getCollectionNames().join(' ')")
  22.    
  23.     # 为每个集合创建备份目录
  24.     mkdir -p "$BACKUP_DIR/$DATE/$DB"
  25.    
  26.     # 备份每个集合
  27.     for COLLECTION in $COLLECTIONS; do
  28.         echo "  备份集合: $DB.$COLLECTION"
  29.         mongoexport --host $MONGO_HOST --port $MONGO_PORT -u $MONGO_USER -p $MONGO_PASS --authenticationDatabase $MONGO_AUTH_DB \
  30.             --db $DB --collection $COLLECTION \
  31.             --out "$BACKUP_DIR/$DATE/$DB/$COLLECTION.json" \
  32.             --jsonArray
  33.     done
  34. done
  35. # 创建备份清单
  36. echo "备份时间: $DATE" > "$BACKUP_DIR/$DATE/backup_info.txt"
  37. echo "数据库列表: $DATABASES" >> "$BACKUP_DIR/$DATE/backup_info.txt"
  38. # 压缩备份
  39. echo "压缩备份文件..."
  40. tar -czf "$BACKUP_DIR/mongodb_backup_$DATE.tar.gz" -C "$BACKUP_DIR" "$DATE"
  41. # 删除未压缩的备份
  42. rm -rf "$BACKUP_DIR/$DATE"
  43. # 清理旧备份
  44. echo "清理超过 $RETENTION_DAYS 天的旧备份..."
  45. find "$BACKUP_DIR" -name "mongodb_backup_*.tar.gz" -type f -mtime +$RETENTION_DAYS -delete
  46. echo "备份完成: $BACKUP_DIR/mongodb_backup_$DATE.tar.gz"
复制代码
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. const path = require('path');
  4. async function restoreFromBackup(backupDir) {
  5.   const uri = "mongodb://localhost:27017";
  6.   const client = new MongoClient(uri);
  7.   
  8.   try {
  9.     await client.connect();
  10.    
  11.     // 读取备份目录中的所有文件
  12.     const backupFiles = fs.readdirSync(backupDir, { withFileTypes: true });
  13.    
  14.     for (const file of backupFiles) {
  15.       if (file.isDirectory()) {
  16.         // 处理数据库目录
  17.         const dbName = file.name;
  18.         const db = client.db(dbName);
  19.         
  20.         console.log(`恢复数据库: ${dbName}`);
  21.         
  22.         // 读取数据库中的所有集合文件
  23.         const collectionFiles = fs.readdirSync(path.join(backupDir, dbName));
  24.         
  25.         for (const collectionFile of collectionFiles) {
  26.           if (collectionFile.endsWith('.json')) {
  27.             // 处理集合文件
  28.             const collectionName = path.basename(collectionFile, '.json');
  29.             const collection = db.collection(collectionName);
  30.             
  31.             console.log(`  恢复集合: ${dbName}.${collectionName}`);
  32.             
  33.             // 读取JSON文件
  34.             const jsonContent = fs.readFileSync(path.join(backupDir, dbName, collectionFile), 'utf8');
  35.             const documents = JSON.parse(jsonContent);
  36.             
  37.             if (documents.length > 0) {
  38.               // 清空现有集合(可选)
  39.               await collection.deleteMany({});
  40.               
  41.               // 插入文档
  42.               await collection.insertMany(documents);
  43.               
  44.               console.log(`    恢复了 ${documents.length} 条文档`);
  45.             }
  46.           }
  47.         }
  48.       }
  49.     }
  50.    
  51.     console.log("备份恢复完成");
  52.   } finally {
  53.     await client.close();
  54.   }
  55. }
  56. // 使用示例
  57. // restoreFromBackup('/var/backups/mongodb/20231115_143022').catch(console.error);
复制代码

6.2 数据迁移
  1. import pymongo
  2. import json
  3. import psycopg2
  4. from datetime import datetime
  5. import sys
  6. def mongo_to_postgres():
  7.     # MongoDB连接配置
  8.     mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
  9.     mongo_db = mongo_client["source_db"]
  10.    
  11.     # PostgreSQL连接配置
  12.     pg_conn = psycopg2.connect(
  13.         host="localhost",
  14.         database="target_db",
  15.         user="postgres",
  16.         password="postgres"
  17.     )
  18.     pg_cursor = pg_conn.cursor()
  19.    
  20.     try:
  21.         # 迁移用户数据
  22.         print("迁移用户数据...")
  23.         mongo_users = mongo_db["users"].find({})
  24.         
  25.         # 创建PostgreSQL表(如果不存在)
  26.         pg_cursor.execute("""
  27.             CREATE TABLE IF NOT EXISTS users (
  28.                 id SERIAL PRIMARY KEY,
  29.                 mongo_id VARCHAR(24) UNIQUE,
  30.                 name VARCHAR(100) NOT NULL,
  31.                 email VARCHAR(100) UNIQUE,
  32.                 age INTEGER,
  33.                 created_at TIMESTAMP,
  34.                 updated_at TIMESTAMP
  35.             )
  36.         """)
  37.         
  38.         # 插入用户数据
  39.         for user in mongo_users:
  40.             pg_cursor.execute(
  41.                 """
  42.                 INSERT INTO users (mongo_id, name, email, age, created_at, updated_at)
  43.                 VALUES (%s, %s, %s, %s, %s, %s)
  44.                 ON CONFLICT (mongo_id) DO UPDATE SET
  45.                     name = EXCLUDED.name,
  46.                     email = EXCLUDED.email,
  47.                     age = EXCLUDED.age,
  48.                     updated_at = EXCLUDED.updated_at
  49.                 """,
  50.                 (
  51.                     str(user["_id"]),
  52.                     user.get("name"),
  53.                     user.get("email"),
  54.                     user.get("age"),
  55.                     user.get("created_at"),
  56.                     user.get("updated_at")
  57.                 )
  58.             )
  59.         
  60.         pg_conn.commit()
  61.         print(f"用户数据迁移完成,共迁移 {mongo_db['users'].count_documents({})} 条记录")
  62.         
  63.         # 迁移订单数据
  64.         print("迁移订单数据...")
  65.         mongo_orders = mongo_db["orders"].find({})
  66.         
  67.         # 创建PostgreSQL表(如果不存在)
  68.         pg_cursor.execute("""
  69.             CREATE TABLE IF NOT EXISTS orders (
  70.                 id SERIAL PRIMARY KEY,
  71.                 mongo_id VARCHAR(24) UNIQUE,
  72.                 user_id INTEGER REFERENCES users(id),
  73.                 order_number VARCHAR(50),
  74.                 total DECIMAL(10,2),
  75.                 status VARCHAR(20),
  76.                 order_date TIMESTAMP,
  77.                 created_at TIMESTAMP,
  78.                 updated_at TIMESTAMP
  79.             )
  80.         """)
  81.         
  82.         # 创建用户ID映射(MongoDB _id 到 PostgreSQL id)
  83.         pg_cursor.execute("SELECT mongo_id, id FROM users")
  84.         user_id_map = {row[0]: row[1] for row in pg_cursor.fetchall()}
  85.         
  86.         # 插入订单数据
  87.         for order in mongo_orders:
  88.             # 获取对应的PostgreSQL用户ID
  89.             user_mongo_id = str(order.get("user_id"))
  90.             pg_user_id = user_id_map.get(user_mongo_id)
  91.             
  92.             if pg_user_id is None:
  93.                 print(f"警告: 找不到用户ID {user_mongo_id},跳过订单 {order.get('_id')}")
  94.                 continue
  95.             
  96.             pg_cursor.execute(
  97.                 """
  98.                 INSERT INTO orders (mongo_id, user_id, order_number, total, status, order_date, created_at, updated_at)
  99.                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
  100.                 ON CONFLICT (mongo_id) DO UPDATE SET
  101.                     user_id = EXCLUDED.user_id,
  102.                     order_number = EXCLUDED.order_number,
  103.                     total = EXCLUDED.total,
  104.                     status = EXCLUDED.status,
  105.                     order_date = EXCLUDED.order_date,
  106.                     updated_at = EXCLUDED.updated_at
  107.                 """,
  108.                 (
  109.                     str(order["_id"]),
  110.                     pg_user_id,
  111.                     order.get("order_number"),
  112.                     order.get("total"),
  113.                     order.get("status"),
  114.                     order.get("order_date"),
  115.                     order.get("created_at"),
  116.                     order.get("updated_at")
  117.                 )
  118.             )
  119.         
  120.         pg_conn.commit()
  121.         print(f"订单数据迁移完成,共迁移 {mongo_db['orders'].count_documents({})} 条记录")
  122.         
  123.         print("数据迁移完成")
  124.         
  125.     except Exception as e:
  126.         print(f"迁移过程中出错: {e}")
  127.         pg_conn.rollback()
  128.     finally:
  129.         pg_cursor.close()
  130.         pg_conn.close()
  131.         mongo_client.close()
  132. if __name__ == "__main__":
  133.     mongo_to_postgres()
复制代码
  1. const { MongoClient } = require('mongodb');
  2. const mysql = require('mysql2/promise');
  3. const fs = require('fs');
  4. const path = require('path');
  5. async function mysqlToMongoDB() {
  6.   // MySQL连接配置
  7.   const mysqlConnection = await mysql.createConnection({
  8.     host: 'localhost',
  9.     user: 'root',
  10.     password: 'password',
  11.     database: 'source_db'
  12.   });
  13.   
  14.   // MongoDB连接配置
  15.   const mongoUri = "mongodb://localhost:27017";
  16.   const mongoClient = new MongoClient(mongoUri);
  17.   
  18.   try {
  19.     await mongoClient.connect();
  20.     const mongoDb = mongoClient.db('target_db');
  21.    
  22.     // 迁移用户数据
  23.     console.log('迁移用户数据...');
  24.     const [users] = await mysqlConnection.execute('SELECT * FROM users');
  25.     const usersCollection = mongoDb.collection('users');
  26.    
  27.     // 转换用户数据格式
  28.     const userDocuments = users.map(user => ({
  29.       _id: user.id,  // 使用MySQL ID作为MongoDB _id
  30.       name: user.name,
  31.       email: user.email,
  32.       age: user.age,
  33.       profile: {
  34.         address: user.address,
  35.         phone: user.phone
  36.       },
  37.       createdAt: user.created_at,
  38.       updatedAt: user.updated_at
  39.     }));
  40.    
  41.     // 插入用户数据
  42.     if (userDocuments.length > 0) {
  43.       await usersCollection.insertMany(userDocuments);
  44.       console.log(`用户数据迁移完成,共迁移 ${userDocuments.length} 条记录`);
  45.     }
  46.    
  47.     // 迁移产品数据
  48.     console.log('迁移产品数据...');
  49.     const [products] = await mysqlConnection.execute('SELECT * FROM products');
  50.     const productsCollection = mongoDb.collection('products');
  51.    
  52.     // 转换产品数据格式
  53.     const productDocuments = products.map(product => ({
  54.       _id: product.id,
  55.       name: product.name,
  56.       description: product.description,
  57.       price: parseFloat(product.price),
  58.       category: product.category,
  59.       tags: product.tags ? product.tags.split(',') : [],
  60.       specifications: JSON.parse(product.specifications || '{}'),
  61.       stock: {
  62.         quantity: product.stock_quantity,
  63.         reserved: product.stock_reserved || 0,
  64.         available: product.stock_quantity - (product.stock_reserved || 0)
  65.       },
  66.       createdAt: product.created_at,
  67.       updatedAt: product.updated_at
  68.     }));
  69.    
  70.     // 插入产品数据
  71.     if (productDocuments.length > 0) {
  72.       await productsCollection.insertMany(productDocuments);
  73.       console.log(`产品数据迁移完成,共迁移 ${productDocuments.length} 条记录`);
  74.     }
  75.    
  76.     // 迁移订单数据
  77.     console.log('迁移订单数据...');
  78.     const [orders] = await mysqlConnection.execute('SELECT * FROM orders');
  79.     const ordersCollection = mongoDb.collection('orders');
  80.    
  81.     // 获取订单项数据
  82.     const [orderItems] = await mysqlConnection.execute('SELECT * FROM order_items');
  83.    
  84.     // 按订单ID分组订单项
  85.     const orderItemsByOrderId = {};
  86.     orderItems.forEach(item => {
  87.       if (!orderItemsByOrderId[item.order_id]) {
  88.         orderItemsByOrderId[item.order_id] = [];
  89.       }
  90.       orderItemsByOrderId[item.order_id].push({
  91.         productId: item.product_id,
  92.         quantity: item.quantity,
  93.         price: parseFloat(item.price),
  94.         subtotal: parseFloat(item.subtotal)
  95.       });
  96.     });
  97.    
  98.     // 转换订单数据格式
  99.     const orderDocuments = orders.map(order => {
  100.       const items = orderItemsByOrderId[order.id] || [];
  101.       
  102.       return {
  103.         _id: order.id,
  104.         userId: order.user_id,
  105.         orderNumber: order.order_number,
  106.         status: order.status,
  107.         items: items,
  108.         subtotal: parseFloat(order.subtotal),
  109.         tax: parseFloat(order.tax),
  110.         shipping: parseFloat(order.shipping),
  111.         total: parseFloat(order.total),
  112.         shippingAddress: {
  113.           street: order.shipping_street,
  114.           city: order.shipping_city,
  115.           state: order.shipping_state,
  116.           zipCode: order.shipping_zip_code,
  117.           country: order.shipping_country
  118.         },
  119.         paymentMethod: order.payment_method,
  120.         paymentStatus: order.payment_status,
  121.         orderDate: order.order_date,
  122.         createdAt: order.created_at,
  123.         updatedAt: order.updated_at
  124.       };
  125.     });
  126.    
  127.     // 插入订单数据
  128.     if (orderDocuments.length > 0) {
  129.       await ordersCollection.insertMany(orderDocuments);
  130.       console.log(`订单数据迁移完成,共迁移 ${orderDocuments.length} 条记录`);
  131.     }
  132.    
  133.     console.log('数据迁移完成');
  134.    
  135.   } catch (error) {
  136.     console.error('迁移过程中出错:', error);
  137.   } finally {
  138.     await mysqlConnection.end();
  139.     await mongoClient.close();
  140.   }
  141. }
  142. mysqlToMongoDB().catch(console.error);
复制代码

6.3 数据分析和报告生成
  1. from pymongo import MongoClient
  2. import json
  3. import pandas as pd
  4. from datetime import datetime, timedelta
  5. import matplotlib.pyplot as plt
  6. import seaborn as sns
  7. def sales_data_analysis():
  8.     # 连接到MongoDB
  9.     client = MongoClient('mongodb://localhost:27017/')
  10.     db = client['sales_db']
  11.    
  12.     # 定义分析日期范围
  13.     end_date = datetime.now()
  14.     start_date = end_date - timedelta(days=90)  # 最近90天
  15.    
  16.     # 查询销售数据
  17.     pipeline = [
  18.         {
  19.             '$match': {
  20.                 'order_date': {
  21.                     '$gte': start_date,
  22.                     '$lte': end_date
  23.                 },
  24.                 'status': 'completed'
  25.             }
  26.         },
  27.         {
  28.             '$project': {
  29.                 '_id': 0,
  30.                 'order_id': '$_id',
  31.                 'customer_id': 1,
  32.                 'order_date': 1,
  33.                 'total': 1,
  34.                 'items': 1,
  35.                 'payment_method': 1,
  36.                 'year': {'$year': '$order_date'},
  37.                 'month': {'$month': '$order_date'},
  38.                 'day': {'$dayOfMonth': '$order_date'},
  39.                 'day_of_week': {'$dayOfWeek': '$order_date'}
  40.             }
  41.         }
  42.     ]
  43.    
  44.     # 执行聚合查询
  45.     sales_data = list(db.orders.aggregate(pipeline))
  46.    
  47.     # 导出原始数据到JSON
  48.     with open('sales_data_raw.json', 'w', encoding='utf-8') as f:
  49.         json.dump(sales_data, f, ensure_ascii=False, indent=2, default=str)
  50.    
  51.     print(f"已导出 {len(sales_data)} 条销售记录到 sales_data_raw.json")
  52.    
  53.     # 转换为DataFrame进行分析
  54.     df = pd.DataFrame(sales_data)
  55.    
  56.     # 1. 按日期分析销售趋势
  57.     df['order_date'] = pd.to_datetime(df['order_date'])
  58.     df.set_index('order_date', inplace=True)
  59.    
  60.     # 按天汇总销售数据
  61.     daily_sales = df['total'].resample('D').sum()
  62.    
  63.     # 导出每日销售数据
  64.     daily_sales.to_json('daily_sales.json', date_format='iso')
  65.    
  66.     # 2. 按产品分析销售情况
  67.     product_sales = {}
  68.     for _, order in df.iterrows():
  69.         for item in order['items']:
  70.             product_id = item['product_id']
  71.             if product_id not in product_sales:
  72.                 product_sales[product_id] = {
  73.                     'quantity': 0,
  74.                     'revenue': 0
  75.                 }
  76.             product_sales[product_id]['quantity'] += item['quantity']
  77.             product_sales[product_id]['revenue'] += item['price'] * item['quantity']
  78.    
  79.     # 转换为DataFrame并排序
  80.     product_df = pd.DataFrame.from_dict(product_sales, orient='index')
  81.     product_df = product_df.sort_values('revenue', ascending=False)
  82.    
  83.     # 导出产品销售数据
  84.     product_df.to_json('product_sales.json', orient='index')
  85.    
  86.     # 3. 按支付方式分析
  87.     payment_stats = df.groupby('payment_method')['total'].agg(['count', 'sum', 'mean'])
  88.    
  89.     # 导出支付方式统计数据
  90.     payment_stats.to_json('payment_stats.json', orient='index')
  91.    
  92.     # 4. 按星期几分析销售模式
  93.     weekday_stats = df.groupby('day_of_week')['total'].agg(['count', 'sum', 'mean'])
  94.    
  95.     # 导出星期几统计数据
  96.     weekday_stats.to_json('weekday_stats.json', orient='index')
  97.    
  98.     # 生成可视化图表
  99.     plt.figure(figsize=(15, 10))
  100.    
  101.     # 每日销售趋势图
  102.     plt.subplot(2, 2, 1)
  103.     daily_sales.plot()
  104.     plt.title('每日销售趋势')
  105.     plt.xlabel('日期')
  106.     plt.ylabel('销售额')
  107.     plt.grid(True)
  108.    
  109.     # 产品销售Top 10
  110.     plt.subplot(2, 2, 2)
  111.     top_products = product_df.head(10)
  112.     top_products['revenue'].plot(kind='bar')
  113.     plt.title('产品销售额Top 10')
  114.     plt.xlabel('产品ID')
  115.     plt.ylabel('销售额')
  116.     plt.xticks(rotation=45)
  117.    
  118.     # 支付方式分布
  119.     plt.subplot(2, 2, 3)
  120.     payment_stats['count'].plot(kind='pie', autopct='%1.1f%%')
  121.     plt.title('支付方式分布')
  122.     plt.ylabel('')
  123.    
  124.     # 星期几销售模式
  125.     plt.subplot(2, 2, 4)
  126.     weekday_names = ['周日', '周一', '周二', '周三', '周四', '周五', '周六']
  127.     weekday_stats.index = [weekday_names[i-1] for i in weekday_stats.index]
  128.     weekday_stats['sum'].plot(kind='bar')
  129.     plt.title('按星期几的销售额')
  130.     plt.xlabel('星期')
  131.     plt.ylabel('销售额')
  132.    
  133.     plt.tight_layout()
  134.     plt.savefig('sales_analysis.png')
  135.     plt.close()
  136.    
  137.     # 生成分析报告
  138.     report = f"""
  139. # 销售数据分析报告
  140. ## 分析期间
  141. {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}
  142. ## 总体销售情况
  143. - 总订单数: {len(df)}
  144. - 总销售额: {df['total'].sum():.2f}
  145. - 平均订单金额: {df['total'].mean():.2f}
  146. ## 销售趋势
  147. - 最高单日销售额: {daily_sales.max():.2f} ({daily_sales.idxmax().strftime('%Y-%m-%d')})
  148. - 最低单日销售额: {daily_sales.min():.2f} ({daily_sales.idxmin().strftime('%Y-%m-%d')})
  149. - 平均每日销售额: {daily_sales.mean():.2f}
  150. ## 热销产品
  151. - 销售额最高的产品: {product_df.index[0]} (销售额: {product_df.iloc[0]['revenue']:.2f})
  152. - 销量最高的产品: {product_df.sort_values('quantity', ascending=False).index[0]} (销量: {product_df.sort_values('quantity', ascending=False).iloc[0]['quantity']})
  153. ## 支付方式分析
  154. """
  155.    
  156.     for payment_method, stats in payment_stats.iterrows():
  157.         report += f"- {payment_method}: {stats['count']} 笔订单, 总额 {stats['sum']:.2f}, 平均 {stats['mean']:.2f}\n"
  158.    
  159.     report += "\n## 星期几销售模式\n"
  160.    
  161.     for day, stats in weekday_stats.iterrows():
  162.         report += f"- {day}: {stats['count']} 笔订单, 总额 {stats['sum']:.2f}, 平均 {stats['mean']:.2f}\n"
  163.    
  164.     # 保存报告
  165.     with open('sales_analysis_report.md', 'w', encoding='utf-8') as f:
  166.         f.write(report)
  167.    
  168.     print("销售数据分析完成,已生成以下文件:")
  169.     print("- sales_data_raw.json: 原始销售数据")
  170.     print("- daily_sales.json: 每日销售数据")
  171.     print("- product_sales.json: 产品销售数据")
  172.     print("- payment_stats.json: 支付方式统计数据")
  173.     print("- weekday_stats.json: 星期几销售数据")
  174.     print("- sales_analysis.png: 销售分析图表")
  175.     print("- sales_analysis_report.md: 销售分析报告")
  176.    
  177.     client.close()
  178. if __name__ == "__main__":
  179.     sales_data_analysis()
复制代码
  1. const { MongoClient } = require('mongodb');
  2. const fs = require('fs');
  3. async function userBehaviorAnalysis() {
  4.   const uri = "mongodb://localhost:27017";
  5.   const client = new MongoClient(uri);
  6.   
  7.   try {
  8.     await client.connect();
  9.     const database = client.db("analytics_db");
  10.    
  11.     // 1. 用户活动分析
  12.     console.log("分析用户活动...");
  13.    
  14.     const userActivityPipeline = [
  15.       {
  16.         $match: {
  17.           timestamp: {
  18.             $gte: new Date(new Date() - 30 * 24 * 60 * 60 * 1000) // 最近30天
  19.           }
  20.         }
  21.       },
  22.       {
  23.         $group: {
  24.           _id: "$userId",
  25.           sessionCount: { $sum: 1 },
  26.           lastActivity: { $max: "$timestamp" },
  27.           activities: { $push: "$action" }
  28.         }
  29.       },
  30.       {
  31.         $project: {
  32.           userId: "$_id",
  33.           sessionCount: 1,
  34.           lastActivity: 1,
  35.           uniqueActivities: { $size: { $setUnion: "$activities" } },
  36.           mostCommonActivity: {
  37.             $arrayElemAt: [
  38.               {
  39.                 $slice: [
  40.                   {
  41.                     $sortArray: {
  42.                       input: {
  43.                         $map: {
  44.                           input: { $setUnion: "$activities" },
  45.                           as: "activity",
  46.                           in: {
  47.                             activity: "$$activity",
  48.                             count: {
  49.                               $size: {
  50.                                 $filter: {
  51.                                   input: "$activities",
  52.                                   cond: { $eq: ["$$this", "$$activity"] }
  53.                                 }
  54.                               }
  55.                             }
  56.                           }
  57.                         }
  58.                       },
  59.                       sortBy: { count: -1 }
  60.                     }
  61.                   },
  62.                   1
  63.                 ]
  64.               },
  65.               0
  66.             ]
  67.           }
  68.         }
  69.       },
  70.       {
  71.         $sort: { sessionCount: -1 }
  72.       }
  73.     ];
  74.    
  75.     const userActivity = await database.collection("user_sessions").aggregate(userActivityPipeline).toArray();
  76.    
  77.     // 导出用户活动数据
  78.     fs.writeFileSync('user_activity_analysis.json', JSON.stringify(userActivity, null, 2));
  79.     console.log(`已导出 ${userActivity.length} 个用户的活动分析数据`);
  80.    
  81.     // 2. 用户路径分析
  82.     console.log("分析用户路径...");
  83.    
  84.     const userPathPipeline = [
  85.       {
  86.         $match: {
  87.           timestamp: {
  88.             $gte: new Date(new Date() - 30 * 24 * 60 * 60 * 1000) // 最近30天
  89.           }
  90.         }
  91.       },
  92.       {
  93.         $sort: { userId: 1, timestamp: 1 }
  94.       },
  95.       {
  96.         $group: {
  97.           _id: "$userId",
  98.           paths: {
  99.             $push: {
  100.               page: "$page",
  101.               timestamp: "$timestamp"
  102.             }
  103.           }
  104.         }
  105.       },
  106.       {
  107.         $project: {
  108.           userId: "$_id",
  109.           pathSequence: {
  110.             $map: {
  111.               input: "$paths",
  112.               as: "path",
  113.               in: "$$path.page"
  114.             }
  115.           },
  116.           sessionCount: { $size: "$paths" },
  117.           timeSpent: {
  118.             $reduce: {
  119.               input: { $slice: ["$paths", 1, { $size: "$paths" }] },
  120.               initialValue: 0,
  121.               in: {
  122.                 $add: [
  123.                   "$$value",
  124.                   {
  125.                     $subtract: [
  126.                       "$$this.timestamp",
  127.                       {
  128.                         $arrayElemAt: [
  129.                           "$paths",
  130.                           { $subtract: [{ $indexOfArray: ["$paths", "$$this"] }, 1] }
  131.                         ]
  132.                       }.timestamp
  133.                     ]
  134.                   }
  135.                 ]
  136.               }
  137.             }
  138.           }
  139.         }
  140.       },
  141.       {
  142.         $addFields: {
  143.           avgTimePerPage: {
  144.             $cond: {
  145.               if: { $gt: [{ $size: "$pathSequence" }, 1] },
  146.               then: { $divide: ["$timeSpent", { $subtract: [{ $size: "$pathSequence" }, 1] } },
  147.               else: 0
  148.             }
  149.           }
  150.         }
  151.       },
  152.       {
  153.         $sort: { sessionCount: -1 }
  154.       }
  155.     ];
  156.    
  157.     const userPaths = await database.collection("page_views").aggregate(userPathPipeline).toArray();
  158.    
  159.     // 导出用户路径数据
  160.     fs.writeFileSync('user_path_analysis.json', JSON.stringify(userPaths, null, 2));
  161.     console.log(`已导出 ${userPaths.length} 个用户的路径分析数据`);
  162.    
  163.     // 3. 转化漏斗分析
  164.     console.log("分析转化漏斗...");
  165.    
  166.     const conversionFunnelPipeline = [
  167.       {
  168.         $match: {
  169.           timestamp: {
  170.             $gte: new Date(new Date() - 30 * 24 * 60 * 60 * 1000) // 最近30天
  171.           }
  172.         }
  173.       },
  174.       {
  175.         $group: {
  176.           _id: "$userId",
  177.           events: { $push: "$event" },
  178.           firstEvent: { $first: "$event" },
  179.           lastEvent: { $last: "$event" },
  180.           uniqueEvents: { $addToSet: "$event" }
  181.         }
  182.       },
  183.       {
  184.         $project: {
  185.           userId: "$_id",
  186.           events: 1,
  187.           firstEvent: 1,
  188.           lastEvent: 1,
  189.           visitedHomepage: { $in: ["homepage", "$uniqueEvents"] },
  190.           viewedProduct: { $in: ["view_product", "$uniqueEvents"] },
  191.           addedToCart: { $in: ["add_to_cart", "$uniqueEvents"] },
  192.           initiatedCheckout: { $in: ["initiate_checkout", "$uniqueEvents"] },
  193.           completedPurchase: { $in: ["purchase", "$uniqueEvents"] }
  194.         }
  195.       },
  196.       {
  197.         $group: {
  198.           _id: null,
  199.           totalUsers: { $sum: 1 },
  200.           homepageVisits: { $sum: { $cond: ["$visitedHomepage", 1, 0] } },
  201.           productViews: { $sum: { $cond: ["$viewedProduct", 1, 0] } },
  202.           cartAdditions: { $sum: { $cond: ["$addedToCart", 1, 0] } },
  203.           checkouts: { $sum: { $cond: ["$initiatedCheckout", 1, 0] } },
  204.           purchases: { $sum: { $cond: ["$completedPurchase", 1, 0] } }
  205.         }
  206.       },
  207.       {
  208.         $project: {
  209.           _id: 0,
  210.           totalUsers: 1,
  211.           homepageVisits: 1,
  212.           productViews: 1,
  213.           cartAdditions: 1,
  214.           checkouts: 1,
  215.           purchases: 1,
  216.           homepageConversionRate: { $divide: ["$homepageVisits", "$totalUsers"] },
  217.           productViewRate: { $divide: ["$productViews", "$homepageVisits"] },
  218.           cartAdditionRate: { $divide: ["$cartAdditions", "$productViews"] },
  219.           checkoutRate: { $divide: ["$checkouts", "$cartAdditions"] },
  220.           purchaseRate: { $divide: ["$purchases", "$checkouts"] },
  221.           overallConversionRate: { $divide: ["$purchases", "$totalUsers"] }
  222.         }
  223.       }
  224.     ];
  225.    
  226.     const conversionFunnel = await database.collection("user_events").aggregate(conversionFunnelPipeline).toArray();
  227.    
  228.     // 导出转化漏斗数据
  229.     fs.writeFileSync('conversion_funnel_analysis.json', JSON.stringify(conversionFunnel, null, 2));
  230.     console.log("已导出转化漏斗分析数据");
  231.    
  232.     // 4. 用户留存分析
  233.     console.log("分析用户留存...");
  234.    
  235.     const userRetentionPipeline = [
  236.       {
  237.         $match: {
  238.           event: "login",
  239.           timestamp: {
  240.             $gte: new Date(new Date() - 90 * 24 * 60 * 60 * 1000) // 最近90天
  241.           }
  242.         }
  243.       },
  244.       {
  245.         $group: {
  246.           _id: "$userId",
  247.           firstLogin: { $min: "$timestamp" },
  248.           lastLogin: { $max: "$timestamp" },
  249.           loginCount: { $sum: 1 },
  250.           loginDays: {
  251.             $addToSet: {
  252.               $dateToString: {
  253.                 format: "%Y-%m-%d",
  254.                 date: "$timestamp"
  255.               }
  256.             }
  257.           }
  258.         }
  259.       },
  260.       {
  261.         $project: {
  262.           userId: "$_id",
  263.           firstLogin: 1,
  264.           lastLogin: 1,
  265.           loginCount: 1,
  266.           activeDays: { $size: "$loginDays" },
  267.           cohort: {
  268.             $dateToString: {
  269.               format: "%Y-%m",
  270.               date: "$firstLogin"
  271.             }
  272.           }
  273.         }
  274.       },
  275.       {
  276.         $group: {
  277.           _id: "$cohort",
  278.           users: { $push: "$$ROOT" },
  279.           userCount: { $sum: 1 }
  280.         }
  281.       },
  282.       {
  283.         $project: {
  284.           cohort: "$_id",
  285.           userCount: 1,
  286.           users: 1,
  287.           day1Retention: {
  288.             $divide: [
  289.               {
  290.                 $size: {
  291.                   $filter: {
  292.                     input: "$users",
  293.                     cond: {
  294.                       $gte: [
  295.                         {
  296.                           $dateDiff: {
  297.                             startDate: "$$this.firstLogin",
  298.                             endDate: "$$this.lastLogin",
  299.                             unit: "day"
  300.                           }
  301.                         },
  302.                         1
  303.                       ]
  304.                     }
  305.                   }
  306.                 }
  307.               },
  308.               "$userCount"
  309.             ]
  310.           },
  311.           day7Retention: {
  312.             $divide: [
  313.               {
  314.                 $size: {
  315.                   $filter: {
  316.                     input: "$users",
  317.                     cond: {
  318.                       $gte: [
  319.                         {
  320.                           $dateDiff: {
  321.                             startDate: "$$this.firstLogin",
  322.                             endDate: "$$this.lastLogin",
  323.                             unit: "day"
  324.                           }
  325.                         },
  326.                         7
  327.                       ]
  328.                     }
  329.                   }
  330.                 }
  331.               },
  332.               "$userCount"
  333.             ]
  334.           },
  335.           day30Retention: {
  336.             $divide: [
  337.               {
  338.                 $size: {
  339.                   $filter: {
  340.                     input: "$users",
  341.                     cond: {
  342.                       $gte: [
  343.                         {
  344.                           $dateDiff: {
  345.                             startDate: "$$this.firstLogin",
  346.                             endDate: "$$this.lastLogin",
  347.                             unit: "day"
  348.                           }
  349.                         },
  350.                         30
  351.                       ]
  352.                     }
  353.                   }
  354.                 }
  355.               },
  356.               "$userCount"
  357.             ]
  358.           }
  359.         }
  360.       },
  361.       {
  362.         $sort: { cohort: 1 }
  363.       }
  364.     ];
  365.    
  366.     const userRetention = await database.collection("user_events").aggregate(userRetentionPipeline).toArray();
  367.    
  368.     // 导出用户留存数据
  369.     fs.writeFileSync('user_retention_analysis.json', JSON.stringify(userRetention, null, 2));
  370.     console.log(`已导出 ${userRetention.length} 个用户群组的留存分析数据`);
  371.    
  372.     // 生成分析报告
  373.     const report = `# 用户行为分析报告
  374. ## 分析期间
  375. 最近30天(${new Date(new Date() - 30 * 24 * 60 * 60 * 1000).toISOString().split('T')[0]} 至 ${new Date().toISOString().split('T')[0]})
  376. ## 用户活动分析
  377. - 分析用户数: ${userActivity.length}
  378. - 平均每个用户会话数: ${(userActivity.reduce((sum, user) => sum + user.sessionCount, 0) / userActivity.length).toFixed(2)}
  379. - 最活跃用户: ${userActivity[0].userId} (${userActivity[0].sessionCount} 次会话)
  380. ## 用户路径分析
  381. - 分析用户数: ${userPaths.length}
  382. - 平均每个用户页面浏览数: ${(userPaths.reduce((sum, user) => sum + user.sessionCount, 0) / userPaths.length).toFixed(2)}
  383. - 平均每个用户停留时间: ${(userPaths.reduce((sum, user) => sum + user.timeSpent, 0) / userPaths.length / 1000 / 60).toFixed(2)} 分钟
  384. ## 转化漏斗分析
  385. `;
  386.    
  387.     if (conversionFunnel.length > 0) {
  388.       const funnel = conversionFunnel[0];
  389.       report += `
  390. - 总用户数: ${funnel.totalUsers}
  391. - 首页访问数: ${funnel.homepageVisits} (${(funnel.homepageConversionRate * 100).toFixed(2)}%)
  392. - 产品浏览数: ${funnel.productViews} (${(funnel.productViewRate * 100).toFixed(2)}%)
  393. - 加入购物车数: ${funnel.cartAdditions} (${(funnel.cartAdditionRate * 100).toFixed(2)}%)
  394. - 结账开始数: ${funnel.checkouts} (${(funnel.checkoutRate * 100).toFixed(2)}%)
  395. - 完成购买数: ${funnel.purchases} (${(funnel.purchaseRate * 100).toFixed(2)}%)
  396. - 整体转化率: ${(funnel.overallConversionRate * 100).toFixed(2)}%
  397. `;
  398.     }
  399.    
  400.     report += "\n## 用户留存分析\n";
  401.    
  402.     if (userRetention.length > 0) {
  403.       report += "| 群组 | 用户数 | 次日留存率 | 7日留存率 | 30日留存率 |\n";
  404.       report += "|------|--------|------------|-----------|------------|\n";
  405.       
  406.       userRetention.forEach(cohort => {
  407.         report += `| ${cohort.cohort} | ${cohort.userCount} | ${(cohort.day1Retention * 100).toFixed(2)}% | ${(cohort.day7Retention * 100).toFixed(2)}% | ${(cohort.day30Retention * 100).toFixed(2)}% |\n`;
  408.       });
  409.     }
  410.    
  411.     // 保存报告
  412.     fs.writeFileSync('user_behavior_analysis_report.md', report);
  413.    
  414.     console.log("用户行为分析完成,已生成以下文件:");
  415.     console.log("- user_activity_analysis.json: 用户活动分析数据");
  416.     console.log("- user_path_analysis.json: 用户路径分析数据");
  417.     console.log("- conversion_funnel_analysis.json: 转化漏斗分析数据");
  418.     console.log("- user_retention_analysis.json: 用户留存分析数据");
  419.     console.log("- user_behavior_analysis_report.md: 用户行为分析报告");
  420.    
  421.   } finally {
  422.     await client.close();
  423.   }
  424. }
  425. userBehaviorAnalysis().catch(console.error);
复制代码

7. 总结

本文详细介绍了从MongoDB数据库导出JSON数据的各种方法、工具和最佳实践。我们讨论了:

1. 多种导出方法:使用mongoexport命令行工具使用MongoDB Compass图形界面使用编程语言驱动程序(Node.js、Python、Java)使用聚合管道进行数据转换
2. 使用mongoexport命令行工具
3. 使用MongoDB Compass图形界面
4. 使用编程语言驱动程序(Node.js、Python、Java)
5. 使用聚合管道进行数据转换
6. 最佳实践:性能优化技术,包括批量处理和索引优化数据安全措施,包括敏感数据处理和访问控制大数据集处理策略,包括分片集合导出和增量导出
7. 性能优化技术,包括批量处理和索引优化
8. 数据安全措施,包括敏感数据处理和访问控制
9. 大数据集处理策略,包括分片集合导出和增量导出
10. 常见问题和解决方案:处理特殊数据类型(ObjectId、Date等)处理大数据量和内存限制处理连接和认证问题
11. 处理特殊数据类型(ObjectId、Date等)
12. 处理大数据量和内存限制
13. 处理连接和认证问题
14. 实际应用场景:数据备份与恢复数据迁移(MongoDB到其他数据库,其他数据库到MongoDB)数据分析和报告生成
15. 数据备份与恢复
16. 数据迁移(MongoDB到其他数据库,其他数据库到MongoDB)
17. 数据分析和报告生成

多种导出方法:

• 使用mongoexport命令行工具
• 使用MongoDB Compass图形界面
• 使用编程语言驱动程序(Node.js、Python、Java)
• 使用聚合管道进行数据转换

最佳实践:

• 性能优化技术,包括批量处理和索引优化
• 数据安全措施,包括敏感数据处理和访问控制
• 大数据集处理策略,包括分片集合导出和增量导出

常见问题和解决方案:

• 处理特殊数据类型(ObjectId、Date等)
• 处理大数据量和内存限制
• 处理连接和认证问题

实际应用场景:

• 数据备份与恢复
• 数据迁移(MongoDB到其他数据库,其他数据库到MongoDB)
• 数据分析和报告生成

通过本文提供的详细指南和代码示例,开发者可以根据自己的需求选择最适合的方法来导出MongoDB数据为JSON格式,并确保导出过程高效、安全且可靠。

无论您是需要进行数据备份、系统迁移、数据分析还是其他任何需要导出MongoDB数据的场景,本文提供的知识都将帮助您轻松实现数据导出与交换。
「七転び八起き(ななころびやおき)」
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则