...

For more future work tasks, please refer to Section 6, Potential Future Work.

3. Proposed Changes

3.1. Overview: Sharing a table resource between different interpreters

...

[Gliffy diagram: overview2]
 

 

4. Public Interfaces

4.1. Interfaces for TableData related classes

...

4.2.1. BaseRelation Implementation

 

 

...


Code Block
languagejava
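import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.tabledata.ColumnDef;
import org.apache.zeppelin.tabledata.TableData;

public class TableDataRelation extends BaseRelation implements Serializable, TableScan {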

 transient SQLContext context;
 private final TableData data;

 public TableDataRelation(SQLContext context, TableData data) {
   this.context = context;
   this.data = data;
 }

 @Override
 public SQLContext sqlContext() {
   return context;
 }

 @Override
 public StructType schema() {
   ColumnDef[] columns = data.columns();
   StructField [] fields = new StructField[columns.length];

   int i = 0;

   for (ColumnDef c : columns) {
     if (c.type() == ColumnDef.TYPE.INT) {
       fields[i] = new StructField(c.name(), IntegerType, true, Metadata.empty());
     } else if (c.type() == ColumnDef.TYPE.LONG) {
       fields[i] = new StructField(c.name(), LongType, true, Metadata.empty());
     } else {
       fields[i] = new StructField(c.name(), StringType, true, Metadata.empty());
     }

     i++;
   }

   return new StructType(fields);
 }

 @Override
 public RDD<Row> buildScan() {
   Iterator<org.apache.zeppelin.tabledata.Row> rows = data.rows();
   List<org.apache.zeppelin.tabledata.Row> result = new ArrayList<>();

   while (rows.hasNext()){
     result.add(rows.next());
   }

   JavaSparkContext jsc = new JavaSparkContext(context.sparkContext());
   JavaRDD<org.apache.zeppelin.tabledata.Row> rdd = jsc.parallelize(result);

   return rdd.map(new Function<org.apache.zeppelin.tabledata.Row, Row>() {
     @Override
     public Row call(org.apache.zeppelin.tabledata.Row row) throws Exception {
       return org.apache.spark.sql.RowFactory.create(row.get());
     }
   }).rdd();

 }
}

 

 

...



4.2.2. DefaultSource Implementation

 

 

...


Code Block
languagejava
public class DefaultSource implements RelationProvider, SchemaRelationProvider {


 Logger logger = LoggerFactory.getLogger(DefaultSource.class);
 public static ResourcePool resourcePool;

 public DefaultSource() {
 }

 @Override
 public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
   return createRelation(sqlContext, parameters, null);
 }


 @Override
 public BaseRelation createRelation(
     SQLContext sqlContext,
     Map<String, String> parameters,
     StructType schema) {

   String path = parameters.get("path").get();
   String [] noteIdAndParagraphId = path.split("\\|");

   ResourceSet rs = ResourcePoolUtils.getAllResources();

   Resource resource = resourcePool.get(
       noteIdAndParagraphId[0],
       noteIdAndParagraphId[1],
       WellKnownResourceName.ZeppelinTableResult.toString());


   InterpreterResultMessage message = (InterpreterResultMessage) resource.get();
   TableData tableData = new InterpreterResultTableData(message);

   return new TableDataRelation(sqlContext, tableData);

 }
}
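
The `path` option read in `createRelation` is expected to carry a note ID and a paragraph ID separated by `|`. The sketch below shows one way that value could be parsed and validated before it is used to look up the resource. The class name `ResourcePath` and its error messages are hypothetical and are not part of this proposal.

```java
// Hypothetical helper, not part of the proposal: parses "noteId|paragraphId".
final class ResourcePath {
  final String noteId;
  final String paragraphId;

  private ResourcePath(String noteId, String paragraphId) {
    this.noteId = noteId;
    this.paragraphId = paragraphId;
  }

  // Splits the path on "|" and rejects anything that is not exactly two non-empty parts.
  static ResourcePath parse(String path) {
    if (path == null) {
      throw new IllegalArgumentException("path is required");
    }
    // "|" is a regex metacharacter, so it must be escaped.
    // The -1 limit keeps trailing empty strings so that "note|" is rejected.
    String[] parts = path.split("\\|", -1);
    if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
      throw new IllegalArgumentException("expected 'noteId|paragraphId' but got: " + path);
    }
    return new ResourcePath(parts[0], parts[1]);
  }
}
```

Validating up front avoids an `ArrayIndexOutOfBoundsException` when the path is malformed.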

 

...

 

4.3. ResourceRegistry Class

The ResourceRegistry class manages a list of available resources (e.g. tables). It should therefore provide the following functionality:

  • list all resources
  • get a resource

 

 

 

  • resource 

This proposal mainly discusses table results as resources. However, any object can also be a resource (e.g. String, Number, Map).
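
The two operations listed for ResourceRegistry (list and get) could look roughly like the following in-memory sketch. The class name `InMemoryResourceRegistry`, the `register` method, and all signatures are assumptions made for illustration; the proposal does not define them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory sketch; names and signatures are assumptions,
// not the ResourceRegistry API of this proposal.
final class InMemoryResourceRegistry {
  // Any object can be a resource, so values are stored as Object.
  private final Map<String, Object> resources = new ConcurrentHashMap<>();

  // Registers a resource, replacing any existing one with the same name.
  void register(String name, Object resource) {
    resources.put(name, resource);
  }

  // Lists the names of all registered resources in sorted order.
  List<String> list() {
    List<String> names = new ArrayList<>(resources.keySet());
    Collections.sort(names);
    return names;
  }

  // Returns the resource with the given name, or null if there is none.
  Object get(String name) {
    return resources.get(name);
  }
}
```

Storing values as `Object` keeps the registry open to non-table resources such as a String, Number, or Map.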

4.4. ResourcePoolRestAPI Class

...

The ResourcePoolRestAPI class exposes APIs that let end users access resources. It should therefore provide the following functionality:

...

5.1. How can a user create TableData instance to share the resource?

...

  • For interpreters which use SQL

    • provide an interpreter option: create TableData whenever executing a paragraph

    • or provide new interpreter magic for it: %spark.sql_share, %jdbc.mysql_share, …

    • or automatically put all table results into the resource pool if they are not heavy (e.g. keeping the query only, or just a reference to the RDD)

    • If the interpreter supports runtime interpreter parameters, we can use syntax such as %jdbc(share=true) to specify whether to share the table result

  • For interpreters which use a programming language (e.g. Python)

    • provide API like z.put()

      Code Block
      languagescala
      // infer the instance type and convert it to a predefined `TableData` subclass such as `SparkDataFrameTableData`
      z.put("myTable01", myDataFrame01)

      // or require the user to pass a `TableData` subclass
      val myTableData01 = new SparkRDDTableData(myRdd01)
      z.put("myTable01", myTableData01)

       

  • For interpreters which use a DSL (e.g. ElasticsearchInterpreter)

    • provide an interpreter option: create `TableData` whenever executing a paragraph

    • or provide new interpreter magic for it: %elasticsearch_share

    • or automatically put all table results into the resource pool if they are not heavy

5.2. How can each interpreter implement its own TableData?

 

  • For interpreters which use SQL

    • Keep the query to reproduce table result later

    • Or create a view in the storage using the requested query

  • For interpreters which use a programming language

    • Keep a reference to (or information about) the RDD, DataFrame, or other variables in the REPL

  • For interpreters which use a DSL (e.g. ElasticsearchInterpreter)

    • TBD
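
For the SQL case above ("keep the query to reproduce table result later"), a table object could store only the query text and re-run it whenever rows are requested. The sketch below illustrates that idea under stated assumptions: `QueryBackedTable` and the `executor` callback are hypothetical names, and the real `TableData` interface from Section 4.1 is not reproduced here.

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical sketch: a lightweight table that keeps only the query
// and delegates execution to a caller-supplied function.
final class QueryBackedTable {
  private final String query;
  // Assumed callback that runs a query and returns its rows.
  private final Function<String, Iterator<Object[]>> executor;

  QueryBackedTable(String query, Function<String, Iterator<Object[]>> executor) {
    this.query = query;
    this.executor = executor;
  }

  // Returns the stored query text.
  String query() {
    return query;
  }

  // Re-executes the stored query on every call; nothing is cached.
  Iterator<Object[]> rows() {
    return executor.apply(query);
  }
}
```

Because no rows are held in memory, such an object stays cheap to keep in the resource pool, at the cost of re-running the query on each read.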

5.3. What should the table name be?

 

  • If a note has a title, it can be part of the table name, for example Note Title + Paragraph Id + Result Index

  • When using an API like z.put(resourceName, …), use the passed resource name

...

 

 

On the next paragraph execution, the resource will be updated if it has the same name.
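
The naming scheme and the update-on-same-name behavior described in this section can be sketched as follows. The `_` separator, the class name `NamedResourcePool`, and its methods are assumptions for illustration only; the proposal specifies just the name components.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the "_" separator and all names here are assumptions.
final class NamedResourcePool {
  private final Map<String, Object> pool = new HashMap<>();

  // Builds a table name from Note Title + Paragraph Id + Result Index.
  static String tableName(String noteTitle, String paragraphId, int resultIndex) {
    return noteTitle + "_" + paragraphId + "_" + resultIndex;
  }

  // Stores a resource; an existing resource with the same name is replaced.
  void put(String name, Object resource) {
    pool.put(name, resource);
  }

  // Returns the resource with the given name, or null if there is none.
  Object get(String name) {
    return pool.get(name);
  }

  // Returns the number of stored resources.
  int size() {
    return pool.size();
  }
}
```

Re-running a paragraph produces the same name, so the second `put` overwrites the first result instead of adding a new entry.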

 

6. Potential Future Work

...

  • ZEPPELIN-2029: ACL for `ResourcePool`

  • ZEPPELIN-2022: Make SparkInterpreter directly access TableData in ResourcePool

  • UI for list / preview / download available resources

  • Watch / Unwatch: automatic paragraph updating for streaming data representation.

  • ZEPPELIN-1494: Bind JDBC result to a dataset on the Zeppelin context

  • Ability to construct table result from the resource pool in language interpreters (e.g. Python)

    • Let’s assume that we can build a pandas DataFrame using TableData

      Code Block
      languagepy
      # in python interpreter
      
      t = z.get("tableResourceName") # will return object that has `hasNext` and `next`
      p = PandasTableData(t)  # Python has no `new` keyword
      
      # use p.pandasInstance …