【序列比对】Needleman-Wunsch(全局)和Smith-Waterman(局部)算法py实现(多条回溯路径,三叉树思路,超详细注释)

2023-10-28

Needleman-Wunsch和Smith-Waterman算法py实现(多条回溯路径)

话不多说,直接上结果图,多条回溯路径。
NW结果

原理

在这里插入图片描述在这里插入图片描述

代码详解(以NW为例)

导入包以及参数设置

import numpy as np

sequence_1 = "AACGTACTCAAGTCT"
sequence_2 = "TCGTACTCTAACGAT"
match = 9
mismatch = -6
gap = -2

创建得分矩阵

  • 创建得分矩阵,行数为第一条序列长度加一,列数为第二条序列长度加二
  • 创建是否匹配的矩阵,这个矩阵的长宽就分别是两条序列的长度了。如果匹配了,对应的格子就是匹配的得分,反之就是不匹配的得分
  • 动态规划的思想算每个格子的得分,每个格子需要考虑其左、上、左上的值,也可以说是考虑序列一、序列二引入空缺或直接匹配的最大值
# 创建得分矩阵,行数为第一条序列长度加一,列数为第二条序列长度加二
Score = np.zeros((len(sequence_1) + 1, len(sequence_2) + 1))
# 创建是否匹配的矩阵,这个矩阵的长宽就分别是两条序列的长度了。如果匹配了,对应的格子就是匹配的得分,反之就是不匹配的得分
Match_or_not = np.zeros((len(sequence_1), len(sequence_2)))
for i in range(len(sequence_1)):
    for j in range(len(sequence_2)):
        if sequence_1[i] == sequence_2[j]:
            Match_or_not[i][j] = match
        else:
            Match_or_not[i][j] = mismatch

# 填得分矩阵
# 第一步:初始化第一行和第一列
for i in range(len(sequence_1) + 1):
    Score[i][0] = i * gap
for j in range(len(sequence_2) + 1):
    Score[0][j] = j * gap
# 第二步:动态规划的思想算每个格子的得分,每个格子需要考虑其左、上、左上的值,也可以说是考虑序列一、序列二引入空缺或直接匹配的最大值
for i in range(1, len(sequence_1) + 1):
    for j in range(1, len(sequence_2) + 1):
        Score[i][j] = max(Score[i - 1][j - 1] + Match_or_not[i - 1][j - 1],
                          Score[i - 1][j] + gap,
                          Score[i][j - 1] + gap)

看看得分矩阵长啥样吧
在这里插入图片描述
请添加图片描述
请添加图片描述

回溯

我们需要考虑的是可能会有多条回溯路径。全局比对的回溯是从右下角开始,左上角结束,其中可能会有分叉点。我们可以把右下角看成是一个树的根,矩阵中的每个值看成是一个节点。每个节点都可能会有三个子节点:左,上,对角线。分别对应了回溯的方向。而整个回溯的过程也就是遍历这颗三叉树的过程,严谨的说是从根节点遍历每个叶子节点的过程。

# 开始回溯
'''
我们需要考虑的是可能会有多条回溯路径。
全局比对的回溯是从右下角开始,左上角结束,其中可能会有分叉点。
我们可以把右下角看成是一个树的根,矩阵中的每个值看成是一个节点。
每个节点都可能会有三个子节点:左,上,对角线。分别对应了回溯的方向。
而整个回溯的过程也就是遍历这颗三叉树的过程,严谨的说是从根节点遍历每个叶子节点的过程。
'''


class Node:
    # 用类来建立三叉树节点,属性包括了行、列、得分、左子树、上子树、对角线子树
    def __init__(self, row=None, col=None, score=None):
        self.row = row
        self.col = col
        self.score = score
        self.left = None
        self.up = None
        self.diag = None


def isLeaf(self):
    # 判断是否是叶子节点
    return self.left is None and self.up is None and self.diag is None
    # 递归的函数查找从根节点到每个叶节点的路径


# 回溯路径的个数、回溯路径中的行号和列号
traceback_pathway_number = 0
traceback_pathway_row = [[]]
traceback_pathway_col = [[]]


def SaveRootToLeafPaths(Node, path_row, path_col):
    # 如果没有子树了
    if Node is None:
        return
    # 包含当前节点的路径
    path_row.append(Node.row)
    path_col.append(Node.col)
    global traceback_pathway_number
    global traceback_pathway_row
    global traceback_pathway_col
    # 如果找到叶节点,保存路径
    if isLeaf(Node):
        if traceback_pathway_number == 0:
            traceback_pathway_row[traceback_pathway_number] = list(path_row)
            traceback_pathway_col[traceback_pathway_number] = list(path_col)
        else:
            traceback_pathway_row += [list(path_row)]
            traceback_pathway_col += [list(path_col)]
        traceback_pathway_number += 1
    # 递归左、上、对角子树
    SaveRootToLeafPaths(Node.left, path_row, path_col)
    SaveRootToLeafPaths(Node.up, path_row, path_col)
    SaveRootToLeafPaths(Node.diag, path_row, path_col)
    # 回溯,出栈
    path_row.pop()
    path_col.pop()


