原理
在接收到client端來的MapReduce要求後,會將request中的process散佈給cluster中的所有node。再將inputs所指定的資料從vnode中取出,再經過「Map」、「Reduce」兩個階段的處理。一一傳給Map function,再由Reduce function給收斂起來。處理階段
Map
這個階段會依 inputs 所指定條件中算出所有符合的 bucket-key,vnode會依bucket-key找出這些data並將他丟到Map function中做運算,回傳結果(通常是Array)。Reduce
這個階段所拿到的input就是在Map階段算完的結果的集合。舉例來說:A data 及 B data 經過Map function運算之後的結果分別是 [1,2,2,4] 和 [3,3,4,5],在進入Reduce階段前會先被結合成 [1,2,2,4,3,3,4,5] ,再跑Reduce function。請注意,Reduce function通常會被重複的執行,以Riak來說至少會被執行兩次,也就是再各個node reduce一次,再將各個node reduce後的結果reduce起來,成為最終的輸出。
HTTP Query Syntax
由RESTful介面向Riak下mapreduce指令的方式是POST到cluster主機的 "mapred" path,例如:
POST http://localhost:8098/mapred
而POST過去的data大約長這樣:
{"inputs":[...inputs...],"query":[...query...],"timeout": 90000}
Inputs (String or Array):
我們在下query時,在inputs這個欄位可以輸入兩種資料格式:1. 字串:代表我要對哪個bucket做mapreduce
2. 二維陣列:對某個bucket的某筆資料做mapreduce,例:
[ ["bucket1","key1"],["bucket2","key2"],["bucket3","key3"] ]或是:
[ ["Bucket1","Key1","KeyData1"],["Bucket2","Key2","KeyData2"],["Bucket3","Key3","KeyData3"] ]
Query (Array):
這個參數內會放一個或多個分別屬於不同階段的Mapper、Reducer或Link,格式大約有這幾種:- {modfun, Module, Function} where Module and Function are atoms that name an Erlang function in a specific module.
- {qfun,Fun} where Fun is a callable fun term (closure or anonymous function).
- {jsfun,Name} where Name is a binary that, when evaluated in Javascript, points to a built-in Javascript function.
- {jsanon, Source} where Source is a binary that, when evaluated in Javascript is an anonymous function.
- {jsanon, {Bucket, Key}} where the object at {Bucket, Key} contains the source for an anonymous Javascript function.
如果想把Riak的MapReduce玩熟,上面這五種格式可能需要好好看過一遍。
先來看Mapper的簡單範例:
{ "map":{"language":"javascript","source":"function(v) { return [v]; }","keep":true} }首先會用key註明這個是map()、reduce或link,在key對應的object中則記錄了是使用「什麼語言(language)」、「執行式(source)」以及「是否在最後的結果中顯現(keep)」。language和source應該不用特別解釋,keep是個布林值,代表在這個階段運算的結果要不要呈現在最終的運算結果裡面。通常只有在最後一個階段,才會將keep設為true。
TimeOut (Integer, optional):
Map/Reduce queries 預設 timeout 為 60000 milliseconds (60 seconds),當運算超過timeout時間的時候會回傳error。我們可以依需求覆寫他(in milliseconds)。Key filters
Key filters 可以在MapReduce之前先對input資料做預處理(pre-process),在載入資料之前事先檢查是否通過檢查條件。使用方法
將inputs的key_filters對應到一個array,riak將會依陣列中的條件順序進行篩選,如:{ "inputs":{ "bucket":"invoices" "key_filters":[["tokenize", "-", 1],["eq", "basho"]] }, // ... }以上面這個例子來說,riak會先將invoice這個bucket中每筆資料的key先以"-"符號做分割(tokenize),取得「第一個」字串區段,再將這個字串與"basho"進行「完全比對」(eq)。
我們接下來就來看看在key_filter中有哪些方法可以用:
tokenize
將key依傳入的符號進行分割,再取出某個區段,通常還會接著其他條件做篩選。"key_filters":[["tokenize", "-", 1]]
to_lower
將key的每個字母字元都轉為小寫,通常還會接著其他條件做篩選。"key_filters":[["to_lower"]]
eq
將key與傳入字串進行完全比對。"key_filters":[["eq", "kevin"]]
between
提供一個區段進行比對"key_filters":[["between", "20100101", "20101231"]]
matches
篩選key中包含傳入字串的資料,這樣的比對方式叫做「wildcard」,通常會有效能上的問題。"key_filters":[["matches", "kevin"]]
ends_with
篩選key結尾為傳入字串的資料"key_filters":[["ends_with", "riak"]]
範例
以下的範例就是做SQL中的distinct動作,Mapper單純將自身的資料以array的形式傳回,多個map的結果concate在一起後,由reducer做distinct的動作。{ "inputs":"goog", "query":[{"map":{ "language":"javascript", "source":"function(value){ return [ value ];}"} }, {"reduce":{ "language":"javascript", "source":"function(values){ var i=0, max=values.length, r=[]; for(i=0;i++;i<max) { if( r.indexof(values[i])==-1 ) {r.push(values[i]);} } return r; }" }] }