n.netService, err = nebnet.NewNebService(n)
if err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to setup net service.")
}
netservice有两个成员
type NebServicestruct {
node *Node
dispatcher *Dispatcher
}
跳出stup()函数
先进入start()函数看一看
if err := n.netService.Start()err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to start net service.")
}
进入netservice.start()
func (ns *NebService) Start() error {
logging.CLog().Info("Starting NebService...")
// start dispatcher.
ns.dispatcher.Start()
// start node.
if err := ns.node.Start()err != nil {
ns.dispatcher.Stop()
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Error("Failed to start NebService.")
return err
}
logging.CLog().Info("Started NebService.")
return nil
}
可以看到第一个start()的函数是dispatcher.start()
进入dispatch.start()
func (dp *Dispatcher) Start() {
logging.CLog().Info("Starting NebService Dispatcher...")
go dp.loop()
}
然后就出现一个新的线程、goruntime
go dp.loop()
进入该线程,看它干了些什么
timerChan := time.NewTicker(time.Second).C
for {
select {
case <-timerChan:
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))
case <-dp.quitCh:
logging.CLog().Info("Stoped NebService Dispatcher.")
return
case msg := <-dp.receivedMessageCh:
msgType := msg.MessageType()
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
}
m, _ := v.(*sync.Map)
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan <- msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispatch message.")
}
return true
})
}
}
一个有点长的循环
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))一秒钟刷新一次缓冲区
case msg := <-dp.receivedMessageCh:
msgType := msg.MessageType()如果能取出dp.receivedMessageCh
msgType := msg.MessageType()首先判断取出的信息类型
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
}
根据类型取出相应的map
如果取不出,那么使用continue结束这个case
m, _ := v.(*sync.Map)
断言
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan <- msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispa+tch message.")
}
return true
})
将msg推入其他管道里面去。其他goruntime会循环等待该
package p2p
import (
"context"
"errors"
"time"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)
//P2P结构保存当前正在运行的流/监听器的信息
// P2P structure holds information on currently running streams/listeners
type P2P struct {
//监听器
Listeners ListenerRegistry
//数据流
Streams StreamRegistry
//节点ID
identity peer.ID
//节点地址
peerHost p2phost.Host
//一个线程安全的对等节点存储
peerstore pstore.Peerstore
}
//创建一个新的p2p结构
// NewP2P creates new P2P struct
//这个新的p2p结构不包含p2p结构中的监听器和数据流
func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *P2P {
return &P2P{
identity: identity,
peerHost: peerHost,
peerstore: peerstore,
}
}
//新建一个数据流 工具方法 构建一个有节点id,内容和协议的流
func (p2p P2P) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {
//30s 后会自动timeout
ctx, cancel := context.WithTimeout(ctx2, time.Second 30) //TODO: configurable?
defer cancel()
err := p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
return p2p.peerHost.NewStream(ctx2, p, pro.ID(protocol))
}
//对话为远程监听器创建新的P2P流
//创建一个新的p2p流实现对对话的监听
// Dial creates new P2P stream to a remote listener
//Multiaddr是一种跨协议、跨平台的表示格式的互联网地址。它强调明确性和自我描述。
//对内接收
func (p2p P2P) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) ( ListenerInfo, error) {
//获取一些节点信息 network, host, nil
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
//监听信息
listenerInfo := ListenerInfo{
//节点身份
Identity: p2p.identity,
////应用程序协议标识符。
Protocol: proto,
}
//调用newStreamTo 通过ctx(内容) peer(节点id) proto(协议标识符) 参数获取一个新的数据流
remote, err := p2p.newStreamTo(ctx, peer, proto)
if err != nil {
return nil, err
}
//network协议标识
switch lnet {
//network为"tcp", "tcp4", "tcp6"
case "tcp", "tcp4", "tcp6":
//从监听器获取新的信息 nla.Listener, nil
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Reset()err2 != nil {
return nil, err2
}
return nil, err
}
//将获取的新信息保存到listenerInfo
listenerInfo.Address = listener.Multiaddr()
listenerInfo.Closer = listener
listenerInfo.Running = true
//开启接受
go p2p.doAccept(&listenerInfo, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return &listenerInfo, nil
}
//
func (p2p *P2P) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {
//关闭侦听器并删除流处理程序
defer listener.Close()
//Returns a Multiaddr friendly Conn
//一个有好的 Multiaddr 连接
local, err := listener.Accept()
if err != nil {
return
}
stream := StreamInfo{
//连接协议
Protocol: listenerInfo.Protocol,
//定位节点
LocalPeer: listenerInfo.Identity,
//定位节点地址
LocalAddr: listenerInfo.Address,
//远程节点
RemotePeer: remote.Conn().RemotePeer(),
//远程节点地址
RemoteAddr: remote.Conn().RemoteMultiaddr(),
//定位
Local: local,
//远程
Remote: remote,
//注册码
Registry: &p2p.Streams,
}
//注册连接信息
p2p.Streams.Register(&stream)
//开启节点广播
stream.startStreaming()
}
//侦听器将流处理程序包装到侦听器中
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
//P2PListener保存关于侦听器的信息
// P2PListener holds information on a listener
type P2PListener struct {
peerHost p2phost.Host
conChchan net.Stream
protopro.ID
ctx context.Context
cancel func()
}
//等待侦听器的连接
// Accept waits for a connection from the listener
func (il *P2PListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
case <-il.ctx.Done():
return nil, il.ctx.Err()
}
}
//关闭侦听器并删除流处理程序
// Close closes the listener and removes stream handler
func (il *P2PListener) Close() error {
il.cancel()
il.peerHost.RemoveStreamHandler(il.proto)
return nil
}
// Listen创建新的P2PListener
// Listen creates new P2PListener
func (p2p P2P) registerStreamHandler(ctx2 context.Context, protocol string) ( P2PListener, error) {
ctx, cancel := context.WithCancel(ctx2)
list := &P2PListener{
peerHost: p2p.peerHost,
proto:pro.ID(protocol),
conCh:make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
p2p.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh <- s:
case <-ctx.Done():
s.Reset()
}
})
return list, nil
}
// NewListener创建新的p2p侦听器
// NewListener creates new p2p listener
//对外广播
func (p2p P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) ( ListenerInfo, error) {
//调用registerStreamHandler 构造一个新的listener
listener, err := p2p.registerStreamHandler(ctx, proto)
if err != nil {
return nil, err
}
//构造新的listenerInfo
listenerInfo := ListenerInfo{
Identity: p2p.identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &p2p.Listeners,
}
go p2p.acceptStreams(&listenerInfo, listener)
//注册连接信息
p2p.Listeners.Register(&listenerInfo)
return &listenerInfo, nil
}
//接受流
func (p2p *P2P) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {
for listenerInfo.Running {
//一个有好的 远程 连接
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
}
//取消注册表中的p2p侦听器
p2p.Listeners.Deregister(listenerInfo.Protocol)
}
// CheckProtoExists检查是否注册了协议处理程序
// mux处理程序
// CheckProtoExists checks whether a protocol handler is registered to
// mux handler
func (p2p *P2P) CheckProtoExists(proto string) bool {
protos := p2p.peerHost.Mux().Protocols()
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}
个人觉得golang十分适合进行网游服务器端开发,写下这篇文章总结一下。从网游的角度看:要成功的运营一款网游,很大程度上依赖于玩家自发形成的社区。只有玩家自发形成一个稳定的生态系统,游戏才能持续下去,避免鬼城的出现。而这就需要多次大量导入用户,在同时在线用户量达到某个临界点的时候,才有可能完成。因此,多人同时在线十分有必要。再来看网游的常见玩法,除了排行榜这类统计和数据汇总的功能外,基本没有需要大量CPU时间的应用。以前的项目里,即时战斗产生的各种伤害计算对CPU的消耗也不大。玩家要完成一次操作,需要通过客户端-服务器端-客户端这样一个来回,为了获得高响应速度,满足玩家体验,服务器端的处理也不能占用太多时间。所以,每次请求对应的CPU占用是比较小的。网游的IO主要分两个方面,一个是网络IO,一个是磁盘IO。网络IO方面,可以分成美术资源的IO和游戏逻辑指令的IO,这里主要分析游戏逻辑的IO。游戏逻辑的IO跟CPU占用的情况相似,每次请求的字节数很小,但由于多人同时在线,因此并发数相当高。另外,地图信息的广播也会带来比较频繁的网络通信。磁盘IO方面,主要是游戏数据的保存。采用不同的数据库,会有比较大的区别。以前的项目里,就经历了从MySQL转向MongoDB这种内存数据库的过程,磁盘IO不再是瓶颈。总体来说,还是用内存做一级缓冲,避免大量小数据块读写的方案。针对网游的这些特点,golang的语言特性十分适合开发游戏服务器端。首先,go语言提供goroutine机制作为原生的并发机制。每个goroutine所需的内存很少,实际应用中可以启动大量的goroutine对并发连接进行响应。goroutine与gevent中的greenlet很相像,遇到IO阻塞的时候,调度器就会自动切换到另一个goroutine执行,保证CPU不会因为IO而发生等待。而goroutine与gevent相比,没有了python底层的GIL限制,就不需要利用多进程来榨取多核机器的性能了。通过设置最大线程数,可以控制go所启动的线程,每个线程执行一个goroutine,让CPU满负载运行。同时,go语言为goroutine提供了独到的通信机制channel。channel发生读写的时候,也会挂起当前操作channel的goroutine,是一种同步阻塞通信。这样既达到了通信的目的,又实现同步,用CSP模型的观点看,并发模型就是通过一组进程和进程间的事件触发解决任务的。虽然说,主流的编程语言之间,只要是图灵完备的,他们就都能实现相同的功能。但go语言提供的这种协程间通信机制,十分优雅地揭示了协程通信的本质,避免了以往锁的显式使用带给程序员的心理负担,确是一大优势。进行网游开发的程序员,可以将游戏逻辑按照单线程阻塞式的写,不需要额外考虑线程调度的问题,以及线程间数据依赖的问题。因为,线程间的channel通信,已经表达了线程间的数据依赖关系了,而go的调度器会给予妥善的处理。另外,go语言提供的gc机制,以及对指针的保护式使用,可以大大减轻程序员的开发压力,提高开发效率。展望未来,我期待go语言社区能够提供更多的goroutine间的隔离机制。个人十分推崇erlang社区的脆崩哲学,推动应用发生预期外行为时,尽早崩溃,再fork出新进程处理新的请求。对于协程机制,需要由程序员保证执行的函数不会发生死循环,导致线程卡死。