十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
本篇内容介绍了“flinksql env的定义”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联公司主要从事成都网站设计、做网站、成都外贸网站建设公司、网页设计、企业做网站、公司建网站等业务。立足成都服务六安,10多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18982081108
1、编写 pom
4.0.0 org.example flinksqldemo 1.0-SNAPSHOT UTF-8 UTF-8 2.11 2.11.8 0.10.2.1 1.12.0 2.7.3 compile org.apache.maven.plugins maven-compiler-plugin 8 org.apache.flink flink-table-planner-blink_2.11 1.12.0 org.apache.flink flink-java ${flink.version} ${setting.scope} org.apache.flink flink-streaming-java_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-clients_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-connector-kafka-0.10_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} ${setting.scope} org.apache.flink flink-connector-filesystem_${scala.binary.version} ${flink.version} org.apache.kafka kafka_${scala.binary.version} ${kafka.version} ${setting.scope} org.apache.hadoop hadoop-common ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-hdfs ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-client ${hadoop.version} ${setting.scope} org.slf4j slf4j-api 1.7.25 com.alibaba fastjson 1.2.72 redis.clients jedis 2.7.3 com.google.guava guava 29.0-jre
2、编写代码
package com.jd.data; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class FlinkTableApiDemo { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSourcestream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt"); // 1、创建表执行环节 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // ============================================== // 1.1 老版本planner的流式查询 EnvironmentSettings set = EnvironmentSettings.newInstance() .useOldPlanner() //用老版本 .inStreamingMode() //流式处理 .build(); // 老版本的流式处理执行环境 StreamTableEnvironment oldStreamingEnv = StreamTableEnvironment.create(env, set); // 1.2 老版本批处理环境 ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(executionEnvironment); // ========================================================= // 1.3 blink 版本的流式查询 EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, settings); // 1.4 blink 版本的批处理查询 EnvironmentSettings bsettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment blinkBatchTableEnvironment = TableEnvironment.create(settings); } }
“flinksql env的定义”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!