怎样理解golang的异步

Python014

怎样理解golang的异步,第1张

同步的意思是,后一条指令必须要等待上一条指令执行完成后,才开始运行;异步呢就是,上一条指令启动后,就在“另一个维度”运行了,和下一条指令好像是同时运行的,更为生动的说法,我照搬一下一个百度知道的神比喻:

你给了狗一个包子,然后就走开做别的事去了,过后狗追过来对你说了声谢谢,

或者咬了你一口说,包子有毒。这是异步。

同样你给了狗一个包子后,看着狗把包子吃完,并对你摇尾巴,这个期间你一点别的

事都不做,就等着狗吃完包子。 这是同步。

用你问题中的代码,可以理解为:

你再给了狗一个包子,然后就呆在原地除了等狗啥都不做,过会儿狗追过来对你说了声谢谢,你再心满意足地继续做自己的事情,这就是将异步阻塞后变成的同步。

所以呢,同步和阻塞还是有那么些联系的(如果你把它们揉在一起的话)。

再说回 golang,其实你代码中,也就两个 go 语句是达到这个效果了的,但相对于 fmt.Println("继续执行") 以及后面的代码来说,前两条语句还是异步执行的,并且它不像 javascript 那样只是单线程工作(并发),golang 协程是可以多个同时工作的(并行)

一、Kafka简述

1. 为什么需要用到消息队列

异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

2.为什么选择kafka呢?

这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

kafka的优点:

1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群

kafka的缺点:

1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

3. Golang 操作kafka

3.1. kafka的环境

网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

3.2. 第三方库

github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组

3.3. 消费者

单个消费者

funcconsumer(){varwg sync.WaitGroup  consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{      fmt.Println("Failed to start consumer: %s", err)return}  partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{      fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {      pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{        fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}      wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))        }deferpc.AsyncClose()        wg.Done()      }(pc)  }  wg.Wait()}funcmain(){  consumer()}

消费组

funcconsumerCluster(){  groupID :="group-1"config := cluster.NewConfig()  config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second  config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{      glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){      errors := c.Errors()      noti := c.Notifications()for{select{caseerr := <-errors:            glog.Errorln(err)case<-noti:        }      }  }(c)formsg :=rangec.Messages() {      fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))      c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生产者

同步生产者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){  config := sarama.NewConfig()  config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := &sarama.ProducerMessage{}  msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")  client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{      fmt.Println("producer close err, ", err)return}deferclient.Close()  pid, offset, err := client.SendMessage(msg)iferr !=nil{      fmt.Println("send message failed, ", err)return}  fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

异步生产者

funcasyncProducer(){  config := sarama.NewConfig()  config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second  p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){      errors := p.Errors()      success := p.Successes()for{select{caseerr := <-errors:iferr !=nil{              glog.Errorln(err)            }case<-success:        }      }  }(p)for{      v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))      fmt.Fprintln(os.Stdout, v)      msg := &sarama.ProducerMessage{        Topic: topics,        Value: sarama.ByteEncoder(v),      }      p.Input() <- msg      time.Sleep(time.Second *1)  }}funcmain(){goasyncProducer()select{      }}

3.5. 结果展示->

同步生产打印:

分区ID:0,offset:90

消费打印:

Partition:0,Offset:90,key:,value:Hello World!

异步生产打印:

async:7272async:7616async:998

消费打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998

协程,又称微线程,纤程。英文名 Coroutine 。Python对协程的支持是通过 generator 实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用 next()函数 获取由 yield 语句返回的下一个值。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。yield其实是终端当前的函数,返回给调用方。python3中使用yield来实现range,节省内存,提高性能,懒加载的模式。

asyncio是Python 3.4 版本引入的 标准库 ,直接内置了对异步IO的支持。

从Python 3.5 开始引入了新的语法 async 和 await ,用来简化yield的语法:

import asyncio

import threading

async def compute(x, y):

    print("Compute %s + %s ..." % (x, y))

    print(threading.current_thread().name)

    await asyncio.sleep(x + y)

    return x + y

async def print_sum(x, y):

    result = await compute(x, y)

    print("%s + %s = %s" % (x, y, result))

    print(threading.current_thread().name)

if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    tasks = [print_sum(1, 2), print_sum(3, 4)]

    loop.run_until_complete(asyncio.wait(tasks))

    loop.close()

线程是内核进行抢占式的调度的,这样就确保了每个线程都有执行的机会。而 coroutine 运行在同一个线程中,由语言的运行时中的 EventLoop(事件循环) 来进行调度。和大多数语言一样,在 Python 中,协程的调度是非抢占式的,也就是说一个协程必须主动让出执行机会,其他协程才有机会运行。

让出执行的关键字就是 await。也就是说一个协程如果阻塞了,持续不让出 CPU,那么整个线程就卡住了,没有任何并发。

PS: 作为服务端,event loop最核心的就是IO多路复用技术,所有来自客户端的请求都由IO多路复用函数来处理作为客户端,event loop的核心在于利用Future对象延迟执行,并使用send函数激发协程,挂起,等待服务端处理完成返回后再调用CallBack函数继续下面的流程

Go语言的协程是 语言本身特性 ,erlang和golang都是采用了CSP(Communicating Sequential Processes)模式(Python中的协程是eventloop模型),但是erlang是基于进程的消息通信,go是基于goroutine和channel的通信。

Python和Go都引入了消息调度系统模型,来避免锁的影响和进程/线程开销大的问题。

协程从本质上来说是一种用户态的线程,不需要系统来执行抢占式调度,而是在语言层面实现线程的调度 。因为协程 不再使用共享内存/数据 ,而是使用 通信 来共享内存/锁,因为在一个超级大系统里具有无数的锁,共享变量等等会使得整个系统变得无比的臃肿,而通过消息机制来交流,可以使得每个并发的单元都成为一个独立的个体,拥有自己的变量,单元之间变量并不共享,对于单元的输入输出只有消息。开发者只需要关心在一个并发单元的输入与输出的影响,而不需要再考虑类似于修改共享内存/数据对其它程序的影响。