NLP进阶,使用TextRNN和TextRNN_ATT实现文本分类

2023-11-15

TextRNN

TextRNN仅仅是将Word Embedding后,输入到双向LSTM中,然后对最后一位的输出输入到全连接层中,在对其进行softmax分类即可,模型如下图:

img

代码:

class RNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.2, pad_idx=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers,batch_first=True,
                           bidirectional=bidirectional)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        # 这里hidden_dim乘以2是因为是双向,需要拼接两个方向,跟n_layers的层数无关。
        self.dropout = nn.Dropout(dropout)
    def forward(self, text):
        # text.shape=[seq_len, batch_size]
        embedded = self.dropout(self.embedding(text))
        # output: [batch,seq,2*hidden if bidirection else hidden]
        # hidden/cell: [bidirec * n_layers, batch, hidden]
        output, (hidden, cell) = self.rnn(embedded)
        # concat the final forward (hidden[-2,:,:]) and backward (hidden[-1,:,:]) hidden layers
        hidden = self.dropout(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1))
        # hidden = [batch size, hid dim * num directions],
        return self.fc(hidden.squeeze(0))  # 在接一个全连接层,最终输出[batch size, output_dim]

TextRNN_ATT

在TextRNN的基础上加入注意力机制,代码:

class RNN_ATTs(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.2, pad_idx=0, hidden_size2=64):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
                            bidirectional=bidirectional, batch_first=True, dropout=dropout)
        self.tanh1 = nn.Tanh()
        # self.u = nn.Parameter(torch.Tensor(config.hidden_size * 2, config.hidden_size * 2))
        self.w = nn.Parameter(torch.zeros(hidden_dim * 2))
        self.tanh2 = nn.Tanh()
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_size2)
        self.fc = nn.Linear(hidden_size2, output_dim)

    def forward(self, x):
        emb = self.embedding(x)  # [batch_size, seq_len, embeding]=[128, 32, 300]
        H, _ = self.lstm(emb)  # [batch_size, seq_len, hidden_size * num_direction]=[128, 32, 256]

        M = self.tanh1(H)  # [128, 32, 256]
        # M = torch.tanh(torch.matmul(H, self.u))
        alpha = F.softmax(torch.matmul(M, self.w), dim=1).unsqueeze(-1)  # [128, 32, 1]
        out = H * alpha  # [128, 32, 256]
        out = torch.sum(out, 1)  # [128, 256]
        out = F.relu(out)
        out = self.fc1(out)
        out = self.fc(out)  # [128, 64]
        return out

数据集

数据集采用cnews数据集,包含三个文件,分别是cnews.train.txt,cnews.val.txt,cnews,test.txt。类别:体育, 娱乐, 家居, 房产, 教育, 时尚, 时政, 游戏, 科技, 财经,共10个类别。网盘地址:

链接:https://pan.baidu.com/s/1awlBYclO_mxntEgL_tUF0g
提取码:rtnv

构建词向量

第一步,读取预料,做分词。

思路:

1、创建默认方式的分词对象seg。

2、打开文件,按照行读取文章。

3、去掉收尾的空格,将label和文章分割开。

4、将分词后的文章放到src_data,label放入labels里。

5、返回结果。

我对代码做了注解,如下:

def read_corpus(file_path):
    """读取语料
    :param file_path:
    :param type:
    :return:
    """
    src_data = []
    labels = []
    seg = pkuseg.pkuseg() #使用默认分词方式。
    with codecs.open(file_path,'r',encoding='utf-8') as fout:
        for line in tqdm(fout.readlines(),desc='reading corpus'):
            if line is not None:
                # line.strip()的意思是去掉每句话句首句尾的空格
                # .split(‘\t’)的意思是根据'\t'把label和文章内容分开,label和内容是通过‘\t’隔开的。
                # \t表示空四个字符,也称缩进,相当于按一下Tab键
                pair = line.strip().split('\t')
                if len(pair) != 2:
                    print(pair)
                    continue
                src_data.append(seg.cut(pair[1]))# 对文章内容分词。
                labels.append(pair[0])
    return (src_data, labels) #返回文章内容的分词结果和labels

