How to Submit a Spark Job Programmatically
Submitting the application with spark-submit works fine, but submitting it programmatically with the command below

    mvn exec:java -Dexec.mainClass="org.cybergen.SubmitJobExample" -Dexec.args="/opt/spark/current/README.md please"

fails with the following errors.
Application log:
    15/05/12 17:19:46 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
    15/05/12 17:19:46 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
    15/05/12 17:20:06 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
    15/05/12 17:20:06 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
Spark master log:
    15/05/12 17:33:22 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] [
    java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    ]
    15/05/12 17:33:22 INFO Master: akka.tcp://sparkDriver@10.18.26.116:49592 got disassociated, removing it.
    15/05/12 17:33:22 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@10.18.26.116:49592] has failed, address is now gated for [5000] ms. Reason is: [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464].
    15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-6/endpointWriter/endpointReader-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-0#1749840468] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40127.0.0.1%3A50366-7#-1224275483] was not delivered. [11] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    15/05/12 17:33:42 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] [
    java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
SparkTestJob: the Spark job class
    import org.apache.spark.{SparkConf, SparkContext}

    class SparkTestJob(val filePath: String = "", val filter: String = "") extends Serializable {
      def runWordCount(): Long = {
        val conf = new SparkConf()
          .setAppName("word count word " + filter)
          .setMaster("spark://cyborg:7077")
          .setJars(Seq("/tmp/spark-example-1.0-SNAPSHOT-driver.jar"))
          .setSparkHome("/opt/spark/current")
        val sc = new SparkContext(conf)
        val file = sc.textFile(filePath)
        // count the lines that contain the filter word
        file.filter(line => line.contains(filter)).count()
      }
    }
The SubmitJobExample object instantiates the SparkTestJob class:
    object SubmitJobExample {
      def main(args: Array[String]): Unit = {
        if (args.length == 2) {
          val fileName = args(0)
          val filterByWord = args(1)
          println("Reading file " + fileName + " for word " + filterByWord)
          val jobObject = new SparkTestJob(fileName, filterByWord)
          println("Word count for file " + fileName + ": " + jobObject.runWordCount())
        } else {
          val jobObject = new SparkTestJob("/opt/spark/current/README.md", "please")
          println("Word count for file /opt/spark/current/README.md: " + jobObject.runWordCount())
        }
      }
    }
The actual problem was a mismatch of Spark versions: one of the project's dependencies pulled in a different Spark version than the one the cluster was running, so the serialized ApplicationDescription classes had incompatible serialVersionUIDs. Changing the dependencies to use the same Spark version as the cluster fixed the problem.
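For example, if the cluster runs Spark 1.3.1 (the exact version number here is an illustrative assumption, not taken from the logs), the Maven dependency should be pinned to that same version so the driver serializes classes the master can deserialize:

    <!-- pom.xml fragment; 1.3.1 and the Scala 2.10 suffix are assumed examples -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>

The Scala binary-version suffix (_2.10 vs _2.11) must also match what the cluster's Spark build uses, or similar incompatibilities appear.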
The reason it worked with spark-submit is Java's jar/class-path precedence: spark-submit puts the cluster's own Spark jars first on the classpath, so the correct Spark version was used regardless of what the project depended on.
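To diagnose this kind of mismatch, it can help to compare the Spark version Maven resolves for the project against the version installed on the cluster; a rough sketch (the install path is the one from this post, the Maven goal is standard):

    # which Spark artifacts (and versions) Maven puts on the driver classpath
    mvn dependency:tree -Dincludes=org.apache.spark

    # which Spark version the cluster itself runs
    /opt/spark/current/bin/spark-submit --version

If the two versions differ, aligning the pom with the cluster version (or marking the Spark dependency as provided) resolves the InvalidClassException.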