Apache KNOX provides a single gateway to many services in your Hadoop cluster. You can leverage the KNOX shell DSL interface to interact with services such as WebHDFS, WebHCat (Templeton), Oozie, HBase, etc. For example, using Groovy and the DSL, you can submit Hive queries via WebHCat (Templeton) as simply as:

println "[Hive.groovy] Copy Hive query file to HDFS"
Hdfs.put(session).text( hive_query ).to( jobDir + "/input/query.hive" ).now()

jobId = Job.submitHive(session) \
            .file("${jobDir}/input/query.hive") \
            .arg("-v").arg("--hiveconf").arg("TABLE_NAME=${tmpTableName}") \
            .statusDir("${jobDir}/output") \
            .now().jobId

submitSqoop Job API

With Apache KNOX 0.10.0, you can now write applications using the KNOX DSL for Apache Sqoop and easily submit Sqoop jobs. The WebHCat Job class in the DSL now supports submitSqoop() as follows:

Job.submitSqoop(session)
    .command("import --connect jdbc:mysql://hostname:3306/dbname ... ")
    .statusDir(remoteStatusDir)
    .now().jobId

The submitSqoop request takes the following arguments:

  • command (String) - The Sqoop command string to execute.
  • files (String) - Comma-separated files to be copied to the Templeton controller job.
  • optionsfile (String) - The remote file containing the Sqoop command to run.
  • libdir (String) - The remote directory containing the JDBC jar to include with the Sqoop lib.
  • statusDir (String) - The remote directory to store status output.

The request returns the jobId in the response.
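For instance, a request that also ships a JDBC driver jar staged on HDFS might look like the sketch below. This assumes the builder exposes each of the arguments above as a same-named method; remoteLibDir and remoteStatusDir are placeholder paths, not values from this article.

jobId = Job.submitSqoop(session) \
            .command("import --connect jdbc:mysql://hostname:3306/dbname --table mytable") \
            .libdir(remoteLibDir) \
            .statusDir(remoteStatusDir) \
            .now().jobId

// Alternatively, keep the Sqoop command in a file on HDFS and pass it with
// optionsfile(remoteOptionsFile) instead of command(...).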

Simple example

In this example we will run a simple Sqoop job to extract the scBlastTab table from the public genome database (MySQL) at UCSC into HDFS.

First, import the following packages:

import com.jayway.jsonpath.JsonPath
import groovy.json.JsonSlurper
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.hadoop.gateway.shell.hdfs.Hdfs
import org.apache.hadoop.gateway.shell.job.Job
import static java.util.concurrent.TimeUnit.SECONDS

Next, establish a connection to the KNOX gateway with Hadoop.login:

// Get gatewayUrl and credentials from environment
def env = System.getenv()
gatewayUrl = env.gateway
username = env.username
password = env.password

jobDir = "/user/" + username + "/sqoop"

session = Hadoop.login( gatewayUrl, username, password )
 
println "[Sqoop.groovy] Delete " + jobDir + ": " + Hdfs.rm( session ).file( jobDir ).recursive().now().statusCode
println "[Sqoop.groovy] Mkdir " + jobDir + ": " + Hdfs.mkdir( session ).dir( jobDir ).now().statusCode

Define your Sqoop job (assuming Sqoop is already configured with the MySQL driver):

// Database connection information

db = [ driver:"com.mysql.jdbc.Driver", url:"jdbc:mysql://genome-mysql.cse.ucsc.edu/hg38", user:"genome", password:"", name:"hg38", table:"scBlastTab", split:"query" ]

targetdir = jobDir + "/" + db.table

sqoop_command = "import --driver ${db.driver} --connect ${db.url} --username ${db.user} --password ${db.password} --table ${db.table} --split-by ${db.split} --target-dir ${targetdir}"

You can now submit the sqoop_command to the cluster with submitSqoop:

jobId = Job.submitSqoop(session) \
            .command(sqoop_command) \
            .statusDir("${jobDir}/output") \
            .now().jobId

println "[Sqoop.groovy] Submitted job: " + jobId

You can then check job status and output as usual:

println "[Sqoop.groovy] Polling up to 60s for job completion..."

done = false
count = 0
while( !done && count++ < 60 ) {
  sleep( 1000 )
  json = Job.queryStatus(session).jobId(jobId).now().string
  done = JsonPath.read( json, "\$.status.jobComplete" )
  print "."; System.out.flush();
}
println ""
println "[Sqoop.groovy] Job status: " + done

// Check output directory
text = Hdfs.ls( session ).dir( jobDir + "/output" ).now().string
json = (new JsonSlurper()).parseText( text )
println json.FileStatuses.FileStatus.pathSuffix

println "\n[Sqoop.groovy] Content of stderr:"
println Hdfs.get( session ).from( jobDir + "/output/stderr" ).now().string
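
// Optional extra check (not part of the original run): the status directory
// also contains an "exit" file (listed above alongside stderr/stdout) holding
// the Templeton launcher's exit code.
println "\n[Sqoop.groovy] Launcher exit code: " + Hdfs.get( session ).from( jobDir + "/output/exit" ).now().string.trim()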

// Check table files
text = Hdfs.ls( session ).dir( jobDir + "/" + db.table ).now().string
json = (new JsonSlurper()).parseText( text )
println json.FileStatuses.FileStatus.pathSuffix

session.shutdown()

 

Here is sample output of the above example against a Hadoop cluster. You need a properly configured Hadoop cluster with the Apache KNOX gateway, Apache Sqoop, and WebHCat (Templeton). The test was run against a BigInsights Hadoop cluster.

:compileJava UP-TO-DATE
:compileGroovy
:processResources UP-TO-DATE
:classes
:Sqoop

[Sqoop.groovy] Delete /user/biadmin/sqoop: 200
[Sqoop.groovy] Mkdir /user/biadmin/sqoop: 200
[Sqoop.groovy] Submitted job: job_1476266127941_0692
[Sqoop.groovy] Polling up to 60s for job completion...
............................................
[Sqoop.groovy] Job status: true
[exit, stderr, stdout]

[Sqoop.groovy] Content of stderr:
log4j:WARN custom level class [Relative to Yarn Log Dir Prefix] not found.
16/11/03 16:53:05 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6_IBM_27
16/11/03 16:53:06 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
16/11/03 16:53:06 WARN sqoop.ConnFactory: Parameter --driver is set to an explicit driver however appropriate connection manager is not being set (via --connection-manager). Sqoop is going to fall back to org.apache.sqoop.manager.GenericJdbcManager. Please specify explicitly which connection manager should be used next time.
16/11/03 16:53:06 INFO manager.SqlManager: Using default fetchSize of 1000
16/11/03 16:53:06 INFO tool.CodeGenTool: Beginning code generation
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/iop/4.2.0.0/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/iop/4.2.0.0/zookeeper/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/11/03 16:53:07 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM scBlastTab AS t WHERE 1=0
16/11/03 16:53:07 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM scBlastTab AS t WHERE 1=0
16/11/03 16:53:08 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/iop/4.2.0.0/hadoop-mapreduce
Note: /tmp/sqoop-biadmin/compile/4432005ab10742f26cc82d5438497cae/scBlastTab.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
16/11/03 16:53:09 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-biadmin/compile/4432005ab10742f26cc82d5438497cae/scBlastTab.jar
16/11/03 16:53:09 INFO mapreduce.ImportJobBase: Beginning import of scBlastTab
16/11/03 16:53:09 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
16/11/03 16:53:09 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM scBlastTab AS t WHERE 1=0
16/11/03 16:53:10 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/11/03 16:53:10 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
16/11/03 16:53:15 INFO db.DBInputFormat: Using read commited transaction isolation
16/11/03 16:53:15 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(query), MAX(query) FROM scBlastTab
16/11/03 16:53:16 WARN db.TextSplitter: Generating splits for a textual index column.
16/11/03 16:53:16 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
16/11/03 16:53:16 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: number of splits:5
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1476266127941_0693
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: mapreduce.job, Service: job_1476266127941_0692, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@6fbb4061)
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:ehaascluster, Ident: (HDFS_DELEGATION_TOKEN token 4660 for biadmin)
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: RM_DELEGATION_TOKEN, Service: 172.16.222.2:8032,172.16.222.3:8032, Ident: (owner=biadmin, renewer=mr token, realUser=HTTP/bicloud-fyre-physical-17-master-3.fyre.ibm.com@IBM.COM, issueDate=1478191971063, maxDate=1478796771063, sequenceNumber=67, masterKeyId=66)
16/11/03 16:53:16 WARN token.Token: Cannot find class for token kind kms-dt
16/11/03 16:53:16 WARN token.Token: Cannot find class for token kind kms-dt
Kind: kms-dt, Service: 172.16.222.1:16000, Ident: 00 07 62 69 61 64 6d 69 6e 04 79 61 72 6e 05 68 62 61 73 65 8a 01 58 2b 1b 7b 34 8a 01 58 4f 27 ff 34 8e 03 a4 09
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: MR_DELEGATION_TOKEN, Service: 172.16.222.3:10020, Ident: (owner=biadmin, renewer=yarn, realUser=HTTP/bicloud-fyre-physical-17-master-3.fyre.ibm.com@IBM.COM, issueDate=1478191972979, maxDate=1478796772979, sequenceNumber=52, masterKeyId=49)
16/11/03 16:53:17 INFO impl.YarnClientImpl: Submitted application application_1476266127941_0693
16/11/03 16:53:17 INFO mapreduce.Job: The url to track the job: http://bicloud-fyre-physical-17-master-2.fyre.ibm.com:8088/proxy/application_1476266127941_0693/
16/11/03 16:53:17 INFO mapreduce.Job: Running job: job_1476266127941_0693
16/11/03 16:53:24 INFO mapreduce.Job: Job job_1476266127941_0693 running in uber mode : false
16/11/03 16:53:24 INFO mapreduce.Job:  map 0% reduce 0%
16/11/03 16:53:32 INFO mapreduce.Job:  map 20% reduce 0%
16/11/03 16:53:33 INFO mapreduce.Job:  map 100% reduce 0%
16/11/03 16:53:34 INFO mapreduce.Job: Job job_1476266127941_0693 completed successfully
16/11/03 16:53:34 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=799000
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=644
		HDFS: Number of bytes written=148247
		HDFS: Number of read operations=20
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=10
	Job Counters 
		Launched map tasks=5
		Other local map tasks=5
		Total time spent by all maps in occupied slots (ms)=62016
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=31008
		Total vcore-milliseconds taken by all map tasks=31008
		Total megabyte-milliseconds taken by all map tasks=190513152
	Map-Reduce Framework
		Map input records=2379
		Map output records=2379
		Input split bytes=644
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=249
		CPU time spent (ms)=6590
		Physical memory (bytes) snapshot=1758576640
		Virtual memory (bytes) snapshot=35233165312
		Total committed heap usage (bytes)=2638741504
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=148247
16/11/03 16:53:34 INFO mapreduce.ImportJobBase: Transferred 144.7725 KB in 23.9493 seconds (6.0449 KB/sec)
16/11/03 16:53:34 INFO mapreduce.ImportJobBase: Retrieved 2379 records.

[_SUCCESS, part-m-00000, part-m-00001, part-m-00002, part-m-00003, part-m-00004]

BUILD SUCCESSFUL
Total time: 1 mins 2.202 secs

From the output above you can see the job output as well as the contents of the table directory on HDFS, which contains five part files (five map tasks were used). The WebHCat (Templeton) job console output goes to stderr in this case.
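If you also want to peek at the imported rows themselves, a few extra lines placed before session.shutdown() in the script above would do it. This is a minimal sketch; the part file name comes from the listing above and will differ if your split count differs.

// Print the first few imported rows from the first part file
rows = Hdfs.get( session ).from( jobDir + "/" + db.table + "/part-m-00000" ).now().string
println rows.readLines().take( 5 ).join( "\n" )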

To compile and run your code, ensure you have the following dependency: org.apache.knox:gateway-shell:0.10.0.
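For reference, here is a minimal build.gradle sketch consistent with the Gradle output shown above; the Groovy and JsonPath versions and the Sqoop run task are illustrative assumptions, not part of the original project.

apply plugin: 'groovy'

repositories {
    mavenCentral()
}

dependencies {
    // KNOX shell DSL (Hadoop, Hdfs, Job classes used in the example)
    compile 'org.apache.knox:gateway-shell:0.10.0'
    // Groovy runtime for compiling the example script (version is an assumption)
    compile 'org.codehaus.groovy:groovy-all:2.4.7'
    // JsonPath used by the example; add explicitly if not pulled in transitively
    compile 'com.jayway.jsonpath:json-path:2.2.0'
}

// Hypothetical task to run the compiled Sqoop.groovy example
task Sqoop(type: JavaExec) {
    main = 'Sqoop'
    classpath = sourceSets.main.runtimeClasspath
}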

 

 
