java - How collectors are used when turning the stream in parallel
I tried to answer the question of how to skip lines of a Stream<String> obtained from Files.lines. I thought that this collector wouldn't work in parallel:
    private static Collector<String, ?, List<String>> oddLines() {
        int[] counter = {1}; // mutable state captured by the accumulator lambda
        return Collector.of(ArrayList::new,
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
    }

but it works.
Edit: it didn't actually work; I got fooled by the fact that my input set was too small to trigger any parallelism; see the discussion in the comments.
I thought it wouldn't work because the following two plans of execution come to mind.
1. The counter array is shared among threads.
Thread t1 reads the first element of the stream, so the if condition is satisfied. It adds the first element to its list. Then its execution stops before it has time to update the array value.
Thread t2, which, say, started at the 4th element of the stream, adds it to its list. So we end up with a non-wanted element.
Of course, since this collector seems to work, I guess it doesn't work like that. And the updates are not atomic anyway.
2. Each thread has its own copy of the array.
In this case there are no more problems with the update, but nothing prevents thread t2 from starting at the 4th element of the stream. So it doesn't work like that either.
So it seems that it doesn't work like that at all, which brings me to the question... how is the collector used in parallel?
Can someone explain to me how it works and why my collector works when run in parallel?
Thank you very much!
Passing a parallel() source stream into your collector is enough to break the logic, because your shared state (counter) may be incremented from different tasks. You can verify that with a finite stream input, for which it no longer returns the correct result:
    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
    System.out.println(lines.isParallel());
    lines = lines.parallel();
    System.out.println(lines.isParallel());
    List<String> collected = lines.collect(oddLines());
    System.out.println(collected.size());

Note that for streams of unknown size (e.g. when reading from Files.lines()) you need to generate a significant amount of data in the stream, so that it actually forks a task to run some chunks concurrently.
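Output for me is:

    false
    true
    12386

which is wrong: the stream has 19999 elements, so keeping every other one starting with the first should give 10000.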
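For comparison, here is a minimal sketch of one way to get the odd lines without any shared counter. It is not from the original post: it assumes the lines are already in a random-access List, and the class and method names (OddLinesByIndex, oddLines) are made up for illustration. The decision to keep an element depends only on its index, so it does not matter which task processes which chunk.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OddLinesByIndex {

    // Hypothetical helper: keeps the 1st, 3rd, 5th, ... element by looking at
    // the element's index instead of a mutable counter shared between tasks.
    static List<String> oddLines(List<String> lines) {
        return IntStream.range(0, lines.size())
                .parallel()
                .filter(i -> i % 2 == 0)      // index 0 is line 1, index 2 is line 3, ...
                .mapToObj(lines::get)
                .collect(Collectors.toList()); // toList() keeps encounter order
    }

    public static void main(String[] args) {
        List<String> lines = IntStream.range(1, 20000)
                .mapToObj(i -> i + "")
                .collect(Collectors.toList());
        List<String> odd = oddLines(lines);
        System.out.println(odd.size());  // 10000
        System.out.println(odd.get(1));  // 3
    }
}
```

This only helps when the input can be indexed cheaply; for lines read lazily from a file you would have to materialize them first or stay sequential.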
As @Holger correctly pointed out in the comments, there is a different race that can happen when your collector specifies CONCURRENT and UNORDERED. In that case the tasks operate on a single shared collection (ArrayList::new is called only once per stream), whereas with only parallel() the accumulator runs on one collection per task and the results are later combined using your defined combiner.
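To make the "one collection per task, then combine" behaviour visible, here is a small sketch. It is an illustration only: the class PerTaskContainers and the two counters are made up for this example, and the collector itself is stateless, so the result stays correct. It just counts how often the supplier and the combiner are called for a parallel stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class PerTaskContainers {

    // Illustration-only counters, updated atomically because several tasks touch them.
    static final AtomicInteger SUPPLIER_CALLS = new AtomicInteger();
    static final AtomicInteger COMBINER_CALLS = new AtomicInteger();

    // A plain list collector (no CONCURRENT characteristic) that counts its calls.
    static Collector<String, List<String>, List<String>> countingToList() {
        return Collector.of(
                () -> { SUPPLIER_CALLS.incrementAndGet(); return new ArrayList<String>(); },
                List::add,
                (l1, l2) -> { COMBINER_CALLS.incrementAndGet(); l1.addAll(l2); return l1; });
    }

    public static void main(String[] args) {
        List<String> result = IntStream.range(0, 20000)
                .parallel()
                .mapToObj(i -> i + "")
                .collect(countingToList());
        System.out.println(result.size());  // 20000
        System.out.println(result.get(0));  // 0
        // The exact counts depend on how the stream is split on your machine.
        System.out.println("containers created: " + SUPPLIER_CALLS.get());
        System.out.println("merges performed:   " + COMBINER_CALLS.get());
    }
}
```

Each task fills its own ArrayList without interference and the combiner merges them in encounter order, which is why a stateless collector is safe here. The counter in oddLines() is created once per collector and shared by all of those tasks, so it cannot know at which position in the stream each chunk starts.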
If you add these characteristics to your collector, you might run into the following result due to the shared state in a single collection:
    false
    true
    Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
        at java.util.ArrayList.add(ArrayList.java:459)
        at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
        at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
        at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
        at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
        at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
        at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)
    12386
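The exception comes from several tasks calling add on the same unsynchronized ArrayList. As a rough sketch of what a collector needs if it declares CONCURRENT and UNORDERED, the version below uses a synchronized list as the shared container. The names ConcurrentCollectorSketch and toSynchronizedList are made up for this example, and it collects all elements rather than the odd lines.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class ConcurrentCollectorSketch {

    // With CONCURRENT the accumulator may be called from several threads on one
    // shared container, so the container itself has to be thread-safe.
    static Collector<String, List<String>, List<String>> toSynchronizedList() {
        return Collector.of(
                () -> Collections.synchronizedList(new ArrayList<String>()),
                List::add,
                (l1, l2) -> { l1.addAll(l2); return l1; },
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        List<String> collected = IntStream.range(1, 20000)
                .parallel()
                .mapToObj(i -> i + "")
                .collect(toSynchronizedList());
        System.out.println(collected.size());                // 19999
        System.out.println(new HashSet<>(collected).size()); // 19999, nothing lost or duplicated
    }
}
```

No elements are lost and nothing throws, but the order of the collected list is not defined. That is also why a position-dependent rule such as "keep every odd line" cannot be expressed with an unordered concurrent collector, even when the container is thread-safe.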