bridge

bridge

MIT 6.5840(2) - Lab1 マップリデュース

理論#

MapReduce は主に大規模データ処理の問題を解決するために使用され、計算タスクを多くの小さなタスクに分解し、これらの小さなタスクを複数のマシンで並行して実行することで、処理速度と効率を大幅に向上させることができます。

その核心思想は Google の論文にある画像を参考にしています。

image

Master プログラムは主要なプログラムで、ファイル(N 個)を worker に割り当てます。すべての worker は対等です。worker はまず Map コマンドを実行し、いくつかのファイルを受け取り、キーと値のペアとして出力し、R 個(ユーザーが R の値をカスタマイズ可能)中間ファイル(図の Intermediate files)に書き込みます。すべての Map タスクが完了した後、Master は worker に Reduce タスクを割り当て、中間ファイルを処理し、最終的な結果ファイルを生成します。

ここで、Map 関数と Reduce 関数はプラグインの形式でシステムに提供できます。

例えば、Words Count のシナリオ(いくつかのファイル内の各同じ単語の数をカウントする)では:

map(String key, String value):
    // key: ドキュメント名
    // value: ドキュメント内容
    for each word w in value:
        EmitIntermediate(w, "1");
reduce(String key, Iterator values):
    // key: 単語
    // values: カウントのリスト
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

この例では、map 関数はキーが word、値が "1" の一連の中間ファイルを生成し、reduce は同じキーを集約して値の数をカウントします。

こうして私たちはデータセットの分散処理を実現しました。

コード#

システムの起動プロセスは以下の通りです:

  • mrcoordinator を実行
  • いくつかの mrworker を実行
  • mrcoordinator が coordinator を呼び出し、定期的に coordinator の完了状況をチェック
  • mrworker が map と reduce 関数をロードし、worker プロセスを立ち上げる

coordinator と worker の間は rpc 通信を使用し、私たちは coordinator と worker のロジックを実装するだけで済みます。

Coordinator#

いくつかの要点:

  • worker の状態を維持する必要はなく、task の状態を維持することを考慮します。なぜなら、worker は実際には対等だからです。
  • Coordinator はマルチスレッドであり、ロックが必要です。スレッド間の通信が不要なため、ここでは Mutex を選択します。
  • 2 つの rpc の主要な関数:FetchTask と FinishTask、一つは worker にタスクを探すため、もう一つはタスクの終了をマークするため

主要な構造体は以下の通りです:

type Coordinator struct {
	// ここに定義を追加します。
	lock               sync.Mutex
	files              []string
	mapTaskStatuses    []int
	reduceTaskStatuses []int
	nMap               int
	nReduce            int
	mapDoneNum         int // mapタスクの完了数
	reduceDoneNum      int
	mapFinished        bool
	reduceFinished     bool
}

2 つの rpc 関数の主要なロジック:


func (c *Coordinator) FetchTask(args *FetchTaskArgs, reply *FetchTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if !c.mapFinished {
		mapNum := xxx // mapTaskStatusesで未開始のタスクを探す
		if mapTaskStatusesがすべて割り当てられている場合 {
			reply.TaskType = NOTASK
		} else {
            // タスクを割り当てる
			// 10秒以内にタスクが完了しなければエラー復旧
		}
	} else if !c.reduceFinished {
            // mapと同様
		}
	} else {
		reply.TaskType = NOTASK
	}
	return nil
}
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	switch args.TaskType {
	case MAPTASK:
		{
			// エラー復旧メカニズムのため、ここでは同じタスクが提出される可能性があります。最初に提出されたタスクのみを受け入れます。
			if c.mapTaskStatuses[args.TaskNum] != DONE {
				c.mapDoneNum += 1
				c.mapTaskStatuses[args.TaskNum] = DONE
				if c.mapDoneNum == c.nMap {
					c.mapFinished = true
				}
			}
		}
	case REDUCETASK:
        // 上記と同様
	}
	return nil
}

Worker#

