码迷,mamicode.com
首页 > 其他好文 > 详细

土拨鼠-->并发编程

时间:2020-04-03 00:18:23      阅读:115      评论:0      收藏:0      [点我收藏+]

标签:系统资源   print   run   process   lwp   微信   fir   start   引用类型   

土拨鼠之并发编程

  为什么要学习go语言?go语言好在哪里?学习go语言后你能收获什么?这三个问题是我一开始学习go语言时反复在思考的问题,我是因为go语言天生支持多核、高并发才学习go语言的;哎!想想如果你的同事,朋友问你,听说你学习go语言,go语言有什么特点呀?我一想到的就是goroutine、channel,这是绝对是go语言最高大上的东西,也是其他语言没有的东西;学习完go语言能收获什么?能干什么?其实吧,我是一个很直接的人,首先是长知识,其次是找份好工作,升职加薪。

  废话不多说,挖坑开始。在学习go语言并发编程之前,首先普及一下一些基本的知识。

CPU常识

  (1)cpu的时间量级:以ns为基本单位,1s=1000ms、1ms=1000us、1us=1000ns;

  (2)人视觉时间量级:以ms为基本单位;

  (3)中断:指计算机运行过程中,出现某些情况需要主机干预时,机器能自动停止正在运行的程序并转入处理新情况的程序,处理完毕后又返回原被暂停的程序继续运行;CPU通过内部的算法把划分出来的时间片分配给每个程序,当一个程序拿到了时间片,说明它对CPU有占用权力,当时间片的时钟到达后,CPU借助硬件机制强行回收程序的占用权,这叫做中断。中断具有强制机制,不管程序执行到什么位置,时钟一到达必须归还CPU使用权,这时CPU把下一个时间片分配给下一个程序运行,CPU在这些程序之间飞快地切换/轮转;

  (4)32位操作系统,2^32/1024/1024/1024=4G,所以32位操作系统内存地址的上限是4G;64位操作系统内存地址的上限是2^64/1024/1024/1024约等于1千6百万TB。

什么是并行、并发

  并行:指在同一时间段内,有多条指令在多个处理器上执行,前提是多核;

  并发:指一个时间段中有几个程序都在启动运行到运行完毕之间,且在同一个处理器上运行,在任何一个时间点上只有一个程序在运行;CPU运行时间划分成若干个时间段,分配给各程序执行,在一个时间段的程序运行时,其它程序处于挂起状,由于CPU时间量级是ns,运行的速度非常之快,导致人眼宏观上看到的是多个程序在同时运行,就比如你在一台单核的电脑上同时使用微信聊天、爱奇艺看韩剧、在腾讯视频上看NBA一样,其实它们是交替运行。

   区别:并行指两个或多个事件在同一时刻发生,并发指两个或多个事件在同一时间间隔发生。并发性是指在一段时间内宏观上有多个程序在同时运行,在单核处理器中,每一时刻却仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。如果在多核处理器运行,则这些可以并发执行的程序便可被分配到多个处理机上,实现并行执行,这样多个程序便可以同时执行。

  并行是靠硬件层面实现,说白了有钱就可以解决了,有钱的话可以买4核、6核、8核.......,对于学习编程来讲是软件层面,所以我们研究的是并发。

进程

  程序是编译得到地二进制文件,占用磁盘空间;进程是运行起来地程序(在linux是通过fork创建进程),占用系统资源(内存),进程是最小资源单位。进程并发,可以理解为多个程序同时在运行,CPU在这些程序之间轮转,实现进程并发。

  进程地状态:初始态、就绪态、运行态、挂起态、停止态。不管是进程、还是线程、还是协程都会存在这5中状态。

技术图片

  在unix/linux系统下,还会有孤儿进程和僵尸进程:

 (1)孤儿进程:父进程先于子进程结束,则子进程成为孤儿进程;

    (2)僵尸进程:子进程终止,父进程尚未回收子进程残留资源,简单地说就是未被回收的子进程就会变成僵尸进程;

  补充:每当有一个进程启动时,系统会自动打开三个文件:标准输入(stdin)、标准输出(stdout)、标准错误(stderr),当进程结束后,操作系统自动关闭这三个文件。

线程

  线程:LWP,light weight process 轻量级进程,它是最小的执行单元,没有独立的地址空间,由进程提供给线程执行环境。当一个进程中有多个线程时,这些线程共享相同的地址空间(由进程提供),它在进程中只需能标识自己身份而已。当进程启动线程后,就不是以进程为单位抢占CPU,而是以线程为单位进行抢占CPU。

  线程并发:一个进程中多个线程运行,以线程为单位抢占CPU时间片,同一时刻只有一个线程运行;如果是多核CPU,多线程是可以并行运行的。

  线程同步:即协同步调,按预定的先后次序运行线程。当有多个线程共同操作一个共享资源时,需要线程同步,来避免竞态问题。

