量化交易框架开发实践(二)

2023-11-15

        我们通过分析代码可以看出,PyAlgoTrade分为六个组件:Strategies、Feeds、Brokers、DataSeries、Technicals、Optimizer。从业务流上看也是比较容易理解的:Feed(数据源)->DataSeries(数据结构化)->Technicals(指标计算)->Strategy(策略构建)->Optimizer(策略回测)->Broker(模拟/实盘交易);

        PS.这套流程是否也可以用于实现投顾组合工具的系统搭建呢?我认为也是可以的。虽然在财富管理行业,对于客户而言可操作交易的资产是基金,而不是股票。但PyAlgoTrade从数据结构设计上就支持兼容不同的数据类型。下面,我们逐一分析下这六大组件:

Strategies:主要是定义交易逻辑,生成交易信号。什么时候买,什么时候卖

Feeds:这是一个抽象的概念。抽象出了数据获取的来源。比如csvfeed表示从csv文件获取行情数据,然后投喂给Strategies。Feeds不仅仅可以支持Bar数据(指一定时间周期内的开盘价、收盘价、最高价、最低价等)还有Tick数据(指每一次交易的价格、数量、时间等)。同时也支持自定义各类数据来源。一个强大的量化交易框架,势必需要支持各类数据来源以扩展策略的丰富度;比如如下的数据类型都可以通过继承feed数据结构来支持:

1. 新闻事件:某支股票相关的新闻发布信息,可以用于事件驱动型交易策略。

2. 财务估值:上市公司的财报信息,用于基本面量化策略。

 3. 宏观经济:GDP、CPI等经济指标消息,用于宏观量化策略。

4. 社交媒体:微博、微信等平台上对某行业的热度与舆情数据,用于市场情绪分析

Brokers:定义经纪商交易接口;比如可以对接各家券商提供的交易接口、Binance的数字货币交易接口、IB的期货期权交易接口等。对于一些交易平台的限制进行backtesting模拟测试也是在这里。

DataSeries:这也是个抽象概念;主要是基于时间维度/时区用于管理价格数据;因为量化分析常常需要处理不同周期(日线、周线、月线)下的不同价格序列(开盘、收盘、均价等)。从功能上看,pandas的dataSeries比pyalgotrade的dataseries更为强大。我们完全可以用pandas来替换这部分的功能实现,并且支持的数据范围和操作也更丰富。

Technicals:技术指标往往是策略制定和评估的一部分。pyalgotrade主要基于DataSeries上用于计算各种技术指标;在设计上technicals也被设计成了dataseries的装饰器。说白了就是定义计算口径,基于价格、成交量、估值、财务、趋势等数据中捕捉到关键信息,从而判断当前的市场情况。比如SMA、EMA、MACD、RSI等指标计算。装饰器的意思就是在不改变dataseries中的原有序列数据的情况下,计算相关的技术指标;

Optimizer:有点类似深度学习中的Optimizer(主要是通过BP算法来对网络模型进行参数调优),pyalgotrade的Optimizer主要是用于在多台机器上进行backtesting任务的分发,目标就是实现并行计算,缩短策略回测所需要的时间。它主要由server和worker两部分组成。server主要负责提供bars、parameters、以及收集每个worker在跑策略时产生的结果。worker主要负责基于bars、parameters来运行策略;相当于构建实现了一个简单的分布式计算。

        简答解释下,在制定交易策略时,往往需要多种技术指标的组合。比如RSI2策略是一种简单的均值回归交易策略:当股票的RSI2跌破10时,那么被认为是超卖,应该生成买入信号。当股票的RSI2涨破90时,被认为是超买,应该生成卖出信号。为了实现这个策略,就需要先计算指标:

1,设置入场移动平均线(150到250之间)和出场移动平均线(5到15之间)

2,设置超买阈值(75到95之间)和超卖阈值(5到25之间)。

        而范围内的数字组合是40w,如果只有一台机器遍历所有的可能性势必是比较慢的,但是如果拆分成多个任务下发到多台机器中,那么策略回测的时间就能大大的缩短。

        对策略进行回测的代码如下:

