我们通过分析代码可以看出,PyAlgoTrade分为六个组件:Strategies、Feeds、Brokers、DataSeries、Technicals、Optimizer。从业务流上看也是比较容易理解的:Feed(数据源)->DataSeries(数据结构化)->Technicals(指标计算)->Strategy(策略构建)->Optimizer(策略回测)->Broker(模拟/实盘交易);
PS.这套流程是否也可以用于实现投顾组合工具的系统搭建呢?我认为也是可以的。虽然在财富管理行业,对于客户而言可操作交易的资产是基金,而不是股票。但PyAlgoTrade从数据结构设计上就支持兼容不同的数据类型。下面,我们逐一分析下这六大组件:
Strategies:主要是定义交易逻辑,生成交易信号。什么时候买,什么时候卖
Feeds:这是一个抽象的概念。抽象出了数据获取的来源。比如csvfeed表示从csv文件获取行情数据,然后投喂给Strategies。Feeds不仅仅可以支持Bar数据(指一定时间周期内的开盘价、收盘价、最高价、最低价等)还有Tick数据(指每一次交易的价格、数量、时间等)。同时也支持自定义各类数据来源。一个强大的量化交易框架,势必需要支持各类数据来源以扩展策略的丰富度;比如如下的数据类型都可以通过继承feed数据结构来支持:
1. 新闻事件:某支股票相关的新闻发布信息,可以用于事件驱动型交易策略。
2. 财务估值:上市公司的财报信息,用于基本面量化策略。
3. 宏观经济:GDP、CPI等经济指标消息,用于宏观量化策略。
4. 社交媒体:微博、微信等平台上对某行业的热度与舆情数据,用于市场情绪分析
Brokers:定义经纪商交易接口;比如可以对接各家券商提供的交易接口、Binance的数字货币交易接口、IB的期货期权交易接口等。对于一些交易平台的限制进行backtesting模拟测试也是在这里。
DataSeries:这也是个抽象概念;主要是基于时间维度/时区用于管理价格数据;因为量化分析常常需要处理不同周期(日线、周线、月线)下的不同价格序列(开盘、收盘、均价等)。从功能上看,pandas的dataSeries比pyalgotrade的dataseries更为强大。我们完全可以用pandas来替换这部分的功能实现,并且支持的数据范围和操作也更丰富。
Technicals:技术指标往往是策略制定和评估的一部分。pyalgotrade主要基于DataSeries上用于计算各种技术指标;在设计上technicals也被设计成了dataseries的装饰器。说白了就是定义计算口径,基于价格、成交量、估值、财务、趋势等数据中捕捉到关键信息,从而判断当前的市场情况。比如SMA、EMA、MACD、RSI等指标计算。装饰器的意思就是在不改变dataseries中的原有序列数据的情况下,计算相关的技术指标;
Optimizer:有点类似深度学习中的Optimizer(主要是通过BP算法来对网络模型进行参数调优),pyalgotrade的Optimizer主要是用于在多台机器上进行backtesting任务的分发,目标就是实现并行计算,缩短策略回测所需要的时间。它主要由server和worker两部分组成。server主要负责提供bars、parameters、以及收集每个worker在跑策略时产生的结果。worker主要负责基于bars、parameters来运行策略;相当于构建实现了一个简单的分布式计算。
简答解释下,在制定交易策略时,往往需要多种技术指标的组合。比如RSI2策略是一种简单的均值回归交易策略:当股票的RSI2跌破10时,那么被认为是超卖,应该生成买入信号。当股票的RSI2涨破90时,被认为是超买,应该生成卖出信号。为了实现这个策略,就需要先计算指标:
1,设置入场移动平均线(150到250之间)和出场移动平均线(5到15之间)
2,设置超买阈值(75到95之间)和超卖阈值(5到25之间)。
而范围内的数字组合是40w,如果只有一台机器遍历所有的可能性势必是比较慢的,但是如果拆分成多个任务下发到多台机器中,那么策略回测的时间就能大大的缩短。
对策略进行回测的代码如下:
#生成策略所需要的技术指标
def parameters_generator():
instrument = ["0000001.XSHE"]
entrySMA = range(150, 251)
exitSMA = range(5, 16)
rsiPeriod = range(2, 11)
overBoughtThreshold = range(75, 96)
overSoldThreshold = range(5, 26)
return itertools.product(instrument, entrySMA, exitSMA, rsiPeriod, overBoughtThreshold, overSoldThreshold)
#构建csvfeeds,同parameters一同提供给server
feed = quandlfeed.Feed()
feed.addBarsFromCSV(instrument, data_dir)
# 启动5000端口,运行server
server.serve(feed, parameters_generator(), "localhost", 5000)
#启动多个worker
worker.run(rsi2.RSI2, "localhost", 5000, workerName="localworker")
然后就可以在server端看到RSI2策略跑出来的最优参数组合以及最优结果了。
策略构建是一个永远都在迭代的话题,私募二级/公募的产品之所以这么多,也是因为在策略构建模块上各家机构都存在不同的差异。
PyAlgotrade是基于事件驱动的框架,在事件回调的设计上实现的很优雅。我们继续深入学习一下。其实,所有的事件驱动机制基本上都遵循一个套路模式,就是subject/dispatcher/observer模式。简单来说,subject发布事件,dispatcher收集事件然后分发,observer对事件再进行处理。我们在设计App端架构时也常常会使用这个模式,比如NetParser模块收到API的返回结果,解析完成就可以分发该消息到controller层,然后controller层根据业务逻辑处理数据model后,最后更新view呈现到界面。
我们先看看这个Event的数据结构和处理方式,它既支持发布事件,也支持处理事件:
1. 用__handlers列表维护所有的事件处理器(handler就是表示订阅者,相当于回调函数指针)。
2. 提供subscribe()方法用于订阅事件,将事件处理器添加到__handlers列表。
3. 提供unsubscribe()方法用于取消订阅,从__handlers列表移除事件处理器。
4. 提供emit()方法发布事件,会调用__handlers列表中的所有事件处理器,传入事件参数。
5. 当事件触发时(__emitting > 0),subscribe()与unsubscribe()方法的调用会被暂存到__deferred列表。等 emit()调用结束后会统一应用__deferred列表中的订阅与取消订阅操作。这个设计避免了在发布事件过程中发生操作__handlers列表的情况(很优秀的实现,避免了多线程需要加锁的设计)
6. 使用__applyChanges()方法来统一应用__deferred列表中的操作。
class Event(object):
def __init__(self):
self.__handlers = []
self.__deferred = []
self.__emitting = 0
def __subscribeImpl(self, handler):
assert not self.__emitting
if handler not in self.__handlers:
self.__handlers.append(handler)
def __unsubscribeImpl(self, handler):
assert not self.__emitting
self.__handlers.remove(handler)
def __applyChanges(self):
assert not self.__emitting
for action, param in self.__deferred:
action(param)
self.__deferred = []
def subscribe(self, handler):
if self.__emitting:
self.__deferred.append((self.__subscribeImpl, handler))
elif handler not in self.__handlers:
self.__subscribeImpl(handler)
def unsubscribe(self, handler):
if self.__emitting:
self.__deferred.append((self.__unsubscribeImpl, handler))
else:
self.__unsubscribeImpl(handler)
def emit(self, *args, **kwargs):
try:
self.__emitting += 1
for handler in self.__handlers:
handler(*args, **kwargs)
finally:
self.__emitting -= 1
if not self.__emitting:
self.__applyChanges()
假设我们定义了一个策略,这个策略需要订阅处理多个事件。那么我们就可以声明一个dispatcher来实现事件驱动的循环。我们看看Dispatcher的代码实现,它定义了一个subject列表用于管理事件生产者,并且通过addSubject支持将Subject加入到Dispatcher,并根据调度优先级插入到相应位置。比如交易下单成功/失败事件的优先级更高。最后在run()方法内循环分发Subjects产生的事件。
从代码上看,这个Dispatcher为策略提供了多个事件来源的管理与同步功能。策略只需添加各个Subject,然后启动Dispatcher,即可在一个线程内接收来自所有Subject的事件。
# This class is responsible for dispatching events from multiple subjects, synchronizing them if necessary.
class Dispatcher(object):
def __init__(self):
self.__subjects = []
self.__stop = False
self.__startEvent = observer.Event()
self.__idleEvent = observer.Event()
self.__currDateTime = None
# Returns the current event datetime. It may be None for events from realtime subjects.
def getCurrentDateTime(self):
return self.__currDateTime
def getStartEvent(self):
return self.__startEvent
def getIdleEvent(self):
return self.__idleEvent
def stop(self):
self.__stop = True
def getSubjects(self):
return self.__subjects
def addSubject(self, subject):
# Skip the subject if it was already added.
if subject in self.__subjects:
return
# If the subject has no specific dispatch priority put it right at the end.
if subject.getDispatchPriority() is dispatchprio.LAST:
self.__subjects.append(subject)
else:
# Find the position according to the subject's priority.
pos = 0
for s in self.__subjects:
if s.getDispatchPriority() is dispatchprio.LAST or subject.getDispatchPriority() < s.getDispatchPriority():
break
pos += 1
self.__subjects.insert(pos, subject)
subject.onDispatcherRegistered(self)
# Return True if events were dispatched.
def __dispatchSubject(self, subject, currEventDateTime):
ret = False
# Dispatch if the datetime is currEventDateTime of if its a realtime subject.
if not subject.eof() and subject.peekDateTime() in (None, currEventDateTime):
ret = subject.dispatch() is True
return ret
# Returns a tuple with booleans
# 1: True if all subjects hit eof
# 2: True if at least one subject dispatched events.
def __dispatch(self):
smallestDateTime = None
eof = True
eventsDispatched = False
# Scan for the lowest datetime.
for subject in self.__subjects:
if not subject.eof():
eof = False
smallestDateTime = utils.safe_min(smallestDateTime, subject.peekDateTime())
# Dispatch realtime subjects and those subjects with the lowest datetime.
if not eof:
self.__currDateTime = smallestDateTime
for subject in self.__subjects:
if self.__dispatchSubject(subject, smallestDateTime):
eventsDispatched = True
return eof, eventsDispatched
def run(self):
try:
for subject in self.__subjects:
subject.start()
self.__startEvent.emit()
while not self.__stop:
eof, eventsDispatched = self.__dispatch()
if eof:
self.__stop = True
elif not eventsDispatched:
self.__idleEvent.emit()
finally:
# There are no more events.
self.__currDateTime = None
for subject in self.__subjects:
subject.stop()
for subject in self.__subjects:
subject.join()
在EventProfiler这个文件中,PyAlgoTrade实现了四个类,分别是Event、Profiler、Result、Predicate,这个主要是用来评估事件对策略的影响。
Predicate类是一个事件判断类,用于判断事件是否发生。Event类是一个事件类,用于存储事件的信息,包括事件发生前后的时间长度,以及事件发生前后的收益率。Profiler类是一个事件分析类,用于分析事件发生前后的收益率,以及事件发生前后的收益率的累积收益率。Results类是一个事件分析结果类,用于存储事件分析的结果,包括事件发生前后的收益率,以及事件发生前后的收益率的累积收益率。
Event类实现的也很巧妙:
1. 在初始化时,Event接受lookBack与lookForward两个参数,代表向前与向后获取数据的范围。
2. 创建一个大小为lookBack + lookForward + 1的数组来保存数据。初始时设为NaN(空值)。
3. 提供setValue方法设置指定时刻t的value值。将值保存至数组中正确的位置。
4. 提供getValue方法获取指定时刻t的值。从数组中取出正确位置的值。
举个例子,如果我们希望评估某个交易事件对收益率带来影响,或者基于某个事件的发生需要对策略进行调整。那么我们可以设置lookBack为5天,lookForward为3天。然后可以调用setValue方法设置过去5天与未来3天内该事件每天的值。之后可以调用getValues获得过去5天与未来3天这段时间内该事件的所有值进行分析,判断趋势等。
class Event(object):
def __init__(self, lookBack, lookForward):
assert(lookBack > 0)
assert(lookForward > 0)
self.__lookBack = lookBack
self.__lookForward = lookForward
self.__values = np.empty((lookBack + lookForward + 1))
self.__values[:] = np.NAN
def __mapPos(self, t):
assert(t >= -1*self.__lookBack and t <= self.__lookForward)
return t + self.__lookBack
def isComplete(self):
return not any(np.isnan(self.__values))
def getLookBack(self):
return self.__lookBack
def getLookForward(self):
return self.__lookForward
def setValue(self, t, value):
if value is not None:
pos = self.__mapPos(t)
self.__values[pos] = value
def getValue(self, t):
pos = self.__mapPos(t)
return self.__values[pos]
def getValues(self):
return self.__values
相比于我们如果要搭建一个基金组合的量化系统,pyAlgoTrade提供的框架基本已经足够了。我看了看火富牛提供的智能投研平台,因为不需要和股票一样有实时行情的数据,再加上策略定制、效果回测都更侧重于长期表现,所以系统的实现难度也会大大降低。尤其是对私募基金而言,拿到的披露数据更少。所以在技术指标这块基本都是固定的。指标主要分两块:基金的业绩和风险评估指标(滚动收益率、区间胜率、年化波动率、夏普比率、最大回撤和回补期等等)、以及基于客户持仓计算组合净值曲线(考虑到加仓、减仓、红利再投等场景,虽然听起来很复杂,但梳理后其实还好,主要难点还是在于对净值缺失,净值频率不一致的情况处理)。这一块我就不赘述了。