十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
这篇文章主要介绍Spark中ContinuousExecution执行流程是怎么样的,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
成都创新互联公司-专业网站定制、快速模板网站建设、高性价比台前网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式台前网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖台前地区。费用合理售后完善,10多年实体公司更值得信赖。
最重要的是看ContinuousExecution怎么重写LogicalPlan的,详细代码不贴了,最后是创建了Sink类型的LogicalPlan。
val writer = sink.createStreamWriter( s"$runId", triggerLogicalPlan.schema, outputMode, new DataSourceOptions(extraOptions.asJava)) val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan) val reader = withSink.collect { case DataSourceV2Relation(_, r: ContinuousReader) => r }.head
这里的sink可以看成就是DataSource。然后用withSink作为入参创建了IncrementalExecution。
triggerLogicalPlan是StreamingDataSourceV2Relation类。
IncrementalExecution本身没啥,只是在每一个处理的时候包装了一些额外的辅助处理而已。
WriteToDataSourceV2的作用是将triggerLogicalPlan的物理计划的执行结果通过writer写入到外部存储中,所有这里也不看WriteToDataSourceV2了,就看看triggerLogicalPlan的对应的物理计划是什么,前面说过了它对应的逻辑计划是:StreamingDataSourceV2Relation。
直接找是不是StreamingDataSourceV2Relation对应的物理计划的,所以我们先看看StreamingDataSourceV2Relation类的定义:
class StreamingDataSourceV2Relation( output: Seq[AttributeReference], reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { override def isStreaming: Boolean = true }
原来是DataSourceV2Relation的子类啊!
直接找DataSourceV2Relation的物理计划吧,在DataSourceV2Strategy.scala文件中定义了。
object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case DataSourceV2Relation(output, reader) => DataSourceV2ScanExec(output, reader) :: Nil case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case _ => Nil } }
DataSourceV2Relation对应的物理计划是DataSourceV2ScanExec。
DataSourceV2ScanExec的代码也不多。
DataSourceV2ScanExec是用DataSourceReader来作为数据源的读取器的,它的inputRDDs返回的是DataSourceRDD或者ContinuousDataSourceRDD,ContinuousDataSourceRDD肯定是对应的ContinuousExecution,其他方式就是DataSourceRDD了。
不管是DataSourceRDD或者ContinuousDataSourceRDD,他们的读取数据源的类都是一样的,都是DataSourceReader过来的。DataSourceRDD或者ContinuousDataSourceRDD这两者的代码都非常少,一看就知道怎么回事了。
以上是“Spark中ContinuousExecution执行流程是怎么样的”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注创新互联行业资讯频道!