|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
引言
在数据分析领域,时间序列数据是一种非常常见且重要的数据类型。无论是金融数据、销售记录、传感器数据还是网站流量分析,时间序列数据都无处不在。pandas作为Python数据分析的核心库,提供了强大而灵活的时间序列处理功能,能够帮助我们高效地处理各种时间相关的数据。
掌握pandas的时间处理技巧,不仅能够提升我们的工作效率,还能让我们更深入地挖掘数据中的时间模式,为业务决策提供有力支持。本文将从基础到进阶,系统地介绍pandas中的时间处理技巧,帮助你轻松应对数据分析中的时间序列挑战,成为数据处理高手。
基础部分
时间数据类型和创建
在pandas中,处理时间数据首先需要了解pandas提供的时间数据类型。主要有以下几种:
1. Timestamp:表示单个时间戳,类似于Python的datetime.datetime,但功能更强大。
2. DatetimeIndex:时间戳的索引,用于Series或DataFrame的索引。
3. Period:表示时间段,如某一天、某一月等。
4. Timedelta:表示时间差,如”5天”、”3小时”等。
5. PeriodIndex:时间段的索引。
6. TimedeltaIndex:时间差的索引。
让我们看看如何创建这些时间数据类型:
- import pandas as pd
- import numpy as np
- # 创建Timestamp
- # 从字符串创建
- ts1 = pd.Timestamp('2023-01-01')
- print(f"Timestamp from string: {ts1}")
- # 从Python datetime创建
- from datetime import datetime
- ts2 = pd.Timestamp(datetime(2023, 1, 1))
- print(f"Timestamp from datetime: {ts2}")
- # 创建DatetimeIndex
- # 从字符串数组创建
- dates = ['2023-01-01', '2023-01-02', '2023-01-03']
- dt_index1 = pd.DatetimeIndex(dates)
- print(f"\nDatetimeIndex from strings:\n{dt_index1}")
- # 使用date_range创建
- dt_index2 = pd.date_range('2023-01-01', periods=3, freq='D')
- print(f"\nDatetimeIndex using date_range:\n{dt_index2}")
- # 创建Period
- p1 = pd.Period('2023-01-01')
- print(f"\nPeriod: {p1}")
- # 创建PeriodIndex
- periods = ['2023-01', '2023-02', '2023-03']
- p_index1 = pd.PeriodIndex(periods, freq='M')
- print(f"\nPeriodIndex:\n{p_index1}")
- # 创建Timedelta
- td1 = pd.Timedelta('5 days')
- print(f"\nTimedelta: {td1}")
- # 创建TimedeltaIndex
- td_index1 = pd.TimedeltaIndex(['5 days', '10 days', '15 days'])
- print(f"\nTimedeltaIndex:\n{td_index1}")
复制代码
时间索引和选择
创建了时间数据后,我们需要学会如何使用时间索引进行数据选择和操作。pandas提供了多种方法来处理时间索引的数据:
- # 创建一个带时间索引的Series
- dates = pd.date_range('2023-01-01', periods=10, freq='D')
- ts = pd.Series(np.random.randn(10), index=dates)
- print(f"Time series with DatetimeIndex:\n{ts}")
- # 选择特定日期的数据
- print(f"\nData for 2023-01-03: {ts['2023-01-03']}")
- # 选择某一年的数据
- print(f"\nData for 2023:\n{ts['2023']}")
- # 选择某年某月的数据
- print(f"\nData for 2023-01:\n{ts['2023-01']}")
- # 使用truncate方法选择时间范围
- print(f"\nData between 2023-01-03 and 2023-01-07:\n{ts.truncate(after='2023-01-07').truncate(before='2023-01-03')}")
- # 使用loc选择时间范围
- print(f"\nData between 2023-01-03 and 2023-01-07 using loc:\n{ts.loc['2023-01-03':'2023-01-07']}")
- # 使用asof方法获取特定时间点之前最近的数据
- print(f"\nLast value before or on 2023-01-05: {ts.asof('2023-01-05')}")
复制代码
时间序列的基本操作
pandas提供了丰富的时间序列操作方法,让我们能够轻松处理时间相关的计算:
- # 创建两个时间序列
- dates1 = pd.date_range('2023-01-01', periods=5, freq='D')
- ts1 = pd.Series([1, 2, 3, 4, 5], index=dates1)
- dates2 = pd.date_range('2023-01-03', periods=5, freq='D')
- ts2 = pd.Series([10, 20, 30, 40, 50], index=dates2)
- # 时间序列的算术运算(自动对齐)
- print(f"Time series addition:\n{ts1 + ts2}")
- # 时间序列的移位
- print(f"\nShifted forward by 1 day:\n{ts1.shift(1)}")
- print(f"\nShifted backward by 1 day:\n{ts1.shift(-1)}")
- # 计算变化率
- print(f"\nPercentage change:\n{ts1.pct_change()}")
- # 计算差分
- print(f"\nDifference:\n{ts1.diff()}")
- # 累计计算
- print(f"\nCumulative sum:\n{ts1.cumsum()}")
- # 重采样(稍后会详细介绍)
- print(f"\nResampled to 2-day frequency, sum:\n{ts1.resample('2D').sum()}")
- # 时间序列的滚动计算(稍后会详细介绍)
- print(f"\nRolling mean with window size 2:\n{ts1.rolling(window=2).mean()}")
复制代码
进阶部分
时间重采样和频率转换
重采样(Resampling)是指将时间序列从一个频率转换到另一个频率的过程。这是时间序列分析中非常重要的操作,常用于降采样(降低频率)和升采样(提高频率)。
- # 创建一个高频时间序列
- # 每天数据,共30天
- dates = pd.date_range('2023-01-01', periods=30, freq='D')
- ts_daily = pd.Series(np.random.randn(30), index=dates)
- # 降采样:从日数据转换为周数据
- # 默认使用每个区间的结束标签
- ts_weekly_end = ts_daily.resample('W').sum()
- print(f"Weekly data (end label):\n{ts_weekly_end}")
- # 使用每个区间的开始标签
- ts_weekly_start = ts_daily.resample('W-SUN').sum()
- print(f"\nWeekly data (start label):\n{ts_weekly_start}")
- # 使用OHLC重采样
- ts_weekly_ohlc = ts_daily.resample('W').ohlc()
- print(f"\nWeekly OHLC:\n{ts_weekly_ohlc}")
- # 升采样:从日数据转换为小时数据
- # 需要指定填充方法
- ts_hourly = ts_daily.resample('H').ffill() # 前向填充
- print(f"\nHourly data (forward fill):\n{ts_hourly.head(10)}")
- # 使用不同的聚合函数
- # 每周的平均值
- ts_weekly_mean = ts_daily.resample('W').mean()
- print(f"\nWeekly mean:\n{ts_weekly_mean}")
- # 自定义聚合函数
- ts_weekly_custom = ts_daily.resample('W').agg(['mean', 'std', 'min', 'max'])
- print(f"\nWeekly custom aggregation:\n{ts_weekly_custom}")
- # 使用groupby进行重采样
- # 按周几分组计算平均值
- ts_weekday = ts_daily.groupby(ts_daily.index.dayofweek).mean()
- print(f"\nMean by weekday (0=Monday):\n{ts_weekday}")
复制代码
时间窗口计算
时间窗口计算(也称为滚动窗口计算)是时间序列分析中的重要技术,它允许我们在一个滑动的时间窗口上应用各种统计函数。
- # 创建一个时间序列
- dates = pd.date_range('2023-01-01', periods=30, freq='D')
- ts = pd.Series(np.random.randn(30), index=dates)
- # 滚动窗口计算
- # 窗口大小为7天
- rolling_mean = ts.rolling(window=7).mean()
- print(f"Rolling mean (window=7):\n{rolling_mean}")
- # 滚动窗口的标准差
- rolling_std = ts.rolling(window=7).std()
- print(f"\nRolling std (window=7):\n{rolling_std}")
- # 指数加权窗口
- # 与简单滚动窗口不同,指数加权窗口给予近期数据更高的权重
- ewm_mean = ts.ewm(span=7).mean()
- print(f"\nExponentially weighted mean (span=7):\n{ewm_mean}")
- # 扩展窗口
- # 扩展窗口包含从开始到当前的所有数据
- expanding_mean = ts.expanding().mean()
- print(f"\nExpanding mean:\n{expanding_mean}")
- # 自定义滚动窗口函数
- # 例如,计算窗口内最大值和最小值的差
- def range_func(x):
- return x.max() - x.min()
- rolling_range = ts.rolling(window=7).apply(range_func)
- print(f"\nRolling range (max-min):\n{rolling_range}")
- # 时间加权滚动窗口
- # 使用时间作为权重,而不是固定数量的观测值
- time_weighted_mean = ts.rolling(window='7D').mean()
- print(f"\nTime-weighted rolling mean (window=7D):\n{time_weighted_mean}")
复制代码
时区处理
在处理全球数据或需要跨时区分析时,时区处理变得尤为重要。pandas提供了强大的时区处理功能:
- # 创建一个不带时区的时间序列
- dates = pd.date_range('2023-01-01', periods=5, freq='D')
- ts_no_tz = pd.Series(np.random.randn(5), index=dates)
- print(f"Time series without timezone:\n{ts_no_tz.index}")
- # 本地化时区
- # 将时间索引转换为特定时区
- ts_utc = ts_no_tz.tz_localize('UTC')
- print(f"\nTime series localized to UTC:\n{ts_utc.index}")
- # 转换时区
- ts_est = ts_utc.tz_convert('US/Eastern')
- print(f"\nTime series converted to US/Eastern:\n{ts_est.index}")
- # 创建带时区的时间序列
- dates_tz = pd.date_range('2023-01-01', periods=5, freq='D', tz='Asia/Shanghai')
- ts_with_tz = pd.Series(np.random.randn(5), index=dates_tz)
- print(f"\nTime series with timezone (Asia/Shanghai):\n{ts_with_tz.index}")
- # 处理时区感知和时区无知的时间戳
- # 时区感知的时间戳
- tz_aware = pd.Timestamp('2023-01-01 12:00', tz='UTC')
- print(f"\nTimezone-aware timestamp: {tz_aware}")
- # 时区无知的时间戳
- tz_naive = pd.Timestamp('2023-01-01 12:00')
- print(f"Timezone-naive timestamp: {tz_naive}")
- # 混合时区操作
- # 将时区无知的时间戳本地化
- tz_naive_localized = tz_naive.tz_localize('UTC')
- print(f"\nLocalized timestamp: {tz_naive_localized}")
- # 获取所有可用时区
- import pytz
- print(f"\nNumber of available timezones: {len(pytz.all_timezones)}")
- print(f"First 10 timezones: {list(pytz.all_timezones)[:10]}")
复制代码
时间偏移和周期
时间偏移和周期是处理时间序列数据时非常有用的工具,它们允许我们灵活地表示和操作时间间隔。
- # 时间偏移(DateOffset)
- # 创建一个时间戳
- ts = pd.Timestamp('2023-01-01')
- # 添加一天
- ts_plus_1day = ts + pd.DateOffset(days=1)
- print(f"Timestamp + 1 day: {ts_plus_1day}")
- # 添加一个月
- ts_plus_1month = ts + pd.DateOffset(months=1)
- print(f"Timestamp + 1 month: {ts_plus_1month}")
- # 添加一年
- ts_plus_1year = ts + pd.DateOffset(years=1)
- print(f"Timestamp + 1 year: {ts_plus_1year}")
- # 使用特定频率的偏移
- # 月初
- month_start = ts + pd.offsets.MonthBegin()
- print(f"\nMonth begin: {month_start}")
- # 月末
- month_end = ts + pd.offsets.MonthEnd()
- print(f"Month end: {month_end}")
- # 季度末
- quarter_end = ts + pd.offsets.QuarterEnd()
- print(f"Quarter end: {quarter_end}")
- # 年初
- year_start = ts + pd.offsets.YearBegin()
- print(f"Year begin: {year_start}")
- # 周偏移
- # 下一个周一
- next_monday = ts + pd.offsets.Week(weekday=0) # 0=Monday
- print(f"\nNext Monday: {next_monday}")
- # 时间周期(Period)
- # 创建一个周期
- p = pd.Period('2023-01', freq='M')
- print(f"\nPeriod: {p}")
- # 获取周期的开始和结束时间
- print(f"Period start time: {p.start_time}")
- print(f"Period end time: {p.end_time}")
- # 周期运算
- p_plus_1 = p + 1
- print(f"\nPeriod + 1: {p_plus_1}")
- # 创建周期范围
- period_range = pd.period_range('2023-01', periods=5, freq='M')
- print(f"\nPeriod range:\n{period_range}")
- # 周期转换为时间戳
- timestamps = period_range.to_timestamp()
- print(f"\nPeriods converted to timestamps:\n{timestamps}")
- # 时间戳转换为周期
- periods = timestamps.to_period(freq='M')
- print(f"\nTimestamps converted to periods:\n{periods}")
复制代码
实战应用
时间序列数据可视化
可视化是理解时间序列数据的重要手段。pandas与matplotlib等可视化库结合,可以轻松创建各种时间序列图表:
- import matplotlib.pyplot as plt
- import matplotlib.dates as mdates
- # 创建一个时间序列
- np.random.seed(42)
- dates = pd.date_range('2023-01-01', periods=365, freq='D')
- ts = pd.Series(np.random.randn(365).cumsum(), index=dates)
- # 绘制时间序列图
- plt.figure(figsize=(12, 6))
- ts.plot()
- plt.title('Daily Time Series')
- plt.xlabel('Date')
- plt.ylabel('Value')
- plt.grid(True)
- plt.tight_layout()
- plt.show()
- # 绘制月度平均值的柱状图
- monthly_mean = ts.resample('M').mean()
- plt.figure(figsize=(12, 6))
- monthly_mean.plot(kind='bar')
- plt.title('Monthly Mean Values')
- plt.xlabel('Month')
- plt.ylabel('Mean Value')
- plt.grid(True)
- plt.tight_layout()
- plt.show()
- # 绘制滚动平均值和原始数据
- rolling_mean = ts.rolling(window=30).mean()
- plt.figure(figsize=(12, 6))
- ts.plot(label='Original')
- rolling_mean.plot(label='30-day Rolling Mean', color='red')
- plt.title('Original Data and Rolling Mean')
- plt.xlabel('Date')
- plt.ylabel('Value')
- plt.legend()
- plt.grid(True)
- plt.tight_layout()
- plt.show()
- # 绘制季节性分解图
- from statsmodels.tsa.seasonal import seasonal_decompose
- # 执行季节性分解
- # 假设周期为7天(周季节性)
- decomposition = seasonal_decompose(ts, model='additive', period=7)
- # 绘制分解结果
- plt.figure(figsize=(12, 8))
- plt.subplot(411)
- plt.plot(ts, label='Original')
- plt.legend()
- plt.subplot(412)
- plt.plot(decomposition.trend, label='Trend')
- plt.legend()
- plt.subplot(413)
- plt.plot(decomposition.seasonal, label='Seasonality')
- plt.legend()
- plt.subplot(414)
- plt.plot(decomposition.resid, label='Residuals')
- plt.legend()
- plt.tight_layout()
- plt.show()
- # 绘制自相关和偏自相关图
- from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
- plt.figure(figsize=(12, 6))
- plt.subplot(121)
- plot_acf(ts, lags=40, ax=plt.gca())
- plt.title('Autocorrelation')
- plt.subplot(122)
- plot_pacf(ts, lags=40, ax=plt.gca())
- plt.title('Partial Autocorrelation')
- plt.tight_layout()
- plt.show()
复制代码
时间特征工程
在时间序列分析和预测中,特征工程是非常重要的一步。通过从时间索引中提取各种特征,我们可以帮助模型更好地理解数据中的时间模式。
- # 创建一个时间序列
- np.random.seed(42)
- dates = pd.date_range('2023-01-01', periods=365, freq='D')
- ts = pd.Series(np.random.randn(365).cumsum(), index=dates)
- # 创建一个DataFrame
- df = pd.DataFrame({'value': ts})
- # 从时间索引中提取基本特征
- df['year'] = df.index.year
- df['month'] = df.index.month
- df['day'] = df.index.day
- df['dayofweek'] = df.index.dayofweek # 0=Monday
- df['dayofyear'] = df.index.dayofyear
- df['weekofyear'] = df.index.isocalendar().week
- df['quarter'] = df.index.quarter
- print("Basic time features:")
- print(df.head())
- # 创建周期性特征
- # 使用sin和cos函数捕捉周期性模式
- df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
- df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
- df['day_sin'] = np.sin(2 * np.pi * df['day'] / 31)
- df['day_cos'] = np.cos(2 * np.pi * df['day'] / 31)
- df['dayofweek_sin'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
- df['dayofweek_cos'] = np.cos(2 * np.pi * df['dayofweek'] / 7)
- print("\nCyclical features:")
- print(df[['month', 'month_sin', 'month_cos', 'dayofweek', 'dayofweek_sin', 'dayofweek_cos']].head())
- # 创建滞后特征
- # 滞后特征可以帮助捕捉时间依赖性
- for lag in [1, 2, 3, 7, 14, 30]:
- df[f'lag_{lag}'] = df['value'].shift(lag)
- print("\nLag features:")
- print(df[['value', 'lag_1', 'lag_2', 'lag_7']].head(10))
- # 创建窗口特征
- # 窗口特征可以帮助捕捉短期和长期趋势
- for window in [3, 7, 14, 30]:
- df[f'rolling_mean_{window}'] = df['value'].rolling(window=window).mean()
- df[f'rolling_std_{window}'] = df['value'].rolling(window=window).std()
- df[f'rolling_min_{window}'] = df['value'].rolling(window=window).min()
- df[f'rolling_max_{window}'] = df['value'].rolling(window=window).max()
- print("\nWindow features:")
- print(df[['value', 'rolling_mean_7', 'rolling_std_7', 'rolling_min_7', 'rolling_max_7']].head(10))
- # 创建扩展窗口特征
- # 扩展窗口特征包含从开始到当前的所有数据
- df['expanding_mean'] = df['value'].expanding().mean()
- df['expanding_std'] = df['value'].expanding().std()
- df['expanding_min'] = df['value'].expanding().min()
- df['expanding_max'] = df['value'].expanding().max()
- print("\nExpanding window features:")
- print(df[['value', 'expanding_mean', 'expanding_std', 'expanding_min', 'expanding_max']].head(10))
- # 创建差分特征
- # 差分特征可以帮助捕捉变化率
- df['diff_1'] = df['value'].diff(1)
- df['diff_7'] = df['value'].diff(7)
- df['pct_change_1'] = df['value'].pct_change(1)
- df['pct_change_7'] = df['value'].pct_change(7)
- print("\nDifference features:")
- print(df[['value', 'diff_1', 'diff_7', 'pct_change_1', 'pct_change_7']].head(10))
- # 创建时间戳特征
- # 将时间戳转换为数值,可以捕捉长期趋势
- df['timestamp'] = df.index.astype(np.int64) // 10**9 # 转换为Unix时间戳(秒)
- print("\nTimestamp feature:")
- print(df[['value', 'timestamp']].head())
- # 创建特殊日期特征
- # 标记特殊日期,如节假日、周末等
- from pandas.tseries.holiday import USFederalHolidayCalendar
- cal = USFederalHolidayCalendar()
- holidays = cal.holidays(start=df.index.min(), end=df.index.max())
- df['is_holiday'] = df.index.isin(holidays).astype(int)
- df['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
- df['is_month_start'] = df.index.is_month_start.astype(int)
- df['is_month_end'] = df.index.is_month_end.astype(int)
- df['is_quarter_start'] = df.index.is_quarter_start.astype(int)
- df['is_quarter_end'] = df.index.is_quarter_end.astype(int)
- df['is_year_start'] = df.index.is_year_start.astype(int)
- df['is_year_end'] = df.index.is_year_end.astype(int)
- print("\nSpecial date features:")
- print(df[['value', 'is_holiday', 'is_weekend', 'is_month_start', 'is_month_end']].head(10))
复制代码
常见业务场景案例
让我们通过几个常见的业务场景,看看如何应用pandas的时间处理技巧解决实际问题。
- # 创建销售数据
- np.random.seed(42)
- dates = pd.date_range('2022-01-01', periods=365, freq='D')
- base_sales = 100 + 10 * np.sin(2 * np.pi * np.arange(365) / 7) # 周季节性
- trend = 0.1 * np.arange(365) # 趋势
- noise = np.random.normal(0, 5, 365) # 随机噪声
- sales = base_sales + trend + noise
- sales = np.maximum(sales, 10) # 确保销售额为正
- # 创建DataFrame
- sales_df = pd.DataFrame({'date': dates, 'sales': sales})
- sales_df.set_index('date', inplace=True)
- print("Sales data sample:")
- print(sales_df.head())
- # 计算每日、每周、每月的销售额
- daily_sales = sales_df['sales']
- weekly_sales = sales_df['sales'].resample('W').sum()
- monthly_sales = sales_df['sales'].resample('M').sum()
- print("\nWeekly sales:")
- print(weekly_sales.head())
- # 计算同比增长率
- # 首先按月聚合数据
- monthly_sales_df = sales_df['sales'].resample('M').sum().to_frame('monthly_sales')
- # 计算去年同期销售额
- monthly_sales_df['prev_year'] = monthly_sales_df['monthly_sales'].shift(12)
- # 计算同比增长率
- monthly_sales_df['yoy_growth'] = (monthly_sales_df['monthly_sales'] - monthly_sales_df['prev_year']) / monthly_sales_df['prev_year'] * 100
- print("\nYear-over-year growth:")
- print(monthly_sales_df.tail())
- # 计算环比增长率
- monthly_sales_df['mom_growth'] = monthly_sales_df['monthly_sales'].pct_change() * 100
- print("\nMonth-over-month growth:")
- print(monthly_sales_df.tail())
- # 计算移动平均
- sales_df['7_day_ma'] = sales_df['sales'].rolling(window=7).mean()
- sales_df['30_day_ma'] = sales_df['sales'].rolling(window=30).mean()
- print("\nMoving averages:")
- print(sales_df.tail())
- # 计算年度累计销售额
- sales_df['cumulative_sales'] = sales_df['sales'].groupby(sales_df.index.year).cumsum()
- print("\nCumulative sales:")
- print(sales_df.tail())
- # 找出销售额最高的几天
- top_sales_days = sales_df.sort_values('sales', ascending=False).head(10)
- print("\nTop 10 sales days:")
- print(top_sales_days)
- # 分析周内销售模式
- weekday_sales = sales_df.groupby(sales_df.index.dayofweek)['sales'].mean()
- weekday_sales.index = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
- print("\nAverage sales by weekday:")
- print(weekday_sales)
- # 分析月内销售模式
- # 将日期转换为每月的第几天
- sales_df['day_of_month'] = sales_df.index.day
- day_of_month_sales = sales_df.groupby('day_of_month')['sales'].mean()
- print("\nAverage sales by day of month:")
- print(day_of_month_sales.head(15))
- # 预测下个月的销售额
- # 使用简单的移动平均方法
- last_30_days_avg = sales_df['sales'].tail(30).mean()
- next_month_prediction = last_30_days_avg * 30 # 假设下个月有30天
- print(f"\nNext month sales prediction: {next_month_prediction:.2f}")
复制代码- # 创建网站流量数据
- np.random.seed(42)
- dates = pd.date_range('2023-01-01', periods=24*7*4, freq='H') # 4周的小时数据
- # 创建基础流量模式(工作日和周末不同)
- base_traffic = np.zeros(len(dates))
- for i, date in enumerate(dates):
- hour = date.hour
- dayofweek = date.dayofweek
-
- # 工作日模式
- if dayofweek < 5:
- if 9 <= hour <= 17: # 工作时间
- base_traffic[i] = 100
- elif 18 <= hour <= 22: # 晚间
- base_traffic[i] = 150
- else: # 深夜和凌晨
- base_traffic[i] = 30
- else: # 周末
- if 10 <= hour <= 22: # 白天和晚间
- base_traffic[i] = 200
- else: # 深夜和凌晨
- base_traffic[i] = 50
- # 添加趋势和噪声
- trend = 0.01 * np.arange(len(dates))
- noise = np.random.normal(0, 10, len(dates))
- traffic = base_traffic + trend + noise
- traffic = np.maximum(traffic, 5) # 确保流量为正
- # 创建DataFrame
- traffic_df = pd.DataFrame({'datetime': dates, 'traffic': traffic})
- traffic_df.set_index('datetime', inplace=True)
- print("Website traffic data sample:")
- print(traffic_df.head())
- # 按天聚合流量
- daily_traffic = traffic_df['traffic'].resample('D').sum()
- print("\nDaily traffic:")
- print(daily_traffic.head())
- # 按周聚合流量
- weekly_traffic = traffic_df['traffic'].resample('W').sum()
- print("\nWeekly traffic:")
- print(weekly_traffic)
- # 分析一天内的流量模式
- hourly_pattern = traffic_df.groupby(traffic_df.index.hour)['traffic'].mean()
- print("\nAverage traffic by hour of day:")
- print(hourly_pattern)
- # 分析一周内的流量模式
- weekday_pattern = traffic_df.groupby(traffic_df.index.dayofweek)['traffic'].mean()
- weekday_pattern.index = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
- print("\nAverage traffic by weekday:")
- print(weekday_pattern)
- # 分析工作日和周末的小时流量模式
- traffic_df['hour'] = traffic_df.index.hour
- traffic_df['is_weekend'] = (traffic_df.index.dayofweek >= 5).astype(int)
- # 工作日和周末的小时流量
- weekday_hourly = traffic_df[traffic_df['is_weekend'] == 0].groupby('hour')['traffic'].mean()
- weekend_hourly = traffic_df[traffic_df['is_weekend'] == 1].groupby('hour')['traffic'].mean()
- print("\nWeekday hourly traffic pattern:")
- print(weekday_hourly)
- print("\nWeekend hourly traffic pattern:")
- print(weekend_hourly)
- # 计算峰值流量时间
- # 找出流量最高的时间段
- peak_hours = traffic_df.groupby(traffic_df.index.hour)['traffic'].mean().sort_values(ascending=False).head(5)
- print("\nPeak traffic hours:")
- print(peak_hours)
- # 计算流量增长率
- # 按天聚合
- daily_traffic_df = traffic_df['traffic'].resample('D').sum().to_frame('daily_traffic')
- # 计算日增长率
- daily_traffic_df['growth_rate'] = daily_traffic_df['daily_traffic'].pct_change() * 100
- print("\nDaily traffic growth rate:")
- print(daily_traffic_df.tail())
- # 计算周增长率
- weekly_traffic_df = traffic_df['traffic'].resample('W').sum().to_frame('weekly_traffic')
- weekly_traffic_df['growth_rate'] = weekly_traffic_df['weekly_traffic'].pct_change() * 100
- print("\nWeekly traffic growth rate:")
- print(weekly_traffic_df)
- # 检测异常流量
- # 使用Z-score方法检测异常
- from scipy import stats
- daily_traffic_df['z_score'] = np.abs(stats.zscore(daily_traffic_df['daily_traffic']))
- threshold = 2.5 # Z-score阈值
- anomalies = daily_traffic_df[daily_traffic_df['z_score'] > threshold]
- print("\nTraffic anomalies:")
- print(anomalies)
- # 预测下一天的流量
- # 使用简单移动平均方法
- last_7_days_avg = daily_traffic_df['daily_traffic'].tail(7).mean()
- next_day_prediction = last_7_days_avg
- print(f"\nNext day traffic prediction: {next_day_prediction:.2f}")
复制代码- # 创建股票价格数据
- np.random.seed(42)
- dates = pd.date_range('2022-01-01', periods=252, freq='D') # 一年的交易日
- # 生成随机游走作为股票价格
- returns = np.random.normal(0.001, 0.02, len(dates)) # 日收益率
- price = 100 * np.exp(returns.cumsum()) # 价格
- # 创建OHLC数据
- ohlc_dict = {
- 'open': price * (1 + np.random.normal(0, 0.005, len(dates))),
- 'high': price * (1 + np.abs(np.random.normal(0, 0.01, len(dates)))),
- 'low': price * (1 - np.abs(np.random.normal(0, 0.01, len(dates)))),
- 'close': price,
- 'volume': np.random.lognormal(10, 1, len(dates))
- }
- # 确保high >= open, close >= low
- for i in range(len(dates)):
- ohlc_dict['high'][i] = max(ohlc_dict['open'][i], ohlc_dict['close'][i], ohlc_dict['high'][i])
- ohlc_dict['low'][i] = min(ohlc_dict['open'][i], ohlc_dict['close'][i], ohlc_dict['low'][i])
- # 创建DataFrame
- stock_df = pd.DataFrame(ohlc_dict, index=dates)
- print("Stock price data sample:")
- print(stock_df.head())
- # 计算日收益率
- stock_df['daily_return'] = stock_df['close'].pct_change()
- print("\nDaily returns:")
- print(stock_df[['close', 'daily_return']].head())
- # 计算累计收益率
- stock_df['cumulative_return'] = (1 + stock_df['daily_return']).cumprod() - 1
- print("\nCumulative returns:")
- print(stock_df[['close', 'daily_return', 'cumulative_return']].tail())
- # 计算移动平均
- stock_df['ma_5'] = stock_df['close'].rolling(window=5).mean()
- stock_df['ma_20'] = stock_df['close'].rolling(window=20).mean()
- stock_df['ma_50'] = stock_df['close'].rolling(window=50).mean()
- print("\nMoving averages:")
- print(stock_df[['close', 'ma_5', 'ma_20', 'ma_50']].tail())
- # 计算波动率
- stock_df['volatility'] = stock_df['daily_return'].rolling(window=20).std() * np.sqrt(252) # 年化波动率
- print("\nVolatility:")
- print(stock_df[['close', 'daily_return', 'volatility']].tail())
- # 计算RSI (相对强弱指数)
- def calculate_rsi(data, window=14):
- delta = data.diff()
- gain = delta.where(delta > 0, 0)
- loss = -delta.where(delta < 0, 0)
-
- avg_gain = gain.rolling(window=window).mean()
- avg_loss = loss.rolling(window=window).mean()
-
- rs = avg_gain / avg_loss
- rsi = 100 - (100 / (1 + rs))
-
- return rsi
- stock_df['rsi'] = calculate_rsi(stock_df['close'])
- print("\nRSI:")
- print(stock_df[['close', 'rsi']].tail())
- # 计算MACD (指数平滑异同移动平均线)
- def calculate_macd(data, fast_period=12, slow_period=26, signal_period=9):
- ema_fast = data.ewm(span=fast_period).mean()
- ema_slow = data.ewm(span=slow_period).mean()
- macd = ema_fast - ema_slow
- signal = macd.ewm(span=signal_period).mean()
- histogram = macd - signal
-
- return macd, signal, histogram
- stock_df['macd'], stock_df['macd_signal'], stock_df['macd_histogram'] = calculate_macd(stock_df['close'])
- print("\nMACD:")
- print(stock_df[['close', 'macd', 'macd_signal', 'macd_histogram']].tail())
- # 计算布林带
- def calculate_bollinger_bands(data, window=20, num_std=2):
- ma = data.rolling(window=window).mean()
- std = data.rolling(window=window).std()
- upper_band = ma + (std * num_std)
- lower_band = ma - (std * num_std)
-
- return upper_band, ma, lower_band
- stock_df['bb_upper'], stock_df['bb_middle'], stock_df['bb_lower'] = calculate_bollinger_bands(stock_df['close'])
- print("\nBollinger Bands:")
- print(stock_df[['close', 'bb_upper', 'bb_middle', 'bb_lower']].tail())
- # 按月分析股票表现
- monthly_performance = stock_df['close'].resample('M').last().pct_change() * 100
- print("\nMonthly performance:")
- print(monthly_performance)
- # 计算最大回撤
- def calculate_max_drawdown(data):
- cumulative = (1 + data.pct_change()).cumprod()
- running_max = cumulative.expanding().max()
- drawdown = (cumulative - running_max) / running_max
- max_drawdown = drawdown.min()
-
- return max_drawdown
- max_drawdown = calculate_max_drawdown(stock_df['close'])
- print(f"\nMaximum drawdown: {max_drawdown:.2%}")
- # 计算夏普比率
- risk_free_rate = 0.02 # 假设无风险利率为2%
- annual_return = stock_df['daily_return'].mean() * 252
- annual_volatility = stock_df['daily_return'].std() * np.sqrt(252)
- sharpe_ratio = (annual_return - risk_free_rate) / annual_volatility
- print(f"\nSharpe ratio: {sharpe_ratio:.2f}")
- # 计算信息比率(假设基准收益率为5%)
- benchmark_return = 0.05
- excess_return = annual_return - benchmark_return
- information_ratio = excess_return / annual_volatility
- print(f"Information ratio: {information_ratio:.2f}")
- # 找出最佳和最差交易日
- best_days = stock_df['daily_return'].nlargest(5)
- worst_days = stock_df['daily_return'].nsmallest(5)
- print("\nBest trading days:")
- print(best_days)
- print("\nWorst trading days:")
- print(worst_days)
复制代码
总结与最佳实践
通过本文的学习,我们已经系统地掌握了pandas中时间处理的各种技巧,从基础的时间数据类型创建和操作,到进阶的时间重采样、窗口计算、时区处理,再到实际业务场景中的应用。下面,让我们总结一些关键的最佳实践,帮助你在日常工作中更高效地处理时间序列数据。
最佳实践
1. 选择合适的时间数据类型对于单个时间点,使用Timestamp对于时间序列索引,使用DatetimeIndex对于时间段,使用Period对于时间差,使用Timedelta
2. 对于单个时间点,使用Timestamp
3. 对于时间序列索引,使用DatetimeIndex
4. 对于时间段,使用Period
5. 对于时间差,使用Timedelta
6. 时区处理的一致性尽早确定时区,并在整个分析过程中保持一致如果可能,将所有时间数据转换为UTC时区,以避免混淆在显示结果时,再转换为本地时区
7. 尽早确定时区,并在整个分析过程中保持一致
8. 如果可能,将所有时间数据转换为UTC时区,以避免混淆
9. 在显示结果时,再转换为本地时区
10. 高效的时间索引操作使用部分字符串索引快速选择时间范围,如ts['2023-01']对于大型数据集,考虑使用truncate方法而不是布尔索引使用asof方法获取特定时间点之前最近的数据
11. 使用部分字符串索引快速选择时间范围,如ts['2023-01']
12. 对于大型数据集,考虑使用truncate方法而不是布尔索引
13. 使用asof方法获取特定时间点之前最近的数据
14. 合理使用重采样降采样时,选择合适的聚合函数(sum, mean, ohlc等)升采样时,选择合适的填充方法(ffill, bfill, interpolate等)注意重采样后的标签位置(start, end等)
15. 降采样时,选择合适的聚合函数(sum, mean, ohlc等)
16. 升采样时,选择合适的填充方法(ffill, bfill, interpolate等)
17. 注意重采样后的标签位置(start, end等)
18. 窗口计算的优化根据数据特点选择合适的窗口类型(rolling, expanding, ewm)对于时间序列数据,考虑使用基于时间的窗口而不是固定大小的窗口注意处理窗口计算中的NaN值
19. 根据数据特点选择合适的窗口类型(rolling, expanding, ewm)
20. 对于时间序列数据,考虑使用基于时间的窗口而不是固定大小的窗口
21. 注意处理窗口计算中的NaN值
22. 特征工程的系统性从时间索引中提取尽可能多的相关特征(年、月、日、周几等)使用周期性编码(sin/cos)捕捉周期性模式创建滞后特征和窗口特征捕捉时间依赖性考虑特殊日期(节假日、周末等)的影响
23. 从时间索引中提取尽可能多的相关特征(年、月、日、周几等)
24. 使用周期性编码(sin/cos)捕捉周期性模式
25. 创建滞后特征和窗口特征捕捉时间依赖性
26. 考虑特殊日期(节假日、周末等)的影响
27. 可视化的有效性选择合适的图表类型展示时间序列数据对于多时间尺度数据,考虑使用子图或双轴使用滚动平均等平滑技术帮助识别趋势
28. 选择合适的图表类型展示时间序列数据
29. 对于多时间尺度数据,考虑使用子图或双轴
30. 使用滚动平均等平滑技术帮助识别趋势
31. 业务场景的适应性根据具体业务需求选择合适的时间粒度定义与业务相关的指标(如同比增长率、环比增长率等)考虑业务周期性(如零售业的季节性)对分析的影响
32. 根据具体业务需求选择合适的时间粒度
33. 定义与业务相关的指标(如同比增长率、环比增长率等)
34. 考虑业务周期性(如零售业的季节性)对分析的影响
选择合适的时间数据类型
• 对于单个时间点,使用Timestamp
• 对于时间序列索引,使用DatetimeIndex
• 对于时间段,使用Period
• 对于时间差,使用Timedelta
时区处理的一致性
• 尽早确定时区,并在整个分析过程中保持一致
• 如果可能,将所有时间数据转换为UTC时区,以避免混淆
• 在显示结果时,再转换为本地时区
高效的时间索引操作
• 使用部分字符串索引快速选择时间范围,如ts['2023-01']
• 对于大型数据集,考虑使用truncate方法而不是布尔索引
• 使用asof方法获取特定时间点之前最近的数据
合理使用重采样
• 降采样时,选择合适的聚合函数(sum, mean, ohlc等)
• 升采样时,选择合适的填充方法(ffill, bfill, interpolate等)
• 注意重采样后的标签位置(start, end等)
窗口计算的优化
• 根据数据特点选择合适的窗口类型(rolling, expanding, ewm)
• 对于时间序列数据,考虑使用基于时间的窗口而不是固定大小的窗口
• 注意处理窗口计算中的NaN值
特征工程的系统性
• 从时间索引中提取尽可能多的相关特征(年、月、日、周几等)
• 使用周期性编码(sin/cos)捕捉周期性模式
• 创建滞后特征和窗口特征捕捉时间依赖性
• 考虑特殊日期(节假日、周末等)的影响
可视化的有效性
• 选择合适的图表类型展示时间序列数据
• 对于多时间尺度数据,考虑使用子图或双轴
• 使用滚动平均等平滑技术帮助识别趋势
业务场景的适应性
• 根据具体业务需求选择合适的时间粒度
• 定义与业务相关的指标(如同比增长率、环比增长率等)
• 考虑业务周期性(如零售业的季节性)对分析的影响
常见问题与解决方案
1. 问题:时间数据解析错误解决方案:使用pd.to_datetime的format参数指定格式,或使用errors='coerce'将无法解析的值设为NaT
2. 解决方案:使用pd.to_datetime的format参数指定格式,或使用errors='coerce'将无法解析的值设为NaT
3. 问题:时区转换混乱解决方案:始终保持时区一致性,使用tz_localize和tz_convert方法正确处理时区
4. 解决方案:始终保持时区一致性,使用tz_localize和tz_convert方法正确处理时区
5. 问题:重采样导致数据丢失解决方案:选择合适的聚合函数,或使用fillna方法处理缺失值
6. 解决方案:选择合适的聚合函数,或使用fillna方法处理缺失值
7. 问题:窗口计算边界效应解决方案:使用min_periods参数控制窗口最小观测值数,或考虑使用其他类型的窗口
8. 解决方案:使用min_periods参数控制窗口最小观测值数,或考虑使用其他类型的窗口
9. 问题:时间序列预测准确性低解决方案:增加更多相关特征,考虑季节性和趋势,尝试不同的预测模型
10. 解决方案:增加更多相关特征,考虑季节性和趋势,尝试不同的预测模型
问题:时间数据解析错误
• 解决方案:使用pd.to_datetime的format参数指定格式,或使用errors='coerce'将无法解析的值设为NaT
问题:时区转换混乱
• 解决方案:始终保持时区一致性,使用tz_localize和tz_convert方法正确处理时区
问题:重采样导致数据丢失
• 解决方案:选择合适的聚合函数,或使用fillna方法处理缺失值
问题:窗口计算边界效应
• 解决方案:使用min_periods参数控制窗口最小观测值数,或考虑使用其他类型的窗口
问题:时间序列预测准确性低
• 解决方案:增加更多相关特征,考虑季节性和趋势,尝试不同的预测模型
未来学习方向
虽然本文已经涵盖了pandas时间处理的大部分内容,但时间序列分析是一个广阔的领域,还有很多值得深入学习的方向:
1. 高级时间序列模型ARIMA、SARIMA等统计模型Prophet、FBATS等商业预测工具LSTM、GRU等深度学习模型
2. ARIMA、SARIMA等统计模型
3. Prophet、FBATS等商业预测工具
4. LSTM、GRU等深度学习模型
5. 实时时间序列处理使用Apache Kafka、Spark Streaming等工具处理实时数据流在线学习和模型更新
6. 使用Apache Kafka、Spark Streaming等工具处理实时数据流
7. 在线学习和模型更新
8. 多变量时间序列分析向量自回归(VAR)模型因果推断和格兰杰因果检验
9. 向量自回归(VAR)模型
10. 因果推断和格兰杰因果检验
11. 时间序列异常检测基于统计的方法基于机器学习的方法基于深度学习的方法
12. 基于统计的方法
13. 基于机器学习的方法
14. 基于深度学习的方法
15. 时间序列聚类和分类动态时间规整(DTW)基于特征的聚类和分类深度学习嵌入方法
16. 动态时间规整(DTW)
17. 基于特征的聚类和分类
18. 深度学习嵌入方法
高级时间序列模型
• ARIMA、SARIMA等统计模型
• Prophet、FBATS等商业预测工具
• LSTM、GRU等深度学习模型
实时时间序列处理
• 使用Apache Kafka、Spark Streaming等工具处理实时数据流
• 在线学习和模型更新
多变量时间序列分析
• 向量自回归(VAR)模型
• 因果推断和格兰杰因果检验
时间序列异常检测
• 基于统计的方法
• 基于机器学习的方法
• 基于深度学习的方法
时间序列聚类和分类
• 动态时间规整(DTW)
• 基于特征的聚类和分类
• 深度学习嵌入方法
总之,pandas提供了强大而灵活的时间处理功能,是数据分析中不可或缺的工具。通过掌握本文介绍的各种技巧,并结合实际业务场景进行应用,你将能够更高效地处理时间序列数据,发现数据中的时间模式,为业务决策提供有力支持。不断学习和实践,你将成为数据处理领域的专家! |
|