经过这个步骤得到了labels和分词后的文章。如下代码:

src_sents, labels = read_corpus('cnews/cnews.train.txt')

对labels做映射:

    labels = {label: idx for idx, label in enumerate(labels)}

得到labels对应的idx的字典,idx的值是最后一次插入label的值。

第二步 构建词向量

这一步主要用到vocab.py的from_corpus方法

思路:

1、创建vocab_entry对象。

2、对分词后的文章统计词频,生成一个词和词频构成的字典。

3、从字典中取出Top size - 2个元素。

4、获取元素的词。

5、执行add方法将词放入vocab_entry,生成词和id,id就是词对应的向量值。

代码如下:

    @staticmethod
    def from_corpus(corpus, size, min_feq=3):
        """从给定语料中创建VocabEntry"""
        vocab_entry = VocabEntry()
        # chain函数来自于itertools库,itertools库提供了非常有用的基于迭代对象的函数,而chain函数则是可以串联多个迭代对象来形成一个更大的迭代对象
        # *的作用:返回单个迭代器。
        # word_freq是个字典,key=词,value=词频
        word_freq = Counter(chain(*corpus))  # Counter 是实现的 dict 的一个子类,可以用来方便地计数,统计词频

        valid_words = word_freq.most_common(size - 2)  # most_common()函数用来实现Top n 功能,在这里选出Top size-2个词
        valid_words = [word for word, value in valid_words if value >= min_feq]  # 把符合要求的词找出来放到list里面。
        print('number of word types: {}, number of word types w/ frequency >= {}: {}'
              .format(len(word_freq), min_feq, len(valid_words)))
        for word in valid_words:  # 将词放进VocabEntry里面。
            vocab_entry.add(word)
        return vocab_entry

创建完成后将词向量保存到json文件中

 vocab = Vocab.build(src_sents, labels, 50000, 3)
    print('generated vocabulary, source %d words' % (len(vocab.vocab)))
    vocab.save('./vocab.json')

训练

训练使用Train_RNN.py,先看分析main方法的参数。

参数

    parse = argparse.ArgumentParser()

    parse.add_argument("--train_data_dir", default='./cnews/cnews.train.txt', type=str, required=False)
    parse.add_argument("--dev_data_dir", default='./cnews/cnews.val.txt', type=str, required=False)
    parse.add_argument("--test_data_dir", default='./cnews/cnews.test.txt', type=str, required=False)
    parse.add_argument("--output_file", default='deep_model.log', type=str, required=False)
    parse.add_argument("--batch_size", default=4, type=int)
    parse.add_argument("--do_train", default=True, action="store_true", help="Whether to run training.")
    parse.add_argument("--do_test", default=True, action="store_true", help="Whether to run training.")
    parse.add_argument("--learnning_rate", default=5e-4, type=float)
    parse.add_argument("--num_epoch", default=50, type=int)
    parse.add_argument("--max_vocab_size", default=50000, type=int)
    parse.add_argument("--min_freq", default=2, type=int)
    parse.add_argument("--hidden_size", default=256, type=int)
    parse.add_argument("--embed_size", default=300, type=int)
    parse.add_argument("--dropout_rate", default=0.2, type=float)
    parse.add_argument("--warmup_steps", default=0, type=int, help="Linear warmup over warmup_steps.")
    parse.add_argument("--GRAD_CLIP", default=1, type=float)
    parse.add_argument("--vocab_path", default='vocab.json', type=str)

参数说明:

train_data_dir:训练集路径。

dev_data_dir:验证集路径

test_data_dir:测试集路径

output_file:输出的log路径

batch_size:batchsize的大小。

do_train:是否训练,默认True、

do_test:是否测试,默认True

learnning_rate:学习率

num_epoch:epoch的数量

max_vocab_size:词向量的个数

min_freq:词频,过滤低于这个数值的词

hidden_size:隐藏层的个数

embed_size:Embedding的长度。

dropout_rate:dropout的值。

warmup_steps:设置预热的值。

vocab_path:词向量保存的路径

构建词向量

    vocab = build_vocab(args)
    label_map = vocab.labels
    print(label_map)

