2013年10月6日 星期日

Riak實戰(2) - MapReduce in javascript

原理

在接收到client端來的MapReduce要求後,會將request中的process散佈給cluster中的所有node。再將inputs所指定的資料從vnode中取出,再經過「Map」、「Reduce」兩個階段的處理。一一傳給Map function,再由Reduce function給收斂起來。


處理階段


Map

這個階段會依 inputs 所指定條件中算出所有符合的 bucket-key,vnode會依bucket-key找出這些data並將他丟到Map function中做運算,回傳結果(通常是Array)。


Reduce

這個階段所拿到的input就是在Map階段算完的結果的集合。舉例來說:A data 及 B data 經過Map function運算之後的結果分別是 [1,2,2,4] 和 [3,3,4,5],在進入Reduce階段前會先被結合成 [1,2,2,4,3,3,4,5] ,再跑Reduce function。
請注意,Reduce function通常會被重複的執行,以Riak來說至少會被執行兩次,也就是再各個node reduce一次,再將各個node reduce後的結果reduce起來,成為最終的輸出。




HTTP Query Syntax


由RESTful介面向Riak下mapreduce指令的方式是POST到cluster主機的 "mapred" path,例如:
POST http://localhost:8098/mapred

而POST過去的data大約長這樣:
{"inputs":[...inputs...],"query":[...query...],"timeout": 90000}


Inputs (String or Array):

我們在下query時,在inputs這個欄位可以輸入兩種資料格式:
1. 字串:代表我要對哪個bucket做mapreduce
2. 二維陣列:對某個bucket的某筆資料做mapreduce,例:
[ ["bucket1","key1"],["bucket2","key2"],["bucket3","key3"] ]
或是:
[ ["Bucket1","Key1","KeyData1"],["Bucket2","Key2","KeyData2"],["Bucket3","Key3","KeyData3"] ]


Query (Array):

這個參數內會放一個或多個分別屬於不同階段的Mapper、Reducer或Link,格式大約有這幾種:

  • {modfun, Module, Function} where Module and Function are atoms that name an Erlang function in a specific module.
  • {qfun,Fun} where Fun is a callable fun term (closure or anonymous function).
  • {jsfun,Name} where Name is a binary that, when evaluated in Javascript, points to a built-in Javascript function.
  • {jsanon, Source} where Source is a binary that, when evaluated in Javascript is an anonymous function.
  • {jsanon, {Bucket, Key}} where the object at {Bucket, Key} contains the source for an anonymous Javascript function.

如果想把Riak的MapReduce玩熟,上面這五種格式可能需要好好看過一遍。


先來看Mapper的簡單範例:
{ "map":{"language":"javascript","source":"function(v) { return [v]; }","keep":true} }
首先會用key註明這個是map()、reduce或link,在key對應的object中則記錄了是使用「什麼語言(language)」、「執行式(source)」以及「是否在最後的結果中顯現(keep)」。language和source應該不用特別解釋,keep是個布林值,代表在這個階段運算的結果要不要呈現在最終的運算結果裡面。通常只有在最後一個階段,才會將keep設為true。

TimeOut (Integer, optional):

Map/Reduce queries 預設 timeout 為 60000 milliseconds (60 seconds),當運算超過timeout時間的時候會回傳error。我們可以依需求覆寫他(in milliseconds)。


Key filters

Key filters 可以在MapReduce之前先對input資料做預處理(pre-process),在載入資料之前事先檢查是否通過檢查條件。

使用方法

將inputs的key_filters對應到一個array,riak將會依陣列中的條件順序進行篩選,如:
{
  "inputs":{
     "bucket":"invoices"
     "key_filters":[["tokenize", "-", 1],["eq", "basho"]]
   },
   // ...
}
以上面這個例子來說,riak會先將invoice這個bucket中每筆資料的key先以"-"符號做分割(tokenize),取得「第一個」字串區段,再將這個字串與"basho"進行「完全比對」(eq)。
我們接下來就來看看在key_filter中有哪些方法可以用:

tokenize

將key依傳入的符號進行分割,再取出某個區段,通常還會接著其他條件做篩選。
"key_filters":[["tokenize", "-", 1]]

to_lower

將key的每個字母字元都轉為小寫,通常還會接著其他條件做篩選。
"key_filters":[["to_lower"]]

eq

將key與傳入字串進行完全比對。
"key_filters":[["eq", "kevin"]]

between

提供一個區段進行比對
"key_filters":[["between", "20100101", "20101231"]]

matches

篩選key中包含傳入字串的資料,這樣的比對方式叫做「wildcard」,通常會有效能上的問題。
"key_filters":[["matches", "kevin"]]

ends_with

篩選key結尾為傳入字串的資料
"key_filters":[["ends_with", "riak"]]


範例

以下的範例就是做SQL中的distinct動作,Mapper單純將自身的資料以array的形式傳回,多個map的結果concate在一起後,由reducer做distinct的動作。

{ "inputs":"goog",
  "query":[{"map":{
                "language":"javascript",
                "source":"function(value){ return [ value ];}"}
           },
           {"reduce":{
                "language":"javascript",
                "source":"function(values){ 
                              var i=0, max=values.length, r=[];
                              for(i=0;i++;i<max) {
                                  if( r.indexof(values[i])==-1 ) {r.push(values[i]);}
                              }
                              return r;
                          }"
           }]
}