E1.2 Go语言实现超大文本文件按行排序和去重复行

对超大文本文件进行排序(这里的排序一般指按行进行排序),是一种很特殊需求,这种“超大”的文本文件一般是指远远超出内存大小因而无法一次加载进内存来进行排序的文件,它的处理方式和一般的排序算法肯定会有所不同。另外,有时候还需要对文本进行去除重复行的任务。本文将给出一个算法思路和实际示例。


-> 首先用一个io.Reader对象来分段读取文件中的内容,假设文件大小有10G(字节)之大,而可用内存仅为4G,比较保守地我们可以每次读取200M左右的数据写入依次编号的文件中,也就是用200M字节大小的字节切片作为缓冲区;这样会产生50个新的文件,每个文件大小约为200M字节;

-> 在写入这50个文件时,可以先进行按行排序;这样就得到50个内部已经排好序的文本文件;而由于每个文件的数据量远远小于内存总数量,所以这个排序在内存中是没有问题的;

-> 新建一个用于写入最终结果的文本文件,并用bufio.Writer来进行追加写入,且可以每次只追加一行;

-> 然后同时打开前面那50个排好序的文件,用bufio.Reader来从每个文件读取第一行,并比较这50个第一行中哪个排名最靠前(最大或最小,根据升序或降序的排序要求),假设第5个文件中的这行最靠前,则将这一行追加到结果文件,然后再从第5个文件中读取下一行替代已被追加到结果的那行重新与其他行做比较;

-> 如此循环重复,直至所有的文件被读取完毕;此时的输出文件就是已经经过排序好的最终文件了。


这样,循环执行完毕之后,结果文件就将是完全排好序之后的超大文本文件了。


下面是使用这个思路实现的实际示例代码:

本文中的代码可以在https://github.com/topxeq/goexamples/tree/master/duplicate1中找到。

package main

import (

    "bufio"

    "io"

    "os"

    "path/filepath"

    "sort"

    "strings"

    "github.com/topxeq/tk"

)

