扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
Go提供了一种称为通道的机制,用于在goroutine之间共享数据。当您作为goroutine执行并发活动时,需要在goroutine之间共享资源或数据,通道充当goroutine之间的管道(管道)并提供一种机制来保证同步交换。
张家川回族自治ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!
根据数据交换的行为,有两种类型的通道:无缓冲通道和缓冲通道。无缓冲通道用于执行goroutine之间的同步通信,而缓冲通道用于执行异步通信。无缓冲通道保证在发送和接收发生的瞬间两个goroutine之间的交换。缓冲通道没有这样的保证。
通道由make函数创建,该函数指定chan关键字和通道的元素类型。
这是创建无缓冲和缓冲通道的代码块:
语法
使用内置函数make创建无缓冲和缓冲通道。make的第一个参数需要关键字chan,然后是通道允许交换的数据类型。
这是将值发送到通道的代码块需要使用-运算符:
语法
一个包含5个值的缓冲区的字符串类型的goroutine1通道。然后我们通过通道发送字符串“Australia”。
这是从通道接收值的代码块:
语法
- 运算符附加到通道变量(goroutine1)的左侧,以接收来自通道的值。
在无缓冲通道中,在接收到任何值之前没有能力保存它。在这种类型的通道中,发送和接收goroutine在任何发送或接收操作完成之前的同一时刻都准备就绪。如果两个goroutine没有在同一时刻准备好,则通道会让执行其各自发送或接收操作的goroutine首先等待。同步是通道上发送和接收之间交互的基础。没有另一个就不可能发生。
在缓冲通道中,有能力在接收到一个或多个值之前保存它们。在这种类型的通道中,不要强制goroutine在同一时刻准备好执行发送和接收。当发送和接收阻塞时也有不同的条件。只有当通道中没有要接收的值时,接收才会阻塞。仅当没有可用缓冲区来放置正在发送的值时,发送才会阻塞。
实例
运行结果
无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。
这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。否则,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。
这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
阻塞:由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足,才接触阻塞。
同步:在两个或多个协程(线程)间,保持数据内容一致性的机制。
下图展示两个 goroutine 如何利用无缓冲的通道来共享一个值:
在第 1 步,两个 goroutine 都到达通道,但哪个都没有开始执行发送或者接收。
在第 2 步,左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个 goroutine 会在通道中被锁住,直到交换完成。
在第 3 步,右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据。这个 goroutine 一样也会在通道中被锁住,直到交换完成。
在第 4 步和第 5 步,进行交换,并最终,在第 6 步,两个 goroutine 都将它们的手从通道里拿出来,这模拟了被锁住的 goroutine 得到释放。两个 goroutine 现在都可以去做别的事情了。
如果没有指定缓冲区容量,那么该通道就是同步的,因此会阻塞到发送者准备好发送和接收者准备好接收。
无缓冲channel: —— 同步通信
一、Kafka简述
1. 为什么需要用到消息队列
异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。
2.为什么选择kafka呢?
这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{ fmt.Println("Failed to start consumer: %s", err)return} partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{ fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList { pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{ fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return} wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) }deferpc.AsyncClose() wg.Done() }(pc) } wg.Wait()}funcmain(){ consumer()}
消费组
funcconsumerCluster(){ groupID :="group-1"config := cluster.NewConfig() config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{ glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){ errors := c.Errors() noti := c.Notifications()for{select{caseerr := -errors: glog.Errorln(err)case-noti: } } }(c)formsg :=rangec.Messages() { fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){ config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{} msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!") client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{ fmt.Println("producer close err, ", err)return}deferclient.Close() pid, offset, err := client.SendMessage(msg)iferr !=nil{ fmt.Println("send message failed, ", err)return} fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){ config := sarama.NewConfig() config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){ errors := p.Errors() success := p.Successes()for{select{caseerr := -errors:iferr !=nil{ glog.Errorln(err) }case-success: } } }(p)for{ v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) fmt.Fprintln(os.Stdout, v) msg := sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), } p.Input() - msg time.Sleep(time.Second *1) }}funcmain(){goasyncProducer()select{ }}
3.5. 结果展示-
同步生产打印:
分区ID:0,offset:90
消费打印:
Partition:0,Offset:90,key:,value:Hello World!
异步生产打印:
async:7272async:7616async:998
消费打印:
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流