...
```java
public interface TableData {
  …

  /**
   * Filter the input `TableData` based on columns.
   */
  public TableData filter(List<String> columnNames);

  /**
   * Pivot the input `TableData` for visualizations.
   */
  public TableData pivot(List<String> keyColumns,
                         List<String> groupColumns,
                         List<String> valueColumns);

  …
}
```
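As the Javadoc above describes, filter selects a subset of columns and pivot reshapes the data for visualization. A minimal usage fragment follows; the `tableData` instance and the column names ("country", "year", "sales") are purely illustrative and not defined anywhere in this proposal:

```java
// Sketch only: project three columns, then pivot them for a chart.
// Arrays.asList comes from java.util.Arrays.
TableData projected = tableData.filter(Arrays.asList("country", "year", "sales"));
TableData pivoted = projected.pivot(
    Arrays.asList("country"),   // key columns
    Arrays.asList("year"),      // group columns
    Arrays.asList("sales"));    // value columns
```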
...
4.2.1. BaseRelation Implementation
```java
public class TableDataRelation extends BaseRelation implements Serializable, TableScan {

  transient SQLContext context;
  private final TableData data;

  public TableDataRelation(SQLContext context, TableData data) {
    this.context = context;
    this.data = data;
  }

  @Override
  public SQLContext sqlContext() {
    return context;
  }

  @Override
  public StructType schema() {
    ColumnDef[] columns = data.columns();
    StructField[] fields = new StructField[columns.length];
    int i = 0;
    for (ColumnDef c : columns) {
      if (c.type() == ColumnDef.TYPE.INT) {
        fields[i] = new StructField(c.name(), IntegerType, true, Metadata.empty());
      } else if (c.type() == ColumnDef.TYPE.LONG) {
        fields[i] = new StructField(c.name(), LongType, true, Metadata.empty());
      } else {
        fields[i] = new StructField(c.name(), StringType, true, Metadata.empty());
      }
      i++;
    }
    return new StructType(fields);
  }

  @Override
  public RDD<Row> buildScan() {
    Iterator<org.apache.zeppelin.tabledata.Row> rows = data.rows();
    List<org.apache.zeppelin.tabledata.Row> result = new ArrayList<>();
    while (rows.hasNext()) {
      result.add(rows.next());
    }
    JavaSparkContext jsc = new JavaSparkContext(context.sparkContext());
    JavaRDD<org.apache.zeppelin.tabledata.Row> rdd = jsc.parallelize(result);
    return rdd.map(new Function<org.apache.zeppelin.tabledata.Row, Row>() {
      @Override
      public Row call(org.apache.zeppelin.tabledata.Row row) throws Exception {
        return org.apache.spark.sql.RowFactory.create(row.get());
      }
    }).rdd();
  }
}
```
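Once such a relation exists, it can be turned into a DataFrame directly through SQLContext, which is a quick way to check the schema mapping above. A short fragment as a sketch; `sqlContext` and `tableData` are assumed to already exist:

```java
// Sketch only: wrap a TableData in the relation and materialize a DataFrame.
TableDataRelation relation = new TableDataRelation(sqlContext, tableData);
DataFrame df = sqlContext.baseRelationToDataFrame(relation);
df.printSchema();   // columns derived from ColumnDef types in schema()
df.show();
```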
...
4.2.2. DefaultSource Implementation
```java
public class DefaultSource implements RelationProvider, SchemaRelationProvider {

  Logger logger = LoggerFactory.getLogger(DefaultSource.class);
  public static ResourcePool resourcePool;

  public DefaultSource() {
  }

  @Override
  public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
    return createRelation(sqlContext, parameters, null);
  }

  @Override
  public BaseRelation createRelation(
      SQLContext sqlContext,
      Map<String, String> parameters,
      StructType schema) {

    String path = parameters.get("path").get();
    String[] noteIdAndParagraphId = path.split("\\|");

    ResourceSet rs = ResourcePoolUtils.getAllResources();
    Resource resource = resourcePool.get(
        noteIdAndParagraphId[0],
        noteIdAndParagraphId[1],
        WellKnownResourceName.ZeppelinTableResult.toString());

    InterpreterResultMessage message = (InterpreterResultMessage) resource.get();
    TableData tableData = new InterpreterResultTableData(message);

    return new TableDataRelation(sqlContext, tableData);
  }
}
```
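With DefaultSource in place, a shared paragraph result could be loaded through the standard data source API. The sketch below is only an illustration: the package name passed to format() and the exact "noteId|paragraphId" path string are assumptions that follow the split in createRelation(), and it presumes a Spark version where SQLContext.read() is available:

```java
// Hypothetical usage; "org.apache.zeppelin.spark" stands in for whatever
// package ends up hosting DefaultSource.
String path = noteId + "|" + paragraphId;   // matches the split on "|" above
DataFrame shared = sqlContext.read()
    .format("org.apache.zeppelin.spark")
    .option("path", path)
    .load();
shared.registerTempTable("shared_result");
sqlContext.sql("SELECT * FROM shared_result").show();
```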
...
- For interpreters which use SQL
  - provide an interpreter option to create TableData whenever a paragraph is executed
  - or provide a new interpreter magic for it: %spark.sql_share, %jdbc.mysql_share, …
  - or automatically put all table results into the resource pool if they are not heavy (e.g. keeping only the query, or just a reference to the RDD)
  - or, if the interpreter supports runtime parameters, use syntax like %jdbc(share=true) to specify whether to share the table result or not
- For interpreters which use a programming language (e.g. Python)
  - provide an API like z.put()
```scala
// infer the instance type and convert it to a predefined `TableData` subclass
// such as `SparkDataFrameTableData`
z.put("myTable01", myDataFrame01)

// or force the user to put a `TableData` subclass
val myTableData01 = new SparkRDDTableData(myRdd01)
z.put("myTable01", myTableData01)
```
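For the first variant, z.put() would need to inspect the runtime type of the value and wrap it in the matching TableData subclass. A rough Java sketch of that dispatch follows; the method body is an assumption rather than a final API, and SparkDataFrameTableData / SparkRDDTableData are the subclasses named above:

```java
// Sketch of type inference inside z.put(); resourcePool and the TableData
// subclasses are assumed to exist as described in this proposal.
public void put(String name, Object value) {
  TableData tableData;
  if (value instanceof DataFrame) {
    tableData = new SparkDataFrameTableData((DataFrame) value);
  } else if (value instanceof RDD) {
    tableData = new SparkRDDTableData((RDD<?>) value);
  } else if (value instanceof TableData) {
    tableData = (TableData) value;   // already a TableData subclass
  } else {
    throw new IllegalArgumentException(
        "Don't know how to share instances of " + value.getClass());
  }
  resourcePool.put(name, tableData);
}
```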
...