# 建立三叉树,为 Score 矩阵里所有值都找到它的左、上、对角子树,用一个二位列表来存储节点
NodeTree = [[Node() for _ in range(len(sequence_2) + 1)] for _ in range(len(sequence_1) + 1)]
# 先把节点们的行号列号和得分记录下来
for i in range(len(sequence_1) + 1):
    for j in range(len(sequence_2) + 1):
        NodeTree[i][j].row = i
        NodeTree[i][j].col = j
        NodeTree[i][j].score = Score[i][j]
# 设置第一列和第一行的节点的上子树和左子树(其实也能在下面这个大循环里设置,但是这样可读性更高)
for i in range(1, len(sequence_1) + 1):
    NodeTree[i][0].up = NodeTree[i - 1][0]
for j in range(1, len(sequence_2) + 1):
    NodeTree[0][j].left = NodeTree[0][j - 1]
# 设置剩下的节点
for i in range(1, len(sequence_1) + 1):
    for j in range(1, len(sequence_2) + 1):
        if (Score[i][j] == Score[i - 1][j - 1] + Match_or_not[i - 1][j - 1]):
            NodeTree[i][j].diag = NodeTree[i - 1][j - 1]
        if (Score[i][j] == Score[i - 1][j] + gap):
            NodeTree[i][j].up = NodeTree[i - 1][j]
        if (Score[i][j] == Score[i][j - 1] + gap):
            NodeTree[i][j].left = NodeTree[i][j - 1]
# 遍历树并保存路径
SaveRootToLeafPaths(NodeTree[len(sequence_1)][len(sequence_2)], [], [])
# 改成numpy的ndarray类型,更加方便!
traceback_pathway_row = np.array(traceback_pathway_row)
traceback_pathway_col = np.array(traceback_pathway_col)
# 记录一下回溯时走不走左边或上边,如果走就记为1,不走就记为0
Go_left = traceback_pathway_col[:, range(traceback_pathway_col.shape[1] - 1)] - traceback_pathway_col[:, range(1,
                                                                                                               traceback_pathway_col.shape[
                                                                                                                   1])]
Go_up = traceback_pathway_row[:, range(traceback_pathway_row.shape[1] - 1)] - traceback_pathway_row[:,
                                                                              range(1, traceback_pathway_row.shape[1])]
# 用列表来存储序列一和序列二比对后的结果
seq1_align_set = []
seq2_align_set = []
print("总共有{}个比对结果".format(traceback_pathway_number))
for tpn in range(traceback_pathway_number):
    '''
    下面其实就是经典的nw回溯的代码了,这部分的原理可以参考nw算法回溯的伪代码。
    唯一不同的就是我们是多条回溯路径,所以有多少条路经就得循环多少次。
    值得一提的是,回溯过去的序列是逆序的,
    在python中字符串逆置十分方便,只需要合理利用切片,如:str[::-1]即可。
    '''
    seq1_align = ''
    seq2_align = ''
    i = len(sequence_1)
    j = len(sequence_2)
    k = 0
    while i > 0 or j > 0:
        if i > 0 and j > 0 and Go_left[tpn][k] and Go_up[tpn][k]:
            seq1_align += sequence_1[i - 1]
            seq2_align += sequence_2[j - 1]
            i -= 1
            j -= 1
        elif i > 0 and not (Go_left[tpn][k]) and Go_up[tpn][k]:
            seq1_align += sequence_1[i - 1]
            seq2_align += '-'
            i -= 1
        elif j > 0 and Go_left[tpn][k] and not (Go_up[tpn][k]):
            seq1_align += '-'
            seq2_align += sequence_2[j - 1]
            j -= 1
        k += 1
    seq1_align_set += [seq1_align[::-1]]
    seq2_align_set += [seq2_align[::-1]]
    print("下面是第{}个".format(tpn + 1))
    print(seq1_align[::-1])
    print(seq2_align[::-1])
    print(' ')
输出

总共有15个比对结果
下面是第1个
AA-CGTACTC-AA-G-TCT
–TCGTACTCTAACGAT–

下面是第2个
A-ACGTACTC-AA-G-TCT
-T-CGTACTCTAACGAT–

下面是第3个
-AACGTACTC-AA-G-TCT
T–CGTACTCTAACGAT–

下面是第4个
AA-CGTACTC-AAGTC–T
–TCGTACTCTAA–CGAT

下面是第5个
A-ACGTACTC-AAGTC–T
-T-CGTACTCTAA–CGAT

下面是第6个
-AACGTACTC-AAGTC–T
T–CGTACTCTAA–CGAT

下面是第7个
AA-CGTACTC-AA-GTC-T
–TCGTACTCTAACG–AT

下面是第8个
A-ACGTACTC-AA-GTC-T
-T-CGTACTCTAACG–AT

下面是第9个
-AACGTACTC-AA-GTC-T
T–CGTACTCTAACG–AT

下面是第10个
AA-CGTACTC-AA-GT-CT
–TCGTACTCTAACG-A-T

下面是第11个
A-ACGTACTC-AA-GT-CT
-T-CGTACTCTAACG-A-T

