How to submit a Spark job programmatically -


Submitting the application with spark-submit works fine, but submitting it programmatically with the command below fails:

mvn exec:java -Dexec.mainClass="org.cybergen.SubmitJobExample" -Dexec.args="/opt/spark/current/README.md please"

I get the following error when I try it.

Application log

15/05/12 17:19:46 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
15/05/12 17:19:46 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/05/12 17:20:06 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
15/05/12 17:20:06 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

Spark master log

15/05/12 17:33:22 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] [
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
]
15/05/12 17:33:22 INFO Master: akka.tcp://sparkDriver@10.18.26.116:49592 got disassociated, removing it.
15/05/12 17:33:22 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@10.18.26.116:49592] has failed, address is now gated for [5000] ms. Reason is: [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464].
15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-6/endpointWriter/endpointReader-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-0#1749840468] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40127.0.0.1%3A50366-7#-1224275483] was not delivered. [11] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/12 17:33:42 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] (same InvalidClassException and stack trace as above)
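The "local class incompatible" error means the driver and the master are deserializing `org.apache.spark.deploy.ApplicationDescription` from two different Spark builds, so Java serialization computes two different `serialVersionUID` values for the same class name. A quick way to confirm which UID each side has is to print it with `java.io.ObjectStreamClass`. The sketch below uses a hypothetical stand-in class `Payload`; in a real diagnosis you would run the same lookup against `ApplicationDescription` on both the driver's and the cluster's classpath and compare the two numbers:

```scala
import java.io.ObjectStreamClass

// Hypothetical stand-in for any Serializable class; substitute
// classOf[org.apache.spark.deploy.ApplicationDescription] to diagnose
// the mismatch reported in the master log above.
class Payload(val text: String) extends Serializable

object CheckSerialVersionUid {
  def main(args: Array[String]): Unit = {
    // Java serialization derives a serialVersionUID from the class definition,
    // so two different Spark jars yield two different UIDs for the same name.
    val uid = ObjectStreamClass.lookup(classOf[Payload]).getSerialVersionUID
    println(s"serialVersionUID of ${classOf[Payload].getName} = $uid")
  }
}
```

If the number printed on the driver side differs from the one printed with the cluster's jars, the two classpaths contain different Spark versions.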

SparkTestJob: the Spark job class

import org.apache.spark.{SparkConf, SparkContext}

class SparkTestJob(val filePath: String = "", val filter: String = "") extends Serializable {
  def runWordCount(): Long = {
    val conf = new SparkConf()
      .setAppName("word count for word " + filter)
      .setMaster("spark://cyborg:7077")
      .setJars(Seq("/tmp/spark-example-1.0-SNAPSHOT-driver.jar"))
      .setSparkHome("/opt/spark/current")
    val sc = new SparkContext(conf)
    val file = sc.textFile(filePath)
    file.filter(line => line.contains(filter)).count()
  }
}

The SubmitJobExample object instantiates the SparkTestJob class:

object SubmitJobExample {
  def main(args: Array[String]): Unit = {
    if (args.length == 2) {
      val fileName = args(0)
      val filterByWord = args(1)
      println("Reading file " + fileName + " for word " + filterByWord)
      val jobObject = new SparkTestJob(fileName, filterByWord)
      println("Word count for file " + fileName + ": " + jobObject.runWordCount())
    } else {
      val jobObject = new SparkTestJob("/opt/spark/current/README.md", "please")
      println("Word count for file /opt/spark/current/README.md: " + jobObject.runWordCount())
    }
  }
}

The actual problem was a mismatch of Spark versions: one of the project's dependencies was pulling in a different Spark version. Changing the dependencies so they all used the same Spark version fixed the problem.
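One way to pin this down in the build is to declare the Spark dependency explicitly so a transitive dependency cannot drag in a different build. The fragment below is a sketch, not the original project's pom: the artifact coordinates and the `spark.version` property are assumptions, and the version must match whatever is actually installed on the cluster:

```xml
<!-- Hypothetical pom.xml fragment: the version must match the cluster's Spark build. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <!-- Set spark.version to the exact release running under /opt/spark/current -->
  <version>${spark.version}</version>
</dependency>
```

Running `mvn dependency:tree -Dincludes=org.apache.spark` shows which Spark artifacts and versions actually end up on the `exec:java` classpath, which makes a stray second Spark version easy to spot.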

The reason it worked with spark-submit is Java's jar/classpath precedence: spark-submit placed the correct Spark jar version first on the classpath.

