Jedná se o programovací paradigma (= programming model) a implementaci pro zpracování velkých objemů dat v distribuovaných systémech. Jeho síla je v rozdělování složitých úloh do menších, paralelně zpracovatelných částí.
Vyvinul ho Google v roce 2004 a implementován v nástroji Hadoop. Udělal to hlavně kvůli tomu, aby mohl efektivně počítat Page Rank pro všechny zalistované stránky rychle a efektivně (už nebylo možné mít centrální index, přesuny dat po síti byly drahé, servery často vypadávaly apod.).
Pro využití frameworku je potřeba implementovat pouze Map a Reduce fáze, vše ostatní zařídí framework (distribuce dat, řazení úkolů, řešení chyb jednotlivých nodů atd.)
- také je potřeba implementovat Input Reader, Partition function, Compare function atd. - ale ten už existuje pro všechny základní datové typy (implementovat musím, pokud chci zpracovávat nějaké pokročilé nebo vlastní soubory)
Proces
- data jsou rozdělená na různých serverech v clusteru
- založen na principu Rozděl a panuj (Divide and conquer)
Map fáze
- na každém uzlu se provede mapovací funkce (“přivedeme výpočet k datům”)
- vstupem jsou data ve formátu klíč-hodnota (před mapovací fází proběhne tzv. Input Split, který podle nějakého algoritmu zparsuje a rozdělí obsah daného splitu do key-value formátu)
- výstupem mapovací funkce je vypočtená sada dvojic klíč-hodnota
- většinou je to nějaký klíč a k němu vypočtená hodnota
- klíče mohou být duplicitní
- vypočtené dvojic jsou uložené do jednotlivých regionů (v paměti workera nebo ve filesystému)
- rozdělení do regionů dělá tzv. Partition function (např. hash klíče + modulo)
- chci provést stejnou úpravu na datech, který mám rozfrkaný po celým clusteru - tak místo toho, abych ty data dal na jedno místo a provedl úpravu na jednom místě, tak tu úpravu provedu na X místech najednou
Mezifáze
- data ve formátu klíč-hodnota se pak třídí a seskupují podle klíče (Shuffle and sort)
- existuje tu tzv. Partition function
- podle klíče rozhodne, do jakého regionu (pro reduce fázi) daný klíč padne
- každý reduce node může vzít víc regionů najednou (kde na ně pak aplikuje reduce fázi)
- a pak se dané regiony distribuují na společný reduce node (pohromadě podle jednoho klíče)
- ALE! až potom, co skončí mapovací fáze na všech uzlech (ty reportují master uzlu, ten pak spustí reduce fázi)
- master uzlům, které pověří reduce fází, dá informaci o klíčích + o tom, kde data k daným klíčům najde (na jakém mapovacím uzlu)
Reduce fáze
- rozdělená data (v regionech) agreguje a kombinuje a vrací finální výsledek
- nejdříve si natáhne všechny relevantní data ze všech regionů z příslušných Map serverů
- pak si data mergne dohromady a seřadí si je podle klíčů
- agreguje výsledky z Map fáze podle klíče
- jako vstup je klíč z Map fáze + všechna data spojená s tímto klíčem
- pro každý unikátní klíč provede např. součet|maximum|sloučení dat apod.
- výstupem je sada klíč - finální hodnota
Idea je taková, že se chce výpočetní výkon co nejvíce distribuovat (jedná se tady o obrovské množství dat) a zároveň minimalizovat posuny dat po síti (to zabere hodně času).
Detaily fungování
- master-slave architektura
- master - přiřazuje jednotlivé tasky jednotlivým nodům (=slaves)
- celý MapReduce Job je poslaný na mastera
- pošlou se implementace Map/Reduce funkcí, popis vstupních souborů a popis výstupní složky
- master zjistí (nebo ví), na jakých slaves je uložený obsah (splity obsahu), který je potřeba zpracovat
- Map Task = zpracování 1 splitu 1 workerem
- zároveň má všechna metadata o tom, kde jsou jaká data uložená atd. (metadata jsou uložená v distribuovaném file systému)
- celý MapReduce Job je poslaný na mastera
- slaves (=workers) - mají uložený skutečný obsah dat
- data jsou uložená v menších částech (= splitech)
- vykonávají tasky poslané od mastera (Map task, Reduce task)
- master - přiřazuje jednotlivé tasky jednotlivým nodům (=slaves)
- pokud nastane chyba
- chyby jsou v distribuovaných systémech velmi běžné
- na workerovi (master periodicky pinguje jednotlivé workery, takže se to dozví)
- pokud failne, tak master zresetuje jeho tasky a začne je posílat na jiné workery
- na masterovi
- dvě strategie:
- většinou se nic neřeší (je to celkem ojedinělá událost) a vyřeší se to tím, že uživatel bude muset spustit celý MapReduce spustit znovu
- nebo se dělají pravidelné checkpointy (zálohy), failure se detekuje a ze zálohy se vytvoří nový master node, který pokračuje v práci
- dvě strategie:
- funkce Combine
- funkce navíc, která se někdy hodí pro zmenšení objemu dat, který se musí posílat přes síť mezi Map a Reduce fází
- provedou ji rovnou Mappery lokálně po Map fázi
- je důležité, aby potom Reduce fáze byla:
- komutativní, asociativní a idempotentní (tj. když uděláme tu samou úpravu 2x, tak se výsledek nezmění)
- jedině tak se můžou data “předzpracovat” na Mapperech, jinak by toto přezpracování udělalo nepořádek v Reduce fázi
- Counters
- jednotlivé Joby mohou trvat opravdu dlouho, tak se hodí mít nějaké indikátory, že Job běží v pořádku a nic se např. nezacyklilo apod.
- mohou být předdefinované (od MapReduce frameworku, např. počet dokončených Map Tasků/Reduce Tasků, počet zparsovaných input key-value párů atd.) či user-defined
- Stragglers
- nodes, kterým výpočet trvá nějak dlouho (např. mají limitované zdroje, data jsou náročná apod.)
- master vytvoří “back-up” tasky a zkusí je poslat i na další (volné) nodes a vezme si pak nejrychlejší výsledek - nemá smysl, aby jeden pomalý node blokoval celý výpočet
Jednotlivé fáze v Apache MapReduce
- Vstupní data
- data jsou uložena na HDFS
- pomocí InputFormat je rozdělíme na jednotlivé InputSplits
- což je reference na určitou část dat
- počet splitů = počet tasků ve výpočtu = počet mapperů
- RecordReader připraví key-value páry pro Mapper
- Mapper
- zpracuje každý InputSplit a produkuje mezivýsledky (klíč-hodnota páry)
- Combiner
- předzpracování dat na Mapperech a snížení objemu dat, které se budou přesouvat po síti
- Partitioner
- vstupem je klíč-hodnota pár a rozhoduje o tom, do jakého Reduceru tyhle data půjdou
- Shuffle a Sort
- přeskupí mezivýsledky podle klíčů (tak, aby byla data se stejnými klíči na stejných nodech) podle komparátoru klíče
- tedy je kritická část pro síť, protože se data přemisťují přes síť (a může dojít k různým problémům)
- Reducer
- aplikuje agregační funkce na data se stejným klíčem (jsou přeskupená a seřazená z předchozí fáze)
- a výsledek je vlastně jedna jediná hodnota pro všechny páry se stejným klíčem (většinou nějaká agregace)
- Zápis dat (OutputFormat)
Příklady použití
- analýza logů, agregace dat, indexace obsahu, počítání výskytů slov, singulární rozklad velkých matic
- business využití:
- risk modelling, customer churn
- e.g. analysing the customer churn (= rate of losing customers) between phone operators (many of them change regularly due to different offers)
- recommendation engine, customer preferences
- risk modelling, customer churn
- počítání počtu přístupů na určitou URL (vstupem je log HTTP požadavků daného serveru)
- invertovaný index (vstupem jsou textové dokumenty a já pro každé slovo chci seznam dokumentů, ve kterých se nachází)
- distribuovaný sort
- shuffle fáze (pomocí Compare funkce) automaticky výsledky sesortí a Reduce fáze pak už nic nedělá
- reverse web-link graph
- pro každý outgoing link se vloží key-value hodnota (target link, link aktuální stránky) - je to vlastně prohozené a takhle jednoduše mohu zjistit, jaké stránky na mě odkazují (to se právě hodí Googlu na ten Page rank)
- tento přístup není moc vhodný pro úlohy, které vyžadují opakované iterace (např. strojové učení) - tam je lepší Apache Spark