下面是第12个
-AACGTACTC-AA-GT-CT
T–CGTACTCTAACG-A-T

下面是第13个
AA-CGTACTC-AA-G-TCT
–TCGTACTCTAACGA–T

下面是第14个
A-ACGTACTC-AA-G-TCT
-T-CGTACTCTAACGA–T

下面是第15个
-AACGTACTC-AA-G-TCT
T–CGTACTCTAACGA–T

画一下回溯的路径

请添加图片描述
请添加图片描述

可视化代码

'''
下面就是得分矩阵的热图以及回溯路径(格子)画出来了
'''
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt

Score = pd.DataFrame(Score)
row_name = list(sequence_1)
row_name.insert(0, ' ')
col_name = list(sequence_2)
col_name.insert(0, ' ')
Score.index = row_name
Score.columns = col_name
traceback_way_mat = np.ones([len(sequence_1) + 1, len(sequence_2) + 1])

for i in range(traceback_pathway_row.shape[0]):
    traceback_way_mat[traceback_pathway_row[i][:], traceback_pathway_col[i][:]] = 0
ax1 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r", annot=True)
plt.savefig('nw_Heatmap with annotation.png',dpi=300)
plt.show()
ax2 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r")
plt.savefig('nw_Heatmap.png',dpi=300)
plt.show()
ax3 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r", mask=traceback_way_mat)
plt.savefig('nw_Heatmap_traceback',dpi=300)
plt.show()

#%%
# params={'font.family':'serif',
#         'font.serif':'Times New Roman',
#         'font.style':'normal',#'italic'
#         'font.weight':'normal', #or 'blod'
#         'font.size':12,#or large,small
#         'figure.figsize':(6,6)
#         }
plt.rcParams['figure.figsize'] = (6, 6)
# plt.rcParams.update(params)
for j in range(traceback_pathway_col.shape[0]):
    fig = plt.figure()

    ax = plt.axes()
    plt.grid(zorder=0, linestyle='-.')
    for i in range(traceback_pathway_col.shape[1]-1):
        xs = traceback_pathway_col[j][i]
        ys = traceback_pathway_row[j][i]
        xe = traceback_pathway_col[j][i+1]
        ye = traceback_pathway_row[j][i+1]
        ax.arrow(xs, ys, xe-xs, ye-ys, length_includes_head=True,head_width=0.3, fc='crimson', ec='hotpink',zorder=10)

    ax.set_xlim(-0.5, len(col_name)-0.5)
    ax.set_ylim(-0.5, len(col_name)-0.5)
    plt.xticks(np.arange(0,len(col_name),1), col_name)
    plt.yticks(np.arange(0,len(row_name),1), row_name)
    ax.xaxis.set_ticks_position('top')
    ax.invert_yaxis()
    ax.set_title('No.{}'.format(j+1),fontsize=12,color='k',loc='left',y=0.86,x=0.3,fontweight='bold')
    ax.set_title('{}{}{}'.format(seq1_align_set[j],'\n',seq2_align_set[j]),fontsize=16,fontfamily ='monospace',color='k',fontweight='bold',y=0.83,x=0.7)
    # plt.rcParams['figure.figsize'] = (6, 6)
    plt.savefig('nw_No.{}'.format(j+1)+'.png',dpi=300)
    plt.show()

Smith Waterman算法

局部比对的和全局比对差不多,只需再几个小细节上改改就行,大家可以在两个代码之间找找茬~

完整代码

NW

import numpy as np

sequence_1 = "AACGTACTCAAGTCT"
sequence_2 = "TCGTACTCTAACGAT"
match = 9
mismatch = -6
gap = -2

# 创建得分矩阵,行数为第一条序列长度加一,列数为第二条序列长度加二
Score = np.zeros((len(sequence_1) + 1, len(sequence_2) + 1))
# 创建是否匹配的矩阵,这个矩阵的长宽就分别是两条序列的长度了。如果匹配了,对应的格子就是匹配的得分,反之就是不匹配的得分
Match_or_not = np.zeros((len(sequence_1), len(sequence_2)))
for i in range(len(sequence_1)):
    for j in range(len(sequence_2)):
        if sequence_1[i] == sequence_2[j]:
            Match_or_not[i][j] = match
        else:
            Match_or_not[i][j] = mismatch

# 填得分矩阵
# 第一步:初始化第一行和第一列
for i in range(len(sequence_1) + 1):
    Score[i][0] = i * gap
for j in range(len(sequence_2) + 1):
    Score[0][j] = j * gap
# 第二步:动态规划的思想算每个格子的得分,每个格子需要考虑其左、上、左上的值,也可以说是考虑序列一、序列二引入空缺或直接匹配的最大值
for i in range(1, len(sequence_1) + 1):
    for j in range(1, len(sequence_2) + 1):
        Score[i][j] = max(Score[i - 1][j - 1] + Match_or_not[i - 1][j - 1],
                          Score[i - 1][j] + gap,
                          Score[i][j - 1] + gap)

