|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
引言
在当今数据驱动的世界中,数据质量是组织成功的关键因素之一。低质量的数据会导致错误的决策、低效的运营和错失商机。数据清洗作为数据预处理的重要环节,是确保数据质量的关键步骤。PostgreSQL作为一款功能强大的开源关系型数据库管理系统,提供了丰富的工具和功能,使其成为进行高效数据清洗的理想选择。
本文将详细介绍如何使用PostgreSQL进行高效的数据清洗,从识别异常数据到数据标准化,帮助你一步步提升数据质量。我们将通过实际代码示例和最佳实践,展示如何充分利用PostgreSQL的功能来解决各种数据清洗挑战。
数据清洗的基本概念和流程
数据清洗是指识别并纠正(或删除)数据中的错误、不一致和不准确之处的过程。数据清洗的主要目标是提高数据质量,使其适合分析、报告和决策制定。
数据清洗的基本流程通常包括以下步骤:
1. 数据评估:了解数据的结构、内容和质量
2. 识别异常数据:发现缺失值、重复值、异常值和不一致数据
3. 处理异常数据:根据业务规则和数据特性,决定如何处理识别出的问题
4. 数据标准化:将数据转换为一致的格式和标准
5. 验证:确保清洗后的数据满足质量要求
PostgreSQL提供了强大的SQL功能、丰富的数据类型和内置函数,以及可扩展的架构,使其能够高效地执行这些数据清洗任务。
使用PostgreSQL识别异常数据
缺失值检测
缺失值是数据集中最常见的问题之一。在PostgreSQL中,我们可以使用多种方法来检测缺失值(NULL值)。
- -- 检查表中每列的NULL值数量
- SELECT
- column_name,
- COUNT(*) - COUNT(column_name) AS null_count
- FROM
- your_table
- CROSS JOIN
- (SELECT array_agg(attname) AS columns FROM pg_attribute WHERE attrelid = 'your_table'::regclass AND attnum > 0 AND NOT attisdropped) AS cols,
- unnest(columns) AS column_name
- GROUP BY
- column_name;
复制代码
更简单的方法是针对特定列进行检查:
- -- 检查特定列的NULL值
- SELECT
- COUNT(*) AS total_rows,
- COUNT(column_name) AS non_null_rows,
- COUNT(*) - COUNT(column_name) AS null_rows,
- ROUND((COUNT(*) - COUNT(column_name)) * 100.0 / COUNT(*), 2) AS null_percentage
- FROM
- your_table;
复制代码
对于大型表,我们可以使用采样方法来快速评估数据质量:
- -- 使用TABLESAMPLE进行数据采样检查
- SELECT
- column_name,
- COUNT(*) - COUNT(column_name) AS null_count
- FROM
- your_table TABLESAMPLE BERNOULLI(10) -- 10%的随机采样
- CROSS JOIN
- unnest(ARRAY['column1', 'column2', 'column3']) AS column_name
- GROUP BY
- column_name;
复制代码
重复数据检测
重复数据是另一个常见的数据质量问题。PostgreSQL提供了多种方法来检测重复数据。
- -- 检测完全重复的行
- SELECT
- column1, column2, column3, COUNT(*) AS duplicate_count
- FROM
- your_table
- GROUP BY
- column1, column2, column3
- HAVING
- COUNT(*) > 1;
复制代码
如果需要基于特定列检测重复数据:
- -- 检测基于特定列的重复数据
- SELECT
- duplicate_column, COUNT(*) AS duplicate_count
- FROM
- your_table
- GROUP BY
- duplicate_column
- HAVING
- COUNT(*) > 1;
复制代码
要查看完整的重复记录,可以使用窗口函数:
- -- 使用窗口函数查看重复记录
- WITH duplicates AS (
- SELECT
- *,
- COUNT(*) OVER (PARTITION BY duplicate_column) AS duplicate_count
- FROM
- your_table
- )
- SELECT
- *
- FROM
- duplicates
- WHERE
- duplicate_count > 1
- ORDER BY
- duplicate_column, id;
复制代码
异常值检测
异常值是显著偏离其他观测值的数据点。检测异常值的方法多种多样,下面是一些常用的统计方法。
使用标准差法检测数值异常:
- -- 使用标准差法检测异常值(假设数据呈正态分布)
- SELECT *
- FROM your_table
- WHERE numeric_column < (SELECT AVG(numeric_column) - 3 * STDDEV(numeric_column) FROM your_table)
- OR numeric_column > (SELECT AVG(numeric_column) + 3 * STDDEV(numeric_column) FROM your_table);
复制代码
使用四分位距(IQR)法检测异常值:
- -- 使用四分位距(IQR)法检测异常值
- WITH stats AS (
- SELECT
- PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY numeric_column) AS q1,
- PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY numeric_column) AS q3
- FROM
- your_table
- )
- SELECT *
- FROM your_table, stats
- WHERE numeric_column < (q1 - 1.5 * (q3 - q1))
- OR numeric_column > (q3 + 1.5 * (q3 - q1));
复制代码
对于文本数据,可以检测不符合特定模式的值:
- -- 检测不符合特定模式的文本数据(例如,电子邮件格式)
- SELECT *
- FROM your_table
- WHERE email_column !~ '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$';
复制代码
不一致数据检测
不一致数据可能表现为格式不统一、编码不一致或违反业务规则等问题。
- -- 检测日期格式不一致的数据
- SELECT DISTINCT
- date_column,
- CASE
- WHEN date_column ~ '^\d{4}-\d{2}-\d{2}$' THEN 'YYYY-MM-DD'
- WHEN date_column ~ '^\d{2}/\d{2}/\d{4}$' THEN 'MM/DD/YYYY'
- WHEN date_column ~ '^\d{2}-\d{2}-\d{4}$' THEN 'MM-DD-YYYY'
- ELSE 'Unknown format'
- END AS detected_format
- FROM
- your_table
- WHERE
- date_column IS NOT NULL;
复制代码- -- 检测违反业务规则的数据(例如,年龄小于0或大于150)
- SELECT *
- FROM your_table
- WHERE age_column < 0 OR age_column > 150;
复制代码
数据清洗技术
处理缺失值
处理缺失值的方法取决于数据特性和业务需求。常见的处理方法包括删除、替换和插补。
- -- 删除包含NULL值的行
- DELETE FROM your_table
- WHERE column_name IS NULL;
- -- 或者创建一个新表,不包含NULL值行
- CREATE TABLE cleaned_table AS
- SELECT * FROM your_table
- WHERE column_name IS NOT NULL;
复制代码- -- 使用固定值替换NULL值
- UPDATE your_table
- SET column_name = 'default_value'
- WHERE column_name IS NULL;
- -- 使用列的平均值替换NULL值(数值列)
- UPDATE your_table
- SET numeric_column = COALESCE(numeric_column, (SELECT AVG(numeric_column) FROM your_table WHERE numeric_column IS NOT NULL))
- WHERE numeric_column IS NULL;
复制代码- -- 使用前一个或后一个有效值填充NULL值(时间序列数据)
- -- 首先确保表有适当的排序列,如时间戳
- UPDATE your_table t1
- SET column_name = (
- SELECT column_name
- FROM your_table t2
- WHERE t2.id < t1.id AND t2.column_name IS NOT NULL
- ORDER BY t2.id DESC
- LIMIT 1
- )
- WHERE t1.column_name IS NULL;
复制代码
对于更复杂的插补方法,可以使用PostgreSQL的扩展功能,如PL/R或PL/Python,实现线性插值、回归插值等高级方法。
处理重复数据
处理重复数据通常涉及删除重复项或合并重复记录。
- -- 删除完全重复的行,保留每个组中的一个
- DELETE FROM your_table
- WHERE ctid NOT IN (
- SELECT min(ctid)
- FROM your_table
- GROUP BY column1, column2, column3
- );
- -- 注意:ctid是PostgreSQL的物理行标识符,每个表都有
复制代码- -- 创建一个新表,只包含不重复的记录
- CREATE TABLE unique_table AS
- SELECT DISTINCT ON (duplicate_column) *
- FROM your_table
- ORDER BY duplicate_column, id;
复制代码
对于需要合并重复记录的情况,可以使用聚合函数:
- -- 合并重复记录,例如保留最新记录
- WITH ranked AS (
- SELECT
- *,
- ROW_NUMBER() OVER (PARTITION BY duplicate_column ORDER BY update_timestamp DESC) AS rn
- FROM
- your_table
- )
- DELETE FROM your_table
- WHERE id IN (SELECT id FROM ranked WHERE rn > 1);
复制代码
处理异常值
处理异常值的方法包括删除、替换和转换。
- -- 删除异常值
- DELETE FROM your_table
- WHERE numeric_column < (SELECT AVG(numeric_column) - 3 * STDDEV(numeric_column) FROM your_table)
- OR numeric_column > (SELECT AVG(numeric_column) + 3 * STDDEV(numeric_column) FROM your_table);
复制代码- -- 将异常值替换为边界值(winsorization)
- WITH stats AS (
- SELECT
- AVG(numeric_column) - 3 * STDDEV(numeric_column) AS lower_bound,
- AVG(numeric_column) + 3 * STDDEV(numeric_column) AS upper_bound
- FROM
- your_table
- )
- UPDATE your_table
- SET numeric_column =
- CASE
- WHEN numeric_column < (SELECT lower_bound FROM stats) THEN (SELECT lower_bound FROM stats)
- WHEN numeric_column > (SELECT upper_bound FROM stats) THEN (SELECT upper_bound FROM stats)
- ELSE numeric_column
- END;
复制代码- -- 对数转换处理偏态分布中的异常值
- UPDATE your_table
- SET log_numeric_column = LOG(numeric_column + 1) -- 加1避免log(0)未定义
- WHERE numeric_column > 0;
复制代码
数据转换和标准化
数据转换和标准化是确保数据一致性和可比性的关键步骤。
- -- 文本数据标准化(例如,统一大小写)
- UPDATE your_table
- SET text_column = UPPER(text_column);
- -- 或者使用首字母大写
- UPDATE your_table
- SET text_column = INITCAP(text_column);
复制代码- -- 去除文本数据中的多余空格
- UPDATE your_table
- SET text_column = TRIM(BOTH ' ' FROM text_column);
- -- 替换多个空格为单个空格
- UPDATE your_table
- SET text_column = REGEXP_REPLACE(text_column, '[ ]{2,}', ' ', 'g');
复制代码- -- 标准化日期格式
- UPDATE your_table
- SET date_column = TO_DATE(date_column, 'YYYY-MM-DD')
- WHERE date_column ~ '^\d{4}-\d{2}-\d{2}$';
- UPDATE your_table
- SET date_column = TO_DATE(date_column, 'MM/DD/YYYY')
- WHERE date_column ~ '^\d{2}/\d{2}/\d{4}$';
复制代码
数据标准化和规范化
数据格式标准化
数据格式标准化确保相同类型的数据在整个数据库中具有一致的格式。
- -- 电话号码标准化
- UPDATE your_table
- SET phone_column = REGEXP_REPLACE(phone_column, '[^0-9]', '', 'g');
- -- 然后格式化为标准形式
- UPDATE your_table
- SET phone_column =
- CASE
- WHEN LENGTH(phone_column) = 10 THEN '(' || SUBSTRING(phone_column, 1, 3) || ') ' || SUBSTRING(phone_column, 4, 3) || '-' || SUBSTRING(phone_column, 7, 4)
- WHEN LENGTH(phone_column) = 11 AND SUBSTRING(phone_column, 1, 1) = '1' THEN '(' || SUBSTRING(phone_column, 2, 3) || ') ' || SUBSTRING(phone_column, 5, 3) || '-' || SUBSTRING(phone_column, 8, 4)
- ELSE phone_column -- 保持原样,如果不符合已知格式
- END;
复制代码- -- 地址标准化(简单示例)
- UPDATE your_table
- SET address_column = INITCAP(TRIM(BOTH ' ' FROM address_column));
复制代码
数据类型转换
确保数据以正确的类型存储对于数据质量和查询性能至关重要。
- -- 将文本转换为数值类型
- UPDATE your_table
- SET numeric_column = CAST(text_numeric_column AS NUMERIC)
- WHERE text_numeric_column ~ '^[0-9]+(\.[0-9]+)?$';
- -- 将文本转换为日期类型
- UPDATE your_table
- SET date_column = TO_DATE(text_date_column, 'YYYY-MM-DD')
- WHERE text_date_column ~ '^\d{4}-\d{2}-\d{2}$';
复制代码- -- 创建一个新表,使用正确的数据类型
- CREATE TABLE typed_table AS
- SELECT
- id,
- CAST(text_numeric_column AS NUMERIC) AS numeric_column,
- TO_DATE(text_date_column, 'YYYY-MM-DD') AS date_column,
- -- 其他列
- FROM
- your_table;
复制代码
数据规范化
数据规范化是减少数据冗余和提高数据一致性的过程。
- -- 示例:将非规范化的表转换为第一范式(1NF)
- -- 假设原始表有一个包含多个值的列(如逗号分隔的标签)
- -- 创建新表存储标签
- CREATE TABLE tags (
- id SERIAL PRIMARY KEY,
- tag_name VARCHAR(100) UNIQUE NOT NULL
- );
- -- 创建关联表
- CREATE TABLE item_tags (
- item_id INTEGER REFERENCES items(id),
- tag_id INTEGER REFERENCES tags(id),
- PRIMARY KEY (item_id, tag_id)
- );
- -- 提取唯一标签并插入tags表
- INSERT INTO tags (tag_name)
- SELECT DISTINCT UNNEST(STRING_TO_ARRAY(tags_column, ','))
- FROM items
- WHERE tags_column IS NOT NULL;
- -- 填充关联表
- INSERT INTO item_tags (item_id, tag_id)
- SELECT
- i.id,
- t.id
- FROM
- items i
- CROSS JOIN
- UNNEST(STRING_TO_ARRAY(i.tags_column, ',')) AS tag_name
- JOIN
- tags t ON t.tag_name = tag_name
- WHERE
- i.tags_column IS NOT NULL;
复制代码- -- 示例:将表转换为第三范式(3NF)
- -- 假设有一个包含员工和部门信息的表
- -- 创建部门表
- CREATE TABLE departments (
- id SERIAL PRIMARY KEY,
- department_name VARCHAR(100) UNIQUE NOT NULL,
- location VARCHAR(100)
- );
- -- 从员工表中提取唯一部门
- INSERT INTO departments (department_name, location)
- SELECT DISTINCT department_name, department_location
- FROM employees
- WHERE department_name IS NOT NULL;
- -- 更新员工表,引用部门ID
- ALTER TABLE employees ADD COLUMN department_id INTEGER;
- UPDATE employees e
- SET department_id = d.id
- FROM departments d
- WHERE e.department_name = d.department_name;
- -- 移除原始部门列
- ALTER TABLE employees DROP COLUMN department_name;
- ALTER TABLE employees DROP COLUMN department_location;
- -- 添加外键约束
- ALTER TABLE employees ADD CONSTRAINT fk_department
- FOREIGN KEY (department_id) REFERENCES departments(id);
复制代码
高级数据清洗技术
使用窗口函数进行数据清洗
窗口函数是PostgreSQL中强大的工具,特别适用于复杂的数据清洗任务。
- -- 使用窗口函数填充时间序列中的缺失值
- WITH numbered AS (
- SELECT
- date,
- value,
- ROW_NUMBER() OVER (ORDER BY date) AS rn
- FROM
- time_series_data
- )
- SELECT
- d1.date,
- COALESCE(
- d1.value,
- (
- SELECT d2.value
- FROM numbered d2
- WHERE d2.rn = (
- SELECT MAX(d3.rn)
- FROM numbered d3
- WHERE d3.rn < d1.rn AND d3.value IS NOT NULL
- )
- )
- ) AS filled_value
- FROM
- numbered d1
- ORDER BY
- d1.date;
复制代码- -- 使用窗口函数识别数据中的突变点
- WITH stats AS (
- SELECT
- date,
- value,
- LAG(value) OVER (ORDER BY date) AS prev_value,
- value - LAG(value) OVER (ORDER BY date) AS day_over_day_change,
- STDDEV(value) OVER (ORDER BY date ROWS BETWEEN 30 PRECEDING AND CURRENT ROW) AS rolling_stddev
- FROM
- time_series_data
- )
- SELECT
- date,
- value,
- prev_value,
- day_over_day_change,
- ABS(day_over_day_change) / rolling_stddev AS z_score
- FROM
- stats
- WHERE
- ABS(day_over_day_change) / rolling_stddev > 3 -- 3个标准差的变化
- ORDER BY
- date;
复制代码
使用正则表达式进行复杂数据清洗
PostgreSQL的正则表达式功能非常强大,适用于复杂的数据清洗任务。
- -- 从文本中提取特定模式的数据(例如,从地址中提取邮政编码)
- UPDATE your_table
- SET zip_code = REGEXP_REPLACE(address_column, '.*\b([0-9]{5})\b.*', '\1')
- WHERE address_column ~ '\b[0-9]{5}\b';
- -- 从混合格式的电话号码中提取数字
- UPDATE your_table
- SET clean_phone = REGEXP_REPLACE(phone_column, '[^0-9]', '', 'g');
复制代码- -- 标准化姓名格式(例如,将"Smith, John"转换为"John Smith")
- UPDATE your_table
- SET full_name =
- CASE
- WHEN full_name ~ '^[A-Za-z-]+, [A-Za-z-]+$' THEN
- REGEXP_REPLACE(full_name, '^([A-Za-z-]+), ([A-Za-z-]+)$', '\2 \1')
- ELSE full_name
- END;
复制代码- -- 验证和标准化电子邮件地址
- -- 首先检查电子邮件格式是否有效
- SELECT *
- FROM your_table
- WHERE email_column !~ '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$';
- -- 标准化电子邮件地址(转换为小写)
- UPDATE your_table
- SET email_column = LOWER(TRIM(email_column));
复制代码
使用PostgreSQL扩展进行数据清洗
PostgreSQL的扩展生态系统提供了许多额外的功能,可以增强数据清洗能力。
- -- 安装和使用fuzzystrmatch扩展进行模糊字符串匹配
- -- 首先安装扩展
- CREATE EXTENSION IF NOT EXISTS fuzzystrmatch;
- -- 使用levenshtein距离查找相似的字符串
- SELECT
- column1,
- column2,
- LEVENSHTEIN(column1, column2) AS distance
- FROM
- your_table
- WHERE
- LEVENSHTEIN(column1, column2) < 3; -- 距离小于3的字符串被认为是相似的
复制代码- -- 使用pg_trgm扩展进行更高级的文本匹配
- -- 安装扩展
- CREATE EXTENSION IF NOT EXISTS pg_trgm;
- -- 使用相似度函数查找相似的字符串
- SELECT
- column1,
- column2,
- SIMILARITY(column1, column2) AS similarity_score
- FROM
- your_table
- WHERE
- column1 % column2; -- %操作符表示相似度超过pg_trgm相似性阈值
复制代码- -- 使用tablefunc扩展进行数据透视
- -- 安装扩展
- CREATE EXTENSION IF NOT EXISTS tablefunc;
- -- 创建交叉表(数据透视表)
- SELECT *
- FROM crosstab(
- 'SELECT category_id, month, amount FROM sales ORDER BY 1,2',
- 'SELECT DISTINCT month FROM sales ORDER BY 1'
- ) AS ct(category_id INTEGER, jan NUMERIC, feb NUMERIC, mar NUMERIC, apr NUMERIC, may NUMERIC, jun NUMERIC,
- jul NUMERIC, aug NUMERIC, sep NUMERIC, oct NUMERIC, nov NUMERIC, dec NUMERIC);
复制代码
数据清洗的最佳实践
1. 始终备份原始数据
在进行任何数据清洗操作之前,始终创建原始数据的备份。
- -- 创建原始表的备份
- CREATE TABLE your_table_backup AS SELECT * FROM your_table;
- -- 或者使用事务,以便在出现问题时可以回滚
- BEGIN;
- -- 执行数据清洗操作
- UPDATE your_table SET column_name = TRIM(column_name);
- -- 检查结果
- SELECT COUNT(*) FROM your_table WHERE column_name LIKE ' %' OR column_name LIKE '% ';
- -- 如果结果不符合预期,可以回滚
- -- ROLLBACK;
- -- 如果结果符合预期,提交事务
- COMMIT;
复制代码
2. 使用事务确保数据一致性
对于复杂的数据清洗操作,使用事务可以确保操作的原子性。
- BEGIN;
- -- 执行多个相关的数据清洗操作
- UPDATE customers SET email = LOWER(TRIM(email));
- UPDATE orders SET status = UPPER(TRIM(status));
- UPDATE products SET name = INITCAP(TRIM(name));
- -- 验证结果
- SELECT COUNT(*) FROM customers WHERE email <> LOWER(TRIM(email));
- SELECT COUNT(*) FROM orders WHERE status <> UPPER(TRIM(status));
- SELECT COUNT(*) FROM products WHERE name <> INITCAP(TRIM(name));
- -- 如果所有计数都为0,则提交事务
- COMMIT;
- -- 否则回滚
- -- ROLLBACK;
复制代码
3. 创建可重复的数据清洗脚本
将数据清洗操作组织成可重复的脚本,便于维护和重用。
- -- 创建一个函数来封装数据清洗逻辑
- CREATE OR REPLACE FUNCTION clean_customer_data()
- RETURNS VOID AS $$
- BEGIN
- -- 清洗电子邮件
- UPDATE customers SET email = LOWER(TRIM(email)) WHERE email IS NOT NULL;
-
- -- 标准化电话号码
- UPDATE customers SET phone = REGEXP_REPLACE(phone, '[^0-9]', '', 'g') WHERE phone IS NOT NULL;
-
- -- 清洗姓名
- UPDATE customers SET first_name = INITCAP(TRIM(first_name)) WHERE first_name IS NOT NULL;
- UPDATE customers SET last_name = INITCAP(TRIM(last_name)) WHERE last_name IS NOT NULL;
-
- -- 处理缺失值
- UPDATE customers SET middle_name = NULL WHERE TRIM(middle_name) = '';
-
- RAISE NOTICE 'Customer data cleaned successfully';
- END;
- $$ LANGUAGE plpgsql;
- -- 执行函数
- SELECT clean_customer_data();
复制代码
4. 记录数据清洗过程
维护数据清洗操作的记录,以便追踪和审计。
- -- 创建数据清洗日志表
- CREATE TABLE data_cleaning_log (
- id SERIAL PRIMARY KEY,
- table_name VARCHAR(100) NOT NULL,
- operation VARCHAR(100) NOT NULL,
- operation_time TIMESTAMP NOT NULL DEFAULT NOW(),
- rows_affected INTEGER,
- details TEXT
- );
- -- 创建一个函数来记录数据清洗操作
- CREATE OR REPLACE FUNCTION log_cleaning_operation(
- p_table_name VARCHAR,
- p_operation VARCHAR,
- p_rows_affected INTEGER DEFAULT NULL,
- p_details TEXT DEFAULT NULL
- ) RETURNS VOID AS $$
- BEGIN
- INSERT INTO data_cleaning_log (table_name, operation, rows_affected, details)
- VALUES (p_table_name, p_operation, p_rows_affected, p_details);
- END;
- $$ LANGUAGE plpgsql;
- -- 在数据清洗操作中使用日志函数
- BEGIN;
- -- 执行数据清洗操作
- UPDATE customers SET email = LOWER(TRIM(email)) WHERE email IS NOT NULL;
- -- 记录操作
- SELECT log_cleaning_operation('customers', 'Standardize email format', ROW_COUNT);
- COMMIT;
复制代码
5. 验证数据清洗结果
数据清洗后,验证结果以确保数据质量得到改善。
- -- 创建数据质量检查函数
- CREATE OR REPLACE FUNCTION validate_data_quality()
- RETURNS TABLE(check_name VARCHAR, failed_rows INTEGER) AS $$
- BEGIN
- RETURN QUERY
- -- 检查NULL值
- SELECT 'NULL emails' AS check_name, COUNT(*) AS failed_rows
- FROM customers WHERE email IS NULL
-
- UNION ALL
-
- -- 检查无效电子邮件格式
- SELECT 'Invalid email format', COUNT(*)
- FROM customers WHERE email !~ '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$'
-
- UNION ALL
-
- -- 检查重复电子邮件
- SELECT 'Duplicate emails', COUNT(*) - COUNT(DISTINCT email)
- FROM customers WHERE email IS NOT NULL;
- END;
- $$ LANGUAGE plpgsql;
- -- 执行数据质量检查
- SELECT * FROM validate_data_quality();
复制代码
案例研究:使用PostgreSQL进行实际数据清洗项目
让我们通过一个实际案例来综合应用前面介绍的技术。假设我们有一个客户数据表,需要进行全面的数据清洗。
场景描述
我们有一个客户表customers_raw,包含以下字段:
• id: 客户ID
• first_name: 名
• last_name: 姓
• email: 电子邮件
• phone: 电话号码
• address: 地址
• city: 城市
• state: 州/省
• zip_code: 邮政编码
• registration_date: 注册日期
• last_purchase_date: 最后购买日期
• total_purchases: 总购买次数
• total_amount: 总购买金额
数据中存在各种质量问题,包括缺失值、重复记录、格式不一致、异常值等。
步骤1: 数据评估
首先,我们需要评估数据质量,识别问题。
- -- 创建数据质量评估表
- CREATE TABLE data_quality_assessment AS
- SELECT
- 'customers_raw' AS table_name,
- 'first_name' AS column_name,
- COUNT(*) AS total_rows,
- COUNT(first_name) AS non_null_rows,
- COUNT(*) - COUNT(first_name) AS null_rows,
- ROUND((COUNT(*) - COUNT(first_name)) * 100.0 / COUNT(*), 2) AS null_percentage,
- COUNT(DISTINCT first_name) AS unique_values,
- COUNT(*) - COUNT(DISTINCT first_name) AS duplicate_values
- FROM
- customers_raw
- UNION ALL
- SELECT
- 'customers_raw' AS table_name,
- 'last_name' AS column_name,
- COUNT(*) AS total_rows,
- COUNT(last_name) AS non_null_rows,
- COUNT(*) - COUNT(last_name) AS null_rows,
- ROUND((COUNT(*) - COUNT(last_name)) * 100.0 / COUNT(*), 2) AS null_percentage,
- COUNT(DISTINCT last_name) AS unique_values,
- COUNT(*) - COUNT(DISTINCT last_name) AS duplicate_values
- FROM
- customers_raw
- -- 为其他列重复此模式...
- UNION ALL
- SELECT
- 'customers_raw' AS table_name,
- 'email' AS column_name,
- COUNT(*) AS total_rows,
- COUNT(email) AS non_null_rows,
- COUNT(*) - COUNT(email) AS null_rows,
- ROUND((COUNT(*) - COUNT(email)) * 100.0 / COUNT(*), 2) AS null_percentage,
- COUNT(DISTINCT email) AS unique_values,
- COUNT(*) - COUNT(DISTINCT email) AS duplicate_values
- FROM
- customers_raw;
- -- 查看数据质量评估结果
- SELECT * FROM data_quality_assessment ORDER BY null_percentage DESC;
复制代码
步骤2: 创建清洗后的表
为了避免直接修改原始数据,我们创建一个新表来存储清洗后的数据。
- -- 创建清洗后的表结构
- CREATE TABLE customers_clean (
- id SERIAL PRIMARY KEY,
- first_name VARCHAR(50),
- last_name VARCHAR(50),
- email VARCHAR(100) UNIQUE,
- phone VARCHAR(20),
- address VARCHAR(100),
- city VARCHAR(50),
- state VARCHAR(50),
- zip_code VARCHAR(20),
- registration_date DATE,
- last_purchase_date DATE,
- total_purchases INTEGER,
- total_amount NUMERIC(10,2)
- );
复制代码
步骤3: 处理重复数据
- -- 识别基于电子邮件的重复记录
- WITH duplicate_emails AS (
- SELECT
- email,
- COUNT(*) AS duplicate_count
- FROM
- customers_raw
- WHERE
- email IS NOT NULL
- GROUP BY
- email
- HAVING
- COUNT(*) > 1
- ),
- ranked_customers AS (
- SELECT
- c.*,
- ROW_NUMBER() OVER (PARTITION BY c.email ORDER BY c.last_purchase_date DESC NULLS LAST, c.registration_date DESC) AS rn
- FROM
- customers_raw c
- JOIN
- duplicate_emails d ON c.email = d.email
- )
- -- 查看将被保留的记录
- SELECT * FROM ranked_customers WHERE rn = 1;
- -- 插入非重复记录和每个重复组中的最新记录
- INSERT INTO customers_clean (
- first_name, last_name, email, phone, address, city, state, zip_code,
- registration_date, last_purchase_date, total_purchases, total_amount
- )
- SELECT
- first_name, last_name, email, phone, address, city, state, zip_code,
- registration_date, last_purchase_date, total_purchases, total_amount
- FROM (
- SELECT
- c.*,
- ROW_NUMBER() OVER (PARTITION BY c.email ORDER BY c.last_purchase_date DESC NULLS LAST, c.registration_date DESC) AS rn
- FROM
- customers_raw c
- WHERE
- c.email IS NOT NULL
- ) ranked
- WHERE rn = 1;
- -- 插入没有电子邮件的记录
- INSERT INTO customers_clean (
- first_name, last_name, email, phone, address, city, state, zip_code,
- registration_date, last_purchase_date, total_purchases, total_amount
- )
- SELECT
- first_name, last_name, email, phone, address, city, state, zip_code,
- registration_date, last_purchase_date, total_purchases, total_amount
- FROM
- customers_raw
- WHERE
- email IS NULL;
复制代码
步骤4: 清洗和标准化数据
- -- 清洗和标准化姓名
- UPDATE customers_clean
- SET first_name = INITCAP(TRIM(first_name))
- WHERE first_name IS NOT NULL;
- UPDATE customers_clean
- SET last_name = INITCAP(TRIM(last_name))
- WHERE last_name IS NOT NULL;
- -- 标准化电子邮件
- UPDATE customers_clean
- SET email = LOWER(TRIM(email))
- WHERE email IS NOT NULL;
- -- 标准化电话号码
- UPDATE customers_clean
- SET phone = REGEXP_REPLACE(phone, '[^0-9]', '', 'g')
- WHERE phone IS NOT NULL;
- -- 格式化电话号码
- UPDATE customers_clean
- SET phone =
- CASE
- WHEN LENGTH(phone) = 10 THEN '(' || SUBSTRING(phone, 1, 3) || ') ' || SUBSTRING(phone, 4, 3) || '-' || SUBSTRING(phone, 7, 4)
- WHEN LENGTH(phone) = 11 AND SUBSTRING(phone, 1, 1) = '1' THEN '(' || SUBSTRING(phone, 2, 3) || ') ' || SUBSTRING(phone, 5, 3) || '-' || SUBSTRING(phone, 8, 4)
- ELSE phone
- END
- WHERE phone IS NOT NULL;
- -- 标准化地址
- UPDATE customers_clean
- SET address = INITCAP(TRIM(address))
- WHERE address IS NOT NULL;
- -- 标准化城市
- UPDATE customers_clean
- SET city = INITCAP(TRIM(city))
- WHERE city IS NOT NULL;
- -- 标准化州/省
- UPDATE customers_clean
- SET state = UPPER(TRIM(state))
- WHERE state IS NOT NULL;
- -- 标准化邮政编码
- UPDATE customers_clean
- SET zip_code = TRIM(zip_code)
- WHERE zip_code IS NOT NULL;
- -- 标准化日期
- UPDATE customers_clean
- SET registration_date = TO_DATE(registration_date, 'YYYY-MM-DD')
- WHERE registration_date ~ '^\d{4}-\d{2}-\d{2}$';
- UPDATE customers_clean
- SET last_purchase_date = TO_DATE(last_purchase_date, 'YYYY-MM-DD')
- WHERE last_purchase_date ~ '^\d{4}-\d{2}-\d{2}$';
复制代码
步骤5: 处理缺失值
- -- 处理缺失的姓名
- -- 如果first_name缺失但last_name存在,尝试从email中提取
- UPDATE customers_clean
- SET first_name = INITCAP(SPLIT_PART(SPLIT_PART(email, '@', 1), '.', 1))
- WHERE first_name IS NULL AND email IS NOT NULL AND email ~ '@';
- -- 如果last_name缺失但first_name存在,尝试从email中提取
- UPDATE customers_clean
- SET last_name = INITCAP(SPLIT_PART(SPLIT_PART(email, '@', 1), '.', 2))
- WHERE last_name IS NULL AND email IS NOT NULL AND email ~ '@' AND email ~ '.';
- -- 处理缺失的日期
- -- 如果registration_date缺失,设置为当前日期
- UPDATE customers_clean
- SET registration_date = CURRENT_DATE
- WHERE registration_date IS NULL;
- -- 如果last_purchase_date缺失,设置为registration_date
- UPDATE customers_clean
- SET last_purchase_date = registration_date
- WHERE last_purchase_date IS NULL AND registration_date IS NOT NULL;
- -- 处理缺失的数值
- -- 如果total_purchases缺失,设置为0
- UPDATE customers_clean
- SET total_purchases = 0
- WHERE total_purchases IS NULL;
- -- 如果total_amount缺失,设置为0
- UPDATE customers_clean
- SET total_amount = 0
- WHERE total_amount IS NULL;
复制代码
步骤6: 处理异常值
- -- 检测并处理total_purchases的异常值
- WITH purchase_stats AS (
- SELECT
- AVG(total_purchases) AS avg_purchases,
- STDDEV(total_purchases) AS stddev_purchases
- FROM
- customers_clean
- )
- UPDATE customers_clean
- SET total_purchases = (SELECT avg_purchases + 3 * stddev_purchases FROM purchase_stats)
- WHERE total_purchases > (SELECT avg_purchases + 3 * stddev_purchases FROM purchase_stats);
- -- 检测并处理total_amount的异常值
- WITH amount_stats AS (
- SELECT
- AVG(total_amount) AS avg_amount,
- STDDEV(total_amount) AS stddev_amount
- FROM
- customers_clean
- )
- UPDATE customers_clean
- SET total_amount = (SELECT avg_amount + 3 * stddev_amount FROM amount_stats)
- WHERE total_amount > (SELECT avg_amount + 3 * stddev_amount FROM amount_stats);
- -- 处理日期异常值(例如,last_purchase_date早于registration_date)
- UPDATE customers_clean
- SET last_purchase_date = registration_date
- WHERE last_purchase_date < registration_date;
复制代码
步骤7: 验证清洗后的数据
- -- 创建数据质量验证函数
- CREATE OR REPLACE FUNCTION validate_customers_data()
- RETURNS TABLE(check_name VARCHAR, failed_rows INTEGER) AS $$
- BEGIN
- RETURN QUERY
- -- 检查NULL值
- SELECT 'NULL first_name' AS check_name, COUNT(*) AS failed_rows
- FROM customers_clean WHERE first_name IS NULL
-
- UNION ALL
-
- SELECT 'NULL last_name' AS check_name, COUNT(*) AS failed_rows
- FROM customers_clean WHERE last_name IS NULL
-
- UNION ALL
-
- SELECT 'NULL email' AS check_name, COUNT(*) AS failed_rows
- FROM customers_clean WHERE email IS NULL
-
- UNION ALL
-
- -- 检查无效电子邮件格式
- SELECT 'Invalid email format', COUNT(*)
- FROM customers_clean WHERE email IS NOT NULL AND email !~ '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$'
-
- UNION ALL
-
- -- 检查重复电子邮件
- SELECT 'Duplicate emails', COUNT(*) - COUNT(DISTINCT email)
- FROM customers_clean WHERE email IS NOT NULL
-
- UNION ALL
-
- -- 检查日期异常
- SELECT 'Last purchase before registration', COUNT(*)
- FROM customers_clean WHERE last_purchase_date < registration_date
-
- UNION ALL
-
- -- 检查数值异常
- SELECT 'Negative total purchases', COUNT(*)
- FROM customers_clean WHERE total_purchases < 0
-
- UNION ALL
-
- SELECT 'Negative total amount', COUNT(*)
- FROM customers_clean WHERE total_amount < 0;
- END;
- $$ LANGUAGE plpgsql;
- -- 执行数据质量验证
- SELECT * FROM validate_customers_data();
复制代码
步骤8: 创建数据清洗报告
- -- 创建数据清洗报告表
- CREATE TABLE data_cleaning_report (
- id SERIAL PRIMARY KEY,
- cleaning_date TIMESTAMP NOT NULL DEFAULT NOW(),
- original_table VARCHAR(50) NOT NULL,
- cleaned_table VARCHAR(50) NOT NULL,
- original_rows INTEGER NOT NULL,
- cleaned_rows INTEGER NOT NULL,
- duplicates_removed INTEGER NOT NULL,
- null_values_handled INTEGER NOT NULL,
- outliers_handled INTEGER NOT NULL,
- validation_checks INTEGER NOT NULL,
- validation_failures INTEGER NOT NULL
- );
- -- 填充报告
- INSERT INTO data_cleaning_report (
- original_table, cleaned_table, original_rows, cleaned_rows,
- duplicates_removed, null_values_handled, outliers_handled,
- validation_checks, validation_failures
- )
- SELECT
- 'customers_raw' AS original_table,
- 'customers_clean' AS cleaned_table,
- (SELECT COUNT(*) FROM customers_raw) AS original_rows,
- (SELECT COUNT(*) FROM customers_clean) AS cleaned_rows,
- (SELECT COUNT(*) FROM customers_raw) - (SELECT COUNT(DISTINCT email) FROM customers_raw WHERE email IS NOT NULL) -
- (SELECT COUNT(*) FROM customers_raw WHERE email IS NULL) AS duplicates_removed,
- (SELECT COUNT(*) FROM data_quality_assessment WHERE table_name = 'customers_raw' AND null_rows > 0) AS null_values_handled,
- (SELECT COUNT(*) FROM customers_raw WHERE total_purchases > (SELECT AVG(total_purchases) + 3 * STDDEV(total_purchases) FROM customers_raw) OR
- total_amount > (SELECT AVG(total_amount) + 3 * STDDEV(total_amount) FROM customers_raw)) AS outliers_handled,
- (SELECT COUNT(*) FROM validate_customers_data()) AS validation_checks,
- (SELECT SUM(failed_rows) FROM validate_customers_data()) AS validation_failures;
- -- 查看报告
- SELECT * FROM data_cleaning_report;
复制代码
结论
数据清洗是确保数据质量和可靠性的关键步骤,而PostgreSQL提供了丰富的工具和功能来支持高效的数据清洗过程。通过本文介绍的技术和最佳实践,你可以利用PostgreSQL的强大功能来识别异常数据、处理缺失值和重复数据、标准化数据格式,以及验证数据质量。
在实际应用中,数据清洗通常是一个迭代过程,需要根据具体的数据特性和业务需求进行调整。通过创建可重复的数据清洗脚本、记录清洗过程和验证结果,你可以建立一个可靠的数据清洗流程,持续提高数据质量。
记住,高质量的数据是做出准确决策和获得有价值洞察的基础。投资于数据清洗流程将为你的组织带来长期的回报,提高数据分析的准确性和可靠性。 |
|