How to load some of the files in a dir and monitor new files in that dir in Spark Streaming without missing any?
I have one HDFS dir that contains many files:
/user/root/1.txt /user/root/2.txt /user/root/3.txt /user/root/4.txt
There is also a daemon process that adds one file per minute to this dir (e.g., 5.txt, 6.txt, 7.txt...).
I want to start a Spark Streaming job that loads 3.txt and 4.txt and then detects every new file after 4.txt.
Please note that these files are large, so processing them takes a long time. If I process 3.txt and 4.txt before launching the streaming task, 5.txt and 6.txt may be added to the dir while 3.txt and 4.txt are still being processed. Then, when the streaming task starts, 5.txt and 6.txt will be missed, because it will only process new files (from 7.txt on).
I'm not sure if I have described the problem clearly. If you have any questions, please ask me.
I found a solution:
According to the API doc: https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.streaming.streamingcontext
def fileStream[K, V, F <: InputFormat[K, V]](directory: String, filter: (Path) ⇒ Boolean, newFilesOnly: Boolean)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.
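We can set the filter function to reject the files that come before the ones we want (here 1.txt and 2.txt), and set "newFilesOnly" to false, so that the files already in the dir (3.txt and 4.txt) are read as well as the ones that arrive later.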
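A minimal sketch of that idea, assuming the files are plain text and are always named <number>.txt. The object name LoadFromThirdFile, the helper wanted, the cutoff of 3 and the 60-second batch interval are made up for illustration and are not part of the Spark API; only fileStream and its three parameters come from the doc quoted above.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LoadFromThirdFile {
  // Matches names such as "3.txt"; the whole name has to match.
  private val Numbered = """(\d{1,9})\.txt""".r

  // Hypothetical helper: accept <n>.txt only when n >= minIndex.
  // Anything that does not look like <n>.txt is rejected.
  def wanted(fileName: String, minIndex: Int): Boolean = fileName match {
    case Numbered(n) => n.toInt >= minIndex
    case _           => false
  }

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("LoadFromThirdFile")
    // Assumption: one batch per minute, matching the rate of new files.
    val ssc = new StreamingContext(conf, Seconds(60))

    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat](
        "/user/root",
        (path: Path) => wanted(path.getName, 3), // skip 1.txt and 2.txt
        false // newFilesOnly = false: also pick up files already in the dir
      )
      .map { case (_, text) => text.toString }

    // Placeholder for the real (long-running) processing.
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the same stream both reads 3.txt and 4.txt and keeps watching the dir, there is no gap between the initial load and the monitoring in which 5.txt or 6.txt could be skipped.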