# 开始回溯
'''
我们需要考虑的是可能会有多条回溯路径。
全局比对的回溯是从右下角开始,左上角结束,其中可能会有分叉点。
我们可以把右下角看成是一个树的根,矩阵中的每个值看成是一个节点。
每个节点都可能会有三个子节点:左,上,对角线。分别对应了回溯的方向。
而整个回溯的过程也就是遍历这颗三叉树的过程,严谨的说是从根节点遍历每个叶子节点的过程。
'''


class Node:
    # 用类来建立三叉树节点,属性包括了行、列、得分、左子树、上子树、对角线子树
    def __init__(self, row=None, col=None, score=None):
        self.row = row
        self.col = col
        self.score = score
        self.left = None
        self.up = None
        self.diag = None


def isLeaf(self):
    # 判断是否是叶子节点
    return self.left is None and self.up is None and self.diag is None
    # 递归的函数查找从根节点到每个叶节点的路径


# 回溯路径的个数、回溯路径中的行号和列号
traceback_pathway_number = 0
traceback_pathway_row = [[]]
traceback_pathway_col = [[]]


def SaveRootToLeafPaths(Node, path_row, path_col):
    # 如果没有子树了
    if Node is None:
        return
    # 包含当前节点的路径
    path_row.append(Node.row)
    path_col.append(Node.col)
    global traceback_pathway_number
    global traceback_pathway_row
    global traceback_pathway_col
    # 如果找到叶节点,保存路径
    if isLeaf(Node):
        if traceback_pathway_number == 0:
            traceback_pathway_row[traceback_pathway_number] = list(path_row)
            traceback_pathway_col[traceback_pathway_number] = list(path_col)
        else:
            traceback_pathway_row += [list(path_row)]
            traceback_pathway_col += [list(path_col)]
        traceback_pathway_number += 1
    # 递归左、上、对角子树
    SaveRootToLeafPaths(Node.left, path_row, path_col)
    SaveRootToLeafPaths(Node.up, path_row, path_col)
    SaveRootToLeafPaths(Node.diag, path_row, path_col)
    # 回溯,出栈
    path_row.pop()
    path_col.pop()


# 建立三叉树,为 Score 矩阵里所有值都找到它的左、上、对角子树,用一个二位列表来存储节点
NodeTree = [[Node() for _ in range(len(sequence_2) + 1)] for _ in range(len(sequence_1) + 1)]
# 先把节点们的行号列号和得分记录下来
for i in range(len(sequence_1) + 1):
    for j in range(len(sequence_2) + 1):
        NodeTree[i][j].row = i
        NodeTree[i][j].col = j
        NodeTree[i][j].score = Score[i][j]
# 设置第一列和第一行的节点的上子树和左子树(其实也能在下面这个大循环里设置,但是这样可读性更高)
for i in range(1, len(sequence_1) + 1):
    NodeTree[i][0].up = NodeTree[i - 1][0]
for j in range(1, len(sequence_2) + 1):
    NodeTree[0][j].left = NodeTree[0][j - 1]
# 设置剩下的节点
for i in range(1, len(sequence_1) + 1):
    for j in range(1, len(sequence_2) + 1):
        if (Score[i][j] == Score[i - 1][j - 1] + Match_or_not[i - 1][j - 1]):
            NodeTree[i][j].diag = NodeTree[i - 1][j - 1]
        if (Score[i][j] == Score[i - 1][j] + gap):
            NodeTree[i][j].up = NodeTree[i - 1][j]
        if (Score[i][j] == Score[i][j - 1] + gap):
            NodeTree[i][j].left = NodeTree[i][j - 1]
# 遍历树并保存路径
SaveRootToLeafPaths(NodeTree[len(sequence_1)][len(sequence_2)], [], [])
# 改成numpy的ndarray类型,更加方便!
traceback_pathway_row = np.array(traceback_pathway_row)
traceback_pathway_col = np.array(traceback_pathway_col)
# 记录一下回溯时走不走左边或上边,如果走就记为1,不走就记为0
Go_left = traceback_pathway_col[:, range(traceback_pathway_col.shape[1] - 1)] - traceback_pathway_col[:, range(1,
                                                                                                               traceback_pathway_col.shape[
                                                                                                                   1])]
Go_up = traceback_pathway_row[:, range(traceback_pathway_row.shape[1] - 1)] - traceback_pathway_row[:,
                                                                              range(1, traceback_pathway_row.shape[1])]
# 用列表来存储序列一和序列二比对后的结果
seq1_align_set = []
seq2_align_set = []
print("总共有{}个比对结果".format(traceback_pathway_number))
for tpn in range(traceback_pathway_number):
    '''
    下面其实就是经典的nw回溯的代码了,这部分的原理可以参考nw算法回溯的伪代码。
    唯一不同的就是我们是多条回溯路径,所以有多少条路经就得循环多少次。
    值得一提的是,回溯过去的序列是逆序的,
    在python中字符串逆置十分方便,只需要合理利用切片,如:str[::-1]即可。
    '''
    seq1_align = ''
    seq2_align = ''
    i = len(sequence_1)
    j = len(sequence_2)
    k = 0
    while i > 0 or j > 0:
        if i > 0 and j > 0 and Go_left[tpn][k] and Go_up[tpn][k]:
            seq1_align += sequence_1[i - 1]
            seq2_align += sequence_2[j - 1]
            i -= 1
            j -= 1
        elif i > 0 and not (Go_left[tpn][k]) and Go_up[tpn][k]:
            seq1_align += sequence_1[i - 1]
            seq2_align += '-'
            i -= 1
        elif j > 0 and Go_left[tpn][k] and not (Go_up[tpn][k]):
            seq1_align += '-'
            seq2_align += sequence_2[j - 1]
            j -= 1
        k += 1
    seq1_align_set += [seq1_align[::-1]]
    seq2_align_set += [seq2_align[::-1]]
    print("下面是第{}个".format(tpn + 1))
    print(seq1_align[::-1])
    print(seq2_align[::-1])
    print(' ')
