浅谈Flink(二)——分层API之ProcessFunction

浅谈Flink(二)——分层API之ProcessFunction

大家好我是栗子鑫,今天主要想给大家介绍一下FlinkAPI的一种ProcessFunction,希望对大家有所帮助。

上一篇博客浅谈Flink(一)主要介绍了Flink的一些基本概念,同时介绍了Flink的四种抽象,具体如下图:

浅谈Flink(二)——分层API之ProcessFunction

Flink 根据抽象程度分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。今天主要详细讲一下Proecessing Function层API。

浅谈Flink(二)——分层API之ProcessFunction



01


Processing Function 介绍



Flink 官网介绍:ProcessFunction[1] 是 Flink 所提供的最具表达力的接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件驱动应用[2]所需要的基于单个事件的复杂业务逻辑。换句话表达ProcessFunction 是一个低级流处理操作,可以访问所有(非循环)流应用程序的基本构建块,主要包括

  • 事件(流元素)
  • 状态(容错,一致,仅在键控流上)
  • 计时器(事件时间和处理时间,仅在键控流上)

如何操作事件:可以将其ProcessFunction视为可以FlatMapFunction访问键控状态和计时器。它通过为输入流中接收到的每个事件调用来处理事件

状态:对于容错状态,ProcessFunction可以访问 Flink 的keyed state,可以通过 访问 RuntimeContext,类似于其他有状态函数访问 keyed state 的方式。

计时器:计时器允许应用程序对处理时间和事件时间的变化做出反应。对该函数的每次调用processElement(...)都会获得一个Context对象,该对象可以访问元素的事件时间时间戳和TimerService。可TimerService用于为将来的事件/处理时间瞬间注册回调。对于事件时间计时器,onTimer(...)当当前水印超过或超过计时器的时间戳时调用该方法,而对于处理时间计时器,onTimer(...)当挂钟时间达到指定时间时调用该方法。在该调用期间,所有状态再次限定为创建计时器的键,允许计时器操作键控状态。

注:如果要访问键控状态和计时器,则必须ProcessFunction在键控流上应用:

stream.keyBy(...).process(new MyProcessFunction());

下文详解讲解如何Processing Function 来操作这三个模块



02


操作事件



下面举一个🌰来感受一下Processing Function 对事件流的操作:

public class ProcessStudyFunction {
 public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     env.setParallelism(1);
     //获取数据源
     DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
         @Override
         public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
             for (int i = 0; i < 10; i++) {
                 String key_name = "zxf" + i;
                 Integer value = i;
                 //获取时间戳
                 Long time = System.currentTimeMillis();
                 System.out.printf("source,%s,%d,%dn%n", key_name, value, time);
                 //发送元素 附上事件戳
                 sourceContext.collectWithTimestamp(new Tuple2<>(key_name, value), time);
                 Thread.sleep(10);
             }
         }
         @Override
         public void cancel() {
         }
     });

     // 过滤 value 大于2的 数据
     SingleOutputStreamOperator<String> outdata = dataStreamSource.process(new ProcessFunction<Tuple2<String, Integer>, String>() {
         @Override
         public void processElement(Tuple2<String, Integer> inputvalue, Context context, Collector<String> out) throws Exception {
             if (inputvalue.f1 > 2) {
                 out.collect(String.format("processElement,%s, %d, %dn", inputvalue.f0, inputvalue.f1, context.timestamp()));
             }
         }
     });
     outdata.print("1:");
     env.execute();
 }
}

结果:

浅谈Flink(二)——分层API之ProcessFunction

上述代码介绍:

  1. 创建一个数据源,每个10毫秒发出一个元素,一共十个,类型是Tuple2,f0是个字符串,f1是Integer,每个元素都带时间戳;
  2. ProcessFunction的匿名子类中,将每个元素的f0和f1拼接成字符串,发给主流程算子,再将f1字段为奇数的元素发到旁路输出;
  3. 在后面的处理中,创建了ProcessFunction的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为小于2的元素过滤掉;
  4. 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期;



03


操作状态和计时器



对于这两个模块都有一个特性仅在键控流上(keyed stream 元素是具有key的特征),下面举一个🌰来查看process function 如何进行状态处理和定时器功能。

首先创建KeyedProcessFunction的子类,作用是将每个单词最新出现时间记录到backend,并创建定时器,定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子。

