分布式计算第四次实验

分布式计算第四次实验

HoshiuZ
2025-06-12 / 0 评论 / 1 阅读 / 正在检测是否收录...

实验目的

  1. 学习基于MPI分布式并行计算程序设计方法。
  2. 学习基于MapReduce框架的分布式并行计算程序设计方法。
  3. 学习基于Spark框架的分布式并行计算程序设计方法。

实验题目

题目 1

编写基于 MPI 的并行计算程序,实现对连续函数 $f(x)=\sin(x^2+𝑥)(x\in R)$ 在区 间 $[a, b]$ 上的定积分求解。要求使用“梯形法(Trapezoidal Rule)”进行近似计算。

题目 2

输入文件为学生成绩信息,包含了必修课与选修课成绩,格式如下:
班级1, 姓名1, 科目1, 必修, 成绩1 <br>(注:<br> 为换行符)
班级2, 姓名2, 科目1, 必修, 成绩2 <br>
班级1, 姓名1, 科目2, 选修, 成绩3 <br>
………., ………, ………, ……… <br>
编写 Hadoop 平台上的 MapReduce 程序,分别实现如下功能:

  1. 计算每个学生必修课的平均成绩。
  2. 统计每个班级中所有课程(必修+选修)平均成绩排名前五的学生姓名和成绩

题目 3

输入文件的每一行为具有父子/父女/母子/母女/关系的一对人名,例如:
Tim, Andy <br>
Harry, Alice <br>
Mark, Louis <br>
Andy, Joseph <br>
……….., ………… <br>
假定不会出现重名现象。
编写 Hadoop 平台上的 MapReduce 程序,找出所有具有 grandchild-grandparent 关系的人名对。

题目 4

输入文件为学生成绩信息,包含了必修课与选修课成绩,格式如下:
班级1, 姓名1, 科目1, 必修, 成绩1 <br>(注: <br> 为换行符)
班级2, 姓名2, 科目1, 必修, 成绩2 <br>
班级1, 姓名1, 科目2, 选修, 成绩3 <br>
………., ………, ………, ……… <br>
编写Spark程序,实现如下功能: 按班级统计必修课平均成绩在:90~100、80~89、70~79、60~69 和 60 分以下这 5 个分数段的学生人数。

准备工作

安装 Microsoft MPI

首先需要安装 Microsoft MPI。

参考了 windows安装MPI及python安装mpi4py_下载mpi4py代码在vmware-CSDN博客

直接照着下载安装配置环境变量即可。

安装 Docker

安装 WSL 2

Docker Desktop for Windows 需要 WSL 2 支持,所以需要先安装 WSL 2。

参考了 # Windows10安装WSL的Ubuntu20.04系统及踩坑记录

安装 Docker

参考了 在国内 Windows 平台上安装 Docker 的详细教程_docker windows intel-CSDN博客

部署 Hadoop 和 Spark 实验环境

git clone 一下 hadoop-sandbox/hadoop-sandbox: A fully-functional Hadoop Yarn cluster as docker-compose deployment.

然后切换到解压后的目录下,使用 powershell 执行 docker-compose up -d,会根据该目录下 .yml 文件来下载镜像并启动容器。

接着执行 docker-compose down 来关闭启动的容器。

下载老师提供的 spark-install.zip 压缩包,并解压。然后用 powershell 切换到解压后的目录,并执行 docker build -t packet23/hadoop-client:latest . ,此命令将根据 Dockerfile 的指示在名为 hadoop-client 的 Docker 镜像中下载并安装 Python3.8,Spark-3.2.1 和 apache-maven-3.8.5。

上述操作结束后就成功生成了实验所需的全部 Docker 镜像。

为 clientnode 安装 mrjob 库

由于我使用的语言为 Python,所以需要为容器安装 mrjob 库。

由于容器内没有 pip,所以先要对其安装 pip

docker ps 查看 clientnode 容器 ID,然后 docker exec -it --user root clientnode bash (其中 clientnode 换成容器 ID),进入后再执行

apt update
apt install -y python3-pip
pip3 install mrjob

如此,便安装好了 mrjob 库。

为 nodemanager 安装 python3

再测试老师给的代码时,发现 nodemanager 没有安装 python3 ,无法运行。

所以需要对其安装 python3

docker ps 查看 nodemaanager 容器 ID,然后 docker exec -it --user root nodemanager bash (其中 nodemanager 换成容器 ID),进入后再执行