#%%
'''
下面就是得分矩阵的热图以及回溯路径(格子)画出来了
'''
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt

Score = pd.DataFrame(Score)
row_name = list(sequence_1)
row_name.insert(0, ' ')
col_name = list(sequence_2)
col_name.insert(0, ' ')
Score.index = row_name
Score.columns = col_name
traceback_way_mat = np.ones([len(sequence_1) + 1, len(sequence_2) + 1])

for i in range(traceback_pathway_row.shape[0]):
    traceback_way_mat[traceback_pathway_row[i][:], traceback_pathway_col[i][:]] = 0
ax1 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r", annot=True)
plt.savefig('nw_Heatmap with annotation.png',dpi=300)
plt.show()
ax2 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r")
plt.savefig('nw_Heatmap.png',dpi=300)
plt.show()
ax3 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r", mask=traceback_way_mat)
plt.savefig('nw_Heatmap_traceback',dpi=300)
plt.show()

#%%
# params={'font.family':'serif',
#         'font.serif':'Times New Roman',
#         'font.style':'normal',#'italic'
#         'font.weight':'normal', #or 'blod'
#         'font.size':12,#or large,small
#         'figure.figsize':(6,6)
#         }
plt.rcParams['figure.figsize'] = (6, 6)
# plt.rcParams.update(params)
for j in range(traceback_pathway_col.shape[0]):
    fig = plt.figure()

    ax = plt.axes()
    plt.grid(zorder=0, linestyle='-.')
    for i in range(traceback_pathway_col.shape[1]-1):
        xs = traceback_pathway_col[j][i]
        ys = traceback_pathway_row[j][i]
        xe = traceback_pathway_col[j][i+1]
        ye = traceback_pathway_row[j][i+1]
        ax.arrow(xs, ys, xe-xs, ye-ys, length_includes_head=True,head_width=0.3, fc='crimson', ec='hotpink',zorder=10)

    ax.set_xlim(-0.5, len(col_name)-0.5)
    ax.set_ylim(-0.5, len(col_name)-0.5)
    plt.xticks(np.arange(0,len(col_name),1), col_name)
    plt.yticks(np.arange(0,len(row_name),1), row_name)
    ax.xaxis.set_ticks_position('top')
    ax.invert_yaxis()
    ax.set_title('No.{}'.format(j+1),fontsize=12,color='k',loc='left',y=0.86,x=0.3,fontweight='bold')
    ax.set_title('{}{}{}'.format(seq1_align_set[j],'\n',seq2_align_set[j]),fontsize=16,fontfamily ='monospace',color='k',fontweight='bold',y=0.83,x=0.7)
    # plt.rcParams['figure.figsize'] = (6, 6)
    plt.savefig('nw_No.{}'.format(j+1)+'.png',dpi=300)
    plt.show()

import datetime
print("这是代码执行时间: ",datetime.datetime.now())


SW

import numpy as np

sequence_1 = "AACGTACTCAAGTCT"
sequence_2 = "TCGTACTCTAACGAT"
match = 9
mismatch = -6
gap = -2

# 创建得分矩阵,行数为第一条序列长度加一,列数为第二条序列长度加二
Score = np.zeros((len(sequence_1) + 1, len(sequence_2) + 1))
# 创建是否匹配的矩阵,这个矩阵的长宽就分别是两条序列的长度了。如果匹配了,对应的格子就是匹配的得分,反之就是不匹配的得分
Match_or_not = np.zeros((len(sequence_1), len(sequence_2)))
for i in range(len(sequence_1)):
    for j in range(len(sequence_2)):
        if sequence_1[i] == sequence_2[j]:
            Match_or_not[i][j] = match
        else:
            Match_or_not[i][j] = mismatch

# 填得分矩阵
# 第一步:初始化第一行和第一列
for i in range(len(sequence_1) + 1):
    Score[i][0] = 0
for j in range(len(sequence_2) + 1):
    Score[0][j] = 0
# 第二步:动态规划的思想算每个格子的得分,每个格子需要考虑其左、上、左上的值,也可以说是考虑序列一、序列二引入空缺或直接匹配的最大值
for i in range(1, len(sequence_1) + 1):
    for j in range(1, len(sequence_2) + 1):
        Score[i][j] = max(Score[i - 1][j - 1] + Match_or_not[i - 1][j - 1],
                          Score[i - 1][j] + gap,
                          Score[i][j - 1] + gap, 0)

