...

Code Block
languagejava
themeEclipse
linenumberstrue
public interface TableData {
 
    …
    /**
     * Filter the input `TableData`, keeping only the given columns.
     */
    public TableData filter(List<String> columnNames);

    /**
     * Pivot the input `TableData` for visualizations.
     */
    public TableData pivot(List<String> keyColumns,
                           List<String> groupColumns, 
                           List<String> valueColumns);
 

    …
}
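
As an illustration, the sketch below narrows a shared result to three columns and pivots it before charting. It is hypothetical: the column names and the helper class are assumptions, and only the two methods declared above are used.

Code Block
languagejava
themeEclipse
import java.util.Arrays;

public class TableDataExample {

  // Hypothetical helper: keep three columns of a shared result, then pivot
  // them so `country` keys the series, grouped by `year`, with `sales` values.
  public static TableData prepareForChart(TableData raw) {
    TableData narrowed = raw.filter(Arrays.asList("country", "year", "sales"));

    return narrowed.pivot(
        Arrays.asList("country"),  // key columns
        Arrays.asList("year"),     // group columns
        Arrays.asList("sales"));   // value columns
  }
}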

...

4.2.1. BaseRelation Implementation


Code Block
languagejava
themeEclipse
linenumberstrue
public class TableDataRelation extends BaseRelation implements Serializable, TableScan {

 transient SQLContext context;
 private final TableData data;

 public TableDataRelation(SQLContext context, TableData data) {
   this.context = context;
   this.data = data;
 }

 @Override
 public SQLContext sqlContext() {
   return context;
 }

 @Override
 public StructType schema() {
   ColumnDef[] columns = data.columns();
   StructField[] fields = new StructField[columns.length];

   int i = 0;

   // Map Zeppelin column types to Spark SQL types; unrecognized types fall back to string.
   for (ColumnDef c : columns) {
     if (c.type() == ColumnDef.TYPE.INT) {
       fields[i] = new StructField(c.name(), IntegerType, true, Metadata.empty());
     } else if (c.type() == ColumnDef.TYPE.LONG) {
       fields[i] = new StructField(c.name(), LongType, true, Metadata.empty());
     } else {
       fields[i] = new StructField(c.name(), StringType, true, Metadata.empty());
     }

     i++;
   }

   return new StructType(fields);
 }

 @Override
 public RDD<Row> buildScan() {
   // Drain the TableData iterator on the driver, then parallelize the rows.
   // This is fine for paragraph-sized results, but not for large tables.
   Iterator<org.apache.zeppelin.tabledata.Row> rows = data.rows();
   List<org.apache.zeppelin.tabledata.Row> result = new ArrayList<>();

   while (rows.hasNext()) {
     result.add(rows.next());
   }

   JavaSparkContext jsc = new JavaSparkContext(context.sparkContext());
   JavaRDD<org.apache.zeppelin.tabledata.Row> rdd = jsc.parallelize(result);

   // Convert each Zeppelin row into a Spark SQL Row.
   return rdd.map(new Function<org.apache.zeppelin.tabledata.Row, Row>() {
     @Override
     public Row call(org.apache.zeppelin.tabledata.Row row) throws Exception {
       return org.apache.spark.sql.RowFactory.create(row.get());
     }
   }).rdd();

 }
}
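
Since buildScan() collects the whole result on the driver, a cheap way to validate the relation is to inspect the schema it derives. Below is a hypothetical check; a SQLContext named `sqlContext` and a TableData instance named `data` are assumed to exist.

Code Block
languagejava
themeEclipse
// Hypothetical smoke test for the type mapping in schema().
// `sqlContext` and `data` (a TableData with INT/LONG/STRING columns) are assumed.
TableDataRelation relation = new TableDataRelation(sqlContext, data);
StructType schema = relation.schema();

// Every field is created nullable; unrecognized column types widen to string.
System.out.println(schema.treeString());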

...

4.2.2. DefaultSource Implementation


Code Block
languagejava
themeEclipse
linenumberstrue
public class DefaultSource implements RelationProvider, SchemaRelationProvider {


 Logger logger = LoggerFactory.getLogger(DefaultSource.class);
 public static ResourcePool resourcePool;

 public DefaultSource() {
 }

 @Override
 public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
   return createRelation(sqlContext, parameters, null);
 }


 @Override
 public BaseRelation createRelation(
     SQLContext sqlContext,
     Map<String, String> parameters,
     StructType schema) {

   // `path` is expected to be "<noteId>|<paragraphId>".
   String path = parameters.get("path").get();
   String[] noteIdAndParagraphId = path.split("\\|");

   // Look up the shared table result for that paragraph in the resource pool.
   Resource resource = resourcePool.get(
       noteIdAndParagraphId[0],
       noteIdAndParagraphId[1],
       WellKnownResourceName.ZeppelinTableResult.toString());

   InterpreterResultMessage message = (InterpreterResultMessage) resource.get();
   TableData tableData = new InterpreterResultTableData(message);

   return new TableDataRelation(sqlContext, tableData);

 }
}
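
For context, the sketch below shows how a Spark session might load this relation through the data source API. It is an assumption for illustration: the format string org.apache.zeppelin.spark.datasource and the temp table name are not defined in this document, and the path option follows the "<noteId>|<paragraphId>" convention parsed above.

Code Block
languagejava
themeEclipse
// Hypothetical usage (Spark 1.x style API). The format string is an assumed
// package name for the DefaultSource above.
DataFrame df = sqlContext.read()
    .format("org.apache.zeppelin.spark.datasource")
    .option("path", "<noteId>|<paragraphId>")
    .load();

// Register the shared result so it can be queried with SQL.
df.registerTempTable("shared_result");
sqlContext.sql("SELECT * FROM shared_result").show();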

...

  • For interpreters which use SQL

    • provide an interpreter option: create a TableData whenever a paragraph is executed

    • or provide a new interpreter magic for it: %spark.sql_share, %jdbc.mysql_share, …

    • or automatically put all table results into the resource pool when they are not heavy (e.g., keeping only the query, or just a reference to an RDD)

    • if the interpreter supports runtime parameters, we can use this syntax: %jdbc(share=true) to specify whether to share the table result or not

  • For interpreters which use a programming language (e.g. Python)

    • provide an API like z.put() (a Java sketch of the type dispatch follows this list)

      Code Block
      languagescala
      themeEclipse
      // infer the instance type and convert it to a predefined `TableData` subclass such as `SparkDataFrameTableData`
      z.put("myTable01", myDataFrame01)
      
      // or force the user to put a `TableData` subclass explicitly
      val myTableData01 = new SparkRDDTableData(myRdd01)
      z.put("myTable01", myTableData01)

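
A sketch of the type inference mentioned above, written on the Java side. SparkDataFrameTableData and SparkRDDTableData are named earlier in this section, but their constructors and the resource pool call here are assumptions for illustration.

Code Block
languagejava
themeEclipse
// Hypothetical dispatch inside z.put(); the wrapper constructors and the
// resourcePool.put() signature are assumptions, not an existing API.
public void put(String name, Object value) {
  TableData tableData;

  if (value instanceof TableData) {
    tableData = (TableData) value;                 // user already wrapped it
  } else if (value instanceof DataFrame) {
    tableData = new SparkDataFrameTableData((DataFrame) value);
  } else if (value instanceof RDD) {
    tableData = new SparkRDDTableData((RDD<?>) value);
  } else {
    throw new IllegalArgumentException(
        "No TableData conversion for " + value.getClass().getName());
  }

  resourcePool.put(name, tableData);
}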

...