storm superviser FileNotFoundException异常退出原因分析

storm版本

1.0.2

故障日志

日志分析

分析nimbus日志可知,这是一次正常的分配行为。
正常流程

  1. Downloading code for storm id
  2. Successfully downloaded blob resources for storm-id
  3. Finished downloading code for storm id
  4. Launching worker with assignment

异常流程

  1. Downloading code for storm id
  2. Successfully downloaded blob resources for storm-id
  3. Finished downloading code for storm id
  4. Removing code for storm id
  5. Missing topology storm code, so can’t launch worker with assignment

可以明显看出,在正常流程中多了一个Removing code的过程。

源码分析

supervisor.clj代码中,有两个scheduler,分别周期性调用sync-processesmk-synchronize-supervisor两个函数。

  (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
(schedule-recurring (:event-timer supervisor)
0
(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
(fn [] (.add processes-event-manager sync-processes)))

其中:

mk-synchronize-supervisor

负责从nimbus下载代码,并写入分配信息到LocalState中

  (doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))

(log-message "Downloading code for storm id " storm-id)
(try-cause
(download-storm-code conf storm-id master-code-dir localizer)

(catch NimbusLeaderNotFoundException e
(log-warn-error e "Nimbus leader was not available."))

(catch TTransportException e
(log-warn-error e "There was a connection problem with nimbus."))
)

(log-message "Finished downloading code for storm id " storm-id))
)


(log-debug "Writing new assignment "
(pr-str new-assignment))

(doseq [p (set/difference (set (keys existing-assignment))
(set (keys new-assignment)))
]

(.killedWorker isupervisor (int p)))

(.assigned isupervisor (keys new-assignment))
(ls-local-assignments! local-state
new-assignment)

(reset! (:assignment-versions supervisor) versions)
(reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions)
(reset! (:curr-assignment supervisor) new-assignment)

(.add processes-event-manager sync-processes))))

sync-processes

负责从LocalState中读取分配信息,检查是否需要关闭worker,是否需要移除code,并启动worker。

(doseq [[id [state heartbeat]] allocated]
(when (not= :valid state)
(log-message
"Shutting down and clearing state for id " id
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))

(shutdown-worker supervisor id))
)

;;all-downloaded-storm-ids是从本地文件中读取的已经下载好的文件
(doseq [storm-id all-downloaded-storm-ids]
(when-not (assigned-storm-ids storm-id);;assigned-storm-ids从LocalState中判断
(log-message "Removing code for storm id "
storm-id)

(rm-topo-files conf storm-id localizer true))
)

;;get-valid-new-worker-ids负责启动worker
(let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
(ls-approved-workers! local-state
(merge
(select-keys (ls-approved-workers local-state)
(keys keepers))

valid-new-worker-ids)
)

(wait-for-workers-launch conf (keys valid-new-worker-ids)))
))

