go Channel原理 (二)

news/2024/7/7 20:41:33 标签: golang, 开发语言, 后端

Channel

设计原理

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。

在主流编程语言中,多个线程传递数据的方式一般都是共享内存。
在这里插入图片描述
Go 可以使用共享内存加互斥锁进行通信,同时也提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。
在这里插入图片描述
上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。

发送数据

两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。这是一个 生产者 - 消费者 模型,负责传递数据的 goroutine 发送数据到 channel,channel 起到一个临界区/缓冲区的作用。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 如果 channel 是 nil
   if c == nil {
      // 不能阻塞,直接返回 false,表示未发送成功
      if !block {
         return false
      }
      // 当前 goroutine 被挂起
      gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
      throw("unreachable")
   }

   // 省略 debug 相关……

   // 对于不阻塞的 send,快速检测失败场景
   //
   // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
   // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine (c.dataqsiz == 0 && c.recvq.first == nil)
   // 2. channel 是缓冲型的,但循环数组已经装满了元素 (c.dataqsiz > 0 && c.qcount == c.dataqsiz)
   
   // 这里涉及两个观测项:channel 未关闭、channel not ready for sending。
   // 这两个都会因为没加锁而出现观测前后不一致的情况。
   // 但是,因为 close channel 这个行为不能将 channel 的状态从 ready for sending 变成 not ready for sending
   // 所以当观测到 channel 的状态是 not ready for sending,channel 是不是 closed 并不重要,可以直接返回 false。
   if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
      (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
      return false
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   // 锁住 channel,并发安全
   lock(&c.lock)

   // 如果 channel 关闭了
   if c.closed != 0 {
      // 解锁
      unlock(&c.lock)
      // 直接 panic
      panic(plainError("send on closed channel"))
   }

   // 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
   if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   // 对于缓冲型的 channel,如果还有缓冲空间
   if c.qcount < c.dataqsiz {
      // qp 指向 buf 的 sendx 位置
      qp := chanbuf(c, c.sendx)

      // ……

      // 将数据从 ep 处拷贝到 qp
      typedmemmove(c.elemtype, qp, ep)
      // 发送游标值加 1
      c.sendx++
      // 如果发送游标值等于容量值,游标值归 0
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 缓冲区的元素数量加一
      c.qcount++

      // 解锁
      unlock(&c.lock)
      return true
   }

   // 如果不需要阻塞,则直接返回错误
   if !block {
      unlock(&c.lock)
      return false
   }

   // channel 满了,发送方会被阻塞。接下来会构造一个 sudog

   // 获取当前 goroutine 的指针
   gp := getg()
   // 获取 sudog 并设置这一次阻塞发送的相关信息
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }

   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.selectdone = nil
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil

   // 当前 goroutine 进入发送等待队列
   c.sendq.enqueue(mysg)

   // 当前 goroutine 被挂起
   // 这里阻塞住了
   goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

   // 从这里开始被唤醒了(channel 有机会可以发送了)
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      // 被唤醒后,channel 关闭了。坑爹啊,panic
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   // 释放当前 goroutine 的 sudog
   mysg.c = nil
   releaseSudog(mysg)
   return true
}

