最近在做实时流处理的一个项目,遇到N多问题,经过不断的调试,终于有点进展,记录一下,防止后人遇到同样的问题.

特别注意自己代码里面的版本一定要和集群上面的保持一致

1,sparkstreaming消费kafka有两种方法,这里我就不介绍了,网上关于这方面的资料很多,我就简单说一下两者的区别吧,

(1)
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的.

(2)基于direct的方式,使用kafka的简单api,Spark
Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次,在实际生产环境中大都用Direct方式.


特别说明一下:kafka0.9之前,offest都是保存在zk中,有zk来维护的,0.9之后kafka的offest保存在kafka的 __consumer_offsets
这个topic中了.

2,手动维护kafka的offest.

(1)
,为了实现exactly-once的语义,我采用自己保存offest的方法,offest可以保存在zk,kafka,mysql,hbase,redis中自己根据情况而定,我选择把offest保存到redis中.创建Dstream之前,先判断是否消费过,如果没有消费就从头开始,如果已经消费过了,就从上次保存的offest处开始消费,从严格意义上说,我这个方法也保证不了exactly-once,废话不多说,直接上代码.(因代码有点多,就只贴了重要的部分)

(2),spark版本2.2.0,scala版本2.11.8,kafka版本0.10.1,hbase版本1.1.2.

代码如下:
package test import java.util import kafka.{PropertiesScalaUtils,
RedisKeysListUtils} import kafka.SparkStreamingKafka.{dbIndex, kafkaStreams}
import org.apache.kafka.common.TopicPartition import
org.apache.kafka.common.serialization.StringDeserializer import
org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import
org.apache.spark.streaming.{Seconds, StreamingContext} import
org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils,
LocationStrategies} import redis.RedisPool object sparkstreaming { def
main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO) val
conf = new SparkConf().setAppName("sparkstreaming")
conf.set("spark.streaming.kafka.maxRatePerPartition", "2000")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.concurrentJobs", "10")
conf.set("spark.streaming.kafka.maxRetries", "50") val scc = new
StreamingContext(conf, Seconds(5)) val topic =
PropertiesScalaUtils.loadProperties("topic") val topicSet: Set[String] =
Set(topic) val kafkaParams = Map[String, Object]( "auto.offset.reset" ->
"latest", "value.deserializer" -> classOf[StringDeserializer] ,
"key.deserializer" -> classOf[StringDeserializer] , "bootstrap.servers" ->
PropertiesScalaUtils.loadProperties("broker") , "group.id" ->
PropertiesScalaUtils.loadProperties("groupId") , "enable.auto.commit" ->
(false: java.lang.Boolean) ) val maxTotal = 200 val maxIdle = 100 val minIdle =
10 val testOnBorrow = false val testOnReturn = false val maxWaitMillis = 500
RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"),
PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle,
minIdle, testOnBorrow, testOnReturn, maxWaitMillis) val jedis =
RedisPool.getPool.getResource jedis.select(dbIndex) val keys: util.Set[String]
= jedis.keys(topic + "*") if (keys.size() == 0) { kafkaStreams =
KafkaUtils.createDirectStream[String, String]( scc,
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
String](topicSet, kafkaParams)) } else { val fromOffsets: Map[TopicPartition,
Long] =
RedisKeysListUtils.getKeysList(PropertiesScalaUtils.loadProperties("redisHost"),
PropertiesScalaUtils.loadProperties("redisPort").toInt, topic) kafkaStreams =
KafkaUtils.createDirectStream[String, String]( scc,
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
String](topicSet, kafkaParams, fromOffsets)) }
RedisPool.getPool.returnResource(jedis) kafkaStreams.foreachRDD(rdd=>{ if
(!rdd.isEmpty()) { val offsetRanges =
rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition(partiton=>{
val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum",
PropertiesScalaUtils.loadProperties("zk_hbase")) //zk的地址;
conf.set("hbase.zookeeper.property.clientPort",
PropertiesScalaUtils.loadProperties("zk_port")) conf.set("hbase.master",
PropertiesScalaUtils.loadProperties("hbase_master"))
conf.set("hbase.defaults.for.version.skip", "true") conf.set("hhbase.rootdir",
PropertiesScalaUtils.loadProperties("hbase_rootdir"))
conf.set("zookeeper.znode.parent",
PropertiesScalaUtils.loadProperties("zookeeper_znode_parent")) myTable = new
HTable(conf,
TableName.valueOf(PropertiesScalaUtils.loadProperties("hbase_table")))
myTable.setAutoFlush(false, false) //关闭自动提交 myTable.setWriteBufferSize(3 * 1024
* 1024) partiton.foreach(pair=>{ //自己的处理逻辑; }) myTable.flushCommits()
myTable.close() offsetRanges.foreach { offsetRange => println("partition : " +
offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + "
untilOffset: " + offsetRange.untilOffset) val topic_partition_key_new =
offsetRange.topic + "_" + offsetRange.partition
jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "") }) } })
scc.start() scc.awaitTermination() } }
 sparkstreaming同时消费多个topic的数据可以看下这篇
<https://blog.csdn.net/xianpanjia4616/article/details/81709075>

代码已经测试过了,没有问题,如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,谢谢



 

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信