在本文中,我们将尝试用Python创建一个回测框架,需要包含以下功能:

  • 模块化 – 我们希望把它做成模块化的,可以随意组合、替换。
  • 可扩展性 – 代码应易于扩展。
  • 支持单资产和多资产策略
  • 访问历史股权数据和多个数据提供商
  • 包含交易费用和佣金
  • 具有性能指标

为了实现这一目标,我们需要几个关键组件,包括:

  • 数据管理: 负责OHLCV数据的导入、存储和检索,以及任何用于生成信号的替代数据源。
  • 信号生成: 包含用于分析数据并基于预定义策略或指标生成买卖信号的逻辑。
  • 执行引擎: 模拟基于信号的交易执行,考虑佣金、滑点,并可选地考虑买卖价差。
  • 性能评估: 计算关键性能指标,如回报率、波动率、夏普比率、回撤等,以评估策略的有效性。
  • 实用工具: 包括日志记录、配置管理以及其他支持功能。

下面是我们将用到的Python库:

  • Poetry
  • OpenBB Platform – 这将为我们提供无缝访问多个数据提供商的市场数据。你可以在这里阅读更多信息。
  • Pandas
  • Numpy
  • Matplotlib
  • Ruff, Black, MyPy – 我个人偏好的代码检查工具(可选)。你可以在这里阅读更多相关信息。

使用OpenBB创建数据处理器

在OpenBB平台上创建数据处理器非常简单。平台帮我们处理了不同API规范、各类数据提供商、杂乱无章的输出和数据验证等难题。

这样一来,我们就不再需要为数据验证和处理创建自定义类。你可以轻松访问多个数据提供商、上百个数据点、不同资产类别等。平台还确保了返回的数据符合标准,质量有保障。

在这里,我会专注于股票资产,并将数据限定为日K线。当然,你可以根据需要扩展和修改这些设置。我还会允许用户更改数据提供商、交易代码以及数据的开始和结束日期。

我特别喜欢OpenBB平台的一点是,它的某些端点允许传递多个股票代码。这意味着我们已经在支持多资产交易的道路上迈出了重要的一步,只需传递一个用逗号分隔的符号列表即可。

下面是代码:

"""用于加载和处理数据的数据处理模块。"""

from typing import Optional

import pandas as pd
from openbb import obb


class DataHandler:
    """用于加载和处理数据的数据处理类。"""

    def __init__(
        self,
        symbol: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        provider: str = "fmp",
    ):
        """初始化数据处理器。"""
        self.symbol = symbol.upper()
        self.start_date = start_date
        self.end_date = end_date
        self.provider = provider

    def load_data(self) -> pd.DataFrame | dict[str, pd.DataFrame]:
        """加载股票数据。"""
        data = obb.equity.price.historical(
            symbol=self.symbol,
            start_date=self.start_date,
            end_date=self.end_date,
            provider=self.provider,
        ).to_df()

        if "," in self.symbol:
            data = data.reset_index().set_index("symbol")
            return {symbol: data.loc[symbol] for symbol in self.symbol.split(",")}

        return data

    def load_data_from_csv(self, file_path) -> pd.DataFrame:
        """从CSV文件加载数据。"""
        return pd.read_csv(file_path, index_col="date", parse_dates=True)

注意,当传递多个交易代码时,它会返回一个包含Pandas数据框的字典。我还添加了一个函数,可以从自定义的CSV文件中加载数据,并使用日期列作为索引。你可以根据自己的需求进一步扩展和修改这个功能。

要获取一些数据,我们只需要初始化这个类,然后像这样调用load_data方法即可:

data = DataHandler("AAPL").load_data()
data.head()

创建策略处理器

下一步是创建一个用于处理策略的模块。我的意思是,构建一个能够根据策略需求生成信号并将其附加到数据上的模块,这样执行器就可以在回测中使用这些信号。

我想要实现的是一个类似于策略基类的东西,开发者可以继承它、修改它,或者构建自己的自定义策略。我还希望它在处理多资产时能无缝工作,能够对多个资产应用相同的信号逻辑。

以下是代码示例:

class Strategy:

    def __init__(self, indicators: dict, signal_logic: Any):
        """使用指标和信号逻辑初始化策略。"""
        self.indicators = indicators
        self.signal_logic = signal_logic

    def generate_signals(
        self, data: pd.DataFrame | dict[str, pd.DataFrame]
    ) -> pd.DataFrame | dict[str, pd.DataFrame]:
        """根据策略的指标和信号逻辑生成交易信号。"""
        if isinstance(data, dict):
            for _, asset_data in data.items():
                self._apply_strategy(asset_data)
        else:
            self._apply_strategy(data)
        return data

    def _apply_strategy(self, df: pd.DataFrame) -> None:
        """将策略应用于单个数据框。"""
        for name, indicator in self.indicators.items():
            df[name] = indicator(df)

        df["signal"] = df.apply(lambda row: self.signal_logic(row), axis=1)
        df["positions"] = df["signal"].diff().fillna(0)

它的原理是接受一个需要计算的指标字典,以及生成信号的逻辑,这些信号可以是 -1 表示卖出,+1 表示买入。它还跟踪我们当前的持仓状态。

目前的编码方式是,我们传递给它的 Lambda 函数将应用于数据框。

示例:

strategy = Strategy(
    indicators={
        "sma_20": lambda row: row["close"].rolling(window=20).mean(),
        "sma_60": lambda row: row["close"].rolling(window=60).mean(),
    },
    signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
)
data = strategy.generate_signals(data)
data.tail()

在上面的例子中,我创建了一个慢速和一个快速移动平均线,并在此基础上定义了我的交易逻辑:当快速移动平均线突破慢速移动平均线时做多,反之则做空。

现在我们已经有了获取数据和生成交易信号的方法,只缺少一个实际运行回测的方法。这是最复杂的部分。

创建主要回测逻辑

主要的回测器逻辑将由几个部分组成。我们需要包含的主要部分如下:

  • 交易执行器
  • 佣金计算器
  • 性能指标计算器
  • 投资组合管理器
  • 将所有这些部分联系在一起的纽带

我们首先定义类并设置一些我们希望其处理的基本变量:

class Backtester:

    def __init__(
        self,
        initial_capital: float = 10000.0,
        commission_pct: float = 0.001,
        commission_fixed: float = 1.0,
    ):
        """Initialize the backtester with initial capital and commission fees."""
        self.initial_capital: float = initial_capital
        self.commission_pct: float = commission_pct
        self.commission_fixed: float = commission_fixed
        self.assets_data: Dict = {}
        self.portfolio_history: Dict = {}
        self.daily_portfolio_values: List[float] = []

现在,我们将定义交易执行器:

 def execute_trade(self, asset: str, signal: int, price: float) -> None:
    if signal > 0 and self.assets_data[asset]["cash"] > 0:  # Buy
        trade_value = self.assets_data[asset]["cash"]
        commission = self.calculate_commission(trade_value)
        shares_to_buy = (trade_value - commission) / price
        self.assets_data[asset]["positions"] += shares_to_buy
        self.assets_data[asset]["cash"] -= trade_value
    elif signal < 0 and self.assets_data[asset]["positions"] > 0:  # Sell
        trade_value = self.assets_data[asset]["positions"] * price
        commission = self.calculate_commission(trade_value)
        self.assets_data[asset]["cash"] += trade_value - commission
        self.assets_data[asset]["positions"] = 0

交易执行器将在信号大于0时购买资产,在信号小于0时卖出资产。它还会确保我们有足够的现金用于购买,并且我们处于能够卖出的仓位。此外,它还将计算我们可以购买的股票数量并考虑交易佣金。

要计算佣金,我们需要执行以下步骤:

def calculate_commission(self, trade_value: float) -> float:
    return max(trade_value * self.commission_pct, self.commission_fixed)

现在,我们需要跟踪我们交易的资产的持仓、其价值以及历史记录:

def update_portfolio(self, asset: str, price: float) -> None:
    self.assets_data[asset]["position_value"] = (
        self.assets_data[asset]["positions"] * price
    )
    self.assets_data[asset]["total_value"] = (
        self.assets_data[asset]["cash"] + self.assets_data[asset]["position_value"]
    )
    self.portfolio_history[asset].append(self.assets_data[asset]["total_value"])

最后,我们可以通过使用这些方法来运行回测器,如下所示:

def backtest(self, data: pd.DataFrame | dict[str, pd.DataFrame]):
    if isinstance(data, pd.DataFrame):  # Single asset
        data = {
            "SINGLE_ASSET": data
        }  
    for asset in data:
        self.assets_data[asset] = {
            "cash": self.initial_capital / len(data),
            "positions": 0,
            "position_value": 0,
            "total_value": 0,
        }
        self.portfolio_history[asset] = []

        for date, row in data[asset].iterrows():
            self.execute_trade(asset, row["signal"], row["close"])
            self.update_portfolio(asset, row["close"])
            if len(self.daily_portfolio_values) < len(data[asset]):
                self.daily_portfolio_values.append(
                    self.assets_data[asset]["total_value"]
                )
            else:
                self.daily_portfolio_values[
                    len(self.portfolio_history[asset]) - 1
                ] += self.assets_data[asset]["total_value"]

现在,我将添加一个方法来计算一些指标,并且可以通过使用第三方库等来扩展这些功能。我还会对绘图功能做同样的处理。具体的代码可以在代码库中查看。

def calculate_performance(self, plot: bool = True) -> None:
    if not self.daily_portfolio_values:
        print("No portfolio history to calculate performance.")
        return

    portfolio_values = pd.Series(self.daily_portfolio_values)
    daily_returns = portfolio_values.pct_change().dropna()

    total_return = calculate_total_return(
        portfolio_values.iloc[-1], self.initial_capital
    )
    annualized_return = calculate_annualized_return(
        total_return, len(portfolio_values)
    )
    annualized_volatility = calculate_annualized_volatility(daily_returns)
    sharpe_ratio = calculate_sharpe_ratio(annualized_return, annualized_volatility)
    sortino_ratio = calculate_sortino_ratio(daily_returns, annualized_return)
    max_drawdown = calculate_maximum_drawdown(portfolio_values)

    print(f"Final Portfolio Value: {portfolio_values.iloc[-1]:.2f}")
    print(f"Total Return: {total_return * 100:.2f}%")
    print(f"Annualized Return: {annualized_return * 100:.2f}%")
    print(f"Annualized Volatility: {annualized_volatility * 100:.2f}%")
    print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
    print(f"Sortino Ratio: {sortino_ratio:.2f}")
    print(f"Maximum Drawdown: {max_drawdown * 100:.2f}%")

    if plot:
        self.plot_performance(portfolio_values, daily_returns)

def plot_performance(self, portfolio_values: Dict, daily_returns: pd.DataFrame):
    plt.figure(figsize=(10, 6))

    plt.subplot(2, 1, 1)
    plt.plot(portfolio_values, label="Portfolio Value")
    plt.title("Portfolio Value Over Time")
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.plot(daily_returns, label="Daily Returns", color="orange")
    plt.title("Daily Returns Over Time")
    plt.legend()

    plt.tight_layout()
    plt.show()

Final Portfolio Value: 最终投资组合价值
Total Return: 总回报
Annualized Return: 年化回报
Annualized Volatility: 年化波动率
Sharpe Ratio: 夏普比率
Sortino Ratio: 索提诺比率
Maximum Drawdown: 最大回撤

既然回测器已经准备好了,让我们用几种不同的策略来试一试。

如何使用 Python 回测一个交叉策略?

该策略的目标是创建一个非常基本的交叉策略,其中我们使用一个快速移动简单移动平均线(SMA)和一个慢速移动简单移动平均线。当快速线超过慢速线时买入,反之卖出。

我们可以用苹果股票跑一下。以下是具体操作方法:

from backtester.data_handler import DataHandler
from backtester.backtester import Backtester
from backtester.strategies import Strategy

symbol = "AAPL,MSFT"
start_date = "2023-01-01"
end_date = "2023-12-31"