# 开始回溯
'''
我们需要考虑的是可能会有多条回溯路径。
全局比对的回溯是从右下角开始,左上角结束,其中可能会有分叉点。
我们可以把右下角看成是一个树的根,矩阵中的每个值看成是一个节点。
每个节点都可能会有三个子节点:左,上,对角线。分别对应了回溯的方向。
而整个回溯的过程也就是遍历这颗三叉树的过程,严谨的说是从根节点遍历每个叶子节点的过程。
'''


class Node:
    # 用类来建立三叉树节点,属性包括了行、列、得分、左子树、上子树、对角线子树
    def __init__(self, row=None, col=None, score=None):
        self.row = row
        self.col = col
        self.score = score
        self.left = None
        self.up = None
        self.diag = None


def isLeaf(self):
    # 判断是否是叶子节点
    return self.left is None and self.up is None and self.diag is None
    # 递归的函数查找从根节点到每个叶节点的路径


# 回溯路径的个数、回溯路径中的行号和列号
traceback_pathway_number = 0
traceback_pathway_row = [[]]
traceback_pathway_col = [[]]


def SaveRootToLeafPaths(Node, path_row, path_col):
    # 如果没有子树了
    if Node is None:
        return
    # 包含当前节点的路径
    path_row.append(Node.row)
    path_col.append(Node.col)
    global traceback_pathway_number
    global traceback_pathway_row
    global traceback_pathway_col
    # 如果找到叶节点,保存路径
    if isLeaf(Node):
        if traceback_pathway_number == 0:
            traceback_pathway_row[traceback_pathway_number] = list(path_row)
            traceback_pathway_col[traceback_pathway_number] = list(path_col)
        else:
            traceback_pathway_row += [list(path_row)]
            traceback_pathway_col += [list(path_col)]
        traceback_pathway_number += 1
    # 递归左、上、对角子树
    SaveRootToLeafPaths(Node.left, path_row, path_col)
    SaveRootToLeafPaths(Node.up, path_row, path_col)
    SaveRootToLeafPaths(Node.diag, path_row, path_col)
    # 回溯,出栈
    path_row.pop()
    path_col.pop()


# 建立三叉树,为 Score 矩阵里所有值都找到它的左、上、对角子树,用一个二位列表来存储节点
NodeTree = [[Node() for _ in range(len(sequence_2) + 1)] for _ in range(len(sequence_1) + 1)]
# 先把节点们的行号列号和得分记录下来
for i in range(len(sequence_1) + 1):
    for j in range(len(sequence_2) + 1):
        NodeTree[i][j].row = i
        NodeTree[i][j].col = j
        NodeTree[i][j].score = Score[i][j]
# 设置第一列和第一行的节点的上子树和左子树(其实也能在下面这个大循环里设置,但是这样可读性更高)
for i in range(1, len(sequence_1) + 1):
    NodeTree[i][0].up = NodeTree[i - 1][0]
for j in range(1, len(sequence_2) + 1):
    NodeTree[0][j].left = NodeTree[0][j - 1]
# 设置剩下的节点
for i in range(1, len(sequence_1) + 1):
    for j in range(1, len(sequence_2) + 1):
        if (Score[i][j] == Score[i - 1][j - 1] + Match_or_not[i - 1][j - 1]):
            NodeTree[i][j].diag = NodeTree[i - 1][j - 1]
        if (Score[i][j] == Score[i - 1][j] + gap):
            NodeTree[i][j].up = NodeTree[i - 1][j]
        if (Score[i][j] == Score[i][j - 1] + gap):
            NodeTree[i][j].left = NodeTree[i][j - 1]
# 遍历树并保存路径
r, c = np.where(Score == np.max(Score))
SaveRootToLeafPaths(NodeTree[int(r)][int(c)], [], [])
# 改成numpy的ndarray类型,更加方便!
traceback_pathway_row = np.array(traceback_pathway_row)
traceback_pathway_col = np.array(traceback_pathway_col)
# 记录一下回溯时走不走左边或上边,如果走就记为1,不走就记为0
Go_left = traceback_pathway_col[:, range(traceback_pathway_col.shape[1] - 1)] - traceback_pathway_col[:, range(1,
                                                                                                               traceback_pathway_col.shape[
                                                                                                                   1])]
Go_up = traceback_pathway_row[:, range(traceback_pathway_row.shape[1] - 1)] - traceback_pathway_row[:,
                                                                              range(1, traceback_pathway_row.shape[1])]
