如何通过 Oozie 在 YARN 上msdb恢复不能运行作业 Spark 作业

查看: 90901|回复: 6
Spark 作业调度--job执行方式介绍
主题帖子积分
本帖最后由 hyj 于
19:01 编辑
问题导读:
1.由不同线程提交的多个“jobs”(Spark actions)是否可以同时运行
2.线程的含义什么?
3.Java main函数是进程还是线程?
4.应用程序启用Spark独立部署模式,job是否并发执行?
1.Spark是否并发分析
本文是针对spark作业执行方式是否并发解惑的一篇文章。spark有三种作业调度方式,那么他们的调度方式什么?这里在下文的基础上分析一下:
首先我们知道不同的用户,对于一个集群可能会同时不断的提交作业,那么这些job是怎么执行的,这里困惑了不少刚接触spark的同学。其实无论是那种部署方式,他们都是有可能并发执行的,也就是可能是同时执行的。
下面引用下文的内容:
由不同线程提交的多个“jobs”(Spark actions)可以同时运行
这里我产生了疑问,什么是线程,main()函数是进程还是线程。引用下面内容
main是一个线程也是一个进程,一个java程序启动后它就是一个进程,进程相当于一个空盒,它只提供资源装载的空间,具体的调度并不是由进程来完成的,而是由线程来完成的。一个java程序从main开始之后,进程启动,为整个程序提供各种资源,而此时将启动一个线程,这个线程就是主线程,它将调度资源,进行具体的操作。
对于不同用户提交的.jar是可以理解为一个线程的,因此从下文得知
由不同线程提交的多个“jobs”(Spark actions)可以同时运行
无论是那种部署方式,我们的spark程序都是可以并发执行的。详细可以看下文。
2.Spark 作业调度
Spark有几个在计算中调度资源的工具。
第一需要记得,正如集群模式概述中描述的那样,每个Spark应用中(SparkContext实例)都运行着一组独立的执行进程。Spark运行在的集群管理器提供了应用间调度的工具。
第二,在每个Spark应用中,由不同线程提交的多个“jobs”(Spark actions)可以同时运行。在处理网络请求的应用中这很常见,比如Shark服务器就以这种方式运行。Spark有一个调度均衡器在每个SparkContext中调度资源。
应用程序之间的调度
每个运行在集群上的Spark应用程序都能得到一个独立的JVM虚拟机,而JVM仅仅用于应用程序运行任务和存储数据。如果多用户需要共享你的集群,可以通过集群管理器配置不同的选项来分配资源。
在集群管理器中最简单有效的方式就是静态区分资源。使用此方法,每个应用程序在整个的生命周期中都可以得到一个最大数量的资源。这种方式被用于Spark的standalone和YARN模式中,同样也用于coarse-grained Mesos mode模式。根据集群的类型,可以通过下面的配置来分配资源。
1.Standalone mode:默认情况下,应用程序启用Spark独立部署模式,这种模式按照FIFO(先进先出)的顺序执行,每个应用程序都会尝试使用所有可用的节点。你可以通过spark设置来限制应用程序的节点数。例如:你可以启动一个有10核并长时间运行的服务器,并允许每个用户通过shells使用20核心。最后,除了控制核心外,每个应用程序的spark执行存储器可以控制自己的内存使用。
2.Mesos:在独立部署模式中,要在Mesos上使用静态分区,需要设置spark.mesos.coarse 系统属性为true,另外,可选项设置spark.cores.max可以限制每个应用程序的共享资源数。你也可以配置spark.executor.memory来控制执行器的内存。
3.YARN:num-workers选项用于在Spark YARN端分配集群上workers数量,尽管worker-memory和worker-cores可以控制每个worker的资源分配。
Mesos上的第二个可用选项是动态共享CPU内核。在这种模式下,每个Spark应用程序仍然分配有一个固定和独立的内存(通过spark.executor.memory来设置),当这个应用程序没有在机器上执行任务的时候,其他的应用程序就可能在这些内核上运行任务。当你期望大量但不是过度活跃应用程序的时候,这种模式是非常有用的,例如独立用户中的shell会话。然而,它却伴随着一个不可预知的潜在危险,这是因为当它需要执行任务的时候,在节点上需要耗费一段时间重新获得CPU核心资源。使用这种模式,不需要设置spark.mesos.coarse为true,只需要简单的使用amesos://URL。
请注意,所有的模式目前提供跨应用程序内存共享。如果你喜欢通过这种方式共享数据,我们推荐运行单一服务器的应用程序能够提供多个请求,可以通过查询相同的RDDs得到。例如,Shark JDBC服务器以这种方式进行SQL查询。在将来的版本中,内存中的存储系统,如 Tachyon将会提供另外的一种方式来共享RDDs。
应用中的调度
在给定的Spark应用(已实例化SparkContext)中,如果在不同线程中,多个并行的工作可以同时运行。我们所说的“工作”,其实是一个Spark动作(如保存,收集等)或是任何想需要评估动作的任务。Spark的任务调度员是多线程安全的,它也支持这个用例来使应用服务多个请求(多用户查询).
默认的,Spark调度员是按照FIFO(先进先出)的形式来运行工作的。每一个工作被分为多个“阶段”(如,map和reduce语句),对于所有可用的资源中第一个工作优先级最高,这个工作阶段中的任务会被启动,之后是第二个,依次类推。如果集群不需要队列头中的工作,后面的工作将被立刻启动,如果队列头的工作很大,后面的工作可能大大地推迟。
启动Spark0.8,它可以在两个作业之间配置共享调度。Spark负责在作业之间轮换分配调度,所以,所有的作业都能得到一个大致公平的共享的集群资源。这就意味着即使有一个很长的作业在运行,花费时间较少的作业在提交之后,仍然能够立即获得资源,并且能够有很好的响应,而不是需要等待那个很长的作业运行完之后才能运行。这种模式最适合多用户设置。
要启用公平作业调度,在创建一个SparkContext之前,需要简单的配置spark.scheduler.mode为FAIR:
System.setProperty(&spark.scheduler.mode&, &FAIR&) 复制代码
公平的调度池
公平调度可以支持在池中将工作分组,而且为不同的池可以设置不同的调度选项(如,权重)。这样可以很有用的为更多重要的工作创建一个“高优先级”池,举例,将每一个用户的工作一起分组,不管有多少并发工作也让每个用户平等的分享 ,以这种方式代替了平分给定的工作。这种方式是模仿Hadoop公平调度。
如果没有设置,新提交的工作将进入默认池中,但是工作池可以在线程中用spark.scheduler.pool来给SparkContent添加“本地属性”并提交。如下:
// 假设context是你SparkContext中的变量
context.setLocalProperty(&spark.scheduler.pool&, &pool1&)复制代码
在设置了本地属性之后,所有的在这个线程(在这个线程中调用 RDD.save,count,collect等)的工作提交将会用这个池来命名。这样同一个用户可以让每个线程容易的执行多个工作。如果你想要清除线程相关的池,简单调用如下:
context.setLocalProperty(&spark.scheduler.pool&, null) 复制代码
调度池中的默认行为
默认的,每个池都会平等的分享集群(在默认的池中每一个工作也是平等分享的),但在每一个池中,工作是按照FIFO(先进先出)顺序。比如,如果你给每一个用户创建一个池,这就意味着每一个用户都平等的分享一个集群,这样每一个查询都是按顺序查询的。
配置调度池
通过配置文件可以修改调度池的属性。每个调度池都支持3个属性。
schedulingMode:该属性的值可以是FIFO或者FAIR,用来控制作业在调度池中排队运行(默认情况下)或者公平分享调度池资源。
weight:控制调度池在集群之间的分配。默认情况下,所有调度池的weight值都是为1。例如:如果你指定了一个调度池的值为2,那么这个调度池就比其它调度池多获得2倍的资源。设置一个更高的weight值,例如1000,就可以实现线程池之间的优先权——实际上,weight值为1000的调度池无论什么时候作业被激活,它都总是能够最先运行。
minShare:除了一个整体的权重,如果管理员喜欢,可以给每个调度池指定一个最小的shares值(也就是CPU的核数目)。公平调度器通过权重重新分配资源之前总是试图满足所有活动调度池的最小share。在没有给定一个高优先级的其他集群中,minShare属性是另外的一种方式来确保调度池能够迅速的获得一定数量的资源(例如10核CPU),默认情况下,每个调度池的minShare值都为0。
可以通过XML文件来设置pool属性,和配置公平调度的xml模板文件一样,只需要设置spark.scheduler.allocation.file的属性:
System.setProperty(&spark.scheduler.allocation.file&, &/path/to/file&) 复制代码
对于每个池,XML文件的格式是一个简单的&pool&元素,可以在这个元素中设置各种不同元素。例如:
&?xml version=&1.0&?&
&allocations&
&&&pool name=&production&&
& & &schedulingMode&FAIR&/schedulingMode&
& & &weight&1&/weight&
& & &minShare&2&/minShare&
&&&/pool&
&&&pool name=&test&&
& & &schedulingMode&FIFO&/schedulingMode&
& & &weight&2&/weight&
& & &minShare&3&/minShare&
&&&/pool&
&/allocations&复制代码这个完整的例子也可以适用到对公平调度的xml模板文件配置。请注意,任何没有在xml文件中配置的池,都会有一个默认配置值(scheduling mode 值是FIFO,weight值为1,minShare值为0)。
欢迎加入about云群 、 ,云计算爱好者群,关注
主题帖子积分
我现在有点迷糊,请看如下代码:
[Scala] 纯文本查看 复制代码object JobScheduling {
def main(args: Array[String]) {
System.setProperty(&spark.scheduler.mode&, &FAIR&)
val conf = new SparkConf().setAppName(&BaseLineDemo&).setMaster(&local[*]&)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(&/tmp/checkpoint&)
// 通过TCP连接本机7788端口 获得输入流
val logs = ssc.receiverStream(new UdpReceiver(9514, &UTF-8&))
logs.foreachRDD(rdd =&{
rdd.foreach(event =& {
// action-1
println(s&---------------${event}&)
Thread.sleep(100)
logs.foreachRDD(rdd =&{
// action-2
rdd.foreach(event =& {
println(s&+++++++++++++++${event}&)
ssc.start()
ssc.awaitTermination()
action-1&&action-2&&应该就是两个job。那么这两个job是串行执行的 还是并行执行的?
& & 在每个Spark应用中,由不同线程提交的多个“jobs”(Spark actions)可以同时运行。
在spark中怎么使用多线程提交jobs? 使用 sparkContent.runJob 么? 能否给个例子??
主题帖子积分
高级会员, 积分 2649, 距离下一级还需 2351 积分
高级会员, 积分 2649, 距离下一级还需 2351 积分
我现在有点迷糊,请看如下代码:
[mw_shl_code=scala,true]object JobScheduling {
spark internal - 作业调度
主题帖子积分
我现在有点迷糊,请看如下代码:
[mw_shl_code=scala,true]object JobScheduling {
action-1,action-2的确是两个不同的job,当action触发时将会提交job,这两个job的运行将在调度模式下依次执行,即串先行执行。执行的顺序具体看你配置的job调度策略,默认为FIFO,也可以配置成公平调度策略,在公平调度策略中可配置job执行的权重,权重越大越优先执行。每一个job提交后将会划分stage,每个stage中会有多个任务,每个stage中的任务在worker节点在线程池中并行处理。
主题帖子积分
我现在有点迷糊,请看如下代码:
[mw_shl_code=scala,true]object JobScheduling {
在多个线程中可以提交job,但是提交的job到spark集群中也是遵守调度规则的,即每一个job也是顺序执行的。
主题帖子积分
我现在有点迷糊,请看如下代码:
[mw_shl_code=scala,true]object JobScheduling {
比如说你要在10个线程中提交Job,像下面一样
for(i &- 1 to 10){
&&val thread:Thread = new Thread(new Runnable {
& & override def run(): Unit = {
& && &logs.foreachRDD(rdd =&{
& && &&&rdd.foreach(event =& {
& && && & // action-1
& && && & println(s&---------------${event}&)
& && && & Thread.sleep(100)
& && &&&})
&&thread.start()
你可以把rdd的treasaction和actoin操作,包装一下,成为一个Runnable,在runnable中去写每一个线程要执行的作业,当actin触发的时候,每一个线程的actin操作都将触发job的提交,形成多个job的提交,提交的多个job会根据你的调度策略进行作业的调度。
主题帖子积分
中级会员, 积分 429, 距离下一级还需 571 积分
中级会员, 积分 429, 距离下一级还需 571 积分
Spark Streaming可以用公平调度策略么?
比如某个时间窗口的数据处理耗时长,先达到了,但是因为耗时长,后来的时间窗口数据耗时短,还的等之前的处理完,才会处理短的。怎么让短的也立即被处理?
积极上进,爱好学习
经常帮助其他会员答疑
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered byRunning Spark job in Oozie using Yarn-cluster - Stack Overflow
Join Stack Overflow to learn, share knowledge, and build your career.
or sign in with
I had created a Spark Job using Oozie which configure run on yarn-cluster.
The Spark program is written in Scala and is a very simple program which just initialize SparkContext, println("hello world") and stop the SparkContext.
Below is the workflow.xml:
&workflow-app name="My_Workflow" xmlns="uri:oozie:workflow:0.5"&
&start to="spark-0177"/&
&kill name="Kill"&
&message&Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]&/message&
&action name="spark-0177"&
&spark xmlns="uri:oozie:spark-action:0.1"&
&job-tracker&${jobTracker}&/job-tracker&
&name-node&${nameNode}&/name-node&
&master&yarn-cluster&/master&
&mode&cluster&/mode&
&name&MySpark&/name&
&class&com.test1&/class&
&jar&/user/hue/oozie/workspaces/tl_test/lib/testOozie1.jar&/jar&
&spark-opts&--executor-cores 2
--driver-memory 5g --num-executors 2 --executor-memory 5g&/spark-opts&
&ok to="End"/&
&error to="Kill"/&
&end name="End"/&
&/workflow-app&
However, i was getting the following error:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Can not create a Path from an empty string
java.lang.IllegalArgumentException: Can not create a Path from an empty string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:127)
at org.apache.hadoop.fs.Path.&init&(Path.java:135)
at org.apache.hadoop.fs.Path.&init&(Path.java:94)
at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:191)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$3.apply(Client.scala:254)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$3.apply(Client.scala:248)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:248)
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:384)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:102)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:623)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:651)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:105)
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:96)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:46)
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:228)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:370)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:295)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access$200(LocalContainerLauncher.java:181)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler$1.run(LocalContainerLauncher.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Oozie Launcher failed, finishing Hadoop job gracefully
Oozie Launcher, uploading action data to HDFS sequence file: hdfs://MYRNDSVRVM350:8020/user/oozie-oozi/-oozie-oozi-W/spark-156b--spark/action-data.seq
Oozie Launcher ends
Please help as i'm totally stuck already.
Can not create a Path from an empty string &- Spark is trying to stage your code, but your Oozie workflow doesn't provide the jar path, hence the empty path exception.
Providing the jar path in Oozie will fix this
Did you find this question interesting? Try our newsletter
Sign up for our newsletter and get our top new questions delivered to your inbox ().
Subscribed!
Success! Please click the link in the confirmation email to activate your subscription.
I meet the exactly same wired problem.
It turns out that multiple spaces in &spark-opts& cause this un-informtive error.
You have two spaces between --executor-cores 2
--driver-memory 5g.
4,58432143
Your Answer
Sign up or
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Post as a guest
By posting your answer, you agree to the
Not the answer you're looking for?
Browse other questions tagged
The week's top questions and answers
Important community announcements
Questions that need answers
By subscribing, you agree to the
Stack Overflow works best with JavaScript enabledSpark on YARN客户端模式作业运行全过程分析 – 过往记忆
我的图书馆
Spark on YARN客户端模式作业运行全过程分析 – 过往记忆
在前篇文章中我介绍了 on YARN集群模式(yarn-cluster)作业从提交到运行整个过程的情况(详情见),我们知道 on yarn有两种模式:yarn-cluster和yarn-client。这两种模式作业虽然都是在yarn上面运行,但是其中的运行方式很不一样,今天我就来谈谈Spark on YARN yarn-client模式作业从提交到运行的过程剖析。
  和yarn-cluster模式一样,整个程序也是通过spark-submit脚本提交的。但是yarn-client作业程序的运行不需要通过Client类来封装启动,而是直接通过反射机制调用作业的main函数。下面就来分析:
  1、通过SparkSubmit类的launch的函数直接调用作业的main函数(通过反射机制实现),如果是集群模式就会调用Client的main函数。
  2、而应用程序的main函数一定都有个SparkContent,并对其进行初始化;
  3、在SparkContent初始化中将会依次做如下的事情:设置相关的配置、注册MapOutputTracker、BlockManagerMaster、BlockManager,创建taskScheduler和dagScheduler;其中比较重要的是创建taskScheduler和dagScheduler。在创建taskScheduler的时候会根据我们传进来的master来选择Scheduler和SchedulerBackend。由于我们选择的是yarn-client模式,程序会选择ClientClusterScheduler和ClientSchedulerBackend,并将YarnClientSchedulerBackend的实例初始化YarnClientClusterScheduler,上面两个实例的获取都是通过反射机制实现的,YarnClientSchedulerBackend类是CoarseGrainedSchedulerBackend类的子类,YarnClientClusterScheduler是TaskSchedulerImpl的子类,仅仅重写了TaskSchedulerImpl中的getRackForHost方法。
  4、初始化完taskScheduler后,将创建dagScheduler,然后通过taskScheduler.start()启动taskScheduler,而在taskScheduler启动的过程中也会调用SchedulerBackend的start方法。在SchedulerBackend启动的过程中将会初始化一些参数,封装在ClientArguments中,并将封装好的ClientArguments传进Client类中,并client.runApp()方法获取Application ID。
  5、client.runApp里面的做是和前面客户端进行操作那节类似,不同的是在里面启动是ExecutorLauncher(yarn-cluster模式启动的是ApplicationMaster)。
  6、在ExecutorLauncher里面会初始化并启动amClient,然后向ApplicationMaster注册该Application。注册完之后将会等待driver的启动,当driver启动完之后,会创建一个MonitorActor对象用于和CoarseGrainedSchedulerBackend进行通信(只有事件AddWebUIFilter他们之间才通信,Task的运行状况不是通过它和CoarseGrainedSchedulerBackend通信的)。然后就是设置addAmIpFilter,当作业完成的时候,ExecutorLauncher将通过amClient设置Application的状态为FinalApplicationStatus.SUCCEEDED。
  7、分配Executors,这里面的分配逻辑和yarn-cluster里面类似,就不再说了。
  8、最后,Task将在CoarseGrainedExecutorBackend里面运行,然后运行状况会通过Akka通知CoarseGrainedScheduler,直到作业运行完成。
  9、在作业运行的时候,YarnClientSchedulerBackend会每隔1秒通过client获取到作业的运行状况,并打印出相应的运行信息,当Application的状态是FINISHED、FAILED和KILLED中的一种,那么程序将退出等待。
  10、最后有个线程会再次确认Application的状态,当Application的状态是FINISHED、FAILED和KILLED中的一种,程序就运行完成,并停止SparkContext。整个过程就结束了。
TA的最新馆藏
喜欢该文的人也喜欢}

我要回帖

更多关于 spark作业运行流程 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信