data = DataHandler(
        symbol=symbol, start_date=start_date, end_date=end_date
    ).load_data()

strategy = Strategy(
    indicators={
        "sma_20": lambda row: row["close"].rolling(window=20).mean(),
        "sma_60": lambda row: row["close"].rolling(window=60).mean(),
    },
    signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
)
data = strategy.generate_signals(data)

backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()

输出结果:

最终投资组合价值:11804.58
总回报:18.05%
年化回报:18.20%
年化波动率:13.06%
夏普比率:1.39
索提诺比率:2.06
最大回撤:-12.07%

看起来还行!

如何使用 Python 回测均值回归策略?

首先,让我们规划策略逻辑:

该策略的目标是:当资产的交易价格超过滚动平均值三个标准差时卖出该资产;当资产的交易价格低于滚动平均值三个标准差时买入该资产。

要使其正常工作,需要注意以下几点:

  • 需要有一个滚动平均值
  • 需要从滚动平均值中计算标准差
  • 需要计算上下界

由于我们的策略类按给定顺序应用计算,我们可以按照逻辑顺序轻松地将这些计算连接起来,并基于这些计算创建信号。

让我们从定义基本回测参数开始:

symbol = "HE"
start_date = "2022-01-01"
end_date = "2022-12-31"

现在,我们需要获取数据,将操作链接在一起:

data = DataHandler(symbol=symbol, start_date=start_date, end_date=end_date).load_data()

strategy = Strategy(
    indicators={
        "sma_50": lambda row: row["close"].rolling(window=50).mean(),
        "std_3": lambda row: row["close"].rolling(window=50).std() * 3,
        "std_3_upper": lambda row: row["sma_50"] + row["std_3"],
        "std_3_lower": lambda row: row["sma_50"] - row["std_3"],
    },
    signal_logic=lambda row: (
        1
        if row["close"] < row["std_3_lower"]
        else -1 if row["close"] > row["std_3_upper"] else 0
    ),
)
data = strategy.generate_signals(data)

backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()

最终投资组合价值:10725.54
总回报:7.26%
年化回报:7.29%
年化波动率:18.32%
夏普比率:0.40
索提诺比率:0.53
最大回撤:-23.37%

如何使用 Python 回测配对交易策略?

用 Python 回测配对交易策略是一个更复杂的例子,但我们的回测器应该能够执行这一策略。复杂之处在于我们需要将两个资产的数据放在同一个数据框中。首先,让我们定义一下这个策略。

我们要交易的资产是 Roku (ROKU) 和 Netflix (NFLX),因为根据我们之前的文章和分析,它们具有协整关系。

如果一只股票在过去五天内相对于另一只股票的涨幅达到或超过 5%,我们就会进入仓位(买入)。我们将卖出价格较高的那只,买入价格较低的那只,直到价差反转。让我们开始设置并快速处理数据:

import pandas as pd

symbol = "NFLX,ROKU"
start_date = "2023-01-01"

data = DataHandler(
    symbol=symbol,
    start_date=start_date,
).load_data()

data = pd.merge(
    data["NFLX"].reset_index(),
    data["ROKU"].reset_index(),
    left_index=True,
    right_index=True,
    suffixes=("_NFLX", "_ROKU"),
)

data = data.rename(columns={"close_ROKU": "close"})
data.head()

现在,我们只需制定交易逻辑,就可以运行回测器了:

strategy = Strategy(
    indicators={
        "day_5_lookback_NFLX": lambda row: row["close_NFLX"].shift(5),
        "day_5_lookback_ROKU": lambda row: row["close"].shift(5),
    },
    signal_logic=lambda row: (
        1
        if row["close_NFLX"] > row["day_5_lookback_NFLX"] * 1.05
        else -1 if row["close_NFLX"] < row["day_5_lookback_NFLX"] * 0.95 else 0
    ),
)
data = strategy.generate_signals(data)

backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()

最终投资组合价值:14387.50
总回报:43.88%
年化回报:34.80%
年化波动率:55.77%
夏普比率:0.62
索提诺比率:0.74
最大回撤:-39.86%