β

docker swarmkit 源码阅读: Agent 部分

nosa.me 27 阅读

相关的数据结果如下。

e2939924-632e-4512-8526-ec1420ab0e05

85a0d8de-6e44-48ce-a701-c416f85e9bfa

e8abffa3-e4ec-42a1-aba8-3a435b74931a

30441360-5af9-4106-ac53-d40cea2a071c

worker 负责管理任务的执行、状态上报和保存,taskManagers 用来管理任务的执行、listeners 负责监听要任务上报的信息并上报给 Dispatcher,额外的,任务信息会报错在本地 DB 中。

83602e28-7f61-4709-8aa0-df5a4ec42968

reporter 用来报告任务的状态,包括本地 DB 和 Manger,reporter 由 worker 声明。

630d0620-0fe1-492f-a6d2-69c685bbd429

1. 输入一个 Config 配置创建一个 agent,会检查 Config 有效性,Credentials、Executor、DB 不能为空;

2. agent 开始 run,从 config.Executor.Describe 获取 NodeDescription,重写掉 config.Hostname;

3. 首先基于 NodeDescription 创建到 manager (基于算法选一个)的 session, 会把 sessionID 和 stream 保存到 session 结构中,然后拿到第一个 SessionMessage,放入 session.messages 中等待处理。

然后,分别开启三个 goroutine:heartbeat、watch、listen,三者产生的 error 会放入 session.errs。

heartbeat 用于保持 session 不断,发送请求的格式是 HeartbeatRequest(包含 sessionID),返回的结果包含 period,等待 period 之后继续请求,一直如此下去。

watch 用于监听 task 信息,先使用 Dispatcher 的 Assignments 方法,如果失败则使用 Dispatcher 的 Tasks 方法,拿到的 task 信息写入 session.assignments;

listen 用于监听 session stream 的内容,如果有内容放入  session.messages。

4. 初始化 worker,Worker 声明了四个方法:Init、AssignTasks、UpdateTasks、Listen。

初始化调用 Init 方法,遍历 DB(本地存储,task 会存在本地) 中的 task,如果 task 没有 assigned 给一个 node,则删除 task,否则从 DB 中获取 task 的状态覆盖掉当前状态,然后启动 task。Init 方法事实上是确保已经在本地 DB 中的 task 正确运行,对于新的 task 在后面看如何处理。

启动 task 的过程,调用 worker 的 taskManager 方法,如果 task 在 taskManagers,返回。

否则调用 worker 的 newTaskManager 创建,newTaskManager 先调用 exec.Resolve 方法获取 Controller 和 状态,其中状态会被改成 TaskStateAccepted,然后保存状态,包括 保存到 DB 存储和调用 listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus 保存到 Dispatcher,最后调用另一个 newTaskManager 来创建 taskManager 结构并运行。

说说运行的处理流程:

1). 如果状态为 TaskStatePreparing,则调用 Controller 的 Prepare 方法,并设置状态为 TaskStateReady;

2). 如果状态为 TaskStateStarting,则调用 Controller 的 Start 方法,并设置状态为 TaskStateRunning;

3). 如果状态为 TaskStateRunning,则调用 Controller 的 Wait 方法,如果 task 执行完了,状态改为 TaskStateCompleted;

4). 如果状态为 TaskStateNew、TaskStateAllocated、TaskStateAssigned 则改为 TaskStateAccepted;

5). 如果状态为 TaskStateAccepted,则改成 TaskStatePreparing;

6). 如果状态为 TaskStateReady,则改成 TaskStateStarting。

上面所说,实际上是新建一个 task 的情况,状态是 TaskStateAccepted,套用状态变更流程,先会改成 TaskStatePreparing,然后改成 TaskStateReady,然后改成 TaskStateStarting,再改成 TaskStateRunning。

事实上,taskManager 结构在运行开始就会调用一次处理流程,以保证 TaskStateAccepted 状态的 task 立即被执行,而且 taskManager 还接收 task 的 update,如果和当前维护 task 的配置(CPU、MEMORY 等)不同,则进入处理流程(如果是 CPU、MEMORY 不同,docker 应该不会重启 task,但是其他的呢,比如 Network?)。