func main() {

    var errT error

    // 获取第1个命令行参数(实际上是第二个命令行参数,可执行文件名是第1个,序号为0)

    fileNameT := tk.GetParameterByIndexWithDefaultValue(os.Args, 1, "")

    // 如果命令行参数中没有指定文件名则报错

    if fileNameT == "" {

        tk.Pl("文件名不能为空")

        return

    }

    // 如果文件不存在也报错

    if !tk.IfFileExists(fileNameT) {

        tk.Pl("文件 %v 不存在。", fileNameT)

        return

    }

    // limitLineCountT限制每个分块文件的大小(行数)

    // 从命令行参数中可以用-size=100000这样的参数来设置,默认为5000000行

    limitLineCountT := tk.GetSwitchWithDefaultIntValue(os.Args, "-size=", 5000000)

    // 总行数

    lineCountT := 0

    // 分块文件数

    fileCountT := 0

    // 打开原始文件准备进行读取

    fileT, errT := os.Open(fileNameT)

    if errT != nil {

        tk.Pl("打开文件时发生错误:%v", errT)

        return

    }

    // 创建一个缓冲式读取器对象

    readerT := bufio.NewReader(fileT)

    // ifEOFT用于判断是否读到了文件末尾

    ifEOFT := false

    // 临时变量,用于存储字符串

    var tmps string

    // 反复循环从源文件中读取行,直至读到文件末尾

    // 每次读取最多limitLineCountT行,写入临时文件中,超出则继续写到下一个临时文件中

    // 临时文件名按数字进行排序,存于变量fileCountT中

    for !ifEOFT {

        // 分配指定大小的切片(可以理解为Go语言中的可变长数组)准备放置读取到的文本行

        bufT := make([]string, 0, limitLineCountT)

        fileCountT++

        tk.Pl("正在读取第%v组数据", fileCountT)

        // 临时文件名,tk.Spr函数相当于fmt.Sprintf函数

        // 本例中的临时文件名将依次为sub00000001.txt、sub00000002.txt...

        subFileNameT := tk.Spr("sub%08d.txt", fileCountT)

        // 默认将临时文件放在执行时的当前目录下

        subPathT := filepath.Join("./", subFileNameT)

        // 循环读取limitLineCountT次,试图读取limitLineCountT行文本

        for j := 0; j < limitLineCountT; j++ {

            strT, errT := readerT.ReadString('\n')

            if errT != nil {

                // 读到文件结尾时的处理

                if errT == io.EOF {

                    tmps = tk.Trim(strT)

                    if tmps != "" {

                        bufT = append(bufT, tmps)

                    }

                    ifEOFT = true

                } else {

                    tk.Pl("文件读取失败:%v", errT)

                    fileT.Close()

                    os.Exit(1)

                }

                break

            }

            tmps = tk.Trim(strT)

            // 本例中空行将被丢弃,即不处理空行(包括含有空格等空白字符的行)

            if tmps != "" {

                bufT = append(bufT, tmps)

            }

        }

        // 对读取到的最多limitLineCountT行文本进行排序

        tk.Pl("正在排序第%v组数据", fileCountT)

        sort.Sort(sort.StringSlice(bufT))

        // 保存排序后的文本到临时文件

        tk.Pl("正在保存第%v组数据到临时文件%v", fileCountT, subPathT)

        rse := tk.SaveStringListBuffered(bufT, subPathT, "\n")

        if tk.IsErrorString(rse) {

            tk.Pl("保存临时文件%v失败:%v", subPathT, tk.GetErrorString(rse))

            fileT.Close()

            os.Exit(1)

        }

        // 记录总共处理的行数

        lineCountT += len(bufT)

    }

    fileT.Close()

    tk.Pl("共读取了%v行,写入了%v个临时文件", lineCountT, fileCountT)

    // 排序写

    tk.Pl("进行多文件排序并去除重复行……")

    // 存放临时文件读取器的变量

    filesT := make([]*os.File, fileCountT)

    readersT := make([]*bufio.Reader, fileCountT)

    // 用于进行对多个文件读取的第一行进行大小比对排序的变量

    strBufT := make([]string, fileCountT)

    compareBufT := make([]int, fileCountT)

    selIndexT := 0

    // 用于保存当前写入的行,用于去除重复行

    currentLineT := ""

    // 统计整体读取的行数和写入的行数

    readCountT := 0

    writeCountT := 0

    // 打开多个临时文件用于同时读取

    for i := 1; i <= fileCountT; i++ {

        subPathT := filepath.Join("./", tk.Spr("sub%08d.txt", i))

        tk.Pl("打开临时文件%v准备读取", subPathT)

        filesT[i-1], errT = os.Open(subPathT)

        if errT != nil {

            tk.Pl("打开文件时发生错误:%v", errT)

            os.Exit(1)

        }

        readersT[i-1] = bufio.NewReader(filesT[i-1])

    }

    // 创建一个新文件用于写入最终结果,默认为当前目录下的output.txt文件

    outputFileT, errT := os.Create("./output.txt")

    if errT != nil {

        tk.Pl("创建输出文件时发生错误:%v", errT)

        os.Exit(1)

    }

    // 创建写入器

    outputWriterT := bufio.NewWriter(outputFileT)

    // 用于判断是否是写入的第一行

    // 如果不是第一行,将再写入每一行文本之前,先写入一个回车换行符

    notFirstFlagT := false

    // 循环读取并写入结果文件

    for true {

        var lineT string

        // 记录一共被关闭了多少个临时文件,表示已经有多少个临时文件被读取完毕

        var closedFileT = 0

        // 是否读到文件结尾

        var eofT bool

        // 从各个文件中都读取一行,空行将被丢弃

        for k := 0; k < fileCountT; k++ {

            if readersT[k] == nil {

                closedFileT++

                continue

            }

            // 如果某个文件对应的一行已空,则再读一行

            if strBufT[k] == "" {

                foundT := false

                for readersT[k] != nil {

                    lineT, eofT, errT = tk.ReadLineFromBufioReader(readersT[k])

                    if errT != nil {

                        tk.Pl("从临时文件%v中读取数据时发生错误:%v", k, errT)

                        os.Exit(1)

                    }

                    lineT = tk.Trim(lineT)

                    if eofT {

                        readersT[k] = nil

                        filesT[k].Close()

                    }

                    if lineT == "" {

                        continue

                    }

                    foundT = true

                    break

                }

                if foundT {

                    strBufT[k] = lineT

                }

            }

        }

        // 进行计数式比对,找出排名最靠前的一行

        var compareT int

        for ii := 0; ii < fileCountT; ii++ {

            compareBufT[ii] = 0

        }

        for ii := 0; ii < (fileCountT - 1); ii++ {

            if strBufT[ii] == "" {

                continue

            }

            for jj := ii + 1; jj < fileCountT; jj++ {

                if strBufT[jj] == "" {

                    compareBufT[ii]++

                    continue

                }

                compareT = strings.Compare(strBufT[ii], strBufT[jj])

                if compareT > 0 {

                    compareBufT[jj]++

                } else if compareT < 0 {

                    compareBufT[ii]++

                }

            }

        }

        maxT := 0

        for kk := 0; kk < fileCountT; kk++ {

            if compareBufT[kk] > maxT {

                maxT = compareBufT[kk]

                selIndexT = kk

            }

        }

        // 处理只有一个文件时的比对

        if fileCountT == 1 && strBufT[0] != "" {

            maxT = 1

            selIndexT = 0

        }

        // 如果所有行都是空行,说明已经读取完毕所有文件,将退出循环

        if maxT <= 0 {

            tk.Pl("读取缓冲区全部为空")

            break

        }

        readCountT++

        // 如果将要写入的一行与上一行一样,说明是重复行,则丢弃

        // 由此实现去除重复行的效果

        // 注意此方法仅对排序后的文本才是正确的

        if currentLineT != "" {

            if strBufT[selIndexT] == currentLineT {

                // tk.Pl("发现重复行:%v", currentLineT)

                strBufT[selIndexT] = ""

                continue

            }

        }

        currentLineT = strBufT[selIndexT]

        strBufT[selIndexT] = ""

        if notFirstFlagT {

            outputWriterT.WriteString("\r\n")

        } else {

            notFirstFlagT = true

        }

        // 将最终选出的文本行写入结果文件

        _, errT = outputWriterT.WriteString(currentLineT)

        if errT != nil {

            tk.Pl("向输出文件中写入数据时发生错误:%v", errT)

            os.Exit(1)

        }

        writeCountT++

        // 所有文件如果都已关闭,说明都已读取完,循环将终止

        if closedFileT == fileCountT {

            break

        }

    }

    // 由于使用的是bufio,即缓冲方式写入文件,注意一定要用Flush来保证在内存中的数据被确保真正写入文件中

    outputWriterT.Flush()

    outputFileT.Close()

    tk.Pl("处理完毕(共写入%v行),按q键加回车退出……", writeCountT)

}



下面是该代码执行的结果示例截图:

其中,test1.txt是大小近1G,包含4千多万行的文本文件。可以看出,最终去除重复后只剩下3千多万行。处理过程中分割成为了10个临时文件,每个文件最多5百万行。最终的结果存放在output.txt中,我们可以用前文中的preview1程序对其进行预览查看,可以看出其中的内容是经过排序的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351