001    package fj.control.parallel;
002    
003    import fj.Effect;
004    import fj.F;
005    import fj.F2;
006    import fj.Function;
007    import fj.P;
008    import fj.P1;
009    import fj.P2;
010    import fj.P3;
011    import fj.P4;
012    import fj.Unit;
013    import static fj.P.p;
014    import static fj.F2W.$$;
015    import static fj.FW.$;
016    import static fj.Function.curry;
017    import static fj.Function.uncurryF2;
018    import static fj.control.parallel.Promise.liftM2;
019    import fj.data.Array;
020    import fj.data.IterableW;
021    import fj.data.List;
022    import fj.data.NonEmptyList;
023    import fj.data.Option;
024    import fj.data.Stream;
025    import fj.data.Tree;
026    import fj.data.TreeZipper;
027    import fj.data.Zipper;
028    import static fj.data.Option.some;
029    import static fj.data.Stream.iterableStream;
030    import fj.pre.Monoid;
031    
032    /**
033     * A module of higher-order concurrency features.
034     */
035    public final class ParModule {
036      private final Strategy<Unit> strategy;
037    
038      private ParModule(final Strategy<Unit> strategy) {
039        this.strategy = strategy;
040      }
041    
042      /**
043       * Constructor method for ParModule
044       *
045       * @param u A parallel strategy for the module.
046       * @return A ParModule that uses the given strategy for parallelism.
047       */
048      public static ParModule parModule(final Strategy<Unit> u) {
049        return new ParModule(u);
050      }
051    
052      /**
053       * Evaluates the given product concurrently and returns a Promise of the result.
054       *
055       * @param p A product to evaluate concurrently.
056       * @return A Promise of the value of the given product, that can be claimed in the future.
057       */
058      public <A> Promise<A> promise(final P1<A> p) {
059        return Promise.promise(strategy, p);
060      }
061    
062      /**
063       * Returns a function that evaluates a given product concurrently and returns a Promise of the result.
064       *
065       * @return a function that evaluates a given product concurrently and returns a Promise of the result.
066       */
067      public <A> F<P1<A>, Promise<A>> promise() {
068        return new F<P1<A>, Promise<A>>() {
069          public Promise<A> f(final P1<A> ap1) {
070            return promise(ap1);
071          }
072        };
073      }
074    
075      /**
076       * Promotes the given function to a concurrent function that returns a Promise.
077       *
078       * @param f A given function to promote to a concurrent function.
079       * @return A function that is applied concurrently when given an argument, yielding a Promise of the result
080       *         that can be claimed in the future.
081       */
082      public <A, B> F<A, Promise<B>> promise(final F<A, B> f) {
083        return $(f).promise(strategy);
084      }
085    
086      /**
087       * Returns a function that promotes a given function to a concurrent function that returns a Promise.
088       * The pure Kleisli arrow of Promise.
089       *
090       * @return A higher-order function that takes pure functions to promise-valued functions.
091       */
092      public <A, B> F<F<A, B>, F<A, Promise<B>>> promisePure() {
093        return new F<F<A, B>, F<A, Promise<B>>>() {
094          public F<A, Promise<B>> f(final F<A, B> abf) {
095            return promise(abf);
096          }
097        };
098      }
099    
100      /**
101       * Promotes the given function to a concurrent function that returns a Promise.
102       *
103       * @param f A given function to promote to a concurrent function.
104       * @return A function that is applied concurrently when given an argument, yielding a Promise of the result
105       *         that can be claimed in the future.
106       */
107      public <A, B, C> F2<A, B, Promise<C>> promise(final F2<A, B, C> f) {
108        return P2.untuple($$(f).tuple().promise(strategy));
109      }
110    
111    
112      /**
113       * Creates a very fast concurrent effect, as an actor that does not guarantee ordering of its messages.
114       * Such an actor is not thread-safe unless the given Effect is.
115       *
116       * @param e The effect that the actor should have on its messages.
117       * @return A concurrent actor that does not guarantee ordering of its messages.
118       */
119      public <A> Actor<A> effect(final Effect<A> e) {
120        return Actor.actor(strategy, e);
121      }
122    
123      /**
124       * A first-class constructor of concurrent effects, as actors that don't guarantee ordering of messages.
125       * Such an actor is not thread-safe unless the given Effect is.
126       *
127       * @return A function that takes an effect and returns a concurrent effect.
128       */
129      public <A> F<Effect<A>, Actor<A>> effect() {
130        return new F<Effect<A>, Actor<A>>() {
131          public Actor<A> f(final Effect<A> effect) {
132            return effect(effect);
133          }
134        };
135      }
136    
137      /**
138       * Creates a concurrent actor that is guaranteed to process only one message at a time.
139       *
140       * @param e The effect that the actor should have on its messages.
141       * @return A concurrent actor that is guaranteed to process its messages in some order.
142       */
143      public <A> QueueActor<A> actor(final Effect<A> e) {
144        return QueueActor.queueActor(strategy, e);
145      }
146    
147      /**
148       * A first-class constructor of actors.
149       *
150       * @return A function that takes an effect and returns an actor that processes messages in some order.
151       */
152      public <A> F<Effect<A>, QueueActor<A>> actor() {
153        return new F<Effect<A>, QueueActor<A>>() {
154          public QueueActor<A> f(final Effect<A> effect) {
155            return actor(effect);
156          }
157        };
158      }
159    
160      /**
161       * List iteration inside a Promise. Traverses a List of Promises yielding a Promise of a List.
162       *
163       * @param ps A list of promises to sequence.
164       * @return A promise of the List of values promised by the list of promises.
165       */
166      public <A> Promise<List<A>> sequence(final List<Promise<A>> ps) {
167        return Promise.sequence(strategy, ps);
168      }
169    
170      /**
171       * A first-class function that traverses a list inside a promise.
172       *
173       * @return A first-class function that traverses a list inside a promise.
174       */
175      public <A> F<List<Promise<A>>, Promise<List<A>>> sequenceList() {
176        return new F<List<Promise<A>>, Promise<List<A>>>() {
177          public Promise<List<A>> f(final List<Promise<A>> list) {
178            return sequence(list);
179          }
180        };
181      }
182    
183      /**
184       * Stream iteration inside a Promise. Traverses a Stream of Promises yielding a Promise of a Stream.
185       *
186       * @param ps A Stream of promises to sequence.
187       * @return A promise of the Stream of values promised by the Stream of promises.
188       */
189      public <A> Promise<Stream<A>> sequence(final Stream<Promise<A>> ps) {
190        return Promise.sequence(strategy, ps);
191      }
192    
193      /**
194       * A first-class function that traverses a stream inside a promise.
195       *
196       * @return A first-class function that traverses a stream inside a promise.
197       */
198      public <A> F<Stream<Promise<A>>, Promise<Stream<A>>> sequenceStream() {
199        return new F<Stream<Promise<A>>, Promise<Stream<A>>>() {
200          public Promise<Stream<A>> f(final Stream<Promise<A>> stream) {
201            return sequence(stream);
202          }
203        };
204      }
205    
206      /**
207       * Traverses a product-1 inside a promise.
208       *
209       * @param p A product-1 of a promised value.
210       * @return A promise of a product of the value promised by the argument.
211       */
212      public <A> Promise<P1<A>> sequence(final P1<Promise<A>> p) {
213        return Promise.sequence(strategy, p);
214      }
215    
216      /**
217       * Takes a Promise-valued function and applies it to each element
218       * in the given List, yielding a promise of a List of results.
219       *
220       * @param as A list to map across.
221       * @param f  A promise-valued function to map across the list.
222       * @return A Promise of a new list with the given function applied to each element.
223       */
224      public <A, B> Promise<List<B>> mapM(final List<A> as, final F<A, Promise<B>> f) {
225        return sequence(as.map(f));
226      }
227    
228      /**
229       * First-class function that maps a concurrent function over a List inside a promise.
230       *
231       * @return a function that maps a concurrent function over a List inside a promise.
232       */
233      public <A, B> F<F<A, Promise<B>>, F<List<A>, Promise<List<B>>>> mapList() {
234        return curry(new F2<F<A, Promise<B>>, List<A>, Promise<List<B>>>() {
235          public Promise<List<B>> f(final F<A, Promise<B>> f, final List<A> list) {
236            return mapM(list, f);
237          }
238        });
239      }
240    
241      /**
242       * Takes a Promise-valued function and applies it to each element
243       * in the given Stream, yielding a promise of a Stream of results.
244       *
245       * @param as A Stream to map across.
246       * @param f  A promise-valued function to map across the Stream.
247       * @return A Promise of a new Stream with the given function applied to each element.
248       */
249      public <A, B> Promise<Stream<B>> mapM(final Stream<A> as, final F<A, Promise<B>> f) {
250        return sequence(as.map(f));
251      }
252    
253      /**
254       * First-class function that maps a concurrent function over a Stream inside a promise.
255       *
256       * @return a function that maps a concurrent function over a Stream inside a promise.
257       */
258      public <A, B> F<F<A, Promise<B>>, F<Stream<A>, Promise<Stream<B>>>> mapStream() {
259        return curry(new F2<F<A, Promise<B>>, Stream<A>, Promise<Stream<B>>>() {
260          public Promise<Stream<B>> f(final F<A, Promise<B>> f, final Stream<A> stream) {
261            return mapM(stream, f);
262          }
263        });
264      }
265    
266      /**
267       * Maps a concurrent function over a Product-1 inside a Promise.
268       *
269       * @param a A product-1 across which to map.
270       * @param f A concurrent function to map over the product inside a promise.
271       * @return A promised product of the result of mapping the given function over the given product.
272       */
273      public <A, B> Promise<P1<B>> mapM(final P1<A> a, final F<A, Promise<B>> f) {
274        return sequence(a.map(f));
275      }
276    
277      /**
278       * Maps across a list in parallel.
279       *
280       * @param as A list to map across in parallel.
281       * @param f  A function to map across the given list.
282       * @return A Promise of a new list with the given function applied to each element.
283       */
284      public <A, B> Promise<List<B>> parMap(final List<A> as, final F<A, B> f) {
285        return mapM(as, promise(f));
286      }
287    
288      /**
289       * A first-class function that maps another function across a list in parallel.
290       *
291       * @return A function that maps another function across a list in parallel.
292       */
293      public <A, B> F<F<A, B>, F<List<A>, Promise<List<B>>>> parMapList() {
294        return curry(new F2<F<A, B>, List<A>, Promise<List<B>>>() {
295          public Promise<List<B>> f(final F<A, B> abf, final List<A> list) {
296            return parMap(list, abf);
297          }
298        });
299      }
300    
301      /**
302       * Maps across a nonempty list in parallel.
303       *
304       * @param as A NonEmptyList to map across in parallel.
305       * @param f  A function to map across the given NonEmptyList.
306       * @return A Promise of a new NonEmptyList with the given function applied to each element.
307       */
308      public <A, B> Promise<NonEmptyList<B>> parMap(final NonEmptyList<A> as, final F<A, B> f) {
309        return mapM(as.toList(), promise(f)).fmap(new F<List<B>, NonEmptyList<B>>() {
310          public NonEmptyList<B> f(final List<B> list) {
311            return NonEmptyList.fromList(list).some();
312          }
313        });
314      }
315    
316      /**
317       * Maps across a Stream in parallel.
318       *
319       * @param as A Stream to map across in parallel.
320       * @param f  A function to map across the given Stream.
321       * @return A Promise of a new Stream with the given function applied to each element.
322       */
323      public <A, B> Promise<Stream<B>> parMap(final Stream<A> as, final F<A, B> f) {
324        return mapM(as, promise(f));
325      }
326    
327      /**
328       * A first-class function that maps another function across a stream in parallel.
329       *
330       * @return A function that maps another function across a stream in parallel.
331       */
332      public <A, B> F<F<A, B>, F<Stream<A>, Promise<Stream<B>>>> parMapStream() {
333        return curry(new F2<F<A, B>, Stream<A>, Promise<Stream<B>>>() {
334          public Promise<Stream<B>> f(final F<A, B> abf, final Stream<A> stream) {
335            return parMap(stream, abf);
336          }
337        });
338      }
339    
340      /**
341       * Maps across an Iterable in parallel.
342       *
343       * @param as An Iterable to map across in parallel.
344       * @param f  A function to map across the given Iterable.
345       * @return A Promise of a new Iterable with the given function applied to each element.
346       */
347      public <A, B> Promise<Iterable<B>> parMap(final Iterable<A> as, final F<A, B> f) {
348        return parMap(iterableStream(as), f)
349            .fmap(Function.<Stream<B>, Iterable<B>>vary(Function.<Stream<B>>identity()));
350      }
351    
352      /**
353       * A first-class function that maps another function across an iterable in parallel.
354       *
355       * @return A function that maps another function across an iterable in parallel.
356       */
357      public <A, B> F<F<A, B>, F<Iterable<A>, Promise<Iterable<B>>>> parMapIterable() {
358        return curry(new F2<F<A, B>, Iterable<A>, Promise<Iterable<B>>>() {
359          public Promise<Iterable<B>> f(final F<A, B> abf, final Iterable<A> iterable) {
360            return parMap(iterable, abf);
361          }
362        });
363      }
364    
365      /**
366       * Maps across an Array in parallel.
367       *
368       * @param as An array to map across in parallel.
369       * @param f  A function to map across the given Array.
370       * @return A Promise of a new Array with the given function applied to each element.
371       */
372      public <A, B> Promise<Array<B>> parMap(final Array<A> as, final F<A, B> f) {
373        return parMap(as.toStream(), f).fmap(new F<Stream<B>, Array<B>>() {
374          public Array<B> f(final Stream<B> stream) {
375            return stream.toArray();
376          }
377        });
378      }
379    
380      /**
381       * A first-class function that maps another function across an array in parallel.
382       *
383       * @return A function that maps another function across an array in parallel.
384       */
385      public <A, B> F<F<A, B>, F<Array<A>, Promise<Array<B>>>> parMapArray() {
386        return curry(new F2<F<A, B>, Array<A>, Promise<Array<B>>>() {
387          public Promise<Array<B>> f(final F<A, B> abf, final Array<A> array) {
388            return parMap(array, abf);
389          }
390        });
391      }
392    
393      /**
394       * Maps a function across a Zipper in parallel.
395       *
396       * @param za A Zipper to map across in parallel.
397       * @param f  A function to map across the given Zipper.
398       * @return A promise of a new Zipper with the given function applied to each element.
399       */
400      public <A, B> Promise<Zipper<B>> parMap(final Zipper<A> za, final F<A, B> f) {
401        return parMap(za.rights(), f)
402            .apply(promise(f).f(za.focus()).apply(parMap(za.lefts(), f).fmap(curry(Zipper.<B>zipper()))));
403      }
404    
405      /**
406       * Maps a function across a Tree in parallel.
407       *
408       * @param ta A Tree to map across in parallel.
409       * @param f  A function to map across the given Tree.
410       * @return A promise of a new Tree with the given function applied to each element.
411       */
412      public <A, B> Promise<Tree<B>> parMap(final Tree<A> ta, final F<A, B> f) {
413        return mapM(ta.subForest(), this.<Tree<A>, Tree<B>>mapStream().f(this.<A, B>parMapTree().f(f)))
414            .apply(promise(f).f(ta.root()).fmap(Tree.<B>node()));
415      }
416    
417      /**
418       * A first-class function that maps across a Tree in parallel.
419       *
420       * @return A function that maps a given function across a Tree in parallel.
421       */
422      public <A, B> F<F<A, B>, F<Tree<A>, Promise<Tree<B>>>> parMapTree() {
423        return curry(new F2<F<A, B>, Tree<A>, Promise<Tree<B>>>() {
424          public Promise<Tree<B>> f(final F<A, B> abf, final Tree<A> tree) {
425            return parMap(tree, abf);
426          }
427        });
428      }
429    
430      /**
431       * Maps a function across a TreeZipper in parallel.
432       *
433       * @param za A TreeZipper to map across in parallel.
434       * @param f  A function to map across the given TreeZipper.
435       * @return A promise of a new TreeZipper with the given function applied to each element of the tree.
436       */
437      public <A, B> Promise<TreeZipper<B>> parMap(final TreeZipper<A> za, final F<A, B> f) {
438        final F<Tree<A>, Tree<B>> tf = Tree.<A, B>fmap_().f(f);
439        final P4<Tree<A>, Stream<Tree<A>>, Stream<Tree<A>>, Stream<P3<Stream<Tree<A>>, A, Stream<Tree<A>>>>> p = za.p();
440        return mapM(p._4(),
441                    new F<P3<Stream<Tree<A>>, A, Stream<Tree<A>>>, Promise<P3<Stream<Tree<B>>, B, Stream<Tree<B>>>>>() {
442                      public Promise<P3<Stream<Tree<B>>, B, Stream<Tree<B>>>> f(
443                          final P3<Stream<Tree<A>>, A, Stream<Tree<A>>> p3) {
444                        return parMap(p3._3(), tf).apply(promise(f).f(p3._2()).apply(
445                            parMap(p3._1(), tf).fmap(P.<Stream<Tree<B>>, B, Stream<Tree<B>>>p3())));
446                      }
447                    }).apply(parMap(za.rights(), tf).apply(
448            parMap(za.lefts(), tf).apply(parMap(p._1(), f).fmap(TreeZipper.<B>treeZipper()))));
449      }
450    
451      /**
452       * Binds a list-valued function across a list in parallel, concatenating the results into a new list.
453       *
454       * @param as A list to bind across in parallel.
455       * @param f  A function to bind across the given list in parallel.
456       * @return A promise of a new List with the given function bound across its elements.
457       */
458      public <A, B> Promise<List<B>> parFlatMap(final List<A> as, final F<A, List<B>> f) {
459        return parFoldMap(as, f, Monoid.<B>listMonoid());
460      }
461    
462      /**
463       * Binds a Stream-valued function across a Stream in parallel, concatenating the results into a new Stream.
464       *
465       * @param as A Stream to bind across in parallel.
466       * @param f  A function to bind across the given Stream in parallel.
467       * @return A promise of a new Stream with the given function bound across its elements.
468       */
469      public <A, B> Promise<Stream<B>> parFlatMap(final Stream<A> as, final F<A, Stream<B>> f) {
470        return parFoldMap(as, f, Monoid.<B>streamMonoid());
471      }
472    
473      /**
474       * Binds an Array-valued function across an Array in parallel, concatenating the results into a new Array.
475       *
476       * @param as An Array to bind across in parallel.
477       * @param f  A function to bind across the given Array in parallel.
478       * @return A promise of a new Array with the given function bound across its elements.
479       */
480      public <A, B> Promise<Array<B>> parFlatMap(final Array<A> as, final F<A, Array<B>> f) {
481        return parMap(as, f).fmap(Array.<B>join());
482      }
483    
484      /**
485       * Binds an Iterable-valued function across an Iterable in parallel, concatenating the results into a new Iterable.
486       *
487       * @param as A Iterable to bind across in parallel.
488       * @param f  A function to bind across the given Iterable in parallel.
489       * @return A promise of a new Iterable with the given function bound across its elements.
490       */
491      public <A, B> Promise<Iterable<B>> parFlatMap(final Iterable<A> as, final F<A, Iterable<B>> f) {
492        return parMap(as, f).fmap(IterableW.<B, Iterable<B>>join())
493            .fmap(Function.<IterableW<B>, Iterable<B>>vary(Function.<Iterable<B>>identity()));
494      }
495    
496      /**
497       * Zips two lists together with a given function, in parallel.
498       *
499       * @param as A list to zip with another in parallel.
500       * @param bs A list to zip with another in parallel.
501       * @param f  A function with which to zip two lists in parallel.
502       * @return A Promise of a new list with the results of applying the given function across the two lists in lockstep.
503       */
504      public <A, B, C> Promise<List<C>> parZipWith(final List<A> as, final List<B> bs, final F<A, F<B, C>> f) {
505        return sequence(as.<B, Promise<C>>zipWith(bs, promise(uncurryF2(f))));
506      }
507    
508      /**
509       * Zips two streams together with a given function, in parallel.
510       *
511       * @param as A stream to zip with another in parallel.
512       * @param bs A stream to zip with another in parallel.
513       * @param f  A function with which to zip two streams in parallel.
514       * @return A Promise of a new stream with the results of applying the given function across the two streams, stepwise.
515       */
516      public <A, B, C> Promise<Stream<C>> parZipWith(final Stream<A> as, final Stream<B> bs, final F<A, F<B, C>> f) {
517        return sequence(as.<B, Promise<C>>zipWith(bs, promise(uncurryF2(f))));
518      }
519    
520      /**
521       * Zips two arrays together with a given function, in parallel.
522       *
523       * @param as An array to zip with another in parallel.
524       * @param bs An array to zip with another in parallel.
525       * @param f  A function with which to zip two arrays in parallel.
526       * @return A Promise of a new array with the results of applying the given function across the two arrays, stepwise.
527       */
528      public <A, B, C> Promise<Array<C>> parZipWith(final Array<A> as, final Array<B> bs, final F<A, F<B, C>> f) {
529        return parZipWith(as.toStream(), bs.toStream(), f).fmap(new F<Stream<C>, Array<C>>() {
530          public Array<C> f(final Stream<C> stream) {
531            return stream.toArray();
532          }
533        });
534      }
535    
536      /**
537       * Zips two iterables together with a given function, in parallel.
538       *
539       * @param as An iterable to zip with another in parallel.
540       * @param bs An iterable to zip with another in parallel.
541       * @param f  A function with which to zip two iterables in parallel.
542       * @return A Promise of a new iterable with the results of applying the given function across the two iterables, stepwise.
543       */
544      public <A, B, C> Promise<Iterable<C>> parZipWith(final Iterable<A> as, final Iterable<B> bs, final F<A, F<B, C>> f) {
545        return parZipWith(iterableStream(as), iterableStream(bs), f).fmap(
546            Function.<Stream<C>, Iterable<C>>vary(Function.<Iterable<C>>identity()));
547      }
548    
549      /**
550       * Maps with the given function across the given stream in parallel, while folding with
551       * the given monoid.
552       *
553       * @param as     A stream to map over and reduce.
554       * @param map    The function to map over the given stream.
555       * @param reduce The monoid with which to sum the results.
556       * @return A promise of a result of mapping and folding in parallel.
557       */
558      public <A, B> Promise<B> parFoldMap(final Stream<A> as, final F<A, B> map, final Monoid<B> reduce) {
559        return as.isEmpty() ? promise(p(reduce.zero())) : as.map(promise(map)).foldLeft1(liftM2(reduce.sum()));
560      }
561    
562      /**
563       * Maps with the given function across chunks of the given stream in parallel, while folding with
564       * the given monoid. The stream is split into chunks according to the given chunking function,
565       * the given map function is mapped over all chunks simultaneously, but over each chunk sequentially.
566       * All chunks are summed concurrently and the sums are then summed sequentially.
567       *
568       * @param as       A stream to chunk, then map over and reduce.
569       * @param map      The function to map over the given stream.
570       * @param reduce   The monoid with which to sum the results.
571       * @param chunking A function describing how the stream should be split into chunks. Should return the first chunk
572       *                 and the rest of the stream.
573       * @return A promise of a result of mapping and folding in parallel.
574       */
575      public <A, B> Promise<B> parFoldMap(final Stream<A> as, final F<A, B> map, final Monoid<B> reduce,
576                                          final F<Stream<A>, P2<Stream<A>, Stream<A>>> chunking) {
577        return parMap(Stream.unfold(new F<Stream<A>, Option<P2<Stream<A>, Stream<A>>>>() {
578          public Option<P2<Stream<A>, Stream<A>>> f(final Stream<A> stream) {
579            return stream.isEmpty() ? Option.<P2<Stream<A>, Stream<A>>>none() : some(chunking.f(stream));
580          }
581        }, as), Stream.<A, B>map_().f(map)).bind(new F<Stream<Stream<B>>, Promise<B>>() {
582          public Promise<B> f(final Stream<Stream<B>> stream) {
583            return parMap(stream, reduce.sumLeftS()).fmap(reduce.sumLeftS());
584          }
585        });
586      }
587    
588      /**
589       * Maps with the given function across chunks of the given Iterable in parallel, while folding with
590       * the given monoid. The Iterable is split into chunks according to the given chunking function,
591       * the given map function is mapped over all chunks simultaneously, but over each chunk sequentially.
592       * All chunks are summed concurrently and the sums are then summed sequentially.
593       *
594       * @param as       An Iterable to chunk, then map over and reduce.
595       * @param map      The function to map over the given Iterable.
596       * @param reduce   The monoid with which to sum the results.
597       * @param chunking A function describing how the Iterable should be split into chunks. Should return the first chunk
598       *                 and the rest of the Iterable.
599       * @return A promise of a result of mapping and folding in parallel.
600       */
601      public <A, B> Promise<B> parFoldMap(final Iterable<A> as, final F<A, B> map, final Monoid<B> reduce,
602                                          final F<Iterable<A>, P2<Iterable<A>, Iterable<A>>> chunking) {
603        return parFoldMap(iterableStream(as), map, reduce, new F<Stream<A>, P2<Stream<A>, Stream<A>>>() {
604          public P2<Stream<A>, Stream<A>> f(final Stream<A> stream) {
605            final F<Iterable<A>, Stream<A>> is = new F<Iterable<A>, Stream<A>>() {
606              public Stream<A> f(final Iterable<A> iterable) {
607                return iterableStream(iterable);
608              }
609            };
610            return chunking.f(stream).map1(is).map2(is);
611          }
612        });
613      }
614    
615      /**
616       * Maps with the given function across the given iterable in parallel, while folding with
617       * the given monoid.
618       *
619       * @param as     An Iterable to map over and reduce.
620       * @param map    The function to map over the given Iterable.
621       * @param reduce The Monoid with which to sum the results.
622       * @return A promise of a result of mapping and folding in parallel.
623       */
624      public <A, B> Promise<B> parFoldMap(final Iterable<A> as, final F<A, B> map, final Monoid<B> reduce) {
625        return parFoldMap(iterableStream(as), map, reduce);
626      }
627    
628    
629      /**
630       * Maps the given function across all positions of the given zipper in parallel.
631       *
632       * @param za A zipper to extend the given function across.
633       * @param f  A function to extend across the given zipper.
634       * @return A promise of a new zipper of the results of applying the given function to all positions of the given
635       *         zipper.
636       */
637      public <A, B> Promise<Zipper<B>> parExtend(final Zipper<A> za, final F<Zipper<A>, B> f) {
638        return parMap(za.positions(), f);
639      }
640    
641      /**
642       * Maps the given function across all subtrees of the given Tree in parallel.
643       *
644       * @param ta A tree to extend the given function across.
645       * @param f  A function to extend across the given Tree.
646       * @return A promise of a new Tree of the results of applying the given function to all subtrees of the given Tree.
647       */
648      public <A, B> Promise<Tree<B>> parExtend(final Tree<A> ta, final F<Tree<A>, B> f) {
649        return parMap(ta.cojoin(), f);
650      }
651    
652      /**
653       * Maps the given function across all positions of the given TreeZipper in parallel.
654       *
655       * @param za A TreeZipper to extend the given function across.
656       * @param f  A function to extend across the given TreeZipper.
657       * @return A promise of a new TreeZipper of the results of applying the given function to all positions of the
658       *         given TreeZipper.
659       */
660      public <A, B> Promise<TreeZipper<B>> parExtend(final TreeZipper<A> za, final F<TreeZipper<A>, B> f) {
661        return parMap(za.positions(), f);
662      }
663    
664      /**
665       * Maps the given function across all sublists of the given NonEmptyList in parallel.
666       *
667       * @param as A NonEmptyList to extend the given function across.
668       * @param f  A function to extend across the given NonEmptyList
669       * @return A promise of a new NonEmptyList of the results of applying the given function to all sublists of the
670       *         given NonEmptyList.
671       */
672      public <A, B> Promise<NonEmptyList<B>> parExtend(final NonEmptyList<A> as, final F<NonEmptyList<A>, B> f) {
673        return parMap(as.tails(), f);
674      }
675    
676    }