5. 上面说保存任务状态会调用 (worker.)listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus,这里的核心是 StatusReporter 这个接口。

1). agent 声明了 UpdateTaskStatus 函数,所以是 StatusReporter 这个接口的实例;

2). agent 的 run 会调用 newStatusReporter 创建 statusReporter 结构并运行,statusReporter 结构中的 reporter 就是 agent 的 UpdateTaskStatus;

3). agent 的 run 调用 worker.Listen 把 statusReporter 注册到 listeners 中(由于 statusReporter 声明了UpdateTaskStatus 函数,所以 statusReporter 是  StatusReporter 接口,所以才能加入到 listeners 中)。

4). statusReporter 的运行:不断从 statuses(map[string]*api.TaskStatus) 中获取 status 并调用 reporter.UpdateTaskStatus 完成上报;

5). statusReporter 并提供 UpdateTaskStatus 向 statuses 增加新的 status;

6). 是谁调用 UpdateTaskStatus 增加 status 呢?

答案是 worker 中的 taskManager,worker 对每一个 task 都一个 taskManager,taskManager 中有一个 reporter,每当有状态变化时,会执行此 reporter。

这个 reporter 函数是 worker 封装的,包括保存到本地 DB 和 调用 listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus,而 listeners 中 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus 是什么?是 statusReporter 的 UpdateTaskStatus,执行此函数相当于向 statusReporter.statuses 增加 status。

实在是绕,简单的说:

1). agent 基于自己的 UpdateTaskStatus 包装出满足 StatusReporter 接口的 statusReporter,并把 statusReporter 注册到到 worker.listeners,当有新状态时 statusReporter 后台自动调用 agent 的 UpdateTaskStatus 上报(到Dispatcher);

2). worker 会为每一个 task 分配 taskManager,taskManager 会包含上报函数,当有状态变更时,会执行 statusReporter 的 UpdateTaskStatus 实现向 statusReporter 的状态列表中增加状态。

6. 说说 agent 的  UpdateTaskStatus 函数,通过 session 的 sendTaskStatus 函数调用到 Dispatcher RPC 请求,保存到 store 中,Dispatcher 的 UpdateTaskStatus 实现此功能;

7. 上面说了 agent 会监听 Dispatcher 来的信息,包括 session.assignments 和 session.messages,agent 会分别处理这两个信息。

1). 对于 session.assignments,是任务信息,分为全量信息和增量信息, 增量信息包括该更新的任务和该删除的任务。

对于更新的任务:

i). 先把任务保存到本地 DB 并标记为 assigned;

ii). 如果任务已经在 taskManagers,则更新,任务的配置不一样时才会真正更新,状态会被忽略,比如 CPU、MEMORY 变化,会更新;

iii). 如果任务不在 taskManagers,看本地 DB 里是否存在,如果存在,更新任务的状态(本地 DB 的状态更准),如果不在本地 DB,则保存状态,然后启动任务。

启动任务,会调用 worker.taskManager,创建新的 taskManager 并加到 taskManagers。

对于删除的任务:

i). 删除本地 DB 的 assigned 状态;

ii). 从 taskManagers 中删除;

iii). 关闭 taskManager。

全量信息先全部当做更新的任务处理,但是当前运行的任务如果不在全量列表里则会被删除。

2). 对于 session.messages,包括的功能有:

i). 更新 Manager 列表,保持最新(agent 来负责更新 Manager 列表);

ii). worker 的角色是否有变(NotifyRoleChange);

iii). 保持 NetworkBootstrapKeys 最新。

Related posts:

  1. swarm task 的更新流程
  2. docker swarmkit 源码阅读: Dispatcher 部分
  3. docker swarmkit 源码阅读: Scheduler 部分
  4. docker swarmkit 源码阅读: Orchestrator 部分
作者:nosa.me
未来不会有sa
原文地址:docker swarmkit 源码阅读: Agent 部分, 感谢原作者分享。

发表评论