ETL工具模块的创建

2023-10-27

01-logging工具模块开发(掌握)

日志记录的工具模块
作用:方便后续在 ETL 程序中记录日志

目标: 当我们在项目的其他位置使用logging模块进行日志记录时,不需要进行配置或者只需要进行简单的配置即可使用.

  1. 为了更方便的使用logging,我们在日志模块中创建一个日志类Logging,专门管理日志器对象
  • 重点: 在创建日志类时可以同时传入日志级别,方便日志级别控制
  1. 创建init_logger函数快速创建日志器对象,并完成日志处理器和日志格式的绑定.
  • 重点: 返回值是一个日志器对象

util/logging_util.py

"""
logging工具模块
1. 创建一个Logging类来管理日志器对象
2. 创建init_logger函数,用于快速获取logger日志器对象,以及绑定日志管理器和输出格式
"""

# 导入模块
import logging


class Logging(object):
    """创建一个Logging类来管理logging对象"""

    def __init__(self, level=logging.INFO):
        self.logger = logging.getLogger()
        self.logger.setLevel(level)


def init_logger():
    """快速获取logger对象,并绑定日志处理器和格式"""
    # 1.创建一个日志器对象
    logger = Logging().logger
    # 2.创建一个日志处理器对象
    file_handler = logging.FileHandler(
        filename='../logs/test1.log',
        mode='a',
        encoding='utf8'
    )
    # 3.将日志处理器绑定到日志器对象上
    logger.addHandler(file_handler)
    # 4.创建一个日志格式
    fmt = logging.Formatter('%(asctime)s - [%(levelname)s] - %(filename)s[%(lineno)d]:%(message)')
    # 5.将日志格式绑定到日志处理器上
    file_handler.setFormatter(fmt)
    # 6.返回logger对象
    return logger
02-抽取配置文件数据(理解)

问题1: 为什么我们要抽取配置信息?

配置文件其实就是一个写满全局变量的文件 我们将其抽取到一个特定的文件中,如果需要使用该变量就导入该文件模块即可

抽取配置文件的目的:

  1. 减少魔法数字和无指定意义字符串,提高代码的可读性
    • 魔法数字:就是没有单位的数字
    • 无意义字符串: 根据字符串中的内容,无法判断其是什么作用
  2. 统一管理全局变量,方便代码的后期维护
  3. 方便多个服务或者模块使用相同的配置

问题2: 什么样的数据可以抽取为配置信息?

  1. 全局需要使用到的信息, 例如用户名,密码,ip,端口号
  2. 需要频繁修改,且在多处引用的数据, log日志存放路径

config/project_config.py

import time

# ################## --程序运行日志的配置项-- ###################
# 配置日志输出的目录
log_root_path = "C:/Users/smart/Desktop/pycode/python-etl/logs/"
# 配置日志输出的文件名
log_name = f'pyetl-{time.strftime("%Y%m%d-%H", time.localtime())}.log'

util/logging_util.py(配置文件抽取替换后)

"""
logging工具模块
1. 创建一个Logging类来管理日志器对象
2. 创建init_logger函数,用于快速获取logger日志器对象,以及绑定日志管理器和输出格式
"""

# 使用配置文件的步骤 : 提取>> 导包 >> 替换

# 导入模块
import logging
from config import project_config as conf


class Logging(object):
    """创建一个Logging类来管理logging对象"""

    def __init__(self, level=logging.INFO):
        self.logger = logging.getLogger()
        self.logger.setLevel(level)


def init_logger():
    """快速获取logger对象,并绑定日志处理器和格式"""
    # 1.创建一个日志器对象
    logger = Logging().logger
    # 2.创建一个日志处理器对象
    file_handler = logging.FileHandler(
        filename=conf.log_root_path + conf.log_name,
        mode='a',
        encoding='utf8'
    )
    # 3.将日志处理器绑定到日志器对象上
    logger.addHandler(file_handler)
    # 4.创建一个日志格式
    fmt = logging.Formatter(conf.log_format)
    # 5.将日志格式绑定到日志处理器上
    file_handler.setFormatter(fmt)
    # 6.返回logger对象
    return logger
03-logging工具模块的单元测试(理解)

测试文件名称一般为test_被测试文件名,即为test_logging_util.py

测试类名称一般为 Test被测试文件名,即为TestLoggingUtil

测试方法名称一般为 test_被测试方法名称,即为test_init_logger

测试核心:

测试返回值是否为RootLogger类型

assertIsInstance

test/test_logging_util.py

"""
日志模块的单元测试
"""

# 导入测试模块
import logging
import unittest
# 导入被测试模块
from util import logging_util


# 创建一个测试类
class TestLoggingUtil(unittest.TestCase):

    def test_init_logger(self):
        """测试init_logger函数"""
        # 运行被测试函数,并获取返回值
        logger = logging_util.init_logger()
        # 使用断言测试该运行结果是否满足要求
        # 反向导包 alt + 回车
        self.assertIsInstance(logger, logging.RootLogger)
