求下图中顶点的最小值。
Pregel的计算过程如下:
Graphx提供的Pregel API如下:
class GraphOps[VD, ED](graph : org.apache.spark.graphx.Graph[VD, ED]){
def pregel[A](
initialMsg : A, // 在superstep 0之前发送至顶点的初始消息
maxIterations : Int, // 将要执行的最大迭代次数
activeDirection : EdgeDirection // 发送消息方向(默认是出边方向:EdgeDirection.Out)
)
(
vprog : Function3, // 用户定义函数,用于顶点接收消息
sendMsg : Function1, // 用户定义的函数,用于确定下一个迭代发送的消息及发往何处
mergeMsg : Function2[A, A, A] // 用户定义的函数,在vprog前,合并到达顶点的多个消息
){ }
}
Pregel求下图的最小值。
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Pregel {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getName)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
import spark.implicits._
// 1. 构建顶点的RDD
val verts: RDD[(Long, (Int, Int))] = sc.parallelize(Array(
(1L, (7, -1)),
(2L, (3, -1)),
(3L, (2, -1)),
(4L, (6, -1))
))
// 2. 构建边的RDD
val edges: RDD[Edge[Boolean]] = sc.parallelize(Array(
Edge(1L, 2L, true),
Edge(1L, 4L, true),
Edge(2L, 4L, true),
Edge(3L, 1L, true),
Edge(3L, 4L, true)
))
// 3. 构建图
val graph: Graph[(Int, Int), Boolean] = Graph(verts, edges)
graph.triplets.foreach(println)
// ((1,(7,-1)),(4,(6,-1)),true)
// ((3,(2,-1)),(1,(7,-1)),true)
// ((1,(7,-1)),(2,(3,-1)),true)
// ((2,(3,-1)),(4,(6,-1)),true)
// ((3,(2,-1)),(4,(6,-1)),true)
// initialMsg 在superstep 0之前发送至顶点的初始消息
// maxIterations 将要执行的最大迭代次数
// activeDirection 发送消息方向(默认是出边方向:EdgeDirection.Out)
// vprog 用户定义函数,用于顶点接收消息
// sendMsg 用户定义的函数,用于确定下一个迭代发送的消息及发往何处
// mergeMsg 用户定义的函数,在vprog前,合并到达顶点的多个消息
val initialMsg: Int = 9999
val maxIterations: Int = Int.MaxValue
// vertexId: 当前顶点的Id
// value: 当前顶点的attr
// message: 当前顶点要接收的消息
def vprog(vertexId: VertexId, value: (Int, Int), message: Int) = {
if (message == initialMsg) value else (message min value._1, value._1)
}
def sendMsg(triple: EdgeTriplet[(Int, Int), Boolean]) = {
val sourceVertex: (Int, Int) = triple.srcAttr
if (sourceVertex._1 == sourceVertex._2) Iterator.empty
else Iterator((triple.dstId, sourceVertex._1))
}
def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
graph.pregel(initialMsg, maxIterations, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
.vertices.foreach(println)
// (1,(2,7))
// (2,(2,3))
// (3,(2,-1))
// (4,(2,2))
}
}
内容来源于网络,如有侵权,请联系作者删除!