协程

 为什么

  线程涉及到锁、同步竞争、阻塞状态和运行状态之间的切换、上下文的切换,这些都是非常耗费性能的操作,协程就是用来解决这些问题的。其实一个线程分为 “内核态 “线程和” 用户态 “线程,一个用户态线程最终是要绑定到内核态线程上运行的,因为CPU只知道运行的是内核态线程(linux的PCB进程控制模块),由此引出了协程的概念。

什么是协程

  协程,英文coroutine,是一种轻量级线程,一个线程可以拥有多个协程。协程不是被操作系统内核所管理的,而完全是由程序所控制,也就是用户态执行。好处是性能大幅度的提升,因为不会像线程切换那样消耗资源。

  协程不是进程也不是线程,而是一个特殊的函数,可以被挂起,也可以重新继续运行,与进程、线程相比并不是一个维度的概念。一个线程内的多个协程的运行是串行的。如果是多核CPU,多个进程或多线程是可以并行运行的,但一个线程内的协程绝对是串行的,无论CPU多少个核,一个线程可以运行多个协程(函数),但这些协程(函数)都是串行运行的,当一个协程运行时,其他协程必须挂起。

进程、线程、协程的对比

  (a)进程稳定性、安全性高(地址空间独立,比如linux下的进程并发),资源开销大;

  (b)线程资源开销一般;

  (c)协程效率高,缺点是必须绑定到一个内核态线程上;

  进程、线程的切换都会进入内核态,而协程是在用户态线程完成切换,不会陷入内核态,这种切换非常的轻量快速。

 goroutine

  上文说了很多概念的东西,回到本文的主题goroutine。在c++/java中实现并发编程中,往往需要自己维护一个线程池,并且需要自己去调度线程执行任务并维护上下文切换,这会很耗费程序员心思。Go在语言层面上内置了调度和上下文切换机制,go会智能地将goroutine中的任务合理地分配给每个CPU实现并发执行,简而言之就是当你需要某个任务并发执行,只需把这个任务封装成一个函数,在函数前面加上go关键字即可,就是这么简单粗暴。

  os线程(操作系统线程),启动一个os线程其栈内存一般都有2MB,一个goroutine的栈内存典型下只有2KB,但这不是固定的,是按需增大和缩小的,所以在go语言中,轻轻松松创建10万、百万的goroutine。

goroutine调度(GPM)

技术图片

  (1)G是goroutine的缩写,代表一个goroutine对象;

  (2)P(processor)包含了运行goroutine所需要的资源;

  (3)M (machine)代表一个线程,是运行goroutine的实体。  

  go调度原理:

技术图片  

  (1)全局队列:存放等待运行的G;

  (2)P的本地队列:也是存放等待运行的G,存放数量不超过256个,新建G时,优先加入到P的本地队列,如果队列满了,则会把本地队列中一半的 G 移动到全局队列;

  (3)P列表:所有的P都在程序启动时创建,并保存在数组中,最多有GOMAXPROCS(可配置)个;

  (4)M:线程想运行任务就得获取P,从P的本地队列取G,P队列为空时,M也会从全局队列取一批G放到P的本地队列,或从其他P的本地队列取一半G放到自己的本地队列中,M运行G,G执行后,M会从P获取下一个G,不断重复;

  (5)操作系统调度器:负责把内核线程分配到CPU的核上执行。

  当本地线程因为G阻塞时,线程会释放绑定的P,把 P 转移给其他空闲的线程执行。Go 调度本质是把大量的 goroutine 分配到少量线程上去执行,并利用多核并行,实现更强大的并发。

 启动多个goroutine

var wg sync.WaitGroup
func main() {
    for i := 0; i < 10; i++ {
        wg.Add(1)//启动一个goroutine就+1
        go func(j int) {
            fmt.Printf("go%d\n", j)
            wg.Done()//结束就-1
        }(i)
    }
    wg.Wait() // 等待所有的goroutine都结束
}

Runtime包

  这里介绍一下runtime包的几个与goroutine相关常用函数:Gosched、Goexit、GOMAXPROCS。

  Gosched用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次再获得CPU时间轮片的时候,从上次让出CPU的位置恢复执行。

func main() {
    go func() {
        for {
            fmt.Println("this is goroutine")
        }
    }()

    for {
        runtime.Gosched() //出让时间片
        fmt.Println("this is main goroutine")
    }
}

  Goexit将立即终止当前goroutine执行,调度器确保所有已注册defer延迟调用被执行,而未注册的defer不会执行,同理return语句也类似。

func test() {
    defer fmt.Println("c")
    runtime.Goexit()//退出当前goroutine
    defer fmt.Println("d")
}