04-文件工具模块开发(掌握)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aUNAa5IM-1688341224407)(day02-工具模块的创建.assets/1672369877220.png)]

log日志文件和json订单文件都是以文件的形式保存在计算机的指定目录下, 如果我们想要处理这些数据,我们要做的事情:

  1. 将所有的文件信息提取出来
  2. 比对已经处理过的文件,筛选出未处理的文件

这两个需求,都需要使用到os模块,所以我们就使用os模块封装一个file_util模块来方便我们调用相关功能.

功能1: 获取指定目录下的所有文件列表

  • 此处使用了递归函数, 递归出口是当文件夹内部没有子文件夹时,则跳出递归, 递归嵌套层数不会超过1000层.
def get_dir_files_list(path='./', recursion=False):

 参数1: path 获取文件列表的目标路径
 参数2: recursion 是否在获取文件列表过程中递归

功能2: 找出all_list中在processed_list中没有出现过的元素

def get_new_by_compare_lists(processed_list, all_list):
	参数1: processed_list 已经处理过的文件列表
	参数2: all_list 所有文件的文件列表

util/file_util.py

"""
文件处理工具模块
1. 获取全部的文件路径
2. 比对全部文件中未处理过的文件
"""
# 导入os模块
import os


def get_dir_files_list(path='./', recursion=False):
    """
    获取指定目录中的所有文件路径
    :param path: 目标路径
    :param recursion: 是否进行递归(如果不递归则只获取当前目录中的文件不获取子目录中的文件)
    :return: 全部文件路径组成的列表  (文件路径为字符串信息)
    """
    # 0.创建一个空目录,用来存储文件路径信息
    path_list = []
    # 1. 获取指定目录下所有文件的文件名称
    file_names = os.listdir(path)
    # 2. 循环遍历文件名称
    for file_name in file_names:
        # 3. 使用文件名称与目标路径进行组合获取完整的路径信息
        file_path = os.path.join(path, file_name)
        # 4. 将所有的相对路径转换为绝对路径
        abs_path = os.path.abspath(file_path)
        # 5. 判断该路径是否为文件
        if os.path.isfile(abs_path):
            # 5.1 如果为文件,添加到指定的空列表中
            path_list.append(abs_path)

        else:
            # 5.2 如果不为文件, 判断是否要进行递归
            if recursion:
                # 获取非文件路径下所有的文件
                # 5.2.1如果进行递归,再次调用函数本身
                sub_path_list = get_dir_files_list(abs_path, True)
                path_list += sub_path_list
                # 如果不进行递归则什么也不做
    # 6.返回文件路径列表
    return path_list


def get_new_by_compare_lists(processed_list, all_list):
    """
    获取所有文件路径中,未处理的文件路径信息
    :param processed_list: 已经处理过的文件路径列表
    :param all_list: 全部的文件路径列表
    :return: 未处理的文件路径列表
    """
    # 0.创建一个空的列表,用于保存未处理的文件路径
    new_list = []
    # 1. 循环遍历所有的文件列表
    for file_path in all_list:
        # 2.与已经处理过的文件列表进行比对
        if file_path not in processed_list:
            # 2.1 如果该文件没有被处理过,添加到新的文件目录中
            new_list.append(file_path)
            # 2.2 如果被处理过什么也不做
    # 3. 将new_list进行返回
    return new_list


if __name__ == '__main__':
    print(get_dir_files_list('../', True))
    print(get_new_by_compare_lists([1, 2, 3, 4], [1, 2, 3, 4, 5, 6]))
05-文件工具模块测试(理解)

测试获取所有的文件目录:

"""
请在工程根目录的test文件夹内建立:
test_dir/
 inner1/
     3
     4
     inner2/
         5
 1
 2
使用 test_dir 目录进行文件工具方法的测试
递归结果应该为:['1', '2', '3', '4', '5']
不递归结果应该为:['1', '2']
"""

测试两个列表中数据差

a_list = ['a.txt', 'b.txt', 'c.txt']
b_list = ['a.txt', 'b.txt', 'c.txt', 'd.txt', 'e.txt']

test/test_file_util.py

"""
文件工具模块测试
"""

# 导入测试模块
import os.path
import unittest
# 导入被测试模块
from util import file_util