// sender -> receiver
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 省略一些用不到的
    // ……
    // sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
    // ep:被发送的元素
    if sg.elem != nil {
        // 直接拷贝内存(从发送者到接收者)
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // sudog 上绑定的 goroutine
    gp := sg.g
    // 解锁
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 
    // 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行
    // 该处理器在下一次调度时会立刻唤醒数据的接收方;
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
    
    // 直接进行内存"搬迁"
    // 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
    // 就不能修改真正的 dst 位置的值了
    // 因此需要在读和写之前加上一个屏障
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

将消息发送到 channel 的 核心函数是 chansend

  1. 当存在等待的 receiver 时,直接将数据发送给阻塞的 goroutine 并将其设置成下一个运行的 goroutine。

这里 send 的时候,涉及到一个 goroutine 直接写另一个 goroutine 栈的操作,一般而言,不同 goroutine 的栈是各自独有的。而这也违反了 GC 的一些假设。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,效率得以提高。

  1. 如果 channel 存在缓冲区并且还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上。
  2. 当不存在缓冲区或者缓冲区已满时,等待其他 goroutine 从 channel 接收数据,sender 进入等待队列并阻塞。
    发送数据的过程中包含几个会触发 goroutine 调度的时机:
  3. 发送数据时发现 channel 上存在等待接收数据的 goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度。
  4. 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权。

http://www.niftyadmin.cn/n/5535312.html

相关文章

[Microsoft Office]Word设置页码从第二页开始为1

目录 第一步&#xff1a;设置页码格式 第二步&#xff1a;设置“起始页码”为0 第三步&#xff1a;双击页码&#xff0c;出现“页脚”提示 第四步&#xff1a;选中“首页不同” 第一步&#xff1a;设置页码格式 第二步&#xff1a;设置“起始页码”为0 第三步&#xff1a;双…

Elasticsearch集群部署(下)

目录 上篇&#xff1a;Elasticsearch集群部署&#xff08;上&#xff09;-CSDN博客 七. Filebeat 部署 八. 部署Kafka 九. 集群测试 链接&#xff1a;https://pan.baidu.com/s/1AFXSmDdY5xBb7g35ipKoaw?pwdfa9m 提取码&#xff1a;fa9m 七. Filebeat 部署 为什么用 F…

概述:监督学习(分类,回归)与无监督学习(聚类)

目录&#xff1a; 一、监督学习&#xff1a;1.什么是监督学习&#xff1a;2.监督学习类型: 二、无监督学习1.什么是无监督学习&#xff1a;2.无监督学习类型: 一、监督学习&#xff1a; 1.什么是监督学习&#xff1a; 当前创造市场价值的机器学习中99%都是监督学习。监督学习…

API-正则表达式

学习目标&#xff1a; 掌握正则表达式 学习内容&#xff1a; 什么是正则表达式语法元字符修饰符 什么是正则表达式&#xff1a; 正则表达式是用于匹配字符串中字符组合的模式。在JavaScript中&#xff0c;正则表达式也是对象。 通常用来查找、替换那些符合正则表达式的文本&a…

Django实现部门管理功能

在这篇文章中,我们将介绍如何使用Django框架实现一个简单的部门管理功能。这个功能包括部门列表展示、添加新部门、编辑和删除部门等操作。 1. 项目设置 首先,确保你已经安装了Django并创建了一个新的Django项目。在项目中,我们需要创建一个名为​​app01​​的应用。 2.…

Linux多进程和多线程(五)进程间通信-消息队列

多进程(五) 进程间通信 消息队列 ftok()函数创建消息队列 创建消息队列示例 msgctl 函数示例:在上⼀个示例的基础上&#xff0c;加上删除队列的代码 发送消息 示例: 接收消息示例 多进程(五) 进程间通信 消息队列 消息队列是一种进程间通信机制&#xff0c;它允许两个或多个…

【机器学习】Google开源大模型Gemma2:原理、微调训练及推理部署实战

目录 一、引言 二、模型简介 2.1 Gemma2概述 2.2 Gemma2 模型架构 三、训练与推理 3.1 Gemma2 模型训练 3.1.1 下载基座模型 3.1.2 导入依赖库 3.1.3 量化配置 3.1.4 分词器和模型实例化 3.1.5 引入PEFT进行LORA配置 3.1.6 样本数据清洗与加载 3.1.7 模型训练与保…

下载安装MySQL

1.软件的下载 打开官网下载mysql-installer-community-8.0.37.0.msi 2.软件的安装 mysql下载完成后&#xff0c;找到下载文件&#xff0c;双击安装 3.配置环境变量 4.自带客户端登录与退出