worker は主にビジネスロジックです。大体のフレームワークは以下の通りです:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		reply := &FetchTaskReply{}
		ok := call("Coordinator.FetchTask", &FetchTaskArgs{}, &reply)
		if ok {
			switch reply.TaskType {
			case MAPTASK:
				{
                    // replyで指定されたfilenameに基づいてmap処理を呼び出し、中間ファイルを出力
                    // rpc FinishTask
				}
			case REDUCETASK:
				{
                    // replyのReduceNumに基づいて中間ファイル(複数)を読み込み、1つの結果ファイルとして出力
                    mp := make(map[string][]string)
                    // 集約
                    // reduce
                    // rpc FinishTask
				}
			case NOTASK:
				{
					break
				}
			}
		} else {
			break
		}

	}
}

いくつかの詳細に注意が必要です:

  • 中間ファイル名はmr-X-Y、X は MapNum(つまり map タスク番号)、Y は map 関数が生成した key から [0, NReduce) の整数集合への単射です。具体的には、以下のハッシュ関数を使用して計算されます:
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
  • 各 reduce 関数は Y に対応するすべての X を処理する必要があります。
  • reduce の入力は 1 つの key とそれに対応するすべての values であるため、中間ファイルを集約する必要があります。ここでは HashMap を使用します。生産レベルの MapReduce 実装では、この部分はメモリ + 外部ストレージを使用して OOM を防ぐ必要があります。
  • ファイルの作成と書き込みの原子性を保証するために(完全に書き込むか、完全に書き込まないか)、まず一時ファイルを作成し、その後書き込み、最後に名前を変更することができます:
func writeStringToFileAtomic(filename string, content string) {
	f, _ := os.CreateTemp(".", filename+"*")
	f.WriteString(content)
	os.Rename(f.Name(), filename)
	f.Close()
}
  • reduce が最終的に生成するファイルの内容は結果の文字列を継続的に結合する必要があり、より良いパフォーマンスを得るために StringBuilder を使用すべきです。
var result strings.Builder
for key, values := range mp {
    reduceResult := reducef(key, values)
    result.WriteString(fmt.Sprintf("%v %v\n", key, reduceResult))
}
// 文字列を取得:result.String()

拡張#

Google の論文を再読すると、Google が生産環境で使用する MapReduce は、上記の実装に比べていくつかの改善点があります:

  • worker が複数のマシンで実行される場合、ファイルを管理するために分散ファイルシステムのソリューションを使用する必要があります。例えば GFS
  • reduce が生成するファイルは、次の MapReduce タスクで使用される可能性があり、ある種のチェーンを形成します。
  • 実際の実装では、Worker の状態を維持する必要があり、これにより Worker マシンの情報やエラー復旧などが容易になります。
  • 実際の使用において、Google は一部のマシンが Map または Reduce タスクをほぼ完了しようとすると、パフォーマンスが著しく低下することを発見しました。解決策は、Master がタスクがほぼ完了する際に残りのタスクを Backup Tasks として他の worker に割り当て、最も早く完了したタスクを最終結果とすることです。

Google はまたいくつかの改善のアイデアを提案しました。

  • Partitioning Function:上記の key をマッピングするハッシュ関数はカスタマイズ可能で、Partitioning Function と呼ばれます。例えば key が URL の場合、同じ Host を同じ Reduce タスクに割り当てることができます。
  • Ordering Guarantees:同じ partition 内で key のソート処理を先に行うことで、結果がより親しみやすくなります。
  • Combiner Function:上記の Words Count の例で、大量の類似した (word,"1") がネットワーク上で転送されることに注意してください。key を集約する操作は reduce 関数内で行われます。もし集約操作を map 関数内に書くと、これらの重複データ転送を回避できます。combiner function と reduce function の唯一の違いは、前者が中間ファイルを出力し、後者が最終ファイルを出力することです。
  • Input and Output Types:Google は reader インターフェースを提供し、ユーザーが Map 関数を実装する際に、データベース内のデータやメモリ内のデータなど、より多くの Input Types を読み取ることができるようにします。Output も同様です。
  • master は HTTP パネルを維持し、各 worker、task などの状態を表示できます。
  • Counter:map 関数内でカスタムカウンターを呼び出し、master に継続的に返すことができます。これにより、Words Count の例のように、いくつかのデータ情報を収集できます。
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");

最後に、2024 年の今日、MapReduce は Google にとってすでに歴史の一部となっています。しかし、それは Hadoop などのオープンソースの大規模データ処理フレームワークを生み出しました。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。