#生成策略所需要的技术指标 
def parameters_generator():
    instrument = ["0000001.XSHE"]
    entrySMA = range(150, 251)
    exitSMA = range(5, 16)
    rsiPeriod = range(2, 11)
    overBoughtThreshold = range(75, 96)
    overSoldThreshold = range(5, 26)
    return itertools.product(instrument, entrySMA, exitSMA, rsiPeriod, overBoughtThreshold, overSoldThreshold)

#构建csvfeeds,同parameters一同提供给server

feed = quandlfeed.Feed()
feed.addBarsFromCSV(instrument, data_dir)

# 启动5000端口,运行server
server.serve(feed, parameters_generator(), "localhost", 5000)

#启动多个worker
worker.run(rsi2.RSI2, "localhost", 5000, workerName="localworker")

        然后就可以在server端看到RSI2策略跑出来的最优参数组合以及最优结果了。       

        策略构建是一个永远都在迭代的话题,私募二级/公募的产品之所以这么多,也是因为在策略构建模块上各家机构都存在不同的差异。

        PyAlgotrade是基于事件驱动的框架,在事件回调的设计上实现的很优雅。我们继续深入学习一下。其实,所有的事件驱动机制基本上都遵循一个套路模式,就是subject/dispatcher/observer模式。简单来说,subject发布事件,dispatcher收集事件然后分发,observer对事件再进行处理。我们在设计App端架构时也常常会使用这个模式,比如NetParser模块收到API的返回结果,解析完成就可以分发该消息到controller层,然后controller层根据业务逻辑处理数据model后,最后更新view呈现到界面。

        我们先看看这个Event的数据结构和处理方式,它既支持发布事件,也支持处理事件:

1. 用__handlers列表维护所有的事件处理器(handler就是表示订阅者,相当于回调函数指针)。

2. 提供subscribe()方法用于订阅事件,将事件处理器添加到__handlers列表。

3. 提供unsubscribe()方法用于取消订阅,从__handlers列表移除事件处理器。

4. 提供emit()方法发布事件,会调用__handlers列表中的所有事件处理器,传入事件参数。

5. 当事件触发时(__emitting > 0),subscribe()与unsubscribe()方法的调用会被暂存到__deferred列表。等 emit()调用结束后会统一应用__deferred列表中的订阅与取消订阅操作。这个设计避免了在发布事件过程中发生操作__handlers列表的情况(很优秀的实现,避免了多线程需要加锁的设计)

6. 使用__applyChanges()方法来统一应用__deferred列表中的操作。

class Event(object):
    def __init__(self):
        self.__handlers = []
        self.__deferred = []
        self.__emitting = 0
    def __subscribeImpl(self, handler):
        assert not self.__emitting
        if handler not in self.__handlers:
            self.__handlers.append(handler)
    def __unsubscribeImpl(self, handler):
        assert not self.__emitting
        self.__handlers.remove(handler)
    def __applyChanges(self):
        assert not self.__emitting
        for action, param in self.__deferred:
            action(param)
        self.__deferred = []
    def subscribe(self, handler):
        if self.__emitting:
            self.__deferred.append((self.__subscribeImpl, handler))
        elif handler not in self.__handlers:
            self.__subscribeImpl(handler)
    def unsubscribe(self, handler):
        if self.__emitting:
            self.__deferred.append((self.__unsubscribeImpl, handler))
        else:
            self.__unsubscribeImpl(handler)
    def emit(self, *args, **kwargs):
        try:
            self.__emitting += 1
            for handler in self.__handlers:
                handler(*args, **kwargs)
        finally:
            self.__emitting -= 1
            if not self.__emitting:
                self.__applyChanges()

        假设我们定义了一个策略,这个策略需要订阅处理多个事件。那么我们就可以声明一个dispatcher来实现事件驱动的循环。我们看看Dispatcher的代码实现,它定义了一个subject列表用于管理事件生产者,并且通过addSubject支持将Subject加入到Dispatcher,并根据调度优先级插入到相应位置。比如交易下单成功/失败事件的优先级更高。最后在run()方法内循环分发Subjects产生的事件。

        从代码上看,这个Dispatcher为策略提供了多个事件来源的管理与同步功能。策略只需添加各个Subject,然后启动Dispatcher,即可在一个线程内接收来自所有Subject的事件。