# 创建一个测试类
class TestFileUtil(unittest.TestCase):

    def test_get_dir_files_list(self):
        """
        请在工程根目录的test文件夹内建立:
        test_dir/
         inner1/
             3
             4
             inner2/
                 5
         1
         2
        使用 test_dir 目录进行文件工具方法的测试
        递归结果应该为:['1', '2', '3', '4', '5']
        不递归结果应该为:['1', '2']
        """
        # 不递归情况下的测试结果
        # 0. 创建一个空列表,保存文件的名称
        file_name_list = []
        # 1. 测试get_dir_files_list函数,返回所有的文件路径
        path_list = file_util.get_dir_files_list('./test_dir')
        # 2. 对于path_list进行遍历
        for path in path_list:
            # 2.1 获取每一个路径的文件名称,加载到file_name_list中
            file_name = os.path.basename(path)
            file_name_list.append(file_name)
        # 优化:最好再获取文件名称后进行排序,减少bug出现的概率
        file_name_list.sort()
        # 3.使用断言进行比对返回数据是否合规
        self.assertListEqual(file_name_list, ['1', '2'])

        # 递归情况下的测试结果
        file_name_list = []
        # 1. 测试get_dir_files_list函数,返回所有的文件路径
        path_list = file_util.get_dir_files_list('./test_dir', True)
        # 2. 对于path_list进行遍历
        for path in path_list:
            # 2.1 获取每一个路径的文件名称,加载到file_name_list中
            file_name = os.path.basename(path)
            file_name_list.append(file_name)
        # 优化:最好再获取文件名称后进行排序,减少bug出现的概率
        file_name_list.sort()
        # 3.使用断言进行比对返回数据是否合规
        self.assertListEqual(file_name_list, ['1', '2', '3', '4', '5'])

    def test_get_new_by_compare_lists(self):
        """测试a_list 和b_list之间的数据差值"""
        a_list = ['a.txt', 'b.txt', 'c.txt']
        b_list = ['a.txt', 'b.txt', 'c.txt', 'd.txt', 'e.txt']
        # 测试get_new_by_compare_lists,并获取返回的数据差
        c_list = file_util.get_new_by_compare_lists(a_list, b_list)
        # 使用断言判断返回数据的值是否正确
        self.assertListEqual(c_list, ['d.txt', 'e.txt'])
06-数据库工具的开发(掌握)
开发数据库功能需求解析

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Fdu8Uqxj-1688341224408)(day02-工具模块的创建.assets/1672369877220.png)]

问题1: 三个需求中哪一个需要使用到mysql工具???

都需要,因为最终都要写入到mysql中

问题2: 都需要哪些功能呢?

  • 数据库的创建
  • 数据表的创建
  • 插入数据
  • 查询数据
  • 切换数据库
  • 事务功能:
    • 在一个json文件或者log日志文件中,会出现多个订单记录或者日志记录,每一条日志记录将执行一次insert指令,将数据插入到mysql数据库中
    • 但是我们在数据库中记录的是已经处理完成的文件名称
    • 如果发生以下情况,一个文件执行了80% 失败了,此时下次数据采集会重新采集这个文件还是跳过呢?
      • 跳过就会丢失一个文件的20%数据, 重新采集就会多一个文件的80%数据
    • 此时可以开启事务,将一个文件的所有插入指令定义为一个事物,要么都成功要么都失败
      • 如果执行了80%失败了,会将之前的80%数据进行回滚
  • 校验数据表是否存在

问题3:在操作数据库时,除了调用sql指令,我们还要做什么事情?

  • 日志记录

数据库操作工具类

  • 初始化对象
  • 执行非查询SQL
  • 执行查询SQL
  • 切换数据库
  • 开启事务
  • 提交事务
  • 回滚事务
  • 判断数据表是否存在
  • 检查表是否存在,不存在则创建
  • 执行一条插入SQL
  • 关闭数据库连接
  • 创建数据库工具对象

util/mysql_util.py

# 导包
import pymysql
from util import logging_util

# 创建一个日志器对象
logger = logging_util.init_logger()


