maven配置文件

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>

1.
kafka生产者

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source
import scala.reflect.io.Path

/**
 * Runnable that replays a local text file into a Kafka topic, over and over.
 *
 * Each pass reads every line of the file at `DIR` as UTF-8, sends that line
 * ten times to `TOPIC` under the fixed key "key", then pauses three seconds
 * before moving on to the next line. When a pass finishes, the next one
 * starts immediately. The loop ends only when the running thread is
 * interrupted, at which point the producer is closed.
 */
class KafkaProduceMsg extends Runnable {
  private val BROKER_LIST = "slave6:9092,slave7:9092"
  private val TOPIC = "kafka"
  private val DIR = "C:\\Users\\admin\\Desktop\\kafka-data.txt"

  /**
   * Producer configuration.
   *  - bootstrap.servers : brokers used for the initial cluster connection
   *  - key.serializer    : serializer class for record keys
   *  - value.serializer  : serializer class for record values
   *  - acks              : "1" = wait for the partition leader's acknowledgment
   *
   * The legacy keys `request.required.acks` and `producer.type` belong to the
   * old Scala producer and are not recognised by `KafkaProducer`, so they are
   * replaced by `acks`. `KafkaProducer.send` is always asynchronous, so there
   * is nothing to configure for the producer type.
   */
  private val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "1")

  private val producer = new KafkaProducer[String, String](props)

  /**
   * Replays the file until the thread is interrupted, then closes the producer.
   * Any other exception raised during a pass is printed and the replay restarts.
   */
  def run(): Unit = {
    println("开始生产消息!!!!!!!!!!")
    try {
      // replayOnce() returns false only after an interrupt, so this spins until then.
      while (replayOnce()) {}
    } finally {
      // Close while the interrupt flag is still clear so buffered records get flushed.
      producer.close()
    }
    // Reached only through the interrupt path: re-assert the flag Thread.sleep cleared.
    Thread.currentThread().interrupt()
  }

  /**
   * Runs one pass over the file(s) found at `DIR`.
   *
   * @return false if the pass was cut short by an interrupt, true otherwise
   */
  private def replayOnce(): Boolean = {
    val files = Path(DIR).walkFilter(_.isFile)
    try {
      files.foreach(sendFile)
      true
    } catch {
      case _: InterruptedException =>
        false
      case e: Exception =>
        println(e)
        true
    }
  }

  /**
   * Sends every line of `file` ten times, pausing three seconds after each line.
   * The file handle is released even when sending or sleeping throws; without
   * this, every pass of the endless replay would leak one open file.
   */
  private def sendFile(file: Path): Unit = {
    val reader = Source.fromFile(file.toString(), "UTF-8")
    try {
      for (line <- reader.getLines()) {
        (1 to 10).foreach { m =>
          val record = new ProducerRecord[String, String](TOPIC, "key", line)
          println(s"$m$record")
          producer.send(record)
        }
        Thread.sleep(3000)
      }
    } finally {
      reader.close()
    }
  }
}
生产者执行程序:
/** Entry point of the producer program. */
object Msg {
  /** Runs a new [[KafkaProduceMsg]] on a freshly started thread. */
  def main(args: Array[String]): Unit = {
    new Thread(new KafkaProduceMsg()).start()
  }
}

2. 消费者sparkStreaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming consumer for the "kafka" topic.
 *
 * The original description of this step says records should be matched
 * against a given keyword, stored when they contain it and dropped
 * otherwise. That filtering is NOT implemented here: the job only prints the
 * value of each record in every 5-second batch.
 */
object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    // Build the Spark configuration and a streaming context with 5-second batches.
    val conf = new SparkConf().setAppName("Consumer")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Set a checkpoint directory for intermediate state to enable cumulative computation.
    // ssc.checkpoint("hdfs://master:9000/xxx")

    // Kafka connection parameters and the set of topics to read.
    val kafkaParam = Map("metadata.broker.list" -> "slave6:9092,slave7:9092")
    val topic = "kafka".split(",").toSet

    // Direct stream of (key, value) pairs; only the value (the log line) is kept.
    val logDStream: DStream[String] =
      KafkaUtils
        .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topic)
        .map(_._2)

    logDStream.print()

    ssc.start()
    try {
      ssc.awaitTermination()
    } finally {
      // Stop the context even when awaitTermination rethrows a failure from the job.
      ssc.stop()
    }
  }
}

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信