多路归并排序-Python实现大文件排序,合并排序

2023-05-16

使用python实现多(K)路归并外部排序,解决小内存排序大文件问题

上一篇中,我们实现了一般的归并排序 归并排序递归与非递归-Python实现

在实际工作中,多个有序数列合并成一个,大文件或多个大文件合并成一个并排序的需求常见并不少见,首先,先来看一下多个有序数列情况

合并多个有序数组

比如现在有四路:

  • a0: [1, 3, 6, 7]
  • a1: []
  • a2: [3, 5, 7, 19]
  • a3: [9, 12, 87, 98]

保存每路最小值

第一步需要知道每一路的最小值,如果每一路用数组表示的话需要保存对应的下标,并保存为min_map

  • 第0路: 1
  • 第1路: 没有值
  • 第2路: 3
  • 第3路: 9

初始的 min_map:

{0: (1, 0), 2: (3, 0), 3: (9, 0)}

获取最小值中的最小值

第二部需要将最小值取出来然,后检查被取出值的那一路是否还剩下。

其他元素如果存在,则修改min_map里面对应的值,如果不存在,则删除掉min_map里面对应的记录,以表示该路已经没有元素需要遍历了

代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 多路归并: 将已经排序好的多个数组合并起来


def nw_merge(arrs):
    """
    需要知道每一路的最小值
    第0路: 1
    第1路: 没有值
    第2路: 3
    第3路: 9
    """

    result = []
    min_map = {} # 用min_map 保存每一路的当前最小值
    for inx, arr in enumerate(arrs):
        if arr:
            min_map[inx] = (arr[0], 0)

    print("初始化的每一路最小值min_map", min_map)

    while min_map:
        """
        需要知道每一路的最小值里面哪一路的最小值, 以及最小值所在的那一路的index
        """

        min_ = min(min_map.items(), key = lambda m: m[1][0])
        way_num, (way_min_v, way_inx) = min_
        result.append(way_min_v)

        """
        检查被取出值的那一路是否还剩下其他元素, 如果存在, 则修改min_map里面对应的值, 如果不存在,
        则删除掉min_map里面对应的记录, 以表示该路已经没有元素需要遍历了
        """
        way_inx += 1
        if way_inx < len(arrs[way_num]):
            min_map[way_num] = (arrs[way_num][way_inx], way_inx)
        else:
            del min_map[way_num]
    return result

a0 = [1, 3, 6, 7]
a1 = []
a2 = [3, 5, 7, 19]
a3 = [9, 12, 87, 98]
arrs = [a0, a1, a2, a3]

print("a0:", a0)
print("a1:", a1)
print("a2:", a2)
print("a3:", a3)

result = nw_merge(arrs)

print("最终合并的:", result)

输出

"""
a0: [1, 3, 6, 7]
a1: []
a2: [3, 5, 7, 19]
a3: [9, 12, 87, 98]

初始化的每一路最小值min_map {0: (1, 0), 2: (3, 0), 3: (9, 0)}
"""

# 最终合并的: 
[1, 3, 3, 5, 6, 7, 7, 9, 12, 19, 87, 98]

对超大文件排序(10G的日志,512M的内存)

绕不开归并核心思想,分治,先拆成小文件,再排序,最后合并所有碎片文件成一个大文件

拆文件

首先第一步,大文件拆分成 x 个 block_size 大的小文件,每个小文件排好序


def save_file(l, fileno):
    filepath = f"/home/xxx/{fileno}"

    f = open(filepath, 'a')
    for i in l:
        f.write(f"{i}\n")
    f.close()
    return filepath

def split_file(file_path, block_size):
    f = open(file_path, 'r')
    fileno = 1 
    files = []
    while True:
        lines = f.readlines(block_size)
        if not lines:
            break
        lines = [int(i.strip()) for i in lines]
        lines.sort()
        files.append(save_file(lines, fileno))
        fileno += 1
    return files

合并

将拆分成的小文件合并起来,然后将归并的东西写到大文件里面去,这里用到的是上面的多路归并的方法

