Why the Storm supervisor exits with a FileNotFoundException: root cause analysis
Storm version
1.0.2
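Failure log
Log analysis
The nimbus log shows that this was an ordinary assignment, so the problem lies on the supervisor side. Comparing the supervisor log of a normal launch with that of the failing one:
Normal sequence (supervisor log)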
- Downloading code for storm id
- Successfully downloaded blob resources for storm-id
- Finished downloading code for storm id
- Launching worker with assignment
Failing sequence (supervisor log)
- Downloading code for storm id
- Successfully downloaded blob resources for storm-id
- Finished downloading code for storm id
- Removing code for storm id
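- Missing topology storm code, so can't launch worker with assignment
Compared with the normal sequence, the failing sequence clearly has an extra Removing code step between finishing the download and launching the worker.
Source code analysis
supervisor.clj sets up two recurring schedules on the supervisor's event timer. Every 10 seconds it queues synchronize-supervisor (the function built by mk-synchronize-supervisor) on event-manager, and every SUPERVISOR-MONITOR-FREQUENCY-SECS seconds it queues sync-processes on processes-event-manager. Because the two functions are queued on different event managers, they can run at the same time.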
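```clojure
;; synchronize-supervisor: initial delay 0, then every 10 s, queued on event-manager
(schedule-recurring (:event-timer supervisor) 0 10
                    (fn [] (.add event-manager synchronize-supervisor)))
;; sync-processes: initial delay 0, then every SUPERVISOR-MONITOR-FREQUENCY-SECS s,
;; queued on processes-event-manager
(schedule-recurring (:event-timer supervisor)
                    0
                    (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
                    (fn [] (.add processes-event-manager sync-processes)))
```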
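The race described below depends on the two functions overlapping in time. The following toy model is not Storm code. It assumes each event manager drains its queue on one background thread of its own, and it stands in for that with a single-thread executor from java.util.concurrent. The names mk-event-manager, add! and run-concurrently? are hypothetical. A task on one manager waits on a latch that only a task on the other manager can release.

```clojure
(import '[java.util.concurrent Executors ExecutorService CountDownLatch TimeUnit])

;; Hypothetical stand-in for an event manager: one thread running queued tasks in order.
(defn mk-event-manager ^ExecutorService []
  (Executors/newSingleThreadExecutor))

;; Queue a task, analogous to (.add event-manager f) in supervisor.clj.
(defn add! [^ExecutorService em f]
  (.execute em ^Runnable f))

;; True if a task on em-b can run while a task on em-a is still running.
(defn run-concurrently? [em-a em-b]
  (let [latch (CountDownLatch. 1)
        result (promise)]
    ;; The first task waits up to 1 s for the second task to count the latch down.
    (add! em-a (fn [] (deliver result (.await latch 1000 TimeUnit/MILLISECONDS))))
    (add! em-b (fn [] (.countDown latch)))
    @result))

(let [event-manager (mk-event-manager)
      processes-event-manager (mk-event-manager)]
  (println "same manager:" (run-concurrently? event-manager event-manager))
  (println "separate managers:" (run-concurrently? event-manager processes-event-manager))
  (.shutdown event-manager)
  (.shutdown processes-event-manager))
```

With a shared manager, the second task is stuck behind the first, so the wait times out and the result is false. With separate managers, it is true. This is why synchronize-supervisor and sync-processes can interleave.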
Their roles:
mk-synchronize-supervisor
Downloads the topology code from nimbus first, and only afterwards writes the new assignment into LocalState.
```clojure
;; Excerpt from the body of synchronize-supervisor (enclosing let/fn forms omitted)
;; Step 1: download the code of every newly assigned topology
(doseq [[storm-id master-code-dir] storm-code-map]
  (when (and (not (downloaded-storm-ids storm-id))
             (assigned-storm-ids storm-id))
    (log-message "Downloading code for storm id " storm-id)
    (try-cause
      (download-storm-code conf storm-id master-code-dir localizer)
      (catch NimbusLeaderNotFoundException e
        (log-warn-error e "Nimbus leader was not available."))
      (catch TTransportException e
        (log-warn-error e "There was a connection problem with nimbus.")))
    (log-message "Finished downloading code for storm id " storm-id)))

(log-debug "Writing new assignment "
           (pr-str new-assignment))
(doseq [p (set/difference (set (keys existing-assignment))
                          (set (keys new-assignment)))]
  (.killedWorker isupervisor (int p)))
(.assigned isupervisor (keys new-assignment))
;; Step 2: only now is the new assignment written to LocalState
(ls-local-assignments! local-state
                       new-assignment)
(reset! (:assignment-versions supervisor) versions)
(reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions)
(reset! (:curr-assignment supervisor) new-assignment)
(.add processes-event-manager sync-processes)
```
sync-processes
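Reads the assignment from LocalState, shuts down workers that are no longer valid, removes the code of topologies that are not assigned, and launches the new workers.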
```clojure
;; Excerpt from the body of sync-processes (enclosing let/fn forms omitted)
(doseq [[id [state heartbeat]] allocated]
  (when (not= :valid state)
    (log-message
      "Shutting down and clearing state for id " id
      ". Current supervisor time: " now
      ". State: " state
      ", Heartbeat: " (pr-str heartbeat))
    (shutdown-worker supervisor id)))
;; all-downloaded-storm-ids: topologies whose code is already on local disk
(doseq [storm-id all-downloaded-storm-ids]
  (when-not (assigned-storm-ids storm-id) ;; assigned-storm-ids is derived from LocalState
    (log-message "Removing code for storm id "
                 storm-id)
    (rm-topo-files conf storm-id localizer true)))
;; get-valid-new-worker-ids launches the new workers
(let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
  (ls-approved-workers! local-state
                        (merge
                          (select-keys (ls-approved-workers local-state)
                                       (keys keepers))
                          valid-new-worker-ids))
  (wait-for-workers-launch conf (keys valid-new-worker-ids)))
```
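The removal rule in the cleanup loop amounts to a set difference between the topologies on disk and the topologies that LocalState lists as assigned. Below is a minimal model. storm-ids-to-remove is a hypothetical name, because Storm inlines this logic in the doseq above.

```clojure
(require '[clojure.set :as set])

;; Hypothetical helper modelling the cleanup loop in sync-processes:
;; code is removed for every storm id found on disk that LocalState
;; does not list as assigned.
(defn storm-ids-to-remove [all-downloaded-storm-ids assigned-storm-ids]
  (set/difference (set all-downloaded-storm-ids)
                  (set assigned-storm-ids)))

;; topo-2 has just been downloaded, but LocalState still holds the old assignment.
(storm-ids-to-remove #{"topo-1" "topo-2"} #{"topo-1"}) ;; => #{"topo-2"}
```

The rule treats anything that is on disk but not yet in LocalState as stale. A freshly downloaded topology is in exactly that state until synchronize-supervisor writes the assignment.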
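How the failure happens
- Nimbus assigns the topology normally.
- The supervisor downloads the code normally.
- synchronize-supervisor reaches the "Finished downloading code for storm id" log statement in the code above. Everything works up to this point.
- At this moment the scheduler triggers sync-processes, which reads the list of topologies already downloaded to disk.
- synchronize-supervisor has not yet written the new assignment to LocalState, because that happens only after the download loop. As a result, (when-not (assigned-storm-ids storm-id) ...) evaluates to true.
- sync-processes therefore removes the code that was just downloaded.
- Worker launch then proceeds as usual, finds no code, and fails.
- The supervisor exits.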
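The interleaving can be replayed deterministically with a small pure model. The state map, the step functions and the storm id "topo-1" are all hypothetical. They only mirror the ordering of download-storm-code, ls-local-assignments! and the cleanup loop shown above.

```clojure
(require '[clojure.set :as set])

;; Hypothetical model of the state involved in the race (not Storm code):
;;   :disk        storm ids whose code is on local disk
;;   :local-state storm ids recorded as assigned in LocalState
(def initial-state {:disk #{} :local-state #{}})

;; synchronize-supervisor, step 1: stands in for download-storm-code
(defn download-code [state storm-id]
  (update state :disk conj storm-id))

;; synchronize-supervisor, step 2: stands in for ls-local-assignments!
(defn write-assignment [state storm-id]
  (update state :local-state conj storm-id))

;; sync-processes: keep only the code of storm ids that LocalState lists as assigned
(defn sync-processes-cleanup [{:keys [disk local-state] :as state}]
  (assoc state :disk (set/intersection disk local-state)))

;; A worker can only be launched if its code is still on disk.
(defn can-launch? [state storm-id]
  (contains? (:disk state) storm-id))

;; The failing interleaving: sync-processes runs between steps 1 and 2.
(def racy (-> initial-state
              (download-code "topo-1")
              sync-processes-cleanup
              (write-assignment "topo-1")))

;; The intended order: both steps finish before sync-processes runs.
(def ordered (-> initial-state
                 (download-code "topo-1")
                 (write-assignment "topo-1")
                 sync-processes-cleanup))

(can-launch? racy "topo-1")    ;; => false
(can-launch? ordered "topo-1") ;; => true
```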
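Resolution
- This problem only makes the supervisor exit, and our deployment restarts it automatically, so it can be left unfixed for now.
- To stay on 1.0.2, the race above can be fixed by adding synchronization between synchronize-supervisor and sync-processes.
- Storm 1.0.3 completely rewrote the supervisor code, and this problem has not been observed there so far. Upgrading is therefore another way to resolve it.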
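One way to add that synchronization would be a lock shared by both functions. synchronize-supervisor holds the lock from the start of the download until the assignment has been written to LocalState. sync-processes holds the same lock while it reads the downloaded ids and LocalState and removes code. The sketch below models this with atoms in place of the disk and LocalState. code-lock, disk, local-state and run-race are hypothetical names, and this is not a patch from the Storm project.

```clojure
(require '[clojure.set :as set])

;; Hypothetical lock shared by both functions (does not exist in Storm 1.0.2).
(def code-lock (Object.))

;; Atoms standing in for the code directory on disk and for LocalState.
(def disk (atom #{}))
(def local-state (atom #{}))

(defn synchronize-supervisor! [storm-id]
  ;; Download and LocalState write form one unit with respect to cleanup.
  (locking code-lock
    (swap! disk conj storm-id)           ; stands in for download-storm-code
    (Thread/sleep 50)                    ; the window in which the race used to happen
    (swap! local-state conj storm-id)))  ; stands in for ls-local-assignments!

(defn sync-processes-cleanup! []
  ;; Reading LocalState and removing code happen under the same lock.
  (locking code-lock
    (swap! disk set/intersection @local-state)))

;; Run both functions concurrently, as the two event managers would.
(defn run-race []
  (reset! disk #{})
  (reset! local-state #{})
  (let [t (Thread. ^Runnable (fn [] (synchronize-supervisor! "topo-1")))]
    (.start t)
    (Thread/sleep 10)          ; give the download a head start
    (sync-processes-cleanup!)  ; if the download holds the lock, this waits for it
    (.join t)
    @disk))

(run-race) ;; => #{"topo-1"}
```

Whichever thread takes the lock first, the downloaded code survives. The trade-off is that sync-processes now waits for the whole download, which delays its worker health checks. Another option would be to have the cleanup loop skip storm ids in the assignment that synchronize-supervisor is currently applying.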