class MySQLUtil(object):
    """创建一个mysqlutil类,用来管理数据库操作连接和方法"""
    """
    数据库操作工具类
    * 创建数据库连接-
    * 切换数据库-
    * 执行非查询SQL-
    * 执行查询SQL-
    * 开启事务-
    * 提交事务-
    * 回滚事务-
    * 检查表是否存在-
    * 检查表是否存在,不存在则创建-
    * 查询指定表的数据-
    * 执行一条插入SQL-
    * 关闭数据库连接-
    """

    def __init__(self, host, user, password, port=3306, charset='utf8', autocommit=False):
        """初始化工具类对象,在工具对象中添加连接属性"""
        self.conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            charset=charset,
            autocommit=autocommit
        )

        # 记录日志信息
        logger.info(f'主机为{host}:{port}的mysql数据服务已经连接....')

    def select_db(self, db_name):
        """切换数据库"""
        self.conn.select_db(db_name)
        # 日志记录信息
        logger.info(f'已经切换数据库,数据库名称为{db_name}')

    def execute(self, sql):
        """执行非查询SQL"""
        # 1. 创建游标
        cursor = self.conn.cursor()
        # 2. 执行sql语句
        cursor.execute(sql)
        # 3. 判断自动提交是否开启
        if not self.conn.get_autocommit():
            # 3.1 如果没有开启,手动进行提交
            self.conn.commit()
            # 3.2 如果开启了,则什么也不做
        # 4. 关闭游标
        cursor.close()
        # 5. 记录日志
        logger.info(f'非查询sql: {sql}已经执行完成...')

    def query(self, sql):
        """执行查询SQL"""
        # 1. 创建游标
        cursor = self.conn.cursor()
        # 2. 执行sql语句
        cursor.execute(sql)
        # 3. 获取结果集
        result = cursor.fetchall()
        # 4. 关闭游标
        cursor.close()
        # 5. 记录日志
        logger.info(f'查询sql: {sql}已经执行完成...')
        # 6. 返回结果集
        return result

    def begin_transaction(self):
        """开启事务"""
        # 问题:什么叫开启事务??? 就是在此处开启事务,如果进行提交回滚,就回滚到这个时间点.
        # 1. 如果自动提交已经开启,则无法开启事务(已经提交的数据无法回滚),手动关闭
        if self.conn.get_autocommit():
            logger.warning('开启事务冲突,当前连接开启了事务自动提交...')
            logger.warning('关闭当前连接的事务自动提交,用于手动开启事务...')
            self.conn.autocommit(False)
        # 2. 已经关闭完,或者没有开启自动提交,则直接开启事务即可
        self.conn.begin()
        # 3.日志记录
        logger.info('事务已经开启...')

    def rollback_transaction(self):
        """回滚事务"""
        # 1. 回滚事务
        self.conn.rollback()
        # 2. 记录日志
        logger.info('事务回滚完成...')

    def commit_transaction(self):
        """提交事务"""
        # 1. 提交事务
        self.conn.commit()
        # 2. 记录日志
        logger.info('事务已提交...')

    def check_table_exists(self, db_name, tb_name):
        """检查数据表是否存在"""
        # 1. 切换数据库
        # self.conn.select_db() 不要使用这种方式,因为没有日志记录
        self.select_db(db_name)
        # 2. 查询数据库中所有的表, 并获取结果集
        # 2.1 创建一个sql查表语句
        sql = 'show tables;'
        # 2.2 执行查询sql方法 , 结果集是元组嵌套元组的方式((python,),(bigdata,))
        result = self.query(sql)
        # 3. 判断当前数据表是否在结果集汇总
        return (tb_name,) in result

    def check_table_exists_and_create(self, db_name, tb_name, tb_cols):
        """
        查询数据表是否存在,如果不存在则创建
        :param db_name: 数据库名称
        :param tb_name: 数据表名称
        :param tb_cols: 数据表的创表语句
        """
        # 1.判断该数据表是否存在
        if not self.check_table_exists(db_name, tb_name):
            # 2.如果数据表不存在,将会创建表
            # 2.1 创建一个创表sql
            sql = f'create table {tb_name} ({tb_cols});'
            # 2.2 执行非查询sql方法
            self.execute(sql)
            # 2.3 记录日志
            logger.info(f'数据表{tb_name},已经在数据库{db_name}中创建成功...')
        else:
            # 3.如果数据表存在则跳过
            # 3.1.记录日志
            logger.info(f'数据库{db_name}中的数据表{tb_name}已经存在,跳过表的创建...')

    def query_all(self, db_name, tb_name, limit=None):
        """
        查询指定表的数据
        :param db_name: 数据库的名称
        :param tb_name: 数据表的名称
        :param limit: 限制查询范围
        :return: 查询出来的结果集
        """
        # 1. 切换数据库
        self.select_db(db_name)
        # 2. 查询指定表中的数据
        sql = f'select * from {tb_name}'
        # 2.1 判断limit是否为None
        if limit is not None:
            # 如果为None则拼接sql语句
            sql += f' limit {limit}'
            # 如果不为None,则不用拼接
        # 2.2 执行sql语句
        result = self.query(sql)
        # 3.返回结果集
        return result

    def insert_single_sql(self, sql):
        """执行一条插入SQL"""
        # 为什么要单独穿件一个插入方法,因为插入时可能存在一些异常,我们需要记录异常,方便后续对于数据的修改和监控
        try:
            # 1. 执行插入sql语句
            self.execute(sql)
        except Exception as e:
            # 2. 捕获异常
            # 2.1 如果出现异常,则记录日志
            logger.warning(f'sql执行出现异常,异常信息为: {e}')
            # 2.2 将异常继续抛出,防止已经错误的程序继续执行
            # 使用raise指令可以手动抛出异常
            raise e
        # 3.如果没有异常则记录日志信息
        logger.info(f'插入sql语句:{sql}执行成功,没有异常出现')

    def close(self):
        """关闭数据库连接"""
        # 只要创建了对象,连接就一直存在
        if self.conn:
            self.conn.close()
        # 记录日志信息
        logger.info('数据库连接已经关闭...')