# This class is responsible for dispatching events from multiple subjects, synchronizing them if necessary.
class Dispatcher(object):
    def __init__(self):
        self.__subjects = []
        self.__stop = False
        self.__startEvent = observer.Event()
        self.__idleEvent = observer.Event()
        self.__currDateTime = None

    # Returns the current event datetime. It may be None for events from realtime subjects.
    def getCurrentDateTime(self):
        return self.__currDateTime

    def getStartEvent(self):
        return self.__startEvent

    def getIdleEvent(self):
        return self.__idleEvent

    def stop(self):
        self.__stop = True

    def getSubjects(self):
        return self.__subjects

    def addSubject(self, subject):
        # Skip the subject if it was already added.
        if subject in self.__subjects:
            return

        # If the subject has no specific dispatch priority put it right at the end.
        if subject.getDispatchPriority() is dispatchprio.LAST:
            self.__subjects.append(subject)
        else:
            # Find the position according to the subject's priority.
            pos = 0
            for s in self.__subjects:
                if s.getDispatchPriority() is dispatchprio.LAST or subject.getDispatchPriority() < s.getDispatchPriority():
                    break
                pos += 1
            self.__subjects.insert(pos, subject)

        subject.onDispatcherRegistered(self)

    # Return True if events were dispatched.
    def __dispatchSubject(self, subject, currEventDateTime):
        ret = False
        # Dispatch if the datetime is currEventDateTime of if its a realtime subject.
        if not subject.eof() and subject.peekDateTime() in (None, currEventDateTime):
            ret = subject.dispatch() is True
        return ret

    # Returns a tuple with booleans
    # 1: True if all subjects hit eof
    # 2: True if at least one subject dispatched events.
    def __dispatch(self):
        smallestDateTime = None
        eof = True
        eventsDispatched = False

        # Scan for the lowest datetime.
        for subject in self.__subjects:
            if not subject.eof():
                eof = False
                smallestDateTime = utils.safe_min(smallestDateTime, subject.peekDateTime())

        # Dispatch realtime subjects and those subjects with the lowest datetime.
        if not eof:
            self.__currDateTime = smallestDateTime

            for subject in self.__subjects:
                if self.__dispatchSubject(subject, smallestDateTime):
                    eventsDispatched = True
        return eof, eventsDispatched

    def run(self):
        try:
            for subject in self.__subjects:
                subject.start()

            self.__startEvent.emit()

            while not self.__stop:
                eof, eventsDispatched = self.__dispatch()
                if eof:
                    self.__stop = True
                elif not eventsDispatched:
                    self.__idleEvent.emit()
        finally:
            # There are no more events.
            self.__currDateTime = None

            for subject in self.__subjects:
                subject.stop()
            for subject in self.__subjects:
                subject.join()

        在EventProfiler这个文件中,PyAlgoTrade实现了四个类,分别是Event、Profiler、Result、Predicate,这个主要是用来评估事件对策略的影响。

        Predicate类是一个事件判断类,用于判断事件是否发生。Event类是一个事件类,用于存储事件的信息,包括事件发生前后的时间长度,以及事件发生前后的收益率。Profiler类是一个事件分析类,用于分析事件发生前后的收益率,以及事件发生前后的收益率的累积收益率。Results类是一个事件分析结果类,用于存储事件分析的结果,包括事件发生前后的收益率,以及事件发生前后的收益率的累积收益率。

        Event类实现的也很巧妙:

1. 在初始化时,Event接受lookBack与lookForward两个参数,代表向前与向后获取数据的范围。

2. 创建一个大小为lookBack + lookForward + 1的数组来保存数据。初始时设为NaN(空值)。

3. 提供setValue方法设置指定时刻t的value值。将值保存至数组中正确的位置。

4. 提供getValue方法获取指定时刻t的值。从数组中取出正确位置的值。

        举个例子,如果我们希望评估某个交易事件对收益率带来影响,或者基于某个事件的发生需要对策略进行调整。那么我们可以设置lookBack为5天,lookForward为3天。然后可以调用setValue方法设置过去5天与未来3天内该事件每天的值。之后可以调用getValues获得过去5天与未来3天这段时间内该事件的所有值进行分析,判断趋势等。

