我们专注攀枝花网站设计 攀枝花网站制作 攀枝花网站建设
成都网站建设公司服务热线:400-028-6601

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

Flink中如何使用TimeWindowAll

这篇文章主要介绍了Flink中如何使用TimeWindowAll,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

创新互联专注于南涧企业网站建设,成都响应式网站建设公司,购物商城网站建设。南涧网站建设公司,为南涧等地区提供建站服务。全流程按需设计,专业设计,全程项目跟踪,创新互联专业和态度为您提供的服务

timeWindowAll时间滚动窗口(不分区时间滚动窗口【滑动窗口与滚动窗口的区别,在于滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠】)

示例环境

java.version: 1.8.xflink.version: 1.11.1

Flink 系例 之 搭建开发环境与数据

TimeWindowAll.java

import com.flink.examples.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;

/**
 * @Description 不分区时间滚动窗口
 */
public class TimeWindowAll {

    /*
    窗口在处理流数据时,通常会对流进行分区;
    数据流划分为:
    keyed(根据key划分不同数据流区)
    non-keyed(指没有按key划分的数据流区,指所有原始数据流)
    */

    /**
     * 遍历集合,返回指定时间滚动窗口下最大年龄数据记录
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        DataStream> inStream = env.addSource(new MyRichSourceFunction());
        DataStream> dataStream = inStream
                //按时间窗口滚动,对前6秒内的输入数据流,计算一次
                .timeWindowAll(Time.seconds(6))
                //注意:计算变量为f2
                .maxBy(2);
        dataStream.print();
        env.execute("flink TimeWindow job");
    }

    /**
     * 模拟数据持续输出
     */
    public static class MyRichSourceFunction extends RichSourceFunction> {
        @Override
        public void run(SourceContext> ctx) throws Exception {
            List> tuple3List = DataSource.getTuple3ToList();
            for (Tuple3 tuple3 : tuple3List){
                ctx.collect(tuple3);
                //1秒钟输出一个
                Thread.sleep(1 * 1000);
            }
        }
        @Override
        public void cancel() {
            try{
                super.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

打印结果

2> (王五,man,29)

感谢你能够认真阅读完这篇文章,希望小编分享的“Flink中如何使用TimeWindowAll”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!


标题名称:Flink中如何使用TimeWindowAll
网站URL:http://shouzuofang.com/article/jsdehe.html

其他资讯