func main() {
    go func() {
        defer fmt.Println("a")
        test()
        defer fmt.Println("b")
    }()
    for {
    }
}
//输出
//c
//a

  GOMAXPROCS用来确定使用多少个OS线程来同时执行go代码,默认值是机器上CPU的核心数,可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数,这个函数的返回值是上一次设置核心数的数量,如果是第一次设置,则返回默认的核心数。

var wg sync.WaitGroup
func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
    wg.Done()
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
    wg.Done()
}

func main() {
    runtime.GOMAXPROCS(2)
    wg.Add(2)
    go a()
    go b()
    wg.Wait()
}

channel

  单单将函数并发执行,没有太大意义,函数与函数之间进行数据交互才能体现并发的意义。像C++/java是通过共享内存实现通信,容易发生竞态问题,为了避免这个问题,于是用互斥量对内存进行加锁,最终会大大地降低了程序执行性能;而go语言通过通信(channel)共享内存,大幅度提升了性能。

  如果说goroutine是go语言程序并发的执行体,channel就是它们之间的链接,channel是一种特殊的类型,遵循先入先出(First In First Out)的规则,保证收发数据的顺序。channel是引用类型,需使用make函数初始化后才能使用,缓冲区的大小是可选的,创建channel格式如下:

make(chan 元素类型, [缓冲大小])

  channel的有发送(send)、接收(receive)和关闭(close)三种操作。

ch := make(chan int)//定义一个通道
//发送
ch <- 1 // 把1发送到ch中
//接收
x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果
//关闭
close(ch)//通过内置函数close来关闭

 无缓冲通道

  无缓冲的通道又称为阻塞的通道,在make初始化通道时,ch := make(chan int) 忽略通道的大小时,创建的是无缓冲的通道。无缓冲的通道只有在有人接收值的时候才能发送值,同理无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才发送成功。缓冲通道进行通信将导致发送和接收的goroutine同步化,所以无缓冲通道就叫做同步通道,需同时操作无缓冲通道。

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 启用goroutine从通道接收值
    ch <- 10
    fmt.Println("发送成功")
}

有缓冲通道

   在使用make函数初始化通道的时候,为其指定通道的容量,只要通道的容量大于零,那么该通道就是有缓冲的通道,可以使用内置函数len获取通道元素的数量,使用cap函数获取通道的容量;有缓冲区可以进行数据存储,储存至容量上限后才阻塞,具有异步能力,不需要同时操作有缓冲通道。

func main() {
    ch := make(chan int, 1) // 容量为1的有缓冲区通道
    ch <- 2
    fmt.Println("发送成功")
}

  说一下我遇见的一个特殊的例子:

var wg sync.WaitGroup
func main() {
    ch := make(chan int, 5)
    wg.Add(1)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
            fmt.Println("子go", i, "len=", len(ch), "cap=", cap(ch))
        }
        wg.Done()
    }()

    for i := 0; i < 10; i++ {
        num := <-ch
        fmt.Println("主go读取到", num)
    }
    wg.Wait()
}
//输出
子go 0 len= 0 cap= 5
子go 1 len= 1 cap= 5
子go 2 len= 2 cap= 5
子go 3 len= 3 cap= 5
子go 4 len= 4 cap= 5
子go 5 len= 5 cap= 5
主go读取到 0
主go读取到 1
主go读取到 2
主go读取到 3
主go读取到 4
主go读取到 5
主go读取到 6
子go 6 len= 5 cap= 5
子go 7 len= 0 cap= 5
子go 8 len= 1 cap= 5
子go 9 len= 2 cap= 5
主go读取到 7
主go读取到 8
主go读取到 9

  分析:子go程先写入6个数字进入缓冲通道,主go程读取到了7个数据,而且通道的长度也出现了混论,原因是使用fmt打印输出是一种IO方式输出,耗时,主go程和子go程轮流切换CPU执行,没等主go程或者子go程执行fmt打印出来,它们都轮流在执行。

  总结:已经关闭channel,不能再向其写数据,但可以读取数据,此时,如果读的是无缓冲channel,读到的是0,如果读的是有缓冲channel,先把channel里的数据读取完,然后再读取,得到的是0。

 for range

  通道可以使用for range遍历通道取数据,当通道被关闭的时候就会退出for range。

func main() {
    ch := make(chan int)//无缓冲通道
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        close(ch)
    }()
    for i := range ch { // 通道关闭后会退出for range循环
        fmt.Println(i)
    }
}

单向通道

  (1)单向写channel:ch := make(chan<- int),只可以对其写入数据;

  (2)单向读channel:ch := make(<-chan int),只可以对其读取数据;

  在函数的传参或者返回值中,都可以使用单向channel,同时双向channel可以转换为单向channel,但反过来不可以。

var wg sync.WaitGroup

func send(out chan<- int) {
    out <- 100
    close(out)
    wg.Done()
}

