01-logging工具模块开发(掌握)
日志记录的工具模块
作用:方便后续在 ETL 程序中记录日志
目标: 当我们在项目的其他位置使用logging模块进行日志记录时,不需要进行配置或者只需要进行简单的配置即可使用.
- 为了更方便的使用logging,我们在日志模块中创建一个日志类Logging,专门管理日志器对象
-
重点: 在创建日志类时可以同时传入日志级别,方便日志级别控制
- 创建init_logger函数快速创建日志器对象,并完成日志处理器和日志格式的绑定.
util/logging_util.py
"""
logging工具模块
1. 创建一个Logging类来管理日志器对象
2. 创建init_logger函数,用于快速获取logger日志器对象,以及绑定日志管理器和输出格式
"""
# 导入模块
import logging
class Logging(object):
"""创建一个Logging类来管理logging对象"""
def __init__(self, level=logging.INFO):
self.logger = logging.getLogger()
self.logger.setLevel(level)
def init_logger():
"""快速获取logger对象,并绑定日志处理器和格式"""
# 1.创建一个日志器对象
logger = Logging().logger
# 2.创建一个日志处理器对象
file_handler = logging.FileHandler(
filename='../logs/test1.log',
mode='a',
encoding='utf8'
)
# 3.将日志处理器绑定到日志器对象上
logger.addHandler(file_handler)
# 4.创建一个日志格式
fmt = logging.Formatter('%(asctime)s - [%(levelname)s] - %(filename)s[%(lineno)d]:%(message)')
# 5.将日志格式绑定到日志处理器上
file_handler.setFormatter(fmt)
# 6.返回logger对象
return logger
02-抽取配置文件数据(理解)
问题1: 为什么我们要抽取配置信息?
配置文件其实就是一个写满全局变量的文件
我们将其抽取到一个特定的文件中,如果需要使用该变量就导入该文件模块即可
抽取配置文件的目的:
- 减少魔法数字和无指定意义字符串,提高代码的可读性
- 魔法数字:就是没有单位的数字
- 无意义字符串: 根据字符串中的内容,无法判断其是什么作用
- 统一管理全局变量,方便代码的后期维护
- 方便多个服务或者模块使用相同的配置
问题2: 什么样的数据可以抽取为配置信息?
- 全局需要使用到的信息, 例如用户名,密码,ip,端口号
- 需要频繁修改,且在多处引用的数据, log日志存放路径
config/project_config.py
import time
# ################## --程序运行日志的配置项-- ###################
# 配置日志输出的目录
log_root_path = "C:/Users/smart/Desktop/pycode/python-etl/logs/"
# 配置日志输出的文件名
log_name = f'pyetl-{time.strftime("%Y%m%d-%H", time.localtime())}.log'
util/logging_util.py
(配置文件抽取替换后)
"""
logging工具模块
1. 创建一个Logging类来管理日志器对象
2. 创建init_logger函数,用于快速获取logger日志器对象,以及绑定日志管理器和输出格式
"""
# 使用配置文件的步骤 : 提取>> 导包 >> 替换
# 导入模块
import logging
from config import project_config as conf
class Logging(object):
"""创建一个Logging类来管理logging对象"""
def __init__(self, level=logging.INFO):
self.logger = logging.getLogger()
self.logger.setLevel(level)
def init_logger():
"""快速获取logger对象,并绑定日志处理器和格式"""
# 1.创建一个日志器对象
logger = Logging().logger
# 2.创建一个日志处理器对象
file_handler = logging.FileHandler(
filename=conf.log_root_path + conf.log_name,
mode='a',
encoding='utf8'
)
# 3.将日志处理器绑定到日志器对象上
logger.addHandler(file_handler)
# 4.创建一个日志格式
fmt = logging.Formatter(conf.log_format)
# 5.将日志格式绑定到日志处理器上
file_handler.setFormatter(fmt)
# 6.返回logger对象
return logger
03-logging工具模块的单元测试(理解)
测试文件名称一般为test_被测试文件名
,即为test_logging_util.py
测试类名称一般为 Test被测试文件名
,即为TestLoggingUtil
测试方法名称一般为 test_被测试方法名称
,即为test_init_logger
测试核心:
测试返回值是否为RootLogger类型
assertIsInstance
test/test_logging_util.py
"""
日志模块的单元测试
"""
# 导入测试模块
import logging
import unittest
# 导入被测试模块
from util import logging_util
# 创建一个测试类
class TestLoggingUtil(unittest.TestCase):
def test_init_logger(self):
"""测试init_logger函数"""
# 运行被测试函数,并获取返回值
logger = logging_util.init_logger()
# 使用断言测试该运行结果是否满足要求
# 反向导包 alt + 回车
self.assertIsInstance(logger, logging.RootLogger)
04-文件工具模块开发(掌握)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aUNAa5IM-1688341224407)(day02-工具模块的创建.assets/1672369877220.png)]
log日志文件和json订单文件都是以文件的形式保存在计算机的指定目录下, 如果我们想要处理这些数据,我们要做的事情:
- 将所有的文件信息提取出来
- 比对已经处理过的文件,筛选出未处理的文件
这两个需求,都需要使用到os模块,所以我们就使用os模块封装一个file_util模块来方便我们调用相关功能.
功能1: 获取指定目录下的所有文件列表
- 此处使用了递归函数, 递归出口是当文件夹内部没有子文件夹时,则跳出递归, 递归嵌套层数不会超过1000层.
def get_dir_files_list(path='./', recursion=False):
参数1: path 获取文件列表的目标路径
参数2: recursion 是否在获取文件列表过程中递归
功能2: 找出all_list中在processed_list中没有出现过的元素
def get_new_by_compare_lists(processed_list, all_list):
参数1: processed_list 已经处理过的文件列表
参数2: all_list 所有文件的文件列表
util/file_util.py
"""
文件处理工具模块
1. 获取全部的文件路径
2. 比对全部文件中未处理过的文件
"""
# 导入os模块
import os
def get_dir_files_list(path='./', recursion=False):
"""
获取指定目录中的所有文件路径
:param path: 目标路径
:param recursion: 是否进行递归(如果不递归则只获取当前目录中的文件不获取子目录中的文件)
:return: 全部文件路径组成的列表 (文件路径为字符串信息)
"""
# 0.创建一个空目录,用来存储文件路径信息
path_list = []
# 1. 获取指定目录下所有文件的文件名称
file_names = os.listdir(path)
# 2. 循环遍历文件名称
for file_name in file_names:
# 3. 使用文件名称与目标路径进行组合获取完整的路径信息
file_path = os.path.join(path, file_name)
# 4. 将所有的相对路径转换为绝对路径
abs_path = os.path.abspath(file_path)
# 5. 判断该路径是否为文件
if os.path.isfile(abs_path):
# 5.1 如果为文件,添加到指定的空列表中
path_list.append(abs_path)
else:
# 5.2 如果不为文件, 判断是否要进行递归
if recursion:
# 获取非文件路径下所有的文件
# 5.2.1如果进行递归,再次调用函数本身
sub_path_list = get_dir_files_list(abs_path, True)
path_list += sub_path_list
# 如果不进行递归则什么也不做
# 6.返回文件路径列表
return path_list
def get_new_by_compare_lists(processed_list, all_list):
"""
获取所有文件路径中,未处理的文件路径信息
:param processed_list: 已经处理过的文件路径列表
:param all_list: 全部的文件路径列表
:return: 未处理的文件路径列表
"""
# 0.创建一个空的列表,用于保存未处理的文件路径
new_list = []
# 1. 循环遍历所有的文件列表
for file_path in all_list:
# 2.与已经处理过的文件列表进行比对
if file_path not in processed_list:
# 2.1 如果该文件没有被处理过,添加到新的文件目录中
new_list.append(file_path)
# 2.2 如果被处理过什么也不做
# 3. 将new_list进行返回
return new_list
if __name__ == '__main__':
print(get_dir_files_list('../', True))
print(get_new_by_compare_lists([1, 2, 3, 4], [1, 2, 3, 4, 5, 6]))
05-文件工具模块测试(理解)
测试获取所有的文件目录:
"""
请在工程根目录的test文件夹内建立:
test_dir/
inner1/
3
4
inner2/
5
1
2
使用 test_dir 目录进行文件工具方法的测试
递归结果应该为:['1', '2', '3', '4', '5']
不递归结果应该为:['1', '2']
"""
测试两个列表中数据差
a_list = ['a.txt', 'b.txt', 'c.txt']
b_list = ['a.txt', 'b.txt', 'c.txt', 'd.txt', 'e.txt']
test/test_file_util.py
"""
文件工具模块测试
"""
# 导入测试模块
import os.path
import unittest
# 导入被测试模块
from util import file_util
# 创建一个测试类
class TestFileUtil(unittest.TestCase):
def test_get_dir_files_list(self):
"""
请在工程根目录的test文件夹内建立:
test_dir/
inner1/
3
4
inner2/
5
1
2
使用 test_dir 目录进行文件工具方法的测试
递归结果应该为:['1', '2', '3', '4', '5']
不递归结果应该为:['1', '2']
"""
# 不递归情况下的测试结果
# 0. 创建一个空列表,保存文件的名称
file_name_list = []
# 1. 测试get_dir_files_list函数,返回所有的文件路径
path_list = file_util.get_dir_files_list('./test_dir')
# 2. 对于path_list进行遍历
for path in path_list:
# 2.1 获取每一个路径的文件名称,加载到file_name_list中
file_name = os.path.basename(path)
file_name_list.append(file_name)
# 优化:最好再获取文件名称后进行排序,减少bug出现的概率
file_name_list.sort()
# 3.使用断言进行比对返回数据是否合规
self.assertListEqual(file_name_list, ['1', '2'])
# 递归情况下的测试结果
file_name_list = []
# 1. 测试get_dir_files_list函数,返回所有的文件路径
path_list = file_util.get_dir_files_list('./test_dir', True)
# 2. 对于path_list进行遍历
for path in path_list:
# 2.1 获取每一个路径的文件名称,加载到file_name_list中
file_name = os.path.basename(path)
file_name_list.append(file_name)
# 优化:最好再获取文件名称后进行排序,减少bug出现的概率
file_name_list.sort()
# 3.使用断言进行比对返回数据是否合规
self.assertListEqual(file_name_list, ['1', '2', '3', '4', '5'])
def test_get_new_by_compare_lists(self):
"""测试a_list 和b_list之间的数据差值"""
a_list = ['a.txt', 'b.txt', 'c.txt']
b_list = ['a.txt', 'b.txt', 'c.txt', 'd.txt', 'e.txt']
# 测试get_new_by_compare_lists,并获取返回的数据差
c_list = file_util.get_new_by_compare_lists(a_list, b_list)
# 使用断言判断返回数据的值是否正确
self.assertListEqual(c_list, ['d.txt', 'e.txt'])
06-数据库工具的开发(掌握)
开发数据库功能需求解析
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Fdu8Uqxj-1688341224408)(day02-工具模块的创建.assets/1672369877220.png)]
问题1: 三个需求中哪一个需要使用到mysql工具???
都需要,因为最终都要写入到mysql中
问题2: 都需要哪些功能呢?
- 数据库的创建
- 数据表的创建
- 插入数据
- 查询数据
- 切换数据库
- 事务功能:
- 在一个json文件或者log日志文件中,会出现多个订单记录或者日志记录,每一条日志记录将执行一次insert指令,将数据插入到mysql数据库中
- 但是我们在数据库中记录的是已经处理完成的文件名称
- 如果发生以下情况,一个文件执行了80% 失败了,此时下次数据采集会重新采集这个文件还是跳过呢?
- 跳过就会丢失一个文件的20%数据, 重新采集就会多一个文件的80%数据
- 此时可以开启事务,将一个文件的所有插入指令定义为一个事物,要么都成功要么都失败
- 如果执行了80%失败了,会将之前的80%数据进行回滚
- 校验数据表是否存在
问题3:在操作数据库时,除了调用sql指令,我们还要做什么事情?
数据库操作工具类
- 执行非查询SQL
- 执行查询SQL
- 切换数据库
- 开启事务
- 提交事务
- 回滚事务
- 判断数据表是否存在
- 检查表是否存在,不存在则创建
- 执行一条插入SQL
- 关闭数据库连接
- 创建数据库工具对象
util/mysql_util.py
# 导包
import pymysql
from util import logging_util
# 创建一个日志器对象
logger = logging_util.init_logger()
class MySQLUtil(object):
"""创建一个mysqlutil类,用来管理数据库操作连接和方法"""
"""
数据库操作工具类
* 创建数据库连接-
* 切换数据库-
* 执行非查询SQL-
* 执行查询SQL-
* 开启事务-
* 提交事务-
* 回滚事务-
* 检查表是否存在-
* 检查表是否存在,不存在则创建-
* 查询指定表的数据-
* 执行一条插入SQL-
* 关闭数据库连接-
"""
def __init__(self, host, user, password, port=3306, charset='utf8', autocommit=False):
"""初始化工具类对象,在工具对象中添加连接属性"""
self.conn = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
charset=charset,
autocommit=autocommit
)
# 记录日志信息
logger.info(f'主机为{host}:{port}的mysql数据服务已经连接....')
def select_db(self, db_name):
"""切换数据库"""
self.conn.select_db(db_name)
# 日志记录信息
logger.info(f'已经切换数据库,数据库名称为{db_name}')
def execute(self, sql):
"""执行非查询SQL"""
# 1. 创建游标
cursor = self.conn.cursor()
# 2. 执行sql语句
cursor.execute(sql)
# 3. 判断自动提交是否开启
if not self.conn.get_autocommit():
# 3.1 如果没有开启,手动进行提交
self.conn.commit()
# 3.2 如果开启了,则什么也不做
# 4. 关闭游标
cursor.close()
# 5. 记录日志
logger.info(f'非查询sql: {sql}已经执行完成...')
def query(self, sql):
"""执行查询SQL"""
# 1. 创建游标
cursor = self.conn.cursor()
# 2. 执行sql语句
cursor.execute(sql)
# 3. 获取结果集
result = cursor.fetchall()
# 4. 关闭游标
cursor.close()
# 5. 记录日志
logger.info(f'查询sql: {sql}已经执行完成...')
# 6. 返回结果集
return result
def begin_transaction(self):
"""开启事务"""
# 问题:什么叫开启事务??? 就是在此处开启事务,如果进行提交回滚,就回滚到这个时间点.
# 1. 如果自动提交已经开启,则无法开启事务(已经提交的数据无法回滚),手动关闭
if self.conn.get_autocommit():
logger.warning('开启事务冲突,当前连接开启了事务自动提交...')
logger.warning('关闭当前连接的事务自动提交,用于手动开启事务...')
self.conn.autocommit(False)
# 2. 已经关闭完,或者没有开启自动提交,则直接开启事务即可
self.conn.begin()
# 3.日志记录
logger.info('事务已经开启...')
def rollback_transaction(self):
"""回滚事务"""
# 1. 回滚事务
self.conn.rollback()
# 2. 记录日志
logger.info('事务回滚完成...')
def commit_transaction(self):
"""提交事务"""
# 1. 提交事务
self.conn.commit()
# 2. 记录日志
logger.info('事务已提交...')
def check_table_exists(self, db_name, tb_name):
"""检查数据表是否存在"""
# 1. 切换数据库
# self.conn.select_db() 不要使用这种方式,因为没有日志记录
self.select_db(db_name)
# 2. 查询数据库中所有的表, 并获取结果集
# 2.1 创建一个sql查表语句
sql = 'show tables;'
# 2.2 执行查询sql方法 , 结果集是元组嵌套元组的方式((python,),(bigdata,))
result = self.query(sql)
# 3. 判断当前数据表是否在结果集汇总
return (tb_name,) in result
def check_table_exists_and_create(self, db_name, tb_name, tb_cols):
"""
查询数据表是否存在,如果不存在则创建
:param db_name: 数据库名称
:param tb_name: 数据表名称
:param tb_cols: 数据表的创表语句
"""
# 1.判断该数据表是否存在
if not self.check_table_exists(db_name, tb_name):
# 2.如果数据表不存在,将会创建表
# 2.1 创建一个创表sql
sql = f'create table {tb_name} ({tb_cols});'
# 2.2 执行非查询sql方法
self.execute(sql)
# 2.3 记录日志
logger.info(f'数据表{tb_name},已经在数据库{db_name}中创建成功...')
else:
# 3.如果数据表存在则跳过
# 3.1.记录日志
logger.info(f'数据库{db_name}中的数据表{tb_name}已经存在,跳过表的创建...')
def query_all(self, db_name, tb_name, limit=None):
"""
查询指定表的数据
:param db_name: 数据库的名称
:param tb_name: 数据表的名称
:param limit: 限制查询范围
:return: 查询出来的结果集
"""
# 1. 切换数据库
self.select_db(db_name)
# 2. 查询指定表中的数据
sql = f'select * from {tb_name}'
# 2.1 判断limit是否为None
if limit is not None:
# 如果为None则拼接sql语句
sql += f' limit {limit}'
# 如果不为None,则不用拼接
# 2.2 执行sql语句
result = self.query(sql)
# 3.返回结果集
return result
def insert_single_sql(self, sql):
"""执行一条插入SQL"""
# 为什么要单独穿件一个插入方法,因为插入时可能存在一些异常,我们需要记录异常,方便后续对于数据的修改和监控
try:
# 1. 执行插入sql语句
self.execute(sql)
except Exception as e:
# 2. 捕获异常
# 2.1 如果出现异常,则记录日志
logger.warning(f'sql执行出现异常,异常信息为: {e}')
# 2.2 将异常继续抛出,防止已经错误的程序继续执行
# 使用raise指令可以手动抛出异常
raise e
# 3.如果没有异常则记录日志信息
logger.info(f'插入sql语句:{sql}执行成功,没有异常出现')
def close(self):
"""关闭数据库连接"""
# 只要创建了对象,连接就一直存在
if self.conn:
self.conn.close()
# 记录日志信息
logger.info('数据库连接已经关闭...')
07-数据库工具抽取配置信息
# ################## --元数据库metadata配置项-- ###################
# 元数据库配置
metadata_host = 'localhost'
metadata_port = 3306
metadata_user = 'root'
metadata_password = 'mysql'
metadata_db = 'metadata'
# ################## --订单JSON数据采集配置项-- ###################
# 采集订单JSON数据,元数据表配置项
file_monitor_meta_table_name = "json_file_monitor"
file_monitor_meta_table_create_cols = \
"id INT PRIMARY KEY AUTO_INCREMENT, " \
"file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \
"process_lines INT NULL COMMENT '文件处理行数', " \
"process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'"
# ################## --后台日志数据采集配置项-- ###################
# 采集后台日志数据,元数据表配置项
logs_monitor_meta_table_name = "backend_logs_monitor"
logs_monitor_meta_table_create_cols = \
"id INT PRIMARY KEY AUTO_INCREMENT, " \
"file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \
"process_lines INT NULL COMMENT '文件处理行数', " \
"process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'"
08-已采集文件的元数据库表创建
问题1: 什么叫做元数据???
在大数据场景下,元数据非常常见,对于不同的服务,元数据的内容也不相同,但是都是描述数据的数据
举例:
例1: 我们有一个文件 1.txt 此时 1.txt这个文件就是数据, 文件的大小,名称, 位置, 类型…都是元数据—HDFS
例2: 我们将一个数据表存储到数据仓库中, 数据表中所有的记录时数据, 字段的名称, 表名, 库名…都是元数据–Hive
问题2:为什么要记录元数据???
我们一般会将元数据存储在一个服务中(msyql orcale hive 内存), 因为我们只有获取了元数据,才能查询到数据
举例: 我们想获取person表中的一个数据记录, 必须知道数据库的名称,数据表的名称,以及记录的约束条件
问题3: 我们项目中记录的元数据是什么???
在metadata元数据库中,创建一个 json_file_monitor 表,用于保存被处理过的订单JSON文件被的表述信息。
在metadata元数据库中还会插件一个backend_logs_monitor表,用于保存被处理过的日志文件的表述信息
-- 创建元数据库
create database metadata;
-- 使用元数据管理库
USE metadata;
-- 创建已采集文件数据表
CREATE TABLE json_file_monitor (
id INT PRIMARY KEY AUTO_INCREMENT,
file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称',
process_lines INT NULL COMMENT '文件处理行数',
process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'
);
-- 执行如下SQL,添加一些测试数据,假设已有文件被采集过
INSERT INTO json_file_monitor (file_name, process_lines)
VALUES
('D:/etl/json/x00', 1024),
('D:/etl/json/x01', 1024),
('D:/etl/json/x02', 1024);
创建元数据库并插入部分信息,方便后续进行测试
字段含义:
- file_name 处理过的文件名称
- process_lines 文件处理行数
- process_time 处理文件的时间
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9t0Z1Bw9-1688341224409)(day02-工具模块的创建.assets/1672392238145.png)]
09-从mysql中获取已经处理的JSON文件列表
获取已被处理的订单JSON文件列表
如果元数据记录表 json_file_monitor
不存在,则创建它
util/mysql_util.py
追加到末尾
def get_processed_files(util: MySQLUtil, db_name, tb_name, tb_cols):
"""
获取元数据库中已经处理过的文件
:param util: MySQLUtil对象
:param db_name: 元数据库名称
:param tb_name: 元数据表名称
:param tb_cols: 元数据表的创表语句
:return: 未处理过的文件绝对路径列表
"""
# 0. 创建一个空列表用于存储已处理文件的路径字符串
new_files = []
# 1. 判断该数据表是否存在,如果不存在则创建,如果存在则跳过
util.check_table_exists_and_create(
db_name=db_name,
tb_name=tb_name,
tb_cols=tb_cols
)
# 2. 查询元数据表中所有的数据内容
result = util.query_all(db_name, tb_name)
# 3. 将查询集中的文件路径提取出来放入空列表中
# new_files = [path_tuple[1] for path_tuple in result]
for path_tuple in result:
new_files.append(path_tuple[1])
# 4. 返回记录已处理数据的列表
return new_files
def get_mysql_util(host, user, password, port=3306, charset='utf8', autocommit=False):
"""快速获取MySQLUtil对象"""
return MySQLUtil(host, user, password, port, charset, autocommit)
10-数据库工具模块测试
-
在setUP方法中创建数据库连接对象,并选择指定的数据库创建测试数据
-
在tearDown方法中删除测试数据并关闭数据库连接
-
测试插入数据
-
测试查询数据
-
检测数据库是否存在
-
测试获取已经处理过得文件列表
test/test_mysql_util.py
"""
mysql工具模块单元测试
1. 在setUP方法中创建数据库连接对象,并选择指定的数据库创建测试数据
2. 在tearDown方法中删除测试数据并关闭数据库连接
3. 测试插入数据
4. 测试查询数据
5. 检测数据表是否存在
6. 测试获取已经处理过得文件列表
"""
# 0.导包
import unittest
from util import mysql_util
from config import project_config as conf
class TestMySQLUtil(unittest.TestCase):
"""mysql工具模块测试类的创建"""
def setUp(self) -> None:
"""创建数据库连接对象,并选择指定的数据库创建测试数据"""
# 1. 创建一个数据库连接对象
self.util = mysql_util.get_mysql_util(
host=conf.metadata_host,
user=conf.metadata_user,
password=conf.metadata_password
)
# 2. 创建一个测试数据库
self.util.execute('create database if not exists test charset="utf8";')
# 3. 切换数据库为测试数据库
self.util.select_db('test')
# 4. 创建一个测试数据表
self.util.execute('create table if not exists unit_test(id int, name varchar(100));')
def tearDown(self) -> None:
"""删除测试数据并关闭数据库连接"""
# 1. 删除数据表
self.util.execute('drop table if exists unit_test;')
# 2. 删除数据库
self.util.execute('drop database if exists test;')
# 3. 关闭数据库连接
self.util.close()
def test_insert_single_sql(self):
"""测试插入"""
# 1. 向测试数据库中插入数据
self.util.insert_single_sql('insert into unit_test values(1, "小明")')
# 2. 从测试数据库中查询数据
result = self.util.query('select * from unit_test;')
# 3. 使用断言判断查询数据是否正确
self.assertEqual((1, "小明"), result[0])
def test_query_all(self):
"""测试查询"""
# 1. 向测试数据库中插入数据
self.util.insert_single_sql('insert into unit_test values(1, "小明")')
# 2. 从测试数据库中查询数据
result = self.util.query_all(db_name='test', tb_name='unit_test')
# 3. 使用断言判断查询数据是否正确
self.assertEqual((1, "小明"), result[0])
def test_check_table_exists_and_create(self):
"""测试数据表是否已经存在,如果不存在则创建"""
# 1. 删除测试数据表
self.util.execute('drop table if exists unit_test;')
# 2. 调用判断方法
self.util.check_table_exists_and_create(
db_name='test',
tb_name='unit_test',
tb_cols='id int, name varchar(100)'
)
# 3. 查询数据库中所有的数据表
result = self.util.query('show tables;')
# 4. 使用断言对比测试数据表是否存在
self.assertIn(('unit_test',), result)
def test_get_processed_files(self):
"""测试获取已经处理过得文件列表"""
# 1. 调用获取已经处理数据文件的方法
# get_processed_files是一个函数,直接导入模块即可使用
new_files = mysql_util.get_processed_files(
util=self.util,
db_name=conf.metadata_db,
tb_name=conf.file_monitor_meta_table_name,
tb_cols=conf.file_monitor_meta_table_create_cols
)
# 2. 使用方法返回的结果列表和下方列表进行对比
list1 = ['D:/etl/json/x00', 'D:/etl/json/x01', 'D:/etl/json/x02']
self.assertListEqual(list1, new_files)
11-字符串处理工具的开发和测试(复制讲解)
判断时遵循如下原则:
- 容器非空即真
- 数字非0即真
- None做判断条件时当做false使用
util/str_util.py
"""
字符串工具方法
"""
def check_null(data):
"""
功能:检查传入的字符串是否为空、None或其他无意义的内容,True表示空,False表示非空
"""
if not data:
return True
# 转小写
data = data.lower()
if data in ('null', 'none', 'undefined'):
return True
return False
def check_null_and_transform(data):
"""
功能:检查字符串,如果是空内容,返回空字符串,同时兼具去除字符串两边空字符的功能
"""
if check_null(str(data)):
return ''
elif isinstance(data, str):
return data.strip()
else:
return data
def check_str_null_and_transform_to_sql_null(data):
"""
功能:检查传入字符串
如果是空内容,返回'null'字符串,用于sql插入
否则返回数据本身,并带上""包裹用于sql插入
"""
if check_null(str(data)):
return 'null'
else:
return f"'{data}'"
def check_number_null_and_transform_to_sql_null(data):
"""
功能:检查传入的数字或字符串数据是否是空
如果是空内容,则返回'null'字符串
否则返回数据本身
"""
if not data or check_null(str(data)):
return 'null'
else:
return data
def clear_str(data):
"""
功能:处理字符串中异常字符,如 单引号,双引号,逗号,分号等
Note:这个API有可能破坏数据本身的内容,慎用
"""
if check_null(data):
# 如果是无意义内容,直接返回不处理了.
return data
data = data.replace("'", "")
data = data.replace("\"", "")
data = data.replace(";", "")
data = data.replace(",", "")
data = data.replace("@", "")
data = data.replace("\\", "")
return data
test/test_str_util.py
"""
字符串工具单元测试类
"""
from unittest import TestCase
from util import str_util
class TestStrUtil(TestCase):
"""字符串工具单元测试类"""
def setUp(self) -> None:
pass
def test_check_null(self):
data = None
result = str_util.check_null(data)
self.assertEqual(True, result)
data = ''
result = str_util.check_null(data)
self.assertEqual(True, result)
data = 'None'
result = str_util.check_null(data)
self.assertEqual(True, result)
data = 'NONE'
result = str_util.check_null(data)
self.assertEqual(True, result)
data = 'null'
result = str_util.check_null(data)
self.assertEqual(True, result)
data = 'NULL'
result = str_util.check_null(data)
self.assertEqual(True, result)
data = 'aaa'
result = str_util.check_null(data)
self.assertEqual(False, result)
def test_check_str_null_and_transform_to_sql_null(self):
data = None
result = str_util.check_number_null_and_transform_to_sql_null(data)
self.assertEqual('null', result)
data = 'None'
result = str_util.check_number_null_and_transform_to_sql_null(data)
self.assertEqual('null', result)
data = '哈哈'
result = str_util.check_str_null_and_transform_to_sql_null(data)
self.assertEqual("'哈哈'", result)
12-时间戳处理工具的开发和测试(复制讲解)
util/time_util.py
"""
时间处理工具方法
"""
import time
def ts13_to_ts10(ts):
"""将13位的时间戳规范成10位的时间戳"""
# JSON数据当中,时间戳是以毫秒为单位的,
# 从JSON中读取出来的时间戳要被python使用要转换为秒为单位(10位)
return ts // 1000
def ts10_to_date_str(ts, format_str='%Y-%m-%d %H:%M:%S'):
"""将10位的时间戳转换为日期字符串"""
time_array = time.localtime(ts)
return time.strftime(format_str, time_array)
def ts13_to_date_str(ts, format_str='%Y-%m-%d %H:%M:%S'):
"""将13位时间戳转换为日期字符串"""
ts = ts13_to_ts10(ts)
return ts10_to_date_str(ts, format_str)
util/test_time_util.py
"""
时间处理工具单元测试类
"""
from unittest import TestCase
from util import time_util
class TestTimeUtil(TestCase):
"""时间处理工具单元测试类"""
def setUp(self) -> None:
pass
def test_ts13_to_ts10(self):
ts = 1645539742000
result = time_util.ts13_to_ts10(ts)
self.assertEqual(1645539742, result)
def test_ts13_to_date_str(self):
ts = 1645539742000
s = time_util.ts13_to_date_str(ts)
self.assertEqual("2022-02-22 22:22:22", s)
def test_ts10_to_date_str(self):
ts = 1645539742
s = time_util.ts10_to_date_str(ts)
self.assertEqual("2022-02-22 22:22:22", s)
问题: 为什么字符串处理和时间戳工具不记录日志呢?
- 事务型操作,修改了数据内容,一定要进行日志记录
- 可能出现异常的位置要进行日志记录
- 操作了其他服务一定要记录日志