07-数据库工具抽取配置信息
# ################## --元数据库metadata配置项-- ###################
# 元数据库配置
metadata_host = 'localhost'
metadata_port = 3306
metadata_user = 'root'
metadata_password = 'mysql'
metadata_db = 'metadata'

# ################## --订单JSON数据采集配置项-- ###################
# 采集订单JSON数据,元数据表配置项
file_monitor_meta_table_name = "json_file_monitor"
file_monitor_meta_table_create_cols = \
    "id INT PRIMARY KEY AUTO_INCREMENT, " \
    "file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \
    "process_lines INT NULL COMMENT '文件处理行数', " \
    "process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'"
    
    
# ################## --后台日志数据采集配置项-- ###################


# 采集后台日志数据,元数据表配置项
logs_monitor_meta_table_name = "backend_logs_monitor"
logs_monitor_meta_table_create_cols = \
    "id INT PRIMARY KEY AUTO_INCREMENT, " \
    "file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \
    "process_lines INT NULL COMMENT '文件处理行数', " \
    "process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'"
08-已采集文件的元数据库表创建

问题1: 什么叫做元数据???

在大数据场景下,元数据非常常见,对于不同的服务,元数据的内容也不相同,但是都是描述数据的数据

举例:

例1: 我们有一个文件 1.txt 此时 1.txt这个文件就是数据, 文件的大小,名称, 位置, 类型…都是元数据—HDFS

例2: 我们将一个数据表存储到数据仓库中, 数据表中所有的记录时数据, 字段的名称, 表名, 库名…都是元数据–Hive

问题2:为什么要记录元数据???

我们一般会将元数据存储在一个服务中(msyql orcale hive 内存), 因为我们只有获取了元数据,才能查询到数据

举例: 我们想获取person表中的一个数据记录, 必须知道数据库的名称,数据表的名称,以及记录的约束条件

问题3: 我们项目中记录的元数据是什么???

在metadata元数据库中,创建一个 json_file_monitor 表,用于保存被处理过的订单JSON文件被的表述信息。

在metadata元数据库中还会插件一个backend_logs_monitor表,用于保存被处理过的日志文件的表述信息

-- 创建元数据库
create database metadata;
-- 使用元数据管理库
USE metadata;
-- 创建已采集文件数据表
CREATE TABLE json_file_monitor (
    id INT PRIMARY KEY AUTO_INCREMENT,
    file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称',
    process_lines INT NULL COMMENT '文件处理行数',
    process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'
);
-- 执行如下SQL,添加一些测试数据,假设已有文件被采集过
INSERT INTO json_file_monitor (file_name, process_lines)
VALUES
    ('D:/etl/json/x00', 1024),
    ('D:/etl/json/x01', 1024),
    ('D:/etl/json/x02', 1024);

创建元数据库并插入部分信息,方便后续进行测试

字段含义:

  • file_name 处理过的文件名称
  • process_lines 文件处理行数
  • process_time 处理文件的时间

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9t0Z1Bw9-1688341224409)(day02-工具模块的创建.assets/1672392238145.png)]

09-从mysql中获取已经处理的JSON文件列表

获取已被处理的订单JSON文件列表
如果元数据记录表 json_file_monitor 不存在,则创建它

util/mysql_util.py 追加到末尾

def get_processed_files(util: MySQLUtil, db_name, tb_name, tb_cols):
    """
    获取元数据库中已经处理过的文件
    :param util: MySQLUtil对象
    :param db_name: 元数据库名称
    :param tb_name: 元数据表名称
    :param tb_cols: 元数据表的创表语句
    :return: 未处理过的文件绝对路径列表
    """
    # 0. 创建一个空列表用于存储已处理文件的路径字符串
    new_files = []
    # 1. 判断该数据表是否存在,如果不存在则创建,如果存在则跳过
    util.check_table_exists_and_create(
        db_name=db_name,
        tb_name=tb_name,
        tb_cols=tb_cols
    )
    # 2. 查询元数据表中所有的数据内容
    result = util.query_all(db_name, tb_name)
    # 3. 将查询集中的文件路径提取出来放入空列表中
    # new_files = [path_tuple[1] for path_tuple in result]
    for path_tuple in result:
        new_files.append(path_tuple[1])
    # 4. 返回记录已处理数据的列表
    return new_files


def get_mysql_util(host, user, password, port=3306, charset='utf8', autocommit=False):
    """快速获取MySQLUtil对象"""
    return MySQLUtil(host, user, password, port, charset, autocommit)
10-数据库工具模块测试
  1. 在setUP方法中创建数据库连接对象,并选择指定的数据库创建测试数据

  2. 在tearDown方法中删除测试数据并关闭数据库连接

  3. 测试插入数据

  4. 测试查询数据

  5. 检测数据库是否存在

  6. 测试获取已经处理过得文件列表

