001    package fj.control.parallel;
002    
003    import fj.Effect;
004    import fj.F;
005    import fj.F2;
006    import fj.Function;
007    import fj.P;
008    import fj.P1;
009    import static fj.Function.compose;
010    import static fj.Function.curry;
011    import static fj.P1.fmap;
012    import static fj.P1.sequence;
013    import fj.data.Java;
014    import fj.data.List;
015    import fj.data.Array;
016    
017    import java.util.concurrent.Callable;
018    import java.util.concurrent.CompletionService;
019    import java.util.concurrent.ExecutionException;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Future;
022    import java.util.concurrent.FutureTask;
023    
024    /**
025     * Functional-style parallel evaluation strategies.
026     * A Strategy is a method of evaluating a product-1, yielding another product-1 from which the result of its evaluation
027     * can be retrieved at a later time.
028     * <p/>
029     *
030     * @version %build.number%<br>
031     *          <ul>
032     *          <li>$LastChangedRevision: 161 $</li>
033     *          <li>$LastChangedDate: 2009-06-01 17:14:38 +1000 (Mon, 01 Jun 2009) $</li>
034     *          <li>Author: runar</li>
035     *          </ul>
036     */
037    public final class Strategy<A> {
038    
039      private final F<P1<A>, P1<A>> f;
040    
041      private Strategy(final F<P1<A>, P1<A>> f) {
042        this.f = f;
043      }
044    
045      /**
046       * Returns the functional representation of this Strategy, a function that evaluates a product-1.
047       *
048       * @return The function representing this strategy, which evaluates a product-1.
049       */
050      public F<P1<A>, P1<A>> f() {
051        return f;
052      }
053    
054      /**
055       * Constructs a strategy from the given evaluation function.
056       *
057       * @param f The execution function for the strategy
058       * @return A strategy that uses the given function to evaluate product-1s.
059       */
060      public static <A> Strategy<A> strategy(final F<P1<A>, P1<A>> f) {
061        return new Strategy<A>(f);
062      }
063    
064      /**
065       * Apply the strategy to the given product-1.
066       *
067       * @param a A P1 to evaluate according to this strategy.
068       * @return A P1 that yields the value from calling the given product-1.
069       */
070      public P1<A> par(final P1<A> a) {
071        return f().f(a);
072      }
073    
074      /**
075       * Promotes a function to a concurrent function.
076       *
077       * @param f A function to promote to a concurrent function.
078       * @return A function that executes concurrently when called, yielding a Future value.
079       */
080      public <B> F<B, P1<A>> concurry(final F<B, A> f) {
081        return compose(f(), P1.<B, A>curry(f));
082      }
083    
084      /**
085       * Promotes a function of arity-2 to a concurrent function.
086       *
087       * @param f The function to promote to a concurrent function.
088       * @return A function that executes concurrently when called, yielding a product-1 that returns the value.
089       */
090      public <B, C> F<B, F<C, P1<A>>> concurry(final F2<B, C, A> f) {
091        return new F<B, F<C, P1<A>>>() {
092          public F<C, P1<A>> f(final B b) {
093            return concurry(curry(f).f(b));
094          }
095        };
096      }
097    
098      /**
099       * Waits for every Future in a list to obtain a value, and collects those values in a list.
100       *
101       * @param xs The list of Futures from which to get values.
102       * @return A list of values extracted from the Futures in the argument list.
103       */
104      public static <A> List<P1<A>> mergeAll(final List<Future<A>> xs) {
105        return xs.map(Strategy.<A>obtain());
106      }
107    
108      /**
109       * Evaluates a list of product-1s in parallel.
110       *
111       * @param ps A list to evaluate in parallel.
112       * @return A list of the values of the product-1s in the argument.
113       */
114      public P1<List<A>> parList(final List<P1<A>> ps) {
115        return sequence(ps.map(f()));
116      }
117    
118      /**
119       * Maps the given function over the given list in parallel using this strategy.
120       *
121       * @param f  A function to map over the given list in parallel.
122       * @param bs A list over which to map the given function in parallel.
123       * @return A product-1 that returns the list with all of its elements transformed by the given function.
124       */
125      public <B> P1<List<A>> parMap(final F<B, A> f, final List<B> bs) {
126        return sequence(bs.map(concurry(f)));
127      }
128    
129      /**
130       * Maps the given function over the given array in parallel using this strategy.
131       *
132       * @param f  A function to map over the given array in parallel.
133       * @param bs An array over which to map the given function in parallel.
134       * @return A product-1 that returns the array with all of its elements transformed by the given function.
135       */
136      public <B> P1<Array<A>> parMap(final F<B, A> f, final Array<B> bs) {
137        return sequence(bs.map(concurry(f)));
138      }
139    
140      /**
141       * A strict version of parMap over lists.
142       * Maps the given function over the given list in parallel using this strategy,
143       * blocking the current thread until all values have been obtained.
144       *
145       * @param f  A function to map over the given list in parallel.
146       * @param bs A list over which to map the given function in parallel.
147       * @return A list with all of its elements transformed by the given function.
148       */
149      public <B> List<A> parMap1(final F<B, A> f, final List<B> bs) {
150        return compose(P1.<List<A>>__1(), parMapList(f)).f(bs);
151      }
152    
153      /**
154       * A strict version of parMap over arrays.
155       * Maps the given function over the given arrays in parallel using this strategy,
156       * blocking the current thread until all values have been obtained.
157       *
158       * @param f  A function to map over the given array in parallel.
159       * @param bs An array over which to map the given function in parallel.
160       * @return An array with all of its elements transformed by the given function.
161       */
162      public <B> Array<A> parMap1(final F<B, A> f, final Array<B> bs) {
163        return compose(P1.<Array<A>>__1(), parMapArray(f)).f(bs);
164      }
165    
166      /**
167       * Promotes a function to a parallel function on lists using this strategy.
168       *
169       * @param f A function to transform into a parallel function on lists.
170       * @return The function transformed into a parallel function on lists.
171       */
172      public <B> F<List<B>, P1<List<A>>> parMapList(final F<B, A> f) {
173        return new F<List<B>, P1<List<A>>>() {
174          public P1<List<A>> f(final List<B> as) {
175            return parMap(f, as);
176          }
177        };
178      }
179    
180      /**
181       * First-class version of parMap on lists.
182       *
183       * @return A function that promotes another function to a parallel function on lists.
184       */
185      public <B> F<F<B, A>, F<List<B>, P1<List<A>>>> parMapList() {
186        return new F<F<B, A>, F<List<B>, P1<List<A>>>>() {
187          public F<List<B>, P1<List<A>>> f(final F<B, A> f) {
188            return parMapList(f);
189          }
190        };
191      }
192    
193      /**
194       * First-class version of parMap1 on lists (parallel list functor).
195       *
196       * @return A function that promotes another function to a blocking parallel function on lists.
197       */
198      public <B> F<F<B, A>, F<List<B>, List<A>>> parMapList1() {
199        return new F<F<B, A>, F<List<B>, List<A>>>() {
200          public F<List<B>, List<A>> f(final F<B, A> f) {
201            return new F<List<B>, List<A>>() {
202              public List<A> f(final List<B> bs) {
203                return parMap1(f, bs);
204              }
205            };
206          }
207        };
208      }
209    
210      /**
211       * Promotes a function to a parallel function on arrays using this strategy.
212       *
213       * @param f A function to transform into a parallel function on arrays.
214       * @return The function transformed into a parallel function on arrays.
215       */
216      public <B> F<Array<B>, P1<Array<A>>> parMapArray(final F<B, A> f) {
217        return new F<Array<B>, P1<Array<A>>>() {
218          public P1<Array<A>> f(final Array<B> as) {
219            return parMap(f, as);
220          }
221        };
222      }
223    
224      /**
225       * First-class version of parMap on arrays.
226       *
227       * @return A function that promotes another function to a parallel function on arrays.
228       */
229      public <B> F<F<B, A>, F<Array<B>, P1<Array<A>>>> parMapArray() {
230        return new F<F<B, A>, F<Array<B>, P1<Array<A>>>>() {
231          public F<Array<B>, P1<Array<A>>> f(final F<B, A> f) {
232            return parMapArray(f);
233          }
234        };
235      }
236    
237      /**
238       * First-class version of parMap1 on arrays (parallel array functor).
239       *
240       * @return A function that promotes another function to a blocking parallel function on arrays.
241       */
242      public <B> F<F<B, A>, F<Array<B>, Array<A>>> parMapArray1() {
243        return new F<F<B, A>, F<Array<B>, Array<A>>>() {
244          public F<Array<B>, Array<A>> f(final F<B, A> f) {
245            return new F<Array<B>, Array<A>>() {
246              public Array<A> f(final Array<B> bs) {
247                return parMap1(f, bs);
248              }
249            };
250          }
251        };
252      }
253    
254      /**
255       * Binds the given function in parallel across the given list, using the given strategy, with a final join.
256       *
257       * @param s  The strategy to use for parallelization.
258       * @param f  The function to bind across the given list.
259       * @param as The list across which to bind the given function.
260       * @return A P1 containing the result of the parallel map operation after the final join.
261       */
262      public static <A, B> P1<List<B>> parFlatMap(final Strategy<List<B>> s,
263                                                  final F<A, List<B>> f,
264                                                  final List<A> as) {
265        return fmap(List.<B>join()).f(s.parMap(f, as));
266      }
267    
268      /**
269       * Binds the given function in parallel across the given array, using the given strategy, with a final join.
270       *
271       * @param s  The strategy to use for parallelization.
272       * @param f  The function to bind across the given array.
273       * @param as The array across which to bind the given function.
274       * @return A P1 containing the result of the parallel map operation after the final join.
275       */
276      public static <A, B> P1<Array<B>> parFlatMap(final Strategy<Array<B>> s,
277                                                   final F<A, Array<B>> f,
278                                                   final Array<A> as) {
279        return fmap(Array.<B>join()).f(s.parMap(f, as));
280      }
281    
282      /**
283       * Sequentially evaluates chunks (sub-sequences) of a list in parallel. Splits the list into chunks,
284       * evaluating the chunks simultaneously, but each chunk as a sequence.
285       *
286       * @param s           The strategy to use for parallelization.
287       * @param chunkLength The length of each sequence.
288       * @param as          The list to evaluate in parallel chunks.
289       * @return A product-1 containing the list of results extracted from the given list of product-1s.
290       */
291      public static <A> P1<List<A>> parListChunk(final Strategy<List<A>> s,
292                                                 final int chunkLength,
293                                                 final List<P1<A>> as) {
294        return fmap(List.<A>join()).f(s.parList(as.partition(chunkLength).map(P1.<A>sequenceList())));
295      }
296    
297      /**
298       * Zips together two lists in parallel using a given function, with this strategy.
299       * Calls the given function once for each corresponding pair in the lists, position-wise,
300       * passing elements from the first list to the first argument of the function, and elements from the second list
301       * to the second argument of the function, yielding a list of the results.
302       * If the lists are not of the same length, the remaining elements of the longer list are ignored.
303       *
304       * @param f  The function of arity-2 with which to zip.
305       * @param bs A list to zip with the given function.
306       * @param cs A list to zip with the given function.
307       * @return The list of the results of calling the given function on corresponding elements of the given lists.
308       */
309      public <B, C> P1<List<A>> parZipWith(final F2<B, C, A> f, final List<B> bs, final List<C> cs) {
310        return sequence(bs.zipWith(cs, concurry(f)));
311      }
312    
313      /**
314       * Zips together two arrays in parallel using a given function, with this strategy.
315       * Calls the given function once for each corresponding pair in the arrays, position-wise,
316       * passing elements from the first array to the first argument of the function, and elements from the second array
317       * to the second argument of the function, yielding a array of the results.
318       * If the arrays are not of the same length, the remaining elements of the longer array are ignored.
319       *
320       * @param f  The function of arity-2 with which to zip.
321       * @param bs A array to zip with the given function.
322       * @param cs A array to zip with the given function.
323       * @return The array of the results of calling the given function on corresponding elements of the given arrays.
324       */
325      public <B, C> P1<Array<A>> parZipWith(final F2<B, C, A> f, final Array<B> bs, final Array<C> cs) {
326        return sequence(bs.zipWith(cs, concurry(f)));
327      }
328    
329      /**
330       * Lifts a given function of arity-2 so that it zips together two lists in parallel,
331       * using this strategy, calling the function once for each corresponding pair in the lists, position-wise.
332       *
333       * @param f The function of arity-2 with which to zip.
334       * @return A transformation that zips two lists using the argument function, in parallel.
335       */
336      public <B, C> F2<List<B>, List<C>, P1<List<A>>> parZipListWith(final F2<B, C, A> f) {
337        return new F2<List<B>, List<C>, P1<List<A>>>() {
338          public P1<List<A>> f(final List<B> bs, final List<C> cs) {
339            return parZipWith(f, bs, cs);
340          }
341        };
342      }
343    
344      /**
345       * Lifts a given function of arity-2 so that it zips together two arrays in parallel,
346       * using this strategy, calling the function once for each corresponding pair in the arrays, position-wise.
347       *
348       * @param f The function of arity-2 with which to zip.
349       * @return A transformation that zips two arrays using the argument function, in parallel.
350       */
351      public <B, C> F2<Array<B>, Array<C>, P1<Array<A>>> parZipArrayWith(final F2<B, C, A> f) {
352        return new F2<Array<B>, Array<C>, P1<Array<A>>>() {
353          public P1<Array<A>> f(final Array<B> bs, final Array<C> cs) {
354            return parZipWith(f, bs, cs);
355          }
356        };
357      }
358    
359      /**
360       * Returns a function which returns a product-1 which waits for the given Future to obtain a value.
361       *
362       * @return A function which, given a Future, yields a product-1 that waits for it.
363       */
364      public static <A> F<Future<A>, P1<A>> obtain() {
365        return new F<Future<A>, P1<A>>() {
366          public P1<A> f(final Future<A> t) {
367            return obtain(t);
368          }
369        };
370      }
371    
372      /**
373       * Provides a product-1 that waits for the given future to obtain a value.
374       *
375       * @param t A Future for which to wait.
376       * @return A product-1 that waits for the given future to obtain a value.
377       */
378      public static <A> P1<A> obtain(final Future<A> t) {
379        return new P1<A>() {
380          public A _1() {
381            try {
382              return t.get();
383            } catch (InterruptedException e) {
384              Thread.currentThread().interrupt();
385              throw new Error(e);
386            } catch (ExecutionException e) {
387              throw new Error(e);
388            }
389          }
390        };
391      }
392    
393      /**
394       * Returns an Effect that waits for a given Future to obtain a value, discarding the value.
395       *
396       * @return An effect, which, given a Future, waits for it to obtain a value, discarding the value.
397       */
398      public static <A> Effect<Future<A>> discard() {
399        return new Effect<Future<A>>() {
400          public void e(final Future<A> a) {
401            Strategy.<A>obtain().f(a)._1();
402          }
403        };
404      }
405    
406      /**
407       * Provides a simple parallelization strategy that creates, and discards, a new thread for
408       * every evaluation.
409       *
410       * @return a simple parallelization strategy that creates, and discards, a new thread for
411       *         every evaluation.
412       */
413      public static <A> Strategy<A> simpleThreadStrategy() {
414        return strategy(new F<P1<A>, P1<A>>() {
415          public P1<A> f(final P1<A> p) {
416            final FutureTask<A> t = new FutureTask<A>(Java.<A>P1_Callable().f(p));
417            new Thread(t).start();
418            return obtain(t);
419          }
420        });
421      }
422    
423      /**
424       * Provides a parallelization strategy that uses an ExecutorService to control the method and
425       * degree of parallelism.
426       *
427       * @param s The ExecutorService to use for scheduling evaluations.
428       * @return A Strategy that uses the provided ExecutorService to control the method and degree
429       *         of parallelism.
430       */
431      public static <A> Strategy<A> executorStrategy(final ExecutorService s) {
432        return strategy(new F<P1<A>, P1<A>>() {
433          public P1<A> f(final P1<A> p) {
434            return obtain(s.submit(Java.<A>P1_Callable().f(p)));
435          }
436        });
437      }
438    
439      /**
440       * Provides a parallelization strategy that uses a CompletionService to control the method and
441       * degree of parallelism, and where each parallel task's completion is registered with the service.
442       *
443       * @param s The CompletionService to use for scheduling evaluations and detect their completion.
444       * @return A Strategy that uses the provided CompletionService to control the method and degree of parallelism,
445       *         and notifies the service of task completion.
446       */
447      public static <A> Strategy<A> completionStrategy(final CompletionService<A> s) {
448        return strategy(new F<P1<A>, P1<A>>() {
449          public P1<A> f(final P1<A> p) {
450            return obtain(s.submit(Java.<A>P1_Callable().f(p)));
451          }
452        });
453      }
454    
455      /**
456       * Provides a strategy that performs sequential (non-concurrent) evaluation of its argument.
457       *
458       * @return A strategy that performs sequential (non-concurrent) evaluation of its argument.
459       */
460      public static <A> Strategy<A> seqStrategy() {
461        return strategy(new F<P1<A>, P1<A>>() {
462          public P1<A> f(final P1<A> a) {
463            return P.p(a._1());
464          }
465        });
466      }
467    
468      /**
469       * Provides a strategy that performs no evaluation of its argument.
470       *
471       * @return A strategy that performs no evaluation of its argument.
472       */
473      public static <A> Strategy<A> idStrategy() {
474        return strategy(Function.<P1<A>>identity());
475      }
476    
477      /**
478       * Maps the given bijective transformation across this strategy (Exponential Functor pattern).
479       *
480       * @param f A transformation from this strategy's codomain to the resulting strategy's codomain.
481       * @param g A transformation from the resulting strategy's domain to this strategy's domain.
482       * @return A new strategy that maps to this strategy and back again.
483       */
484      public <B> Strategy<B> xmap(final F<P1<A>, P1<B>> f, final F<P1<B>, P1<A>> g) {
485        return strategy(compose(f, compose(f(), g)));
486      }
487    
488      /**
489       * Maps the given transformation across this strategy's domain (Invariant Functor pattern).
490       *
491       * @param f A transformation from this strategy's codomain to the resulting strategy's codomain.
492       * @return A new strategy that applies the given transformation after each application of this strategy.
493       */
494      public Strategy<A> map(final F<P1<A>, P1<A>> f) {
495        return xmap(f, Function.<P1<A>>identity());
496      }
497    
498      /**
499       * Maps the given transformation across this strategy's codomain (Invariant Functor pattern).
500       *
501       * @param f A transformation from the resulting strategy's domain to this strategy's domain.
502       * @return A new strategy that applies the given transformation before each application of this strategy.
503       */
504      public Strategy<A> comap(final F<P1<A>, P1<A>> f) {
505        return xmap(Function.<P1<A>>identity(), f);
506      }
507    
508      /**
509       * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by this strategy and applies
510       * the given side-effect to them.
511       *
512       * @param e The effect that should handle errors.
513       * @return A strategy that captures any runtime errors with a side-effect.
514       */
515      public Strategy<A> errorStrategy(final Effect<Error> e) {
516        return errorStrategy(this, e);
517      }
518    
519      /**
520       * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by the given strategy
521       * and applies the given side-effect to them.
522       *
523       * @param s The strategy to equip with an error-handling effect.
524       * @param e The effect that should handle errors.
525       * @return A strategy that captures any runtime errors with a side-effect.
526       */
527      public static <A> Strategy<A> errorStrategy(final Strategy<A> s, final Effect<Error> e) {
528        return s.comap(new F<P1<A>, P1<A>>() {
529          public P1<A> f(final P1<A> a) {
530            return new P1<A>() {
531              public A _1() {
532                try {
533                  return a._1();
534                } catch (Throwable t) {
535                  final Error error = new Error(t);
536                  e.e(error);
537                  throw error;
538                }
539              }
540            };
541          }
542        });
543      }
544    
545      /**
546       * Provides a normalising strategy that fully evaluates its Callable argument.
547       *
548       * @param s A non-normalising strategy to use for the evaluation.
549       * @return A new strategy that fully evaluates Callables, using the given strategy.
550       */
551      public static <A> Strategy<Callable<A>> callableStrategy(final Strategy<Callable<A>> s) {
552        return s.comap(new F<P1<Callable<A>>, P1<Callable<A>>>() {
553          public P1<Callable<A>> f(final P1<Callable<A>> a) {
554            return P1.curry(Callables.<A>normalise()).f(a._1());
555          }
556        });
557      }
558    
559    }