
4.2. Example Implementation: ZeppelinResourcePool as Spark Data Source


(Figure: overview of the Spark SQL Data Sources API; image from https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html)

Spark supports pluggable data sources. We can make Zeppelin’s `DistributedResourcePool` a Spark data source using the Spark Data Sources API. Please refer to these articles for more information.

 


4.2.1. BaseRelation Implementation

Code Block
languagejava
public class TableDataRelation extends BaseRelation implements Serializable, TableScan {

 transient SQLContext context;
 private final TableData data;

 public TableDataRelation(SQLContext context, TableData data) {
   this.context = context;
   this.data = data;
 }

 @Override
 public SQLContext sqlContext() {
   return context;
 }

 @Override
 public StructType schema() {
   // map Zeppelin column definitions to a Spark schema
   ColumnDef[] columns = data.columns();
   StructField[] fields = new StructField[columns.length];

   int i = 0;
   for (ColumnDef c : columns) {
     if (c.type() == ColumnDef.TYPE.INT) {
       fields[i] = new StructField(c.name(), IntegerType, true, Metadata.empty());
     } else if (c.type() == ColumnDef.TYPE.LONG) {
       fields[i] = new StructField(c.name(), LongType, true, Metadata.empty());
     } else {
       fields[i] = new StructField(c.name(), StringType, true, Metadata.empty());
     }
     i++;
   }

   return new StructType(fields);
 }

 @Override
 public RDD<Row> buildScan() {
   // materialize the TableData rows on the driver, then parallelize them
   Iterator<org.apache.zeppelin.tabledata.Row> rows = data.rows();
   List<org.apache.zeppelin.tabledata.Row> result = new ArrayList<>();
   while (rows.hasNext()) {
     result.add(rows.next());
   }

   JavaSparkContext jsc = new JavaSparkContext(context.sparkContext());
   JavaRDD<org.apache.zeppelin.tabledata.Row> rdd = jsc.parallelize(result);

   // convert Zeppelin rows to Spark SQL rows
   return rdd.map(new Function<org.apache.zeppelin.tabledata.Row, Row>() {
     @Override
     public Row call(org.apache.zeppelin.tabledata.Row row) throws Exception {
       return org.apache.spark.sql.RowFactory.create(row.get());
     }
   }).rdd();
 }
}


4.2.2. DefaultSource Implementation


Code Block
languagejava
public class DefaultSource implements RelationProvider, SchemaRelationProvider {

 Logger logger = LoggerFactory.getLogger(DefaultSource.class);
 public static ResourcePool resourcePool;

 public DefaultSource() {
 }

 @Override
 public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
   return createRelation(sqlContext, parameters, null);
 }

 @Override
 public BaseRelation createRelation(
     SQLContext sqlContext,
     Map<String, String> parameters,  // scala.collection.immutable.Map
     StructType schema) {

   // the "path" option encodes the resource location as "noteId|paragraphId"
   String path = parameters.get("path").get();
   String[] noteIdAndParagraphId = path.split("\\|");

   Resource resource = resourcePool.get(
       noteIdAndParagraphId[0],
       noteIdAndParagraphId[1],
       WellKnownResourceName.ZeppelinTableResult.toString());

   InterpreterResultMessage message = (InterpreterResultMessage) resource.get();
   TableData tableData = new InterpreterResultTableData(message);

   return new TableDataRelation(sqlContext, tableData);
 }
}
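
With these two classes on the interpreter's classpath, a paragraph's table result can be loaded like any other Spark data source. A minimal usage sketch; the data source package name and the note/paragraph IDs below are assumptions for illustration (Spark resolves a package-name format to its `DefaultSource` class):

Code Block
languagejava
// Load a paragraph's table result from the resource pool as a DataFrame.
// The "path" option encodes "noteId|paragraphId", as parsed by DefaultSource above.
DataFrame df = sqlContext.read()
    .format("org.apache.zeppelin.spark.datasource")  // package containing DefaultSource (assumed)
    .load("2C3XB1PKV|20170101-123456_1234567890");   // hypothetical noteId|paragraphId

df.registerTempTable("sharedResult");  // now queryable from %spark.sql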


4.3. ResourceRegistry Class

The ResourceRegistry class manages the list of available resources (e.g. tables). Thus it should provide the following functionality (a minimal interface sketch follows):

  • list all resources
  • get a resource
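
The method names and signatures below are assumptions for illustration, not the actual Zeppelin API:

Code Block
languagejava
// Hypothetical sketch of ResourceRegistry; names and signatures are assumptions.
public interface ResourceRegistry {

  // list all resources currently available in the pool
  ResourceSet listResources();

  // get a single resource by its location and name
  Resource getResource(String noteId, String paragraphId, String resourceName);
}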


In this proposal, we mainly discussed the table result as a resource. However, an object can also be a resource (e.g. String, Number, Map).

 

4.4. ResourcePoolRestAPI Class

 

The ResourcePoolRestAPI class provides end users with APIs for accessing resources. Thus it should provide the following functionality (a hypothetical endpoint sketch follows the list):

 

  • list all resources

  • get information for a resource

    • column names and types, for tables

    • a preview, for tables

  • get a resource

    • If the resource is a table, it should be downloaded using streaming
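
The endpoint layout could look like the following JAX-RS style sketch; the class name, paths, and payloads are assumptions:

Code Block
languagejava
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;

// Hypothetical endpoint layout; paths and payloads are assumptions.
@Path("/resource-pool")
public class ResourcePoolRestApi {

  @GET
  @Path("/")
  public Response listResources() {
    // serialize the resource list (e.g. from ResourceRegistry) as JSON
    return Response.ok("[]").build();
  }

  @GET
  @Path("/{noteId}/{paragraphId}/{name}")
  public Response getResourceInfo(@PathParam("noteId") String noteId,
                                  @PathParam("paragraphId") String paragraphId,
                                  @PathParam("name") String name) {
    // for tables: column names/types and a preview
    return Response.ok("{}").build();
  }

  // a download endpoint for table resources would stream rows incrementally,
  // e.g. via javax.ws.rs.core.StreamingOutput, instead of buffering them
}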

 

5. Discussion

5.1. How can a user create a TableData instance to share the resource?

 

  • For interpreters which use SQL

    • provide an interpreter option: create TableData whenever executing a paragraph

    • or provide new interpreter magic for it: %spark.sql_share, %jdbc.mysql_share, …

    • or automatically put all table results into the resource pool if they are not heavy (e.g. keeping only the query, or just a reference to the RDD)

    • If the interpreter supports runtime parameters, a syntax such as %jdbc(share=true) could specify whether or not to share the table result

  • For interpreters which use a programming language (e.g. python)

    • provide an API like z.put()

      Code Block
      languagescala
      // infer the instance type and convert it to a predefined `TableData` subclass such as `SparkDataFrameTableData`
      z.put("myTable01", myDataFrame01)
      
      // or require the user to put a `TableData` subclass explicitly
      val myTableData01 = new SparkRDDTableData(myRdd01)
      z.put("myTable01", myTableData01)

 

  • For interpreters which use a DSL (e.g. ElasticsearchInterpreter)

    • provide an interpreter option: create `TableData` whenever executing a paragraph

    • or provide a new interpreter magic for it: %elasticsearch_share

    • or automatically put all table results into the resource pool if they are not heavy

5.2. How can each interpreter implement its own TableData?

 

  • For interpreters which use SQL

    • Keep the query, to reproduce the table result later

    • Or create a view in the storage using the requested query

  • For interpreters which use a programming language

    • Keep a reference to (or information about) the RDD, DataFrame, or other variables in the repl (see the sketch after this list)

  • For interpreters which use a DSL (e.g. ElasticsearchInterpreter)

    • TBD
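
As an example of the second case, a Spark implementation could keep only a reference to the DataFrame and reproduce rows on demand. A minimal sketch, assuming the `TableData` interface from 4.2.1 (`columns()` / `rows()`); the `ColumnDef` and `org.apache.zeppelin.tabledata.Row` constructors used below are assumptions:

Code Block
languagejava
// Hypothetical SparkDataFrameTableData: keeps a DataFrame reference instead of
// materialized rows. The TableData/ColumnDef/Row shapes are assumptions.
public class SparkDataFrameTableData implements TableData {

  private transient DataFrame df;  // reference to the repl variable

  public SparkDataFrameTableData(DataFrame df) {
    this.df = df;
  }

  @Override
  public ColumnDef[] columns() {
    StructField[] fields = df.schema().fields();
    ColumnDef[] columns = new ColumnDef[fields.length];
    for (int i = 0; i < fields.length; i++) {
      // simplified Spark-to-Zeppelin type mapping
      ColumnDef.TYPE type =
          fields[i].dataType() == DataTypes.IntegerType ? ColumnDef.TYPE.INT :
          fields[i].dataType() == DataTypes.LongType ? ColumnDef.TYPE.LONG :
          ColumnDef.TYPE.STRING;
      columns[i] = new ColumnDef(fields[i].name(), type);  // assumed constructor
    }
    return columns;
  }

  @Override
  public Iterator<org.apache.zeppelin.tabledata.Row> rows() {
    // stream rows to the driver one partition at a time
    final Iterator<Row> it = df.javaRDD().toLocalIterator();
    return new Iterator<org.apache.zeppelin.tabledata.Row>() {
      public boolean hasNext() { return it.hasNext(); }
      public org.apache.zeppelin.tabledata.Row next() {
        Row r = it.next();
        Object[] values = new Object[r.size()];
        for (int i = 0; i < r.size(); i++) {
          values[i] = r.get(i);
        }
        return new org.apache.zeppelin.tabledata.Row(values);  // assumed constructor
      }
      public void remove() { throw new UnsupportedOperationException(); }
    };
  }
}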


5.3. What should the table name be?

 

  • If a note has a title, it can be part of the table name. For example: Note Title + Paragraph Id + Result Index

  • When using an API like z.put(resourceName, …), use the passed resource name


On the next execution of the paragraph, a resource with the same name will be updated.

 


6. Potential Future Work

 

  • ZEPPELIN-2029: ACL for `ResourcePool`

  • ZEPPELIN-2022: Make SparkInterpreter directly access TableData in ResourcePool

  • UI to list / preview / download available resources

  • Watch / Unwatch: automatic paragraph updating for streaming data representations

  • ZEPPELIN-1494: Bind JDBC result to a dataset on the Zeppelin context

  • Ability to construct a table result from the resource pool in language interpreters (e.g. python)

    • Let’s assume that we can build a pandas DataFrame using TableData:


       

      Code Block
      languagepy
      # in the python interpreter
      
      t = z.get("tableResourceName")  # returns an object that has `hasNext` and `next`
      p = PandasTableData(t)
      
      # use p.pandasInstance …