[The proposal is contributed by Hoon Park, Khalid Huseynov, and Lee Soo]


1. Status

 

Current State: [UNDER DISCUSSION]

Discussion Thread: https://lists.apache.org/thread.html/6f638139bb77019a649ec7034783a650e1f558ef75acc1dda991d573@%3Cdev.zeppelin.apache.org%3E

JIRA: ZEPPELIN-2019


2. Motivation

Apache Zeppelin provides valuable features for table manipulation, such as built-in visualizations, pivoting, and CSV download. However, these features are limited by table size: they currently run in the browser, and the number of rows is capped (configurable, 1000 rows by default). Moving these computations from the browser to the backend is therefore a starting point for handling large data and for improving pivoting, filtering, full CSV download, pagination, and other functionality.

Furthermore, tables currently can’t be shared across interpreter processes. For example, a table from the JDBC interpreter isn’t accessible from the SparkSQL or Python interpreters. The idea here is to extend the existing Zeppelin resource pool to share Table resources across interpreters. This would also allow one central Table menu for accessing and viewing information about registered Table resources.

Thus the critical question is “How can Zeppelin support large data handling and sharing across interpreters?”
Already-resolved issues offer clues to solving this problem.


Based on the above work, this proposal aims to build a mechanism for handling table resources in the backend and to design an API for ResourcePool. This will enable Zeppelin to

  • register the table result as a shared resource

  • list all available (registered) tables

  • preview tables, including their meta information (e.g. columns, types, …)

  • download registered tables as CSV and other formats

  • pivot / filter in the backend to transform larger data

  • cross join tables from different interpreters (e.g. the Spark interpreter uses a table result generated by the JDBC interpreter)

 

For more future work tasks, please refer to section 7. Potential Future Work.


3. Proposed Changes

3.1. Overview: Sharing a table resource between different interpreters

This diagram shows how the Spark interpreter can query a table generated by another interpreter (JDBC).

  1. A newly created table result can be registered as a Resource in an interpreter.

  2. Since every resource registered in an interpreter’s resource pool can be searched via DistributedResourcePool and supports remote method invocation, other interpreters can use it.

  3. Let’s say JDBCInterpreter creates a table result and keeps it (as JDBCTableData) in its resource pool.

  4. Then SparkInterpreter can fetch rows and columns via remote method invocation. If Zeppelin registers the distributed resource pool as a Spark Data Source, SparkInterpreter can use all table resources in Zeppelin smoothly
    (e.g. querying the table in SparkSQL like a normal table).
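
The four steps above can be modeled in a few lines. The following is a minimal single-process sketch, not Zeppelin code: `SharedTable`, `LocalPool`, and `DistributedPoolSketch` are hypothetical names, and the remote method invocation between interpreter processes is replaced by plain method calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a table resource: column names plus rows.
class SharedTable {
  final List<String> columns;
  final List<Object[]> rows;

  SharedTable(List<String> columns, List<Object[]> rows) {
    this.columns = columns;
    this.rows = rows;
  }
}

// Hypothetical per-interpreter pool: resources are keyed by name.
class LocalPool {
  final String interpreterId;
  private final Map<String, Object> resources = new LinkedHashMap<>();

  LocalPool(String interpreterId) {
    this.interpreterId = interpreterId;
  }

  void put(String name, Object resource) {
    resources.put(name, resource);
  }

  Object get(String name) {
    return resources.get(name);
  }
}

// Hypothetical distributed view that searches every connected local pool.
// A real implementation would cross process boundaries; this one does not.
class DistributedPoolSketch {
  private final List<LocalPool> pools = new ArrayList<>();

  void connect(LocalPool pool) {
    pools.add(pool);
  }

  // Returns the first resource with the given name, or null if none exists.
  Object find(String name) {
    for (LocalPool pool : pools) {
      Object resource = pool.get(name);
      if (resource != null) {
        return resource;
      }
    }
    return null;
  }
}
```

In this model, the JDBC side calls `put` on its own `LocalPool`, and the Spark side only needs the shared `DistributedPoolSketch` and the resource name to reach the same table.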

overview1

3.2. Overview: How an interpreter can handle table resources

Here is a more detailed view of how one interpreter can handle its TableData implementation with the resource pool.

overview2

4. Public Interfaces

4.1. Interfaces for TableData related classes

The TableData interface defines methods to handle a table resource. Each interpreter can implement its own TableData. We can’t have one global TableData class for all interpreters because each interpreter uses different storage and a different mechanism to export/import data.


class | How it can get table data
InterpreterResultTableData | Contains actual data in memory
Interpreter-specific TableData (e.g. SparkTableData, SparkSQLTableData, …) | Knows how to reproduce the original table data (e.g. keeps the query in case of JDBC, SparkSQL)


tabledata-class

4.1.1. Additional methods for TableData

public interface TableData {
 
    …
    /**
     * filter the input `TableData` based on columns.
     */
    public TableData filter(List<String> columnNames);

    /**
     * Pivot the input `TableData` for visualizations 
     */
    public TableData pivot(List<String> keyColumns,
                           List<String> groupColumns, 
                           List<String> valueColumns);
 

    …
}

 

Each interpreter can implement its own TableData class. For example,

  • SparkInterpreter can have SparkTableData which

    • points to an RDD to get the table result

    • implements filter and pivot using Spark RDD APIs

  • JDBCInterpreter can have JDBCTableData which

    • keeps the query to reproduce the table result

    • implements filter and pivot using a query with additional WHERE and GROUP BY clauses
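
To illustrate the query-keeping approach, here is a minimal sketch under stated assumptions. `QueryBackedTable` is a hypothetical class, not the proposed JDBCTableData. It treats `filter` as a column projection (matching the `filter(List<String> columnNames)` signature above) and simplifies `pivot` to a GROUP BY with SUM aggregation; the choice of SUM is an assumption. Column names are concatenated without quoting or validation, which a real implementation would need.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: holds only the SQL text and derives new queries from it.
class QueryBackedTable {
  private final String query;

  QueryBackedTable(String query) {
    this.query = query;
  }

  String query() {
    return query;
  }

  // Projection: wrap the original query and select only the given columns.
  QueryBackedTable filter(List<String> columnNames) {
    return new QueryBackedTable(
        "SELECT " + String.join(", ", columnNames) + " FROM (" + query + ") t");
  }

  // Simplified pivot: group by key and group columns, SUM each value column.
  QueryBackedTable pivot(List<String> keyColumns,
                         List<String> groupColumns,
                         List<String> valueColumns) {
    List<String> groupBy = new ArrayList<>(keyColumns);
    groupBy.addAll(groupColumns);

    List<String> select = new ArrayList<>(groupBy);
    for (String value : valueColumns) {
      select.add("SUM(" + value + ") AS " + value);
    }

    return new QueryBackedTable(
        "SELECT " + String.join(", ", select)
            + " FROM (" + query + ") t"
            + " GROUP BY " + String.join(", ", groupBy));
  }
}
```

Because each call returns a new object holding only SQL text, nothing is fetched until the query is actually executed against the database.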

Some interpreters (e.g. ShellInterpreter) might not be connected to external storage. In this case, those interpreters can use the InterpreterResultTableData class.
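
For the in-memory case, filter and pivot can run directly over the held rows. The following is a minimal sketch, assuming rows are kept as lists of objects. `InMemoryTable` is a hypothetical class and not the actual InterpreterResultTableData; as a simplification, `pivot` groups by the key and group columns and sums numeric value columns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory table: column names plus rows of values.
class InMemoryTable {
  final List<String> columns;
  final List<List<Object>> rows;

  InMemoryTable(List<String> columns, List<List<Object>> rows) {
    this.columns = columns;
    this.rows = rows;
  }

  // Projection: keep only the requested columns, in the requested order.
  InMemoryTable filter(List<String> columnNames) {
    List<Integer> idx = indexesOf(columnNames);
    List<List<Object>> out = new ArrayList<>();
    for (List<Object> row : rows) {
      List<Object> projected = new ArrayList<>();
      for (int i : idx) {
        projected.add(row.get(i));
      }
      out.add(projected);
    }
    return new InMemoryTable(new ArrayList<>(columnNames), out);
  }

  // Simplified pivot: group by key + group columns and sum the value columns.
  InMemoryTable pivot(List<String> keyColumns,
                      List<String> groupColumns,
                      List<String> valueColumns) {
    List<String> by = new ArrayList<>(keyColumns);
    by.addAll(groupColumns);
    List<Integer> byIdx = indexesOf(by);
    List<Integer> valIdx = indexesOf(valueColumns);

    // LinkedHashMap keeps groups in first-seen order.
    Map<List<Object>, double[]> sums = new LinkedHashMap<>();
    for (List<Object> row : rows) {
      List<Object> key = new ArrayList<>();
      for (int i : byIdx) {
        key.add(row.get(i));
      }
      double[] acc = sums.get(key);
      if (acc == null) {
        acc = new double[valIdx.size()];
        sums.put(key, acc);
      }
      for (int j = 0; j < valIdx.size(); j++) {
        acc[j] += ((Number) row.get(valIdx.get(j))).doubleValue();
      }
    }

    List<List<Object>> out = new ArrayList<>();
    for (Map.Entry<List<Object>, double[]> e : sums.entrySet()) {
      List<Object> r = new ArrayList<>(e.getKey());
      for (double d : e.getValue()) {
        r.add(d);
      }
      out.add(r);
    }
    List<String> cols = new ArrayList<>(by);
    cols.addAll(valueColumns);
    return new InMemoryTable(cols, out);
  }

  private List<Integer> indexesOf(List<String> names) {
    List<Integer> idx = new ArrayList<>();
    for (String n : names) {
      int i = columns.indexOf(n);
      if (i < 0) {
        throw new IllegalArgumentException("Unknown column: " + n);
      }
      idx.add(i);
    }
    return idx;
  }
}
```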

4.2. Example Implementation: ZeppelinResourcePool as Spark Data Source

(image copied from https://databricks.com/blog)

 

Spark supports pluggable data sources. We can make Zeppelin’s DistributedResourcePool a Spark data source using the Spark DataSource API. Please refer to these articles for more information.

 

4.2.1. BaseRelation Implementation

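import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.tabledata.ColumnDef;
import org.apache.zeppelin.tabledata.TableData;

/**
 * Exposes a Zeppelin TableData as a Spark SQL relation that supports full table scans.
 */
public class TableDataRelation extends BaseRelation implements Serializable, TableScan {

 // SQLContext is not serializable, so it is excluded from serialization.
 transient SQLContext context;
 private final TableData data;

 public TableDataRelation(SQLContext context, TableData data) {
   this.context = context;
   this.data = data;
 }

 @Override
 public SQLContext sqlContext() {
   return context;
 }

 @Override
 public StructType schema() {
   // Map Zeppelin column types to Spark SQL types.
   // Anything other than INT or LONG is read as a string.
   ColumnDef[] columns = data.columns();
   StructField[] fields = new StructField[columns.length];

   int i = 0;

   for (ColumnDef c : columns) {
     if (c.type() == ColumnDef.TYPE.INT) {
       fields[i] = new StructField(c.name(), DataTypes.IntegerType, true, Metadata.empty());
     } else if (c.type() == ColumnDef.TYPE.LONG) {
       fields[i] = new StructField(c.name(), DataTypes.LongType, true, Metadata.empty());
     } else {
       fields[i] = new StructField(c.name(), DataTypes.StringType, true, Metadata.empty());
     }

     i++;
   }

   return new StructType(fields);
 }

 @Override
 public RDD<Row> buildScan() {
   // Materialize all rows on the driver, then distribute them as an RDD.
   Iterator<org.apache.zeppelin.tabledata.Row> rows = data.rows();
   List<org.apache.zeppelin.tabledata.Row> result = new ArrayList<>();

   while (rows.hasNext()) {
     result.add(rows.next());
   }

   JavaSparkContext jsc = new JavaSparkContext(context.sparkContext());
   JavaRDD<org.apache.zeppelin.tabledata.Row> rdd = jsc.parallelize(result);

   // Convert each Zeppelin row into a Spark SQL row.
   return rdd.map(new Function<org.apache.zeppelin.tabledata.Row, Row>() {
     @Override
     public Row call(org.apache.zeppelin.tabledata.Row row) throws Exception {
       return org.apache.spark.sql.RowFactory.create(row.get());
     }
   }).rdd();

 }
}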


4.2.2. DefaultSource Implementation

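import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;
import org.apache.spark.sql.sources.SchemaRelationProvider;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.resource.ResourceSet;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.tabledata.InterpreterResultTableData;
import org.apache.zeppelin.tabledata.TableData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Spark passes options as a Scala map, so get() returns an Option.
import scala.collection.immutable.Map;

public class DefaultSource implements RelationProvider, SchemaRelationProvider {


 Logger logger = LoggerFactory.getLogger(DefaultSource.class);
 // Must be set before Spark instantiates this data source.
 public static ResourcePool resourcePool;

 public DefaultSource() {
 }

 @Override
 public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
   return createRelation(sqlContext, parameters, null);
 }


 @Override
 public BaseRelation createRelation(
     SQLContext sqlContext,
     Map<String, String> parameters,
     StructType schema) {

   // The path is expected to have the form "noteId|paragraphId".
   String path = parameters.get("path").get();
   String [] noteIdAndParagraphId = path.split("\\|");

   // Fetched but not used further in this example.
   ResourceSet rs = ResourcePoolUtils.getAllResources();

   // Look up the table result stored for the given note and paragraph.
   Resource resource = resourcePool.get(
       noteIdAndParagraphId[0],
       noteIdAndParagraphId[1],
       WellKnownResourceName.ZeppelinTableResult.toString());


   InterpreterResultMessage message = (InterpreterResultMessage) resource.get();
   TableData tableData = new InterpreterResultTableData(message);

   return new TableDataRelation(sqlContext, tableData);

 }
}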


4.3. ResourceRegistry Class

The ResourceRegistry class manages a list of available resources (e.g. tables). It should therefore provide the following functionality:

  • list all resources
  • get a resource

In this proposal, we mainly discuss the table result as a resource. However, any object can also be a resource (e.g. String, Number, Map).
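
The two operations can be sketched as follows. This is only an illustration of the listed functionality, not the proposed class: `ResourceRegistrySketch` and its method names are hypothetical, and the typed `get` overload is an added convenience that reflects the point that a resource may be any object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: maps resource names to arbitrary objects.
class ResourceRegistrySketch {
  private final Map<String, Object> resources = new ConcurrentHashMap<>();

  void register(String name, Object resource) {
    resources.put(name, resource);
  }

  // List all resource names in sorted order.
  List<String> list() {
    List<String> names = new ArrayList<>(resources.keySet());
    Collections.sort(names);
    return names;
  }

  // Get a resource by name, or an empty Optional if it is not registered.
  Optional<Object> get(String name) {
    return Optional.ofNullable(resources.get(name));
  }

  // Get a resource only if it exists and has the expected type.
  <T> Optional<T> get(String name, Class<T> type) {
    Object resource = resources.get(name);
    return type.isInstance(resource)
        ? Optional.of(type.cast(resource))
        : Optional.empty();
  }
}
```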

4.4. ResourcePoolRestAPI Class

The ResourcePoolRestAPI class provides end users with APIs to access resources. It should therefore provide the following functionality:

  • list all resources

  • get information for a resource

    • column name, type for tables

    • preview for tables

  • get a resource

    • If the resource is a table, it should be downloaded using streaming
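
Streaming download means rows are written to the response as they are read, rather than building the whole CSV in memory first. The sketch below shows the idea with a plain `java.io.Writer`; `CsvStreamer` is a hypothetical helper, the REST layer is omitted, and the quoting rules follow the common RFC 4180 convention as an assumption about the desired output format.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper that writes a table to a Writer one row at a time.
class CsvStreamer {

  // Quote a field when it contains a comma, a quote, or a line break.
  static String escape(Object value) {
    String s = value == null ? "" : value.toString();
    if (s.contains(",") || s.contains("\"") || s.contains("\n") || s.contains("\r")) {
      return "\"" + s.replace("\"", "\"\"") + "\"";
    }
    return s;
  }

  // Write the header, then each row as soon as the iterator yields it.
  static void write(List<String> header, Iterator<Object[]> rows, Writer out)
      throws IOException {
    writeLine(header.toArray(), out);
    while (rows.hasNext()) {
      writeLine(rows.next(), out);
    }
    out.flush();
  }

  private static void writeLine(Object[] fields, Writer out) throws IOException {
    for (int i = 0; i < fields.length; i++) {
      if (i > 0) {
        out.write(',');
      }
      out.write(escape(fields[i]));
    }
    out.write("\r\n");
  }

  // Convenience wrapper for small tables and for trying the helper out.
  static String toCsv(List<String> header, List<Object[]> rows) {
    StringWriter out = new StringWriter();
    try {
      write(header, rows.iterator(), out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toString();
  }
}
```

Since `write` only needs an `Iterator`, it fits a TableData whose rows are produced lazily; memory use then depends on one row at a time, not on the table size.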

5. Discussion

5.1. How can a user decide to create TableData instance for sharing the resource?

  • For interpreters which use SQL

    • provide an interpreter option: create TableData whenever executing a paragraph

    • or provide new interpreter magic for it: %spark.sql_share, %jdbc.mysql_share, …

    • or automatically put all table results into the resource pool if they are not heavy (e.g. keeping only the query, or just a reference to an RDD)

    • If the interpreter supports runtime parameters, we can use syntax such as %jdbc(share=true) to specify whether to share the table result

  • For interpreters which use a programming language (e.g. Python)

    • provide an API like z.put()

      // infer the instance type and convert it to a predefined `TableData` subclass such as `SparkDataFrameTableData`
      z.put("myTable01", myDataFrame01)
      
      // or force the user to put a `TableData` subclass
      val myTableData01 = new SparkRDDTableData(myRdd01)
      z.put("myTable01", myTableData01)

       

  • For interpreters which use a DSL (e.g. ElasticsearchInterpreter)

    • provide an interpreter option: create TableData whenever executing a paragraph

    • or provide new interpreter magic for it: %elasticsearch_share

    • or automatically put all table results into the resource pool if they are not heavy
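
As an illustration of the runtime-parameter option, the sketch below parses the first line of a paragraph such as `%jdbc(share=true)`. It is a hypothetical parser, not Zeppelin's own: `MagicParser`, its regular expression, and the default of not sharing when the parameter is absent are all assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for "%name" or "%name(key=value, key=value)".
class MagicParser {
  private static final Pattern MAGIC =
      Pattern.compile("^%([\\w.]+)(?:\\(([^)]*)\\))?");

  // Returns the interpreter name, or null if the line has no magic.
  static String interpreter(String line) {
    Matcher m = MAGIC.matcher(line.trim());
    return m.find() ? m.group(1) : null;
  }

  // Returns the runtime parameters, or an empty map if there are none.
  static Map<String, String> params(String line) {
    Map<String, String> result = new LinkedHashMap<>();
    Matcher m = MAGIC.matcher(line.trim());
    if (!m.find() || m.group(2) == null) {
      return result;
    }
    for (String pair : m.group(2).split(",")) {
      String[] kv = pair.split("=", 2);
      if (kv.length == 2) {
        result.put(kv[0].trim(), kv[1].trim());
      }
    }
    return result;
  }

  // Assumed default: do not share unless share=true is given.
  static boolean shouldShare(String line) {
    return Boolean.parseBoolean(params(line).getOrDefault("share", "false"));
  }
}
```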

5.2. How can each interpreter implement its own TableData?

  • For interpreters which use SQL

    • Keep the query to reproduce the table result later

    • Or create a view in the storage using the requested query

  • For interpreters which use a programming language

    • Keep a reference to (or info about) the RDD, DataFrame, or other variables in the REPL

  • For interpreters which use a DSL (e.g. ElasticsearchInterpreter)

    • TBD

5.3. What should the table name be?

  • If a note has a title, it can be part of the table name (e.g. Note Title + Paragraph Id + Result Index)

  • When using an API like z.put(resourceName, …), use the passed resource name

On the next paragraph execution, the resource will be updated if it has the same name.
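
A minimal sketch of this naming rule follows. `NamedTablePool` is a hypothetical class, and the underscore separator and whitespace replacement are assumptions, since the proposal only lists the name components.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical pool that derives default table names and replaces on re-put.
class NamedTablePool {
  private final Map<String, Object> tables = new LinkedHashMap<>();

  // Note title (if any) + paragraph id + result index, joined by underscores.
  static String defaultName(String noteTitle, String paragraphId, int resultIndex) {
    String prefix = (noteTitle == null || noteTitle.trim().isEmpty())
        ? ""
        : noteTitle.trim().replaceAll("\\s+", "_") + "_";
    return prefix + paragraphId + "_" + resultIndex;
  }

  // Putting under an existing name replaces the previous resource.
  void put(String name, Object table) {
    tables.put(name, table);
  }

  Object get(String name) {
    return tables.get(name);
  }

  int size() {
    return tables.size();
  }
}
```

Running the same paragraph twice produces the same default name, so the second `put` overwrites the first result instead of adding a new entry.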

 

6. Roadmap

The issues discussed above can be implemented in the following order of priority:

  • ZEPPELIN-TBD: Adding pivot, filter methods to TableData

  • ZEPPELIN-TBD: ResourceRegistry

  • ZEPPELIN-TBD: Rest API for resource pool

  • ZEPPELIN-TBD: UI for Table page

  • ZEPPELIN-TBD: Apply pivot, filter methods for built-in visualizations

  • ZEPPELIN-TBD: SparkTableData, SparkSQLTableData, JDBCTableData, etc.

  • ZEPPELIN-2029: ACL for ResourcePool

  • ZEPPELIN-2022: Zeppelin resource pool as a Spark Data Source

7. Potential Future Work

  • Watch / Unwatch: for automatic paragraph updating for Streaming Data Representation.

  • ZEPPELIN-1494: Bind JDBC result to a dataset on the Zeppelin context

  • Ability to construct a table result from the resource pool in language interpreters (e.g. Python)

    • Let’s assume that we can build a pandas data frame using TableData

      # in python interpreter
      
      t = z.get("tableResourceName") # will return an object that has `hasNext` and `next`
      p = PandasTableData(t) # hypothetical wrapper class; Python has no `new` keyword
      
      # use p.pandasInstance …

 