def nw_merge(files):
    fs = [open(file_) for file_ in files]
    min_map = {}
    out = open("/home/xxx/out", "a")
    for f in fs:
        read = f.readline()
        if read:
            min_map[f] = int(read.strip())
    
    while min_map:
        min_ = min(min_map.items(), key = lambda x: x[1])
        min_f, min_v = min_
        out.write("{}".format(min_v))
        out.write("\n")
        nextline = min_f.readline()
        if nextline:
            min_map[min_f] = int(nextline.strip())
        else:
            del min_map[min_f]

全部代码

import os
from pathlib import Path


def nw_merge(files):
    fs = [open(file_) for file_ in files]
    min_map = {}  # 用来记录每一路当前最小值
    out = open(Path(".") / "out/integration.txt", "a+")
    for f in fs:
        read = f.readline()
        if read:
            min_map[f] = int(read.strip())

    while min_map:  # 将最小值取出, 并将该最小值所在的那一路做对应的更新
        min_ = min(min_map.items(), key=lambda x: x[1])
        min_f, min_v = min_
        out.write(f"{min_v}\n")
        nextline = min_f.readline()
        if nextline:
            min_map[min_f] = int(nextline.strip())
        else:
            del min_map[min_f]
    for f in fs:
        f.close()
    out.close()


def save_file(l, fileno):
    path = Path(".") / "split"
    filepath = path / f"{fileno}"
    info = '\n'.join(map(str, l))
    with open(filepath, "a+") as f:
        f.write(f"{info}")

    return filepath


def split_file(file_path, block_size):
    fileno = 1  # 文件数
    files = []  # 小文件目录
    with open(file_path, 'r') as f:
        while True:
            lines = f.readlines(block_size)
            if not lines:
                break
            lines = [int(i.strip()) for i in lines]  # 生成一个列表
            lines.sort()  # 排序
            files.append(save_file(lines, fileno))
            fileno += 1
        return files


if __name__ == "__main__":
    # 每行单个数字
    file_path = Path(".") / "tests.txt"
    block_size = 500 * 1024 * 1024 # 500K
    num_blocks = os.stat(file_path).st_size / block_size
    files = split_file(file_path, block_size)
    nw_merge(files)

