


question_classifier.py --问题分类器代码解析


#!/usr/bin/env python3
# coding: utf-8
# File: question_classifier.py
# Author: lhy<lhy_in_blcu@126.com,https://huangyong.github.io>
# Date: 18-10-4

# 导入操作系统接口模块
import os
# ahocosick:自动机的意思
#  可实现自动批量匹配字符串的作用,即可一次返回该条字符串中命中的所有关键词
import ahocorasick

# 建立问题分类器的类
class QuestionClassifier:
    def __init__(self):
        # cur_dir 是当前目录 其中[:-1]可以达到返回上一层的效果
        cur_dir = '/'.join(os.path.abspath(__file__).split('/')[:-1])
        # 加载特征词路径
        self.disease_path = os.path.join(cur_dir, 'dict/disease.txt')
        self.department_path = os.path.join(cur_dir, 'dict/department.txt')
        self.check_path = os.path.join(cur_dir, 'dict/check.txt')
        self.drug_path = os.path.join(cur_dir, 'dict/drug.txt')
        self.food_path = os.path.join(cur_dir, 'dict/food.txt')
        self.producer_path = os.path.join(cur_dir, 'dict/producer.txt')
        self.symptom_path = os.path.join(cur_dir, 'dict/symptom.txt')
        self.deny_path = os.path.join(cur_dir, 'dict/deny.txt')
        # 加载特征词  这里encoding用的是‘utf-8’模式,不加的话,我的pycharm会报错
        self.disease_wds= [i.strip() for i in open(self.disease_path,encoding='utf-8') if i.strip()]
        self.department_wds= [i.strip() for i in open(self.department_path,encoding='utf-8') if i.strip()]
        self.check_wds= [i.strip() for i in open(self.check_path,encoding='utf-8') if i.strip()]
        self.drug_wds= [i.strip() for i in open(self.drug_path,encoding='utf-8') if i.strip()]
        self.food_wds= [i.strip() for i in open(self.food_path,encoding='utf-8') if i.strip()]
        self.producer_wds= [i.strip() for i in open(self.producer_path,encoding='utf-8') if i.strip()]
        self.symptom_wds= [i.strip() for i in open(self.symptom_path,encoding='utf-8') if i.strip()]
        self.region_words = set(self.department_wds + self.disease_wds + self.check_wds + self.drug_wds + self.food_wds + self.producer_wds + self.symptom_wds)
        self.deny_words = [i.strip() for i in open(self.deny_path,encoding='utf-8') if i.strip()]
        # 构造领域 actree
        self.region_tree = self.build_actree(list(self.region_words))
        # 构建词典 格式比如{'感冒':'disease'....}
        self.wdtype_dict = self.build_wdtype_dict()
        # 问句疑问词
        self.symptom_qwds = ['症状', '表征', '现象', '症候', '表现']
        self.cause_qwds = ['原因','成因', '为什么', '怎么会', '怎样才', '咋样才', '怎样会', '如何会', '为啥', '为何', '如何才会', '怎么才会', '会导致', '会造成']
        self.acompany_qwds = ['并发症', '并发', '一起发生', '一并发生', '一起出现', '一并出现', '一同发生', '一同出现', '伴随发生', '伴随', '共现']
        self.food_qwds = ['饮食', '饮用', '吃', '食', '伙食', '膳食', '喝', '菜' ,'忌口', '补品', '保健品', '食谱', '菜谱', '食用', '食物','补品']
        self.drug_qwds = ['药', '药品', '用药', '胶囊', '口服液', '炎片']
        self.prevent_qwds = ['预防', '防范', '抵制', '抵御', '防止','躲避','逃避','避开','免得','逃开','避开','避掉','躲开','躲掉','绕开',
                             '怎样才能不', '怎么才能不', '咋样才能不','咋才能不', '如何才能不',
                             '怎样才不', '怎么才不', '咋样才不','咋才不', '如何才不',
                             '怎样才可以不', '怎么才可以不', '咋样才可以不', '咋才可以不', '如何可以不',
                             '怎样才可不', '怎么才可不', '咋样才可不', '咋才可不', '如何可不']
        self.lasttime_qwds = ['周期', '多久', '多长时间', '多少时间', '几天', '几年', '多少天', '多少小时', '几个小时', '多少年']
        self.cureway_qwds = ['怎么治疗', '如何医治', '怎么医治', '怎么治', '怎么医', '如何治', '医治方式', '疗法', '咋治', '怎么办', '咋办', '咋治']
        self.cureprob_qwds = ['多大概率能治好', '多大几率能治好', '治好希望大么', '几率', '几成', '比例', '可能性', '能治', '可治', '可以治', '可以医']
        self.easyget_qwds = ['易感人群', '容易感染', '易发人群', '什么人', '哪些人', '感染', '染上', '得上']
        self.check_qwds = ['检查', '检查项目', '查出', '检查', '测出', '试出']
        self.belong_qwds = ['属于什么科', '属于', '什么科', '科室']
        self.cure_qwds = ['治疗什么', '治啥', '治疗啥', '医治啥', '治愈啥', '主治啥', '主治什么', '有什么用', '有何用', '用处', '用途',
                          '有什么好处', '有什么益处', '有何益处', '用来', '用来做啥', '用来作甚', '需要', '要']

        print('model init finished ......')


    def classify(self, question):
        data = {}
        # check_medical 是定义在后面的函数 搜寻最终提取词的信息 比如{'感冒‘:’diseases‘.....}
        medical_dict = self.check_medical(question)
        # 若不存在
        if not medical_dict:
            return {}
        data['args'] = medical_dict
        # 收集问句当中所涉及到的实体类型
        types = []
        for type_ in medical_dict.values():
            types += type_
        # 定义问题类型
        question_type = 'others'
        question_types = []

        # 症状
        if self.check_words(self.symptom_qwds, question) and ('disease' in types):
            question_type = 'disease_symptom'
        if self.check_words(self.symptom_qwds, question) and ('symptom' in types):
            question_type = 'symptom_disease'

        # 原因
        if self.check_words(self.cause_qwds, question) and ('disease' in types):
            question_type = 'disease_cause'

        # 并发症
        if self.check_words(self.acompany_qwds, question) and ('disease' in types):
            question_type = 'disease_acompany'

        # 推荐食品
        if self.check_words(self.food_qwds, question) and 'disease' in types:
            deny_status = self.check_words(self.deny_words, question)
            if deny_status:
                question_type = 'disease_not_food'
                question_type = 'disease_do_food'

        # 已知食物找疾病
        if self.check_words(self.food_qwds+self.cure_qwds, question) and 'food' in types:
            deny_status = self.check_words(self.deny_words, question)
            if deny_status:
                question_type = 'food_not_disease'
                question_type = 'food_do_disease'

        # 推荐药品
        if self.check_words(self.drug_qwds, question) and 'disease' in types:
            question_type = 'disease_drug'

        # 药品治啥病
        if self.check_words(self.cure_qwds, question) and 'drug' in types:
            question_type = 'drug_disease'

        # 疾病接受检查项目
        if self.check_words(self.check_qwds, question) and 'disease' in types:
            question_type = 'disease_check'

        # 已知检查项目查相应疾病
        if self.check_words(self.check_qwds+self.cure_qwds, question) and 'check' in types:
            question_type = 'check_disease'

        # 症状防御
        if self.check_words(self.prevent_qwds, question) and 'disease' in types:
            question_type = 'disease_prevent'

        # 疾病医疗周期
        if self.check_words(self.lasttime_qwds, question) and 'disease' in types:
            question_type = 'disease_lasttime'

        # 疾病治疗方式
        if self.check_words(self.cureway_qwds, question) and 'disease' in types:
            question_type = 'disease_cureway'

        # 疾病治愈可能性
        if self.check_words(self.cureprob_qwds, question) and 'disease' in types:
            question_type = 'disease_cureprob'

        # 疾病易感染人群
        if self.check_words(self.easyget_qwds, question) and 'disease' in types :
            question_type = 'disease_easyget'

        # 若没有查到相关的外部查询信息,那么则将该疾病的描述信息返回
        if question_types == [] and 'disease' in types:
            question_types = ['disease_desc']

        # 若没有查到相关的外部查询信息,那么则将该疾病的描述信息返回
        if question_types == [] and 'symptom' in types:
            question_types = ['symptom_disease']

        # 将多个分类结果进行合并处理,组装成一个字典
        data['question_types'] = question_types

        return data

    def build_wdtype_dict(self):
        wd_dict = dict()
        # region_words 包含了一系列信息
        for wd in self.region_words:
            wd_dict[wd] = []
            # 查询 关键词 是否在对应的列表中存在,若存在则添加,不存在返回空
            if wd in self.disease_wds:
            if wd in self.department_wds:
            if wd in self.check_wds:
            if wd in self.drug_wds:
            if wd in self.food_wds:
            if wd in self.symptom_wds:
            if wd in self.producer_wds:
        return wd_dict

    def build_actree(self, wordlist):
        # 类似kmp  快速匹配
        actree = ahocorasick.Automaton()
        for index, word in enumerate(wordlist):
            actree.add_word(word, (index, word))
        return actree

    def check_medical(self, question):
        region_wds = []
        # region_tree 是一棵用region_wds 做出来的actree,快速找出question与之匹配的实体
        # 但是有时候匹配的结果与我们想的不一,比如“瓜烧白菜”和“白菜”是不一样的
        for i in self.region_tree.iter(question):
            # wd是question 用actree做了加速
            wd = i[1][1]
        # 利用停用词过滤
        stop_wds = []
        for wd1 in region_wds:
            for wd2 in region_wds:
                # 如果词语不一样,则添加较长的
                if wd1 in wd2 and wd1 != wd2:
        # 更新最后剩下的词语组合
        final_wds = [i for i in region_wds if i not in stop_wds]
        # 更新字典,格式比如{'感冒':'disease'....}
        final_dict = {i:self.wdtype_dict.get(i) for i in final_wds}
        return final_dict

    def check_words(self, wds, sent):
        for wd in wds:
            if wd in sent:
                return True
        return False

if __name__ == '__main__':
    handler = QuestionClassifier()
    # 问题输入到分类过程
    while 1:
        question = input('input an question:')
        data = handler.classify(question)




