本篇內容主要講解“Kafka+SparkStream+Hive的項目實現方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Kafka+SparkStream+Hive的項目實現方法是什么”吧!
十多年的石棉網站建設經驗,針對設計、前端、開發、售后、文案、推廣等六對一服務,響應快,48小時及時工作處理。全網整合營銷推廣的優勢是能夠根據用戶設備顯示端的尺寸不同,自動調整石棉建站的顯示方式,使網站能夠適用不同顯示終端,在瀏覽器中調整網站的寬度,無論在任何一種瀏覽器上瀏覽網站,都能展現優雅布局與設計,從而大程度地提升瀏覽體驗。成都創新互聯從事“石棉網站設計”,“石棉網站推廣”以來,每個客戶項目都認真落實執行。
目前的項目中需要將kafka隊列的數據實時存到hive表中。
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.streaming.{Durations, Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange} import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
def main(args: Array[String]): Unit = { // val conf = new SparkConf() // conf.setMaster("local") // conf.setAppName("SparkStreamingOnKafkaDirect") val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3)) //設置日志級別 ssc.sparkContext.setLogLevel("Error") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "MyGroupId", // /** * 當沒有初始的offset,或者當前的offset不存在,如何處理數據 * earliest :自動重置偏移量為最小偏移量 * latest:自動重置偏移量為最大偏移量【默認】 * none:沒有找到以前的offset,拋出異常 */ "auto.offset.reset" -> "earliest", /** * 當設置 enable.auto.commit為false時,不會自動向kafka中保存消費者offset.需要異步的處理完數據之后手動提交 */ "enable.auto.commit" -> (false: java.lang.Boolean) //默認是true ) //設置Kafka的topic val topics = Array("test") //創建與Kafka的連接,接收數據 /*這里接收到數據的樣子 2019-09-26 1569487411604 1235 497 Kafka Register 2019-09-26 1569487411604 1235 497 Kafka Register 2019-09-26 1569487414838 390 778 Flink View */ val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, // Subscribe[String, String](topics, kafkaParams) ) //對接收到的數據進行處理,打印出來接收到的key跟value,最后放回的是value val transStrem: DStream[String] = stream.map(record => { val key_value = (record.key, record.value) println("receive message key = " + key_value._1) println("receive message value = " + key_value._2) key_value._2 }) //這里用了一下動態創建的Schema val structType: StructType = StructType(List[StructField]( StructField("Date_", StringType, nullable = true), StructField("Timestamp_", StringType, nullable = true), StructField("UserID", StringType, nullable = true), StructField("PageID", StringType, nullable = true), StructField("Channel", StringType, nullable = true), StructField("Action", StringType, nullable = true) )) //因為foreachRDD可以拿到封裝到DStream中的rdd,可以對里面的rdd進行, /*代碼解釋: 先從foreach中拿到一條數據,,在函數map中對接收來的數據用 “\n” 進行切分,放到Row中,用的是動態創建Schema,因為我們需要再將數據存儲到hive中,所以需要Schema。 因為map是transformance算子,所以用rdd.count()觸發一下 spark.createDataFrame:創建一個DataFrame,因為要注冊一個臨時表,必須用到DataFrame frame.createOrReplaceTempView("t1"):注冊臨時表 spark.sql("use spark"):使用 hive 的 spark 庫 result.write.mode(SaveMode.Append).saveAsTable("test_kafka"):將數據放到 test_kafka 中 */ transStrem.foreachRDD(one => { val rdd: RDD[Row] = one.map({ a => val arr = a.toString.split("\t") Row(arr(0).toString, arr(1).toString, arr(2).toString, arr(3).toString, arr(4).toString, arr(5).toString) }) rdd.count() val frame: DataFrame = spark.createDataFrame(rdd, structType) // println(" Scheme: "+frame.printSchema()) frame.createOrReplaceTempView("t1") // spark.sql("select * from t1").show() spark.sql("use spark") spark.sql("select * from t1"). write.mode(SaveMode.Append).saveAsTable("test_kafka") } ) /** * 以上業務處理完成之后,異步的提交消費者offset,這里將 enable.auto.commit 設置成false,就是使用kafka 自己來管理消費者offset * 注意這里,獲取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset時,必須從 源頭讀取過來的 stream中獲取,不能從經過stream轉換之后的DStream中獲取。 */ stream.foreachRDD { rdd => val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } ssc.start() ssc.awaitTermination() ssc.stop() }
到此,相信大家對“Kafka+SparkStream+Hive的項目實現方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是創新互聯網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
網站標題:Kafka+SparkStream+Hive的項目實現方法是什么
本文來源:http://m.newbst.com/article34/gpggpe.html
成都網站建設公司_創新互聯,為您提供品牌網站建設、自適應網站、用戶體驗、動態網站、建站公司、標簽優化
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