Storm supervisor exits with FileNotFoundException: root-cause analysis


Storm version

1.0.2

Failure log

Log analysis

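The nimbus log shows that this was a normal assignment, so the problem lies on the supervisor side. The supervisor log of a normal run and of the failed run compare as follows.
Normal flow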

  1. Downloading code for storm id
  2. Successfully downloaded blob resources for storm-id
  3. Finished downloading code for storm id
  4. Launching worker with assignment

Abnormal flow

  1. Downloading code for storm id
  2. Successfully downloaded blob resources for storm-id
  3. Finished downloading code for storm id
  4. Removing code for storm id
  5. Missing topology storm code, so can’t launch worker with assignment

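Compared with the normal flow, the abnormal flow clearly contains one extra step, Removing code, which deletes the topology code right after it has been downloaded.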
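
To check whether other supervisor logs show the same pattern, the comparison above can be automated: flag any storm id whose "Removing code" line follows its "Finished downloading code" line with no worker launch in between. This is a minimal sketch with a hypothetical function name. It assumes the storm id is the last whitespace-separated token of those two messages, and that a "Launching worker" line contains the storm id somewhere in the printed assignment. Real log lines also carry a timestamp and thread prefix, which the substring matching simply ignores.

```python
def find_removed_after_download(lines):
    """Return storm ids whose freshly downloaded code was removed before
    any worker launch for them was logged (hypothetical helper).

    Assumptions: the storm id is the last whitespace-separated token of the
    'Finished downloading code' and 'Removing code' messages, and a
    'Launching worker' line contains the storm id somewhere in its text.
    """
    pending = set()     # downloaded, but no launch logged yet
    suspicious = []
    for line in lines:
        if "Finished downloading code for storm id" in line:
            pending.add(line.split()[-1])
        elif "Removing code for storm id" in line:
            storm_id = line.split()[-1]
            if storm_id in pending:
                suspicious.append(storm_id)
                pending.discard(storm_id)
        elif "Launching worker with assignment" in line:
            # Plain substring test: an id like "topo-1" also matches "topo-10".
            pending = {sid for sid in pending if sid not in line}
    return suspicious


if __name__ == "__main__":
    failed_run = [
        "Downloading code for storm id topo-2",
        "Finished downloading code for storm id topo-2",
        "Removing code for storm id topo-2",
        "Missing topology storm code, so can't launch worker with assignment ...",
    ]
    print(find_removed_after_download(failed_run))  # ['topo-2']
```

A removal that happens after a successful launch, for example when a topology is later killed, is not flagged, because the launch line clears the pending id.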

Source code analysis

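supervisor.clj sets up two recurring schedules. One queues synchronize-supervisor (the function built by mk-synchronize-supervisor) on event-manager every 10 seconds. The other queues sync-processes on processes-event-manager every SUPERVISOR-MONITOR-FREQUENCY-SECS seconds. Because the two functions go to different event managers, nothing prevents a sync-processes run from happening while synchronize-supervisor is in the middle of its work.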

  (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
  (schedule-recurring (:event-timer supervisor)
                      0
                      (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
                      (fn [] (.add processes-event-manager sync-processes)))
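
How often the two timers collide can be estimated with a little arithmetic. Assume sync-processes fires at t = k * period seconds (k = 0, 1, 2, ..., initial delay 0 as in the call above). Then any open time window longer than one period must contain at least one firing. The sketch below lists the firings inside a window. The helper name and the 3-second period are illustrative assumptions, since SUPERVISOR-MONITOR-FREQUENCY-SECS is configurable.

```python
def ticks_in_window(period_secs, start, end):
    """Return the firing times k * period_secs (k = 0, 1, 2, ...) that fall
    strictly inside the open window (start, end).

    Models a recurring timer with initial delay 0, like the
    schedule-recurring call that queues sync-processes.
    """
    ticks = []
    k = 0
    while k * period_secs < end:
        t = k * period_secs
        if t > start:
            ticks.append(t)
        k += 1
    return ticks


if __name__ == "__main__":
    # Assumed period of 3 s; a 6 s window from t=10 to t=16.
    print(ticks_in_window(3, 10, 16))  # [12, 15]
```

If a topology's code sits on disk for longer than one period before its assignment reaches LocalState, at least one sync-processes run starts during that time. Shorter windows are hit only sometimes, depending on where they fall relative to the timer.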

Their responsibilities:

mk-synchronize-supervisor

Downloads the topology code from nimbus and writes the assignment into LocalState.

  (doseq [[storm-id master-code-dir] storm-code-map]
    (when (and (not (downloaded-storm-ids storm-id))
               (assigned-storm-ids storm-id))
      (log-message "Downloading code for storm id " storm-id)
      (try-cause
        (download-storm-code conf storm-id master-code-dir localizer)
        (catch NimbusLeaderNotFoundException e
          (log-warn-error e "Nimbus leader was not available."))
        (catch TTransportException e
          (log-warn-error e "There was a connection problem with nimbus.")))
      ;; The code is now on local disk, but LocalState does not yet hold the new assignment.
      (log-message "Finished downloading code for storm id " storm-id)))


  (log-debug "Writing new assignment "
             (pr-str new-assignment))
  (doseq [p (set/difference (set (keys existing-assignment))
                            (set (keys new-assignment)))]
    (.killedWorker isupervisor (int p)))
  (.assigned isupervisor (keys new-assignment))
  ;; Only here does the new assignment reach LocalState, which is what sync-processes reads.
  (ls-local-assignments! local-state
                         new-assignment)
  (reset! (:assignment-versions supervisor) versions)
  (reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions)
  (reset! (:curr-assignment supervisor) new-assignment)
  (.add processes-event-manager sync-processes))))
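
One consequence of this ordering can be read straight off the code above. The doseq downloads topologies one after another, and ls-local-assignments! runs only after the whole loop. So a topology downloaded early in the loop stays on disk, unknown to LocalState, for as long as all the later downloads take. A sketch of that arithmetic with a hypothetical function name. It assumes sequential downloads with the given durations and treats the LocalState write as instantaneous; both are simplifications.

```python
from itertools import accumulate


def exposure_windows(download_secs):
    """Return (finished_at, visible_at) for each topology downloaded in one
    synchronize-supervisor run, measured from the start of the loop.

    finished_at: when "Finished downloading code" would be logged for it.
    visible_at:  when ls-local-assignments! writes the new assignment,
                 assumed to be right after the last download finishes.
    """
    finished = list(accumulate(download_secs))
    visible_at = finished[-1] if finished else 0
    return [(f, visible_at) for f in finished]


if __name__ == "__main__":
    # Three topologies taking 4 s, 6 s and 2 s to download.
    for finished_at, visible_at in exposure_windows([4, 6, 2]):
        print(finished_at, visible_at, visible_at - finished_at)
    # 4 12 8
    # 10 12 2
    # 12 12 0
```

In this model the last topology has a zero-length window only because the write is treated as instantaneous.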

sync-processes

Reads the assignment from LocalState, checks whether any workers must be shut down and whether any code must be removed, and then launches workers.

(doseq [[id [state heartbeat]] allocated]
  (when (not= :valid state)
    (log-message
      "Shutting down and clearing state for id " id
      ". Current supervisor time: " now
      ". State: " state
      ", Heartbeat: " (pr-str heartbeat))
    (shutdown-worker supervisor id)))

;; all-downloaded-storm-ids: topologies whose code has already been downloaded to local disk
(doseq [storm-id all-downloaded-storm-ids]
  (when-not (assigned-storm-ids storm-id) ;; assigned-storm-ids is derived from LocalState
    (log-message "Removing code for storm id " storm-id)
    (rm-topo-files conf storm-id localizer true)))

;; get-valid-new-worker-ids launches the new workers
(let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
  (ls-approved-workers! local-state
                        (merge
                          (select-keys (ls-approved-workers local-state)
                                       (keys keepers))
                          valid-new-worker-ids))
  (wait-for-workers-launch conf (keys valid-new-worker-ids)))
))
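
The removal test in the middle loop reduces to a set difference between what is on disk and what LocalState says is assigned. The sketch below models just that decision. Names are hypothetical, and LocalState is modelled as a port-to-storm-id map; the real assignment values carry more than the storm id.

```python
def code_to_remove(all_downloaded_storm_ids, local_assignments):
    """Storm ids whose code sync-processes would delete.

    local_assignments models LocalState as {port: storm_id}. The check
    only sees this snapshot, so a topology that was just downloaded but
    not yet written to LocalState looks exactly like one that was killed.
    """
    assigned_storm_ids = set(local_assignments.values())
    return set(all_downloaded_storm_ids) - assigned_storm_ids


if __name__ == "__main__":
    on_disk = {"killed-topo", "new-topo"}
    # LocalState not yet updated with the assignment for new-topo:
    print(sorted(code_to_remove(on_disk, {})))                  # ['killed-topo', 'new-topo']
    # After ls-local-assignments! has written it:
    print(sorted(code_to_remove(on_disk, {6700: "new-topo"})))  # ['killed-topo']
```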

How the problem occurs

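  1. Nimbus assigns the topology normally.
  2. The supervisor (synchronize-supervisor) downloads the code normally.
  3. Execution reaches the "Finished downloading code for storm id" line in the code above. Up to this point everything works.
  4. At this moment the scheduler triggers sync-processes, which reads the list of topologies already downloaded to disk.
  5. The synchronize-supervisor run from step 2 has not yet written the new assignment to LocalState (ls-local-assignments! has not run), so (when-not (assigned-storm-ids storm-id)) evaluates to true.
  6. The freshly downloaded code is removed.
  7. The worker launch then proceeds as usual, finds no code, and fails.
  8. The supervisor exits.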
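
Steps 3 to 7 can be reproduced deterministically in a toy model. The sketch below is not Storm code. It models the on-disk code as a set and LocalState as a {port: storm_id} dict, and runs the two functions on two threads, one per event manager. threading.Event forces sync-processes to run exactly between the download and the LocalState write. All class and method names are hypothetical.

```python
import threading


class SupervisorModel:
    """Toy model of the state shared by synchronize-supervisor and
    sync-processes (hypothetical, not Storm's API).

    disk:        storm ids whose code is on local disk
    local_state: {port: storm_id}, what ls-local-assignments! persisted
    """

    def __init__(self):
        self.disk = set()
        self.local_state = {}
        self.log = []

    def synchronize_supervisor(self, storm_id, port, downloaded, removal_done):
        self.log.append(f"Downloading code for storm id {storm_id}")
        self.disk.add(storm_id)
        self.log.append(f"Finished downloading code for storm id {storm_id}")
        downloaded.set()        # step 3: code is on disk
        removal_done.wait()     # step 4: force sync-processes to run right here
        self.local_state[port] = storm_id   # ls-local-assignments!, too late

    def sync_processes_cleanup(self, downloaded, removal_done):
        downloaded.wait()
        assigned = set(self.local_state.values())   # step 5: still empty
        for storm_id in sorted(self.disk - assigned):
            self.log.append(f"Removing code for storm id {storm_id}")
            self.disk.discard(storm_id)             # step 6
        removal_done.set()

    def launch_worker(self, port):
        # Step 7: the next sync-processes run tries to start the worker.
        storm_id = self.local_state[port]
        if storm_id not in self.disk:
            self.log.append("Missing topology storm code, so can't launch "
                            "worker with assignment")
            return False
        self.log.append(f"Launching worker with assignment {storm_id}")
        return True


def reproduce():
    sup = SupervisorModel()
    downloaded, removal_done = threading.Event(), threading.Event()
    sync_thread = threading.Thread(
        target=sup.synchronize_supervisor,
        args=("topo-1", 6700, downloaded, removal_done))
    process_thread = threading.Thread(
        target=sup.sync_processes_cleanup, args=(downloaded, removal_done))
    sync_thread.start()
    process_thread.start()
    sync_thread.join()
    process_thread.join()
    return sup.launch_worker(6700), sup.log


if __name__ == "__main__":
    launched, log = reproduce()
    print(launched)          # False
    print("\n".join(log))
```

Without the two events the interleaving is left to the thread scheduler. The events only pin down the unlucky schedule so the failure appears on every run.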

Resolution

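  • The problem only makes the supervisor exit, and our deployment restarts it automatically, so it can be left unfixed for now.
  • To stay on 1.0.2, fix the race described above by adding synchronization between synchronize-supervisor and sync-processes.
  • Storm 1.0.3 completely rewrote the supervisor code, and we have not seen this problem there so far, so upgrading also resolves it.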
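
One possible shape of the synchronization fix for 1.0.2 is sketched below in a toy model: code on disk as a set, LocalState as a {port: storm_id} dict, and hypothetical names throughout. This is an assumption about how one might patch it, not the change that went into 1.0.3. A lock shared by both functions is held by synchronize-supervisor from the download through the LocalState write, and by sync-processes while it decides what to delete. In supervisor.clj the equivalent would be wrapping both sections in Clojure's `locking` on a shared object.

```python
import threading


class LockedSupervisorModel:
    """Toy model with a lock shared by both functions (hypothetical)."""

    def __init__(self):
        self.disk = set()          # storm ids whose code is on local disk
        self.local_state = {}      # {port: storm_id}
        self.lock = threading.Lock()

    def synchronize_supervisor(self, storm_id, port, downloaded):
        with self.lock:
            self.disk.add(storm_id)              # download-storm-code
            downloaded.set()                     # sync-processes may now try to run...
            self.local_state[port] = storm_id    # ...but ls-local-assignments! finishes first

    def sync_processes_cleanup(self, downloaded):
        downloaded.wait()
        with self.lock:   # blocks until the assignment is in LocalState
            assigned = set(self.local_state.values())
            for storm_id in self.disk - assigned:
                self.disk.discard(storm_id)


def run_fixed():
    sup = LockedSupervisorModel()
    downloaded = threading.Event()
    threads = [
        threading.Thread(target=sup.sync_processes_cleanup, args=(downloaded,)),
        threading.Thread(target=sup.synchronize_supervisor,
                         args=("topo-1", 6700, downloaded)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sup


if __name__ == "__main__":
    sup = run_fixed()
    print(sup.disk, sup.local_state)   # {'topo-1'} {6700: 'topo-1'}
```

Holding the lock across the whole download has a cost. sync-processes also shuts down workers whose heartbeats are no longer valid, so that work stalls for as long as a download takes. A narrower variant is to record in-flight storm ids before downloading and have the removal loop skip them, so that only the removal decision needs the lock.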