问题发生过程

  1. Nimbus正常分配任务
  2. Supervisor正常下载代码。
  3. 运行到上述代码Finished downloading code for storm id处。到目前为止工作正常。
  4. 此时scheduler触发sync-processes函数,读取了硬盘上下载好的topology列表。
  5. 由于第二步中,分配信息还没有写入到LocalState中。(when-not (assigned-storm-ids storm-id)判断为true。
  6. 移除下载好的代码。
  7. 继续正常启动worker过程,发现没有代码,启动失败。
  8. supervisor退出。

问题解决

  • 由于该问题只会造成supervisor退出,且目前配置有自动拉起,可暂不解决。
  • 如果要停止在1.0.2的版本,可针对如上问题解决,增加同步机制即可。
  • Storm 1.0.3中完全重构了supervisor代码,目前暂时没有发现这一问题。可升级解决。

Flink On Yarn Resource Isolation

解决问题

Flink目前没有资源隔离,不同任务的task可能跑在同一个JVM进程中,无法进行资源隔离,一个应用出问题,容易导致集群全部挂掉。

Flink on Yarn

Flink提供两种Yarn的部署方式Yarn Setup

  1. Start a long-running Flink cluster on YARN

    • 通过命令yarn-session.sh来实现,本质上是在yarn集群上启动一个flink集群。
    • 由yarn预先给flink集群分配若干个container给flink使用,在yarn的界面上只能看到一个Flink session with X TaskManagers的任务。
    • 只有一个Flink界面,可以从Yarn的ApplicationMaster链接进入。
    • 使用bin/flink run命令发布任务时,本质上是使用Flink自带的调度,与普通的在Flink集群上发布任务并没有不同。不同的任务可能在一个TaskManager中,也即是在一个JVM进程中,无法实现资源隔离。
  2. Run a Flink job on YARN

    • 通过命令bin/flink run -m yarn-cluster实现,一次只发布一个任务,本质上给每个flink任务启动了一个集群。
    • yarn不事先给flink分配container,而是在任务发布时,启动JobManager(对应Yarn的AM)和TaskManager,如果一个任务指定了n个TaksManager(-yn n),则会启动n+1个Container,其中一个是JobManager。
    • 发布m个应用,则有m个Flink界面,对比方式一,同样发布m个应用,会多出m-1个JobManager的。
    • 发布任务时,实际上是使用了Yarn的调用。不同的任务不可能在一个Container(JVM)中,也即是实现了资源隔离。

Example

假设已经搭建好了yarn和hdfs集群。Flink目录放在yarn集群的某台机器上,最好是ResourceManager。该机器必须配置有YARN_CONF_DIR 或HADOOP_CONF_DIR环境变量,且Flink能访问。

集群模式

  • 命令:./bin/yarn-session.sh -n 10 -tm 8192 -s 8,表示启动10个TaskManager,每个内存为8G,slots数为8个的Flink集群。

  • 点开Tracking UI中的ApplicationMaster,可以看到Flink的页面。

  • Flink页面中能看到目前只启动了一个TaskMananger(一个JVM进程),并且有FreeSlot,新启动的Flink Job会在这些slots中启动,直到没有更多FreeSlots了才会分配新的TaskMananger。
  • 运行两个命令:./bin/flink run ./examples/streaming/SocketWindowWordCount.jar –port 12345./bin/flink run ./examples/streaming/SocketWindowWordCount.jar –port 1234启动两个flink任务(需要预先启动nc -l 12345和nc -l 1234才能正常运行),截图如下:

  • 可以看到是在同一个JVM进程中运行的,没有资源隔离。

单个任务模式

  • 命令./bin/flink run -m yarn-cluster -yn 2 -ys 4 ./examples/streaming/SocketWindowWordCount.jar –port 12345./bin/flink run -m yarn-cluster -yn 2 -ys 2 ./examples/streaming/SocketWindowWordCount.jar –port 1234分别表示以yarn-cluster模式,启动了两个flink任务,第一个使用了2个container,每个container 4个slots,第二个使用了2个container,每个container 2个slots

  • Yarn中能看到两个任务,分别有两个ApplicationMaster,对应Flink的JobMananger。

  • 可以看到两个Flink集群中,均没有Free Slots,所有的资源仅提供给当前任务运行。使用jps查看进程也能看到集群上共有2个ApplicationMaster和4个TaskMananger。实现了资源隔离。

其他

  1. slots数的指定:对于standalone cluster而言,由于一台机器上只有一个TaskManager,slots数应与机器核数相同。对于single job on yarn模式和yarn cluster模式而言,一台机器上可能有多个TaskManager(取决于yarn在该机器上分配的container数),理论上应该与该Container分配的核数一致为佳。
  2. Flink命令执行后,在任务执行完之前不会返回,控制台是不能退出的。可以在命令后加-d 参数,表示detached,但此时无法再通过flink命令结束任务,需要通过yarn命令yarn application -kill < appId > 结束任务。

Strom nimbus HA summary

解决问题:

nimbus 单点故障问题

nimbus HA部署方式:

  1. 假设原有nimbus为A,相关配置为(storm.yaml):nimbus.seeds:[“A”],现在增加B机器作为nimbus HA
  2. 将所有机器的配置改为:nimbus.seeds: [ “A”,”B”]
  3. 重启节点,重启顺序如下:

    1. 重启nimbus A
    2. 重启ui
    3. 启动nimbus B
    4. 重启所有supervisor
  4. 观察ui上,是否有两个nimbus节点,如图所示,则成功:

storm命令访问nimbus过程

  1. storm命令可以在任何一台集群节点上执行
  2. 在linux上,storm命令调用storm.py,拼接Java命令启动Java执行(windows上调用storm.bat拼接命令)。
  3. Java命令调用nimbusClient。
  4. nimbusClient读取本地配置文件(storm.yaml),读取其中的nimbus.seeds配置
  5. nimbusClient依次访问nimbus.seeds中的节点,如果超时则重试连接下一个。
for (String host : seeds) {
int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
ClusterSummary clusterInfo;
try {
NimbusClient client = new NimbusClient(conf, host, port, null, asUser);
clusterInfo = client.getClient().getClusterInfo();
} catch (Exception e) {
LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
+ ". will retry with a different seed host.", e);
continue;
}
...
}
  1. 获取到所有存活的nimbus节点,依次检测它们是否为leader,如果是,则连接。