build_vocab的方法:

def build_vocab(args):
    if not os.path.exists(args.vocab_path):
        src_sents, labels = read_corpus(args.train_data_dir)
        labels = {label: idx for idx, label in enumerate(labels)}
        vocab = Vocab.build(src_sents, labels, args.max_vocab_size, args.min_freq)
        vocab.save(args.vocab_path)
    else:
        vocab = Vocab.load(args.vocab_path)
    return vocab

创建模型

创建CNN模型,将模型放到GPU上,调用train方法,训练。

  rnn_model = RNN_ATTs(len(vocab.vocab), args.embed_size, args.hidden_size,
                        len(label_map), n_layers=1, bidirectional=True, dropout=args.dropout_rate)
  rnn_model.to(device)
  train(args, rnn_model, train_data, dev_data, vocab, dtype='RNN')

对train方法做了一些注解,如下:

def train(args, model, train_data, dev_data, vocab, dtype='CNN'):
    LOG_FILE = args.output_file
    #记录训练log
    with open(LOG_FILE, "a") as fout:
        fout.write('\n')
        fout.write('==========' * 6)
        fout.write('start trainning: {}'.format(dtype))
        fout.write('\n')

    time_start = time.time()
    if not os.path.exists(os.path.join('./runs', dtype)):
        os.makedirs(os.path.join('./runs', dtype))
    tb_writer = SummaryWriter(os.path.join('./runs', dtype))
    # 计算总的迭代次数
    t_total = args.num_epoch * (math.ceil(len(train_data) / args.batch_size))
    #optimizer = bnb.optim.Adam8bit(model.parameters(), lr=0.001, betas=(0.9, 0.995))  # add bnb optimizer
    optimizer = AdamW(model.parameters(), lr=args.learnning_rate, eps=1e-8)#设置优化器
    scheduler = get_linear_schedule_with_warmup(optimizer=optimizer, num_warmup_steps=args.warmup_steps,
                                                num_training_steps=t_total) #设置预热。
    criterion = nn.CrossEntropyLoss()# 设置loss为交叉熵
    global_step = 0
    total_loss = 0.
    logg_loss = 0.
    val_acces = []
    train_epoch = trange(args.num_epoch, desc='train_epoch')
    for epoch in train_epoch:#训练epoch
        model.train()
        for src_sents, labels in batch_iter(train_data, args.batch_size, shuffle=True):
            src_sents = vocab.vocab.to_input_tensor(src_sents, args.device)
            global_step += 1
            optimizer.zero_grad()
            logits = model(src_sents)
            y_labels = torch.tensor(labels, device=args.device)
            example_losses = criterion(logits, y_labels)
            example_losses.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.GRAD_CLIP)
            optimizer.step()
            scheduler.step()

            total_loss += example_losses.item()
            if global_step % 100 == 0:
                loss_scalar = (total_loss - logg_loss) / 100
                logg_loss = total_loss

                with open(LOG_FILE, "a") as fout:
                    fout.write("epoch: {}, iter: {}, loss: {},learn_rate: {}\n".format(epoch, global_step, loss_scalar,
                                                                                       scheduler.get_lr()[0]))
                print("epoch: {}, iter: {}, loss: {}, learning_rate: {}".format(epoch, global_step, loss_scalar,
                                                                                scheduler.get_lr()[0]))
                tb_writer.add_scalar("lr", scheduler.get_lr()[0], global_step)
                tb_writer.add_scalar("loss", loss_scalar, global_step)

        print("Epoch", epoch, "Training loss", total_loss / global_step)
        eval_loss, eval_result = evaluate(args, criterion, model, dev_data, vocab)  # 评估模型
        with open(LOG_FILE, "a") as fout:
            fout.write("EVALUATE: epoch: {}, loss: {},eval_result: {}\n".format(epoch, eval_loss, eval_result))
        eval_acc = eval_result['acc']
        if len(val_acces) == 0 or eval_acc > max(val_acces):
            # 如果比之前的acc要da,就保存模型
            print("best model on epoch: {}, eval_acc: {}".format(epoch, eval_acc))
            torch.save(model.state_dict(), "classifa-best-{}.th".format(dtype))
            val_acces.append(eval_acc)

    time_end = time.time()
    print("run model of {},taking total {} m".format(dtype, (time_end - time_start) / 60))
    with open(LOG_FILE, "a") as fout:
        fout.write("run model of {},taking total {} m\n".format(dtype, (time_end - time_start) / 60))

