在本文中,我们将尝试用Python创建一个回测框架,需要包含以下功能:
- 模块化 – 我们希望把它做成模块化的,可以随意组合、替换。
- 可扩展性 – 代码应易于扩展。
- 支持单资产和多资产策略
- 访问历史股权数据和多个数据提供商
- 包含交易费用和佣金
- 具有性能指标
为了实现这一目标,我们需要几个关键组件,包括:
- 数据管理: 负责OHLCV数据的导入、存储和检索,以及任何用于生成信号的替代数据源。
- 信号生成: 包含用于分析数据并基于预定义策略或指标生成买卖信号的逻辑。
- 执行引擎: 模拟基于信号的交易执行,考虑佣金、滑点,并可选地考虑买卖价差。
- 性能评估: 计算关键性能指标,如回报率、波动率、夏普比率、回撤等,以评估策略的有效性。
- 实用工具: 包括日志记录、配置管理以及其他支持功能。
下面是我们将用到的Python库:
- Poetry
- OpenBB Platform – 这将为我们提供无缝访问多个数据提供商的市场数据。你可以在这里阅读更多信息。
- Pandas
- Numpy
- Matplotlib
- Ruff, Black, MyPy – 我个人偏好的代码检查工具(可选)。你可以在这里阅读更多相关信息。
使用OpenBB创建数据处理器
在OpenBB平台上创建数据处理器非常简单。平台帮我们处理了不同API规范、各类数据提供商、杂乱无章的输出和数据验证等难题。
这样一来,我们就不再需要为数据验证和处理创建自定义类。你可以轻松访问多个数据提供商、上百个数据点、不同资产类别等。平台还确保了返回的数据符合标准,质量有保障。
在这里,我会专注于股票资产,并将数据限定为日K线。当然,你可以根据需要扩展和修改这些设置。我还会允许用户更改数据提供商、交易代码以及数据的开始和结束日期。
我特别喜欢OpenBB平台的一点是,它的某些端点允许传递多个股票代码。这意味着我们已经在支持多资产交易的道路上迈出了重要的一步,只需传递一个用逗号分隔的符号列表即可。
下面是代码:
from typing import Optional
start_date: Optional[str] = None,
end_date: Optional[str] = None,
self.symbol = symbol.upper()
self.start_date = start_date
def load_data(self) -> pd.DataFrame | dict[str, pd.DataFrame]:
data = obb.equity.price.historical(
start_date=self.start_date,
data = data.reset_index().set_index("symbol")
return {symbol: data.loc[symbol] for symbol in self.symbol.split(",")}
def load_data_from_csv(self, file_path) -> pd.DataFrame:
return pd.read_csv(file_path, index_col="date", parse_dates=True)
"""用于加载和处理数据的数据处理模块。"""
from typing import Optional
import pandas as pd
from openbb import obb
class DataHandler:
"""用于加载和处理数据的数据处理类。"""
def __init__(
self,
symbol: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
provider: str = "fmp",
):
"""初始化数据处理器。"""
self.symbol = symbol.upper()
self.start_date = start_date
self.end_date = end_date
self.provider = provider
def load_data(self) -> pd.DataFrame | dict[str, pd.DataFrame]:
"""加载股票数据。"""
data = obb.equity.price.historical(
symbol=self.symbol,
start_date=self.start_date,
end_date=self.end_date,
provider=self.provider,
).to_df()
if "," in self.symbol:
data = data.reset_index().set_index("symbol")
return {symbol: data.loc[symbol] for symbol in self.symbol.split(",")}
return data
def load_data_from_csv(self, file_path) -> pd.DataFrame:
"""从CSV文件加载数据。"""
return pd.read_csv(file_path, index_col="date", parse_dates=True)
"""用于加载和处理数据的数据处理模块。"""
from typing import Optional
import pandas as pd
from openbb import obb
class DataHandler:
"""用于加载和处理数据的数据处理类。"""
def __init__(
self,
symbol: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
provider: str = "fmp",
):
"""初始化数据处理器。"""
self.symbol = symbol.upper()
self.start_date = start_date
self.end_date = end_date
self.provider = provider
def load_data(self) -> pd.DataFrame | dict[str, pd.DataFrame]:
"""加载股票数据。"""
data = obb.equity.price.historical(
symbol=self.symbol,
start_date=self.start_date,
end_date=self.end_date,
provider=self.provider,
).to_df()
if "," in self.symbol:
data = data.reset_index().set_index("symbol")
return {symbol: data.loc[symbol] for symbol in self.symbol.split(",")}
return data
def load_data_from_csv(self, file_path) -> pd.DataFrame:
"""从CSV文件加载数据。"""
return pd.read_csv(file_path, index_col="date", parse_dates=True)
注意,当传递多个交易代码时,它会返回一个包含Pandas数据框的字典。我还添加了一个函数,可以从自定义的CSV文件中加载数据,并使用日期列作为索引。你可以根据自己的需求进一步扩展和修改这个功能。
要获取一些数据,我们只需要初始化这个类,然后像这样调用load_data
方法即可:
data = DataHandler("AAPL").load_data()
data = DataHandler("AAPL").load_data()
data.head()
data = DataHandler("AAPL").load_data()
data.head()
创建策略处理器
下一步是创建一个用于处理策略的模块。我的意思是,构建一个能够根据策略需求生成信号并将其附加到数据上的模块,这样执行器就可以在回测中使用这些信号。
我想要实现的是一个类似于策略基类的东西,开发者可以继承它、修改它,或者构建自己的自定义策略。我还希望它在处理多资产时能无缝工作,能够对多个资产应用相同的信号逻辑。
以下是代码示例:
def __init__(self, indicators: dict, signal_logic: Any):
self.indicators = indicators
self.signal_logic = signal_logic
self, data: pd.DataFrame | dict[str, pd.DataFrame]
) -> pd.DataFrame | dict[str, pd.DataFrame]:
"""根据策略的指标和信号逻辑生成交易信号。"""
if isinstance(data, dict):
for _, asset_data in data.items():
self._apply_strategy(asset_data)
self._apply_strategy(data)
def _apply_strategy(self, df: pd.DataFrame) -> None:
for name, indicator in self.indicators.items():
df["signal"] = df.apply(lambda row: self.signal_logic(row), axis=1)
df["positions"] = df["signal"].diff().fillna(0)
class Strategy:
def __init__(self, indicators: dict, signal_logic: Any):
"""使用指标和信号逻辑初始化策略。"""
self.indicators = indicators
self.signal_logic = signal_logic
def generate_signals(
self, data: pd.DataFrame | dict[str, pd.DataFrame]
) -> pd.DataFrame | dict[str, pd.DataFrame]:
"""根据策略的指标和信号逻辑生成交易信号。"""
if isinstance(data, dict):
for _, asset_data in data.items():
self._apply_strategy(asset_data)
else:
self._apply_strategy(data)
return data
def _apply_strategy(self, df: pd.DataFrame) -> None:
"""将策略应用于单个数据框。"""
for name, indicator in self.indicators.items():
df[name] = indicator(df)
df["signal"] = df.apply(lambda row: self.signal_logic(row), axis=1)
df["positions"] = df["signal"].diff().fillna(0)
class Strategy:
def __init__(self, indicators: dict, signal_logic: Any):
"""使用指标和信号逻辑初始化策略。"""
self.indicators = indicators
self.signal_logic = signal_logic
def generate_signals(
self, data: pd.DataFrame | dict[str, pd.DataFrame]
) -> pd.DataFrame | dict[str, pd.DataFrame]:
"""根据策略的指标和信号逻辑生成交易信号。"""
if isinstance(data, dict):
for _, asset_data in data.items():
self._apply_strategy(asset_data)
else:
self._apply_strategy(data)
return data
def _apply_strategy(self, df: pd.DataFrame) -> None:
"""将策略应用于单个数据框。"""
for name, indicator in self.indicators.items():
df[name] = indicator(df)
df["signal"] = df.apply(lambda row: self.signal_logic(row), axis=1)
df["positions"] = df["signal"].diff().fillna(0)
它的原理是接受一个需要计算的指标字典,以及生成信号的逻辑,这些信号可以是 -1 表示卖出,+1 表示买入。它还跟踪我们当前的持仓状态。
目前的编码方式是,我们传递给它的 Lambda 函数将应用于数据框。
示例:
"sma_20": lambda row: row["close"].rolling(window=20).mean(),
"sma_60": lambda row: row["close"].rolling(window=60).mean(),
signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
data = strategy.generate_signals(data)
strategy = Strategy(
indicators={
"sma_20": lambda row: row["close"].rolling(window=20).mean(),
"sma_60": lambda row: row["close"].rolling(window=60).mean(),
},
signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
)
data = strategy.generate_signals(data)
data.tail()
strategy = Strategy(
indicators={
"sma_20": lambda row: row["close"].rolling(window=20).mean(),
"sma_60": lambda row: row["close"].rolling(window=60).mean(),
},
signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
)
data = strategy.generate_signals(data)
data.tail()
在上面的例子中,我创建了一个慢速和一个快速移动平均线,并在此基础上定义了我的交易逻辑:当快速移动平均线突破慢速移动平均线时做多,反之则做空。
现在我们已经有了获取数据和生成交易信号的方法,只缺少一个实际运行回测的方法。这是最复杂的部分。
创建主要回测逻辑
主要的回测器逻辑将由几个部分组成。我们需要包含的主要部分如下:
- 交易执行器
- 佣金计算器
- 性能指标计算器
- 投资组合管理器
- 将所有这些部分联系在一起的纽带
我们首先定义类并设置一些我们希望其处理的基本变量:
initial_capital: float = 10000.0,
commission_pct: float = 0.001,
commission_fixed: float = 1.0,
"""Initialize the backtester with initial capital and commission fees."""
self.initial_capital: float = initial_capital
self.commission_pct: float = commission_pct
self.commission_fixed: float = commission_fixed
self.assets_data: Dict = {}
self.portfolio_history: Dict = {}
self.daily_portfolio_values: List[float] = []
class Backtester:
def __init__(
self,
initial_capital: float = 10000.0,
commission_pct: float = 0.001,
commission_fixed: float = 1.0,
):
"""Initialize the backtester with initial capital and commission fees."""
self.initial_capital: float = initial_capital
self.commission_pct: float = commission_pct
self.commission_fixed: float = commission_fixed
self.assets_data: Dict = {}
self.portfolio_history: Dict = {}
self.daily_portfolio_values: List[float] = []
class Backtester:
def __init__(
self,
initial_capital: float = 10000.0,
commission_pct: float = 0.001,
commission_fixed: float = 1.0,
):
"""Initialize the backtester with initial capital and commission fees."""
self.initial_capital: float = initial_capital
self.commission_pct: float = commission_pct
self.commission_fixed: float = commission_fixed
self.assets_data: Dict = {}
self.portfolio_history: Dict = {}
self.daily_portfolio_values: List[float] = []
现在,我们将定义交易执行器:
def execute_trade(self, asset: str, signal: int, price: float) -> None:
if signal > 0 and self.assets_data[asset]["cash"] > 0: # Buy
trade_value = self.assets_data[asset]["cash"]
commission = self.calculate_commission(trade_value)
shares_to_buy = (trade_value - commission) / price
self.assets_data[asset]["positions"] += shares_to_buy
self.assets_data[asset]["cash"] -= trade_value
elif signal < 0 and self.assets_data[asset]["positions"] > 0: # Sell
trade_value = self.assets_data[asset]["positions"] * price
commission = self.calculate_commission(trade_value)
self.assets_data[asset]["cash"] += trade_value - commission
self.assets_data[asset]["positions"] = 0
def execute_trade(self, asset: str, signal: int, price: float) -> None:
if signal > 0 and self.assets_data[asset]["cash"] > 0: # Buy
trade_value = self.assets_data[asset]["cash"]
commission = self.calculate_commission(trade_value)
shares_to_buy = (trade_value - commission) / price
self.assets_data[asset]["positions"] += shares_to_buy
self.assets_data[asset]["cash"] -= trade_value
elif signal < 0 and self.assets_data[asset]["positions"] > 0: # Sell
trade_value = self.assets_data[asset]["positions"] * price
commission = self.calculate_commission(trade_value)
self.assets_data[asset]["cash"] += trade_value - commission
self.assets_data[asset]["positions"] = 0
def execute_trade(self, asset: str, signal: int, price: float) -> None:
if signal > 0 and self.assets_data[asset]["cash"] > 0: # Buy
trade_value = self.assets_data[asset]["cash"]
commission = self.calculate_commission(trade_value)
shares_to_buy = (trade_value - commission) / price
self.assets_data[asset]["positions"] += shares_to_buy
self.assets_data[asset]["cash"] -= trade_value
elif signal < 0 and self.assets_data[asset]["positions"] > 0: # Sell
trade_value = self.assets_data[asset]["positions"] * price
commission = self.calculate_commission(trade_value)
self.assets_data[asset]["cash"] += trade_value - commission
self.assets_data[asset]["positions"] = 0
交易执行器将在信号大于0时购买资产,在信号小于0时卖出资产。它还会确保我们有足够的现金用于购买,并且我们处于能够卖出的仓位。此外,它还将计算我们可以购买的股票数量并考虑交易佣金。
要计算佣金,我们需要执行以下步骤:
def calculate_commission(self, trade_value: float) -> float:
return max(trade_value * self.commission_pct, self.commission_fixed)
def calculate_commission(self, trade_value: float) -> float:
return max(trade_value * self.commission_pct, self.commission_fixed)
def calculate_commission(self, trade_value: float) -> float:
return max(trade_value * self.commission_pct, self.commission_fixed)
现在,我们需要跟踪我们交易的资产的持仓、其价值以及历史记录:
def update_portfolio(self, asset: str, price: float) -> None:
self.assets_data[asset]["position_value"] = (
self.assets_data[asset]["positions"] * price
self.assets_data[asset]["total_value"] = (
self.assets_data[asset]["cash"] + self.assets_data[asset]["position_value"]
self.portfolio_history[asset].append(self.assets_data[asset]["total_value"])
def update_portfolio(self, asset: str, price: float) -> None:
self.assets_data[asset]["position_value"] = (
self.assets_data[asset]["positions"] * price
)
self.assets_data[asset]["total_value"] = (
self.assets_data[asset]["cash"] + self.assets_data[asset]["position_value"]
)
self.portfolio_history[asset].append(self.assets_data[asset]["total_value"])
def update_portfolio(self, asset: str, price: float) -> None:
self.assets_data[asset]["position_value"] = (
self.assets_data[asset]["positions"] * price
)
self.assets_data[asset]["total_value"] = (
self.assets_data[asset]["cash"] + self.assets_data[asset]["position_value"]
)
self.portfolio_history[asset].append(self.assets_data[asset]["total_value"])
最后,我们可以通过使用这些方法来运行回测器,如下所示:
def backtest(self, data: pd.DataFrame | dict[str, pd.DataFrame]):
if isinstance(data, pd.DataFrame): # Single asset
self.assets_data[asset] = {
"cash": self.initial_capital / len(data),
self.portfolio_history[asset] = []
for date, row in data[asset].iterrows():
self.execute_trade(asset, row["signal"], row["close"])
self.update_portfolio(asset, row["close"])
if len(self.daily_portfolio_values) < len(data[asset]):
self.daily_portfolio_values.append(
self.assets_data[asset]["total_value"]
self.daily_portfolio_values[
len(self.portfolio_history[asset]) - 1
] += self.assets_data[asset]["total_value"]
def backtest(self, data: pd.DataFrame | dict[str, pd.DataFrame]):
if isinstance(data, pd.DataFrame): # Single asset
data = {
"SINGLE_ASSET": data
}
for asset in data:
self.assets_data[asset] = {
"cash": self.initial_capital / len(data),
"positions": 0,
"position_value": 0,
"total_value": 0,
}
self.portfolio_history[asset] = []
for date, row in data[asset].iterrows():
self.execute_trade(asset, row["signal"], row["close"])
self.update_portfolio(asset, row["close"])
if len(self.daily_portfolio_values) < len(data[asset]):
self.daily_portfolio_values.append(
self.assets_data[asset]["total_value"]
)
else:
self.daily_portfolio_values[
len(self.portfolio_history[asset]) - 1
] += self.assets_data[asset]["total_value"]
def backtest(self, data: pd.DataFrame | dict[str, pd.DataFrame]):
if isinstance(data, pd.DataFrame): # Single asset
data = {
"SINGLE_ASSET": data
}
for asset in data:
self.assets_data[asset] = {
"cash": self.initial_capital / len(data),
"positions": 0,
"position_value": 0,
"total_value": 0,
}
self.portfolio_history[asset] = []
for date, row in data[asset].iterrows():
self.execute_trade(asset, row["signal"], row["close"])
self.update_portfolio(asset, row["close"])
if len(self.daily_portfolio_values) < len(data[asset]):
self.daily_portfolio_values.append(
self.assets_data[asset]["total_value"]
)
else:
self.daily_portfolio_values[
len(self.portfolio_history[asset]) - 1
] += self.assets_data[asset]["total_value"]
现在,我将添加一个方法来计算一些指标,并且可以通过使用第三方库等来扩展这些功能。我还会对绘图功能做同样的处理。具体的代码可以在代码库中查看。
def calculate_performance(self, plot: bool = True) -> None:
if not self.daily_portfolio_values:
print("No portfolio history to calculate performance.")
portfolio_values = pd.Series(self.daily_portfolio_values)
daily_returns = portfolio_values.pct_change().dropna()
total_return = calculate_total_return(
portfolio_values.iloc[-1], self.initial_capital
annualized_return = calculate_annualized_return(
total_return, len(portfolio_values)
annualized_volatility = calculate_annualized_volatility(daily_returns)
sharpe_ratio = calculate_sharpe_ratio(annualized_return, annualized_volatility)
sortino_ratio = calculate_sortino_ratio(daily_returns, annualized_return)
max_drawdown = calculate_maximum_drawdown(portfolio_values)
print(f"Final Portfolio Value: {portfolio_values.iloc[-1]:.2f}")
print(f"Total Return: {total_return * 100:.2f}%")
print(f"Annualized Return: {annualized_return * 100:.2f}%")
print(f"Annualized Volatility: {annualized_volatility * 100:.2f}%")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
print(f"Sortino Ratio: {sortino_ratio:.2f}")
print(f"Maximum Drawdown: {max_drawdown * 100:.2f}%")
self.plot_performance(portfolio_values, daily_returns)
def plot_performance(self, portfolio_values: Dict, daily_returns: pd.DataFrame):
plt.figure(figsize=(10, 6))
plt.plot(portfolio_values, label="Portfolio Value")
plt.title("Portfolio Value Over Time")
plt.plot(daily_returns, label="Daily Returns", color="orange")
plt.title("Daily Returns Over Time")
def calculate_performance(self, plot: bool = True) -> None:
if not self.daily_portfolio_values:
print("No portfolio history to calculate performance.")
return
portfolio_values = pd.Series(self.daily_portfolio_values)
daily_returns = portfolio_values.pct_change().dropna()
total_return = calculate_total_return(
portfolio_values.iloc[-1], self.initial_capital
)
annualized_return = calculate_annualized_return(
total_return, len(portfolio_values)
)
annualized_volatility = calculate_annualized_volatility(daily_returns)
sharpe_ratio = calculate_sharpe_ratio(annualized_return, annualized_volatility)
sortino_ratio = calculate_sortino_ratio(daily_returns, annualized_return)
max_drawdown = calculate_maximum_drawdown(portfolio_values)
print(f"Final Portfolio Value: {portfolio_values.iloc[-1]:.2f}")
print(f"Total Return: {total_return * 100:.2f}%")
print(f"Annualized Return: {annualized_return * 100:.2f}%")
print(f"Annualized Volatility: {annualized_volatility * 100:.2f}%")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
print(f"Sortino Ratio: {sortino_ratio:.2f}")
print(f"Maximum Drawdown: {max_drawdown * 100:.2f}%")
if plot:
self.plot_performance(portfolio_values, daily_returns)
def plot_performance(self, portfolio_values: Dict, daily_returns: pd.DataFrame):
plt.figure(figsize=(10, 6))
plt.subplot(2, 1, 1)
plt.plot(portfolio_values, label="Portfolio Value")
plt.title("Portfolio Value Over Time")
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(daily_returns, label="Daily Returns", color="orange")
plt.title("Daily Returns Over Time")
plt.legend()
plt.tight_layout()
plt.show()
def calculate_performance(self, plot: bool = True) -> None:
if not self.daily_portfolio_values:
print("No portfolio history to calculate performance.")
return
portfolio_values = pd.Series(self.daily_portfolio_values)
daily_returns = portfolio_values.pct_change().dropna()
total_return = calculate_total_return(
portfolio_values.iloc[-1], self.initial_capital
)
annualized_return = calculate_annualized_return(
total_return, len(portfolio_values)
)
annualized_volatility = calculate_annualized_volatility(daily_returns)
sharpe_ratio = calculate_sharpe_ratio(annualized_return, annualized_volatility)
sortino_ratio = calculate_sortino_ratio(daily_returns, annualized_return)
max_drawdown = calculate_maximum_drawdown(portfolio_values)
print(f"Final Portfolio Value: {portfolio_values.iloc[-1]:.2f}")
print(f"Total Return: {total_return * 100:.2f}%")
print(f"Annualized Return: {annualized_return * 100:.2f}%")
print(f"Annualized Volatility: {annualized_volatility * 100:.2f}%")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
print(f"Sortino Ratio: {sortino_ratio:.2f}")
print(f"Maximum Drawdown: {max_drawdown * 100:.2f}%")
if plot:
self.plot_performance(portfolio_values, daily_returns)
def plot_performance(self, portfolio_values: Dict, daily_returns: pd.DataFrame):
plt.figure(figsize=(10, 6))
plt.subplot(2, 1, 1)
plt.plot(portfolio_values, label="Portfolio Value")
plt.title("Portfolio Value Over Time")
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(daily_returns, label="Daily Returns", color="orange")
plt.title("Daily Returns Over Time")
plt.legend()
plt.tight_layout()
plt.show()
Final Portfolio Value: 最终投资组合价值
Total Return: 总回报
Annualized Return: 年化回报
Annualized Volatility: 年化波动率
Sharpe Ratio: 夏普比率
Sortino Ratio: 索提诺比率
Maximum Drawdown: 最大回撤
既然回测器已经准备好了,让我们用几种不同的策略来试一试。
如何使用 Python 回测一个交叉策略?
该策略的目标是创建一个非常基本的交叉策略,其中我们使用一个快速移动简单移动平均线(SMA)和一个慢速移动简单移动平均线。当快速线超过慢速线时买入,反之卖出。
我们可以用苹果股票跑一下。以下是具体操作方法:
from backtester.data_handler import DataHandler
from backtester.backtester import Backtester
from backtester.strategies import Strategy
start_date = "2023-01-01"
symbol=symbol, start_date=start_date, end_date=end_date
"sma_20": lambda row: row["close"].rolling(window=20).mean(),
"sma_60": lambda row: row["close"].rolling(window=60).mean(),
signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
from backtester.data_handler import DataHandler
from backtester.backtester import Backtester
from backtester.strategies import Strategy
symbol = "AAPL,MSFT"
start_date = "2023-01-01"
end_date = "2023-12-31"
data = DataHandler(
symbol=symbol, start_date=start_date, end_date=end_date
).load_data()
strategy = Strategy(
indicators={
"sma_20": lambda row: row["close"].rolling(window=20).mean(),
"sma_60": lambda row: row["close"].rolling(window=60).mean(),
},
signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
)
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
from backtester.data_handler import DataHandler
from backtester.backtester import Backtester
from backtester.strategies import Strategy
symbol = "AAPL,MSFT"
start_date = "2023-01-01"
end_date = "2023-12-31"
data = DataHandler(
symbol=symbol, start_date=start_date, end_date=end_date
).load_data()
strategy = Strategy(
indicators={
"sma_20": lambda row: row["close"].rolling(window=20).mean(),
"sma_60": lambda row: row["close"].rolling(window=60).mean(),
},
signal_logic=lambda row: 1 if row["sma_20"] > row["sma_60"] else -1,
)
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
输出结果:
最终投资组合价值:11804.58
总回报:18.05%
年化回报:18.20%
年化波动率:13.06%
夏普比率:1.39
索提诺比率:2.06
最大回撤:-12.07%
看起来还行!
如何使用 Python 回测均值回归策略?
首先,让我们规划策略逻辑:
该策略的目标是:当资产的交易价格超过滚动平均值三个标准差时卖出该资产;当资产的交易价格低于滚动平均值三个标准差时买入该资产。
要使其正常工作,需要注意以下几点:
- 需要有一个滚动平均值
- 需要从滚动平均值中计算标准差
- 需要计算上下界
由于我们的策略类按给定顺序应用计算,我们可以按照逻辑顺序轻松地将这些计算连接起来,并基于这些计算创建信号。
让我们从定义基本回测参数开始:
start_date = "2022-01-01"
symbol = "HE"
start_date = "2022-01-01"
end_date = "2022-12-31"
symbol = "HE"
start_date = "2022-01-01"
end_date = "2022-12-31"
现在,我们需要获取数据,将操作链接在一起:
data = DataHandler(symbol=symbol, start_date=start_date, end_date=end_date).load_data()
"sma_50": lambda row: row["close"].rolling(window=50).mean(),
"std_3": lambda row: row["close"].rolling(window=50).std() * 3,
"std_3_upper": lambda row: row["sma_50"] + row["std_3"],
"std_3_lower": lambda row: row["sma_50"] - row["std_3"],
signal_logic=lambda row: (
if row["close"] < row["std_3_lower"]
else -1 if row["close"] > row["std_3_upper"] else 0
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
data = DataHandler(symbol=symbol, start_date=start_date, end_date=end_date).load_data()
strategy = Strategy(
indicators={
"sma_50": lambda row: row["close"].rolling(window=50).mean(),
"std_3": lambda row: row["close"].rolling(window=50).std() * 3,
"std_3_upper": lambda row: row["sma_50"] + row["std_3"],
"std_3_lower": lambda row: row["sma_50"] - row["std_3"],
},
signal_logic=lambda row: (
1
if row["close"] < row["std_3_lower"]
else -1 if row["close"] > row["std_3_upper"] else 0
),
)
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
data = DataHandler(symbol=symbol, start_date=start_date, end_date=end_date).load_data()
strategy = Strategy(
indicators={
"sma_50": lambda row: row["close"].rolling(window=50).mean(),
"std_3": lambda row: row["close"].rolling(window=50).std() * 3,
"std_3_upper": lambda row: row["sma_50"] + row["std_3"],
"std_3_lower": lambda row: row["sma_50"] - row["std_3"],
},
signal_logic=lambda row: (
1
if row["close"] < row["std_3_lower"]
else -1 if row["close"] > row["std_3_upper"] else 0
),
)
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
最终投资组合价值:10725.54
总回报:7.26%
年化回报:7.29%
年化波动率:18.32%
夏普比率:0.40
索提诺比率:0.53
最大回撤:-23.37%
如何使用 Python 回测配对交易策略?
用 Python 回测配对交易策略是一个更复杂的例子,但我们的回测器应该能够执行这一策略。复杂之处在于我们需要将两个资产的数据放在同一个数据框中。首先,让我们定义一下这个策略。
我们要交易的资产是 Roku (ROKU) 和 Netflix (NFLX),因为根据我们之前的文章和分析,它们具有协整关系。
如果一只股票在过去五天内相对于另一只股票的涨幅达到或超过 5%,我们就会进入仓位(买入)。我们将卖出价格较高的那只,买入价格较低的那只,直到价差反转。让我们开始设置并快速处理数据:
start_date = "2023-01-01"
data["NFLX"].reset_index(),
data["ROKU"].reset_index(),
suffixes=("_NFLX", "_ROKU"),
data = data.rename(columns={"close_ROKU": "close"})
import pandas as pd
symbol = "NFLX,ROKU"
start_date = "2023-01-01"
data = DataHandler(
symbol=symbol,
start_date=start_date,
).load_data()
data = pd.merge(
data["NFLX"].reset_index(),
data["ROKU"].reset_index(),
left_index=True,
right_index=True,
suffixes=("_NFLX", "_ROKU"),
)
data = data.rename(columns={"close_ROKU": "close"})
data.head()
import pandas as pd
symbol = "NFLX,ROKU"
start_date = "2023-01-01"
data = DataHandler(
symbol=symbol,
start_date=start_date,
).load_data()
data = pd.merge(
data["NFLX"].reset_index(),
data["ROKU"].reset_index(),
left_index=True,
right_index=True,
suffixes=("_NFLX", "_ROKU"),
)
data = data.rename(columns={"close_ROKU": "close"})
data.head()
现在,我们只需制定交易逻辑,就可以运行回测器了:
"day_5_lookback_NFLX": lambda row: row["close_NFLX"].shift(5),
"day_5_lookback_ROKU": lambda row: row["close"].shift(5),
signal_logic=lambda row: (
if row["close_NFLX"] > row["day_5_lookback_NFLX"] * 1.05
else -1 if row["close_NFLX"] < row["day_5_lookback_NFLX"] * 0.95 else 0
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
strategy = Strategy(
indicators={
"day_5_lookback_NFLX": lambda row: row["close_NFLX"].shift(5),
"day_5_lookback_ROKU": lambda row: row["close"].shift(5),
},
signal_logic=lambda row: (
1
if row["close_NFLX"] > row["day_5_lookback_NFLX"] * 1.05
else -1 if row["close_NFLX"] < row["day_5_lookback_NFLX"] * 0.95 else 0
),
)
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
strategy = Strategy(
indicators={
"day_5_lookback_NFLX": lambda row: row["close_NFLX"].shift(5),
"day_5_lookback_ROKU": lambda row: row["close"].shift(5),
},
signal_logic=lambda row: (
1
if row["close_NFLX"] > row["day_5_lookback_NFLX"] * 1.05
else -1 if row["close_NFLX"] < row["day_5_lookback_NFLX"] * 0.95 else 0
),
)
data = strategy.generate_signals(data)
backtester = Backtester()
backtester.backtest(data)
backtester.calculate_performance()
最终投资组合价值:14387.50
总回报:43.88%
年化回报:34.80%
年化波动率:55.77%
夏普比率:0.62
索提诺比率:0.74
最大回撤:-39.86%