func recv(in <-chan int) {
    num := <-in
    fmt.Println(num)
    wg.Done()
}
func main() {
    ch := make(chan int)
    wg.Add(2)
    go send(ch)
    go recv(ch)
    wg.Wait()
}
//注意:通道是一种引用类型,所以传参进去,便可写可读同一个通道

实现生产者消费者模型

//订单
type orderInfo struct {
    id       int      //订单号
    shopping []string //商品
    tel      int64    //电话
}

var wg sync.WaitGroup

//生成订单--生产者
func producer(out chan<- orderInfo) {
    for i := 0; i < 100; i++ {
        order := orderInfo{
            id:       i + 1,
            shopping: []string{"go从入门到入土", "361运动鞋", "杠铃"},
            tel:      rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(100000000000),//生成11位随机数
        }
        out <- order
    }
    close(out)
    wg.Done()
}

//处理订单--消费者
func consumer(in <-chan orderInfo) {
    for num := range in {
        fmt.Println(num)
    }
    wg.Done()
}
func main() {
    //ch := make(chan int)//无缓冲,同步
    ch := make(chan orderInfo, 5) //有缓冲,异步
    wg.Add(2)
    go producer(ch)
    go consumer(ch)
    wg.Wait()
}

select多路复用

  select类似于switch语句,它有一系列case分支和一个默认分支,case语句后面不可以写任意表达式。

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默认操作
}

  例子:

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i++ {
        select {
        case x := <-ch:
            fmt.Println(x)
        case ch <- i:
        }
    }
}

  select语句能提高代码的可读性:

  (1)可处理一个或多个channel的发送/接收操作;

  (2)如果多个case同时满足,select会随机选择一个;

  (3)对于没有case的select{}会一直等待,可用于阻塞main函数;

  (4)break跳出当前select中的case选项。

select实现斐波那契数列:

func fibonacci(ch <-chan int, quit <-chan bool) {
    for {
        select {
        case num := <-ch:
            fmt.Printf("%d  ", num)
        case <-quit:
            return
        }
    }
}
func main() {
    ch := make(chan int)
    quit := make(chan bool)
    go fibonacci(ch, quit)
    x, y := 1, 1
    for i := 0; i < 20; i++ {//计算前20位的斐波那契数列
        ch <- x
        x, y = y, x+y
    }
    quit <- true//退出子go程
}

select实现超时:

  在使用select监听无缓冲通道时,会出现阻塞的情况,为了避免这种情况,可以使用select实现设置超时。

func main() {
    c := make(chan int)
    o := make(chan bool)
    go func() {
        for {
            select {
            case v := <-c:
                fmt.Println(v)
            case <-time.After(time.Second * 5):
                fmt.Println("timeOut")
                o <- true
            }
        }
    }()
    <-o
}

  有时候在Go代码中可能会存在多个goroutine同时操作一个资源,这种情况会发生竞态问题,可以通过加锁的方式处理。死锁的情况对于go语言是不存在的,在你编译程序时,如果有死锁会报panic,而大多数语句如果代码出现死锁,编译时是不会报错的。

互斥锁

  互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。

var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func main() {
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func ()  {
            for i := 0; i < 8000; i++ {
                lock.Lock() // 加锁
                x=x+1
                lock.Unlock() // 解锁
            }
        }()
    }
    wg.Wait()
}

读写锁

  互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。

  当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
    rwlock.Unlock()                   // 解写锁
    wg.Done()
}

func read() {
    rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
    rwlock.RUnlock()             // 解读锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

  在读写锁中使用channel。

var rw sync.RWMutex
var wg sync.WaitGroup

func read(in <-chan int) {
    for {
        rw.RLock()
        num := <-in //阻塞,出现死锁
        fmt.Println("读取到:", num)
        rw.RUnlock()
    }
    wg.Done()
}

func write(out chan<- int) {
    for {
        rw.Lock()
        num := rand.Intn(100)
        out <- num
        fmt.Println("写入:", num)
        rw.Unlock()
    }
    wg.Done()
}
func main() {
    ch := make(chan int)
    wg.Add(2)
    go write(ch)
    go read(ch)
    wg.Wait()
}

  以上的例子是一个错误的例子,得出的结论是,尽量不要将互斥锁、读写锁与channel混用,会出现隐性死锁(可以使用全局变量)。

sync.Once

  在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

sync.Map

  Go语言中内置的map不是并发安全的,开启多个goroutine,并发多了之后执行上面的代码就会报fatal error:concurrent map writes错误,像这种场景下就需要为map加锁来保证并发的安全性了。

var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

 

土拨鼠-->并发编程

标签:系统资源   print   run   process   lwp   微信   fir   start   引用类型   

原文地址:https://www.cnblogs.com/juju-niuniu/p/12548760.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!