Can Java connect to a Spark cluster and run Java programs there?

Mixed Java/Scala development of Spark applications under Maven
Developers who know Java well often find, when writing Spark applications, that Spark's Java API documentation is incomplete or that no Java counterpart exists for a given interface.
In those cases, being able to use Scala directly for the Spark-facing code inside a Java project, while keeping Java for everything else, makes Spark development noticeably easier. Below we walk through how to set up a Java + Scala + Spark + Maven development environment.
1. Download the Scala SDK
Download the SDK from https://www.scala-lang.org/download/ (at the time of writing the latest stable release was 2.11.7) and simply unpack it.
(Later, when you create a .scala source file in IntelliJ IDEA, the IDE will detect it and prompt you to set up a Scala SDK; just point it at the directory you unpacked.)
You can also configure the Scala SDK manually: IDEA → File → Project Structure → Libraries → +.
2. Download the Scala plugin for IntelliJ IDEA
In the IDEA Plugins settings, search for "Scala" and install it. If you have no internet access, or the connection is too slow, you can also download the plugin zip manually from /plugin/?idea&id=1347. When downloading by hand, pay special attention to the version number: it must match your IntelliJ IDEA version, otherwise the plugin cannot be installed. After downloading, click "Install plugin from disk..." in the Plugins dialog and select the zip file.
3. Integrating with Maven
To package the project with Maven, configure the scala-maven-plugin in the pom. Because this is a Spark project, the jar also needs to be built as an executable Java package, so the maven-assembly-plugin and maven-shade-plugin must be configured as well, with the mainClass set. After some experimentation, the pom below works; to reuse it you only need to add or remove dependencies.
<!-- The pom below was flattened when this page was scraped: the XML tags have been restored here,
     and values that could not be recovered (some artifactIds, versions, and the properties defining
     scala.version, spark.version and jdk.version) are omitted or marked with comments. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <groupId>my-project-groupid</groupId>
  <version>1.0-SNAPSHOT</version>
  <url>https://maven.apache.org</url>

  <repositories>
    <repository>
      <id>repo1.maven.org</id>
      <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
      <id>repository.jboss.org</id>
      <url>https://repository.jboss.org/nexus/content/groups/public/</url>
    </repository>
    <repository>
      <id>cloudhopper</id>
      <name>Repository for Cloudhopper</name>
      <url>/repos/third-party/</url>
    </repository>
    <!-- Two further repositories ("Repository maven", "Scala Tools") whose URLs were lost. -->
  </repositories>

  <dependencies>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency>
    <dependency><groupId>javax.mail</groupId><artifactId>javax.mail-api</artifactId></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>${spark.version}</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>${spark.version}</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>${spark.version}</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.10</artifactId><version>${spark.version}</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.10</artifactId><version>${spark.version}</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.10</artifactId><version>${spark.version}</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>0.98.6-hadoop2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>0.98.6-hadoop2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>0.98.6-hadoop2</version></dependency>
    <dependency><groupId>com.fasterxml.jackson.jaxrs</groupId><artifactId>jackson-jaxrs-json-provider</artifactId></dependency>
    <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
    <!-- The original list also contained mysql-connector-java, com.google.guava, a com.alibaba
         artifact, commons-math3, a redis.clients artifact, another org.apache.hbase entry,
         org.testng and net.sf.json-lib dependencies whose exact coordinates were lost. -->
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>rrkd.dt.sparkTest.HelloWorld</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
              <resource>reference.conf</resource>
            </transformer>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>rrkd.dt.sparkTest.HelloWorld</mainClass>
            </transformer>
          </transformers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>compile-scala</id>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test-compile-scala</id>
            <phase>test-compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
The part that matters most is the build section; the rest needs little attention.
The directory layout follows Maven's default conventions, except that src contains an extra scala directory, mainly to keep the Java and Scala sources apart, as sketched below.
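The figure from the original post is not included; a typical layout for such a project (names follow the example code below) is roughly:
src/main/java    - Java sources (e.g. HelloWorld.java)
src/main/scala   - Scala sources (e.g. test/Hello.scala)
src/test/...     - test sources, if any
pom.xml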
Create a HelloWorld class (HelloWorld.java) under the java directory:

import test.Hello;

/**
 * HelloWorld: Java entry point that calls into the Scala code.
 */
public class HelloWorld {
    public static void main(String[] args) {
        System.out.print("test");
        Hello.sayHello("scala");
        Hello.runSpark();
    }
}
Create a Hello object (Hello.scala) under the scala directory:

package test

import org.apache.spark.graphx.{Graph, Edge, VertexId, GraphLoader}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import breeze.linalg.{Vector, DenseVector, squaredDistance}

/**
 * Hello: Scala object holding the Spark-facing code called from HelloWorld.
 */
object Hello {

  def sayHello(x: String): Unit = {
    println("hello," + x)
  }

  def main(args: Array[String]) {
    // (method body not shown in the original post)
  }

  def runSpark() {
    val sparkConf = new SparkConf().setAppName("SparkKMeans").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // Create an RDD for the vertices
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
        (4L, ("peter", "student"))))
    // Create an RDD for edges
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"),
        Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
        Edge(4L, 0L, "student"),
        Edge(5L, 0L, "colleague")))
    // Define a default user in case there are relationships with missing users
    val defaultUser = ("John Doe", "Missing")
    // Build the initial Graph
    val graph = Graph(users, relationships, defaultUser)
    // Notice that there is a user 0 (for which we have no information) connected to users
    // 4 (peter) and 5 (franklin).
    graph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
    // Remove missing vertices as well as the edges connected to them
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
    // The valid subgraph will disconnect users 4 and 5 by removing user 0
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
  }
}
In this way the Scala code calls the Spark API to run the Spark parts of the application, and the Java code calls into the Scala code.
4. Compiling and packaging the Scala/Java project with Maven
For a mixed Java/Scala project, to have the Scala sources compiled before the Java sources, you can compile and package with the following Maven command:
mvn clean scala:compile assembly:assembly
5. Running the packaged Spark jar
During development we usually run in local mode from inside IDEA and then package the project with the command above. The packaged Spark jar, however, has to be submitted to the Spark cluster with spark-submit in order to run there.
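A common way to keep a single code base that works both in the IDE and under spark-submit (this is not from the original post, and the class name is illustrative) is to set a local master only when none has been supplied:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class AppLauncher {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("myApp");
        // When launched through spark-submit, spark.master is already set;
        // fall back to local[*] only for runs inside the IDE.
        if (!conf.contains("spark.master")) {
            conf.setMaster("local[*]");
        }
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... job logic ...
        sc.stop();
    }
}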
Next, a simple Spark program written in Java, with examples of running it both locally and on a cluster.
1. Create a Maven project in Eclipse
Configure pom.xml.
A reference configuration is shown below:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.spark</groupId>
  <artifactId>SparkTest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>SparkTest</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>cn.spark.sparktest.App</mainClass>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Once this is configured, Eclipse automatically downloads the dependencies from the remote repositories.
2. Write the Spark program
The full program is as follows:
package org.spark.study.core;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * A Spark program written in Java:
 * a first learning example, word count.
 * @author 18521
 */
public class wordCountLocal {

    public static void main(String[] args) {
        // 1. Create and configure a SparkConf object.
        // setMaster sets the URL of the Spark cluster to connect to; "local" means
        // the job runs locally rather than on a cluster.
        SparkConf conf = new SparkConf()
                .setAppName("wordCountLocal")
                .setMaster("local");
        // 2. Create a JavaSparkContext object.
        // The SparkContext is the entry point: it initializes the core components the
        // application needs (scheduler, tasks, ...) and registers with the Spark master.
        // It is the most important object in a Spark application.
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 3. Create an initial RDD from the input source.
        // Each element is one line of the input file.
        JavaRDD<String> lines = sc.textFile("D://worksoft//testdata//spark.txt");
        // 4. Split the input into individual words.
        // Each RDD transformation takes a function object (often a simple anonymous inner class).
        // FlatMapFunction has two type parameters: the input type and the output type.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            public Iterable<String> call(String arg0) throws Exception {
                return Arrays.asList(arg0.split(" "));
            }
        });
        // 5. Map each word to a (word, 1) pair, so the 1s can later be summed per key to get the counts.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            // Map every word to (word, 1).
            private static final long serialVersionUID = 1L;
            public Tuple2<String, Integer> call(String arg0) throws Exception {
                return new Tuple2<String, Integer>(arg0, 1);
            }
        });
        // 6. Count the occurrences of each word with the reduceByKey operator,
        // which combines all values belonging to the same key.
        JavaPairRDD<String, Integer> wordcount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });
        // 7. The flatMap, mapToPair and reduceByKey operators above are transformations;
        // because the input is split across nodes, an action is still needed to gather the result.
        // 8. Use the foreach action to traverse the elements of the final RDD.
        wordcount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> arg0) throws Exception {
                System.out.println(arg0._1 + " appeared " + arg0._2 + " times");
            }
        });
        sc.close();
    }
}
3. Local test
4. Running on the cluster
4.1 Modify the Spark program
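The original post does not reproduce the modified source here. A minimal sketch of the usual changes (the HDFS host and port below are placeholders): drop the setMaster("local") call so the master comes from spark-submit, and read the input from HDFS; the word-count logic itself stays the same as in wordCountLocal.

package org.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class wordCountSpark {
    public static void main(String[] args) {
        // No setMaster() here: the master URL is supplied by spark-submit.
        SparkConf conf = new SparkConf().setAppName("wordCountSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the test file uploaded to HDFS (host and port are illustrative).
        JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");
        // ... flatMap / mapToPair / reduceByKey / foreach exactly as in wordCountLocal ...
        sc.close();
    }
}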
4.2 Upload the test file to HDFS
[root@spark1 opt]# hadoop fs -put spark.txt /spark.txt
[root@spark1 opt]# hadoop fs -ls /
17/05/27 11:51:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 root supergroup   17-05-27 10:32 /spark.txt
drwxr-xr-x   - root supergroup   15:40 /user
4.3 Package the program
4.4 Upload the packaged jar and write a launch script
Write the launch script:
[root@spark1 java]# cat wordcount.sh
# Launch with spark-submit:
#   --class            the main class to run
#   --num-executors    run with three executors
#   --driver-memory    driver memory
#   --executor-memory  executor memory
#   --executor-cores   cores per executor
# The last argument is the jar to run.
/opt/spark/bin/spark-submit \
  --class org.spark.study.core.wordCountSpark \
  --num-executors 3 \
  --driver-memory 100m \
  --executor-memory 100m \
  --executor-cores 3 \
  /opt/spark-study/java/study-0.0.1-SNAPSHOT-jar-with-dependencies.jar
4.5 Run the launch script and check the output
[root@spark1 java]# ./wordcount.sh >> spark.log
[root@spark1 java]# cat spark.log
integration appeared 89100 times
Hadoop's appeared 89100 times
general appeared 89100 times
have appeared 267300 times
Million appeared 89100 times
here appeared 89100 times
big appeared 89100 times
stack. appeared 89100 times
modification appeared 89100 times
meili appeared 267300 times
conference. appeared 89100 times
we appeared 178200 times
requiring appeared 89100 times
conv appeared 297 times
simple appeared 89100 times
This appeared 89100 times
Joel appeared 89118 times
send appeared 89118 times
(HDFS) appeared 89100 times
without appeared 178200 times
Writing and Running a Spark Application in Java
This article is reposted from: /archives/742.html
Let's start with a simple requirement: we want to analyze a website's access log, count the number of visits from each distinct IP address, and then use Geo information to work out how the visitors are distributed across countries and regions. Here are a few sample lines taken from my own site's log:
121.205.198.92 - - [21/Feb/:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/ Firefox/11.0"
121.205.198.92 - - [21/Feb/:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/ Firefox/23.0"
121.205.198.92 - - [21/Feb/:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/ Firefox/11.0"
121.205.198.92 - - [21/Feb/:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/ Firefox/11.0"
121.205.241.229 - - [21/Feb/:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/ Firefox/11.0"
121.205.241.229 - - [21/Feb/:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/ Firefox/23.0"
Implementing the Spark application in Java
The statistics program we implement does the following:
Read the log data file from HDFS
Extract the first field of each line (the IP address)
Count how many times each IP address occurs
Sort the IP addresses by their counts in descending order
Look up the country each IP address belongs to via the GeoIP library
Print the results, one line per entry, in the format: [country code] IP address count
Below is the statistics application implemented in Java:
package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Serializable;
import scala.Tuple2;

public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 5413763L;
    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    private static final Pattern SPACE = Pattern.compile(" ");

    private transient LookupService lookupService;
    private transient final String geoIPFile;

    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // lookupService: get the country code from an IP address
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("serial")
    public void stat(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // splits each line and extracts the IP address field
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                // e.g. 121.205.198.92 - - [...] "GET /archives/417.html HTTP/1.1" 200 11465 ...
                // the first field is the ip address
                return Arrays.asList(SPACE.split(s)[0]);
            }
        });

        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();

        // sort the statistics result by value, in descending order
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                if (t1._2 < t2._2) {
                    return 1;
                } else if (t1._2 > t2._2) {
                    return -1;
                }
                return 0;
            }
        });

        writeTo(args, output);
    }

    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<?, ?> tuple : output) {
            Country country = lookupService.getCountry((String) tuple._1);
            LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
        }
    }

    public static void main(String[] args) {
        // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9/user/shirdrn/wwwlog20140.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
            System.err.println("       Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9/user/shirdrn/wwwlog20140.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
            System.exit(1);
        }
        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);
        System.exit(0);
    }
}
The implementation details can be followed through the comments in the code. We use Maven to manage and build the Java program; first, these are the dependencies declared in my pom configuration:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.0-incubating</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
  </dependency>
  <dependency>
    <groupId>dnsjava</groupId>
    <artifactId>dnsjava</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.2.1</version>
  </dependency>
</dependencies>
One thing to note: when the program runs on a Spark cluster, the Job we write must be serializable. If a field does not need to be serialized, or cannot be, simply mark it transient; the lookupService field above, for example, does not implement a serialization interface, so it is declared transient and excluded from serialization. Otherwise you may see an error similar to the following:
14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
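Put differently: keep the state that ships with the task closure serializable and mark everything else transient. A minimal sketch of the pattern (class and field names are illustrative, not from the original code):

import java.io.Serializable;

public class StatsJob implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String geoIPFile;          // plain String, serialized with the job
    private transient Object lookupService;  // e.g. a GeoIP LookupService, skipped by serialization

    public StatsJob(String geoIPFile) {
        this.geoIPFile = geoIPFile;
    }
    // RDD operations that only touch serializable state go here.
}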
Running the Java program on the Spark cluster
Here I used Maven to manage and build the Java program. After implementing the code above, I used Maven's maven-assembly-plugin with the following configuration:
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
  <archive>
    <manifest>
      <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
    </manifest>
  </archive>
  <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
  </descriptorRefs>
  <excludes>
    <exclude>*.properties</exclude>
    <exclude>*.xml</exclude>
  </excludes>
</configuration>
<executions>
  <execution>
    <id>make-assembly</id>
    <phase>package</phase>
    <goals>
      <goal>single</goal>
    </goals>
  </execution>
</executions>
This packs all dependent libraries into the program jar. Finally, copy the JAR file to a Linux machine (it does not have to be the Master node of the Spark cluster), making sure the Spark environment variables are configured correctly on that node. After unpacking the Spark distribution you will find the script bin/run-example; we can modify it directly so that its paths point at our own Java program package (change the EXAMPLES_DIR variable and the parts related to where our JAR file lives) and then use the script to run the program. The script content is as follows:
cygwin=false
case "`uname`" in
    CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
    . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
    echo "Usage: run-example <example-class> [<args>]" >&2
    exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/java-examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
    export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
    echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
    echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
    exit 1
fi

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
    CLASSPATH=`cygpath -wp $CLASSPATH`
    export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
    RUNNER="${JAVA_HOME}/bin/java"
else
    if [ `command -v java` ]; then
        RUNNER="java"
    else
        echo "JAVA_HOME is not set" >&2
        exit 1
    fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
    JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
    echo -n "Spark Command: "
    echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
    echo "========================================"
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
To run the Java program we developed on Spark, execute the following commands:
cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9/user/shirdrn/wwwlog20140.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
The program class org.shirdrn.spark.job.IPAddressStats takes three arguments:
The Spark master URL: in my case spark://m1:7077
The input file path: business-specific; here the file hdfs://m1:9/user/shirdrn/wwwlog20140.log is read from HDFS
The GeoIP database file: business-specific; an external file used to look up which country an IP address belongs to
If the program has no errors and runs normally, the console prints the run log, for example:
14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: R listening on addresses :[akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-25-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 6
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-26-0
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-26-0/0 on worker-48-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-26-0/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-26-0/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-] with ID 0
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4. s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] .77.226.83
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19
We can also check the status of the running application through the web console: the Master node's port 8080 (e.g. http://m1:8080/) shows the status of the applications on the cluster. It is also worth noting that if you develop the Spark application in Java with Eclipse on a Unix-like system, you can connect to the Spark cluster directly from Eclipse, submit the application, and let the cluster process it.
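What makes such a "connect from the IDE" setup work with this (0.9-era) API is simply handing the cluster's master URL and the application jar to the JavaSparkContext, as IPAddressStats does above; a rough sketch with placeholder names:

import org.apache.spark.api.java.JavaSparkContext;

public class SubmitFromIde {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(
                "spark://m1:7077",                                  // cluster master URL
                "SubmitFromIde",                                    // application name
                System.getenv("SPARK_HOME"),                        // Spark home on the driver machine
                JavaSparkContext.jarOfClass(SubmitFromIde.class));  // jar shipped to the executors
        // ... build RDDs and run actions as usual ...
        ctx.stop();
    }
}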