# 用列表来存储序列一和序列二比对后的结果
seq1_align_set = []
seq2_align_set = []
print("总共有{}个比对结果".format(traceback_pathway_number))
for tpn in range(traceback_pathway_number):
    '''
    下面其实就是经典的nw回溯的代码了,这部分的原理可以参考nw算法回溯的伪代码。
    唯一不同的就是我们是多条回溯路径,所以有多少条路经就得循环多少次。
    值得一提的是,回溯过去的序列是逆序的,
    在python中字符串逆置十分方便,只需要合理利用切片,如:str[::-1]即可。
    '''
    seq1_align = ''
    seq2_align = ''
    i = int(r)
    j = int(c)
    k = 0
    while Score[i][j] > 0:
        # waterman修改条件,到零结束
        if k < traceback_pathway_col.shape[1] - 1:
            if Go_left[tpn][k] and Go_up[tpn][k]:
                seq1_align += sequence_1[i - 1]
                seq2_align += sequence_2[j - 1]
                i -= 1
                j -= 1
            elif not (Go_left[tpn][k]) and Go_up[tpn][k]:
                seq1_align += sequence_1[i - 1]
                seq2_align += '-'
                i -= 1
            elif Go_left[tpn][k] and not (Go_up[tpn][k]):
                seq1_align += '-'
                seq2_align += sequence_2[j - 1]
                j -= 1
            k += 1
    seq1_align_set += [seq1_align[::-1]]
    seq2_align_set += [seq2_align[::-1]]
    print("下面是第{}个".format(tpn + 1))
    print(seq1_align[::-1])
    print(seq2_align[::-1])
    print(' ')
#%%
'''
下面就是得分矩阵的热图以及回溯路径(格子)画出来了
'''
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt

Score = pd.DataFrame(Score)
row_name = list(sequence_1)
row_name.insert(0, ' ')
col_name = list(sequence_2)
col_name.insert(0, ' ')
Score.index = row_name
Score.columns = col_name
traceback_way_mat = np.ones([len(sequence_1) + 1, len(sequence_2) + 1])

for i in range(traceback_pathway_row.shape[0]):
    traceback_way_mat[traceback_pathway_row[i][:], traceback_pathway_col[i][:]] = 0
ax1 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r", annot=True)
plt.savefig('sw_Heatmap with annotation.png',dpi=300)
plt.show()
ax2 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r")
plt.savefig('sw_Heatmap.png',dpi=300)
plt.show()
ax3 = sns.heatmap(Score, linecolor='white', linewidth=0, square=True, cmap="RdBu_r", mask=traceback_way_mat)
plt.savefig('sw_Heatmap_traceback',dpi=300)
plt.show()

#%%
plt.rcParams['figure.figsize'] = (6, 6)
for j in range(traceback_pathway_col.shape[0]):
    fig = plt.figure()
    ax = plt.axes()
    plt.grid(zorder=0, linestyle='-.')
    for i in range(traceback_pathway_col.shape[1]-2):
        xs = traceback_pathway_col[j][i]
        ys = traceback_pathway_row[j][i]
        xe = traceback_pathway_col[j][i+1]
        ye = traceback_pathway_row[j][i+1]
        ax.arrow(xs, ys, xe-xs, ye-ys, length_includes_head=True,head_width=0.3, fc='crimson', ec='hotpink',zorder=10)
    ax.set_xlim(-0.5, len(col_name)-0.5)
    ax.set_ylim(-0.5, len(col_name)-0.5)
    plt.xticks(np.arange(0,len(col_name),1), col_name)
    plt.yticks(np.arange(0,len(row_name),1), row_name)
    ax.xaxis.set_ticks_position('top')
    ax.invert_yaxis()
    # ax.set_title('No.{}'.format(j+1),fontsize=12,color='k',loc='left')
    ax.set_title('No.{}'.format(j + 1), fontsize=12, color='k', loc='left', y=0.86, x=0.38, fontweight='bold')
    ax.set_title('{}{}{}'.format(seq1_align_set[j], '\n', seq2_align_set[j]), fontsize=16, fontfamily='monospace',
                 color='k', fontweight='bold', y=0.83, x=0.7)
    plt.savefig('sw_No.{}'.format(j+1)+'.png',dpi=300)
    plt.show()

import datetime
print("这是代码执行时间: ",datetime.datetime.now())
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【序列比对】Needleman-Wunsch(全局)和Smith-Waterman(局部)算法py实现(多条回溯路径,三叉树思路,超详细注释) 的相关文章