class Event(object):
    def __init__(self, lookBack, lookForward):
        assert(lookBack > 0)
        assert(lookForward > 0)
        self.__lookBack = lookBack
        self.__lookForward = lookForward
        self.__values = np.empty((lookBack + lookForward + 1))
        self.__values[:] = np.NAN

    def __mapPos(self, t):
        assert(t >= -1*self.__lookBack and t <= self.__lookForward)
        return t + self.__lookBack

    def isComplete(self):
        return not any(np.isnan(self.__values))

    def getLookBack(self):
        return self.__lookBack

    def getLookForward(self):
        return self.__lookForward

    def setValue(self, t, value):
        if value is not None:
            pos = self.__mapPos(t)
            self.__values[pos] = value

    def getValue(self, t):
        pos = self.__mapPos(t)
        return self.__values[pos]

    def getValues(self):
        return self.__values

        相比于我们如果要搭建一个基金组合的量化系统,pyAlgoTrade提供的框架基本已经足够了。我看了看火富牛提供的智能投研平台,因为不需要和股票一样有实时行情的数据,再加上策略定制、效果回测都更侧重于长期表现,所以系统的实现难度也会大大降低。尤其是对私募基金而言,拿到的披露数据更少。所以在技术指标这块基本都是固定的。指标主要分两块:基金的业绩和风险评估指标(滚动收益率、区间胜率、年化波动率、夏普比率、最大回撤和回补期等等)、以及基于客户持仓计算组合净值曲线(考虑到加仓、减仓、红利再投等场景,虽然听起来很复杂,但梳理后其实还好,主要难点还是在于对净值缺失,净值频率不一致的情况处理)。这一块我就不赘述了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

量化交易框架开发实践(二) 的相关文章

