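1. Introduction: How MongoDB Relates to JSON
MongoDB is a popular NoSQL database built on a document storage model, and JSON (JavaScript Object Notation) is a lightweight data-interchange format. The two are closely related: MongoDB stores documents as BSON (Binary JSON), a binary encoding of JSON-like documents that adds types such as ObjectId and Date. This makes JSON a natural format for exporting MongoDB data.
This article covers how to export JSON data from a MongoDB database: the available methods and tools, best practices, and practical scenarios for data export and exchange.
2. Why Export JSON Data from MongoDB
In practice, exporting MongoDB data as JSON serves several purposes:
• Backup and migration: JSON exports are easy to archive or move to another system
• Data analysis: exported JSON can be processed by a wide range of analysis tools
• System integration: different systems can exchange data in JSON format
• Development and testing: exports can seed test data or mimic a production environment
• Reporting: exported data can feed report generation
3. Methods for Exporting JSON from MongoDB
3.1 Using the mongoexport Command-Line Tool
mongoexport is a command-line tool provided by MongoDB for exporting the data in a collection as JSON or CSV.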
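Basic syntax:

mongoexport --host=localhost --port=27017 --db=mydb --collection=mycollection --out=data.json

Common options:
• --host: MongoDB server address
• --port: MongoDB server port
• --db: name of the database to export from
• --collection: name of the collection to export
• --out: output file path (if omitted, mongoexport writes to standard output)
• --query: filter for the documents to export, written as a JSON document
• --fields: comma-separated list of fields to export
• --type: output file type (json or csv; json is the default)
• --jsonArray: write the output as a single JSON array instead of one JSON document per line
• --pretty: pretty-print the JSON output so it is easier to read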
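The options above can also be assembled programmatically. The following is a minimal Python sketch, assuming mongoexport is installed and on the PATH; the helper name build_mongoexport_args and its parameters are illustrative and not part of any MongoDB API. It only builds the argument list and prints it.

```python
import json
import shlex


def build_mongoexport_args(db, collection, out, query=None, fields=None,
                           json_array=False, pretty=False,
                           host="localhost", port=27017):
    """Build an argument list for mongoexport (hypothetical helper)."""
    args = [
        "mongoexport",
        f"--host={host}",
        f"--port={port}",
        f"--db={db}",
        f"--collection={collection}",
        f"--out={out}",
    ]
    if query is not None:
        # --query expects a JSON document, so serialize the dict
        args.append("--query=" + json.dumps(query))
    if fields:
        args.append("--fields=" + ",".join(fields))
    if json_array:
        args.append("--jsonArray")
    if pretty:
        args.append("--pretty")
    return args


args = build_mongoexport_args(
    "sales", "orders", "completed.json",
    query={"status": "completed"},
    fields=["orderId", "total"],
    json_array=True,
)
# Show a shell-quoted version of the command for inspection
print(shlex.join(args))
```

To run the export, the list could be passed to subprocess.run(args, check=True). Passing a list rather than a single shell string avoids quoting problems with the JSON query.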
Examples:
1. Export only the documents that match a query

mongoexport --db=sales --collection=orders --query='{"status": "completed", "date": {"$gte": {"$date": "2023-01-01T00:00:00Z"}}}' --out=completed_orders_2023.json

2. Export specific fields with formatted output

mongoexport --db=users --collection=profiles --fields=name,email,age --jsonArray --pretty --out=user_profiles.json

3. Export from a MongoDB instance that requires authentication

mongoexport --host=cluster.example.com --port=27017 --db=app --collection=logs --username=admin --password=secret --authenticationDatabase=admin --out=logs.json

If --password is omitted while --username is given, mongoexport prompts for the password, which keeps it out of the shell history.
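Downstream code has to know which of the two output layouts it is reading. The sketch below is one possible reader, written under the assumption that the file was produced either with --jsonArray (one array) or with the default layout (one document per line); it does not handle --pretty output without --jsonArray, where a document spans several lines. The function name load_export is illustrative.

```python
import json


def load_export(text):
    """Parse mongoexport-style output: a JSON array or one document per line."""
    stripped = text.lstrip()
    if stripped.startswith("["):
        # Written with --jsonArray: the whole file is a single JSON array
        return json.loads(stripped)
    # Default output: one JSON document per line (blank lines are skipped)
    return [json.loads(line) for line in text.splitlines() if line.strip()]


line_delimited = '{"name": "Ann", "age": 30}\n{"name": "Bob", "age": 25}\n'
as_array = '[{"name": "Ann", "age": 30}, {"name": "Bob", "age": 25}]'

docs_a = load_export(line_delimited)
docs_b = load_export(as_array)
print(docs_a == docs_b)
```

Both inputs decode to the same list of documents, so the rest of a processing script does not need to care which flag was used.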
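3.2 Using the MongoDB Compass GUI
MongoDB Compass is MongoDB's official graphical tool and offers a visual way to export data.
Steps:
1. Connect to the MongoDB server
2. Select the database and collection to export
3. Click the "Export" button on the collection tab
4. In the dialog, choose "JSON" as the export format
5. Select the fields to export (optional)
6. Set a query filter (optional)
7. Click "Export" and choose where to save the file
Export options:
• Export the entire collection: exports every document in the collection
• Export query results: run a query first, then export its results
• Field selection: export only the selected fields
• Output format: JSON or CSV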
3.3 Using a Programming-Language Driver
A MongoDB driver for your programming language gives you finer control over the export process.
Node.js example:

const { MongoClient } = require('mongodb');
const fs = require('fs');
const path = require('path');

async function exportToJson() {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);

  try {
    await client.connect();
    const database = client.db("mydb");
    const collection = database.collection("mycollection");

    // Query filter
    const query = { status: "active" };

    // Projection (fields to include)
    const projection = { _id: 0, name: 1, email: 1 };

    // Run the query
    const cursor = collection.find(query).project(projection);

    // Load every matching document into an array (suitable for small result sets)
    const results = await cursor.toArray();

    // Serialize to a formatted JSON string
    const json = JSON.stringify(results, null, 2);

    // Write the file
    fs.writeFileSync(path.join(__dirname, 'output.json'), json);

    console.log(`Exported ${results.length} documents to output.json`);
  } finally {
    await client.close();
  }
}

exportToJson().catch(console.error);

Python example:

from pymongo import MongoClient
import json
from datetime import datetime

def export_to_json():
    # Connect to MongoDB
    client = MongoClient('mongodb://localhost:27017/')
    db = client['mydb']
    collection = db['mycollection']

    # Query filter: documents created during 2023.
    # The upper bound is exclusive, so January 1, 2024 is used to keep all of December 31.
    query = {
        "created_at": {
            "$gte": datetime(2023, 1, 1),
            "$lt": datetime(2024, 1, 1)
        }
    }

    # Projection (fields to include)
    projection = {
        "_id": 0,
        "name": 1,
        "email": 1,
        "created_at": 1
    }

    # Run the query
    cursor = collection.find(query, projection)

    # Load the results into a list
    results = list(cursor)

    # Convert datetime objects so they can be serialized as JSON
    for doc in results:
        if 'created_at' in doc and isinstance(doc['created_at'], datetime):
            doc['created_at'] = doc['created_at'].isoformat()

    # Write the JSON file
    with open('output.json', 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"Exported {len(results)} documents to output.json")

    # Close the connection
    client.close()

if __name__ == "__main__":
    export_to_json()

Java example:

import com.mongodb.client.*;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class MongoToJsonExporter {
    public static void main(String[] args) {
        // Connection string
        String connectionString = "mongodb://localhost:27017";

        try (MongoClient mongoClient = MongoClients.create(connectionString)) {
            // Get the database and collection
            MongoDatabase database = mongoClient.getDatabase("mydb");
            MongoCollection<Document> collection = database.getCollection("mycollection");

            // Query filter
            Bson query = Filters.eq("status", "active");

            // Projection (fields to include)
            Bson projection = Projections.fields(
                Projections.excludeId(),
                Projections.include("name", "email", "age")
            );

            // Run the query
            FindIterable<Document> documents = collection.find(query)
                .projection(projection);

            // Collect the results into a List
            List<Document> results = new ArrayList<>();
            for (Document doc : documents) {
                results.add(doc);
            }

            // Serialize each document and write a JSON array to the file
            try (FileWriter writer = new FileWriter("output.json")) {
                writer.write("[\n");
                for (int i = 0; i < results.size(); i++) {
                    writer.write("  " + results.get(i).toJson());
                    if (i < results.size() - 1) {
                        writer.write(",");
                    }
                    writer.write("\n");
                }
                writer.write("]");
            }

            System.out.println("Exported " + results.size() + " documents to output.json");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3.4 Exporting JSON with an Aggregation Pipeline
MongoDB's aggregation pipeline offers powerful data processing and can be used to filter, reshape, and sort data before it is exported.
Mongo shell example:

db.orders.aggregate([
  // Stage 1: filter
  { $match: { status: "completed", date: { $gte: new Date("2023-01-01") } } },

  // Stage 2: select and rename fields
  { $project: {
    _id: 0,
    orderId: "$_id",
    customer: "$customer.name",
    amount: "$total",
    items: "$products"
  }},

  // Stage 3: sort
  { $sort: { amount: -1 } }
])

Node.js example:

const { MongoClient } = require('mongodb');
const fs = require('fs');

async function exportWithAggregation() {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);

  try {
    await client.connect();
    const database = client.db("sales");
    const collection = database.collection("orders");

    // Define the aggregation pipeline
    const pipeline = [
      // Filter
      { $match: {
        status: "completed",
        date: { $gte: new Date("2023-01-01") }
      }},

      // Count the items in each order.
      // $ifNull guards against documents without an items array, where $size would fail.
      { $addFields: {
        itemCount: { $size: { $ifNull: ["$items", []] } }
      }},

      // Select and rename fields
      { $project: {
        _id: 0,
        orderId: "$_id",
        customer: 1,
        date: 1,
        total: 1,
        itemCount: 1
      }},

      // Sort
      { $sort: { total: -1 } }
    ];

    // Run the aggregation
    const cursor = collection.aggregate(pipeline);

    // Load the results into an array
    const results = await cursor.toArray();

    // Serialize to a formatted JSON string
    const json = JSON.stringify(results, null, 2);

    // Write the file
    fs.writeFileSync('orders_aggregated.json', json);

    console.log(`Exported ${results.length} documents to orders_aggregated.json`);
  } finally {
    await client.close();
  }
}

exportWithAggregation().catch(console.error);
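
4. Best Practices for Exporting JSON
4.1 Performance Optimization
When exporting large amounts of data, loading everything into memory at once can exhaust it. The following approaches handle large data sets: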
Node.js example: streaming

const { MongoClient } = require('mongodb');
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

async function exportLargeCollection() {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);

  try {
    await client.connect();
    const database = client.db("large_db");
    const collection = database.collection("large_collection");

    let count = 0;

    // Create the cursor and set the batch size
    const cursor = collection.find().batchSize(1000);

    // Turn each document into one element of a JSON array
    const toJsonArray = new Transform({
      writableObjectMode: true,
      transform(doc, encoding, callback) {
        // Open the array before the first document; separate the rest with commas
        const prefix = count === 0 ? '[\n' : ',\n';

        count++;
        if (count % 1000 === 0) {
          console.log(`Processed ${count} documents`);
        }

        callback(null, prefix + JSON.stringify(doc, null, 2));
      },
      flush(callback) {
        // Close the array; an empty collection still yields valid JSON
        callback(null, count === 0 ? '[\n]' : '\n]');
      }
    });

    // A driver cursor has no pipe() method; cursor.stream() returns a Readable.
    // pipeline() resolves only after the file has been written completely.
    await pipeline(
      cursor.stream(),
      toJsonArray,
      fs.createWriteStream('large_data.json')
    );

    console.log(`Exported ${count} documents to large_data.json`);
  } finally {
    await client.close();
  }
}

exportLargeCollection().catch(console.error);

Python example: iterating a cursor in batches

from pymongo import MongoClient
import json
import time

def export_large_collection():
    client = MongoClient('mongodb://localhost:27017/')
    db = client['large_db']
    collection = db['large_collection']

    # Query filter (empty means all documents)
    query = {}

    # Projection (fields to include)
    projection = {"_id": 0}

    # Batch size
    batch_size = 1000

    # Open the output file
    with open('large_data.json', 'w', encoding='utf-8') as f:
        # Write the opening bracket of the JSON array
        f.write('[\n')

        first = True
        count = 0

        # The cursor fetches documents from the server batch_size at a time
        cursor = collection.find(query, projection).batch_size(batch_size)

        for doc in cursor:
            if not first:
                f.write(',\n')
            else:
                first = False

            # Write the document; default=str converts values such as datetime
            # that the json module cannot encode on its own
            json.dump(doc, f, ensure_ascii=False, default=str)

            count += 1
            if count % batch_size == 0:
                print(f"Processed {count} documents")

        # Write the closing bracket of the JSON array
        f.write('\n]')

    print(f"Exported {count} documents to large_data.json")
    client.close()

if __name__ == "__main__":
    start_time = time.time()
    export_large_collection()
    end_time = time.time()
    print(f"Export finished in {end_time - start_time:.2f} seconds")
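An alternative to building a JSON array by hand is to write one document per line (the JSON Lines layout that mongoexport uses by default), which needs no bracket or comma bookkeeping. The sketch below assumes nothing about MongoDB itself: fake_cursor is a stand-in generator for a real driver cursor, and write_json_lines is an illustrative name.

```python
import io
import json


def write_json_lines(docs, stream):
    """Write each document from an iterable as one JSON line; return the count."""
    count = 0
    for doc in docs:
        # default=str is a catch-all for values json cannot encode (dates, ids)
        stream.write(json.dumps(doc, ensure_ascii=False, default=str))
        stream.write("\n")
        count += 1
    return count


def fake_cursor(n):
    # Stand-in for a MongoDB cursor: yields documents lazily
    for i in range(n):
        yield {"seq": i, "label": f"item-{i}"}


buffer = io.StringIO()
written = write_json_lines(fake_cursor(3), buffer)
print(written)
print(buffer.getvalue(), end="")
```

Because only one document is held in memory at a time, memory use stays flat regardless of collection size. With a real export, the stream would be a file opened for writing and docs would be the driver cursor.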
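
Index optimization: before exporting, make sure the fields used in the query filter are covered by a suitable index. This can speed up the export significantly.

// Create a compound index
db.orders.createIndex({ status: 1, date: -1 })
// Use a query that the index can serve
db.orders.find({
  status: "completed",
  date: { $gte: new Date("2023-01-01") }
}).sort({ date: -1 })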
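
4.2 Data Security
Exports may contain sensitive information such as passwords or personally identifiable information, which should be removed or masked before the data leaves the database.
Masking sensitive fields with an aggregation pipeline:

db.users.aggregate([
  {
    $project: {
      // An inclusion projection drops every field that is not listed,
      // so the password field is excluded simply by leaving it out.
      // Adding "password: 0" here would mix inclusion and exclusion and raise an error.
      name: 1,
      email: 1,
      // Partially mask the phone number (keeps the first 3 and last 4 characters of an 11-digit number)
      phone: {
        $concat: [
          { $substrCP: ["$phone", 0, 3] },
          "****",
          { $substrCP: ["$phone", 7, 4] }
        ]
      }
    }
  }
])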

Node.js example: sanitizing sensitive data before export

const { MongoClient } = require('mongodb');
const fs = require('fs');

async function exportWithSensitiveDataHandling() {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);

  try {
    await client.connect();
    const database = client.db("mydb");
    const collection = database.collection("users");

    const cursor = collection.find({});

    const results = [];
    for await (const doc of cursor) {
      // Build a new object that contains only the fields we want to export
      const sanitizedDoc = {
        id: doc._id,
        name: doc.name,
        email: doc.email,
        // Mask the middle of the phone number
        phone: doc.phone ? doc.phone.replace(/(\d{3})\d{4}(\d{4})/, '$1****$2') : null,
        // Mask the credit card number, keeping the last four digits
        creditCard: doc.creditCard ? '****-****-****-' + doc.creditCard.slice(-4) : null
      };

      results.push(sanitizedDoc);
    }

    // Serialize to a formatted JSON string
    const json = JSON.stringify(results, null, 2);

    // Write the file
    fs.writeFileSync('sanitized_users.json', json);

    console.log(`Exported ${results.length} sanitized documents to sanitized_users.json`);
  } finally {
    await client.close();
  }
}

exportWithSensitiveDataHandling().catch(console.error);
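The same allow-list idea can be sketched in Python. This is an illustration only: the field names (phone, creditCard, password) mirror the examples in this section, the helper names are made up, and the phone mask assumes an 11-digit number.

```python
import re


def mask_phone(phone):
    """Keep the first 3 and last 4 digits of an 11-digit number."""
    if not phone:
        return None
    return re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", phone)


def mask_card(card):
    """Show only the last 4 characters of a card number."""
    if not card:
        return None
    return "****-****-****-" + card[-4:]


def sanitize_user(doc):
    """Build a new dict from an allow-list of fields; everything else is dropped."""
    return {
        "id": str(doc.get("_id")),
        "name": doc.get("name"),
        "email": doc.get("email"),
        "phone": mask_phone(doc.get("phone")),
        "creditCard": mask_card(doc.get("creditCard")),
    }


user = {
    "_id": "u1",
    "name": "Ann",
    "email": "ann@example.com",
    "phone": "13812345678",
    "creditCard": "4111111111111111",
    "password": "secret",
}
clean = sanitize_user(user)
print(clean)
```

Copying fields into a new dictionary, rather than deleting fields from the original, means a newly added sensitive field is not exported by accident.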
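
Access control: make sure that only authorized users can export data, and that they can export only the data they are allowed to access.

// Set up role-based access control in MongoDB.
// Run these commands against the admin database ("use admin"), because the
// user below references the role as { role: "exportOperator", db: "admin" }.
db.createRole({
  role: "exportOperator",
  privileges: [
    { resource: { db: "sales", collection: "orders" }, actions: ["find"] }
  ],
  roles: []
})
// Create a user and assign the role
db.createUser({
  user: "export_user",
  pwd: "secure_password",
  roles: [ { role: "exportOperator", db: "admin" } ]
})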

4.3 Handling Large Data Sets
Parallel export from a sharded collection: for a sharded collection, you can export from each shard in parallel and then merge the results. Reading from a shard directly bypasses mongos and can return orphaned documents, so treat the following as a sketch for controlled environments.

from pymongo import MongoClient
import concurrent.futures
import glob
import json
import os

def shard_uri(host):
    # config.shards stores hosts as "replSetName/host1:port,host2:port"
    if '/' in host:
        rs_name, members = host.split('/', 1)
        return f"mongodb://{members}/?replicaSet={rs_name}"
    return f"mongodb://{host}"

def export_shard(shard):
    # Connect directly to the shard
    shard_client = MongoClient(shard_uri(shard['host']))
    try:
        shard_collection = shard_client['sharded_db']['sharded_collection']

        # Query filter (can be narrowed using the shard key)
        query = {}

        # Run the query and load the results into a list
        results = list(shard_collection.find(query))
    finally:
        shard_client.close()

    # Write the file
    shard_name = shard['_id']
    output_file = f"sharded_exports/{shard_name}.json"

    with open(output_file, 'w', encoding='utf-8') as f:
        # default=str converts ObjectId and datetime values, which json cannot encode
        json.dump(results, f, ensure_ascii=False, indent=2, default=str)

    print(f"Exported {len(results)} documents from shard {shard_name} to {output_file}")
    return len(results)

def export_shard_collection():
    # Connect through mongos and read the list of shards
    client = MongoClient('mongodb://localhost:27017/')
    shards = list(client.config.shards.find())
    client.close()

    # Create the output directory
    os.makedirs("sharded_exports", exist_ok=True)

    # Run one export task per shard in parallel
    total_records = 0
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(export_shard, shard) for shard in shards]

        for future in concurrent.futures.as_completed(futures):
            total_records += future.result()

    # Merge all shard files
    merge_shard_files("sharded_exports", "merged_output.json")

    print(f"All shards exported, {total_records} documents in total")

def merge_shard_files(input_dir, output_file):
    # Collect all shard files in a stable order
    files = sorted(glob.glob(f"{input_dir}/*.json"))

    # Create the merged output file
    with open(output_file, 'w', encoding='utf-8') as outfile:
        outfile.write('[\n')

        # A single flag for the whole output, so an empty shard file
        # cannot produce a stray comma
        first_doc = True
        for filename in files:
            with open(filename, 'r', encoding='utf-8') as infile:
                data = json.load(infile)

            for doc in data:
                if not first_doc:
                    outfile.write(',\n')
                first_doc = False
                json.dump(doc, outfile, ensure_ascii=False, indent=2)

        outfile.write('\n]')

    print(f"Merged all shard files into {output_file}")

if __name__ == "__main__":
    export_shard_collection()
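The merge step is easy to get wrong when one of the input files is empty, because separators written per file can leave a stray comma. The sketch below isolates the merge logic with a single running counter and exercises it on temporary files, including an empty one. It uses only the standard library; merge_json_arrays is an illustrative name.

```python
import json
import os
import tempfile


def merge_json_arrays(paths, output_path):
    """Merge files that each hold a JSON array into one JSON array file."""
    total = 0
    with open(output_path, "w", encoding="utf-8") as out:
        out.write("[\n")
        for path in sorted(paths):
            with open(path, "r", encoding="utf-8") as src:
                docs = json.load(src)
            for doc in docs:
                # One running counter for the whole output, so empty input
                # files never produce a stray comma
                if total > 0:
                    out.write(",\n")
                json.dump(doc, out, ensure_ascii=False)
                total += 1
        out.write("\n]")
    return total


with tempfile.TemporaryDirectory() as tmp:
    inputs = []
    samples = [("a.json", [{"n": 1}]), ("b.json", []), ("c.json", [{"n": 2}, {"n": 3}])]
    for name, docs in samples:
        path = os.path.join(tmp, name)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(docs, f)
        inputs.append(path)

    merged_path = os.path.join(tmp, "merged.json")
    merged_count = merge_json_arrays(inputs, merged_path)
    with open(merged_path, encoding="utf-8") as f:
        merged = json.load(f)

print(merged_count, merged)
```

Each input file is still loaded whole, so this suits per-shard files of moderate size; for very large files a line-delimited format avoids parsing entire arrays.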

Incremental export: for large collections that change frequently, an incremental strategy exports only the documents that were added or modified since the last run.

const { MongoClient } = require('mongodb');
const fs = require('fs');
const path = require('path');

async function incrementalExport() {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);

  try {
    await client.connect();
    const database = client.db("mydb");
    const collection = database.collection("mycollection");

    // Read the timestamp of the previous export
    let lastExportTime;
    try {
      const stats = JSON.parse(fs.readFileSync('export_stats.json', 'utf8'));
      lastExportTime = new Date(stats.lastExportTime);
    } catch (err) {
      // No previous export: fall back to an early date
      lastExportTime = new Date('2000-01-01T00:00:00Z');
    }

    console.log(`Last export time: ${lastExportTime.toISOString()}`);

    // Record the start time BEFORE querying, so documents changed while
    // the export runs are picked up by the next run instead of being missed
    const now = new Date();

    // Find documents created or modified since the last export
    const query = {
      $or: [
        { createdAt: { $gt: lastExportTime } },
        { updatedAt: { $gt: lastExportTime } }
      ]
    };

    // Run the query
    const cursor = collection.find(query);

    // Load the results into an array
    const results = await cursor.toArray();

    // If there is new data, write it to a new timestamped file
    if (results.length > 0) {
      const exportFileName = `incremental_export_${now.toISOString().replace(/:/g, '-')}.json`;
      const exportFilePath = path.join(__dirname, exportFileName);

      // Serialize to a formatted JSON string
      const json = JSON.stringify(results, null, 2);

      // Write the file
      fs.writeFileSync(exportFilePath, json);

      // Update the export statistics
      const exportStats = {
        lastExportTime: now.toISOString(),
        exportedCount: results.length,
        exportFile: exportFileName
      };

      fs.writeFileSync('export_stats.json', JSON.stringify(exportStats, null, 2));

      console.log(`Exported ${results.length} documents to ${exportFileName}`);
    } else {
      console.log("No new data to export");
    }
  } finally {
    await client.close();
  }
}

incrementalExport().catch(console.error);
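The checkpoint handling at the heart of an incremental export can be sketched separately from the database calls. The Python sketch below assumes documents carry createdAt and updatedAt fields, as in the example above; the function names and the closed upper bound on the time window are choices made for this illustration. It builds the filter dictionary but does not contact MongoDB.

```python
import json
import os
import tempfile
from datetime import datetime, timezone

EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def load_checkpoint(path):
    """Return the last export time, or a fixed early date if no checkpoint exists."""
    try:
        with open(path, encoding="utf-8") as f:
            return datetime.fromisoformat(json.load(f)["lastExportTime"])
    except (FileNotFoundError, KeyError, ValueError):
        return EPOCH


def build_incremental_filter(since, until):
    """Match documents created or updated in the window (since, until]."""
    window = {"$gt": since, "$lte": until}
    return {"$or": [{"createdAt": window}, {"updatedAt": window}]}


def save_checkpoint(path, until, exported_count):
    """Persist the upper bound of the window as the next starting point."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"lastExportTime": until.isoformat(),
                   "exportedCount": exported_count}, f, indent=2)


with tempfile.TemporaryDirectory() as tmp:
    checkpoint = os.path.join(tmp, "export_stats.json")
    first_since = load_checkpoint(checkpoint)  # no file yet
    run_time = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
    query = build_incremental_filter(first_since, run_time)
    save_checkpoint(checkpoint, run_time, exported_count=42)
    next_since = load_checkpoint(checkpoint)

print(first_since.isoformat(), "->", next_since.isoformat())
```

Bounding the window on both sides means consecutive runs cover adjacent, non-overlapping time ranges, so a document changed during a run is neither skipped nor exported twice.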
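
5. Common Problems and Solutions
5.1 Handling Special Data Types
MongoDB supports data types that JSON has no direct equivalent for, such as ObjectId, Date, and binary data. These need special handling during export.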
Node.js example: handling ObjectId

const { MongoClient, ObjectId } = require('mongodb');
const fs = require('fs');

async function exportWithObjectIdHandling() {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);

  try {
    await client.connect();
    const database = client.db("mydb");
    const collection = database.collection("mycollection");

    const cursor = collection.find({});

    const results = [];
    for await (const doc of cursor) {
      // Convert the ObjectId to a string
      const processedDoc = {
        id: doc._id.toString(),
        // Other fields...
        data: doc.data
      };

      results.push(processedDoc);
    }

    // Serialize to a formatted JSON string
    const json = JSON.stringify(results, null, 2);

    // Write the file
    fs.writeFileSync('data_with_string_ids.json', json);

    console.log(`Exported ${results.length} documents with ObjectIds converted to strings`);
  } finally {
    await client.close();
  }
}

exportWithObjectIdHandling().catch(console.error);

Python example: handling dates

from pymongo import MongoClient
from bson import ObjectId
import json
from datetime import datetime

def export_with_date_handling():
    client = MongoClient('mongodb://localhost:27017/')
    db = client['mydb']
    collection = db['mycollection']

    # Custom JSON encoder for dates. ObjectId is handled as well, because
    # the query below returns _id and json cannot encode it either.
    class DateTimeEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            if isinstance(obj, ObjectId):
                return str(obj)
            return super().default(obj)

    # Run the query
    cursor = collection.find({})

    # Load the results into a list
    results = list(cursor)

    # Write the JSON file using the custom encoder
    with open('data_with_dates.json', 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2, cls=DateTimeEncoder)

    print(f"Exported {len(results)} documents with dates converted to ISO format")
    client.close()

if __name__ == "__main__":
    export_with_date_handling()
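Converting special types to plain strings loses the type information, so a later import cannot tell a date from ordinary text. MongoDB's Extended JSON keeps the type by wrapping the value, for example {"$date": ...}. The following standard-library sketch imitates a few of those wrappers to show the idea. It treats naive datetimes as UTC, which is an assumption of this sketch, and it is not a replacement for the bson.json_util module that ships with PyMongo.

```python
import base64
import json
from datetime import datetime, timezone
from decimal import Decimal


def to_extended_json(value):
    """json.dumps 'default' hook producing Extended-JSON-style wrappers."""
    if isinstance(value, datetime):
        # Treat naive datetimes as UTC (an assumption of this sketch)
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        iso = value.astimezone(timezone.utc).isoformat(timespec="milliseconds")
        return {"$date": iso.replace("+00:00", "Z")}
    if isinstance(value, (bytes, bytearray)):
        encoded_bytes = base64.b64encode(bytes(value)).decode("ascii")
        return {"$binary": {"base64": encoded_bytes, "subType": "00"}}
    if isinstance(value, Decimal):
        return {"$numberDecimal": str(value)}
    raise TypeError(f"Cannot serialize {type(value).__name__}")


doc = {
    "created_at": datetime(2023, 1, 1, 8, 30),
    "payload": b"hi",
    "price": Decimal("19.99"),
}
encoded = json.dumps(doc, default=to_extended_json)
print(encoded)
```

The wrapped values survive a round trip through any JSON tool, and an importer that understands the wrappers can restore the original types.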

5.2 Handling Large Data Volumes and Memory Limits
Very large exports can run into memory limits. The following approaches address this:
Java example: batch export

import com.mongodb.client.*;
import org.bson.Document;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BatchExportExample {
    public static void main(String[] args) {
        String connectionString = "mongodb://localhost:27017";

        try (MongoClient mongoClient = MongoClients.create(connectionString)) {
            MongoDatabase database = mongoClient.getDatabase("mydb");
            MongoCollection<Document> collection = database.getCollection("mycollection");

            // Batch size
            int batchSize = 1000;
            int totalExported = 0;

            // Both the writer and the cursor are closed by try-with-resources
            try (FileWriter writer = new FileWriter("batch_export.json");
                 MongoCursor<Document> cursor = collection.find().batchSize(batchSize).iterator()) {
                writer.write("[\n");

                boolean firstDocument = true;
                List<Document> batch = new ArrayList<>(batchSize);

                while (cursor.hasNext()) {
                    batch.add(cursor.next());

                    // Flush when the batch is full or the cursor is exhausted
                    if (batch.size() >= batchSize || !cursor.hasNext()) {
                        for (Document doc : batch) {
                            if (!firstDocument) {
                                writer.write(",\n");
                            }
                            firstDocument = false;
                            writer.write("  " + doc.toJson());
                        }

                        totalExported += batch.size();
                        System.out.println("Exported " + totalExported + " documents so far");

                        // Clear the batch
                        batch.clear();
                    }
                }

                writer.write("\n]");
            }

            System.out.println("Export finished, " + totalExported + " documents in total");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Python example: parallel export

from pymongo import MongoClient
from bson import ObjectId
import concurrent.futures
import glob
import json
import math
import os
import time
from datetime import datetime

MAX_WORKERS = 8  # upper bound on threads, regardless of collection size

class MongoJsonEncoder(json.JSONEncoder):
    """JSON encoder that converts datetime and ObjectId values to strings."""
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, ObjectId):
            return str(obj)
        return super().default(obj)

def export_range(start, end, part_id):
    thread_client = MongoClient('mongodb://localhost:27017/')
    try:
        thread_collection = thread_client['mydb']['mycollection']

        # Sort by _id so that the skip/limit pages are stable and do not overlap.
        # skip() still walks over the skipped documents, so for very large
        # collections range queries on _id are a better fit.
        cursor = thread_collection.find().sort('_id', 1).skip(start).limit(end - start)
        results = list(cursor)
    finally:
        thread_client.close()

    # Zero-padded names keep the parts in order when they are merged
    output_file = f"parallel_exports/part_{part_id:05d}.json"
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2, cls=MongoJsonEncoder)

    return len(results)

def parallel_export():
    client = MongoClient('mongodb://localhost:27017/')
    total_docs = client['mydb']['mycollection'].count_documents({})
    client.close()
    print(f"The collection contains {total_docs} documents")

    if total_docs == 0:
        print("Nothing to export")
        return

    # Number of documents handled by each task
    docs_per_task = 5000
    num_tasks = math.ceil(total_docs / docs_per_task)

    # Cap the thread count; tasks beyond the cap wait in the pool's queue
    num_threads = min(num_tasks, MAX_WORKERS)
    print(f"Exporting {num_tasks} ranges with {num_threads} threads")

    # Create the output directory
    os.makedirs("parallel_exports", exist_ok=True)

    start_time = time.time()
    total_exported = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for i in range(num_tasks):
            start = i * docs_per_task
            end = min((i + 1) * docs_per_task, total_docs)
            futures.append(executor.submit(export_range, start, end, i))

        for future in concurrent.futures.as_completed(futures):
            count = future.result()
            total_exported += count
            print(f"Range finished: {count} documents, {total_exported}/{total_docs} in total")

    # Merge the files written by all tasks
    merge_json_files("parallel_exports", "merged_export.json")

    end_time = time.time()
    print(f"Parallel export finished: {total_exported} documents in {end_time - start_time:.2f} seconds")

def merge_json_files(input_dir, output_file):
    # Collect all JSON files in a stable order
    files = sorted(glob.glob(f"{input_dir}/*.json"))

    with open(output_file, 'w', encoding='utf-8') as outfile:
        outfile.write('[\n')

        # A single flag for the whole output avoids stray commas after empty files
        first_doc = True
        for filename in files:
            with open(filename, 'r', encoding='utf-8') as infile:
                data = json.load(infile)

            for doc in data:
                if not first_doc:
                    outfile.write(',\n')
                first_doc = False
                json.dump(doc, outfile, ensure_ascii=False, indent=2)

        outfile.write('\n]')

    print(f"Merged all files into {output_file}")

if __name__ == "__main__":
    parallel_export()
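The range arithmetic behind a parallel export is worth checking on its own, since an off-by-one error silently drops or duplicates documents. The sketch below splits a document count into contiguous ranges and caps the number of ranges. The function name and the default cap of 8 are assumptions made for this illustration.

```python
import math


def split_ranges(total, chunk_size, max_workers=8):
    """Split [0, total) into contiguous (start, end) ranges.

    chunk_size is a lower bound per range; it is raised when needed so that
    no more than max_workers ranges are produced.
    """
    if total <= 0:
        return []
    if chunk_size <= 0 or max_workers <= 0:
        raise ValueError("chunk_size and max_workers must be positive")
    size = max(chunk_size, math.ceil(total / max_workers))
    return [(start, min(start + size, total)) for start in range(0, total, size)]


print(split_ranges(12000, 5000))
print(split_ranges(100000, 5000, max_workers=4))
```

Each range's end equals the next range's start, and the last range ends exactly at the total, so every position is covered once.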

5.3 Handling Connection and Authentication Issues
Node.js example: authentication and connection options

const { MongoClient } = require('mongodb');
const fs = require('fs');

async function exportWithAuth() {
  // Credentials and replica set members are given in the connection string
  const uri = "mongodb://username:password@cluster.example.com:27017,cluster.example.com:27018,cluster.example.com:27019/mydb?replicaSet=myReplicaSet&authSource=admin";

  const client = new MongoClient(uri, {
    // Connection pool options
    maxPoolSize: 10,
    minPoolSize: 1,
    maxIdleTimeMS: 30000,
    waitQueueTimeoutMS: 5000,

    // Timeout and retry options (an export only reads, so retryReads is the relevant switch)
    connectTimeoutMS: 10000,
    socketTimeoutMS: 45000,
    retryReads: true,

    // TLS options (if required). Current drivers use tls/tlsCAFile;
    // the older ssl/sslValidate/sslCA option names are deprecated.
    tls: true,
    tlsCAFile: process.env.SSL_CA_FILE
  });

  try {
    await client.connect();
    const database = client.db("mydb");
    const collection = database.collection("mycollection");

    // Run the query
    const cursor = collection.find({});

    // Load the results into an array
    const results = await cursor.toArray();

    // Serialize to a formatted JSON string
    const json = JSON.stringify(results, null, 2);

    // Write the file
    fs.writeFileSync('auth_export.json', json);

    console.log(`Exported ${results.length} documents`);
  } catch (err) {
    console.error("Error during export:", err);
  } finally {
    await client.close();
  }
}

exportWithAuth().catch(console.error);

Python example: exporting through an SSH tunnel

from pymongo import MongoClient
from bson import ObjectId
import paramiko
import json
import sshtunnel
from datetime import datetime

def export_via_ssh_tunnel():
    # SSH tunnel settings
    ssh_host = 'ssh.example.com'
    ssh_port = 22
    ssh_username = 'ssh_user'
    ssh_pkey = paramiko.RSAKey.from_private_key_file('/path/to/private/key')

    # MongoDB settings
    mongo_host = 'localhost'  # as seen from the SSH server
    mongo_port = 27017
    mongo_db = 'mydb'
    mongo_collection = 'mycollection'

    # Create the SSH tunnel
    server = sshtunnel.SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_username,
        ssh_pkey=ssh_pkey,
        remote_bind_address=(mongo_host, mongo_port)
    )

    client = None
    try:
        # Start the SSH tunnel
        server.start()

        # Connect to MongoDB through the tunnel
        client = MongoClient(
            host='localhost',
            port=server.local_bind_port,
            username='mongo_user',
            password='mongo_password',
            authSource='admin'
        )

        db = client[mongo_db]
        collection = db[mongo_collection]

        # Custom JSON encoder for dates and ObjectIds
        class DateTimeEncoder(json.JSONEncoder):
            def default(self, obj):
                if isinstance(obj, datetime):
                    return obj.isoformat()
                if isinstance(obj, ObjectId):
                    return str(obj)
                return super().default(obj)

        # Run the query
        cursor = collection.find({})

        # Load the results into a list
        results = list(cursor)

        # Write the JSON file
        with open('ssh_tunnel_export.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2, cls=DateTimeEncoder)

        print(f"Exported {len(results)} documents")

    except Exception as e:
        print(f"Error during export: {e}")
    finally:
        # Close the connection and the tunnel
        if client is not None:
            client.close()
        server.stop()

if __name__ == "__main__":
    export_via_ssh_tunnel()

6. Practical Scenarios and Case Studies
6.1 Data Backup and Restore
Backup script (Bash):

#!/bin/bash
# MongoDB backup script
# Note: JSON exports suit data exchange and small data sets. MongoDB's
# documentation recommends mongodump/mongorestore for full production backups.

# Configuration
MONGO_HOST="localhost"
MONGO_PORT="27017"
MONGO_USER="backup_user"
MONGO_PASS="backup_password"
MONGO_AUTH_DB="admin"
BACKUP_DIR="/var/backups/mongodb"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7

# Create the backup directory
mkdir -p "$BACKUP_DIR/$DATE"

# List all databases (mongosh replaces the legacy mongo shell)
DATABASES=$(mongosh --host "$MONGO_HOST" --port "$MONGO_PORT" -u "$MONGO_USER" -p "$MONGO_PASS" --authenticationDatabase "$MONGO_AUTH_DB" --quiet --eval "db.getMongo().getDBNames().join(' ')")

# Back up each database
for DB in $DATABASES; do
    echo "Backing up database: $DB"

    # List all collections in the database
    COLLECTIONS=$(mongosh --host "$MONGO_HOST" --port "$MONGO_PORT" -u "$MONGO_USER" -p "$MONGO_PASS" --authenticationDatabase "$MONGO_AUTH_DB" "$DB" --quiet --eval "db.getCollectionNames().join(' ')")

    # Create a directory for the database
    mkdir -p "$BACKUP_DIR/$DATE/$DB"

    # Back up each collection
    for COLLECTION in $COLLECTIONS; do
        echo "  Backing up collection: $DB.$COLLECTION"
        mongoexport --host "$MONGO_HOST" --port "$MONGO_PORT" -u "$MONGO_USER" -p "$MONGO_PASS" --authenticationDatabase "$MONGO_AUTH_DB" \
            --db "$DB" --collection "$COLLECTION" \
            --out "$BACKUP_DIR/$DATE/$DB/$COLLECTION.json" \
            --jsonArray
    done
done

# Write a backup manifest
echo "Backup time: $DATE" > "$BACKUP_DIR/$DATE/backup_info.txt"
echo "Databases: $DATABASES" >> "$BACKUP_DIR/$DATE/backup_info.txt"

# Compress the backup
echo "Compressing backup files..."
tar -czf "$BACKUP_DIR/mongodb_backup_$DATE.tar.gz" -C "$BACKUP_DIR" "$DATE"

# Remove the uncompressed backup
rm -rf "$BACKUP_DIR/$DATE"

# Clean up old backups
echo "Removing backups older than $RETENTION_DAYS days..."
find "$BACKUP_DIR" -name "mongodb_backup_*.tar.gz" -type f -mtime +$RETENTION_DAYS -delete

echo "Backup complete: $BACKUP_DIR/mongodb_backup_$DATE.tar.gz"

Restore script (Node.js):

const { MongoClient } = require('mongodb');
// EJSON comes from the bson package, which is installed alongside the driver
const { EJSON } = require('bson');
const fs = require('fs');
const path = require('path');

// backupDir must point to an extracted backup directory, not the .tar.gz archive
async function restoreFromBackup(backupDir) {
  const uri = "mongodb://localhost:27017";
  const client = new MongoClient(uri);

  try {
    await client.connect();

    // Read all entries in the backup directory
    const backupFiles = fs.readdirSync(backupDir, { withFileTypes: true });

    for (const file of backupFiles) {
      if (file.isDirectory()) {
        // Each subdirectory is a database
        const dbName = file.name;
        const db = client.db(dbName);

        console.log(`Restoring database: ${dbName}`);

        // Read all collection files of the database
        const collectionFiles = fs.readdirSync(path.join(backupDir, dbName));

        for (const collectionFile of collectionFiles) {
          if (collectionFile.endsWith('.json')) {
            // Each JSON file is a collection
            const collectionName = path.basename(collectionFile, '.json');
            const collection = db.collection(collectionName);

            console.log(`  Restoring collection: ${dbName}.${collectionName}`);

            // Read the JSON file. mongoexport writes Extended JSON
            // ({"$oid": ...}, {"$date": ...}); EJSON.parse turns these wrappers
            // back into ObjectId and Date values, which JSON.parse would not do.
            const jsonContent = fs.readFileSync(path.join(backupDir, dbName, collectionFile), 'utf8');
            const documents = EJSON.parse(jsonContent);

            if (documents.length > 0) {
              // Empty the existing collection (optional)
              await collection.deleteMany({});

              // Insert the documents
              await collection.insertMany(documents);

              console.log(`  Restored ${documents.length} documents`);
            }
          }
        }
      }
    }

    console.log("Restore complete");
  } finally {
    await client.close();
  }
}

// Usage example
// restoreFromBackup('/var/backups/mongodb/20231115_143022').catch(console.error);
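The reason a plain JSON parser is not enough for restoring a mongoexport file is that typed values arrive wrapped, for example {"$oid": "..."} or {"$date": "..."}. The standard-library sketch below unwraps three such wrappers with a json object_hook to make the mechanism visible. It is an illustration only: the sample line is made up, and ObjectIds are kept as hex strings because no MongoDB library is imported.

```python
import json
from datetime import datetime, timezone


def decode_extended(obj):
    """object_hook that unwraps a few Extended JSON wrappers."""
    if len(obj) == 1:
        if "$oid" in obj:
            # Kept as a hex string here; a real restore would build an ObjectId
            return obj["$oid"]
        if "$date" in obj and isinstance(obj["$date"], str):
            # Relaxed format: ISO-8601 string, with "Z" meaning UTC
            return datetime.fromisoformat(obj["$date"].replace("Z", "+00:00"))
        if "$numberLong" in obj:
            return int(obj["$numberLong"])
    return obj


exported_line = (
    '{"_id": {"$oid": "655f1c2e8b3e4a0012345678"}, '
    '"total": {"$numberLong": "5000000000"}, '
    '"createdAt": {"$date": "2023-11-15T14:30:22Z"}}'
)
restored = json.loads(exported_line, object_hook=decode_extended)
print(restored["_id"], restored["total"])
print(restored["createdAt"].tzinfo == timezone.utc)
```

The hook runs on the innermost objects first, so by the time the outer document is built, each wrapper has already been replaced by its decoded value.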

6.2 Data Migration
MongoDB to PostgreSQL (Python):

import pymongo
import psycopg2

def mongo_to_postgres():
    # MongoDB connection settings
    mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
    mongo_db = mongo_client["source_db"]

    # PostgreSQL connection settings
    pg_conn = psycopg2.connect(
        host="localhost",
        database="target_db",
        user="postgres",
        password="postgres"
    )
    pg_cursor = pg_conn.cursor()

    try:
        # Migrate user data
        print("Migrating users...")
        mongo_users = mongo_db["users"].find({})

        # Create the PostgreSQL table if it does not exist
        pg_cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            mongo_id VARCHAR(24) UNIQUE,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE,
            age INTEGER,
            created_at TIMESTAMP,
            updated_at TIMESTAMP
        )
        """)

        # Insert the users (upsert on mongo_id so the script can be re-run)
        users_migrated = 0
        for user in mongo_users:
            pg_cursor.execute(
                """
                INSERT INTO users (mongo_id, name, email, age, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (mongo_id) DO UPDATE SET
                    name = EXCLUDED.name,
                    email = EXCLUDED.email,
                    age = EXCLUDED.age,
                    updated_at = EXCLUDED.updated_at
                """,
                (
                    str(user["_id"]),
                    user.get("name"),
                    user.get("email"),
                    user.get("age"),
                    user.get("created_at"),
                    user.get("updated_at")
                )
            )
            users_migrated += 1

        pg_conn.commit()
        print(f"User migration finished, {users_migrated} records migrated")

        # Migrate order data
        print("Migrating orders...")
        mongo_orders = mongo_db["orders"].find({})

        # Create the PostgreSQL table if it does not exist
        pg_cursor.execute("""
        CREATE TABLE IF NOT EXISTS orders (
            id SERIAL PRIMARY KEY,
            mongo_id VARCHAR(24) UNIQUE,
            user_id INTEGER REFERENCES users(id),
            order_number VARCHAR(50),
            total DECIMAL(10,2),
            status VARCHAR(20),
            order_date TIMESTAMP,
            created_at TIMESTAMP,
            updated_at TIMESTAMP
        )
        """)

        # Build a user ID map (MongoDB _id to PostgreSQL id)
        pg_cursor.execute("SELECT mongo_id, id FROM users")
        user_id_map = {row[0]: row[1] for row in pg_cursor.fetchall()}

        # Insert the orders
        orders_migrated = 0
        for order in mongo_orders:
            # Look up the matching PostgreSQL user ID
            user_mongo_id = str(order.get("user_id"))
            pg_user_id = user_id_map.get(user_mongo_id)

            if pg_user_id is None:
                print(f"Warning: user ID {user_mongo_id} not found, skipping order {order.get('_id')}")
                continue

            pg_cursor.execute(
                """
                INSERT INTO orders (mongo_id, user_id, order_number, total, status, order_date, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (mongo_id) DO UPDATE SET
                    user_id = EXCLUDED.user_id,
                    order_number = EXCLUDED.order_number,
                    total = EXCLUDED.total,
                    status = EXCLUDED.status,
                    order_date = EXCLUDED.order_date,
                    updated_at = EXCLUDED.updated_at
                """,
                (
                    str(order["_id"]),
                    pg_user_id,
                    order.get("order_number"),
                    order.get("total"),
                    order.get("status"),
                    order.get("order_date"),
                    order.get("created_at"),
                    order.get("updated_at")
                )
            )
            orders_migrated += 1

        pg_conn.commit()
        # Count what was actually written; skipped orders are not included
        print(f"Order migration finished, {orders_migrated} records migrated")

        print("Data migration complete")

    except Exception as e:
        print(f"Error during migration: {e}")
        pg_conn.rollback()
    finally:
        pg_cursor.close()
        pg_conn.close()
        mongo_client.close()

if __name__ == "__main__":
    mongo_to_postgres()
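
The migration above maps flat fields one to one, but MongoDB documents are often nested, and a relational table needs flat columns. One common approach is to join nested keys into column names. The sketch below shows that idea using only the standard library; the function name flatten, the underscore separator, and the sample document are assumptions made for this illustration.

```python
def flatten(doc, parent_key="", sep="_"):
    """Flatten nested dicts into a single-level dict of column -> value."""
    row = {}
    for key, value in doc.items():
        column = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into embedded documents, extending the column prefix
            row.update(flatten(value, column, sep))
        else:
            # Lists and scalars are kept as-is; arrays usually need a child
            # table or a JSON column on the relational side
            row[column] = value
    return row


user = {
    "name": "Ann",
    "profile": {"address": {"city": "Paris"}, "phone": "123"},
    "tags": ["a", "b"],
}
row = flatten(user)
print(row)
```

The resulting keys can serve as column names in an INSERT statement, provided the target table was created with matching columns.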
复制代码- const { MongoClient } = require('mongodb');
- const mysql = require('mysql2/promise');
- const fs = require('fs');
- const path = require('path');
- async function mysqlToMongoDB() {
- // MySQL连接配置
- const mysqlConnection = await mysql.createConnection({
- host: 'localhost',
- user: 'root',
- password: 'password',
- database: 'source_db'
- });
-
- // MongoDB连接配置
- const mongoUri = "mongodb://localhost:27017";
- const mongoClient = new MongoClient(mongoUri);
-
- try {
- await mongoClient.connect();
- const mongoDb = mongoClient.db('target_db');
-
- // 迁移用户数据
- console.log('迁移用户数据...');
- const [users] = await mysqlConnection.execute('SELECT * FROM users');
- const usersCollection = mongoDb.collection('users');
-
- // 转换用户数据格式
- const userDocuments = users.map(user => ({
- _id: user.id, // 使用MySQL ID作为MongoDB _id
- name: user.name,
- email: user.email,
- age: user.age,
- profile: {
- address: user.address,
- phone: user.phone
- },
- createdAt: user.created_at,
- updatedAt: user.updated_at
- }));
-
- // 插入用户数据
- if (userDocuments.length > 0) {
- await usersCollection.insertMany(userDocuments);
- console.log(`用户数据迁移完成,共迁移 ${userDocuments.length} 条记录`);
- }
-
- // 迁移产品数据
- console.log('迁移产品数据...');
- const [products] = await mysqlConnection.execute('SELECT * FROM products');
- const productsCollection = mongoDb.collection('products');
-
- // 转换产品数据格式
- const productDocuments = products.map(product => ({
- _id: product.id,
- name: product.name,
- description: product.description,
- price: parseFloat(product.price),
- category: product.category,
- tags: product.tags ? product.tags.split(',') : [],
- specifications: JSON.parse(product.specifications || '{}'),
- stock: {
- quantity: product.stock_quantity,
- reserved: product.stock_reserved || 0,
- available: product.stock_quantity - (product.stock_reserved || 0)
- },
- createdAt: product.created_at,
- updatedAt: product.updated_at
- }));
-
- // Insert product documents
- if (productDocuments.length > 0) {
- await productsCollection.insertMany(productDocuments);
- console.log(`Product migration finished: ${productDocuments.length} records migrated`);
- }
-
- // Migrate order data
- console.log('Migrating order data...');
- const [orders] = await mysqlConnection.execute('SELECT * FROM orders');
- const ordersCollection = mongoDb.collection('orders');
-
- // Fetch order line items
- const [orderItems] = await mysqlConnection.execute('SELECT * FROM order_items');
-
- // Group line items by order ID
- const orderItemsByOrderId = {};
- orderItems.forEach(item => {
- if (!orderItemsByOrderId[item.order_id]) {
- orderItemsByOrderId[item.order_id] = [];
- }
- orderItemsByOrderId[item.order_id].push({
- productId: item.product_id,
- quantity: item.quantity,
- price: parseFloat(item.price),
- subtotal: parseFloat(item.subtotal)
- });
- });
-
- // Convert orders into the target document shape, embedding their line items
- const orderDocuments = orders.map(order => {
- const items = orderItemsByOrderId[order.id] || [];
-
- return {
- _id: order.id,
- userId: order.user_id,
- orderNumber: order.order_number,
- status: order.status,
- items: items,
- subtotal: parseFloat(order.subtotal),
- tax: parseFloat(order.tax),
- shipping: parseFloat(order.shipping),
- total: parseFloat(order.total),
- shippingAddress: {
- street: order.shipping_street,
- city: order.shipping_city,
- state: order.shipping_state,
- zipCode: order.shipping_zip_code,
- country: order.shipping_country
- },
- paymentMethod: order.payment_method,
- paymentStatus: order.payment_status,
- orderDate: order.order_date,
- createdAt: order.created_at,
- updatedAt: order.updated_at
- };
- });
-
- // Insert order documents
- if (orderDocuments.length > 0) {
- await ordersCollection.insertMany(orderDocuments);
- console.log(`Order migration finished: ${orderDocuments.length} records migrated`);
- }
-
- console.log('Data migration finished');
-
- } catch (error) {
- console.error('Error during migration:', error);
- } finally {
- await mysqlConnection.end();
- await mongoClient.close();
- }
- }
- mysqlToMongoDB().catch(console.error);
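Both migration scripts in this section read a whole table or collection into memory and write it in one pass. For larger datasets it may help to split the rows into fixed-size batches first. The following is a minimal sketch of that idea in Python, using only the standard library; the helper name `chunked` and the batch size are illustrative assumptions, not part of the original scripts.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(rows: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items from `rows`."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    # Stand-in for rows streamed from the source database (hypothetical data).
    rows = ({"_id": i, "name": f"user{i}"} for i in range(5))
    for batch in chunked(rows, 2):
        # In a real migration, a bulk write such as pymongo's
        # collection.insert_many(batch) would go here.
        print(len(batch), [doc["_id"] for doc in batch])
```

Because `chunked` consumes any iterable lazily, it can wrap a database cursor directly, so only one batch needs to be held in memory at a time.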
6.3 Data Analysis and Report Generation
- from pymongo import MongoClient
- import json
- import pandas as pd
- from datetime import datetime, timedelta
- import matplotlib.pyplot as plt
- import seaborn as sns
- def sales_data_analysis():
- # Connect to MongoDB
- client = MongoClient('mongodb://localhost:27017/')
- db = client['sales_db']
-
- # Define the analysis date range
- end_date = datetime.now()
- start_date = end_date - timedelta(days=90) # last 90 days
-
- # Query sales data
- pipeline = [
- {
- '$match': {
- 'order_date': {
- '$gte': start_date,
- '$lte': end_date
- },
- 'status': 'completed'
- }
- },
- {
- '$project': {
- '_id': 0,
- 'order_id': '$_id',
- 'customer_id': 1,
- 'order_date': 1,
- 'total': 1,
- 'items': 1,
- 'payment_method': 1,
- 'year': {'$year': '$order_date'},
- 'month': {'$month': '$order_date'},
- 'day': {'$dayOfMonth': '$order_date'},
- 'day_of_week': {'$dayOfWeek': '$order_date'}
- }
- }
- ]
-
- # Run the aggregation
- sales_data = list(db.orders.aggregate(pipeline))
-
- # Export the raw data to JSON
- with open('sales_data_raw.json', 'w', encoding='utf-8') as f:
- json.dump(sales_data, f, ensure_ascii=False, indent=2, default=str)
-
- print(f"Exported {len(sales_data)} sales records to sales_data_raw.json")
-
- # Load into a DataFrame for analysis
- df = pd.DataFrame(sales_data)
-
- # 1. Sales trend by date
- df['order_date'] = pd.to_datetime(df['order_date'])
- df.set_index('order_date', inplace=True)
-
- # Aggregate sales per day
- daily_sales = df['total'].resample('D').sum()
-
- # Export daily sales
- daily_sales.to_json('daily_sales.json', date_format='iso')
-
- # 2. Sales by product
- product_sales = {}
- for _, order in df.iterrows():
- for item in order['items']:
- product_id = item['product_id']
- if product_id not in product_sales:
- product_sales[product_id] = {
- 'quantity': 0,
- 'revenue': 0
- }
- product_sales[product_id]['quantity'] += item['quantity']
- product_sales[product_id]['revenue'] += item['price'] * item['quantity']
-
- # Convert to a DataFrame and sort by revenue
- product_df = pd.DataFrame.from_dict(product_sales, orient='index')
- product_df = product_df.sort_values('revenue', ascending=False)
-
- # Export product sales
- product_df.to_json('product_sales.json', orient='index')
-
- # 3. Breakdown by payment method
- payment_stats = df.groupby('payment_method')['total'].agg(['count', 'sum', 'mean'])
-
- # Export payment-method statistics
- payment_stats.to_json('payment_stats.json', orient='index')
-
- # 4. Sales pattern by day of week
- weekday_stats = df.groupby('day_of_week')['total'].agg(['count', 'sum', 'mean'])
-
- # Export day-of-week statistics
- weekday_stats.to_json('weekday_stats.json', orient='index')
-
- # Generate charts
- plt.figure(figsize=(15, 10))
-
- # Daily sales trend
- plt.subplot(2, 2, 1)
- daily_sales.plot()
- plt.title('Daily Sales Trend')
- plt.xlabel('Date')
- plt.ylabel('Sales')
- plt.grid(True)
-
- # Top 10 products by revenue
- plt.subplot(2, 2, 2)
- top_products = product_df.head(10)
- top_products['revenue'].plot(kind='bar')
- plt.title('Top 10 Products by Revenue')
- plt.xlabel('Product ID')
- plt.ylabel('Revenue')
- plt.xticks(rotation=45)
-
- # Payment method distribution
- plt.subplot(2, 2, 3)
- payment_stats['count'].plot(kind='pie', autopct='%1.1f%%')
- plt.title('Payment Method Distribution')
- plt.ylabel('')
-
- # Sales by day of week ($dayOfWeek returns 1 for Sunday through 7 for Saturday)
- plt.subplot(2, 2, 4)
- weekday_names = ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
- weekday_stats.index = [weekday_names[i-1] for i in weekday_stats.index]
- weekday_stats['sum'].plot(kind='bar')
- plt.title('Sales by Day of Week')
- plt.xlabel('Day of week')
- plt.ylabel('Sales')
-
- plt.tight_layout()
- plt.savefig('sales_analysis.png')
- plt.close()
-
- # Build the analysis report
- report = f"""
- # Sales Data Analysis Report
- ## Analysis Period
- {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}
- ## Overall Sales
- - Total orders: {len(df)}
- - Total sales: {df['total'].sum():.2f}
- - Average order value: {df['total'].mean():.2f}
- ## Sales Trend
- - Highest daily sales: {daily_sales.max():.2f} ({daily_sales.idxmax().strftime('%Y-%m-%d')})
- - Lowest daily sales: {daily_sales.min():.2f} ({daily_sales.idxmin().strftime('%Y-%m-%d')})
- - Average daily sales: {daily_sales.mean():.2f}
- ## Best-Selling Products
- - Top product by revenue: {product_df.index[0]} (revenue: {product_df.iloc[0]['revenue']:.2f})
- - Top product by units sold: {product_df.sort_values('quantity', ascending=False).index[0]} (units: {product_df.sort_values('quantity', ascending=False).iloc[0]['quantity']})
- ## Payment Method Analysis
- """
-
- # iterrows() upcasts each row to float, so cast the count back to int for display
- for payment_method, stats in payment_stats.iterrows():
- report += f"- {payment_method}: {int(stats['count'])} orders, total {stats['sum']:.2f}, average {stats['mean']:.2f}\n"
-
- report += "\n## Sales by Day of Week\n"
-
- for day, stats in weekday_stats.iterrows():
- report += f"- {day}: {int(stats['count'])} orders, total {stats['sum']:.2f}, average {stats['mean']:.2f}\n"
-
- # Save the report
- with open('sales_analysis_report.md', 'w', encoding='utf-8') as f:
- f.write(report)
-
- print("Sales analysis finished. Generated files:")
- print("- sales_data_raw.json: raw sales data")
- print("- daily_sales.json: daily sales data")
- print("- product_sales.json: product sales data")
- print("- payment_stats.json: payment-method statistics")
- print("- weekday_stats.json: day-of-week statistics")
- print("- sales_analysis.png: sales analysis charts")
- print("- sales_analysis_report.md: sales analysis report")
-
- client.close()
- if __name__ == "__main__":
- sales_data_analysis()
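The Python script above passes `default=str` to `json.dump`, which writes a `datetime` as `2023-01-01 00:00:00` (space-separated) rather than ISO 8601. If downstream tools expect ISO timestamps, a custom fallback can be used instead. The sketch below relies only on the standard library; the function name `json_default` is an illustrative assumption, and other non-JSON values (for example an ObjectId) still fall through to `str()`.

```python
import json
from datetime import date, datetime, timezone


def json_default(value):
    """Fallback serializer for values that json cannot encode natively."""
    if isinstance(value, (datetime, date)):
        # ISO 8601 keeps the "T" separator and any timezone offset.
        return value.isoformat()
    # Anything else (e.g. an ObjectId) is written as its string form.
    return str(value)


if __name__ == "__main__":
    # Hypothetical record shaped like one exported sales document.
    record = {
        "order_id": 1,
        "order_date": datetime(2023, 1, 1, tzinfo=timezone.utc),
        "total": 99.5,
    }
    print(json.dumps(record, default=json_default))
```

It would be passed in place of `str`, for example `json.dump(sales_data, f, default=json_default)`.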
- const { MongoClient } = require('mongodb');
- const fs = require('fs');
- async function userBehaviorAnalysis() {
- const uri = "mongodb://localhost:27017";
- const client = new MongoClient(uri);
-
- try {
- await client.connect();
- const database = client.db("analytics_db");
-
- // 1. User activity analysis
- console.log("Analyzing user activity...");
-
- const userActivityPipeline = [
- {
- $match: {
- timestamp: {
- $gte: new Date(new Date() - 30 * 24 * 60 * 60 * 1000) // last 30 days
- }
- }
- },
- {
- $group: {
- _id: "$userId",
- sessionCount: { $sum: 1 },
- lastActivity: { $max: "$timestamp" },
- activities: { $push: "$action" }
- }
- },
- {
- $project: {
- userId: "$_id",
- sessionCount: 1,
- lastActivity: 1,
- uniqueActivities: { $size: { $setUnion: "$activities" } },
- mostCommonActivity: {
- $arrayElemAt: [
- {
- $slice: [
- {
- $sortArray: {
- input: {
- $map: {
- input: { $setUnion: "$activities" },
- as: "activity",
- in: {
- activity: "$$activity",
- count: {
- $size: {
- $filter: {
- input: "$activities",
- cond: { $eq: ["$$this", "$$activity"] }
- }
- }
- }
- }
- }
- },
- sortBy: { count: -1 }
- }
- },
- 1
- ]
- },
- 0
- ]
- }
- }
- },
- {
- $sort: { sessionCount: -1 }
- }
- ];
-
- const userActivity = await database.collection("user_sessions").aggregate(userActivityPipeline).toArray();
-
- // Export user activity data
- fs.writeFileSync('user_activity_analysis.json', JSON.stringify(userActivity, null, 2));
- console.log(`Exported activity analysis for ${userActivity.length} users`);
-
- // 2. User path analysis
- console.log("Analyzing user paths...");
-
- const userPathPipeline = [
- {
- $match: {
- timestamp: {
- $gte: new Date(new Date() - 30 * 24 * 60 * 60 * 1000) // last 30 days
- }
- }
- },
- {
- $sort: { userId: 1, timestamp: 1 }
- },
- {
- $group: {
- _id: "$userId",
- paths: {
- $push: {
- page: "$page",
- timestamp: "$timestamp"
- }
- }
- }
- },
- {
- $project: {
- userId: "$_id",
- pathSequence: {
- $map: {
- input: "$paths",
- as: "path",
- in: "$$path.page"
- }
- },
- sessionCount: { $size: "$paths" },
- // Gaps between consecutive page views sum to (last - first) timestamp, in milliseconds
- timeSpent: {
- $subtract: [
- { $max: "$paths.timestamp" },
- { $min: "$paths.timestamp" }
- ]
- }
- }
- },
- {
- $addFields: {
- avgTimePerPage: {
- $cond: {
- if: { $gt: [{ $size: "$pathSequence" }, 1] },
- then: { $divide: ["$timeSpent", { $subtract: [{ $size: "$pathSequence" }, 1] }] },
- else: 0
- }
- }
- }
- },
- {
- $sort: { sessionCount: -1 }
- }
- ];
-
- const userPaths = await database.collection("page_views").aggregate(userPathPipeline).toArray();
-
- // Export user path data
- fs.writeFileSync('user_path_analysis.json', JSON.stringify(userPaths, null, 2));
- console.log(`Exported path analysis for ${userPaths.length} users`);
-
- // 3. Conversion funnel analysis
- console.log("Analyzing the conversion funnel...");
-
- const conversionFunnelPipeline = [
- {
- $match: {
- timestamp: {
- $gte: new Date(new Date() - 30 * 24 * 60 * 60 * 1000) // last 30 days
- }
- }
- },
- {
- $group: {
- _id: "$userId",
- events: { $push: "$event" },
- firstEvent: { $first: "$event" },
- lastEvent: { $last: "$event" },
- uniqueEvents: { $addToSet: "$event" }
- }
- },
- {
- $project: {
- userId: "$_id",
- events: 1,
- firstEvent: 1,
- lastEvent: 1,
- visitedHomepage: { $in: ["homepage", "$uniqueEvents"] },
- viewedProduct: { $in: ["view_product", "$uniqueEvents"] },
- addedToCart: { $in: ["add_to_cart", "$uniqueEvents"] },
- initiatedCheckout: { $in: ["initiate_checkout", "$uniqueEvents"] },
- completedPurchase: { $in: ["purchase", "$uniqueEvents"] }
- }
- },
- {
- $group: {
- _id: null,
- totalUsers: { $sum: 1 },
- homepageVisits: { $sum: { $cond: ["$visitedHomepage", 1, 0] } },
- productViews: { $sum: { $cond: ["$viewedProduct", 1, 0] } },
- cartAdditions: { $sum: { $cond: ["$addedToCart", 1, 0] } },
- checkouts: { $sum: { $cond: ["$initiatedCheckout", 1, 0] } },
- purchases: { $sum: { $cond: ["$completedPurchase", 1, 0] } }
- }
- },
- {
- $project: {
- _id: 0,
- totalUsers: 1,
- homepageVisits: 1,
- productViews: 1,
- cartAdditions: 1,
- checkouts: 1,
- purchases: 1,
- homepageConversionRate: { $divide: ["$homepageVisits", "$totalUsers"] },
- productViewRate: { $divide: ["$productViews", "$homepageVisits"] },
- cartAdditionRate: { $divide: ["$cartAdditions", "$productViews"] },
- checkoutRate: { $divide: ["$checkouts", "$cartAdditions"] },
- purchaseRate: { $divide: ["$purchases", "$checkouts"] },
- overallConversionRate: { $divide: ["$purchases", "$totalUsers"] }
- }
- }
- ];
-
- const conversionFunnel = await database.collection("user_events").aggregate(conversionFunnelPipeline).toArray();
-
- // Export conversion funnel data
- fs.writeFileSync('conversion_funnel_analysis.json', JSON.stringify(conversionFunnel, null, 2));
- console.log("Exported conversion funnel analysis");
-
- // 4. User retention analysis
- console.log("Analyzing user retention...");
-
- const userRetentionPipeline = [
- {
- $match: {
- event: "login",
- timestamp: {
- $gte: new Date(new Date() - 90 * 24 * 60 * 60 * 1000) // last 90 days
- }
- }
- },
- {
- $group: {
- _id: "$userId",
- firstLogin: { $min: "$timestamp" },
- lastLogin: { $max: "$timestamp" },
- loginCount: { $sum: 1 },
- loginDays: {
- $addToSet: {
- $dateToString: {
- format: "%Y-%m-%d",
- date: "$timestamp"
- }
- }
- }
- }
- },
- {
- $project: {
- userId: "$_id",
- firstLogin: 1,
- lastLogin: 1,
- loginCount: 1,
- activeDays: { $size: "$loginDays" },
- cohort: {
- $dateToString: {
- format: "%Y-%m",
- date: "$firstLogin"
- }
- }
- }
- },
- {
- $group: {
- _id: "$cohort",
- users: { $push: "$$ROOT" },
- userCount: { $sum: 1 }
- }
- },
- {
- $project: {
- cohort: "$_id",
- userCount: 1,
- users: 1,
- day1Retention: {
- $divide: [
- {
- $size: {
- $filter: {
- input: "$users",
- cond: {
- $gte: [
- {
- $dateDiff: {
- startDate: "$$this.firstLogin",
- endDate: "$$this.lastLogin",
- unit: "day"
- }
- },
- 1
- ]
- }
- }
- }
- },
- "$userCount"
- ]
- },
- day7Retention: {
- $divide: [
- {
- $size: {
- $filter: {
- input: "$users",
- cond: {
- $gte: [
- {
- $dateDiff: {
- startDate: "$$this.firstLogin",
- endDate: "$$this.lastLogin",
- unit: "day"
- }
- },
- 7
- ]
- }
- }
- }
- },
- "$userCount"
- ]
- },
- day30Retention: {
- $divide: [
- {
- $size: {
- $filter: {
- input: "$users",
- cond: {
- $gte: [
- {
- $dateDiff: {
- startDate: "$$this.firstLogin",
- endDate: "$$this.lastLogin",
- unit: "day"
- }
- },
- 30
- ]
- }
- }
- }
- },
- "$userCount"
- ]
- }
- }
- },
- {
- $sort: { cohort: 1 }
- }
- ];
-
- const userRetention = await database.collection("user_events").aggregate(userRetentionPipeline).toArray();
-
- // Export user retention data
- fs.writeFileSync('user_retention_analysis.json', JSON.stringify(userRetention, null, 2));
- console.log(`Exported retention analysis for ${userRetention.length} cohorts`);
-
- // Build the analysis report (declared with let because sections are appended below)
- let report = `# User Behavior Analysis Report
- ## Analysis Period
- Last 30 days (${new Date(new Date() - 30 * 24 * 60 * 60 * 1000).toISOString().split('T')[0]} to ${new Date().toISOString().split('T')[0]})
- ## User Activity
- - Users analyzed: ${userActivity.length}
- - Average sessions per user: ${(userActivity.reduce((sum, user) => sum + user.sessionCount, 0) / userActivity.length).toFixed(2)}
- - Most active user: ${userActivity[0].userId} (${userActivity[0].sessionCount} sessions)
- ## User Paths
- - Users analyzed: ${userPaths.length}
- - Average page views per user: ${(userPaths.reduce((sum, user) => sum + user.sessionCount, 0) / userPaths.length).toFixed(2)}
- - Average time spent per user: ${(userPaths.reduce((sum, user) => sum + user.timeSpent, 0) / userPaths.length / 1000 / 60).toFixed(2)} minutes
- ## Conversion Funnel
- `;
-
- if (conversionFunnel.length > 0) {
- const funnel = conversionFunnel[0];
- report += `
- - Total users: ${funnel.totalUsers}
- - Homepage visits: ${funnel.homepageVisits} (${(funnel.homepageConversionRate * 100).toFixed(2)}%)
- - Product views: ${funnel.productViews} (${(funnel.productViewRate * 100).toFixed(2)}%)
- - Cart additions: ${funnel.cartAdditions} (${(funnel.cartAdditionRate * 100).toFixed(2)}%)
- - Checkouts started: ${funnel.checkouts} (${(funnel.checkoutRate * 100).toFixed(2)}%)
- - Purchases completed: ${funnel.purchases} (${(funnel.purchaseRate * 100).toFixed(2)}%)
- - Overall conversion rate: ${(funnel.overallConversionRate * 100).toFixed(2)}%
- `;
- }
-
- report += "\n## User Retention\n";
-
- if (userRetention.length > 0) {
- report += "| Cohort | Users | Day-1 retention | Day-7 retention | Day-30 retention |\n";
- report += "|--------|-------|-----------------|-----------------|------------------|\n";
-
- userRetention.forEach(cohort => {
- report += `| ${cohort.cohort} | ${cohort.userCount} | ${(cohort.day1Retention * 100).toFixed(2)}% | ${(cohort.day7Retention * 100).toFixed(2)}% | ${(cohort.day30Retention * 100).toFixed(2)}% |\n`;
- });
- }
-
- // Save the report
- fs.writeFileSync('user_behavior_analysis_report.md', report);
-
- console.log("User behavior analysis finished. Generated files:");
- console.log("- user_activity_analysis.json: user activity analysis data");
- console.log("- user_path_analysis.json: user path analysis data");
- console.log("- conversion_funnel_analysis.json: conversion funnel analysis data");
- console.log("- user_retention_analysis.json: user retention analysis data");
- console.log("- user_behavior_analysis_report.md: user behavior analysis report");
-
- } finally {
- await client.close();
- }
- }
- userBehaviorAnalysis().catch(console.error);
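The funnel pipeline above computes each rate with `$divide`, which raises an error when the divisor is zero (for example, when no user reached the previous stage). One hedged alternative is to export only the raw counts and compute the rates client-side. The sketch below assumes a counts document with the same field names as the pipeline output; the names `STAGES`, `ratio`, and `funnel_rates` are illustrative.

```python
from typing import Dict, Optional

# Ordered funnel stages, named after the count fields in the exported document.
STAGES = [
    "totalUsers",
    "homepageVisits",
    "productViews",
    "cartAdditions",
    "checkouts",
    "purchases",
]


def ratio(numerator: int, denominator: int) -> Optional[float]:
    """Return numerator / denominator, or None when the denominator is zero."""
    return numerator / denominator if denominator else None


def funnel_rates(counts: Dict[str, int]) -> Dict[str, Optional[float]]:
    """Compute step-to-step conversion rates plus the overall rate."""
    rates: Dict[str, Optional[float]] = {}
    for previous, current in zip(STAGES, STAGES[1:]):
        key = f"{previous}->{current}"
        rates[key] = ratio(counts.get(current, 0), counts.get(previous, 0))
    rates["overall"] = ratio(counts.get("purchases", 0), counts.get("totalUsers", 0))
    return rates


if __name__ == "__main__":
    # Hypothetical counts in which nobody added a product to the cart.
    sample = {
        "totalUsers": 100,
        "homepageVisits": 80,
        "productViews": 40,
        "cartAdditions": 0,
        "checkouts": 0,
        "purchases": 0,
    }
    for step, rate in funnel_rates(sample).items():
        print(step, "n/a" if rate is None else f"{rate:.2%}")
```

Returning `None` for an undefined rate lets the report print a placeholder instead of failing the whole aggregation.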
7. Summary
This article covered the methods, tools, and best practices for exporting JSON data from a MongoDB database. We discussed:
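Export methods:
• The mongoexport command-line tool
• The MongoDB Compass graphical interface
• Programming-language drivers (Node.js, Python, Java)
• Aggregation pipelines for transforming data
Best practices:
• Performance optimization, including batch processing and index tuning
• Data security measures, including handling sensitive data and access control
• Strategies for large datasets, including exporting sharded collections and incremental export
Common problems and solutions:
• Handling special data types (ObjectId, Date, and so on)
• Handling large data volumes and memory limits
• Handling connection and authentication problems
Practical scenarios:
• Data backup and restore
• Data migration (MongoDB to other databases, and other databases to MongoDB)
• Data analysis and report generation
With the guidance and code examples in this article, developers can choose the export method that best fits their needs and keep the export process efficient, secure, and reliable.
Whether the goal is data backup, system migration, data analysis, or any other scenario that requires exporting MongoDB data, the material here should make data export and exchange straightforward.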