apt-get update
apt-get install -y python3 python3-pip

如此,便成功为 nodemanager 安装了 python3

实验思路

题目 1

梯形法计算定积分近似值。

考虑将定积分区间分成 n 个子区间,然后再均分给进程(开的进程数是 n 的因数,假设为 size ),每个进程计算 n // size 个 子区间用梯形法计算出的值的和,然后再用 Reduce 函数来把各个进程的结果用 MPI.SUM 累加起来即可。

题目 2

对于第一个要求,计算每个学生必修课的平均成绩,在 map 阶段只需将每一行信息转换成键值对 {(班级, 姓名), (必修课分数, 1)},键加入班级是为了避免重名问题。然后在 reduce 阶段对同一个人的成绩进行聚合,计算每个人的必修课平均成绩即可。

在做第一个要求的时候遇到了一个问题,花了很长时间才解决,那就是输出的文件的中文总是会变成 unicode 编码。问了很久大模型,提供的方法都无法解决。最后无果去 google 了一下,在 mrjobgithub 仓库找到了一个 issue,也是跟我一样的问题,参考了一下成功解决了。看来还是不能太依赖大模型(当然也有可能是我提问的姿势不太对)。

对于第二个要求,在 map 阶段把每一行信息转换为键值对 {班级, (姓名, 分数, 1)}。在 reduce 阶段对同一个班级的学生成绩,开一个字典统计其总分与科目数,用于计算平均成绩,接着根据平均成绩进行从高到低排序,取前五即可。

题目 3

根据给出的亲子关系可以画出一个图,要找 grandchild-grandparent 关系的话,只需找一个中介点,其所有子代与父代相互之间构成 grandchild-grandparent 关系。

所以,在 map 阶段,将每行的信息 A,B,输出两个键值对 {A, ("child", B)}, {B, ("parent", A)},分别代表 A 的孩子是 B,B 的父母是 A。

在 reduce 阶段,对同一个人的信息进行聚合,开两个列表分别存储这个人的孩子与父母,接着把这两个列表相互之间输出 grandchild-grandparent 关系即可。

题目 4

显然,这个题目需要用到题目 2 的要求 1 生成的结果。

这个结果的格式为 班级 姓名 必修课平均成绩

所以读入之后,先对其进行两次 map,第一次将信息按照空格分割成一个列表,第二次将列表的第一个字段和第三个字段(班级和分数,因为不需要关注姓名)形成键值对,形成了一个键值对 rdd。

得到这个键值对 rdd 后,可以写一个函数来把分数映射到对应的区间,然后生成形如 {班级, (分数区间, 1)} 的键值对,这个只需写好这个分数映射函数之后使用 mapValues 算子即可。

然后,是对同一个班级的同一个分数区间进行计数统计,所以可以把原键值对 {班级, (分数区间, 1)} 变成一个新的键值对 {(班级, 分数区间), 1)} ,这样方便统计结果。这个也是用 map 算子即可。

然后统计结果就很简单了,直接用 reduceByKey 即可。

为了使结果更清晰,我使用 sortByKey 函数把结果按照区间从小到大的顺序来输出了。

代码

题目 1

from mpi4py import MPI  
from math import sin  
  
def f(x):  
    return sin(x * x + x)  
  
comm = MPI.COMM_WORLD  
rank = comm.Get_rank()  
size = comm.Get_size()  
  
if rank == 0:  
    a = float(input("请输入定积分下限:"))  
    b = float(input("请输入定积分上限:"))  
else :  
    a = None  
    b = None  
  
a = comm.bcast(a, root=0)  
b = comm.bcast(b, root=0)  
  
n = 10000000  
chunk_size = n // size  
length = (b - a) / n  
  
partial_res = 0  
for i in range(0, chunk_size):  
    l = a + (rank * chunk_size + i) * length  
    partial_res += length * (f(l) + f(l + length)) * 0.5  
  
total_res = comm.reduce(partial_res, MPI.SUM, root=0)  
  
if rank == 0:  
    print(f"近似值为:{total_res}")

题目 2

1.

# aveScore.py
from mrjob.job import MRJob  
from mrjob.protocol import RawProtocol  
  
