spark streaming是spark中用来处理流式数据的,用来对接各类消息队列是极好的。spark
streaming并不是真正实时的流式处理,它本质上还是批处理,只是每一个批次间隔的时间很短。
我是用java来写的。跟大佬们的scala不能比,没有scala简洁。。
先是maven需要依赖的spark-kafka包:
<dependency> <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.1</version> </dependency> <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId> <version>2.3.1</version>
<scope>provided</scope> </dependency>
maven的打包组件:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compile.source>1.8</maven.compile.source>
<maven.compile.target>1.8</maven.compile.target> </properties> <build>
<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId> <executions> <execution>
<phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration>
<transformers> <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries> <Main-Class>org.xxx.TransferApp</Main-Class>
<X-Compile-Source-JDK>${maven.compile.source}</X-Compile-Source-JDK>
<X-Compile-Target-JDK>${maven.compile.target}</X-Compile-Target-JDK>
</manifestEntries> </transformer> </transformers> </configuration> </execution>
</executions> </plugin> </plugins> </build>
然后是java代码:
首先是spark的配置信息和kafka的配置信息:
SparkConf conf = new SparkConf(); // conf.setMaster("local[4]");
conf.setMaster("spark://192.168.1.100:7077"); conf.setAppName("transfer App");
conf.set("spark.streaming.stopGracefullyOnShutdown","true");
conf.set("spark.default.parallelism", "6"); SparkSession spark =
SparkSession.builder().config(conf).getOrCreate(); JavaSparkContext
javaSparkContext = new JavaSparkContext(spark.sparkContext());
JavaStreamingContext jssc = new JavaStreamingContext(javaSparkContext,
Durations.seconds(10)); Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "192.168.1.101:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "myGroup998"); kafkaParams.put("auto.offset.reset",
"latest"); kafkaParams.put("enable.auto.commit", true);
这里面conf.set("spark.streaming.stopGracefullyOnShutdown","true");是让streaming任务可以优雅的结束,当把它停止掉的时候,它会执行完当前正在执行的任务。
JavaStreamingContext jssc = new JavaStreamingContext(javaSparkContext,
Durations.seconds(10));这个是设置每一个批处理的时间,我这里是设置的10秒,通常可以1秒。。
然后创建主题列表:
Collection<String> topic0 = Arrays.asList("self-topic0"); Collection<String>
topic1 = Arrays.asList("self-topic1"); Collection<String> topic2 =
Arrays.asList("self-topic2"); Collection<String> topic3 =
Arrays.asList("self-topic3"); Collection<String> topic4 =
Arrays.asList("self-topic4"); Collection<String> topic5 =
Arrays.asList("self-topic5"); List<Collection<String>> topics =
Arrays.asList(topic0, topic1, topic2, topic3, topic4, topic5);
List<JavaDStream<ConsumerRecord<String, String>>> kafkaStreams = new
ArrayList<>(topics.size());
主题列表本来是可以这样的:
Collection<String> topic0 =
Arrays.asList("self-topic0","self-topic1","self-topic2");
如果是这样的话,这多个主题会在一个消费者中去接收。而我上面的写法可以让spark更好的并发去处理每一个主题。
最后,就是为每一个主题开一个stream去接收,收完了再把结果union起来。
for (int i = 0; i < topics.size(); i++) {
kafkaStreams.add(KafkaUtils.createDirectStream( jssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.<String,
String>Subscribe(topics.get(i), kafkaParams))); }
JavaDStream<ConsumerRecord<String, String>> stream =
jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
stream.foreachRDD((rdd)->{rdd.foreachPartition((crs)->patchTransfer(crs));});
jssc.start(); try { jssc.awaitTermination(); } catch (InterruptedException e) {
// TODO Auto-generated catch block e.printStackTrace(); }
最最后,是启动,和保持运行。
最最最后,用maven打包,命令是mvn package,然后像普通spark应用一样提交。
提交的时候用的是6066的rest接口,7077会报一个不影响运行的小错。jar包要放在所有节点都能拿到的地方,我没有hdfs也没有网络磁盘,就放http服务器里了。。
./spark-submit --deploy-mode cluster --master spark://master:6066 --class
xxx.xxx.TransferApp http://server/journalTransfer-1.0.jar
官方的说明文档:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
<http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
未经允许,禁止转载 https://blog.csdn.net/redstarofsleep
<https://blog.csdn.net/redstarofsleep>
更多内容请关注公众号:
热门工具 换一换