2019-01-05

date[2019-01-05]

go并发编程案例解析

package main

import (
    "bufio"
    "flag"
    "fmt"
    "github.com/influxdata/influxdb/client/v2"
    "io"
    "log"
    "math/rand"
    "net/url"
    "os"
    "regexp"
    "strconv"
    "strings"
    "time"
)

type LogProcess struct {
    rc chan []byte
    wc chan *Message

    read  Reader
    write Writer
}

type Reader interface {
    Read(rc chan []byte)
}

type Writer interface {
    Write(wc chan *Message)
}

type ReadDataFromFile struct {
    path string
}
type WriteDateToInfluxDb struct {
    influxDBua string
}

type Message struct {
    TimeLocal                    time.Time
    BytesSent                    int
    Path, Method, Scheme, Status string
    UpstreamTime, RequestTime    float64
}

//写入模块
func (w *WriteDateToInfluxDb) Write(wc chan *Message) {
    //初始化influxdb client
    //从Write Channel读取数
    //Tags:Path,Method,Scheme,Status
    //Fiedls:
    //Time:
    //写入模块
    infSlic := strings.Split(w.influxDBua, "@")

    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     infSlic[0],
        Username: infSlic[1],
        Password: infSlic[2],
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    for v := range wc {
        // Create a new point batch
        bp, err := client.NewBatchPoints(client.BatchPointsConfig{
            Database:  infSlic[3],
            Precision: infSlic[4],
        })
        if err != nil {
            log.Fatal(err)
        }

        // Create a point and add to batch
        //Tags:Path Method Scheme Status

        tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status,}
        fields := map[string]interface{}{
            "BytesSent":    v.BytesSent,
            "UpstreamTime": v.UpstreamTime,
            "RequestTime":  v.RequestTime,
        }

        pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
        if err != nil {
            log.Fatal(err)
        }
        bp.AddPoint(pt)

        // Write the batch
        if err := c.Write(bp); err != nil {
            log.Fatal(err)
        }

        // Close client resources
        if err := c.Close(); err != nil {
            log.Fatal(err)
        }
        log.Println("Write Success!")
    }
}

//读取模块
func (r *ReadDataFromFile) Read(rc chan []byte) {
    //打开文件
    f, err := os.Open(r.path)
    if err != nil {
        panic(fmt.Sprintf("open file eror :%s", err.Error()))
    }

    //从文件末尾读取
    f.Seek(0, 2)
    rd := bufio.NewReader(f)
    for {
        line, err := rd.ReadBytes('\n')
        if err == io.EOF {
            time.Sleep(500 * time.Millisecond)
            continue
        } else if err != nil {
            panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
        }
        rc <- line
    }
}

//处理模块
func (lp *LogProcess) ProcessData() {
    r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"`)
    rd := rand.New(rand.NewSource(time.Now().UnixNano()))
    loc, _ := time.LoadLocation("Asia/Shanghai")
    for v := range lp.rc {
        fmt.Println(string(v))
        ret := r.FindStringSubmatch(string(v))
        if len(ret) != 10 {
            log.Println("FindStringSubmatch fail:", string(v))
            continue
        }
        message := &Message{}
        location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0800", ret[4], loc)
        if err != nil {
            log.Println("ParseInLocation fail:", err.Error(), string(ret[4]))
        }
        message.TimeLocal = location
        byteSent, _ := strconv.Atoi(ret[8])
        message.BytesSent = byteSent

        //GET /foo?query=t HTTP/1.0
        reqSli := strings.Split(ret[5], " ")
        if len(reqSli) != 3 {
            log.Println("strings.Split Fail", ret[5])
            continue
        }

        message.Method = reqSli[1]
        message.Scheme = reqSli[1]
        u, err := url.Parse(reqSli[2])
        if err != nil {
            log.Println("url parse fail:", err)
        }
        message.Path = u.Path
        message.Status = ret[6]

        message.UpstreamTime = rd.Float64() * 4
        message.RequestTime = rd.Float64() * 4
        //message.UpstreamTime, _ = strconv.ParseFloat(ret[12], 64)
        //message.RequestTime, _ = strconv.ParseFloat(ret[13], 64)
        lp.wc <- message
    }
}

func main() {
    var path, influxDsn string
    flag.StringVar(&path, "path", "C:/soft/nginx-1.15.8/logs/access.log", "read file path")
    flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@imooc@imoocpass@imooc@s", "read influxdb datasource")

    r := &ReadDataFromFile{
        path: path,
    }
    w := &WriteDateToInfluxDb{
        influxDBua: influxDsn,
    }
    lp := &LogProcess{
        rc:    make(chan []byte),
        wc:    make(chan *Message),
        read:  r,
        write: w,
    }

    go lp.read.Read(lp.rc)
    go lp.ProcessData()
    go lp.write.Write(lp.wc)

    time.Sleep(time.Duration(30000000) * time.Second)
}

influxdb的简单使用

一、influxdb与传统数据库的比较

influxDB 传统数据库中的概念
database 数据库
measurement 数据库中的表
points 表里面的一行数据

influxdb数据的构成:

Point由时间戳(time)、数据(field)、标签(tags)组成。

Point属性 传统数据库中的概念
time 每个数据记录时间,是数据库中的主索引(会自动生成)
fields 各种记录值(没有索引的属性)也就是记录的值:温度, 湿度
tags 各种有索引的属性:地区,海拔

这里不得不提另一个名词:series:

所有在数据库中的数据,都需要通过图表来展示,而这个series表示这个表里面的数据,可以在图表上画成几条线:通过tags排列组合算出来。具体可以通过SHOW SERIES FROM "表名" 进行查询。

二、用户管理

//显示用户
> SHOW USERS
//创建用户
> CREATE USER "username" WITH PASSWORD 'password'
//创建管理员权限的用户
> CREATE USER "username" WITH PASSWORD 'password' WITH ALL PRIVILEGES
//删除用户
> DROP USER "username"

三、命令行下创建及查询

> CREATE DATABASE "testDB"
//查询当前的所有数据库显示所有表
> show databases   
//显示所有表
> SHOW MEASUREMENTS
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356

推荐阅读更多精彩内容

  • 1, 创建 用户 CREATE USER '用户名'@'可以登陆此账户的ip地址' IDENTIFIED BY '...
    质检员晓东阅读 539评论 0 0
  • 离开了伦敦,我们搭火车赶往苏格兰首府,爱丁堡。沿途是大片的牧场和排列收割整齐的草坪,火车是沿着海岸线一路北上,有幸...
    苏遥suyao阅读 722评论 1 1
  • 又一年的高考过去了,有些忐忑,有些期待,却不是为了自己。朋友笑谈,时间过得真快,我们陪伴彼此,从校服到了婚纱。是啊...
    南臣阅读 364评论 0 2
  • 1、不脱嫌热,脱后嫌冷,此乃春天。 2、不送不安,送后不廉,此乃春节。 3、不看失落,看后失望,此乃春晚。 ...
    谢金星_336阅读 494评论 0 0