test/test_mysql_util.py

"""
mysql工具模块单元测试

1. 在setUP方法中创建数据库连接对象,并选择指定的数据库创建测试数据
2. 在tearDown方法中删除测试数据并关闭数据库连接
3. 测试插入数据
4. 测试查询数据
5. 检测数据表是否存在
6. 测试获取已经处理过得文件列表
"""

# 0.导包
import unittest
from util import mysql_util
from config import project_config as conf


class TestMySQLUtil(unittest.TestCase):
    """mysql工具模块测试类的创建"""

    def setUp(self) -> None:
        """创建数据库连接对象,并选择指定的数据库创建测试数据"""
        # 1. 创建一个数据库连接对象
        self.util = mysql_util.get_mysql_util(
            host=conf.metadata_host,
            user=conf.metadata_user,
            password=conf.metadata_password
        )
        # 2. 创建一个测试数据库
        self.util.execute('create database if not exists test charset="utf8";')
        # 3. 切换数据库为测试数据库
        self.util.select_db('test')
        # 4. 创建一个测试数据表
        self.util.execute('create table if not exists unit_test(id int, name varchar(100));')

    def tearDown(self) -> None:
        """删除测试数据并关闭数据库连接"""
        # 1. 删除数据表
        self.util.execute('drop table if exists unit_test;')
        # 2. 删除数据库
        self.util.execute('drop database if exists test;')
        # 3. 关闭数据库连接
        self.util.close()

    def test_insert_single_sql(self):
        """测试插入"""
        # 1. 向测试数据库中插入数据
        self.util.insert_single_sql('insert into unit_test values(1, "小明")')
        # 2. 从测试数据库中查询数据
        result = self.util.query('select * from unit_test;')
        # 3. 使用断言判断查询数据是否正确
        self.assertEqual((1, "小明"), result[0])

    def test_query_all(self):
        """测试查询"""
        # 1. 向测试数据库中插入数据
        self.util.insert_single_sql('insert into unit_test values(1, "小明")')
        # 2. 从测试数据库中查询数据
        result = self.util.query_all(db_name='test', tb_name='unit_test')
        # 3. 使用断言判断查询数据是否正确
        self.assertEqual((1, "小明"), result[0])

    def test_check_table_exists_and_create(self):
        """测试数据表是否已经存在,如果不存在则创建"""
        # 1. 删除测试数据表
        self.util.execute('drop table if exists unit_test;')
        # 2. 调用判断方法
        self.util.check_table_exists_and_create(
            db_name='test',
            tb_name='unit_test',
            tb_cols='id int, name varchar(100)'
        )
        # 3. 查询数据库中所有的数据表
        result = self.util.query('show tables;')
        # 4. 使用断言对比测试数据表是否存在
        self.assertIn(('unit_test',), result)

    def test_get_processed_files(self):
        """测试获取已经处理过得文件列表"""
        # 1. 调用获取已经处理数据文件的方法
        # get_processed_files是一个函数,直接导入模块即可使用
        new_files = mysql_util.get_processed_files(
            util=self.util,
            db_name=conf.metadata_db,
            tb_name=conf.file_monitor_meta_table_name,
            tb_cols=conf.file_monitor_meta_table_create_cols
        )
        # 2. 使用方法返回的结果列表和下方列表进行对比
        list1 = ['D:/etl/json/x00', 'D:/etl/json/x01', 'D:/etl/json/x02']
        self.assertListEqual(list1, new_files)
11-字符串处理工具的开发和测试(复制讲解)

判断时遵循如下原则:

  1. 容器非空即真
  2. 数字非0即真
  3. None做判断条件时当做false使用

util/str_util.py

"""
字符串工具方法
"""


def check_null(data):
    """
    功能:检查传入的字符串是否为空、None或其他无意义的内容,True表示空,False表示非空
    """
    if not data:
        return True

    # 转小写
    data = data.lower()
    if data in ('null', 'none', 'undefined'):
        return True

    return False


def check_null_and_transform(data):
    """
    功能:检查字符串,如果是空内容,返回空字符串,同时兼具去除字符串两边空字符的功能
    """
    if check_null(str(data)):
        return ''
    elif isinstance(data, str):
        return data.strip()
    else:
        return data


def check_str_null_and_transform_to_sql_null(data):
    """
    功能:检查传入字符串
        如果是空内容,返回'null'字符串,用于sql插入
        否则返回数据本身,并带上""包裹用于sql插入
    """
    if check_null(str(data)):
        return 'null'
    else:
        return f"'{data}'"


def check_number_null_and_transform_to_sql_null(data):
    """
    功能:检查传入的数字或字符串数据是否是空
    如果是空内容,则返回'null'字符串
    否则返回数据本身
    """
    if not data or check_null(str(data)):
        return 'null'
    else:
        return data


