翻译自 https://medium.com/coinmonks/code-a-simple-p2p-blockchain-in-go-46662601f417
Go-libp2p
写一个P2P网络可不是一件开玩笑的事儿。它有一大堆需要注意的细节以及需要大量的编码测试来保证它的健壮性和可扩展性。一个好的工程师首先会看看哪些工具能为我们所用,让我们“站在巨人的肩膀上”。
幸运的是,的确有一个用Go编写叫做 go-libp2p 的P2P库。巧合的是,它也被IPFS用作底层的P2P协议。
警告
就我们所知,go-libp2p
有两个缺点:
- 安装十分困难,它采用了
gx
来作为他们的包管理工具,我们认为这非常不方便。(译者注:gx 是 ipfs 团队开发的基于ipfs的一个go包管理工具,包依赖需要ipfs节点支持) - 它还在开发状态,用他们的代码时,可能会遇到一些数据冲突。
不用太过于担心第一个问题,我们会帮你解决它的(译者注:实际上这个问题很困扰国内用户)。第二个问题会更严重,但它不会影响我们这里的代码。无论如何,如果你真的遇到了数据冲突,它们很可能来自于这个库的底层代码,请务必给他们提交issue。
P2P的开源库很少,尤其在Go语言中。所以,go-libp2p
已经非常棒并且很适合我们的目标。
安装
go get -d github.com/libp2p/go-libp2p/...
-
cd
到上述目录 make
make deps
译者注: 在执行
make deps
可能会出现一直超时的情况,原因是没有连上ipfs节点,译者的解决方案是终端FQ,自建ipfs节点并开启ipfs daemon
我们将在examples
子目录下开发我们的demo。所以让我们在examples
下创建p2p
目录:
mkdir ./examples/p2p
然后在p2p
下创建main.go
,我们所有代码都会在这个main.go
里。
Imports
我们先做一些包的声明,其中大部分的包来自于go-libp2p
,你们会从中学到如何使用它们。
package main
import (
"bufio"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"log"
mrand "math/rand"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
golog "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
gologging "github.com/whyrusleeping/go-logging"
)
spew
包是为了能够友好地打印区块链数据,请确保执行了以下命令:
go get github.com/davecgh/go-spew/spew
Blockchain 部分
我们先定义些全局变量
// Block represents each 'item' in the blockchain
type Block struct {
Index int
Timestamp string
BPM int
Hash string
PrevHash string
}
// Blockchain is a series of validated Blocks
var Blockchain []Block
var mutex = &sync.Mutex{}
-
Block
是我们想要的交易信息。我们使用 BPM (每分钟的Beat数)来作为每个区块的关键数据。(译者注:这里为简化数据结构所以用一个简单的int类型来表示区块数据) -
Blockchain
是我们的整条链,也可以说是最后一个区块,它是Block
的slice
。 - 我们还定义了
mutex
,用锁来保证不会发生数据混乱。
接下来我们写些区块链的方法:
// make sure block is valid by checking index, and comparing the hash of the previous block
func isBlockValid(newBlock, oldBlock Block) bool {
if oldBlock.Index+1 != newBlock.Index {
return false
}
if oldBlock.Hash != newBlock.PrevHash {
return false
}
if calculateHash(newBlock) != newBlock.Hash {
return false
}
return true
}
// SHA256 hashing
func calculateHash(block Block) string {
record := strconv.Itoa(block.Index) + block.Timestamp + strconv.Itoa(block.BPM) + block.PrevHash
h := sha256.New()
h.Write([]byte(record))
hashed := h.Sum(nil)
return hex.EncodeToString(hashed)
}
// create a new block using previous block's hash
func generateBlock(oldBlock Block, BPM int) Block {
var newBlock Block
t := time.Now()
newBlock.Index = oldBlock.Index + 1
newBlock.Timestamp = t.String()
newBlock.BPM = BPM
newBlock.PrevHash = oldBlock.Hash
newBlock.Hash = calculateHash(newBlock)
return newBlock
}
-
isBlockValid
用来检查每个区块生成的哈希是否是有效的 -
calculateHash
使用sha256
来计算初始哈希 -
generateBlock
创建一个新区块,并加入到区块链中
P2P 部分
Host
接下来才是正文。第一件事是写出我们的hosts被创建出来的逻辑。当一个节点运行我们的程序时,它应该能扮演一个host并且让其他节点链接进来,代码如下:
// makeBasicHost creates a LibP2P host with a random peer ID listening on the
// given multiaddress. It will use secio if secio is true.
func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {
// If the seed is zero, use real cryptographic randomness. Otherwise, use a
// deterministic randomness source to make generated keys stay the same
// across multiple runs
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// Generate a key pair for this host. We will use it
// to obtain a valid host ID.
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
libp2p.Identity(priv),
}
if !secio {
opts = append(opts, libp2p.NoSecurity) // 原文是libp2p.NoEncryption(),最新库已废弃
}
basicHost, err := libp2p.New(context.Background(), opts...)
if err != nil {
return nil, err
}
// Build host multiaddress
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))
// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
addr := basicHost.Addrs()[0]
fullAddr := addr.Encapsulate(hostAddr)
log.Printf("I am %s\n", fullAddr)
if secio {
log.Printf("Now run \"go run main.go -l %d -d %s -secio\" on a different terminal\n", listenPort+1, fullAddr)
} else {
log.Printf("Now run \"go run main.go -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr)
}
return basicHost, nil
}
我们的makeBasicHost
方法接受3个参数来返回一个host和一个error(error是nil的话表示没有错误)
-
listenPort
是我们可以在命令行指定的端口来让其他节点链接。 -
secio
布尔值,表示是否对数据流进行加密,推荐打开。 -
randSeed
是一个可选的命令行参数,它允许我们提供一个种子来创建host的随机地址。我们这次不会使用这个参数但有它程序会更健壮。
方法里的第一个if
语句决定了是否有提供种子来生成host的密钥。然后我们生成公私钥对来保证我们host的安全。otps
部分开始构造我们的host地址以便其他节点链接。
!secio
部分表示不采用加密,但是我们决定使用secio
所以这部分可以略过,直接看加密部分代码块就行。
我们之后创建并最终确定我们的地址,最后的log.Printf
打印的信息非常有用它告诉其他节点如何连接到自己的host。
最终我们返回整个host。
Stream handler
我们需要我们的host来处理数据输入,当另一个节点连接到我们时,可能会有新的区块链数据进入,我们需要一些逻辑来决定是否接受这些数据。
并且当我们增加新区块到区块链中时,我们希望广播出去让所有连到我们的节点都知晓。
我们来创建handler的骨架:
func handleStream(s net.Stream) {
log.Println("Got a new stream!")
// Create a buffer stream for non blocking read and write.
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
go readData(rw)
go writeData(rw)
// stream 's' will stay open until you close it (or the other side closes it).
}
我们创建了一个新的ReadWriter
,并且分别对读写创建了Go routines来处理相应逻辑。
Read
我们先创建readData
方法:
func readData(rw *bufio.ReadWriter) {
for {
str, err := rw.ReadString('\n')
if err != nil {
log.Fatal(err)
}
if str == "" {
return
}
if str != "\n" {
chain := make([]Block, 0)
if err := json.Unmarshal([]byte(str), &chain); err != nil {
log.Fatal(err)
}
mutex.Lock()
if len(chain) > len(Blockchain) {
Blockchain = chain
bytes, err := json.MarshalIndent(Blockchain, "", " ")
if err != nil {
log.Fatal(err)
}
// Green console color: \x1b[32m
// Reset console color: \x1b[0m
fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
}
mutex.Unlock()
}
}
}
我们的方法是一个死循环,因为它需要一直处理区块数据。我们解析其他节点传输过来的区块数据,这个数据其实就是从ReadString
读取的JSON数据,如果它不是空的(!="\n"
),我们就Unmarshal
它。
然后我们检查新的区块链数据是否比我们本地的数据要长。我们这里只是简单通过链的长短来判断谁能胜出,如果新链长于本地的链,我们会采用新的区块链作为有效数据。
然后我们会把区块链数据重新Marshal
成JSON格式,然后打印在终端上。我们以不同的颜色打印出来以便识别这是新链。
现在我们已经可以接受其他节点的区块链数据了,如果我们在本地增加一个新区块,我们也需要让连接的节点知晓,我们通过writeData
方法来做到。
Write
func writeData(rw *bufio.ReadWriter) {
go func() {
for {
time.Sleep(5 * time.Second)
mutex.Lock()
bytes, err := json.Marshal(Blockchain)
if err != nil {
log.Println(err)
}
mutex.Unlock()
mutex.Lock()
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()
mutex.Unlock()
}
}()
stdReader := bufio.NewReader(os.Stdin)
for {
fmt.Print("> ")
sendData, err := stdReader.ReadString('\n')
if err != nil {
log.Fatal(err)
}
sendData = strings.Replace(sendData, "\n", "", -1)
bpm, err := strconv.Atoi(sendData)
if err != nil {
log.Fatal(err)
}
newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)
if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
mutex.Lock()
Blockchain = append(Blockchain, newBlock)
mutex.Unlock()
}
bytes, err := json.Marshal(Blockchain)
if err != nil {
log.Println(err)
}
spew.Dump(Blockchain)
mutex.Lock()
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()
mutex.Unlock()
}
}
我们先开启一个Go routine,每隔5秒广播我们最新的区块链数据给其他连接的节点。他们会接受这些数据并且判断链长。如果新数据更长他们会更新自己本地的数据。所以所有节点都会根据网络持续更新数据状态。
我们现在需要有种途径来创建新区块。我们首先创建一个bufio.NewReader
,使之能从stdin
(终端输入)读取数据。我们也希望能持续创建新区块,所以也放在一个死循环里。
我们做了一些字符串的操作,来确保输入的BPM是整型并且能用来生成新区块。把新区块加入区块链之后我们Marshal
区块链数据,这样我们能用spew.Dump
友好地打印在终端上。最后我们用rw.WriteString
来广播到其他节点。
我们现在已经完成了区块链的部分以及P2P的大部分方法。我们已经创建了处理handler和读写逻辑。通过这些我们能够让每个节点检查本地的区块链数据并互相更新。
剩下的所有内容便是完成我们的main
函数了。
Main函数
这是我们的main函数,先大致看下,我们后面会一步步讲解。
func main() {
t := time.Now()
genesisBlock := Block{}
genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}
Blockchain = append(Blockchain, genesisBlock)
// LibP2P code uses golog to log messages. They log with different
// string IDs (i.e. "swarm"). We can control the verbosity level for
// all loggers with:
golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info
// Parse options from the command line
listenF := flag.Int("l", 0, "wait for incoming connections")
target := flag.String("d", "", "target peer to dial")
secio := flag.Bool("secio", false, "enable secio")
seed := flag.Int64("seed", 0, "set random seed for id generation")
flag.Parse()
if *listenF == 0 {
log.Fatal("Please provide a port to bind on with -l")
}
// Make a host that listens on the given multiaddress
ha, err := makeBasicHost(*listenF, *secio, *seed)
if err != nil {
log.Fatal(err)
}
if *target == "" {
log.Println("listening for connections")
// Set a stream handler on host A. /p2p/1.0.0 is
// a user-defined protocol name.
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
select {} // hang forever
/**** This is where the listener code ends ****/
} else {
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
// The following code extracts target's peer ID from the
// given multiaddress
ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)
if err != nil {
log.Fatalln(err)
}
// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
// We have a peer ID and a targetAddr so we add it to the peerstore
// so LibP2P knows how to contact it
ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
log.Println("opening stream")
// make a new stream from host B to host A
// it should be handled on host A by the handler we set above because
// we use the same /p2p/1.0.0 protocol
s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
if err != nil {
log.Fatalln(err)
}
// Create a buffered stream so that read and writes are non blocking.
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
// Create a thread to read and write data.
go writeData(rw)
go readData(rw)
select {} // hang forever
}
}
我们开始创建一个创世块,这是我们的种子区块。
我们使用go-libp2p
库中的logger来处理日志,当然这是可选的。
然后我们设置所有的命令行参数:
-
secio
之前也解释过,能够允许安全传输。我们会一直把这个开关打开的。 -
target
指定想要连接的host地址,这里我们其实扮演的节点去连接其他host。 -
listenF
打开指定端口让其他节点连接,这里我们扮演的host。我们可以既充当host(接受连接)也充当peer(连到其他hosts)。这是真正的P2P。
我们然后使用makeBasicHost
创建了一个新host。如果我们只充当一个host(不会主动连接到其他节点),我们会进入if *target == ""
的逻辑,直接用SetStreamHandler
激活我们的监听流程。
如果我们想要连接其他节点,我们就进入else
的逻辑。仍然激活监听流程,因为我们也需要让别的节点连接进来。
下面几行代码解封了target所指定的地址,这样我们能找到host并连接。
我们解封到host的peerID
和目标地址targetAddr
,并将其存储在“store”里以便我们能持续追踪我们连接的节点。这步通过ha.Peerstore().AddAddr
完成。
之后我们通过ha.NewStream
连到我们想要连的节点上。并且我们也希望能够读写数据,和上面一样,我们创建ReadWriter
并且分别建立读写的Go routines readData
和writeData
。最后我们通过空的select
来阻塞程序,这样程序不会停止。
运行
我们会用三个独立的终端来运行这个程序。
在第一个终端,go run main.go -l 10000 -secio
根据打印的 "Now run ..." 指示,打开第二个终端,进入程序目录,运行go run main.go -l 10001 -d <given address in the instructions> -secio
你会看到第一个终端检测到了新的连接!
现在根据第二个终端的指示,打开第三个终端,类似的,运行go run main.go -l 10002 -d <given address in the instructions> -secio
检查第二个终端,会发现检测到第三个终端的连接!
现在让我们输入BPM,在第一个终端输入"70",等几秒,观察各个终端发生了什么。
非常酷!
- 第一个终端增加了一个区块
- 广播给第二个终端
- 第二个终端和本地的区块链比较了下,发现第一个终端的数据较长,所以它覆盖了本地的数据,并且广播给了第三个终端
- 第三个终端也同样比较数据并进行替代
所有的三个终端都以去中心化的方式更新了自己的区块信息。这是P2P的能力。
我们继续测试但这次我们让第二个终端增加区块。在第二个终端输入"80"。
这正是我们想要的,这次第二个终端增加了新区块并且广播到网络中,每个节点都进行了检查并更新数据。