理论
MapReduce主要用来解决大规模数据处理的问题,它能够将一个计算任务分解为许多小任务,这些小任务可以在多台机器上并行执行,从而大大提高处理速度和效率。
它的核心思想参考Google论文中的图片
Master程序是主要的程序,它会将文件(N个)分配个worker,所有worker是对等的。worker将会首先执行Map命令,接受一些文件,然后输出为键值对并写入R个(用户可以自定义R的值)中间文件(图中的Intermediate files)。当所有Map任务完成后,Master会给worker分配Reduce任务,对中间文件进行处理,然后生成最终的结果文件。
其中,Map函数和Reduce函数可以以插件的形式提供给系统。
例如,在Words Count场景下(统计若干文件里每个相同单词的数量):
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
这个例子中,map函数将生成一系列key为word,value为"1"的中间文件,然后reduce将同一个key聚合,统计value的数量。
这样我们就实现了数据集的分布式处理。
代码
系统的启动流程如下:
- 运行mrcoordinator
- 运行若干个mrworker
- mrcoordinator调用coordinator,并定期检查coordinator完成情况
- mrworker加载map和reduce函数,并拉起worker进程
其中coordinator和worker之间使用rpc通信,我们只需要实现coordinator和worker的逻辑。
Coordinator
几个要点:
- 考虑维护task的状态,而不需要维护worker的状态,因为worker实际上是对等的。
- Coordinator是多线程的,需要加锁。由于不需要各线程通信,这里选用Mutex
- 两个rpc关键函数:FetchTask和FinishTask,一个用来给worker查找任务,一个用来标记任务结束
关键结构体如下:
type Coordinator struct {
// Your definitions here.
lock sync.Mutex
files []string
mapTaskStatuses []int
reduceTaskStatuses []int
nMap int
nReduce int
mapDoneNum int // map任务完成数量
reduceDoneNum int
mapFinished bool
reduceFinished bool
}
两个rpc函数主要逻辑:
func (c *Coordinator) FetchTask(args *FetchTaskArgs, reply *FetchTaskReply) error {
c.lock.Lock()
defer c.lock.Unlock()
if !c.mapFinished {
mapNum := xxx // 循环mapTaskStatuses找到未开始的任务
if mapTaskStatuses全都分配了 {
reply.TaskType = NOTASK
} else {
// 分配任务
// 10s不完成任务就错误恢复
}
} else if !c.reduceFinished {
// 和map类似
}
} else {
reply.TaskType = NOTASK
}
return nil
}
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
c.lock.Lock()
defer c.lock.Unlock()
switch args.TaskType {
case MAPTASK:
{
// 由于错误恢复机制,这里可能提交同样的任务。仅接受第一个提交的任务即可
if c.mapTaskStatuses[args.TaskNum] != DONE {
c.mapDoneNum += 1
c.mapTaskStatuses[args.TaskNum] = DONE
if c.mapDoneNum == c.nMap {
c.mapFinished = true
}
}
}
case REDUCETASK:
// 和上面类似
}
return nil
}
Worker
worker主要是业务逻辑。大体框架如下:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply := &FetchTaskReply{}
ok := call("Coordinator.FetchTask", &FetchTaskArgs{}, &reply)
if ok {
switch reply.TaskType {
case MAPTASK:
{
// 根据reply指定的filename调用map处理,并输出中间文件
// rpc FinishTask
}
case REDUCETASK:
{
// 根据reply的ReduceNum读取中间文件(多个),输出为一个结果文件
mp := make(map[string][]string)
// 聚合
// reduce
// rpc FinishTask
}
case NOTASK:
{
break
}
}
} else {
break
}
}
}
一些细节需要注意:
- 中间文件名为mr-X-Y,X是MapNum(即map任务编号),Y是map函数生成的key到[0, NReduce)整数集的单射,具体而言是通过如下哈希函数计算的:
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
- 每一个reduce函数需要处理一个Y对应的所有X
- 由于reduce的输入是一个key和它对应的所有values,所以需要对中间文件进行聚合。这里使用的是HashMap。在生产级别的MapReduce实现中,该部分需要内存+外存来保证不会OOM。
- 为了保证创建并写入文件的原子性(要么完全写入,要么完全不写入),可以先创建临时文件,再写入,再重命名:
func writeStringToFileAtomic(filename string, content string) {
f, _ := os.CreateTemp(".", filename+"*")
f.WriteString(content)
os.Rename(f.Name(), filename)
f.Close()
}
- reduce最后生成的文件内容需要不断拼接结果字符串,应该使用StringBuilder来获得更好的性能
var result strings.Builder
for key, values := range mp {
reduceResult := reducef(key, values)
result.WriteString(fmt.Sprintf("%v %v\n", key, reduceResult))
}
// 获得字符串:result.String()
拓展
重读Google的论文可以发现Google用于生产环境的MapReduce相比于我们上面实现的lab还有几个改进:
- 当worker运行在多个机器上时,需要使用一些分布式文件系统方案来管理文件,例如GFS
- reduce生成的文件可能会被用于下一个MapReduce任务,从而形成某种链子
- 在实际实现中,还是需要维护Worker的状态,这样可以便于获取Worker机器的信息、错误恢复等
- 实际使用中,Google发现当一些机器在即将处理完Map或Reduce任务时,性能会出现明显的下降。解决方法是Master在组任务即将完成时会分配剩下的任务作为Backup Tasks,分配到其他worker上,以最快完成的任务为最终结果。
Google也提出了一些改进思路
- Partitioning Function:上面把key映射的hash函数是可以自定义的,叫做Partitioning Function。例如key是URL的时候可以把同一个Host分到同一个Reduce任务里
- Ordering Guarantees:在同一个partition里可以先做key的排序处理,这样可以使得结果更为友好
- Combiner Function:注意到上面Words Count例子中,大量的类似(word,“1”)会在网络上传输,而根据key聚合values发生在reduce函数中。如果把聚合操作写在map函数里则可以避免这些重复的数据传输。combiner function和reduce function的唯一区别是前者输出中间文件,后者输出最终文件。
- Input and Output Types:Google提供了reader接口,使得用户在实现Map函数的时候可以读取更多的Input Types,例如数据库里的数据或内存里的数据。Output同理。
- master可以维护一个HTTP panel显示各个worker、task等的状态
- Counter:可以在map函数调用自定义的counter,持续返回给master。者可以用于收集一些数据的信息,例如Words Count例子中
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
最后,2024年的今天MapReduce对于Google来说早已经成为了历史。但是它催生出了Hadoop等开源大数据处理框架。