可以先阅读该文档:客户端缓存 leader 设计文档

背景

目前客户端缓存了device和endpoint的关系,其主要数据结构如下所示,但是随着device的增加,其占用客户端内存就会逐渐增加。假设一个device是10byte,1000万的device就需要100MB的内存。要是一个device 100byte,这就需要1GB内存。对于客户端来说内存压力较大。


此外,目前客户端缓存的实现方式仅对于每个device被第二次写入时才能起到转发效果。如果每个session写了大量不同device的数据且每个device都只写了一次,那cache相当于就没有用了。


另外,分布式目前是以存储组的时间分区为粒度做数据分区和调度的,因此客户端用 deviceId 做 cache leader 的 key 粒度略微偏大。

protected Map<String, EndPoint> deviceIdToEndpoint;

protected Map<EndPoint, SessionConnection> endPointToSessionConnection;

目标

  • 解决随着device的增多,客户端内存占用越来越大的问题。
  • 能够更有效的利用分区缓存信息。

接口设计

方案1

方案1: 增加新的接口,增加一个参数是storage group;

  

// Data writing interface。deviceId可以是全路径,也可以是storage group+devie id组成全路径,个人倾向于device id是全路径。

/**

 * @param storageGroup the storage group.

 * @param deviceId     the device id

 * @param time         the record time.

 * @param measurements the measurements.

 * @param values       the values.

 */

public void insertRecord(

    String storageGroup,

    String deviceId,

    long time,

    List<String> measurements,

    List<String> values)

    throws IoTDBConnectionException, StatementExecutionException {

}

  

public void insertRecord(

    String storageGroup,

    String deviceId,

    long time,

    List<String> measurements,

    List<TSDataType> types,

    List<Object> values)

    throws IoTDBConnectionException, StatementExecutionException {

}

  

public void insertRecord(

    String storageGroup,

    String deviceId,

    long time,

    List<String> measurements,

    List<TSDataType> types,

    Object... values)

    throws IoTDBConnectionException, StatementExecutionException {

}

  

  

// 但是这个新接口与下面的这个旧接口冲突

public void insertRecords(

    List<String> storageGroups,

    List<String> deviceIds,

    List<Long> times,

    List<List<String>> measurementsList,

    List<List<String>> valuesList)

    throws IoTDBConnectionException, StatementExecutionException {

}

  

public void insertRecords(

    List<String> deviceIds,

    List<Long> times,

    List<List<String>> measurementsList,

    List<List<TSDataType>> typesList,

    List<List<Object>> valuesList)

    throws IoTDBConnectionException, StatementExecutionException {

}

  

  

public void insertRecords(

    List<String> storageGroups,

    List<String> deviceIds,

    List<Long> times,

    List<List<String>> measurementsList,

    List<List<TSDataType>> typesList,

    List<List<Object>> valuesList)

    throws IoTDBConnectionException, StatementExecutionException {

}

  

  

public void insertRecordsOfOneDevice(

    String storageGroup,

    String deviceId,

    List<Long> times,

    List<List<String>> measurementsList,

    List<List<TSDataType>> typesList,

    List<List<Object>> valuesList)

    throws IoTDBConnectionException, StatementExecutionException {

}

  

public void insertRecordsOfOneDevice(

    String storageGroup,

    String deviceId,

    List<Long> times,

    List<List<String>> measurementsList,

    List<List<TSDataType>> typesList,

    List<List<Object>> valuesList,

    boolean haveSorted)

    throws IoTDBConnectionException, StatementExecutionException {

   

}

// insert tablet 接口不变,但是Tablet中需要新增加sg属性。

public void insertTablet(Tablet tablet){}

  

方案2

客户端不记录deviceIdToEndpoint的map,而是记录storagegroupToEndpoint的 map。现有接口不修改,每次写入利用压缩前缀树 device id 的前缀判断此device所属的sg,若存在对应的 sg 则走该 endpoint 对应的 sessionConnection直接转发,若不存在则走 defaultSessionConnection。

对于协调者节点返回的 TSStatus,判断该device是否需要转发,如果需要则记录 TSStatus 中的该 deviceID 对应的 sg 信息到压缩前缀树中,同时记录该 sg 对应的 endpoint,这样下一次属于同一存储组底下的device就能够直接发到对应的节点了。

优缺点对比

方案1;直观,计算简单,但需要改动用户接口,极大增加了用户的迁移成本