随机推荐

  • Ros小车应用篇(一)——Ros小车wifiQT上位机

    Ros小车多功能QT上位机 代码仓库 https github com zhuchen99899 RosCar tree master Ros car pid test 小车嵌入式控制可以查看我的博客 https blog csdn net
  • Win10 开机密码破解

    1 开机 当出现Windows图标时 就强制关机 重复2 3次 系统便会进入自动修复 2 高级设置 gt 疑难解答 gt 高级选项 gt 命令提示符 3 依次输入 cd c windows system32 ren sethc exe ab
  • 为什么用GIF做埋点?

    原因 防止跨域拦截 一般而言 打点域名都不是当前域名 所以所有的接口请求都会构成跨域 而跨域请求很容易出现由于配置不当被浏览器拦截并报错 这是不能接受的 但图片的 src 属性并不会跨域 并且同样可以发起请求 防止阻塞页面加载 影响用户体验
  • linux FIO命令详解(一):磁盘IO测试工具 fio (并简要介绍iostat工具)

    FIO介绍 FIO是测试IOPS的非常好的工具 用来对磁盘进行压力测试和验证 磁盘IO是检查磁盘性能的重要指标 可以按照负载情况分成照顺序读写 随机读写两大类 FIO是一个可以产生很多线程或进程并执行用户指定的特定类型I O操作的工具 FI
  • 【技能树笔记】网络篇——练习题解析(一)

    目录 一 认识身边的计算机网络 1 1 常见的网络设备 1 2 网络中拓扑的分类 二 认识网络模型 2 1 网络模型概述 2 2 OSI模型 2 2 1 OSI参考模型 2 2 2 数据的加密和解密 2 3 TCP IP模型 2 3 1 T
  • WPF后台动态创建Grid行与列,并将控件添加到Grid中的指定行指定列

  • 算法基础复盘笔记Day10【动态规划】—— 线性DP

    作者主页 欢迎来到我的技术博客 个人介绍 大家好 本人热衷于Java后端开发 欢迎来交流学习哦 如果文章对您有帮助 记得关注 点赞 收藏 评论 您的支持将是我创作的动力 让我们一起加油进步吧 第一章 线性DP 一 数字三角形 1 题目描述
  • 如何使用 AWS 和 ChatGPT 创建最智能的多语言虚拟助手

    上周ChatGPT发布了 每个人都在尝试令人惊奇的事情 我也开始使用它并想尝试它如何使用AWS的AI 服务进行集成 结果非常棒 在这篇文章中 我将逐步解释我是如何创建这个项目的 这样你也可以做到 最重要的是 您无需成为AI 专家即可创建它
  • openvino是啥

    英特尔发布的开源框架 用于深度学习的推理优化与模型部署 openvino具体使用方法还是看官方文档比较好https docs openvino ai 支持多种框架 tensorflow caffe pytorch mxnet keras o
  • Wireshark的抓包和分析,看这篇就够了!

    点击上方蓝字 关注 程序IT圈 WireShark是一个网络封包分析软件 网络封包分析软件的功能是撷取网络封包 并尽可能显示出最为详细的网络封包资料 Wireshark使用WinPCAP作为接口 直接与网卡进行数据报文交换 在网络封包和流量
  • 【DDD架构】

    DDD domain driven design 领域驱动设计模型 一 DP domain primitive 1 什么是DP 2 为什么要用DP 2 1 API接口清晰度 2 2 数据验证和错误处理 2 3 业务代码的清晰度 3 DP原则
  • 彻底搞懂JDBC的运行过程

    转载自 https blog csdn net y277an article details 96937010 JDBC的作用 JDBC的全称是Java DataBase Connection 也就是Java数据库连接 我们可以用它来操作关
  • Qt 工程

    Qt 工程 工程文件 项目文件 QMake 添加模块 添加与特定平台有关的文件 文件不存在时停止 qmake 控制台输出调试信息 变量 Variables CONFIG HEADERS SOURCES FORMS INCLUDEPATH T
  • 测试用例设计方法2——边界值

    介于有效等价类和无效等价类之间 一 边界值 三点 上点 边界上的点 离点 离上点最近的点 根据上点的精度确定 内点 边界有效范围内的任意一点 如何确定离点 若边界是闭区间 则离点在外 如用户名长度 6 18 之间 上点为6 18 离点为5
  • 浅谈C++元编程

    随着 C 11 14 17 标准的不断更新 C 语言得到了极大的完善和补充 元编程作为一种新兴的编程方式 受到了越来越多的广泛关注 结合已有文献和个人实践 对有关 C 元编程进行了系统的分析 首先介绍了 C 元编程中的相关概念和背景 然后利
  • VS019创建C++工程基本步骤

    1 打开VS2019新建工程 2 选择C 空项目 点击下一步 3 设置项目名称和项目位置 然后点击创建 4 设置自己喜欢的布局方式 关于背景网上有很多设置方法 5 新建入口Main c工程 右键源文件 添加 新项目 6 选择C 文件 取个名
  • 持续集成与持续交付(CI/CD):探讨在云计算中实现快速软件交付的最佳实践

    文章目录 持续集成 CI 的最佳实践 持续交付 CD 的最佳实践 云计算环境下的特别注意事项 个人主页 程序员 小侯 CSDN新晋作者 欢迎 点赞 评论 收藏 收录专栏 云计算 文章内容 软件交付 希望作者的文章能对你有所帮助 有不足的地方
  • python爬取站长素材上的图片

    Python爬取站长素材上的图片 罗纳尔康 首先这是一个学习的案例 我将其记录下来 因为所学的内容有点多 爬取这个图片 我是用的xpath来解析网页 当然也可以用bs4来进行解析 看个人喜好 该案例比较简单 但涉及的内容并不少 1 分析每页
  • Web3域名,会是新的应用场景么?

    在互联网时代 域名一直都是一个十分有价值的资产 无论是个人还是企业 想要在互联网中建立一个交互性的平台 网站 都需要一个域名来与 IP 地址进行映射 域名是具有唯一性的 因此 它遵循 先到先得 的原则 即一旦有用户注册了一个域名之后 其他人
  • 【序列比对】Needleman-Wunsch(全局)和Smith-Waterman(局部)算法py实现(多条回溯路径,三叉树思路,超详细注释)

    Needleman Wunsch和Smith Waterman算法py实现 多条回溯路径 话不多说 直接上结果图 多条回溯路径 原理 代码详解 以NW为例 导入包以及参数设置 import numpy as np sequence 1 AA