class MRAveScore(MRJob):  
    OUTPUT_PROTOCOL = RawProtocol  
    # Mapper函数:把每一行信息分解出来,然后为每一行信息生成一个键值对{(班级, 姓名), (必修课分数, 1)}  
    def mapper(self, _, line):  
        cls, name, subject, subject_type, score = line.strip().split(',')  
        score = float(score)  
        if subject_type == "必修":  
            yield (cls, name), (score, 1)  
  
    # Reducer函数:对同一个人的成绩进行聚合,计算每个人的必修课平均成绩。  
    def reducer(self, key, values):  
        total_score = 0  
        total_num = 0  
        for score, num in values:  
            total_score += score  
            total_num += num  
        average_score = total_score / total_num if total_num else 0  
        yield f"{key[0]} {key[1]}", f"{average_score:.2f}"  
  
if __name__ == '__main__':  
    MRAveScore().run()

2.

# top5.py
from mrjob.job import MRJob  
from mrjob.protocol import RawProtocol  
  
class MRTop5(MRJob):  
    OUTPUT_PROTOCOL = RawProtocol  
  
    # Mapper函数:把每一行的信息分解出来,生成键值对 {班级, (姓名, 分数, 1)}  
    def mapper(self, _, line):  
        cls, name, subject, subject_type, score = line.strip().split(',')  
        score = float(score)  
        yield cls, (name, score, 1)  
  
    # Reducer函数:对同一个班级的学生信息进行聚合,开一个 scores 字典用来统计该班级内的学生总分与科目数,用于计算平均成绩  
    def reducer(self, key, values):  
        scores = {}  
  
        for name, score, num in values:  
            if name not in scores:  
                scores[name] = [0.0, 0]  
            scores[name][0] += score  
            scores[name][1] += num  
  
        avg_scores = [(name, score / num) for name, (score, num) in scores.items()]  
  
        top5 = sorted(avg_scores, key=lambda x: x[1], reverse=True)[:5]  
  
        for name, score in top5:  
            yield key, f"{name}\t{score:.2f}"  
  
if __name__ == '__main__':  
    MRTop5.run()

题目 3

from mrjob.job import MRJob  
  
class MRFindGrandparents(MRJob):  
    # Mapper函数:输出两个键值对 `{A, ("child", B)}, {B, ("parent", A)}`,分别代表 A 的孩子是 B,B 的父母是 A。  
    def mapper(self, _, line):  
        parent, child = line.strip().split(',')  
        yield parent, ("child", child)  
        yield child, ("parent", parent)  
  
    # Reducer函数:对同一个人的信息进行聚合,生成 grandchild-grandparent 关系  
    def reducer(self, key, values):  
        parents = []  
        children = []  
        for relation, name in values:  
            if relation == "child":  
                children.append(name)  
            else:  
                parents.append(name)  
        for child in children:  
            for parent in parents:  
                yield child, parent  
  
if __name__ == '__main__':  
    MRFindGrandparents.run()

题目 4

from pyspark import SparkContext  
  
# 分数映射到区间函数  
def score_to_range(score):  
    if score < 60:  
        return "0~59"  
    elif score < 70:  
        return "60~69"  
    elif score < 80:  
        return "70~79"  
    elif score < 90:  
        return "80~89"  
    else:  
        return "90~100"  
  
def score_statistics(input_file):  
    sc = SparkContext("local", "Score Statistics")  
  
    data_rdd = sc.textFile(input_file)  
  
    class_score = data_rdd.map(lambda line: line.split()).map(lambda fields: (fields[0], float(fields[2])))  
  
    # 分数映射到区间  
    class_score_ranges = class_score.mapValues(lambda score: (score_to_range(score), 1))  
  
    # 将键变为 {班级, 分数区间},方便统计数量  
    class_score_counts = class_score_ranges.map(lambda x: ((x[0], x[1][0]), x[1][1]))  
  
    class_score_result = class_score_counts.reduceByKey(lambda x, y: x + y)  
  
    result = class_score_result.sortByKey(ascending=True, numPartitions=1).map(lambda x: (x[0][0], x[0][1], x[1]))  
  
    result.saveAsTextFile("result")  
  
    sc.stop()  
  
if __name__ == "__main__":  
    input_file = "input.txt"  
    score_statistics(input_file)

运行示例

题目 1

afb89c9935f7db81221306cd4650f6ea.png

题目 2

1.

9f156480163f19ce770b60fb91991281.png

2.

af7874a291f2ff8230ff6b26f89c2a9d.png

题目 3

96c890cc19399ebcbf6fc4525d9ad4d5.png

题目 4

7b460380eb279efe23598abc4b5d49be.png

0

评论 (0)

取消