@Override
public void processElement(Tuple2<String, Integer> value, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
// 取得当前是哪个key
Tuple currentKey = context.getCurrentKey();
// 从backend取得当前单词的myState状态
KeyAndValue current = state.value();
// 如果myState还从未没有赋值过,就在此初始化

if (current == null) {
  current = new KeyAndValue();
  current.key = value.f0;
}
// key数量加一
current.value++;
// 取当前元素的时间戳,作为该单词最后一次出现的时间
current.lastmodified = context.timestamp();
// 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间
state.update(current);
// 为当前key创建定时器,十秒后后触发
long timer = current.lastmodified + 10000;
context.timerService().registerProcessingTimeTimer(timer);
// 打印所有信息,用于核对数据正确性
System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)nn",
      currentKey.getField(0),
      current.value,
      current.lastmodified,
      time(current.lastmodified),
      timer,
      time(timer)));
}
@Override
  public void onTimer(
          long timestamp,
          OnTimerContext ctx,
          Collector<Tuple2<String, Long>> out)
 throws Exception 
{

      // 取得当前单词
      Tuple currentKey = ctx.getCurrentKey();
      // 取得该单词的myState状态
      CountWithTimestamp result = state.value();
      // 当前元素是否已经连续10秒未出现的标志
      boolean isTimeout = false;
      // timestamp是定时器触发时间,如果等于最后一次更新时间+10秒,就表示这十秒内已经收到过该单词了,
      // 这种连续十秒没有出现的元素,被发送到下游算子
      if (timestamp == result.lastModified + 10000) {
          // 发送
          out.collect(new Tuple2<String, Long>(result.key, result.count));
          isTimeout = true;
      }
      // 打印数据,用于核对是否符合预期
      System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %snn",
              currentKey.getField(0),
              result.count,
              result.lastModified,
              time(result.lastModified),
              timestamp,
              time(timestamp),
              String.valueOf(isTimeout)));
  }
}

在主函数中需要将写入对数据的操作

SingleOutputStreamOperator<Tuple2<String, Long>> result = socketDataStream
  //先进行切分
  .flatMap(new Splited())
  // 设置时间戳分配器,用当前时间作为时间戳
  .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
      @Nullable
      @Override
      public Watermark getCurrentWatermark() {
          return null;
      }

      @Override
      public long extractTimestamp(Tuple2<String, Integer> stringIntegerTuple2, long l) {
          // 使用当前系统时间作为时间戳
          return System.currentTimeMillis();
      }
  }) // 将单词作为key分区
  .keyBy(0)
  // 按单词分区后的数据,交给自定义KeyedProcessFunction处理
  .process(new CountKeyWithTimeoutFunction());

上述代码注意:

  1. 为什么要设置时间戳:使用EventTime时间语义时,需指定时间戳(Timestamp)提取规则以及水位线(Watermark)生成策略,才能正确进行窗口计算。定时器需要时间语义。
  2. processElement方法中,state.value()可以取得当前单词的状态,state.update(current)可以设置当前单词的状态
  3. 定时器触发后,onTimer方法被执行,里面有这个定时器的全部信息,尤其是入参timestamp,这是原本设置的该定时器的触发时间;

实验结果:在本机7777端口发送文本,格式为“keytabvalue”,然后十秒后,会显示Flink的执行结果:

浅谈Flink(二)——分层API之ProcessFunction

可以看见时间刚好相差10s,上述代码只是部分显示,如果读者感兴趣,具体链接如下https://Github.com/zhangxinfang520/FlinkStudy.git。



04


总结



上述笔者简单的介绍了Process Function操作的三种方式,希望对你们能有所帮助。

关注六只栗子,面试不迷路!

参考资料

[1]

ProcessFunction: https://nightlies.apache.org/flink/flink-docs-stable/dev/stream/operators/process_function.html

[2]

有状态的事件驱动应用: https://flink.apache.org/zh/usecases.html#eventDrivenApps


作者    栗子鑫

编辑   一口栗子  




浅谈Flink(二)——分层API之ProcessFunction

浅谈Flink(二)——分层API之ProcessFunction

浅谈Flink(二)——分层API之ProcessFunction

浅谈Flink(二)——分层API之ProcessFunction


原文始发于微信公众号(六只栗子):浅谈Flink(二)——分层API之ProcessFunction

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/88392.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!