数据结构-BloomFilter 概要

过滤器

 Burton Bloom 在1970年提出了Bloom Filter 算法,时间复杂度是O(1)
  可能会把不属于这个集合的元素误认为属于这个集合(false positive),缺点是有一定的误识别率和删除困难。
 适用场景:用于拼写检查和数据库系统
      在警察系统中,一个嫌疑人的名字是否出现在嫌疑名单上;
      在网络爬虫里,一个网址是否已经被访问过,等等。

Spark中的Bloom Filter

  数据添加到过滤器中,它不存储元素本身,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1
  expectedNumItems   布隆过滤器最大缓存的记录数 expectedNumItems 参数对于布隆过滤器的内存消耗影响非常大。
  增量更新过滤器 bloomFilter.putString(uuid)

Spark使用过滤器去重

01.代码示例:
  *  使用布隆过滤器过滤数据 spark官方封装了基于DataFrame的布隆过滤器
   Dataset<Row> dataTemp1 = itemsDF.where("active_date='20191029'");
  //  1.创建布隆过滤器 第一个参数是使用的数据列
  //第二个参数 expectedNumItems 为期望的元素数量,第三个参数 fpp 为期望的错误概率,数值取 0.0  1.0 之间
  BloomFilter bf = dataTemp1.stat().bloomFilter("grade_cd", 2000, 0.01);
  // JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
  JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
  // 2.创建布隆过滤器广播变量
  Broadcast<BloomFilter> bfBc = sparkContext.broadcast(bf);
  // 3.使用布隆过滤器过滤前天访问用户
  dataTemp2.filter(new BoolFiltFunction(bfBc)).show();
 其中 BoolFiltFunction 为如下
     import org.apache.spark.api.java.function.FilterFunction;
     import org.apache.spark.broadcast.Broadcast;
     import org.apache.spark.sql.Row;
     import org.apache.spark.util.sketch.BloomFilter;
     public class BoolFiltFunction implements FilterFunction<Row> {
         Broadcast<BloomFilter> gradeData =null;
         public BoolFiltFunction(Broadcast<BloomFilter> userBroad) {
             gradeData  = userBroad;
         }
         @Override
         public boolean call(Row va1u) throws Exception {
           Integer  broadDa = va1u.getAs("grade_cd");
           boolean flag =  gradeData.getValue().mightContain(broadDa);
           return flag;
         }
     }

02.方式二  
    //超过10万活跃用户时,不要增加此上限值,而是通过userId分区,用多个微服务并行处理,防止单个布隆过滤器OOM
    //expectedNumItems 参数对于布隆过滤器的内存消耗影响非常大。 
    mightContain(Object var1), mightContainString(String val1), mightContainLong(long var1),mightContainBinary(byte[] var1)
      val bf = df.stat.bloomFilter("dd",dataLen,0.01)
      val rightNum = rdd.map(x=>(x.toInt,bf.mightContainString(x)))
    或者
       val bloomFilterInit = BloomFilter.create(expectedNumItems, fpp)

Spark源码信息

1.org.apache.spark.util.sketch.BloomFilter
   public abstract class BloomFilter {}
   进一步的信息
   * Combines this bloom filter with another bloom filter by performing a bitwise OR of the underlying data
    Callers must ensure the  bloom filters are appropriately sized to avoid saturating them.
      filter1.mergeInPlace(filter2)
     02. put() putString() putLong() putBinary()
     参考: test.org.apache.spark.sql.JavaDataFrameSuite
    Currently supported data types include
        Byte,Short,Integer,Long,String
     * The implementation is largely based on the {@code BloomFilter} class from Guava.
   * @since 2.0.0
  2.org.apache.spark.sql.DataFrameStatFunctions
   final class DataFrameStatFunctions private[sql](df: DataFrame) {
  // Builds a Bloom filter over a specified column.
   def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
     buildBloomFilter(Column(colName), BloomFilter.create(expectedNumItems, fpp))}
   }

   3.org.apache.spark.sql.Dataset
     class Dataset[T] private[sql]{
      def stat: DataFrameStatFunctions = new DataFrameStatFunctions(toDF())
     }

Guava library

 com.google.common.hash.BloomFilter
     BloomFilter 提供了create静态方法来创建BloomFilter实例
 fpp( false positive probability)
 正确估计预期插入数量是很关键的一个参数。当插入的数量接近或高于预期值的时候,布隆过滤器将会填满,这样的话,它会产生很多无用的误报点。
 boolean exists = dealIdBloomFilter.mightContain(deal_id);
    if(!exists){
        dealIdBloomFilter.put(deal_id);
    }
通过BloomFilter mightContain 判断deal_id是否已经存在了,如果不存在则put到BloomFilter中
2.其他
Guava的preconditions
参数检查的工具类--Preconditions, 简化代码中对于参数的预判断和处理
// checkNotNull会检查参数是否为null, 当为null的时候会抛出NullPointerException, 否则直接返回参数.
public static <T> T checkNotNull(T reference) {
  if (reference == null) {
    throw new NullPointerException();
  }
  return reference;
}

参考:

 海量数据去重神器——布隆过滤器  https://blog.csdn.net/xianyuxiaoqiang/article/details/93896428

blogroll

social