

Background



Problems to Solve

  • Remove ModelDesc and keep only CubeDesc
  • Remove the limit of supporting at most 64 dimensions
  • Allow more flexible configuration of the Rowkey and the shardBy column at both the Agg level and the Cuboid level
  • Possibly support table (detail) indexes


New Metadata Design


Main Module

  • Data sources
    • Hive
    • Kafka
  • Access control
    • User/Group
  • Model
    • DataModel
    • IndexPlan
    • Dataflow
  • Query history
  • Build jobs

Cube and Model Metadata

Original Name → New Name (comment below each entry)

DataModelDesc → DELETED

CubeDesc → DataModel
  Defines the dimensions and measures, as well as the join relationships between the fact table and the dimension tables.

CubeInstance → IndexPlan
  IndexPlan corresponds to CubeInstance in Kylin 3. An IndexPlan contains multiple indexes; each IndexEntity (index) represents a different combination of dimensions and measures.
  IndexPlan mainly manages Indexes/Cuboids.

CubeInstance → Dataflow
  Dataflow also corresponds to CubeInstance in Kylin 3. A Dataflow contains multiple Segments/Partitions.
  It mainly manages Segments and how Segments are built.

CubeSegment → NDataSegment

N/A → IndexEntity
  The indexes (IndexEntity) in an IndexPlan fall into aggregate indexes (aggIndex) and table indexes (detail indexes). An aggregate index stores the raw data aggregated over a set of dimensions and measures and is used to answer aggregate queries; aggregate indexes are the core of how Kylin accelerates queries. A table index is closer to an index in a traditional database: it indexes the wide flat table built from the raw data and is suited to detail queries that need to reach individual records.

CuboidLayoutEntity (LayoutEntity)
  IndexEntity can be seen as an abstract concept, and LayoutEntity is its concrete form. As Figure 1 shows, each IndexEntity contains multiple LayoutEntities. LayoutEntities under the same IndexEntity use the same dimensions and measures and differ only in the order of col_order or in the shardBy columns; col_order and shardBy only affect query speed, and apart from that there is no difference between LayoutEntities under the same IndexEntity.

AggregationGroup
  Aggregate index groups (aggregationGroups): for each aggregation group the user defines the dimensions, measures, mandatory dimensions, hierarchy dimensions, joint dimensions and so on, which are used to generate the rule-based aggregate indexes. If no mandatory, hierarchy or joint dimensions are chosen and the group includes n dimensions, one aggregation group can generate 2^n - 1 indexes; for example, dimensions {a, b, c} alone yield the 2^3 - 1 = 7 indexes {a}, {b}, {c}, {a,b}, {a,c}, {b,c} and {a,b,c}.

(Figure: Agg)

(Figure: Dataflow)

(Figure: DataflowDetail)



  1. An IndexPlan contains multiple IndexEntities.
  2. An IndexEntity contains multiple LayoutEntities.
  3. MDC is the maximum number of dimensions a rule-based aggregate index may contain; during the generation of rule-based aggregate indexes, indexes whose dimension count exceeds the MDC are pruned (see the sketch after this list).
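
As a rough illustration of the MDC pruning (a simplified sketch, not the exact Kylin code — in the real implementation the check is NAggregationGroup.checkDimCap, used by CuboidSchedulerV2 further below), an index whose mask uses more dimension bits than the MDC allows is dropped:

import java.math.BigInteger;

public class MdcPruneSketch {

    // true when the index mask uses more dimensions than the MDC allows;
    // assumes the lowest `measureCount` bits are the measure bits, as in the mask layout described later
    static boolean exceedsDimCap(BigInteger indexMask, int measureCount, int mdc) {
        BigInteger dimBitsOnly = indexMask.shiftRight(measureCount); // strip the measure bits
        return dimBitsOnly.bitCount() > mdc;
    }

    public static void main(String[] args) {
        // columns a, b, c, d, e, m1, m2 -> masks 64, 32, 16, 8, 4, 2, 1 (see the mask example below)
        BigInteger abcM1 = BigInteger.valueOf(64 + 32 + 16 + 2); // index over dims a, b, c and measure m1
        System.out.println(exceedsDimCap(abcM1, 2, 2)); // true: 3 dimensions > MDC of 2, so it would be pruned
        System.out.println(exceedsDimCap(abcM1, 2, 3)); // false: 3 dimensions <= MDC of 3, so it is kept
    }
}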

UML Diagram

DataModel
  String rootFactTableName; List<JoinTableDesc> joinTables; PartitionDesc partitionDesc; List<NamedColumn> allNamedColumns; List<ComputedColumnDesc> computedColumnDescs; List<Measure> allMeasures; int storageType; String project;

JoinTableDesc
  String table; NDataModel.TableKind kind; JoinDesc join;

JoinDesc
  String type; String[] primaryKey; String[] foreignKey; String primaryTable; String foreignTable;

TableKind
  FACT, LOOKUP

DataFlow
  RealizationStatusEnum status; int queryHitCount = 0; long lastQueryTime = 0L; Map<Long, FrequencyMap> layoutHitCount; Segments<NDataSegment> segments;

IndexPlan
  RuleBasedIndex ruleBasedIndex; List<IndexEntity> indexes; List<IndexEntity> toBeDeletedIndexes; List<Integer> aggShardByColumns; List<NDictionaryDesc> dictionaries;

IndexEntity
  List<Integer> dimensions; List<Integer> measures; List<LayoutEntity> layouts;

LayoutEntity
  List<Integer> colOrder; List<Integer> shardByColumns; List<Integer> partitionByColumns; List<Integer> sortByColumns; int storageType;

NDataSegment
  long createTimeUTC; SegmentStatusEnum status; SegmentRange segmentRange; TimeRange timeRange; Map<String, String> dictionaries; Map<String, String> snapshots; long lastBuildTime; long sourceCount = 0; long sourceBytesSize = 0; transient NDataSegDetails segDetails; // computed value

NDataSegDetails
  String dataflowId; List<NDataLayout> layouts;

RuleBasedIndex
  AggregationGroup; CuboidScheduler

Relationship labels from the diagram: index management; Join relationship management; Segment management; generating IndexEntity from AggregationGroup; cuboid-level custom settings (one-to-many).


Cuboid and CuboidScheduler

Class Name (comment below each entry)

CuboidScheduler
  The algorithm that derives the whole cuboid tree from an IndexPlan.

CuboidSchedulerV2
  The version-2 implementation of CuboidScheduler (see the code below).

RuleBasedIndex
  The indexes defined by the aggregation group settings.
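
For orientation, the typical entry point into this machinery (taken from the CuboidScheduler source further down this page) is to obtain a scheduler from the rule-based index and ask it for all column orders; a minimal usage sketch, assuming an already initialized indexPlan and ruleBasedIndex:

// Minimal usage sketch based on the CuboidScheduler API shown below.
CuboidScheduler scheduler = CuboidScheduler.getInstance(indexPlan, ruleBasedIndex);
int cuboidCount = scheduler.getCuboidCount();                           // number of cuboids on the tree
List<CuboidScheduler.ColOrder> colOrders = scheduler.getAllColOrders(); // every dimension/measure combination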


Aggregate Index Update and Index Tree Generation Flow

Index Tree Generation Flow

This section describes how the index tree is generated from the aggregate index groups, i.e. the process from aggregationGroups to allCuboidIds (the index tree).

How the mask value is computed

Every node of the index tree stores the mask value of one index, and in the end all mask values are stored in allCuboidIds, so we first describe how the mask value is computed.
First, the dimensions selected by the various aggregation groups are merged and sorted; after merging and sorting the dimensions have a fixed order, and so do the measures. Suppose, for example, that the sorted order is a, b, c, d, e, m1, m2. The masks of these 7 columns are then 64 (binary 1000000), 32 (0100000), 16 (0010000), 8 (0001000), 4 (0000100), 2 (0000010) and 1 (0000001), and an index's mask value is the sum of the masks of the columns it uses. If only one agg group has been created, the mask values of the individual indexes are as shown in the figure below.
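
A small runnable sketch of this computation (the column order a, b, c, d, e, m1, m2 and the resulting masks follow the example above; the class and variable names are illustrative only):

import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaskSketch {
    public static void main(String[] args) {
        List<String> sortedColumns = List.of("a", "b", "c", "d", "e", "m1", "m2");
        int size = sortedColumns.size();

        // each column gets one bit; the first column after sorting gets the highest bit
        Map<String, BigInteger> columnMask = new LinkedHashMap<>();
        for (int i = 0; i < size; i++) {
            columnMask.put(sortedColumns.get(i), BigInteger.ONE.shiftLeft(size - i - 1));
        }
        // a=64, b=32, c=16, d=8, e=4, m1=2, m2=1

        // the mask of an index is the sum (bitwise OR) of the masks of the columns it uses
        BigInteger indexMask = columnMask.get("a").or(columnMask.get("c")).or(columnMask.get("m1"));
        System.out.println(indexMask); // 82 = 64 + 16 + 2, i.e. binary 1010010
    }
}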


The same applies when there are multiple agg groups. Suppose the user edits the following 3 agg groups: Agg group 1 selects dimension a and measure m1; Agg group 2 selects dimensions b, c and measure m1; Agg group 3 selects dimension a and measures m1, m2. Also suppose the resulting dimension and measure order is a, b, c, m1, m2. The generated index tree and mask values are then as shown in the figure below.

Bottom-up index tree generation

Once the mask values have been computed, the required indexes are generated bottom-up according to the rules.
Starting from the bottom-most index, each index looks upward for its parents; the index with mask = 0 is the first layer.
The containment relationship between two indexes is easily obtained with bit operations, and this relationship is used to find an index's parents. When looking for parents, the mandatoryColumnMask is tried first; as soon as it is found, the search for that layer ends. The blue index in the figure is the second-layer index found this way. In other words, if mandatory dimensions exist, the index composed of all mandatory dimensions is always the second-layer index.
Next, the second-layer indexes each look for their parents, and all of these parents together form the third layer. The green indexes in the figure are the third layer: the one with mask = 79 is composed of all the joint dimensions, which means that if both mandatory and joint dimensions exist, the index composed of all joint dimensions is added in the third layer, while if there are only joint dimensions and no mandatory dimensions, it is added in the second layer. The index with mask = 99 is added through hierarchy dimensions; a hierarchy can contribute only one dimension per layer. Similarly, the red indexes form the fourth layer and the grey ones the fifth.

Finally, all indexes that were added along the way (the coloured indexes in the figure), except the first layer, are generated; everything else is pruned. The masks of all generated indexes are placed in allCuboidIds for the subsequent steps of Figure 3.
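
The containment test used while climbing the tree can be written as a single bit operation; the sketch below mirrors the idea behind fillBit(...) in CuboidSchedulerV2 further down the page (the masks here are made up for illustration):

import java.math.BigInteger;

public class ParentSketch {
    // true when `child` already contains every bit of `mask`
    static boolean containsAll(BigInteger child, BigInteger mask) {
        return child.and(mask).equals(mask);
    }

    public static void main(String[] args) {
        BigInteger child = BigInteger.valueOf(0b0001000);         // index {d}
        BigInteger mandatoryMask = BigInteger.valueOf(0b1100000); // mandatory dimensions {a, b}
        if (!containsAll(child, mandatoryMask)) {
            BigInteger parent = child.or(mandatoryMask);          // candidate parent {a, b, d}
            System.out.println(parent.toString(2));               // 1101000
        }
    }
}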

Aggregate Index Update Flow

The aggregate index groups edited by the user correspond to the aggregationGroups field of the IndexPlan, which stores the selected dimensions, mandatory dimensions, hierarchy dimensions, joint dimensions, measures and so on. Following the rules above, each aggregation group produces an index tree; every node of the tree holds the mask value of one index, and in the end all mask values are stored in allCuboidIds.


Generating the ruleBasedLayouts from a cuboidId can end in one of three cases:

Case 1: the layout is identical to an existing layout ("identical" means the two LayoutEntities have the same hash, i.e. their colOrder, shardByColumns, sortByColumns and so on are all the same). In this case no new layout is generated; the existing layout is put into result directly.

Case 2: the layout belongs to the same index as an existing layout. In this case a new id is generated by adding 1 to the existing layout's id, and the existing layout's index is reused.

Case 3: no existing layout shares this layout's index. In this case a brand-new layout and index are created.


After the traversal all layouts are in result, and any layout that appears in the blacklist is removed from result. Finally result is assigned to the indexPlan's ruleBasedLayouts, which completes the update of the aggregate indexes in the IndexPlan.
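
Condensed into a fragment (the full logic is RuleBasedIndex.genCuboidLayouts below; this ignores the deleted-layout branch and the case where layout ids have already been allocated), the three cases map onto the id assignment roughly as follows:

// Condensed view of the id assignment in genCuboidLayouts (full source below).
if (existLayouts.containsKey(layout)) {
    layout.setId(existLayouts.get(layout));                                // case 1: identical layout exists, reuse its id
} else if (layoutIndex != null) {
    layout.setId(layoutIndex.getId() + layoutIndex.getNextLayoutOffset()); // case 2: same index exists, take the next id under it
} else {
    layout.setId(proposalId);                                              // case 3: brand-new layout and index
    proposalId += IndexEntity.INDEX_ID_STEP;
}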


Code Overview

CuboidScheduler
/**
 * Defines a cuboid tree, rooted by the base cuboid. A parent cuboid generates its child cuboids.
 */
public abstract class CuboidScheduler implements Serializable {

    protected static final String OUT_OF_MAX_COMBINATION_MSG_FORMAT = "Too many cuboids for the cube. Cuboid combination reached %s and limit is %s. Abort calculation.";

    public static CuboidScheduler getInstance(IndexPlan indexPlan, RuleBasedIndex ruleBasedIndex, boolean skipAll) {
        if (ruleBasedIndex.getSchedulerVersion() == 1) {
            return new CuboidSchedulerV1(indexPlan, ruleBasedIndex, skipAll);
        } else if (ruleBasedIndex.getSchedulerVersion() == 2) {
            return new CuboidSchedulerV2(indexPlan, ruleBasedIndex, skipAll);
        }
        throw new NotImplementedException("Not Support version " + ruleBasedIndex.getSchedulerVersion());
    }

    public static CuboidScheduler getInstance(IndexPlan indexPlan, RuleBasedIndex ruleBasedIndex) {
        return getInstance(indexPlan, ruleBasedIndex, false);
    }

    // ============================================================================

    protected final IndexPlan indexPlan;
    protected final RuleBasedIndex ruleBasedAggIndex;

    protected CuboidScheduler(final IndexPlan indexPlan, RuleBasedIndex ruleBasedAggIndex) {
        this.indexPlan = indexPlan;
        this.ruleBasedAggIndex = ruleBasedAggIndex == null ? indexPlan.getRuleBasedIndex() : ruleBasedAggIndex;
    }

    /**
     * Returns all cuboids on the tree.
     */
    public abstract List<ColOrder> getAllColOrders();

    /**
     * Returns the number of all cuboids.
     */
    public abstract int getCuboidCount();

    public abstract void validateOrder();

    public abstract void updateOrder();

    /**
     * optional
     */
    public abstract List<ColOrder> calculateCuboidsForAggGroup(AggregationGroup agg);

    // ============================================================================

    public IndexPlan getIndexPlan() {
        return indexPlan;
    }

    protected ColOrder extractDimAndMeaFromBigInt(BigInteger bigInteger) {
        List<Integer> allDims = ruleBasedAggIndex.getDimensions();
        List<Integer> allMeas = ruleBasedAggIndex.getMeasures();
        return extractDimAndMeaFromBigInt(allDims, allMeas, bigInteger);
    }

    protected ColOrder extractDimAndMeaFromBigInt(List<Integer> allDims, List<Integer> allMeas, BigInteger bigInteger) {
        List<Integer> dims = Lists.<Integer> newArrayList();
        List<Integer> meas = Lists.<Integer> newArrayList();

        int size = allDims.size() + allMeas.size();

        for (int i = 0; i < size; i++) {
            int shift = size - i - 1;
            if (bigInteger.testBit(shift)) {
                if (i >= allDims.size()) {
                    meas.add(allMeas.get(i - allDims.size()));
                } else {
                    dims.add(allDims.get(i));
                }
            }
        }

        return new ColOrder(dims, meas);
    }

    @Getter
    @AllArgsConstructor
    public static class ColOrder {
        private List<Integer> dimensions;
        private List<Integer> measures;

        // concatenated dimension ids followed by measure ids, used as a layout's colOrder
        public List<Integer> toList() {
            return Stream.concat(dimensions.stream(), measures.stream()).collect(Collectors.toList());
        }

    }
}


// Implementation

public class CuboidSchedulerV2 extends CuboidScheduler {

    private final BigInteger max;
    private final int measureSize;
    private transient final OrderedSet<ColOrder> allColOrders;

    CuboidSchedulerV2(IndexPlan indexPlan, RuleBasedIndex ruleBasedAggIndex, boolean skipAll) {
        super(indexPlan, ruleBasedAggIndex);

        this.max = ruleBasedAggIndex.getFullMask();
        this.measureSize = ruleBasedAggIndex.getMeasures().size();
        boolean isBaseCuboidValid = ruleBasedAggIndex.getIndexPlan().getConfig().isBaseCuboidAlwaysValid();

        // handle the case where the rule-based index (nRuleBasedCuboidDesc) has 0 dimensions
        allColOrders = new OrderedSet<>();
        if (max.bitCount() == 0 || skipAll) {
            return;
        }
        long maxCombination = indexPlan.getConfig().getCubeAggrGroupMaxCombination() * 10;
        maxCombination = maxCombination < 0 ? Long.MAX_VALUE : maxCombination;
        if (isBaseCuboidValid) {
            allColOrders.add(new ColOrder(ruleBasedAggIndex.getDimensions(), ruleBasedAggIndex.getMeasures()));
        }
        for (NAggregationGroup agg : ruleBasedAggIndex.getAggregationGroups()) {
            allColOrders.addAll(calculateCuboidsForAggGroup(agg));
            if (allColOrders.size() > maxCombination) {
                throw new OutOfMaxCombinationException(String.format(Locale.ROOT, OUT_OF_MAX_COMBINATION_MSG_FORMAT,
                        allColOrders.size(), maxCombination));
            }
        }
    }

    @Override
    public int getCuboidCount() {
        return allColOrders.size();
    }

    @Override
    public void validateOrder() {
        // do nothing
    }

    @Override
    public void updateOrder() {
        // do nothing
    }

    @Override
    public List<ColOrder> getAllColOrders() {
        return allColOrders.getSortedList();
    }

    /**
     * Get all valid cuboids for agg group, ignoring padding
     *
     * @param agg agg group
     * @return cuboidId list
     */
    @Override
    public List<ColOrder> calculateCuboidsForAggGroup(NAggregationGroup agg) {
        Set<CuboidBigInteger> cuboidHolder = new OrderedSet<>();

        // build tree structure
        Set<CuboidBigInteger> children = getOnTreeParentsByLayer(Sets.newHashSet(new CuboidBigInteger(BigInteger.ZERO)),
                agg); // lowest level cuboids
        while (!children.isEmpty()) {
            if (cuboidHolder.size() + children.size() > indexPlan.getConfig().getCubeAggrGroupMaxCombination()) {
                throw new OutOfMaxCombinationException("Holder size larger than kylin.cube.aggrgroup.max-combination");
            }
            cuboidHolder.addAll(children);
            children = getOnTreeParentsByLayer(children, agg);
        }

        return cuboidHolder.stream().map(c -> {
            val colOrder = extractDimAndMeaFromBigInt(c.getDimMeas());
            colOrder.getDimensions().sort(Comparator.comparingInt(x -> ArrayUtils.indexOf(agg.getIncludes(), x)));
            return colOrder;
        }).collect(Collectors.toList());
    }

    private Set<CuboidBigInteger> getOnTreeParentsByLayer(Collection<CuboidBigInteger> children,
            final NAggregationGroup agg) {
        Set<CuboidBigInteger> parents = new OrderedSet<>();
        for (CuboidBigInteger child : children) {
            parents.addAll(getOnTreeParents(child, agg));
        }
        val filteredParent = Iterators.filter(parents.iterator(), cuboidId -> {
            if (cuboidId == null)
                return false;

            return agg.checkDimCap(cuboidId.getDimMeas());
        });
        parents = new OrderedSet<>();
        while (filteredParent.hasNext()) {
            parents.add(filteredParent.next());
        }
        return parents;
    }

    private Set<CuboidBigInteger> getOnTreeParents(CuboidBigInteger child, NAggregationGroup agg) {
        Set<CuboidBigInteger> parentCandidate = new OrderedSet<>();

        BigInteger tmpChild = child.getDimMeas();
        if (tmpChild.equals(agg.getPartialCubeFullMask())) {
            return parentCandidate;
        }

        if (!agg.getMandatoryColumnMask().equals(agg.getMeasureMask())) {
            if (agg.isMandatoryOnlyValid()) {
                if (fillBit(tmpChild, agg.getMandatoryColumnMask(), parentCandidate)) {
                    return parentCandidate;
                }
            } else {
                tmpChild = tmpChild.or(agg.getMandatoryColumnMask());
            }
        }

        for (BigInteger normal : agg.getNormalDimMeas()) {
            fillBit(tmpChild, normal, parentCandidate);
        }

        for (BigInteger joint : agg.getJoints()) {
            fillBit(tmpChild, joint, parentCandidate);
        }

        for (NAggregationGroup.HierarchyMask hierarchy : agg.getHierarchyMasks()) {
            for (BigInteger mask : hierarchy.getAllMasks()) {
                if (fillBit(tmpChild, mask, parentCandidate)) {
                    break;
                }
            }
        }

        return parentCandidate;
    }

    private boolean fillBit(BigInteger origin, BigInteger other, Set<CuboidBigInteger> coll) {
        // if origin does not contain all elements in other
        if (!(origin.and(other)).equals(other)) {
            coll.add(new CuboidBigInteger(origin.or(other), measureSize));
            return true;
        }
        return false;
    }

}



RuleBasedIndex.java
public class RuleBasedIndex implements Serializable, IKeep {

    @Getter
    @JsonBackReference
    private IndexPlan indexPlan;

    @JsonProperty("dimensions")
    private List<Integer> dimensions = Lists.newArrayList();
    @JsonProperty("measures")
    private List<Integer> measures = Lists.newArrayList();

    @Setter
    @JsonProperty("global_dim_cap")
    private Integer globalDimCap;

    @Getter
    @JsonProperty("aggregation_groups")
    private List<NAggregationGroup> aggregationGroups = Lists.newArrayList();

    @Setter
    @Getter
    @JsonProperty("layout_id_mapping")
    private List<Long> layoutIdMapping = Lists.newArrayList();

    @Getter
    @JsonProperty("parent_forward")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private int parentForward = 3;

    @Setter
    @Getter
    @JsonProperty("index_start_id")
    private long indexStartId;

    @Getter
    @Setter
    @JsonProperty("last_modify_time")
    private long lastModifiedTime = System.currentTimeMillis();

    @Setter
    @Getter
    @JsonProperty("layout_black_list")
    private Set<Long> layoutBlackList = new HashSet<>();

    @Setter
    @Getter
    @JsonProperty("scheduler_version")
    private int schedulerVersion = 1;

    @Setter
    @Getter
    @JsonProperty("index_update_enabled")
    private boolean indexUpdateEnabled = true;

    // computed fields below

    @Getter
    private transient BiMap<Integer, TblColRef> effectiveDimCols; // BiMap impl (com.google.common.collect.Maps$FilteredEntryBiMap) is not serializable
    @Getter
    private ImmutableBiMap<Integer, NDataModel.Measure> orderedMeasures;
    @Getter
    private ImmutableBitSet dimensionBitset = null;
    @Getter
    private ImmutableBitSet measureBitset = null;
    @Getter
    private ImmutableSet<TblColRef> dimensionSet = null;
    @Getter
    private ImmutableSet<NDataModel.Measure> measureSet = null;
    private Map<Integer, Integer> dimMea2bitIndex; // dim id/measure id -> bit index
    @Getter
    private BigInteger fullMask = BigInteger.ZERO;

    @Getter(lazy = true)
    private final CuboidScheduler cuboidScheduler = initCuboidScheduler();

    public void init() {
        NDataModel model = getModel();
        this.dimensionBitset = ImmutableBitSet.valueOf(dimensions);
        this.measureBitset = ImmutableBitSet.valueOf(measures);

        this.effectiveDimCols = Maps.filterKeys(model.getEffectiveCols(),
                input -> input != null && dimensionBitset.get(input));

        this.dimensionSet = ImmutableSet.copyOf(this.effectiveDimCols.values());

        // all layouts' measure order follows cuboid_desc's definition
        ImmutableBiMap.Builder<Integer, NDataModel.Measure> measuresBuilder = ImmutableBiMap.builder();
        for (int m : measures) {
            if (model.getEffectiveMeasures().containsKey(m)) {
                measuresBuilder.put(m, model.getEffectiveMeasures().get(m));
            }
        }
        this.orderedMeasures = measuresBuilder.build();
        this.measureSet = orderedMeasures.values();

        dimMea2bitIndex = Maps.newHashMap();
        int bitSize = dimensions.size() + measures.size();
        for (int i = 0; i < dimensions.size(); i++) {
            dimMea2bitIndex.put(dimensions.get(i), bitSize - i - 1);
        }

        for (int i = 0; i < measures.size(); i++) {
            dimMea2bitIndex.put(measures.get(i), measures.size() - i - 1);
        }

        if (CollectionUtils.isNotEmpty(dimensions)) {
            for (int i = 0; i < dimensions.size() + measures.size(); i++) {
                fullMask = fullMask.setBit(i);
            }
        }

        for (NAggregationGroup nAggregationGroup : aggregationGroups) {
            nAggregationGroup.init(this);
        }
    }

    public CuboidScheduler initCuboidScheduler() {
        return CuboidScheduler.getInstance(indexPlan, this);
    }

    public int getGlobalDimCap() {
        return globalDimCap == null ? 0 : globalDimCap;
    }

    public int getColumnBitIndex(Integer colId) {
        return dimMea2bitIndex.get(colId);
    }

    public Set<LayoutEntity> genCuboidLayouts() {
        return genCuboidLayouts(Sets.newHashSet(), Sets.newHashSet(), true);
    }

    public boolean getIndexUpdateEnabled() {
        return indexUpdateEnabled;
    }

    // ============================================================================
    // NOTE THE SPECIAL GETTERS AND SETTERS TO PROTECT CACHED OBJECTS FROM BEING MODIFIED
    // ============================================================================

    public NDataModel getModel() {
        return indexPlan.getModel();
    }

    public void setIndexPlan(IndexPlan indexPlan) {
        checkIsNotCachedAndShared();
        this.indexPlan = indexPlan;
    }

    public List<Integer> getDimensions() {
        return isCachedAndShared() ? Lists.newArrayList(dimensions) : dimensions;
    }

    public void setDimensions(List<Integer> dimensions) {
        checkIsNotCachedAndShared();
        this.dimensions = dimensions;
    }

    public List<Integer> getMeasures() {
        return isCachedAndShared() ? Lists.newArrayList(measures) : measures;
    }

    public void setMeasures(List<Integer> measures) {
        checkIsNotCachedAndShared();
        this.measures = measures;
    }

    public void setAggregationGroups(List<NAggregationGroup> aggregationGroups) {
        checkIsNotCachedAndShared();
        this.aggregationGroups = aggregationGroups;
    }

    public boolean isCachedAndShared() {
        return indexPlan != null && indexPlan.isCachedAndShared();
    }

    public void checkIsNotCachedAndShared() {
        if (indexPlan != null)
            indexPlan.checkIsNotCachedAndShared();
    }

    public void setParentForward(int parentForward) {
        checkIsNotCachedAndShared();
        this.parentForward = parentForward;
    }

    @Getter(lazy = true)
    private final ImmutableBitSet measuresBitSet = initMeasuresBitSet();

    private ImmutableBitSet initMeasuresBitSet() {
        return ImmutableBitSet.valueOf(getMeasures());
    }

    Set<LayoutEntity> genCuboidLayouts(Set<LayoutEntity> previousLayouts) {
        return genCuboidLayouts(previousLayouts, Sets.newHashSet(), true);
    }

    Set<LayoutEntity> genCuboidLayouts(Set<LayoutEntity> previousLayouts, Set<LayoutEntity> needDelLayouts) {
        return genCuboidLayouts(previousLayouts, needDelLayouts, true);
    }

    Set<LayoutEntity> genCuboidLayouts(Set<LayoutEntity> previousLayouts, Set<LayoutEntity> needDelLayouts,
            boolean excludeDel) {

        Set<LayoutEntity> genLayouts = Sets.newHashSet();

        Map<LayoutEntity, Long> existLayouts = Maps.newHashMap();
        for (LayoutEntity layout : previousLayouts) {
            existLayouts.put(layout, layout.getId());
        }
        for (LayoutEntity layout : indexPlan.getWhitelistLayouts()) {
            existLayouts.put(layout, layout.getId());
        }

        Map<LayoutEntity, Long> delLayouts = Maps.newHashMap();
        for (LayoutEntity layout : needDelLayouts) {
            delLayouts.put(layout, layout.getId());
        }

        Map<IndexIdentifier, IndexEntity> identifierIndexMap = existLayouts.keySet().stream()
                .map(LayoutEntity::getIndex).collect(Collectors.groupingBy(IndexEntity::createIndexIdentifier,
                        Collectors.reducing(null, (l, r) -> r)));
        boolean needAllocationId = layoutIdMapping.isEmpty();
        long proposalId = indexStartId + 1;

        val colOrders = getCuboidScheduler().getAllColOrders();
        for (int i = 0; i < colOrders.size(); i++) {
            val colOrder = colOrders.get(i);

            val layout = createLayout(colOrder);

            val dimensionsInLayout = colOrder.getDimensions();
            val measuresInLayout = colOrder.getMeasures();

            // if a cuboid is same as the layout's one, then reuse it
            val indexIdentifier = new IndexEntity.IndexIdentifier(dimensionsInLayout, measuresInLayout, false);
            var layoutIndex = identifierIndexMap.get(indexIdentifier);
            // if two layout is equal, the id should be same
            Long prevId = existLayouts.get(layout);

            if (needAllocationId) {
                if (prevId != null) {
                    layout.setId(existLayouts.get(layout));
                } else if (delLayouts.containsKey(layout)) {
                    layout.setId(delLayouts.get(layout));
                    layoutBlackList.add(delLayouts.get(layout));
                } else if (layoutIndex != null) {
                    val id = layoutIndex.getId() + layoutIndex.getNextLayoutOffset();
                    layout.setId(id);
                } else {
                    layout.setId(proposalId);
                    proposalId += IndexEntity.INDEX_ID_STEP;
                }
                layoutIdMapping.add(layout.getId());
            } else {
                layout.setId(layoutIdMapping.get(i));
            }

            if (layoutIndex == null) {
                long indexId = layout.getIndexId();
                layoutIndex = new IndexEntity();
                layoutIndex.setId(indexId);
                layoutIndex.setDimensions(dimensionsInLayout);
                layoutIndex.setMeasures(measuresInLayout);
                layoutIndex.setIndexPlan(indexPlan);
                layoutIndex.setNextLayoutOffset(layout.getId() % IndexEntity.INDEX_ID_STEP + 1);

                identifierIndexMap.putIfAbsent(layoutIndex.createIndexIdentifier(), layoutIndex);
            } else {
                layoutIndex.setNextLayoutOffset(
                        Math.max(layout.getId() % IndexEntity.INDEX_ID_STEP + 1, layoutIndex.getNextLayoutOffset()));
            }
            layout.setIndex(layoutIndex);

            genLayouts.add(layout);
        }

        // remove layout in blacklist
        if (excludeDel) {
            genLayouts.removeIf(layout -> layoutBlackList.contains(layout.getId()));
        }
        return genLayouts;
    }

    private LayoutEntity createLayout(ColOrder colOrder) {
        LayoutEntity layout = new LayoutEntity();
        layout.setManual(true);
        layout.setColOrder(colOrder.toList());
        if (colOrder.getDimensions().containsAll(indexPlan.getAggShardByColumns())) {
            layout.setShardByColumns(indexPlan.getAggShardByColumns());
        }
        if (colOrder.getDimensions().containsAll(indexPlan.getExtendPartitionColumns())
                && getModel().getStorageType() == 2) {
            layout.setPartitionByColumns(indexPlan.getExtendPartitionColumns());
        }
        layout.setUpdateTime(lastModifiedTime);
        layout.setStorageType(IStorageAware.ID_NDATA_STORAGE);
        return layout;
    }

    public Set<LayoutEntity> getBlacklistLayouts() {
        val allLayouts = genCuboidLayouts(Sets.newHashSet(), Sets.newHashSet(), false);
        val existLayouts = genCuboidLayouts();
        return allLayouts.stream().filter(layout -> !existLayouts.contains(layout)).collect(Collectors.toSet());
    }

    public static RuleBasedIndex copyAndResetAggGroups(RuleBasedIndex oldRuleBasedIndex,
            List<NAggregationGroup> reservedAggGroups) {
        List<NAggregationGroup> newAggGroups = Lists.newArrayList();
        reservedAggGroups.forEach(aggGroup -> {
            NAggregationGroup tmp = new NAggregationGroup();
            BeanUtils.copyProperties(aggGroup, tmp);

            NAggregationGroup newAggGroup = new NAggregationGroup();
            newAggGroup.setSelectRule(tmp.getSelectRule());
            newAggGroup.setIncludes(tmp.getIncludes());
            newAggGroup.setMeasures(tmp.getMeasures());
            newAggGroups.add(newAggGroup);
        });

        RuleBasedIndex tmpRuleBasedIndex = new RuleBasedIndex();
        BeanUtils.copyProperties(oldRuleBasedIndex, tmpRuleBasedIndex);

        // Only need properties of aggregationGroups, globalDimCap and schedulerVersion,
        // please refer to UpdateRuleBasedCuboidRequest.convertToRuleBasedIndex.
        RuleBasedIndex newRuleBasedIndex = new RuleBasedIndex();
        newRuleBasedIndex.setAggregationGroups(newAggGroups);
        newRuleBasedIndex.setGlobalDimCap(tmpRuleBasedIndex.getGlobalDimCap());
        newRuleBasedIndex.setSchedulerVersion(tmpRuleBasedIndex.getSchedulerVersion());
        newRuleBasedIndex.adjustDimensions();
        newRuleBasedIndex.adjustMeasures();
        newRuleBasedIndex.setLastModifiedTime(System.currentTimeMillis());
        return newRuleBasedIndex;
    }

    public void adjustMeasures() {
        if (CollectionUtils.isEmpty(aggregationGroups)) {
            this.getMeasures().clear();
            return;
        }

        List<Integer> measures = recomputeMeasures(this.getAggregationGroups());
        this.setMeasures(Lists.newArrayList(measures));
    }

    private List<Integer> recomputeMeasures(List<NAggregationGroup> aggregationGroups) {
        if (CollectionUtils.isEmpty(aggregationGroups)) {
            return Lists.newArrayList();
        }

        TreeSet<Integer> measures = new TreeSet<>();
        for (NAggregationGroup agg : aggregationGroups) {
            Integer[] aggMeasures = agg.getMeasures();
            if (aggMeasures == null || aggMeasures.length == 0)
                continue;
            measures.addAll(Sets.newHashSet(aggMeasures));
        }
        return Lists.newArrayList(measures);
    }

    public void adjustDimensions() {
        List<Integer> dimensions = recomputeSortedDimensions(this.aggregationGroups);
        setDimensions(dimensions);
    }

    /**
     * for example,
     * [1,2,3] [4,3] [2,4] [5,4]
     *
     * this algorithm sorts them from last to first
     *
     * step 1:
     * mergedAndSorted = [5, 4]
     * trying merge [5, 4] to [2, 4]
     * the point is to merge new elements into the earlier agg group
     * currentSortedList = [2, 4]
     * 5 -> 5 is before 4, so insert 5 before 4
     * currentSortedList = [2, 5, 4]
     * so mergedAndSorted = [2, 5, 4], assigned from currentSortedList
     *
     * step 2:
     * mergedAndSorted = [2, 5, 4]
     * trying merge new elements from [2, 5, 4] to [4, 3]
     * 2 -> 2 is before 4, so insert 2 before 4
     * currentSortedList = [2, 4, 3]
     * 5 -> 5 is before 4, so insert 5 before 4
     * currentSortedList = [2, 5, 4, 3]
     * assign currentSortedList to mergedAndSorted
     *
     * step 3:
     * mergedAndSorted = [2, 5, 4, 3]
     * trying merge new elements from [2, 5, 4, 3] to [1, 2, 3]
     * 5 -> 5 is before 3, so insert 5 before 3
     * currentSortedList = [1, 2, 5, 3]
     * 4 -> 4 is before 3, so insert 4 before 3
     * currentSortedList = [1, 2, 5, 4, 3]
     *
     * get final result mergedAndSorted = [1, 2, 5, 4, 3]
     */
    private List<Integer> recomputeSortedDimensions(List<NAggregationGroup> aggregationGroups) {

        if (CollectionUtils.isEmpty(aggregationGroups)) {
            return Lists.newArrayList();
        }

        List<Integer> mergedAndSorted = Lists.newArrayList();

        // merging from bottom to top
        for (int aggGroupIndex = aggregationGroups.size() - 1; aggGroupIndex >= 0; aggGroupIndex--) {
            val includes = aggregationGroups.get(aggGroupIndex).getIncludes();
            if (includes == null || includes.length == 0)
                continue;

            final List<Integer> currentSortedList = Lists.newArrayList(includes);
            Map<Integer, Integer> mergedAndSortedIndexMap = Maps.newHashMap();

            int count = 0;
            for (int element : mergedAndSorted) {
                mergedAndSortedIndexMap.put(element, count);
                count++;
            }

            for (int dimensionId : mergedAndSorted) {
                calculateCurrentSortedList(mergedAndSortedIndexMap, currentSortedList, dimensionId);
            }

            mergedAndSorted = Lists.newArrayList(currentSortedList);
        }
        return mergedAndSorted;
    }

    private void calculateCurrentSortedList(Map<Integer, Integer> mergedAndSortedIndexMap,
            List<Integer> currentSortedList, int dimensionId) {
        boolean needToAppendToTail = true;
        Set<Integer> currentSortedSet = Sets.newHashSet(currentSortedList);
        if (currentSortedSet.contains(dimensionId)) {
            return;
        }

        Integer indexOfNewDimension = mergedAndSortedIndexMap.get(dimensionId);

        for (int oldDimensionId : currentSortedSet) {
            Integer indexOfOldDimension = mergedAndSortedIndexMap.get(oldDimensionId);

            if (indexOfOldDimension != null && indexOfNewDimension < indexOfOldDimension) {
                currentSortedList.add(currentSortedList.indexOf(oldDimensionId), dimensionId);
                needToAppendToTail = false;
                break;
            }
        }

        if (needToAppendToTail)
            currentSortedList.add(dimensionId);
    }
}
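
As a usage note (a sketch only; it assumes an initialized IndexPlan whose RuleBasedIndex has been populated from the aggregation group settings), the rule-based layouts can be materialized like this:

// Regenerate the rule-based layouts after the aggregation groups change.
RuleBasedIndex ruleBasedIndex = indexPlan.getRuleBasedIndex();
Set<LayoutEntity> layouts = ruleBasedIndex.genCuboidLayouts();        // layouts with the blacklist applied
Set<LayoutEntity> blacklisted = ruleBasedIndex.getBlacklistLayouts(); // layouts excluded by the blacklist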


Q & A

Can an individual AggregationGroup contain its own Measures?

Users can add measures to each aggregate index group on the aggregation group editing page; correspondingly, a measures property is added to NAggregationGroup in the code. When the mask value of each index is computed, every measure column also takes one bit, so differences in the measures of different indexes are reflected in their mask values.
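
A tiny illustration of why giving measures their own bits distinguishes otherwise identical indexes (using the order a, b, c, m1, m2 from the earlier example, so the masks are a=16, b=8, c=4, m1=2, m2=1):

import java.math.BigInteger;

public class MeasureBitSketch {
    public static void main(String[] args) {
        BigInteger abM1   = BigInteger.valueOf(16 + 8 + 2);     // {a, b, m1}     -> 26 (binary 11010)
        BigInteger abM1M2 = BigInteger.valueOf(16 + 8 + 2 + 1); // {a, b, m1, m2} -> 27 (binary 11011)
        System.out.println(abM1.equals(abM1M2)); // false: the extra measure bit tells the two indexes apart
    }
}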

How the 64-dimension limit was removed

Originally Kylin supported selecting at most 64 dimensions, where 64 refers to the union of the dimensions selected by all aggregate index groups.
The reason is that the mask values of the indexes in the index tree were stored as long, and a long has only 64 bits; with one bit per dimension, at most 64 dimensions can be supported. Replacing long with BigInteger removes the 64-bit limit, so there is no longer a hard limit on the number of dimensions.
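
A minimal sketch of why the switch matters: a long mask has exactly 64 bits and cannot address a 65th dimension, while BigInteger grows as needed (the bit positions here are only illustrative):

import java.math.BigInteger;

public class DimensionLimitSketch {
    public static void main(String[] args) {
        // a long has exactly 64 bits; in Java the shift distance is taken mod 64,
        // so trying to set "bit 64" silently wraps around to bit 0
        long longMask = 1L << 64;
        System.out.println(longMask);            // 1, not 2^64

        // BigInteger has no fixed width, so dimension #100 simply gets bit 100
        BigInteger bigMask = BigInteger.ZERO.setBit(100);
        System.out.println(bigMask.bitLength()); // 101
    }
}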



