001 package fj.control.parallel; 002 003 import fj.Effect; 004 import fj.F; 005 import fj.F2; 006 import fj.Function; 007 import fj.P; 008 import fj.P1; 009 import fj.P2; 010 import fj.P3; 011 import fj.P4; 012 import fj.Unit; 013 import static fj.P.p; 014 import static fj.F2W.$$; 015 import static fj.FW.$; 016 import static fj.Function.curry; 017 import static fj.Function.uncurryF2; 018 import static fj.control.parallel.Promise.liftM2; 019 import fj.data.Array; 020 import fj.data.IterableW; 021 import fj.data.List; 022 import fj.data.NonEmptyList; 023 import fj.data.Option; 024 import fj.data.Stream; 025 import fj.data.Tree; 026 import fj.data.TreeZipper; 027 import fj.data.Zipper; 028 import static fj.data.Option.some; 029 import static fj.data.Stream.iterableStream; 030 import fj.pre.Monoid; 031 032 /** 033 * A module of higher-order concurrency features. 034 */ 035 public final class ParModule { 036 private final Strategy<Unit> strategy; 037 038 private ParModule(final Strategy<Unit> strategy) { 039 this.strategy = strategy; 040 } 041 042 /** 043 * Constructor method for ParModule 044 * 045 * @param u A parallel strategy for the module. 046 * @return A ParModule that uses the given strategy for parallelism. 047 */ 048 public static ParModule parModule(final Strategy<Unit> u) { 049 return new ParModule(u); 050 } 051 052 /** 053 * Evaluates the given product concurrently and returns a Promise of the result. 054 * 055 * @param p A product to evaluate concurrently. 056 * @return A Promise of the value of the given product, that can be claimed in the future. 057 */ 058 public <A> Promise<A> promise(final P1<A> p) { 059 return Promise.promise(strategy, p); 060 } 061 062 /** 063 * Returns a function that evaluates a given product concurrently and returns a Promise of the result. 064 * 065 * @return a function that evaluates a given product concurrently and returns a Promise of the result. 066 */ 067 public <A> F<P1<A>, Promise<A>> promise() { 068 return new F<P1<A>, Promise<A>>() { 069 public Promise<A> f(final P1<A> ap1) { 070 return promise(ap1); 071 } 072 }; 073 } 074 075 /** 076 * Promotes the given function to a concurrent function that returns a Promise. 077 * 078 * @param f A given function to promote to a concurrent function. 079 * @return A function that is applied concurrently when given an argument, yielding a Promise of the result 080 * that can be claimed in the future. 081 */ 082 public <A, B> F<A, Promise<B>> promise(final F<A, B> f) { 083 return $(f).promise(strategy); 084 } 085 086 /** 087 * Returns a function that promotes a given function to a concurrent function that returns a Promise. 088 * The pure Kleisli arrow of Promise. 089 * 090 * @return A higher-order function that takes pure functions to promise-valued functions. 091 */ 092 public <A, B> F<F<A, B>, F<A, Promise<B>>> promisePure() { 093 return new F<F<A, B>, F<A, Promise<B>>>() { 094 public F<A, Promise<B>> f(final F<A, B> abf) { 095 return promise(abf); 096 } 097 }; 098 } 099 100 /** 101 * Promotes the given function to a concurrent function that returns a Promise. 102 * 103 * @param f A given function to promote to a concurrent function. 104 * @return A function that is applied concurrently when given an argument, yielding a Promise of the result 105 * that can be claimed in the future. 106 */ 107 public <A, B, C> F2<A, B, Promise<C>> promise(final F2<A, B, C> f) { 108 return P2.untuple($$(f).tuple().promise(strategy)); 109 } 110 111 112 /** 113 * Creates a very fast concurrent effect, as an actor that does not guarantee ordering of its messages. 114 * Such an actor is not thread-safe unless the given Effect is. 115 * 116 * @param e The effect that the actor should have on its messages. 117 * @return A concurrent actor that does not guarantee ordering of its messages. 118 */ 119 public <A> Actor<A> effect(final Effect<A> e) { 120 return Actor.actor(strategy, e); 121 } 122 123 /** 124 * A first-class constructor of concurrent effects, as actors that don't guarantee ordering of messages. 125 * Such an actor is not thread-safe unless the given Effect is. 126 * 127 * @return A function that takes an effect and returns a concurrent effect. 128 */ 129 public <A> F<Effect<A>, Actor<A>> effect() { 130 return new F<Effect<A>, Actor<A>>() { 131 public Actor<A> f(final Effect<A> effect) { 132 return effect(effect); 133 } 134 }; 135 } 136 137 /** 138 * Creates a concurrent actor that is guaranteed to process only one message at a time. 139 * 140 * @param e The effect that the actor should have on its messages. 141 * @return A concurrent actor that is guaranteed to process its messages in some order. 142 */ 143 public <A> QueueActor<A> actor(final Effect<A> e) { 144 return QueueActor.queueActor(strategy, e); 145 } 146 147 /** 148 * A first-class constructor of actors. 149 * 150 * @return A function that takes an effect and returns an actor that processes messages in some order. 151 */ 152 public <A> F<Effect<A>, QueueActor<A>> actor() { 153 return new F<Effect<A>, QueueActor<A>>() { 154 public QueueActor<A> f(final Effect<A> effect) { 155 return actor(effect); 156 } 157 }; 158 } 159 160 /** 161 * List iteration inside a Promise. Traverses a List of Promises yielding a Promise of a List. 162 * 163 * @param ps A list of promises to sequence. 164 * @return A promise of the List of values promised by the list of promises. 165 */ 166 public <A> Promise<List<A>> sequence(final List<Promise<A>> ps) { 167 return Promise.sequence(strategy, ps); 168 } 169 170 /** 171 * A first-class function that traverses a list inside a promise. 172 * 173 * @return A first-class function that traverses a list inside a promise. 174 */ 175 public <A> F<List<Promise<A>>, Promise<List<A>>> sequenceList() { 176 return new F<List<Promise<A>>, Promise<List<A>>>() { 177 public Promise<List<A>> f(final List<Promise<A>> list) { 178 return sequence(list); 179 } 180 }; 181 } 182 183 /** 184 * Stream iteration inside a Promise. Traverses a Stream of Promises yielding a Promise of a Stream. 185 * 186 * @param ps A Stream of promises to sequence. 187 * @return A promise of the Stream of values promised by the Stream of promises. 188 */ 189 public <A> Promise<Stream<A>> sequence(final Stream<Promise<A>> ps) { 190 return Promise.sequence(strategy, ps); 191 } 192 193 /** 194 * A first-class function that traverses a stream inside a promise. 195 * 196 * @return A first-class function that traverses a stream inside a promise. 197 */ 198 public <A> F<Stream<Promise<A>>, Promise<Stream<A>>> sequenceStream() { 199 return new F<Stream<Promise<A>>, Promise<Stream<A>>>() { 200 public Promise<Stream<A>> f(final Stream<Promise<A>> stream) { 201 return sequence(stream); 202 } 203 }; 204 } 205 206 /** 207 * Traverses a product-1 inside a promise. 208 * 209 * @param p A product-1 of a promised value. 210 * @return A promise of a product of the value promised by the argument. 211 */ 212 public <A> Promise<P1<A>> sequence(final P1<Promise<A>> p) { 213 return Promise.sequence(strategy, p); 214 } 215 216 /** 217 * Takes a Promise-valued function and applies it to each element 218 * in the given List, yielding a promise of a List of results. 219 * 220 * @param as A list to map across. 221 * @param f A promise-valued function to map across the list. 222 * @return A Promise of a new list with the given function applied to each element. 223 */ 224 public <A, B> Promise<List<B>> mapM(final List<A> as, final F<A, Promise<B>> f) { 225 return sequence(as.map(f)); 226 } 227 228 /** 229 * First-class function that maps a concurrent function over a List inside a promise. 230 * 231 * @return a function that maps a concurrent function over a List inside a promise. 232 */ 233 public <A, B> F<F<A, Promise<B>>, F<List<A>, Promise<List<B>>>> mapList() { 234 return curry(new F2<F<A, Promise<B>>, List<A>, Promise<List<B>>>() { 235 public Promise<List<B>> f(final F<A, Promise<B>> f, final List<A> list) { 236 return mapM(list, f); 237 } 238 }); 239 } 240 241 /** 242 * Takes a Promise-valued function and applies it to each element 243 * in the given Stream, yielding a promise of a Stream of results. 244 * 245 * @param as A Stream to map across. 246 * @param f A promise-valued function to map across the Stream. 247 * @return A Promise of a new Stream with the given function applied to each element. 248 */ 249 public <A, B> Promise<Stream<B>> mapM(final Stream<A> as, final F<A, Promise<B>> f) { 250 return sequence(as.map(f)); 251 } 252 253 /** 254 * First-class function that maps a concurrent function over a Stream inside a promise. 255 * 256 * @return a function that maps a concurrent function over a Stream inside a promise. 257 */ 258 public <A, B> F<F<A, Promise<B>>, F<Stream<A>, Promise<Stream<B>>>> mapStream() { 259 return curry(new F2<F<A, Promise<B>>, Stream<A>, Promise<Stream<B>>>() { 260 public Promise<Stream<B>> f(final F<A, Promise<B>> f, final Stream<A> stream) { 261 return mapM(stream, f); 262 } 263 }); 264 } 265 266 /** 267 * Maps a concurrent function over a Product-1 inside a Promise. 268 * 269 * @param a A product-1 across which to map. 270 * @param f A concurrent function to map over the product inside a promise. 271 * @return A promised product of the result of mapping the given function over the given product. 272 */ 273 public <A, B> Promise<P1<B>> mapM(final P1<A> a, final F<A, Promise<B>> f) { 274 return sequence(a.map(f)); 275 } 276 277 /** 278 * Maps across a list in parallel. 279 * 280 * @param as A list to map across in parallel. 281 * @param f A function to map across the given list. 282 * @return A Promise of a new list with the given function applied to each element. 283 */ 284 public <A, B> Promise<List<B>> parMap(final List<A> as, final F<A, B> f) { 285 return mapM(as, promise(f)); 286 } 287 288 /** 289 * A first-class function that maps another function across a list in parallel. 290 * 291 * @return A function that maps another function across a list in parallel. 292 */ 293 public <A, B> F<F<A, B>, F<List<A>, Promise<List<B>>>> parMapList() { 294 return curry(new F2<F<A, B>, List<A>, Promise<List<B>>>() { 295 public Promise<List<B>> f(final F<A, B> abf, final List<A> list) { 296 return parMap(list, abf); 297 } 298 }); 299 } 300 301 /** 302 * Maps across a nonempty list in parallel. 303 * 304 * @param as A NonEmptyList to map across in parallel. 305 * @param f A function to map across the given NonEmptyList. 306 * @return A Promise of a new NonEmptyList with the given function applied to each element. 307 */ 308 public <A, B> Promise<NonEmptyList<B>> parMap(final NonEmptyList<A> as, final F<A, B> f) { 309 return mapM(as.toList(), promise(f)).fmap(new F<List<B>, NonEmptyList<B>>() { 310 public NonEmptyList<B> f(final List<B> list) { 311 return NonEmptyList.fromList(list).some(); 312 } 313 }); 314 } 315 316 /** 317 * Maps across a Stream in parallel. 318 * 319 * @param as A Stream to map across in parallel. 320 * @param f A function to map across the given Stream. 321 * @return A Promise of a new Stream with the given function applied to each element. 322 */ 323 public <A, B> Promise<Stream<B>> parMap(final Stream<A> as, final F<A, B> f) { 324 return mapM(as, promise(f)); 325 } 326 327 /** 328 * A first-class function that maps another function across a stream in parallel. 329 * 330 * @return A function that maps another function across a stream in parallel. 331 */ 332 public <A, B> F<F<A, B>, F<Stream<A>, Promise<Stream<B>>>> parMapStream() { 333 return curry(new F2<F<A, B>, Stream<A>, Promise<Stream<B>>>() { 334 public Promise<Stream<B>> f(final F<A, B> abf, final Stream<A> stream) { 335 return parMap(stream, abf); 336 } 337 }); 338 } 339 340 /** 341 * Maps across an Iterable in parallel. 342 * 343 * @param as An Iterable to map across in parallel. 344 * @param f A function to map across the given Iterable. 345 * @return A Promise of a new Iterable with the given function applied to each element. 346 */ 347 public <A, B> Promise<Iterable<B>> parMap(final Iterable<A> as, final F<A, B> f) { 348 return parMap(iterableStream(as), f) 349 .fmap(Function.<Stream<B>, Iterable<B>>vary(Function.<Stream<B>>identity())); 350 } 351 352 /** 353 * A first-class function that maps another function across an iterable in parallel. 354 * 355 * @return A function that maps another function across an iterable in parallel. 356 */ 357 public <A, B> F<F<A, B>, F<Iterable<A>, Promise<Iterable<B>>>> parMapIterable() { 358 return curry(new F2<F<A, B>, Iterable<A>, Promise<Iterable<B>>>() { 359 public Promise<Iterable<B>> f(final F<A, B> abf, final Iterable<A> iterable) { 360 return parMap(iterable, abf); 361 } 362 }); 363 } 364 365 /** 366 * Maps across an Array in parallel. 367 * 368 * @param as An array to map across in parallel. 369 * @param f A function to map across the given Array. 370 * @return A Promise of a new Array with the given function applied to each element. 371 */ 372 public <A, B> Promise<Array<B>> parMap(final Array<A> as, final F<A, B> f) { 373 return parMap(as.toStream(), f).fmap(new F<Stream<B>, Array<B>>() { 374 public Array<B> f(final Stream<B> stream) { 375 return stream.toArray(); 376 } 377 }); 378 } 379 380 /** 381 * A first-class function that maps another function across an array in parallel. 382 * 383 * @return A function that maps another function across an array in parallel. 384 */ 385 public <A, B> F<F<A, B>, F<Array<A>, Promise<Array<B>>>> parMapArray() { 386 return curry(new F2<F<A, B>, Array<A>, Promise<Array<B>>>() { 387 public Promise<Array<B>> f(final F<A, B> abf, final Array<A> array) { 388 return parMap(array, abf); 389 } 390 }); 391 } 392 393 /** 394 * Maps a function across a Zipper in parallel. 395 * 396 * @param za A Zipper to map across in parallel. 397 * @param f A function to map across the given Zipper. 398 * @return A promise of a new Zipper with the given function applied to each element. 399 */ 400 public <A, B> Promise<Zipper<B>> parMap(final Zipper<A> za, final F<A, B> f) { 401 return parMap(za.rights(), f) 402 .apply(promise(f).f(za.focus()).apply(parMap(za.lefts(), f).fmap(curry(Zipper.<B>zipper())))); 403 } 404 405 /** 406 * Maps a function across a Tree in parallel. 407 * 408 * @param ta A Tree to map across in parallel. 409 * @param f A function to map across the given Tree. 410 * @return A promise of a new Tree with the given function applied to each element. 411 */ 412 public <A, B> Promise<Tree<B>> parMap(final Tree<A> ta, final F<A, B> f) { 413 return mapM(ta.subForest(), this.<Tree<A>, Tree<B>>mapStream().f(this.<A, B>parMapTree().f(f))) 414 .apply(promise(f).f(ta.root()).fmap(Tree.<B>node())); 415 } 416 417 /** 418 * A first-class function that maps across a Tree in parallel. 419 * 420 * @return A function that maps a given function across a Tree in parallel. 421 */ 422 public <A, B> F<F<A, B>, F<Tree<A>, Promise<Tree<B>>>> parMapTree() { 423 return curry(new F2<F<A, B>, Tree<A>, Promise<Tree<B>>>() { 424 public Promise<Tree<B>> f(final F<A, B> abf, final Tree<A> tree) { 425 return parMap(tree, abf); 426 } 427 }); 428 } 429 430 /** 431 * Maps a function across a TreeZipper in parallel. 432 * 433 * @param za A TreeZipper to map across in parallel. 434 * @param f A function to map across the given TreeZipper. 435 * @return A promise of a new TreeZipper with the given function applied to each element of the tree. 436 */ 437 public <A, B> Promise<TreeZipper<B>> parMap(final TreeZipper<A> za, final F<A, B> f) { 438 final F<Tree<A>, Tree<B>> tf = Tree.<A, B>fmap_().f(f); 439 final P4<Tree<A>, Stream<Tree<A>>, Stream<Tree<A>>, Stream<P3<Stream<Tree<A>>, A, Stream<Tree<A>>>>> p = za.p(); 440 return mapM(p._4(), 441 new F<P3<Stream<Tree<A>>, A, Stream<Tree<A>>>, Promise<P3<Stream<Tree<B>>, B, Stream<Tree<B>>>>>() { 442 public Promise<P3<Stream<Tree<B>>, B, Stream<Tree<B>>>> f( 443 final P3<Stream<Tree<A>>, A, Stream<Tree<A>>> p3) { 444 return parMap(p3._3(), tf).apply(promise(f).f(p3._2()).apply( 445 parMap(p3._1(), tf).fmap(P.<Stream<Tree<B>>, B, Stream<Tree<B>>>p3()))); 446 } 447 }).apply(parMap(za.rights(), tf).apply( 448 parMap(za.lefts(), tf).apply(parMap(p._1(), f).fmap(TreeZipper.<B>treeZipper())))); 449 } 450 451 /** 452 * Binds a list-valued function across a list in parallel, concatenating the results into a new list. 453 * 454 * @param as A list to bind across in parallel. 455 * @param f A function to bind across the given list in parallel. 456 * @return A promise of a new List with the given function bound across its elements. 457 */ 458 public <A, B> Promise<List<B>> parFlatMap(final List<A> as, final F<A, List<B>> f) { 459 return parFoldMap(as, f, Monoid.<B>listMonoid()); 460 } 461 462 /** 463 * Binds a Stream-valued function across a Stream in parallel, concatenating the results into a new Stream. 464 * 465 * @param as A Stream to bind across in parallel. 466 * @param f A function to bind across the given Stream in parallel. 467 * @return A promise of a new Stream with the given function bound across its elements. 468 */ 469 public <A, B> Promise<Stream<B>> parFlatMap(final Stream<A> as, final F<A, Stream<B>> f) { 470 return parFoldMap(as, f, Monoid.<B>streamMonoid()); 471 } 472 473 /** 474 * Binds an Array-valued function across an Array in parallel, concatenating the results into a new Array. 475 * 476 * @param as An Array to bind across in parallel. 477 * @param f A function to bind across the given Array in parallel. 478 * @return A promise of a new Array with the given function bound across its elements. 479 */ 480 public <A, B> Promise<Array<B>> parFlatMap(final Array<A> as, final F<A, Array<B>> f) { 481 return parMap(as, f).fmap(Array.<B>join()); 482 } 483 484 /** 485 * Binds an Iterable-valued function across an Iterable in parallel, concatenating the results into a new Iterable. 486 * 487 * @param as A Iterable to bind across in parallel. 488 * @param f A function to bind across the given Iterable in parallel. 489 * @return A promise of a new Iterable with the given function bound across its elements. 490 */ 491 public <A, B> Promise<Iterable<B>> parFlatMap(final Iterable<A> as, final F<A, Iterable<B>> f) { 492 return parMap(as, f).fmap(IterableW.<B, Iterable<B>>join()) 493 .fmap(Function.<IterableW<B>, Iterable<B>>vary(Function.<Iterable<B>>identity())); 494 } 495 496 /** 497 * Zips two lists together with a given function, in parallel. 498 * 499 * @param as A list to zip with another in parallel. 500 * @param bs A list to zip with another in parallel. 501 * @param f A function with which to zip two lists in parallel. 502 * @return A Promise of a new list with the results of applying the given function across the two lists in lockstep. 503 */ 504 public <A, B, C> Promise<List<C>> parZipWith(final List<A> as, final List<B> bs, final F<A, F<B, C>> f) { 505 return sequence(as.<B, Promise<C>>zipWith(bs, promise(uncurryF2(f)))); 506 } 507 508 /** 509 * Zips two streams together with a given function, in parallel. 510 * 511 * @param as A stream to zip with another in parallel. 512 * @param bs A stream to zip with another in parallel. 513 * @param f A function with which to zip two streams in parallel. 514 * @return A Promise of a new stream with the results of applying the given function across the two streams, stepwise. 515 */ 516 public <A, B, C> Promise<Stream<C>> parZipWith(final Stream<A> as, final Stream<B> bs, final F<A, F<B, C>> f) { 517 return sequence(as.<B, Promise<C>>zipWith(bs, promise(uncurryF2(f)))); 518 } 519 520 /** 521 * Zips two arrays together with a given function, in parallel. 522 * 523 * @param as An array to zip with another in parallel. 524 * @param bs An array to zip with another in parallel. 525 * @param f A function with which to zip two arrays in parallel. 526 * @return A Promise of a new array with the results of applying the given function across the two arrays, stepwise. 527 */ 528 public <A, B, C> Promise<Array<C>> parZipWith(final Array<A> as, final Array<B> bs, final F<A, F<B, C>> f) { 529 return parZipWith(as.toStream(), bs.toStream(), f).fmap(new F<Stream<C>, Array<C>>() { 530 public Array<C> f(final Stream<C> stream) { 531 return stream.toArray(); 532 } 533 }); 534 } 535 536 /** 537 * Zips two iterables together with a given function, in parallel. 538 * 539 * @param as An iterable to zip with another in parallel. 540 * @param bs An iterable to zip with another in parallel. 541 * @param f A function with which to zip two iterables in parallel. 542 * @return A Promise of a new iterable with the results of applying the given function across the two iterables, stepwise. 543 */ 544 public <A, B, C> Promise<Iterable<C>> parZipWith(final Iterable<A> as, final Iterable<B> bs, final F<A, F<B, C>> f) { 545 return parZipWith(iterableStream(as), iterableStream(bs), f).fmap( 546 Function.<Stream<C>, Iterable<C>>vary(Function.<Iterable<C>>identity())); 547 } 548 549 /** 550 * Maps with the given function across the given stream in parallel, while folding with 551 * the given monoid. 552 * 553 * @param as A stream to map over and reduce. 554 * @param map The function to map over the given stream. 555 * @param reduce The monoid with which to sum the results. 556 * @return A promise of a result of mapping and folding in parallel. 557 */ 558 public <A, B> Promise<B> parFoldMap(final Stream<A> as, final F<A, B> map, final Monoid<B> reduce) { 559 return as.isEmpty() ? promise(p(reduce.zero())) : as.map(promise(map)).foldLeft1(liftM2(reduce.sum())); 560 } 561 562 /** 563 * Maps with the given function across chunks of the given stream in parallel, while folding with 564 * the given monoid. The stream is split into chunks according to the given chunking function, 565 * the given map function is mapped over all chunks simultaneously, but over each chunk sequentially. 566 * All chunks are summed concurrently and the sums are then summed sequentially. 567 * 568 * @param as A stream to chunk, then map over and reduce. 569 * @param map The function to map over the given stream. 570 * @param reduce The monoid with which to sum the results. 571 * @param chunking A function describing how the stream should be split into chunks. Should return the first chunk 572 * and the rest of the stream. 573 * @return A promise of a result of mapping and folding in parallel. 574 */ 575 public <A, B> Promise<B> parFoldMap(final Stream<A> as, final F<A, B> map, final Monoid<B> reduce, 576 final F<Stream<A>, P2<Stream<A>, Stream<A>>> chunking) { 577 return parMap(Stream.unfold(new F<Stream<A>, Option<P2<Stream<A>, Stream<A>>>>() { 578 public Option<P2<Stream<A>, Stream<A>>> f(final Stream<A> stream) { 579 return stream.isEmpty() ? Option.<P2<Stream<A>, Stream<A>>>none() : some(chunking.f(stream)); 580 } 581 }, as), Stream.<A, B>map_().f(map)).bind(new F<Stream<Stream<B>>, Promise<B>>() { 582 public Promise<B> f(final Stream<Stream<B>> stream) { 583 return parMap(stream, reduce.sumLeftS()).fmap(reduce.sumLeftS()); 584 } 585 }); 586 } 587 588 /** 589 * Maps with the given function across chunks of the given Iterable in parallel, while folding with 590 * the given monoid. The Iterable is split into chunks according to the given chunking function, 591 * the given map function is mapped over all chunks simultaneously, but over each chunk sequentially. 592 * All chunks are summed concurrently and the sums are then summed sequentially. 593 * 594 * @param as An Iterable to chunk, then map over and reduce. 595 * @param map The function to map over the given Iterable. 596 * @param reduce The monoid with which to sum the results. 597 * @param chunking A function describing how the Iterable should be split into chunks. Should return the first chunk 598 * and the rest of the Iterable. 599 * @return A promise of a result of mapping and folding in parallel. 600 */ 601 public <A, B> Promise<B> parFoldMap(final Iterable<A> as, final F<A, B> map, final Monoid<B> reduce, 602 final F<Iterable<A>, P2<Iterable<A>, Iterable<A>>> chunking) { 603 return parFoldMap(iterableStream(as), map, reduce, new F<Stream<A>, P2<Stream<A>, Stream<A>>>() { 604 public P2<Stream<A>, Stream<A>> f(final Stream<A> stream) { 605 final F<Iterable<A>, Stream<A>> is = new F<Iterable<A>, Stream<A>>() { 606 public Stream<A> f(final Iterable<A> iterable) { 607 return iterableStream(iterable); 608 } 609 }; 610 return chunking.f(stream).map1(is).map2(is); 611 } 612 }); 613 } 614 615 /** 616 * Maps with the given function across the given iterable in parallel, while folding with 617 * the given monoid. 618 * 619 * @param as An Iterable to map over and reduce. 620 * @param map The function to map over the given Iterable. 621 * @param reduce The Monoid with which to sum the results. 622 * @return A promise of a result of mapping and folding in parallel. 623 */ 624 public <A, B> Promise<B> parFoldMap(final Iterable<A> as, final F<A, B> map, final Monoid<B> reduce) { 625 return parFoldMap(iterableStream(as), map, reduce); 626 } 627 628 629 /** 630 * Maps the given function across all positions of the given zipper in parallel. 631 * 632 * @param za A zipper to extend the given function across. 633 * @param f A function to extend across the given zipper. 634 * @return A promise of a new zipper of the results of applying the given function to all positions of the given 635 * zipper. 636 */ 637 public <A, B> Promise<Zipper<B>> parExtend(final Zipper<A> za, final F<Zipper<A>, B> f) { 638 return parMap(za.positions(), f); 639 } 640 641 /** 642 * Maps the given function across all subtrees of the given Tree in parallel. 643 * 644 * @param ta A tree to extend the given function across. 645 * @param f A function to extend across the given Tree. 646 * @return A promise of a new Tree of the results of applying the given function to all subtrees of the given Tree. 647 */ 648 public <A, B> Promise<Tree<B>> parExtend(final Tree<A> ta, final F<Tree<A>, B> f) { 649 return parMap(ta.cojoin(), f); 650 } 651 652 /** 653 * Maps the given function across all positions of the given TreeZipper in parallel. 654 * 655 * @param za A TreeZipper to extend the given function across. 656 * @param f A function to extend across the given TreeZipper. 657 * @return A promise of a new TreeZipper of the results of applying the given function to all positions of the 658 * given TreeZipper. 659 */ 660 public <A, B> Promise<TreeZipper<B>> parExtend(final TreeZipper<A> za, final F<TreeZipper<A>, B> f) { 661 return parMap(za.positions(), f); 662 } 663 664 /** 665 * Maps the given function across all sublists of the given NonEmptyList in parallel. 666 * 667 * @param as A NonEmptyList to extend the given function across. 668 * @param f A function to extend across the given NonEmptyList 669 * @return A promise of a new NonEmptyList of the results of applying the given function to all sublists of the 670 * given NonEmptyList. 671 */ 672 public <A, B> Promise<NonEmptyList<B>> parExtend(final NonEmptyList<A> as, final F<NonEmptyList<A>, B> f) { 673 return parMap(as.tails(), f); 674 } 675 676 }