重点注释了一下batch_iter方法,如下:

def batch_iter(data, batch_size, shuffle=False):
    """
        batch数据
    :param data: list of tuple
    :param batch_size:
    :param shuffle:
    :return:
    """
    batch_num = math.ceil(len(data) / batch_size)# 计算迭代的次数
    index_array = list(range(len(data))) #按照data的长度,映射list
    if shuffle:#是否打乱顺序
        random.shuffle(index_array)

    for i in range(batch_num):
        indices = index_array[i*batch_size:(i+1)*batch_size]# 选出batchsize个index
        examples = [data[idx] for idx in indices]# 通过index找到对应的data
        examples = sorted(examples,key=lambda x: len(x[1]),reverse=True)#按照label排序
        src_sents = [e[0] for e in examples] #把data中的文章放到src_sents
        labels = [label_map[e[1]] for e in examples] #将标题映射label_map对应的value
        yield src_sents, labels

下面一个重要的方法是vocab.vocab.to_input_tensor,核心思路:

1、将数据通过 self.words2indices方法转为词对应的数值。

2、找出一个batch中最长的数据,剩下的数据后面补0,形成统一的长度。

3、将第二步得到的结果放入torch.tensor

代码如下:

 def to_input_tensor(self, sents: List[List[str]], device: torch.device):
        """
        将原始句子list转为tensor,同时将句子PAD成max_len
        :param sents: list of list<str>
        :param device:
        :return:
        """
        sents = self.words2indices(sents)
        sents = pad_sents(sents, self.word2id['<PAD>'])
        sents_var = torch.tensor(sents, device=device)
        return sents_var

开始训练:

image-20211112210114989

验证

将do_train改为False,do_test改为True就可以开启验证模型,TextRNN能达到0.96的成绩。

parse.add_argument("--do_train", default=False, action="store_true", help="Whether to run training.")

image-20211112200928120
完整代码链接:
https://download.csdn.net/download/hhhhhhhhhhwwwwwwwwww/40816205

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

NLP进阶,使用TextRNN和TextRNN_ATT实现文本分类 的相关文章