def clear_str(data):
    """
    功能:处理字符串中异常字符,如 单引号,双引号,逗号,分号等
    Note:这个API有可能破坏数据本身的内容,慎用
    """
    if check_null(data):
        # 如果是无意义内容,直接返回不处理了.
        return data

    data = data.replace("'", "")
    data = data.replace("\"", "")
    data = data.replace(";", "")
    data = data.replace(",", "")
    data = data.replace("@", "")
    data = data.replace("\\", "")

    return data

test/test_str_util.py

"""
字符串工具单元测试类
"""
from unittest import TestCase
from util import str_util


class TestStrUtil(TestCase):
    """字符串工具单元测试类"""
    def setUp(self) -> None:
        pass

    def test_check_null(self):
        data = None
        result = str_util.check_null(data)
        self.assertEqual(True, result)

        data = ''
        result = str_util.check_null(data)
        self.assertEqual(True, result)

        data = 'None'
        result = str_util.check_null(data)
        self.assertEqual(True, result)

        data = 'NONE'
        result = str_util.check_null(data)
        self.assertEqual(True, result)

        data = 'null'
        result = str_util.check_null(data)
        self.assertEqual(True, result)

        data = 'NULL'
        result = str_util.check_null(data)
        self.assertEqual(True, result)

        data = 'aaa'
        result = str_util.check_null(data)
        self.assertEqual(False, result)

    def test_check_str_null_and_transform_to_sql_null(self):
        data = None
        result = str_util.check_number_null_and_transform_to_sql_null(data)
        self.assertEqual('null', result)

        data = 'None'
        result = str_util.check_number_null_and_transform_to_sql_null(data)
        self.assertEqual('null', result)

        data = '哈哈'
        result = str_util.check_str_null_and_transform_to_sql_null(data)
        self.assertEqual("'哈哈'", result)
12-时间戳处理工具的开发和测试(复制讲解)

util/time_util.py

"""
时间处理工具方法
"""
import time


def ts13_to_ts10(ts):
    """将13位的时间戳规范成10位的时间戳"""
    # JSON数据当中,时间戳是以毫秒为单位的,
    # 从JSON中读取出来的时间戳要被python使用要转换为秒为单位(10位)
    return ts // 1000


def ts10_to_date_str(ts, format_str='%Y-%m-%d %H:%M:%S'):
    """将10位的时间戳转换为日期字符串"""
    time_array = time.localtime(ts)
    return time.strftime(format_str, time_array)


def ts13_to_date_str(ts, format_str='%Y-%m-%d %H:%M:%S'):
    """将13位时间戳转换为日期字符串"""
    ts = ts13_to_ts10(ts)
    return ts10_to_date_str(ts, format_str)

util/test_time_util.py

"""
时间处理工具单元测试类
"""
from unittest import TestCase
from util import time_util


class TestTimeUtil(TestCase):
    """时间处理工具单元测试类"""
    def setUp(self) -> None:
        pass

    def test_ts13_to_ts10(self):
        ts = 1645539742000
        result = time_util.ts13_to_ts10(ts)
        self.assertEqual(1645539742, result)

    def test_ts13_to_date_str(self):
        ts = 1645539742000
        s = time_util.ts13_to_date_str(ts)
        self.assertEqual("2022-02-22 22:22:22", s)

    def test_ts10_to_date_str(self):
        ts = 1645539742
        s = time_util.ts10_to_date_str(ts)
        self.assertEqual("2022-02-22 22:22:22", s)

问题: 为什么字符串处理和时间戳工具不记录日志呢?

  1. 事务型操作,修改了数据内容,一定要进行日志记录
  2. 可能出现异常的位置要进行日志记录
  3. 操作了其他服务一定要记录日志
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ETL工具模块的创建 的相关文章