随机推荐

  • 深度学习Pytorch(十)——基于torchvision的目标检测模型

    深度学习Pytorch 十 基于torchvision的目标检测模型 文章目录 深度学习Pytorch 十 基于torchvision的目标检测模型 一 定义数据集 二 为PennFudan编写自定义数据集 1 下载数据集 2 为数据集编写
  • C++ 中基础的几种变量作用域,类作用域(C++复习向p5)

    文章目录 三种变量 变量作用域 初始化变量 类作用域 三种变量 局部变量 函数 代码块中的变量 形式参数 函数参数中定义的变量 在函数体中有效 全局变量 所有函数外部声明的变量 变量作用域 局部作用域 局部变量在函数执行完后销毁 全局作用域
  • 最全面的Socket使用解析

    前言 Socket的使用在Android的网络编程中非常重要 今天我将带大家全面了解Socket及其使用方法 目录 1 网络基础 1 1 计算机网络分层 计算机网络分为五层 物理层 数据链路层 网络层 运输层 应用层 其中 网络层 负责根据
  • 一次内网 Harbor 镜像仓库导出迁移过程记录

    1 整体思路 Harbor 提供有丰富的 API 接口 可以获取所有项目信息 镜像和标签等信息 通过编写 shell 脚本循环处理即可实现批量导出镜像包的需求 登陆 Harbor 后 左下角有 API 控制中心按钮 进入可以查看和调试 2
  • centos 安装配置l2tp实现***

    centos 安装配置l2tp实现 1 前言 L2TP是一种工业标准的Internet隧道协议 功能大致和PPTP协议类似 比如同样可以对网络数据流进行加密 不过也有不同之处 比如PPTP要求网络为IP网络 L2TP要求面向数据包的点对点连
  • OSI七层模型---数据链路层(以太网帧、MAC地址、MTU、MSS、ARP协议)

    我们首先来了解一下物理层的作用 物理层的主要目的是实现比特流的透明传输 为数据链路层提供服务 物理层接口解决了用几根线 多大电压 每根线什么功能 以及几根线之间是怎么协调的问题 物理层介质解决了数据载体材质以及价格优缺点的问题 通信技术解决
  • 01_I.MX6U芯片简介

    目录 I MX6芯片简介 Corterx A7架构简介 Cortex A处理器运行模型 Cortex A 寄存器组 IMX6U IO表示形式 I MX6芯片简介 ARM Cortex A7内核可达900 MHz 128 KB L2缓存 并行
  • 李宏毅 机器学习 2016 秋:6、Classification: Logistic Regression

    文章目录 六 Classification Logistic Regression 六 Classification Logistic Regression 我们来讲 Logistic Regression 我们在上一份投影片里面 我们都已
  • 点云Las格式分析及python实现

    目录 一 Las格式分析 1 公共头 2 变长记录 3 参考文献 二 安装laspy 2 0 2 三 代码实现 一 Las格式分析 1 公共头 公共头用来记录数据集的基本信息 如Li DAR点总数 数据范围 Li DAR点格式 变长记录总数
  • 在switch语句中使用字符串以及实现原理

    对于Java语言来说 在Java 7之前 switch语句中的条件表达式的类型只能是与整数类型兼容的类型 包括基本类型char byte short和int 与这些基本类型对应的封装类Character Byte Short和Integer
  • Go单体服务开发最佳实践

    单体最佳实践的由来 对于很多初创公司来说 业务的早期我们更应该关注于业务价值的交付 并且此时用户体量也很小 QPS 也非常低 我们应该使用更简单的技术架构来加速业务价值的交付 此时单体的优势就体现出来了 正如我直播分享时经常提到 我们在使用
  • 什么是等保合规

    近年来 随着国家对网络安全的重视 我国对网络安全的监管要求也越来越高 各互联网企业都在积极落实网络安全等级保护 关键信息基础设施安全保护制度 为了保护网络安全 企业也在按照 网络安全法 及 等保2 0 系列标准要求 积极寻求等级保护测评 整
  • C语言进阶:C陷阱与缺陷(读书笔记总)

    大家不要只收藏不关注呀 哪怕只是点个赞也可以呀 粉丝私信发邮箱 免费发你PDF 最近读了一本C语言书 C陷阱与缺陷 还不错 挺适合刚刚工作后的人 特此分享读书笔记 写代码时应注意这些问题 笔记已做精简 读完大概需要30min 如果读起来感觉
  • 广义线性模型(GLM)

    在线性回归中 y丨x N 2 在逻辑回归中 y丨x Bernoulli 这两个都是GLM中的特殊的cases 我们首先引入一个指数族 the exponential family 的概念 如果一个分布能写成下列形式 那么我们说这个分布属于指
  • Bert机器问答模型QA(阅读理解)

    Github参考代码 https github com edmondchensj ChineseQA with BERT https zhuanlan zhihu com p 333682032 数据集来源于DuReader Dataset
  • Unity基础3——Resources资源动态加载

    一 特殊文件夹 一 工程路径获取 注意 该方式 获取到的路径 一般情况下 只在 编辑模式下使用 我们不会在实际发布游戏后 还使用该路径 游戏发布过后 该路径就不存在了 print Application dataPath 二 Resourc
  • C++ vector find()使用? ( if!=vec.end())

    std vector find是C STL中的一个函数 它可以用来在std vector中查找给定的元素 如果找到了这个元素 它将返回一个迭代器指向该元素 否则将返回一个名为end 的迭代器 下面是一个使用find的示例代码 include
  • C++11 条件变量(condition_variable) 使用详解

    官网 一 总述 在C 11中 我们可以使用条件变量 condition variable 实现多个线程间的同步操作 当条件不满足时 相关线程被一直阻塞 直到某种条件出现 这些线程才会被唤醒 主要成员函数如下 二 具体函数 1 wait函数
  • 泰勒阵列天线综合与matlab,阵列天线综合之切比雪夫低副瓣阵列设计Matlab

    在 自适应天线与相控阵 这门课中 我了解到了关于理想低副瓣阵列设计的一些方法 其中切比雪夫等副瓣阵列设计方法是一种基础的方法 故将其设计流程写成maltab程序供以后学习使用 在此分享一下 此方法全称为道尔夫 切比雪夫综合法 简称为切比雪夫
  • 量化交易框架开发实践(二)

    我们通过分析代码可以看出 PyAlgoTrade分为六个组件 Strategies Feeds Brokers DataSeries Technicals Optimizer 从业务流上看也是比较容易理解的 Feed 数据源 gt Data