
Proposed Cascading DSL

ipostelnik edited this page May 9, 2013 · 4 revisions

Low-level DSL for representing Cascading flow definitions in Clojure.

Approach:

  • Use the thread-first (->) macro to chain Cascading pipes
  • Generate an intermediate representation, then convert it to a Cascading assembly.
  • Write functional versions first, then layer nicer DSL syntax on top of them with macros.
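The approach can be sketched as follows. This assumes a hypothetical intermediate representation where each pipe is a plain Clojure map that points at the pipe it reads from. `source*` and `steps` are illustrative helpers, not part of the proposal, and conversion to real Cascading objects is left out.

```clojure
;; Hypothetical IR: each pipe is a plain map that records its operation
;; and the pipe it reads from under :prev.
(defn source* [name]
  {:op :source :name name})

(defn map* [pipe f in-fields out-fields]
  {:op :map :prev pipe :f f :in in-fields :out out-fields})

(defn filter* [pipe pred in-fields]
  {:op :filter :prev pipe :f pred :in in-fields})

(defn steps
  "Follows the :prev chain back to the source and returns the ops
  in execution order (source first)."
  [pipe]
  (->> pipe
       (iterate :prev)
       (take-while some?)
       (map :op)
       reverse))

;; Every function takes the pipe as its first argument, so -> threads them.
(def flow
  (-> (source* "tap")
      (map* inc ["a"] ["b"])
      (filter* odd? ["b"])))

(steps flow) ;=> (:source :map :filter)
```

Because the IR is plain data, it can be inspected and tested without a Hadoop classpath. A separate pass would walk it and build the corresponding Cascading `Each`, `GroupBy`, and `Every` pipes.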

Questions:

  • String vs. symbol vs. keyword for tuple field names
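One possible answer to the field-name question is to accept all three forms and normalize them at the DSL boundary. The sketch below uses a hypothetical `field-name` helper that converts each form to a string, since Cascading's `Fields` class can be constructed from string names.

```clojure
(defn field-name
  "Normalizes a tuple field name given as a string, keyword, or symbol
  to a plain string."
  [f]
  (cond
    (string? f) f
    (or (keyword? f) (symbol? f)) (name f)
    :else (throw (IllegalArgumentException.
                  (str "Unsupported field name: " (pr-str f))))))

(defn field-names [fs]
  (mapv field-name fs))

(field-names ["a" :b 'c]) ;=> ["a" "b" "c"]
```

One trade-off: `name` drops any namespace, so `:user/id` and `:order/id` would both become `"id"`.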

Simple example:

(-> tap 
  (w/map foo a b :> c)
  (w/mapcat foo b c :> d e)
  (w/filter bar a b)
  (w/select a b d)
  (w/rename a b d :> x y z)
  (w/group-by x y)
  (w/par-agg pagg-fn z :> zz1)
  (w/agg agg-fn z :> zz2))
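In the example, `:>` separates input fields from output fields, and field names are bare symbols. A macro layer could split the arguments at compile time and then delegate to the functional version. In this sketch, `parse-fields` is a hypothetical helper. The macro is named `w-map` rather than `w/map` so it does not need its own namespace, and `map*` is a stub that only records the step as data.

```clojure
(defn map*
  "Stub functional version: records the step as data."
  [pipe f in-fields out-fields]
  {:op :map :prev pipe :f f :in in-fields :out out-fields})

(defn parse-fields
  "Splits an argument list at the :> marker into input and output
  field-name strings. With no :> marker, all fields are inputs."
  [args]
  (let [[in [_ & out]] (split-with #(not= :> %) args)]
    {:in (mapv name in) :out (mapv name out)}))

(defmacro w-map
  "Expands (w-map pipe f a b :> c) into (map* pipe f [\"a\" \"b\"] [\"c\"])."
  [pipe f & args]
  (let [{:keys [in out]} (parse-fields args)]
    `(map* ~pipe ~f ~in ~out)))

(parse-fields '[a b :> c]) ;=> {:in ["a" "b"], :out ["c"]}
(w-map {:op :source} inc a b :> c)
```

Splitting in a macro fixes the field names at compile time. The functional `map*` remains available when field lists are computed at runtime.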

Functional Version

(defn map* [pipe f in-fields out-fields])
(defn mapcat*  [pipe f in-fields out-fields])
(defn filter* [pipe pred in-fields])
(defn select* [pipe out-fields])
(defn rename* [pipe in-fields out-fields])
(defn insert* [pipe field value & more])
;; Keyword options are destructured from the rest args,
;; e.g. (group-by* p ["x"] :sort ["z"] :reverse true)
(defn group-by* [pipe group-fields & {sort-fields :sort rev? :reverse group-name :name}])
(defn agg* [pipe f in-fields out-fields & {group? :with-group}])
(defn par-agg* [pipe f in-fields out-fields & {group? :with-group}])
(defn buffer-agg* [pipe f in-fields out-fields & {group? :with-group}])

(defn union* [& pipes])
(defn merge* [& pipes])

;; TODO: need better way to express custom co-groups
(defn co-group* [pipes fields declared-fields & {:keys [joiner reducers name]}])

(defn join* [lhs lhs-fields rhs rhs-fields & {:keys [joiner reducers name]}])

(defn hash-join* [lhs lhs-fields rhs rhs-fields])

(defn checkpoint* [pipe & {:keys [name]}])

;; TBD
(defn skew-join ....)
(defn FOO-join ....)
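The keyword options in the signatures above (`:sort`, `:reverse`, `:with-group`, `:joiner`, `:name`) can be handled with Clojure's map destructuring of rest arguments. The sketch below fills in a few of these functions against a hypothetical map-based IR. The checks are assumptions modeled on Cascading's behavior: `merge*` rejects branches that declare different fields, as Cascading's `Merge` expects identical fields, and `join*` requires the same number of key fields on each side. The `:inner` default joiner is also an assumption.

```clojure
;; Hypothetical IR nodes; pipes are plain maps, and sources look like
;; {:op :source :name "a" :fields ["x"]}.

(defn group-by*
  [pipe group-fields & {sort-fields :sort rev? :reverse group-name :name}]
  {:op :group-by :prev pipe :group group-fields
   :sort sort-fields :reverse (boolean rev?) :name group-name})

(defn agg*
  [pipe f in-fields out-fields & {group? :with-group}]
  {:op :agg :prev pipe :f f :in in-fields :out out-fields
   :with-group (boolean group?)})

(defn merge*
  "Combines pipes that all declare the same fields."
  [& pipes]
  (let [field-sets (set (map :fields pipes))]
    (when (not= 1 (count field-sets))
      (throw (IllegalArgumentException.
              (str "merge* inputs must declare the same fields: " field-sets))))
    {:op :merge :inputs (vec pipes) :fields (first field-sets)}))

(defn join*
  [lhs lhs-fields rhs rhs-fields & {:keys [joiner name]}]
  (when (not= (count lhs-fields) (count rhs-fields))
    (throw (IllegalArgumentException.
            "join* needs the same number of key fields on each side")))
  {:op :join :inputs [lhs rhs] :join-fields [lhs-fields rhs-fields]
   :joiner (or joiner :inner) :name name})

(defn hash-join* [lhs lhs-fields rhs rhs-fields]
  ;; Same shape as join*, tagged so a later pass can emit a HashJoin.
  (assoc (join* lhs lhs-fields rhs rhs-fields) :op :hash-join))

(def grouped
  (-> {:op :source :name "in" :fields ["x" "y" "z"]}
      (group-by* ["x" "y"] :sort ["z"] :reverse true)
      (agg* + ["z"] ["zz2"])))
```

A compiler pass would map `:hash-join` nodes to Cascading's `HashJoin`, which keeps the right-hand side in memory. Callers should therefore pass the smaller input as `rhs`.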