go内存排序

说明:

Source And Sink

package pipeline

import (
    "encoding/binary"
    "fmt"
    "io"
    "math/rand"
    "sort"
    "time"
)

var startTime time.Time

func Init() {
    startTime = time.Now()
}

func ArraySource(a ...int) chan int {
    out := make(chan int)
    go func() {
        for _, v := range a {
            out <- v
        }
        close(out)
    }()
    return out
}

func InMemSort(in <-chan int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        //Read into memo
        var a []int
        for v := range in {
            a = append(a, v)
        }
        fmt.Println("Read done:", time.Now().Sub(startTime))
        //
        sort.Ints(a)
        fmt.Println("InMemSort done:", time.Now().Sub(startTime))
        for _, v := range a {
            out <- v
        }
        close(out)
    }()
    return out
}

func Merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        for ok1 && ok2 {
            if v1 <= v2 {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        if !ok1 {
            for ok2 {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        if !ok2 {
            for ok1 {
                out <- v1
                v1, ok1 = <-in1
            }
        }
        close(out)
        fmt.Println("Merge done:", time.Now().Sub(startTime))
    }()
    return out
}

func ReaderSource(reader io.Reader, chunkSize int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        buffer := make([]byte, 8)
        bytesRead := 0
        for {
            n, err := reader.Read(buffer)
            bytesRead += n
            if n > 0 {
                v := int(binary.BigEndian.Uint64(buffer))
                out <- v
            }
            if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) {
                break
            }
        }
        close(out)
    }()
    return out
}

func WriteSink(writer io.Writer, in <-chan int) {
    for v := range in {
        buffer := make([]byte, 8)
        binary.BigEndian.PutUint64(buffer, uint64(v))
        writer.Write(buffer)
    }
}

func RandomSource(count int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        for i := 0; i < count; i++ {
            out <- rand.Int()
        }
        close(out)
    }()
    return out
}

func MergeN(inputs ...<-chan int) <-chan int {
    if len(inputs) == 1 {
        return inputs[0]
    }
    m := len(inputs) / 2
    //merge inputs[0...m) and inputs[m...end)
    return Merge(
        MergeN(inputs[:m]...),
        MergeN(inputs[m:]...), )
}

testfile文件生成

生成测试文件(small.in/large.in),对应的小文件和大文件

package main

import (
    "bufio"
    "fmt"
    "os"
    "pipeline"
)

//生成小的测试文件
//const FILE_NAME = "small.in"
//const N = 64

//生成大的测试文件
const FILE_NAME = "large.in"
const N = 1000000

func main() {
    file, e := os.Create(FILE_NAME)
    if e != nil {
        panic(e)
    }
    defer file.Close()

    p := pipeline.RandomSource(N)
    writer := bufio.NewWriter(file)
    pipeline.WriteSink(writer, p)
    writer.Flush()

    file, e = os.Open(FILE_NAME)
    if e != nil {
        panic(e)
    }
    defer file.Close()

    p = pipeline.ReaderSource(file, -1)

    count := 0
    for v := range p {
        count++
        if count < 100 {
            fmt.Println(v)
        }
    }
}

//测试内存的二分排序
func main0() {
    p := pipeline.Merge(
        pipeline.InMemSort(pipeline.ArraySource(8, 2, 3, 0, 1)),
        pipeline.InMemSort(pipeline.ArraySource(9, 7)),
    )
    for v := range p {
        fmt.Println(v)
    }
}

利用网络排序

package pipeline

import (
    "bufio"
    "net"
)

func NetworkSink(addr string, in <-chan int) {
    listen, err := net.Listen("tcp", addr)
    if err != nil {
        panic(err)
    }

    go func() {
        defer listen.Close()
        conn, err := listen.Accept()
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        writer := bufio.NewWriter(conn)
        defer writer.Flush()
        WriteSink(writer, in)
    }()
}

func NetworkSource(addr string) <-chan int {
    out := make(chan int)
    go func() {
        conn, err := net.Dial("tcp", addr)
        if err != nil {
            panic(err)
        }

        defer conn.Close()
        r := ReaderSource(conn, -1)
        for v := range r {
            out <- v
        }
        close(out)
    }()
    return out
}

测试主方法

package main

import (
    "bufio"
    "fmt"
    "os"
    "pipeline"
    "strconv"
)

func main2() {
    p := createNetworkPipeline("large.in", 8000000, 4)

    writeToFile(p, "large-networkpipeline.out")
    printFile("large-networkpipeline.out")
}

func main1() {
    p := createPipeline("large.in", 8000000 , 4)
    writeToFile(p, "large-pipeline.out")
    printFile("large-pipeline.out")
}

func printFile(filename string) {
    file, err := os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()

    p := pipeline.ReaderSource(file, -1)
    count := 0
    for v := range p {
        if count < 100 {
            fmt.Println(v)
            count ++
        }
    }
}

func writeToFile(p <-chan int, filename string) {
    file, err := os.Create(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()

    writer := bufio.NewWriter(file)
    defer writer.Flush()

    pipeline.WriteSink(writer, p)
}

func createPipeline(filename string, filesize, chunkCount int) <-chan int {
    chunkSize := filesize / chunkCount
    pipeline.Init()
    var sortResults []<-chan int
    for i := 0; i < chunkCount; i++ {
        file, err := os.Open(filename)
        if err != nil {
            panic(err)
        }
        file.Seek(int64(i*chunkSize), 0)

        source := pipeline.ReaderSource(
            bufio.NewReader(file), chunkSize)
        sortResults = append(sortResults, pipeline.InMemSort(source))
    }
    return pipeline.MergeN(sortResults...)
}

func createNetworkPipeline(filename string, filesize, chunkCount int) <-chan int {
    chunkSize := filesize / chunkCount
    pipeline.Init()
    var sortAddr []string
    for i := 0; i < chunkCount; i++ {
        file, err := os.Open(filename)
        if err != nil {
            panic(err)
        }
        file.Seek(int64(i*chunkSize), 0)

        source := pipeline.ReaderSource(
            bufio.NewReader(file), chunkSize)
        addr := ":" + strconv.Itoa(7000+i)
        pipeline.NetworkSink(addr, pipeline.InMemSort(source))
        sortAddr = append(sortAddr, addr)
    }

    var sortResults []<-chan int
    for _, addr := range sortAddr {
        sortResults = append(sortResults, pipeline.NetworkSource(addr))
    }
    return pipeline.MergeN(sortResults...)
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 7,332评论 0 10
  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,456评论 0 13
  • http://spark.apache.org/docs/latest/api/python/index.html...
    mpro阅读 6,094评论 0 4
  • 喜佗说,兜兜先和几个姐姐玩,玩得都不怎么开心,后来我一去她就玩得很开心了,你说这是为什么呢? 我很配合地回应他这赤...
    疯甜阅读 144评论 0 1
  • 18年5月,在培文济宁二中一天连续听了6堂《白马篇》的课,非常累,但是也听得非常认真,通过这样的实际听课,通过同一...
    流萤edu阅读 861评论 0 2