针对实时任务处理中的去重指标、数据倾斜、事务处理等场景进行分析。

去重指标

在实时统计类任务中,有一类指标对资源消耗非常高,那就是去重指标。由于实时任务为了追求处理性能,计算逻辑一般都是放在内存中完成,中间结果数据也放在内存中,这就带来了内存消耗过多的问题。在计算去重时,势必要把去重的明细数据存储下来,根据去重的场景,分两种情况看。

精确去重场景

  • 基于Flink MapState精确去重

场景:内存中可以放下全量数据的情况。

方式:MapState是Flink中KeyedState的状态类型,这种方式实现去重是一个精确的去重结果。

示例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位 ID、用户设备 ID(idfa/imei/cookie)、点击时间。将设备ID保存在MapState中。步骤如下:

  1. 为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分

  2. 数据分组使用广告位 ID+点击事件所属的小时

  3. 选择 processFunction 来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量

  4. 计算完成之后的数据清理,按照时间进度注册定时器清理

去重逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {
 
  var devIdState: MapState[String, Int] = _
  var devIdStateDesc: MapStateDescriptor[String, Int] = _
 
  var countState: ValueState[Long] = _
  var countStateDesc: ValueStateDescriptor[Long] = _
 
  override def open(parameters: Configuration): Unit = {
    devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
 
    devIdState = getRuntimeContext.getMapState(devIdStateDesc)
    countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))
 
    countState = getRuntimeContext.getState(countStateDesc)
  }
 
  override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {
 
    val currW=ctx.timerService().currentWatermark()
 
    if(ctx.getCurrentKey.time+1<=currW) {
 
        println("late data:" + value)
        return
      }
 
    val devId = value.devId
    devIdState.get(devId) match {
      case 1 => {
        //表示已经存在
      }
      case _ => {
        //表示不存在
        devIdState.put(devId, 1)
        val c = countState.value()
        countState.update(c + 1)
        //还需要注册一个定时器
        ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)
      }
    }
    println(countState.value())
  }
 
 
 
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {
    println(timestamp + " exec clean~~~")
    println(countState.value())
    devIdState.clear()
    countState.clear()
  }
}

自定义 Distinct1ProcessFunction 继承了 KeyedProcessFunction,定义两个状态:MapState,key 表示 devId, value 表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,如果我们使用 rocksdb 作为 statebackend, 那么会将 mapstate 中 key 作为 rocksdb 中 key 的一部分,mapstate 中 value 作为 rocksdb 中的 value, rocksdb 中 value 大小是有上限的,这种方式可以减少 rocksdb value 的大小;另外一个 ValueState,存储当前 MapState 的数据量,是由于 mapstate 只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。

数据清理通过注册定时器方式 ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey. time + 1) 表示当 watermark 大于该小时结束时间+1 就会执行清理动作,调用 onTimer 方法。

特点:优点是精确去重,占用空间小(在数据相对均匀的情况下),缺点是只能用于数字类型(int或者long)。

Flink基于RoaringBitmap的去重方案:

  1. 构建BitIndex

BitMap固然好用,但是对去重的字段只能用int或者long类型;但是如果去重字段不是int或者long怎么办呢?那我们就构建一个字段与BitIndex的映射关系表,使用的时候先从映射表里根据字段取出对应的bitindex,如果没有,则全局生成一个。

构建BitIndex可以用Redis获取,从1开始递增, 比如{a = 1, b = 2, c = 3},好处是长度短,占用空间小。复杂点的可以考虑使用美团开源的leaf分布式唯一自增ID算法,也可以使用Twitter开源的snowflake分布式唯一ID雪花算法,最好不要用,因为不能压缩,占用空间非常大,1000多万个id就达到了700多M。

在数据量特别大的时候,在生成bitindex的时候会有性能的瓶颈,所以我们应该预先构建BitIndex,也就是把你的数据库当中的所有用户id,预先用flink批处理任务,生成映射。

  1. 实现去重逻辑

