How to parse CSV file values into MatrixEntry in Java/Scala
I've got this code in Scala and have to change it to Java:
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.distributed.MatrixEntry
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import scala.collection.immutable.List
import java.io._
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

def exportMatrix(matrix: Array[Double], fileName: String, numCols: Int, numRows: Int) = {
  val pw = new PrintWriter(fileName)
  for (columnIndex <- 0 until numCols) {
    pw.print("word" + columnIndex)
    if (columnIndex == numCols - 1) pw.println else pw.print(",")
  }
  for (rowIndex <- 0 until numRows) {
    for (columnIndex <- 0 until numCols) {
      pw.print(matrix(numRows * columnIndex + rowIndex))
      if (columnIndex == numCols - 1) pw.println else pw.print(",")
    }
  }
  pw.flush
  pw.close
}

def exportRowMatrix(matrix: RDD[String], fileName: String) = {
  val pw = new PrintWriter(fileName)
  matrix.collect().foreach(line => pw.println(line))
  pw.flush
  pw.close
}

val csv = sc.textFile("hdfs://myhost/sparse.csv").cache() // original file

val data = csv.mapPartitions(lines => {
  val parser = new CSVParser(' ')
  lines.map(line => {
    parser.parseLine(line)
  })
}).map(line => {
  MatrixEntry(line(0).toLong - 1, line(1).toLong - 1, line(2).toInt)
})

val indexedRowMatrix: IndexedRowMatrix = new CoordinateMatrix(data).toIndexedRowMatrix()
/*val mat: CoordinateMatrix =
val rowMatrix: RowMatrix = mat.toRowMatrix()*/

val svd: SingularValueDecomposition[IndexedRowMatrix, Matrix] = indexedRowMatrix.computeSVD(100, computeU = true)
val u: IndexedRowMatrix = svd.U // U factor (a row matrix)
val s: Vector = svd.s           // singular values stored in a local dense vector
val v: Matrix = svd.V           // V factor, a local dense matrix

val sArray: Array[Double] = s.toArray // done
val vArray: Array[Double] = v.toArray // done
val rdd = u.rows.map(x => x.vector.toArray.mkString(","))

exportMatrix(sArray, "../s.csv", s.size, 1)
exportMatrix(vArray, "../v.csv", v.numCols.toInt, v.numRows.toInt)
exportRowMatrix(rdd, "../u.csv")

val diag = Matrices.diag(s)
val d = new DenseMatrix(diag.numRows, diag.numCols, diag.toArray)
val multiplyResult = v.multiply(d)
val dArray = multiplyResult.toArray
exportMatrix(dArray, "../d.csv", multiplyResult.numCols, multiplyResult.numRows)
I've made this so far:
JavaSparkContext sc = new JavaSparkContext(SparkConfiguration.getSparkConfiguration());
JavaRDD<String> csv = sc.textFile("hdfs://yoda/nlp/sparse.csv").cache();
System.out.println(csv.first());

// TODO: function parsing the text file RDD into MatrixEntry
RDD<MatrixEntry> data = null;
csv.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    @Override
    public Iterable<String> call(Iterator<String> t) throws Exception {
        // TODO Auto-generated method stub
        return null;
    }
});

IndexedRowMatrix indexedRowMatrix = (new CoordinateMatrix(data)).toIndexedRowMatrix();
SingularValueDecomposition<IndexedRowMatrix, Matrix> svd = indexedRowMatrix.computeSVD(100, true, 0);
IndexedRowMatrix u = svd.U();
Vector s = svd.s();
Matrix v = svd.V();

double[] sArray = s.toArray();
double[] vArray = v.toArray();

// TODO: function mapping each row to a String value
RDD<String> rdd = u.rows().map(null, null);

Matrix diag = Matrices.diag(s);
DenseMatrix d = new DenseMatrix(diag.numRows(), diag.numCols(), diag.toArray());
DenseMatrix multiplyResult = v.multiply(d);
double[] dArray = multiplyResult.toArray();
My questions are:
- How do I parse each line (in Matrix Market format) into a MatrixEntry? Should this be done inside csv.mapPartitions()? (A rough, untested sketch of what I have in mind follows below.)
- How do I define the exportMatrix function in Java? Is it just an ordinary Java method? (Also sketched below.)
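For the first question, this is the rough, untested sketch I have in mind: do the parsing inside csv.mapPartitions() so that one CSVParser is created per partition, and build MatrixEntry objects directly. I'm assuming Spark 1.x's Java API (where FlatMapFunction.call returns an Iterable) and the same space-separated "row col value" layout the Scala code reads; csv is the JavaRDD<String> from above.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import au.com.bytecode.opencsv.CSVParser;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

// csv is the JavaRDD<String> loaded above
JavaRDD<MatrixEntry> entries = csv.mapPartitions(
        new FlatMapFunction<Iterator<String>, MatrixEntry>() {
            @Override
            public Iterable<MatrixEntry> call(Iterator<String> lines) throws Exception {
                CSVParser parser = new CSVParser(' ');          // one parser per partition
                List<MatrixEntry> result = new ArrayList<MatrixEntry>();
                while (lines.hasNext()) {
                    String[] fields = parser.parseLine(lines.next());
                    // Matrix Market indices are 1-based, MatrixEntry is 0-based
                    result.add(new MatrixEntry(
                            Long.parseLong(fields[0]) - 1,
                            Long.parseLong(fields[1]) - 1,
                            Double.parseDouble(fields[2])));
                }
                return result;
            }
        });

// CoordinateMatrix expects a Scala RDD, so unwrap the JavaRDD with rdd()
IndexedRowMatrix indexedRowMatrix =
        new CoordinateMatrix(entries.rdd()).toIndexedRowMatrix();

The entries.rdd() call is there because the CoordinateMatrix constructor takes a Scala RDD<MatrixEntry>, not a JavaRDD.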
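For the second question, my understanding is that exportMatrix and exportRowMatrix just become ordinary (static) Java methods; nothing Spark-specific is needed beyond collect(). Here is a sketch that keeps the same column-major indexing as the Scala version and takes a JavaRDD<String> for the row export (both assumptions on my part):

import java.io.FileNotFoundException;
import java.io.PrintWriter;
import org.apache.spark.api.java.JavaRDD;

static void exportMatrix(double[] matrix, String fileName, int numCols, int numRows)
        throws FileNotFoundException {
    PrintWriter pw = new PrintWriter(fileName);
    // header row: word0,word1,...
    for (int columnIndex = 0; columnIndex < numCols; columnIndex++) {
        pw.print("word" + columnIndex);
        if (columnIndex == numCols - 1) pw.println(); else pw.print(",");
    }
    // data rows; the values array is column-major, as in the Scala code
    for (int rowIndex = 0; rowIndex < numRows; rowIndex++) {
        for (int columnIndex = 0; columnIndex < numCols; columnIndex++) {
            pw.print(matrix[numRows * columnIndex + rowIndex]);
            if (columnIndex == numCols - 1) pw.println(); else pw.print(",");
        }
    }
    pw.flush();
    pw.close();
}

static void exportRowMatrix(JavaRDD<String> rows, String fileName)
        throws FileNotFoundException {
    PrintWriter pw = new PrintWriter(fileName);
    for (String line : rows.collect()) {   // collect() pulls every row to the driver
        pw.println(line);
    }
    pw.flush();
    pw.close();
}

The only caveat I see is that collect() brings all rows to the driver, so this only works when U fits in driver memory, the same as in the Scala version.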