方案2无需改动现有接口,且实现基本和现有框架一致,只不过降低了缓存的粒度(device级别->存储组级别)。但是每次插入都需要根据device id去获取sg的信息,需要前缀字符串对比,不过经过测试(测试数据附文档最后),用普通的trie树耗时不多且内存占用不是大。为了代码质量和减少内存占用,需要进一步考虑用压缩trie树实现。


综上:决定使用方案2,对用户友好且开发难度不大。

Server端实现

当coordinator节点发现写入请求需要转发时,在tsstatus中既携带对应的endpoint也携带该写入请求device对应的sg(注:批量日志请求需要对每个需要转发的device都返回一个sg和endpoint让客户端去维护,可以参照现有的实现逻辑)

重定向状态码

现在 java 客户端执行一个写请求来评估该deviceId的写入请求是否需要转发是专门通过一个 707 状态码来标识的,其中 java 客户端对该状态码专门做了特判处理,从而支持 cache leader 的优化。然而这也可能导致 go,python 等未实现 cache leader 缓存优化的客户端认为该写入失败,因此更优雅的方式是协调者节点对于要转发的请求仍然返回 200 状态, java 客户端直接用 redirectNode 字段来检测该请求是否需要转发,这样其他尚未做 cache leader 优化的客户端也能无缝支持分布式。因此争取干掉 707.

参考文献

  1. https://chubaofs.readthedocs.io/zh_CN/latest/design/client.html
  2. https://www.taosdata.com/cn/documentation/architecture

类似系统做法

TDEngine: https://segmentfault.com/a/1190000023534651?sort=votes

chubaofs:

客户端进程在以下几种情况下会使用客户端缓存。

客户端为了减少与资源管理节点的通信负担,会在挂载启动时获取该挂载卷中所有元数据和数据节点的地址,并且进行缓存,后续会定期从资源管理节点进行更新。

客户端为了减少与元数据节点的通信,会缓存inode,dentry以及extent元数据信息。通常意义上,读请求应该能够读到之前所有的写入,但是客户端元数据缓存可能会导致多客户端写同一个文件时的一致性问题。所以,ChubaoFS的设计中,不同客户端,或者说挂载点,可以同时读一个文件,但是不能够同时写一个文件(注意:不同进程可以从同一个挂载点并发写一个文件)。打开文件时,客户端会强制从元数据节点更新文件元数据信息。

由于故障恢复时,raft复制组的主节点有可能发生变化,导致客户端缓存的主节点地址无效。因此,客户端在发送请求收到not leader回复时,会轮询重试该复制组的所有节点。重试成功后识别出新的主节点,客户端会缓存新的主节点地址。


前缀树测试

trie树支持共计96个字符,ASCII码从32-127,即sg可以支持如下字符,测试结果如下所示:

“ !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}”

sg number,最小长度,最大长度(不算root.)

trie树内存占用

device num

检索平均耗时

20,5,20

117KB

20,000,000

109ns

40, 5, 20

230KB

20,000,000

108ns

100, 5, 20

540KB

20,000,000

114ns

100,5,100

2.3MB

2,000,000

274ns

100,50,100

3.1MB

2,000,000

326ns



package prefixTree;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import utils.RamUsageEstimator;
/**
 * @author HouliangQi (neuyilan@163.com)
 * @description
 * @since 2021-04-12 15:17
 */
public class PrefixTree {
  private int sgNum = 20;
  private int sgNameMinLength = 5;
  private int sgNameMaxLength = 20;
  private String ROOT = "root.";
  private int deviceNum = 10_000_000;
  private Trie trieTree = new Trie();
  private Random random = new Random();
  private List<String> sgNameList = new ArrayList<>();
  private List<String> allDevicesList = new ArrayList<>();
  public static void main(String[] args) {
    PrefixTree prefixTree = new PrefixTree();
    prefixTree.constructTrieTree();
    System.out
        .println("sg number = " + prefixTree.sgNameList.size() + ",the trie tree memory usage="
            + RamUsageEstimator.humanSizeOf(prefixTree));
    prefixTree.constructDevice();
    prefixTree.test();
  }


  private void constructTrieTree() {
    String template = constructTemplate();
    for (int i = 0; i < sgNum; i++) {
      String tmpSgName = ROOT + createRandomStr(template);
      trieTree.insert(tmpSgName);
      sgNameList.add(tmpSgName);
    }
  }


