1、协程机制
package groutine_test
import (
"fmt"
"testing"
"time"
)
// TestGroutine starts ten goroutines, each of which prints the loop index it
// was handed as an argument. Because the index is passed by value, every
// goroutine sees its own copy.
func TestGroutine(t *testing.T) {
	printIndex := func(n int) {
		// time.Sleep(time.Second * 1)
		fmt.Println(n)
	}

	for i := 0; i < 10; i++ {
		// Pass by value: i is evaluated when the go statement runs.
		go printIndex(i)

		// Counter-example from the original notes: capturing i directly in
		// the closure shares one variable between the loop and the
		// goroutines (loop-variable semantics before Go 1.22), and was
		// observed to print 10 10 10 ...
		//
		//	go func() {
		//		// time.Sleep(time.Second * 1)
		//		fmt.Println(i)
		//	}()
	}

	// Give the goroutines time to run before the test function returns.
	time.Sleep(50 * time.Millisecond)
}
2、共享内存并发机制
package share_mem
import (
"sync"
"testing"
"time"
)
// TestCounter is the deliberately unsafe version: 5000 goroutines increment
// the same counter with no synchronization, so increments can be lost and the
// logged value may be less than 5000.
func TestCounter(t *testing.T) {
	counter := 0
	increment := func() {
		counter++ // unsynchronized read-modify-write on shared state
	}

	for i := 0; i < 5000; i++ {
		go increment()
	}

	// Crude wait for the goroutines to finish before reading the counter.
	time.Sleep(time.Second)
	t.Logf("counter = %d", counter)
}
// TestCounterThreadSafe is the mutex-protected version: each of the 5000
// goroutines takes the lock before incrementing the shared counter. It still
// relies on a fixed sleep to wait for the goroutines; TestCounterWaitGroup
// shows the explicit wait.
func TestCounterThreadSafe(t *testing.T) {
	var mut sync.Mutex
	counter := 0

	increment := func() {
		mut.Lock()
		defer mut.Unlock()
		counter++
	}

	for i := 0; i < 5000; i++ {
		go increment()
	}

	// Keep the test goroutine from finishing before the workers do.
	time.Sleep(time.Second)
	t.Logf("counter = %d", counter)
}
// TestCounterWaitGroup is the mutex-protected counter that waits for its
// workers with a sync.WaitGroup instead of a fixed sleep, and verifies that
// no increment was lost.
func TestCounterWaitGroup(t *testing.T) {
	const workers = 5000

	var (
		mut sync.Mutex
		wg  sync.WaitGroup
	)
	counter := 0

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			// Deferred so the WaitGroup is always released, even if the
			// body is later extended with code that can return early.
			defer wg.Done()

			mut.Lock()
			defer mut.Unlock()
			counter++
		}()
	}

	// Blocks until every worker has called Done.
	wg.Wait()
	t.Logf("counter = %d", counter)
	if counter != workers {
		t.Errorf("counter = %d, want %d", counter, workers)
	}
}
3、CSP并发机制
package concurrency
import (
"fmt"
"testing"
"time"
)
// service simulates a slow call: it blocks for 50ms and then returns "Done".
func service() string {
	const latency = 50 * time.Millisecond
	time.Sleep(latency)
	return "Done"
}
// otherTask simulates unrelated work: it prints a start message, blocks for
// 100ms, and prints a completion message.
func otherTask() {
	const workTime = 100 * time.Millisecond

	fmt.Println("working on something else")
	time.Sleep(workTime)
	fmt.Println("Task is done.")
}
// TestService runs service and otherTask one after the other on the test
// goroutine, printing the service result before otherTask starts.
func TestService(t *testing.T) {
	result := service()
	fmt.Println(result)

	otherTask()
}
// AsyncService runs service in a new goroutine and returns the channel on
// which its result will be delivered. The channel is unbuffered, so the
// goroutine blocks on the send (and "service exited." is not printed) until
// the caller receives the value.
func AsyncService() chan string {
	resultCh := make(chan string)

	worker := func(out chan<- string) {
		result := service()
		fmt.Println("returned result.")
		out <- result
		fmt.Println("service exited.")
	}
	go worker(resultCh)

	return resultCh
}
// AsyncServiceBuff is the buffered variant of AsyncService: the result
// channel has capacity one, so the goroutine can deposit its result and print
// "service exited." without waiting for the caller to receive.
func AsyncServiceBuff() chan string {
	resultCh := make(chan string, 1)

	worker := func(out chan<- string) {
		result := service()
		fmt.Println("returned result.")
		out <- result
		fmt.Println("service exited.")
	}
	go worker(resultCh)

	return resultCh
}
// TestAsynService demonstrates CSP-style concurrency: the service is started
// asynchronously, otherTask runs in the meantime, and the result is then
// received from the channel and printed.
func TestAsynService(t *testing.T) {
	resultCh := AsyncService()
	// resultCh := AsyncServiceBuff() // buffered alternative

	otherTask()

	result := <-resultCh
	fmt.Println(result)

	// Leave time for the service goroutine to print its final message.
	time.Sleep(time.Second)
}
4、多路选择和超时
package select_test
import (
"fmt"
"testing"
"time"
)
// service simulates a slow call: it blocks for 500ms and then returns "Done".
// The original note records that the delay was raised from 50ms to 500ms.
func service() string {
	const latency = 500 * time.Millisecond
	time.Sleep(latency)
	return "Done"
}
// AsyncService runs service in a new goroutine and returns the channel on
// which its result will be delivered. The channel has capacity one, so the
// goroutine's send completes even if no receiver is waiting for it.
func AsyncService() chan string {
	resultCh := make(chan string, 1)

	worker := func(out chan<- string) {
		result := service()
		fmt.Println("returned result.")
		out <- result
		fmt.Println("service exited.")
	}
	go worker(resultCh)

	return resultCh
}
// TestSelect demonstrates a timeout with select: it waits for whichever
// happens first, the asynchronous service result or a 100ms timer, and
// reports an error when the timer wins.
func TestSelect(t *testing.T) {
	resultCh := AsyncService()
	timeout := time.After(100 * time.Millisecond)

	select {
	case ret := <-resultCh:
		t.Log(ret)
	case <-timeout:
		t.Error("time out")
	}
}
5、channel的关闭
package channel_close
import (
"fmt"
"sync"
"testing"
)
// dataProducer starts a goroutine that sends the integers 0 through 9 on ch,
// closes ch, and then marks itself done on wg. The caller must have called
// wg.Add(1) beforehand.
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()

		for n := 0; n < 10; n++ {
			ch <- n
		}
		// Sending on ch after this point would panic, e.g. ch <- 11.
		close(ch)
	}()
}
// dataReceiver starts a goroutine that prints every value received from ch
// and, once ch has been closed and drained, marks itself done on wg. The
// caller must have called wg.Add(1) beforehand.
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()

		// range stops when ch is closed, which is the same condition the
		// two-value receive form reports with ok == false.
		for data := range ch {
			fmt.Println(data)
		}
	}()
}
// TestCloseChannel wires one producer and one receiver to an unbuffered
// channel and waits until both goroutines have finished. The producer closing
// the channel is what lets the receiver terminate.
func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)

	wg.Add(2) // one producer + one receiver
	dataProducer(ch, &wg)
	dataReceiver(ch, &wg)

	// A second receiver can be attached the same way:
	//	wg.Add(1)
	//	dataReceiver(ch, &wg)

	wg.Wait()
}
6、广播机制任务取消
package concurrency
import (
"fmt"
"testing"
"time"
)
// isCancelled reports, without blocking, whether a receive from cancelChan
// can proceed right now (a value is available or the channel is closed).
// Note that a successful receive consumes a pending value.
func isCancelled(cancelChan chan struct{}) bool {
	cancelled := false
	select {
	case <-cancelChan:
		cancelled = true
	default:
		// Nothing to receive: not cancelled.
	}
	return cancelled
}
// cancel_1 signals cancellation by sending a single empty value on
// cancelChan. One send is consumed by at most one receiver, and on an
// unbuffered channel the send blocks until a receiver is ready.
func cancel_1(cancelChan chan struct{}) {
	var signal struct{}
	cancelChan <- signal
}
// cancel_2 signals cancellation by closing cancelChan. A receive from a
// closed channel completes immediately, so every goroutine polling the
// channel observes the close.
func cancel_2(cancelChan chan struct{}) {
close(cancelChan)
}
// TestCancel starts five polling goroutines that share one cancellation
// channel, then closes the channel via cancel_2 so that all of them see the
// signal and print "<index> Cancelled".
func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{})

	worker := func(id int, stop chan struct{}) {
		// Poll every 5ms until cancellation is observed.
		for !isCancelled(stop) {
			time.Sleep(5 * time.Millisecond)
		}
		fmt.Println(id, "Cancelled")
	}

	for i := 0; i < 5; i++ {
		go worker(i, cancelChan)
	}

	cancel_2(cancelChan)

	// Leave time for the workers to notice the close and print.
	time.Sleep(time.Second)
}
7、Context与任务取消
Context
• 根 Context:通过 context.Background() 创建
• 子 Context:通过 context.WithCancel(parentContext) 创建
• ctx, cancel := context.WithCancel(context.Background())
• 当前 Context 被取消时,基于它的子 Context 都会被取消
• 接收取消通知 <-ctx.Done()
package cancel
import (
"context"
"fmt"
"testing"
"time"
)
// isCancelled reports, without blocking, whether ctx's Done channel is
// already closed.
func isCancelled(ctx context.Context) bool {
	cancelled := false
	select {
	case <-ctx.Done():
		cancelled = true
	default:
		// Done is not closed yet: still active.
	}
	return cancelled
}
// TestCancel starts five polling goroutines that share one cancellable
// context, then cancels it so that all of them see the signal and print
// "<index> Cancelled".
func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())

	worker := func(ctx context.Context, id int) {
		// Poll every 5ms until cancellation is observed.
		for !isCancelled(ctx) {
			time.Sleep(5 * time.Millisecond)
		}
		fmt.Println(id, "Cancelled")
	}

	for i := 0; i < 5; i++ {
		go worker(ctx, i)
	}

	cancel()

	// Leave time for the workers to notice the cancellation and print.
	time.Sleep(time.Second)
}