阅读更多

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多路归并排序-Python实现大文件排序,合并排序 的相关文章

  • vue3源码分析(二)—— 初始化流程

    系列文章目录 目录分析初始化流程响应式系统shared工具函数 文章目录 系列文章目录前言一 createApp在项目中的使用二 createApp源码追溯1 创建app实例1 1 ensureRenderer1 2 ensureRende
  • JS基础 ——解释执行

    文章目录 前言一 词法分析二 预编译创建全局作用域GO对象创建局部作用域AO对象 三 代码执行总结 前言 大家都知道 xff0c JS是一种不需要编译的解释型语言 但其实在浏览器执行JS代码前 xff0c 也有一个词法分析和预编译过程 xf
  • vue 项目中引入字体文件的正确方式~

    文章目录 前言一 开发中需要什么样的字体1 字体图标2 特殊字体 二 项目中引入字体文件1 字体文件2 css文件3 项目使用该字体 总结 前言 在UI设计稿中 xff0c 可能会有一些特殊字体 xff0c 或者是一些字体图标 对于特殊字体
  • vue3 使用 swiper轮播库

    文章目录 前言一 Swiper引入方式1 HTML标签引入方式2 CommonJs引入方式3 ES引入方式 xff08 采用 xff09 二 使用Swiper总结 前言 轮播图在前端开发中 xff0c 是常见需求 而Swiper库是最受前端
  • vue-cli2 老项目webpack3升级webpack5过程总结

    文章目录 背景一 webpack5环境要求二 升级步骤1 脚手架webpack cli2 升级webpack包3 更新webpack相关插件3 1 不推荐或被移除的插件3 2 升级babel到7版本3 3 更新插件 4 修改配置4 1 新增
  • 前端下载文件

    文章目录 前言二进制流前端核心实现下载功能有 xff1a 一 a标签 43 download属性二 window open url 34 blank 34 三 form表单四 接口请求 43 blob 43 a标签 43 download属
  • 前端JS 云打印 LODOP实践

    文章目录 前言一 Lodop是什么 xff1f 二 如何使用Lodop1 下载打印插件2 配置打印机3 html中植入打印控件4 调用Lodop对应的JS相关方法接口实现打印功能 三 Lodop主要方法接口三 注意点总结 前言 一般B S系
  • axios源码——工具函数utils.js

    文章目录 前言一 工具函数所在目录二 判定数据类型的函数1 isArray 判定数组 2 isString 判定字符串 3 isNumber 判定数值 4 isObject 判定对象 5 isPlainObject 判定纯对象 6 isUn
  • 源码阅读——validate-npm-package-name

    文章目录 前言一 源码阅读工具二 阅读源码1 目录结构2 package json3 index js 三 使用该包1 vue cli中使用2 create react app 中使用 总结 前言 validate npm package
  • 学习rtklib

    数据下载 日期转换和一些常用数据下载 http www gnsscalendar com index html year 61 2019 多系统精密星历和精密钟差下载 2021年10月25日更新 xff1a 单GPS精密星历文件要在这里下载
  • echarts 绘制多条折线图(横坐标,折线图条数不确定)

    项目场景 xff1a 使用echarts做业务图表统计 xff0c 记录一下在项目中图表接口返回的数据处理问题 需求描述 1 一个统计图中实现多条折现图的显示 xff0c 如下图所示 2 后台返回的数据结构 span class token
  • 线性二次型调节器(LQR)原理详解

    文章目录 前言算法解释代价函数的意义 推导过程可控性LQR控制实例参考资料 前言 LQR Linaer Quadratic Regulator xff0c 即线性二次型调节器 xff0c 是一种现代控制理论中设计状态反馈控制器 State
  • 嵌入式软件工程师常见面试问题

    嵌入式软件工程师面试题 1 stm32启动方式 xff1f 有三种 xff1a 从Flash启动 xff0c 将Flash地址0x0800 0000映射到0x00000000 这样启动以后就相当于从0x0800 0000开始的 xff0c
  • 使用python爬虫把自己的CSDN文章爬取下来并保存到MD文件

    导言 爬虫作为一个敏感技术 xff0c 千万要把握好 xff0c 如果人家不让爬那就不要头铁去试了 如何确定某个网站是否允许爬虫 在域名后面加上 robots txt查看即可 xff0c 例如 xff1a https blog csdn n
  • 寻找矩阵中的指定路径

    给定一个矩阵和一个要找的字符串 xff0c 如果有的话找出矩阵中的字符串路径 测试用例 功能测试 xff1a 在多行多列的矩阵中存在或者不存在路径 边界值测试 xff1a 矩阵只有一行或一列 xff1b 矩阵和路径中的所有字母都是相同的 特
  • Kalibr标定Intel D435i相机

    Kalibr标定Intel D435i相机 文章目录 Kalibr标定Intel D435i相机1 相机标定2 IMU标定3 相机 43 IMU联合标定4 标定结果评价 系统环境 xff1a ubuntu16 04 43 roskineti
  • VINS初始化

    VINS初始化 VINS初始化之外参在线标定 前面主要分析了外参标定出来旋转矩阵 xff0c 接下来接着分析初始化 if solver flag 61 61 INITIAL if frame count 61 61 WINDOW SIZE
  • ROS中关于两个话题时间同步遇到的问题 message_filters

    ROS中关于两个话题时间同步遇到的问题 message filters 参考链接 CMakeFiles imu data dir src imu data cpp o xff1a 在函数 message filters Synchroniz
  • ROS之多传感器融合算法实现

    ROS之多传感器融合算法实现 文章目录 1 motivation2 method2 1 订阅ROS的多个话题并对数据进行处理2 2 订阅ROS的多个话题并发布成一个话题 1 motivation IntelRealsenseD435i传感器
  • ignav代码阅读笔记

    整个代码还是根据rtklib进行改的 xff0c 功能很完善 xff0c 但是我主要只关注ppp ins紧组合 代码链接 https github com Erensu ignav 代码功能 可以完成ppp和ins的紧组合 xff0c 把c

随机推荐