量化交易平台指支持通过对数据进行多维度的定量分析,结合发现的特征定制策略,并能够基于历史数据对策略进行回测,最后支持实盘买卖的交易平台。
从业务流上看,量化交易可以分解成:行情获取->数据清洗->指标计算->策略开发->策略回测->模拟/实盘交易;其中,指标计算(alpha,risk)+策略开发(算法+交易信号)可以被视为组合构建模块,而策略回测和实盘交易可以被视为执行模块。从框架上看,底层标的几乎可以是金融市场上所有产品,包括股票、期货、外汇、商品、数字货币等。当然,也可以应用到基金诊断、基金组合的构建和回测。
从业务价值上看,最核心的组件自然是策略开发。各家量化机构也都是从不同维度进行因子挖掘,从而捕获交易机会。结合当下火热的GPT4,令人向往的就是策略开发是否可能借助GPT的发展而带来变化。早在2020年,Facebook提出的Pluribus已经在6人德扑桌上击败了顶尖的人类选手,而Pluribus和以往的AI也都不一样,它不需要预先设定最优交易策略,不需要提前设置在哪些牌型就下注,基于pot size和stack size决定是否要raise等等(相当于量化双均线策略、动量策略等),而是根据牌局的变化动态生成下注的信号。但Pluribus并没有开源,所以对于策略生成(AIGS)无法窥之细节。
AIGC和AIGS是否能够被通用智能AGI统一目前还是很难讲的,因为在一些复杂游戏中,策略生成后,执行效果的评估和时间强相关,它其实是一个无限游戏。而无限游戏的目标并不是最大化/最小化某个指标,而是假设未来是未知的,所有人的胜负概率都一样。关键是赢的时候多赢一些,输的时候少输一些。好的策略就是选择合适的牌桌,设法让游戏一直继续,不要造成下牌桌的局面。从而在游戏过程中等待对手犯错获利。很明显,无限游戏不同于有限游戏,后者基于给定的目标总能找到某个逻辑进行分解。比如AutoGPT我用起来感觉就是一款优秀的基于特定目标求解的框架。
话说回来,目前的主流量化框架包括:zipline、QuantLib、Backtrader、Finance-Py、vnpy等;这些开源框架各有特点,主要的差异体现在高性能、功能丰富度、实盘交易支持度、社区活跃度方面。
功能丰富度:QuantLib和VNPY功能最为强大,学习难度最大;而Finance-Py和PyAlgoTrade最为简单,学习难度也是最小的
实盘交易:QuantLib和VNPY更侧重高频与量化基金实盘;而PyAlgoTrade和Finance-Py侧重学习与小额实盘;
社区活跃度:VNPY和QuantLib社区最为活跃,更新速度较好,而Pyalgotrade已经不更新了。
高性能:C++编写的QuantLib、python编写的VNPY在性能方面也很优秀;
尤其是在支持的交易品种方面,vnpy已经支持了市面上几乎所有金融产品的交易,包括期货、股票、期权、外汇、数字货币;
所以综上考虑,个人小规模实践完全可以从PyAlgoTrade入手,再选择VNPY进行深入。
我选择了聚宽平台作为行情获取来源,注册账号后,pip install qdatasdk就可以开始使用。jqdata的api做了速率的限制,循环调用数据获取接口会抛异常。这里,我使用backoff模块控制api调用频率。下载好数据就通过pandas写入本地文件夹,通过证券代码进行索引,以建立一个本地股票数据池。
#获取单个股票的行情数据
@on_exception(expo, Exception, max_tries=5)
def get_stock_data(code, start_date, end_date, source='local'):
if source == 'jqdata':
df = jq.get_price(code, start_date=start_date, end_date=end_date, frequency='daily', fields=None, skip_paused=False, fq='pre', count=None)
# 如果df不为空,则将index转换为datetime类型
if not df.empty:
df.index = pd.to_datetime(df.index)
return df
#指定第1列作为index
df = pd.read_csv('data/stock_data/' + code + '.csv',index_col=0)
if not df.empty:
df.index = pd.to_datetime(df.index)
df = df[start_date:end_date]
else:
print('error in get_stock_data: ', code, ' is empty')
return df
股票数据池可以分成三个部分:行情、基本面、估值;分别调用get_price、get_fundamentals、get_financial_data接口;我们通常也会通过市场指数来初始化股票池,这就要用到get_all_securities,get_index_stocks,get_index_weights三个数据接口;
#获取股票的估值数据pe_ration, pb_ration, ps_ration, pcf_ration等
#codes支持包含多只股票,比如['000001.XSHE', '600000.XSHG']
@on_exception(expo, Exception, max_tries=5)
def get_valuation_data(codes, start_date, end_date, source='jqdata'):
df = pd.DataFrame()
#优先从本地获取数据
#读取valuation_data目录下的valuation_data.csv文件
if os.path.exists('data/valuation_data/valuation_data.csv'):
df = pd.read_csv('data/valuation_data/valuation_data.csv', index_col=0, parse_dates=True)
#如果df包括了codes中的所有股票,那么就直接返回
if set(codes).issubset(set(df['code'])):
return df
if source == 'jqdata':
#获取估值数据
#获取start_date和end_date之间的最每个月估值数据
#比如start_date是2018-01-01,end_date是2020-01-01,那么就获取2018-01-01,2018-02-01...2019-12-01,2020-01-01的估值数据
statDate = []
#start_date是个string类型,需要解析出year
start_year = int(start_date.split('-')[0])
end_year = int(end_date.split('-')[0])
end_month = int(end_date.split('-')[1])
for year in range(start_year, end_year+1):
#explain:下面这句话的意思是,如果year不等于end_year,那么就取13个月,如果year等于end_year,那么就取end_month个月
final_month = 13 if year != end_year else end_month + 1
for month in range(1, final_month):
statDate.append(str(year) + '-' + str(month).zfill(2) + '-01')
#遍历statDate,获取每个月的估值数据
for date in statDate:
df1 = jq.get_fundamentals(jq.query(jq.valuation,jq.income).filter(jq.valuation.code.in_(codes)), date=date)
if not df1.empty:
df = df.append(df1)
#以添加数据的方式保存到本地
if not df.empty:
if os.path.exists('data/valuation_data/valuation_data.csv'):
df.to_csv('data/valuation_data/valuation_data.csv', mode='a', header=None)
else:
df.to_csv('data/valuation_data/valuation_data.csv')
return df
数据池建立好之后,我们可以开始进行一些指标的计算,包括最大回撤、股票涨跌幅、单次收益率、累计收益率等
#计算股价涨跌幅
def calculate_change_pct(df):
#计算股票涨跌幅
df['pct_change'] = df['close'].pct_change()
#nan填充为0
df['pct_change'] = df['pct_change'].fillna(0)
print(df.head(10))
return df
#过滤交易信号
def filter_trading_signal(df):
#过滤出buy和sell信号不为0的数据
temp_df = df[(df['buy']!=0) | (df['sell']!=0)]
#如果有连续buy的信号,则只保留第一个buy信号,其余的buy信号过滤掉
#如果有连续sell的信号,则只保留第一个sell信号,其余的sell信号过滤掉
temp_df['buy'] = temp_df['buy'].replace(1,0)
temp_df['sell'] = temp_df['sell'].replace(-1,0)
return temp_df
#计算股票单次交易的收益率,平仓时股价市值-开仓时股价市值 / 开仓时股价市值
# 开仓时bug=1,平仓时sell=-1
def calculate_single_trade_return(df):
#计算单次交易的收益率,单次交易的收益率 = (卖出时股价市值 - 最近一次的买入时股价市值) / 最近一次买入时的股价市值
#不能用pct_change,因为pct_change表示的是当前股价和前一天股价的涨跌幅,而不是买入时的股价和卖出时的股价的涨跌幅
#当遇到sell时,应该找上一次buy的股价市值,df['sell'] = -1时,找到上一次df['buy'] = 1的股价市值
#df['sell'] = -1时,找到上一次df['buy'] = 1的股价市值
temp_df = filter_trading_signal(df)
temp_df['single_trade_return'] = (temp_df['close'] - temp_df['close'].shift(1)) / temp_df['close'].shift(1)
#temp_df合并给df,如果temp_df中没有对应的数据,则用0填充
df = df.merge(temp_df[['single_trade_return']],how='left',left_index=True,right_index=True)
df['single_trade_return'] = df['single_trade_return'].fillna(0)
#通过iplot绘制收益率曲线,横坐标是时间,纵坐标是收益率
df['single_trade_return'].iplot(kind='line',title='single trade return')
return df
#计算累计收益率
def calcualte_cum_trade_return(df):
#计算累计收益率,需要-1吗?因为是累计收益率,不是累计收益
df['cum_trade_return'] = (df['single_trade_return'] + 1).cumprod() - 1
#通过iplot绘制累计收益率曲线,横坐标是时间,纵坐标是收益率
df['cum_trade_return'].iplot(kind='line',title='cum trade return')
return df
#计算股票的最大回撤:windows表示天数,在windows天内的最大回撤 = (windows天内的最大值 - windows天内的最小值) / windows天内的最大值
def calculate_max_drawdown(df,window=7):
#计算最大回撤,raw=False表示传入的是一个series,而不是一个dataframe。rolling(window)表示计算window天内的最大回撤,返回的是一个series
df['max_drawdown'] = df['close'].rolling(window).apply(lambda x:(x.max() - x.min()) / x.max(),raw=False)
#通过iplot绘制最大回撤曲线,横坐标是时间,纵坐标是最大回撤
df['max_drawdown'].iplot(kind='line',title='max drawdown')
return df
然后,基于PyAlgoTrade框架进行数据对接;PyAlgoTrade内置了对quandl平台(类似聚宽)的支持,我们主要是对中国市场进行实践,所以并不能调用quandl的下载数据接口,使用build_feed进行构建feed即可。
我们定义一个简单的策略进行测试,策略分为两部分:
1,如果收盘价高于SMA(10),那么就生成买入信号
2,如果已经购买了该股票,而收盘价低于SMA(10),那么就生成卖出信号
#定义策略,策略分为两部分
#1,如果收盘价高于SMA(15),那么就生成买入信号
#2,如果已经购买了该股票,而收盘价低于SMA(15),那么就生成卖出信号
class MyStrategy(strategy.BacktestingStrategy):
def __init__(self, feed, instrument):
super(MyStrategy, self).__init__(feed, 1000)#10000是初始资金
#保存股票代码
self.__instrument = instrument
#定义SMA指标
self.__sma = ma.SMA(feed[instrument].getCloseDataSeries(), 10)
#保存上一次的订单
self.__position = None
#获取股票代码
def getInstrument(self):
return self.__instrument
#获取SMA指标
def getSMA(self):
return self.__sma
#监听买入和 卖出事件
def onEnterOk(self, position):
execInfo = position.getEntryOrder().getExecutionInfo()
self.info("BUY at $%.2f" % (execInfo.getPrice()))
def onEnterCanceled(self, position):
self.__position = None
def onExitOk(self, position):
execInfo = position.getExitOrder().getExecutionInfo()
self.info("SELL at $%.2f" % (execInfo.getPrice()))
self.__position = None
def onExitCanceled(self, position):
# If the exit was canceled, re-submit it.
self.__position.exitMarket()
#策略执行函数
def onBars(self, bars):
#如果SMA指标没有准备好,那么就什么都不做
if self.__sma[-1] is None:
return
#获取当前的bar
bar = bars[self.__instrument]
#如果没有持有股票,而且收盘价高于SMA(15),那么就生成买入信号
if self.__position is None:
if bar.getPrice() > self.__sma[-1]:
#计算可以购买的股票数量
#购买股票
self.__position = self.enterLong(self.__instrument, 10, True)
#如果已经购买了该股票,而收盘价低于SMA(15),那么就生成卖出信号;exitActive表示是否已经有订单
elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive():
#卖出
self.__position.exitMarket()
# print("sell shares: at price of ",bar.getPrice())
# #获取数据
instruments = ['000001.XSHE']
feeds = quandlfeed.Feed()
feeds.addBarsFromCSV("000001.XSHE", "./data/stock_data/000001.XSHE.csv")
#创建策略
myStrategy = MyStrategy(feeds, "000001.XSHE")
#创建回测引擎
plt = plotter.StrategyPlotter(myStrategy)
#设置绘图参数
plt.getInstrumentSubplot("000001.XSHE").addDataSeries("SMA", myStrategy.getSMA())
# 绘制每次交易的收益率
returnsAnalyzer = pyalgotrade.stratanalyzer.returns.Returns()
myStrategy.attachAnalyzer(returnsAnalyzer)
plt.getOrCreateSubplot("returns").addDataSeries("Simple returns", returnsAnalyzer.getReturns())
#运行策略,
myStrategy.setDebugMode(True)
myStrategy.run()
#绘图
plt.plot()
# plt.show()
#打印最终结果
print("Final portfolio value: $%.2f" % myStrategy.getResult())
有了一个跑通的基础框架之后,我们可以逐步分解研究下PyAlgoTrade:
PyAlgoTrade框架主要包含六个部分,最重要的是strategies和feeds:
策略:Strategies;
回测数据:Feeds;
交易经纪人:Brokers;
时间序列数据:DataSeries;
技术分析:Technicals
优化器:Optimizer
从前面的代码我们可以看到,PyAlgoTrade当前支持从csv文件中获取数据,形成feeds;我们也可以对不同的金融产品来源构建不同的feed,比如从mysql、sqlite中读取,或者读取比特币数据。csvfeed从csv文件中读取数据后,就会开始解析Date,Open,High,Low,Close,Volume,Adj Close从而生成。(PS:下载聚宽数据后,需要对colum进行设置,否则会报数据格式出错的问题。)
接下来,我们研究下PyAlgoTrade的事件驱动机制,从而实现状态一致性;它和MetaTrader 4的设计理念是一致的,MQL也是采用事件回调来计算指标或者进行EA交易。