随机推荐

  • 记一次udp服务性能优化经历

    目录 概述 磁盘io 网络io 减少重复计算 减少内存复制 减少互斥锁 概述 手上有个go项目 接收udp信息 主要是syslog和snmp trap 并查询设备信息 将信息结构化 设备ip名称 匹配了什么规则之类的 后发送到kafka和e
  • 哈夫曼编码的实现

    2 哈夫曼编码的实现 对教材P167中习题5 18 编码实现哈夫曼编码树 并对 Chapter Graphs surveys the most important graph processing problems including de
  • org.hibernate.UnknownEntityTypeException: Unable to locate persister:xxx类

    看了网上其他人的解决办法 发现出现的错误跟我的并不相同 基本就是没有引入映射文件 或者映射文件路径错误 我的错误是抽取了一个公共的dao 其中 get方法应该传入get x class id 而我写入的是类名 所以运行时总是提示找不到这个类
  • 51单片机学习笔记-12LCD1602液晶屏

    12 LCD1602液晶屏 toc 注 笔记主要参考B站江科大自化协教学视频 51单片机入门教程 2020版 程序全程纯手打 从零开始入门 注 工程及代码文件放在了本人的Github仓库 12 1 LCD1602介绍 LCD1602 Liq
  • ArcSDE 日志文件表(二)

    基于会话的或独立的日志文件组成的池 Pools of log file tables 以下为ArcGIS10 1中文帮助 归地理数据库管理员所有的日志文件池 地理数据库管理员可以创建可由其他用户检出和使用的日志文件池 这些日志文件可以是基于
  • spark性能优化调优指导性文件

    1 让我们看一下前面的核心参数设置 num executors 10 20 executor cores 1 2 executor memory 10 20 driver memory 20 spark default parallelis
  • Linux常用命令与JavaWeb开发环境的搭建

    文章目录 前言 一 系统信息以及查看文件 1 1系统信息 1 2查看文件 二 查看进程和防火墙的开关 三 搭建Java Web开发环境 3 1JDK 3 2Tomcat 3 3Mysql 总结 前言 Linux 特点 免费 开源 免费 安全
  • 继电器驱动电路原理及注意事项

    继电器驱动电流一般需要20 40mA或更大 线圈电阻100 200欧姆 因此要加驱动电路 1 晶体管用来驱动继电器 必须将晶体管的发射极接地 具体电路如下 NPN晶体管 PNP晶体管 NPN晶体管驱动时 当晶体管T1基极被输入高电平时 晶体
  • 导入数据的几种方法

    采用标准python类库导入数据 读取文件 from csv import reader import numpy as np filename pima csv with open filename rt as raw data read
  • centOS7服务器搭建

    一 安装jdk 运行代码 yum search jdk 1 查询当前云服务器里面通过yum可以安装哪些jdk 以这个jdk1 8的版本为例 运行代码 yum y install java 1 8 0 openjdk 2 安装jdk1 8版本
  • obj(判断对象中是否包含某个key属性)

    key in obj 不包含 obj hasOwnProperty key 包含
  • 纯代码构建Swift工程

    有些东西很简单 但是我还是把它记录了下来 使用Storyboard创建一个新的项目后 应用程序从闪屏 到主窗口 再到第一个界面经过的文件分别是 LaunchScreen storyboard gt Main storyboard gt Vi
  • NBIoT与LoRa技术详解及竞争态势分析

    物联网的无线通信技术很多 主要分为两类 一类是Zigbee WiFi 蓝牙 Z wave等短距离通信技术 另一类是LPWAN low power Wide Area Network 低功耗广域网 即广域网通信技术 LPWA又可分为两类 一类
  • 计算机视觉领域经典模型汇总(RCNN、YOLO等)

    一 RCNN系列 1 RCNN RCNN是用于目标检测的经典方法 其核心思想是将目标检测任务分解为两个主要步骤 候选区域生成和目标分类 候选区域生成 RCNN的第一步是生成可能包含目标的候选区域 RCNN使用传统的计算机视觉技术 特别是选择
  • linux中tmp文件在哪,Linux系统中/tmp文件夹

    在Linux系统中 tmp文件夹里面的文件会被清空 至于多长时间被清空 如何清空的 可能大家知识的就不多了 所以 今天我们就来剖析一个这两个问题 在RHEL CentOS Fedora 系统中 本次实验是在RHEL6中进行的 1 tmpwa
  • 数字后端dbGet使用方法合集

    以下资料是我之前写过的 芯片数字后端中Innovus Encounter dbGet命令使用方法的介绍 整理了一下 做成合集 方便大家查询 点击标题就可以选择文章查看 会直接挂在公众号的主页菜单栏里的 后端资料 里 感觉好的话 请多多推广喔
  • java报错:Connection reset by peer: socket write error

    用java做excel导出时 报错 ClientAbortException java net SocketException Connection reset by peer socket write error 大致出现问题的原因如下
  • java指纹识别+谷歌图片识别技术(采用Hash方法)

    转载自 http blog csdn net yjflinchong article details 7469213 java指纹识别 谷歌图片识别技术 前阵子在阮一峰的博客上看到了这篇 相似图片搜索原理 博客 就有一种冲动要将这些原理实现
  • Python3 PyCharm 捕获异常报 Too broad exception clause 警告

    最近在 PyCharm 中写代码的时候会抱怨 Exception 没有指定错误类型 Too broad exception clause 这是因为捕获的异常过于宽泛 没有针对性 可以通过指定精确的异常类型来解决 BaseException
  • NLP进阶,使用TextRNN和TextRNN_ATT实现文本分类

    TextRNN TextRNN仅仅是将Word Embedding后 输入到双向LSTM中 然后对最后一位的输出输入到全连接层中 在对其进行softmax分类即可 模型如下图 代码 class RNN nn Module def init