List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
if (nimbuses != null) {
for (NimbusSummary nimbusSummary : nimbuses) {
if (nimbusSummary.is_isLeader()) {
try {
return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
} catch (TTransportException e) {
String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
}
}
}
throw new NimbusLeaderNotFoundException(
"Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
"again after some time.");
}
  1. 如果找不到leader,则抛出异常:NimbusLeaderNotFoundException

nimbus HA文档过期

nimbus HA文档地址
文档中主要部分基本符合目前的代码实现,但Nimbus state store一节基本与目前实现不一致。文档中写道,Nimbus之间的状态同步主要靠实现ICodeDistributor接口实现,并且有LocalFileSystemCodeDistributorHDFSCodeDistributor两个实现,默认是LocalFileSystemCodeDistributor。但目前代码(1.0.1)中,全面使用了BlobStore来实现nimbus以及supervisor之间的jar包同步,默认为LocalFsBlobStore,也有HdfsBlobStoreConfiguration一节中的相关配置项也要做相应更改,原有配置并没有删除,但是已经没有使用,除此之外nimbus.min.replication.count这个配置项也没有使用。

nimbus HA任务提交过程

  1. 前文提到,nimbusClient会检测哪个nimbus是leader,会自动连接Leader Nimbus。Nimbus的各个操作之前会调用is-leader检查自身是否是Leader,如果不是则抛出异常。
  2. nimbus接受到submit请求后,会检查各个nimbus之间的jar包的同步情况,调用函数:wait-for-desired-code-replication,等待代码在各个nimbus之间同步。topology.min.replication.count默认值为1,表示要等待多少个nimbus获取到副本之后才能开始调度任务。
  3. nimbus有后台进程同步代码,nimbus.code.sync.freq.secs表示同步频率,默认为60s。代码会最终同步到各个nimbus上。
  4. 任务发布的可能时长:

    • 如果 topology.min.replication.count =1 则立刻发布
    • 如果 topology.max.replication.wait.time.sec >=0 则等待最长topology.max.replication.wait.time.sec时间。
    • 如果 topology.max.replication.wait.time.sec <0,且topology.min.replication.count>1则最长等待 2*nimbus.code.sync.freq.secs(存疑)

nimbus选举及容错机制

  1. 启动时检查本地是否都有全部的jar信息,如果有,则调用addToLeaderLockQueue将nimbus加入到可以加入作为leader的列表中。
  2. 当nimbus 退出时,其他nimbus节点使用curator的LeaderLatch机制获取到leadeShip。
  3. nimbus进行每一步必要的操作时,都会检查是否有leadership。
  4. 图中和文档中提到,Leader只会分配给有所有代码的nimbus,但是在1.0.1的代码中,不会执行这一检查。没有所有代码的Nimbus也会获取到Leadership,同时removeFromLeaderLockQueue方法没有得到任何的调用。STORM-1977

none_leader节点启动后杀死topology问题

  1. Nimbus启动时,会调用cleanup-corrupt-topologies!函数,该函数会:

    1. 读取本地存在的jar包
    2. 读取zk上active的topology
    3. 对比两者,zk上存在但本地不存在的认为是corrupt-topology
    4. 杀死corrupt-topology
  2. 由于nimbusHA机制,新启动的nimbus肯定没有在leader节点上发布的topology,所有topology都会被杀死。
  3. 这是早期的遗留代码,1.0.2中已经完全移除这个函数。STORM-1976

其他

  1. 集群在进行nimbus选举期间(主要是故障nimbus重新上线过程中),会出现短时间不稳定现象,可能有发布或杀死任务失败的情况,过一会儿重试即可。
  2. 由于nimbusClient访问nimbus时,是从nimbus.seeds配置中,依次读取,当列表中前面有nimbus无法访问时,从VRC上操作会返回如下warning:该warning是正常现象,是访问不存在的nimbus时出现的超时现象,会自动选取下一个nimbus访问。但由于存在超时等待的问题,速度较慢,时间较长,为减少任务操作时间,建议保证nimbus.seeds中第一个节点可用。如nimbus.seeds:[“A”,”B”]中A节点长时间不可用,请调整配置为nimbus.seeds:[”B”, “A”]

Worker实例化Spout Bolt过程分析

以下代码基于storm1.0.1
首先我们知道,执行一个Topology时,Supervisor将从Nimbus上下载三个文件,分别是stormconf.ser,stormcoe.ser和stormjar.jar。其中stormcore.ser即是该topology的序列化文件。Worker会从supervisor目录下读取该文件。时序图:

同时,在get-task-object中还有shellBolt和JavaObject两种类型。这两种类型都是提供给NONE-JVM DML使用的。其中的JavaObject是Thrift对象,供其他语言调用原生的Java对象使用(如在python中调用Java编写的spout)。其本质与序列化的Java对象不同。