版权声明:本文为博主原创文章,未经博主允许不得转载!!

欢迎访问:https://blog.csdn.net/qq_21439395/article/details/80710180
<https://blog.csdn.net/qq_21439395/article/details/80710180>

交流QQ: 824203453

 

SparkSql 版本为 2.2.0

sparksql解析json格式的数据源

首先,获取操作sparkSql的SparkSession操作实例:
val session = SparkSession.builder() .master("local[*]")
.appName(this.getClass.getSimpleName) .getOrCreate() // 导入隐式转换和functions import
session.implicits._ import org.apache.spark.sql.types._ import
org.apache.spark.sql.functions._
 

 

1.1.  根据json数据,创建Dataset

指定嵌套json格式的数据:
val opds = session.createDataset( // 三引号中,编写json字符串
List("""{"name":"xx","address":{"city":"bj"}}""") ) val otherPeople =
session.read.json(opds) otherPeople.printSchema()
schema如下:



1.2.  读取普通json文件

json数据格式为:


val json1: DataFrame =session.read.json("jsonlog1.json") json1.printSchema()
获取schema为:



1.3.  读取嵌套json文件

数据格式为:


val json: DataFrame = session.read.json("jsonlog2.json") json.printSchema()
schema信息如下:



操作嵌套json的方式:
//DSL 语法的查询 json.select("address.province").show() // 使用sql语法查询
json.createTempView("v_tmp") session.sql("select address.city from
v_tmp").show()
 

1.4.  操作嵌套json数组-explode函数

数据格式为:

 

读取json数组的数据:
val json3 = session.read.json("jsonlog3array.json") json3.printSchema()
json3.show()
schema信息为:



示例数据为:


这种结果的展示数据,查询非常不方便。

 

解决方案:

利用explode函数,把数组数据进行展开。
// 导入sparksql中的函数 import org.apache.spark.sql.functions._ // 利用explode函数
把json数组进行展开, 数组中的每一条数据,都是一条记录 val explodeDF = json3.select($"name",
explode($"myScore")).toDF("name", "score") explodeDF.printSchema() // 再次进行查询
类似于普通的数据库表 默认schema: score1, 可以通过as 指定schema名称 val json3Res: DataFrame =
explodeDF.select($"name", $"score.score1", $"score.score2" as "score222") //
创建临时视图 json3Res.createTempView("v_jsonArray") // 写sql,分别求平均值
session.sql("select name,avg(score1),avg(score222) from v_jsonArray group by
name") .show()
explodeDF的schema信息为:



最终,查询结果为:



 

1.5.  get_json_object() 方法

get_json_object() 方法 从一个json 字符串中根据指定的json路径抽取一个json 对象

 

根据指定数据,获取一个DataFrame
val json4 = Seq( (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip":
"68.161.225.1", "cn": "United States"}""")) .toDF("id", "json")
json4.printSchema()
schema信息为:



 

使用get_json_object 从json字符串中提取列:
// 利用get_json_object 从 json字符串中,提取列 val jsDF = json4.select($"id",
get_json_object($"json", "$.device_type").alias("device_type"),
get_json_object($"json", "$.ip").alias("ip"), get_json_object($"json",
"$.cn").alias("cn")) jsDF.printSchema()
schema信息为:



 

更多复杂操作:可参考:https://cloud.tencent.com/developer/article/1032532

 

版权声明:本文为博主原创文章,未经博主允许不得转载!!

欢迎访问:https://blog.csdn.net/qq_21439395/article/details/80710180
<https://blog.csdn.net/qq_21439395/article/details/80710180>

交流QQ: 824203453

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信