随机推荐

  • 米哈游服务器位置,米哈游(米哈游账号中心系统)

    只要绑定了手机号就不要紧了 别人也无法用这个邮箱改密码 别人用也是无法验证 去官网下载崩坏3 不要在小米游戏这种第三方应用市场下载 有人买了米哈游的账号被找回吗 我买的三无号直接登不上 登陆的时候提示说没有这个账号 所以最好不要买 自己注册
  • 对SFBC空频编码和FSTD-SFBC频率切换分集与空频编码的MIMO系统误码率matlab仿真

    目录 一 理论基础 二 核心程序 三 测试结果 一 理论基础 Space Frequency Block Code SFBC 是TD LTE系统中的一种抗干扰技术 其基本原理与Wimax中基于Alamuti 编码的STBC类似 LTE标准中
  • 服务器如何安装 宝塔国外版本

    linux服务器如何安装 宝塔国外版本 centos安装方式 yum install y wget wget O install sh http www aapanel com script install 6 0 en sh bash i
  • fiddlerJScript脚本工具类,轻松写出fiddler 脚本,完成数据抓取

    fiddler是常用的抓包工具 fiddlerJScript脚本是用JScript NET 编写的 使用起来和JS 还是有很多区别的 目前网络上的文章比较分散 有些内容也比较旧 对新手不友好 所以自己动手写了一个工具类 封装了一些常用的方法
  • 进程概念(详解)

    进程概念 基本概念 进程的描述 pcb pcb task struct中内容分类 查看进程 查看进程的信息 通过系统调用来查看进程的标识符 创建进程 fork 杀掉进程 kill 进程状态 特殊进程 僵尸进程 孤儿进程 进程的优先级 PRI
  • java获取季度日期

    获取季度第一天和最后一天 获取当前季度日期 方式一 param dateStr 当前日期字符串 默认为当前日期 return String 季度起始日期 季度结束日期 public static String getSeasonDay St
  • Nginx+Tomcat搭建高性能负载均衡集群

    本文转载至 http blog csdn net wang379275614 article details 47778201 一 工具 nginx 1 8 0 apache tomcat 6 0 33 二 目标 实现高性能负载均衡的Tom
  • Node.js详解(四):连接MongoDB

    文章目录 一 安装MongoDB访问驱动 二 连接数据库 三 添加数据 四 添加多条数据 五 修改数据 六 查询数据 1 查询单条记录 2 查询多条记录 七 删除数据 八 完整示例代码 1 路由 Api 接口 2 运行结果 MongoDB
  • 从不懂到会用,PID从理论到实践~笔记

    从不懂到会用 PID从理论到实践 哔哩哔哩 bilibili PID的适用性 一阶 二阶的线性系统 前馈系统只是对干扰做一个补偿 单闭环系统 双闭环控制系统 一个是还没发生干扰但我知道你会干扰进行反应 一个是看到你对我干扰后并发生后才进行反
  • Nginx重中之重的知识点

    1 反向代理 proxy pass http baidu com location proxy pass http atcui com 代理服务器的概念 代理服务器 客户机在发送请求时 不会直接发送给目的主机 而是先发送给代理服务器 代理服
  • wenstorm 快捷键(java)

    向下复制一行 Duplicate Lines Ctrl Down 修改变量名与方法名 Alt Shift R 向下移动行 Alt Down 显示设置窗口 Ctrl Alt S 向上开始新的一行 Start New Line before c
  • Vscode 调试arm64 linux内核

    对于linux内存系列的阅读和测试记录 https zhuanlan zhihu com p 105069730 https zhuanlan zhihu com p 510289859 搭建arm64内核调试环境 安装工具 sudo ap
  • BAT大佬分享:Linux 工程师的 6 类好习惯和 23 个教训

    一 线上操作规范 1 测试使用 当初学习 Linux 的使用 从基础到服务到集群 都是在虚拟机做的 虽然老师告诉我们跟真机没有什么差别 可是对真实环境的渴望日渐上升 不过虚拟机的各种快照却让我们养成了各种手贱的习惯 以致于拿到服务器操作权限
  • uniapp小程序封装常用工具函数

    1 formatTime格式化时间 function formatTime time format if typeof time number typeof format string return time var formateArr
  • linux php自动执行_linux中定时运行php(每分钟执行一次为例)

    注 使用Crontab定时执行php脚本文件 1 安装crontab yum install crontabs 说明 sbin service crond start 启动服务 sbin service crond stop 关闭服务 sb
  • Chisel入门------Chisel的基本语法4

    概述 本节将具体的介绍集中常见的硬件电路 并探索如何使用Chisel语言进行描述 10 示例设计 10 1 FIFO Bufer 通过在写入端和读取端插入缓冲器可以解耦 常见的FIFO是先进先出buffer 其中empty信号和full信号
  • IE6 firefox2

    所以说 如果我们这样来写 head width 100px important width 70px IE 说 它是70px 因为IE 不认识 important 提升优先权 发现了重复定义width时候就按最后一个来显示Firefox说
  • Spring详解(0 控制反转和依赖注入 AOP简介 )

    目录 依赖注入 控制反转和依赖注入的关系 Spring中的依赖注入 AOP 面向切面编程 Spring 框架本身的四大原则 1 使用pojo进行轻量级和最小侵入式开发 2 通过依赖注入和基于接口编程实现松耦合 3 通过AOP和默认习惯进行声
  • Linux命令常见命令用例

    文章目录 常见命令 awk find Tcp抓包 常见命令 awk 筛选nginx日志时间大于5s cat access log sed s g awk 11 NF gt 5 gt out5s log 筛选nginx状态码等于200 cat
  • ETL工具模块的创建

    01 logging工具模块开发 掌握 日志记录的工具模块 作用 方便后续在 ETL 程序中记录日志 目标 当我们在项目的其他位置使用logging模块进行日志记录时 不需要进行配置或者只需要进行简单的配置即可使用 为了更方便的使用logg