1 BloomFilterCache 有效性验证测试

背景

ISSUE:https://issues.apache.org/jira/browse/IOTDB-1950

PR:https://github.com/apache/iotdb/pull/4350

目的:测试验证BloomFilterCache的增加是有效果的

测试方案

  • 先创建root.sg1.d1.s1root.sg1.d1.s2这两个序列

  • 只写root.sg1.d1.s1,写一个点,flush一次,写一万个点,生成一万个文件(注意,需要关闭compaction)

  • 执行select s2 from root.sg1.d1这个查询100次,统计时间

  • 原来的方案应该是每次都从磁盘上读取一万个文件的BloomFilter,再用BloomFilter做过滤,但是现在的分支,经过第一次查询后,应该能缓存这一万个文件的BloomFilter,后面99次查询,应该不用读磁盘,应该会快

需要注意:

  • IoTDB需要关compaction
      IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
      IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
      IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
  • allocateMemoryForBloomFilterCache 要设置得足够大,避免 cache miss

实验结果

分别进行两次实验,且不关闭OS缓存,结果如下:

新增BloomFilterCache后的查询计时:BloomFilterCache

  • 51158

  • 58067

原分支的查询计时:

  • 60615

  • 65263

测试代码

public class BloomFilterExperiment {
​
  private static Session session;
  private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
  private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
  private static final String ROOT_SG1_D1 = "root.sg1.d1";
    private static final String LOCAL_HOST = "127.0.0.1";
​
  public static void main(String[] args)
      throws IoTDBConnectionException, StatementExecutionException {
    session =
        new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build();
    session.open(false);
​
    // set session fetchSize
    session.setFetchSize(10000);
​
    try {
      session.setStorageGroup("root.sg1");
    } catch (StatementExecutionException e) {
      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
        throw e;
      }
    }
    createTimeseries();
    insertRecord();
    query();
    session.close();
  }
​
  private static void query() throws IoTDBConnectionException, StatementExecutionException {
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < 100; i++) {
      SessionDataSet dataSet = session.executeQueryStatement("select s2 from root.sg1.d1");
      dataSet.closeOperationHandle();
    }
    long endTime = System.currentTimeMillis();
    System.out.println(endTime - startTime);
  }
​
  private static void createTimeseries()
      throws IoTDBConnectionException, StatementExecutionException {
    if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
      session.createTimeseries(
          ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
    }
    if (!session.checkTimeseriesExists(ROOT_SG1_D1_S2)) {
      session.createTimeseries(
          ROOT_SG1_D1_S2, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
    }
  }
​
  private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException {
    String deviceId = ROOT_SG1_D1;
    List<String> measurements = new ArrayList<>();
    List<TSDataType> types = new ArrayList<>();
    measurements.add("s1");
    types.add(TSDataType.INT64);
    types.add(TSDataType.INT64);
    types.add(TSDataType.INT64);
​
    for (long time = 0; time < 10000; time++) {
      //        for (long time = 0; time < 10; time++) {
      List<Object> values = new ArrayList<>();
      values.add(1L);
      session.insertRecord(deviceId, time, measurements, types, values);
      session.executeNonQueryStatement("flush");
    }
  }
}


2 测算一个文件的BloomFilter缓存项的大小

方案

往一个文件里写10万个序列,然后打印一下计算出来的大小

实验结果

cache item size is 8128

测试代码

public class BloomFilterExperiment2 {

  private static Session session;
  private static final int MEASUREMENT_COUNT = 10000;
  private static final String ROOT_SG1_D1 = "root.sg1.d1";
    private static final String LOCAL_HOST = "127.0.0.1";

  public static void main(String[] args)
      throws IoTDBConnectionException, StatementExecutionException {
    session =
        new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build();
    session.open(false);

    // set session fetchSize
    session.setFetchSize(10000);

    try {
      session.setStorageGroup("root.sg1");
    } catch (StatementExecutionException e) {
      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
        throw e;
      }
    }
    createTimeseries();
    insertRecord();
    query();
    session.close();
  }

  private static void query() throws IoTDBConnectionException, StatementExecutionException {
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < 1; i++) {
      String measurement = "s"+(i+1);
      SessionDataSet dataSet = session.executeQueryStatement("select "+measurement+" from root.sg1.d1");
      dataSet.closeOperationHandle();
    }
    long endTime = System.currentTimeMillis();
    System.out.println(endTime - startTime);
  }

  private static void createTimeseries()
      throws IoTDBConnectionException, StatementExecutionException {
    for (int i = 0; i < MEASUREMENT_COUNT; i++) {
      String path = ROOT_SG1_D1+".s"+(i+1);
      if (!session.checkTimeseriesExists(path)) {
        session.createTimeseries(
                path, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
      }
    }
  }

  private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException {
    String deviceId = ROOT_SG1_D1;
    List<String> measurements = new ArrayList<>();
    List<TSDataType> types = new ArrayList<>();
    for (int i = 0; i < MEASUREMENT_COUNT; i++) {
      measurements.add("s"+(i+1));
      types.add(TSDataType.INT64);
    }

    for (long time = 0; time < 1; time++) {
      List<Object> values = new ArrayList<>();
      for (int i = 0; i < MEASUREMENT_COUNT; i++) {
        values.add(1L);
      }
      session.insertRecord(deviceId, time, measurements, types, values);
    }
    session.executeNonQueryStatement("flush");
  }
}

启动server,在BloomFilterCache get时打印size即可



  • No labels