通过MapFunction拿到字段对应的BitIndex之后,就可以直接进行去重逻辑了,比如我要统计某个页面下的访问人数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CountDistinctFunction extends KeyedProcessFunction<Tuple, Tuple3<String, String, Integer>, Tuple2<String, Long>> {

  private static final Logger LOG = LoggerFactory.getLogger(CountDistinctFunction.class);

  private ValueState<Tuple2<RoaringBitmap, Long>> state;

  @Override
  public void open(Configuration parameters) throws Exception {
    state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Types.TUPLE(Types.GENERIC(RoaringBitmap.class), Types.LONG)));
  }

  @Override
  public void processElement(Tuple3<String, String, Integer> in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    // retrieve the current count
    Tuple2<RoaringBitmap, Long> current = state.value();
    if (current == null) {
      current = new Tuple2<>();
      current.f0 = new RoaringBitmap();
    }
    current.f0.add(in.f2);

    long processingTime = ctx.timerService().currentProcessingTime();
    if(current.f1 == null || current.f1 + 10000 <= processingTime) {
      current.f1 = processingTime;
      // write the state back
      state.update(current);
      ctx.timerService().registerProcessingTimeTimer(current.f1 + 10000);
    } else {
      state.update(current);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
    Tuple2<RoaringBitmap, Long> result = state.value();

    result.f0.runOptimize();
    out.collect(new Tuple2<>(key.f0, result.f0.getLongCardinality()));
  }
}

主程序代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
env.addSource(source).map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] arr = StringUtils.split(value, ",");
                return new Tuple2<>(arr[0], arr[1]);
            }
        })
            .keyBy(0) //根据userId分组
            .map(new BitIndexBuilderMap()) //构建bitindex
            .keyBy(1) //统计页面下的访问人数
            .process(new CountDistinctFunction())
            .print();

模糊去重场景

方式:该算法是位数组算法的应用,不保存真实的明细数据,只保存明细数据对应Hash值的标记位。这种方式存在hash碰撞的情况,但是误差率可以控制,计算出来的去重值会比真实值小。采用这种方法存储1亿条数据,只需要100M多的空间。

场景:统计精度要求不高,统计维度值非常多的情况。比如统计全网各个商家的uv数据,结果记录数达上千万。布隆过滤器在各个维度之间,是可以共用的。

方式:该算法也是利用了hash原理,按照数据的分散程度来估算现有数据集的边界,从而得出大概的去重值总和。这里估算的去重值可能比真实值大,也可能比真实值小。采用这个算法存储1亿条数据只需要几KB的内存。

场景:统计精度要求不高,统计维度非常粗的情况。比如整个大盘的UV数据,每天的结果只有一条记录。基数估计在各个维度值之间不能共用,比如统计全天小时的UV数据,就需要有24个基数估计对象,因此不适合细粒度统计的场景。

数据倾斜

数据倾斜是ETL中经常遇到的问题,在数据量非常大且数据在不同节点分布不均匀时,会遇到性能瓶颈。这时需要对数据进行分桶处理,实时的分桶处理和离线处理的思路是一致的。

  • 去重指标分桶

通过对去重值进行分桶Hash,相同的值一定会被分到同一个桶中去重,然后再把每个桶里面的值进行加和就得到了总值,这里利用了每个分桶的CPU和内存资源。

  • 非去重指标分桶

数据随机发送到每个桶中,然后再把每个桶的值汇总,主要利用了各个桶的CPU能力。

事务处理

实时计算集群是分布式系统,系统的不稳定会导致数据处理失败,比如网络抖动导致数据发送不成功,机器重启导致数据丢失等,如何做到数据的精确处理呢?

这依赖于流式系统提供的数据自动ACK、失败重发以及事务等机制。

  • 超时时间

由于数据处理是按照批次进行的,当一批数据处理超时时,会从数据接受端(如Storm的spout端)重发数据。另外,批次处理的数据量不宜过大,应该增加一个限流功能(限定一批数据的记录数或者容量等),避免数据处理超时。

  • 事务信息

每批数据都会附带一个事务id信息,在重发的情况下,让开发者自己根据事务信息去判断数据第一次到达和重发时不同的处理逻辑。

  • 备份机制

开发人员需要保证内存数据可以通过外部存储恢复,因此在计算中用到的中间结果数据需要备份到外部存储中。Flink的CheckPoint和SavePoint实现了此功能。

参考