  private void constructDevice() {
    int deviceNumForEachSg = deviceNum / sgNameList.size();
    for (int i = 0; i < sgNameList.size(); i++) {
      String sgName = sgNameList.get(i);
      for (int j = 0; j < deviceNumForEachSg; j++) {
        String tmpDevice = sgName + ".device_" + j;
        String noExistTmpDevice = sgName + "_no" + ".device_" + j;
        allDevicesList.add(noExistTmpDevice);
        allDevicesList.add(tmpDevice);
      }
    }
  }
  private void test() {
    long totalCostTime = 0;
    for (int i = 0; i < allDevicesList.size(); i++) {
      long start = System.nanoTime();
      boolean result = trieTree.search2(allDevicesList.get(i));
      totalCostTime += (System.nanoTime() - start);
    }
    System.out
        .println(
            "total device num = " + allDevicesList.size() + ", total time cost= " + totalCostTime
                + "ns, avg=" + totalCostTime / allDevicesList.size() + "ns");
  }


  public String constructTemplate() {
    String template = "";
    for (char c = ' '; c < '~'; c++) {
      template += c;
    }
    System.out.println("the string template=" + template);
    return template;
  }
  public String createRandomStr(String template) {
    StringBuilder sb = new StringBuilder();
    int deviceLength = (int) (Math.random() * (sgNameMaxLength - sgNameMinLength) + sgNameMinLength
        + 1);
    sb.append("\"");
    int templateLength = template.length();
    for (int i = 0; i < deviceLength; i++) {
      int number = random.nextInt(templateLength);
      char tmpChar = template.charAt(number);
      if (tmpChar == '"') {
        sb.append('\\');
      }
      sb.append(tmpChar);
    }
    sb.append("\"");
    return sb.toString();
  }
}
class TrieNode {
  //记录孩子节点
  TrieNode[] child;
  //记录当前节点是不是一个单词的结束字母
  boolean is_end;
  public TrieNode() {
    //从空格开始,到ascii结束,共计96个字符
    child = new TrieNode[96];
    is_end = false;
  }
}
class Trie {
  private final char first = ' ';
  // 记录前缀树的根
  TrieNode root;
  /**
   * Initialize your data structure here.
   */
  public Trie() {
    root = new TrieNode();
  }
  /**
   * Inserts a word into the trie.
   */
  public void insert(String word) {
    TrieNode ptr = root;//从根出发
    for (int i = 0; i < word.length(); i++) {
      char c = word.charAt(i);//对于每个单词
      if (ptr.child[c - first] == null) {//如果c - first为空,说明还没有存入
        ptr.child[c - first] = new TrieNode();//存入节点
      }
      ptr = ptr.child[c - first];//指向当前节点
    }
    ptr.is_end = true;//最后的节点为单词的最后一个单词,is_end设置为true
  }
  /**
   * Returns if the word is in the trie.
   */
  public boolean search(String word) {
    TrieNode ptr = root;//从根出发
    for (int i = 0; i < word.length(); i++) {
      char c = word.charAt(i);//对于每个字母
      if (ptr.child[c - first] == null) {//如果不存在于前缀树中,返回false
        return false;
      }
      ptr = ptr.child[c - first];
    }
    return ptr.is_end;//如果所有字符都在前缀树中,那么判断最后一个字母结束标志是否为true,
    // 为true,返回true,说明单词找到,否则,false,没找到
  }
  /**
   * Returns if there is any word in the trie that starts with the given prefix.
   */
  public boolean startsWith(String prefix) {
    TrieNode ptr = root;//从根出发
    for (int i = 0; i < prefix.length(); i++) {
      char c = prefix.charAt(i);//对于每个字母
      if (ptr.child[c - first] == null) {//如果不存在于前缀树中,返回false
        return false;
      }
      ptr = ptr.child[c - first];
    }
    return true;
  }
  public boolean search2(String word) {
    TrieNode ptr = root;//从根出发
    char pre = ' ';
    char current = ' ';
    int quotationNum = 0;
    for (int i = 0; i < word.length(); i++) {
      char c = word.charAt(i);//对于每个字母
      current = c;
      if (pre != '\\' && current == '"') {
        quotationNum++;
      }
      pre = c;
      if (ptr.child[c - first] == null) {//如果不存在于前缀树中,返回false
        if (c == '.' && quotationNum % 2 == 0) {
          return true;
        }
        return false;
      }
      ptr = ptr.child[c - first];
    }
    return ptr.is_end;//如果所有字符都在前缀树中,那么判断最后一个字母结束标志是否为true,
    // 为true,返回true,说明单词找到,否则,false,没找到
  }
}



  • No labels