Exploring Social data with Spark-GraphX
If your data is highly connected and you want to explore it, consider graph technology. In this post we exercise Spark-GraphX and Cassandra to build a very simple People-to-People connectivity graph from an events log. The solution is not optimized; it is exploratory, meant for playing around with the technology and getting a feeling for how it works.
High-level steps:
- Represent the events log in graph (DOT) format as input
- Parse the input file, which is in DOT format, and upload the events to a Cassandra table
- Map the events from the table to a Spark-GraphX EdgeRDD and create a graph from it
- Run Pregel and collect all edges from People to Content
- Filter to Content vertices and create a People-to-People directed graph, grouping the edges
- Stream the People-to-People graph out to a file in DOT format for visualization
Events log in graph format
To be able to play around with different inputs and gain a better understanding of GraphX and its capabilities, I decided to use the DOT language, which can be visualized with tools such as Graphviz. The input is a list of People and Content nodes and the actions people performed on the content, such as "read" and "own". Here is a sample of such input; see the section "Full Data Set" for the complete list.
- P - stands for Person/Profile
- C - stands for Community
- F - stands for File
digraph {
node [ fontname="Arial"] ;
subgraph Attributes {
node [ shape="circle" color="#0B0B61"] ;
C101 [ label="Community_101" ] ;
C102 [ label="Community_102" ] ;
............
F201 [ label="File_201" ] ;
F202 [ label="File_202" ] ;
............
}
subgraph Attributes {
node [ shape="doublecircle" color="#DF0101"] ;
P1 [ label="Person_1" ] ;
P2 [ label="Person_2" ] ;
............
}
subgraph Attributes {
edge [ style="dotted" ] ;
P1 -> C101 [ label="(owns,2015)"] ;
P1 -> C102 [ label="(owns,2015)"] ;
P2 -> C101 [ label="(owns,2015)"] ;
}
}
To get a visual graph of your input, download and install Graphviz.
Then run the following commands: circo computes the layout and neato renders it.
Example:
- <graphviz dir>\Graphviz2.38\bin\circo.exe -Tdot <output dir>\events.txt -o <output dir>\events.gv
- <graphviz dir>\Graphviz2.38\bin\neato.exe -n <output dir>\events.gv -Tpng -O
- Look for the image at <output dir>\events.gv.png
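If you run these steps often, they can be scripted from the same Java program. The sketch below is a hypothetical helper (`GraphvizRender` is not part of Graphviz or of the original project); it assumes the circo and neato executables are passed in as full paths and relies only on the standard Graphviz options `-T`, `-o`, `-n` and `-O`. Note that circo needs `-Tdot` and `-o` to write its layout to a file, because any additional bare file name is read as another input.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that scripts the two Graphviz steps from Java.
// circo computes the layout; neato -n then renders it using those positions.
final class GraphvizRender {

    // circo -Tdot <input> -o <laidOut>: write the laid-out graph (with pos attributes) as DOT
    static List<String> layoutCommand(String circoExe, String input, String laidOut) {
        return Arrays.asList(circoExe, "-Tdot", input, "-o", laidOut);
    }

    // neato -n <laidOut> -Tpng -O: keep existing node positions, write <laidOut>.png
    static List<String> renderCommand(String neatoExe, String laidOut) {
        return Arrays.asList(neatoExe, "-n", laidOut, "-Tpng", "-O");
    }

    // Runs one command, forwarding Graphviz output and errors to this process's console.
    static void run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("Graphviz exited with " + exitCode + ": " + command);
        }
    }
}
```

Calling `run(layoutCommand(circo, "events.txt", "events.gv"))` followed by `run(renderCommand(neato, "events.gv"))` should leave `events.gv.png` next to the laid-out file.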
Cassandra as distributed storage
Besides Graphviz, we need software packages to store the data and do the processing. Their versions have to be compatible with each other:
Connector | Spark | Cassandra | Cassandra Java Driver |
---|---|---|---|
1.3 | 1.3 | 2.1, 2.0 | 2.1 |
I downloaded apache-cassandra-2.1.5, installed it, ran it locally from the command line, and created the keyspace demo with the cqlsh tool.
To connect from the application I used:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
Session session = cluster.connect("demo");
I downloaded JPGD (Java-based Parser for Graphviz Documents), used it to parse the input file in DOT format, and translated each edge into Cassandra statements.
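JPGD did the parsing in the original project. To show what has to be extracted from each edge statement, here is a minimal sketch that swaps JPGD for a plain regular expression. It assumes every edge sits on a single line in the form `P1 -> C101 [ label="(owns,2015)"] ;`, as in the data set at the end of this post; `DotEventParser` and `Event` are hypothetical names, not part of JPGD. Each resulting `Event` maps onto one row of the `events` table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical regex-based stand-in for the JPGD step; it only understands
// single-line edge statements such as: P1 -> C101 [ label="(owns,2015)"] ;
final class DotEventParser {

    // One parsed event: who did what to which content, and when.
    static final class Event {
        final String profile;
        final String content;
        final String action;
        final String time;

        Event(String profile, String content, String action, String time) {
            this.profile = profile;
            this.content = content;
            this.action = action;
            this.time = time;
        }
    }

    // Groups: 1 = source node, 2 = target node, 3 = action, 4 = time.
    private static final Pattern EDGE = Pattern.compile(
            "^\\s*(\\w+)\\s*->\\s*(\\w+)\\s*\\[\\s*label=\"\\((\\w+),(\\w+)\\)\"\\s*\\]\\s*;?\\s*$");

    static List<Event> parse(String dot) {
        List<Event> events = new ArrayList<>();
        for (String line : dot.split("\\R")) {
            Matcher m = EDGE.matcher(line);
            if (m.matches()) {  // node, attribute and brace lines simply do not match
                events.add(new Event(m.group(1), m.group(2), m.group(3), m.group(4)));
            }
        }
        return events;
    }
}
```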
First I created the table:
session.execute(
"CREATE TABLE events (id int PRIMARY KEY, profile varchar, content varchar, action varchar, time varchar);");
Then I looped over the parsed events and inserted each one into the table:
session.execute(
"INSERT INTO events (id, profile, content, action, time) VALUES ( ......);");
Finally, I printed all rows in the Cassandra table (not sorted by id):
[id: 23] [profile: P4] [content: C105] [action: owns] [time: 2015]
[id: 53] [profile: P2] [content: F201] [action: read] [time: 2015]
[id: 91] [profile: P10] [content: W303] [action: read] [time: 2015]
[id: 117] [profile: P2] [content: F205] [action: own] [time: 2015]
......................
......................
......................
[id: 87] [profile: P10] [content: W302] [action: read] [time: 2015]
[id: 77] [profile: P10] [content: F203] [action: read] [time: 2015]
[id: 3] [profile: P1] [content: C102] [action: owns] [time: 2015]
[id: 103] [profile: P10] [content: W304] [action: own] [time: 2015]
Map Events from table to Spark-GraphX-EdgeRDD and create graph
To connect Cassandra and Spark I used the spark-cassandra-connector. This simple method returns a JavaRDD holding edges of type org.apache.spark.graphx.Edge: it maps each event's source person and target content to a single edge and stores the action on the edge itself.
private static JavaRDD<Edge<String>> createEdgeRddFromCassandaraTable(SparkContext sc) {
// Vertex ids are taken from the numeric suffix of each name: "P4" -> 4, "C105" -> 105
JavaRDD<Edge<String>> edgeRdd = CassandraJavaUtil.javaFunctions(sc).cassandraTable("demo", "events")
.map(new Function<CassandraRow, Edge<String>>() {
@Override
public Edge<String> call(CassandraRow cassandraRow) throws Exception {
String profileName = cassandraRow.getString("profile");
Long profileId = Long.parseLong(profileName.trim().substring(1), 10);
String contentName = cassandraRow.getString("content");
Long contentId = Long.parseLong(contentName.trim().substring(1), 10);
return new Edge<String>(profileId, contentId, cassandraRow.getString("action"));
}
});
return edgeRdd;
}
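The mapping relies on a naming convention rather than a lookup table: a vertex id is the numeric suffix of the node name, and later `IsPersonVertex` treats any id below 100 as a person. A small stand-alone sketch of that convention (the class name `VertexIds` is hypothetical) makes the assumption explicit: it only holds while person numbers stay below 100 and the C/F/W content numbers never collide with each other.

```java
// Hypothetical helper mirroring the id convention used in
// createEdgeRddFromCassandaraTable and IsPersonVertex.
final class VertexIds {

    // "P10" -> 10, "C105" -> 105: drop the one-letter type prefix, keep the number.
    static long toVertexId(String nodeName) {
        return Long.parseLong(nodeName.trim().substring(1), 10);
    }

    // Same rule as IsPersonVertex: people are numbered 1..99, content starts at 101.
    static boolean isPerson(long vertexId) {
        return vertexId < 100;
    }
}
```

A lookup table from names to running long ids, built while loading Cassandra, would remove the below-100 restriction at the cost of an extra join.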
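Now we can create the graph from the EdgeRDD that holds the events (the ClassTag arguments vertexDataTag and stringTag are defined in the Pregel snippet below):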
Graph<VertexData, String> graph = Graph.fromEdges(JavaRDD.toRDD(edgeRdd), new VertexData(), StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), vertexDataTag, stringTag);
Run Pregel and collect all edges from People to Content
Now we collect, on each Content vertex, all edges (paths) coming from People. This could be achieved much more simply, but we want to exercise the Pregel framework, which will allow us to do more complex things later on.
ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
ClassTag<VertexData> vertexDataTag = scala.reflect.ClassTag$.MODULE$.apply(VertexData.class);
ClassTag<ListSet<PathOneHop>> p2ppathListTag = scala.reflect.ClassTag$.MODULE$.apply(ListSet.class);
// Run Pregel and pass it three main functions
Graph<VertexData, String> graph2 = Pregel.apply(graph, new ListSet<PathOneHop>(), 1, EdgeDirection.Out(), new VprogLogicFunction(), new SendMsgLogicFunction(), new MergeLogicFunction(), vertexDataTag, stringTag, p2ppathListTag);
static class VprogLogicFunction extends AbstractFunction3<Object, VertexData, ListSet<PathOneHop>, VertexData> implements Function3<Object, VertexData, ListSet<PathOneHop>, VertexData>, Serializable {
public VertexData apply(Object vID, VertexData vData, ListSet<PathOneHop> msg) {
VertexData newVertexData = new VertexData();
// Initialize vertex data
if(IsPersonVertex((Long) vID)) {
newVertexData.setType(VertexData.vTypes.PERSON_VERTEX_TYPE);
} else {
newVertexData.setType(VertexData.vTypes.CONTENT_VERTEX_TYPE);
}
newVertexData.setPaths(msg);
return newVertexData;
}
};
static class SendMsgLogicFunction extends AbstractFunction1<EdgeTriplet<VertexData, String>, Iterator<Tuple2<Object, ListSet<PathOneHop>>>> implements Function1<EdgeTriplet<VertexData, String>, Iterator<Tuple2<Object, ListSet<PathOneHop>>>>, Serializable {
public Iterator<Tuple2<Object, ListSet<PathOneHop>>> apply(EdgeTriplet<VertexData, String> edge) {
ListSet<PathOneHop> msgList = new ListSet<PathOneHop>();
PathOneHop path = new PathOneHop();
if(edge.srcAttr().getType() == VertexData.vTypes.PERSON_VERTEX_TYPE) {
path.setSourcePerson(edge.srcId());
path.setTargetContent(edge.dstId());
}
msgList = msgList.$plus(path);
ListSet<Tuple2<Object, ListSet<PathOneHop>>> edgesList = new ListSet<Tuple2<Object, ListSet<PathOneHop>>>();
edgesList = edgesList.$plus(new Tuple2<Object, ListSet<PathOneHop>>(edge.dstId(), msgList));
return edgesList.iterator();
}
};
static class MergeLogicFunction extends AbstractFunction2<ListSet<PathOneHop>, ListSet<PathOneHop>, ListSet<PathOneHop>> implements Function2<ListSet<PathOneHop>, ListSet<PathOneHop>, ListSet<PathOneHop>>, Serializable {
public ListSet<PathOneHop> apply(ListSet<PathOneHop> msg1, ListSet<PathOneHop> msg2) {
ListSet<PathOneHop> mergedList = (ListSet<PathOneHop>) msg1.union(msg2);
return mergedList;
}
};
private static boolean IsPersonVertex(Long vID) {
return vID < 100;
}
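Since the graph only contains Person -> Content edges and Pregel runs a single iteration here, the three functions reduce to: each edge sends its source person to the content vertex, messages arriving at the same vertex are merged by union, and the vertex program stores the merged result. The plain-Java sketch below reproduces that outcome on an in-memory edge list, without Spark, so the expected output can be checked by hand. `OneHopCollector` is a hypothetical name, and it keeps bare person ids instead of `PathOneHop` objects.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical single-machine imitation of the one-iteration Pregel run above.
// Each edge is a {sourceId, targetId} pair; people have ids below 100.
final class OneHopCollector {

    static Map<Long, Set<Long>> collect(List<long[]> edges) {
        Map<Long, Set<Long>> pathsByContent = new TreeMap<>();
        for (long[] edge : edges) {
            long source = edge[0];
            long target = edge[1];
            // sendMsg: a person -> content edge sends the source person to the target vertex
            if (source < 100) {
                // mergeMsg: messages for the same vertex are combined by set union
                pathsByContent.computeIfAbsent(target, k -> new LinkedHashSet<>()).add(source);
            }
        }
        // vprog: each content vertex keeps the merged message; person vertices receive nothing
        return pathsByContent;
    }
}
```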
The outcome of the Pregel run is a set of content vertices, each holding the list of People that point to it. We could simply hold a list of People per vertex, but I wanted to practice passing around a complex object. That will be useful in more complex algorithms, for example when content can point to other content and we also need to track paths between people that go over several content items with different relationships.
Paths on Vertex: 101 2 ==> 101 1 ==> 101
Paths on Vertex: 301 3 ==> 301 1 ==> 301
Paths on Vertex: 105 6 ==> 105 5 ==> 105 4 ==> 105 3 ==> 105 2 ==> 105 1 ==> 105
Paths on Vertex: 107 8 ==> 107 7 ==> 107
Paths on Vertex: 104 4 ==> 104 3 ==> 104
Paths on Vertex: 110 10 ==> 110 9 ==> 110
Paths on Vertex: 303 10 ==> 303 10 ==> 303 9 ==> 303 9 ==> 303
Paths on Vertex: 302 10 ==> 302 9 ==> 302 8 ==> 302
Paths on Vertex: 102 2 ==> 102 1 ==> 102
Paths on Vertex: 202 6 ==> 202 5 ==> 202 2 ==> 202
Paths on Vertex: 205 5 ==> 205 2 ==> 205
Paths on Vertex: 204 5 ==> 204 2 ==> 204
Paths on Vertex: 201 8 ==> 201 7 ==> 201 6 ==> 201 5 ==> 201 4 ==> 201 3 ==> 201 2 ==> 201 1 ==> 201
Paths on Vertex: 106 6 ==> 106 5 ==> 106
Paths on Vertex: 206 5 ==> 206 2 ==> 206
Paths on Vertex: 305 8 ==> 305 7 ==> 305
Paths on Vertex: 304 10 ==> 304 10 ==> 304 9 ==> 304 9 ==> 304
Paths on Vertex: 203 10 ==> 203 8 ==> 203 7 ==> 203
Paths on Vertex: 103 4 ==> 103 3 ==> 103
Paths on Vertex: 306 8 ==> 306 7 ==> 306
Paths on Vertex: 108 10 ==> 108 8 ==> 108 7 ==> 108
Paths on Vertex: 109 10 ==> 109 9 ==> 109
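In this listing, vertices 303 and 304 show people 9 and 10 twice, because both of them read and own those wiki pages. `ListSet` removes duplicates using `equals`/`hashCode`, so the repeated entries suggest that `PathOneHop`, whose source is not included in the post, relies on the default identity equality. If one path per person and content is wanted, a hypothetical value-based `PathOneHop` could look like the sketch below. The `sourcePerson` field and the two setters follow the calls in the code above; the `long` field types, the `targetContent` field and the equality methods are assumptions. With primitive `long` fields, the `path1.sourcePerson != path2.sourcePerson` check in the next step also compares values.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical PathOneHop with value equality, so that set-based merges
// (ListSet.union in MergeLogicFunction) drop repeated person -> content paths.
class PathOneHop implements Serializable {
    private static final long serialVersionUID = 1L;

    long sourcePerson;
    long targetContent;

    void setSourcePerson(long sourcePerson) { this.sourcePerson = sourcePerson; }

    void setTargetContent(long targetContent) { this.targetContent = targetContent; }

    // Two paths are equal when they connect the same person to the same content.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PathOneHop)) return false;
        PathOneHop other = (PathOneHop) o;
        return sourcePerson == other.sourcePerson && targetContent == other.targetContent;
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourcePerson, targetContent);
    }
}
```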
Filter to Content vertices and create a People-to-People directed graph while grouping edges
The next step is simple. We drop the People vertices and keep the Content ones, together with the information about the People pointing to each content item. Then we transform those vertices into a new edge RDD, which lets us build a People-to-People directed graph with all possible paths. The last step is to group the edges between each pair of people, store the count as an attribute, and filter out connections with fewer than 3 paths to see only the more substantial ones, assuming that 3 or more paths indicate such a relationship.
VertexRDD<VertexData> contentVertexRdd = graph2.vertices().filter(new filterToContentVertexOnly());
static class filterToContentVertexOnly extends AbstractFunction1<Tuple2<Object,VertexData>, Object> implements Function1<Tuple2<Object,VertexData>, Object>, Serializable {
@Override
public Object apply(Tuple2<Object, VertexData> vertex) {
return vertex._2.getType() == VertexData.vTypes.CONTENT_VERTEX_TYPE;
}
};
ClassTag<Edge<String>> edgeTag = scala.reflect.ClassTag$.MODULE$.apply(Edge.class);
JavaRDD<Edge<String>> edgeRdd2 = contentVertexRdd.flatMap(new mapToP2PRdd(), edgeTag).toJavaRDD();
static class mapToP2PRdd extends AbstractFunction1<Tuple2<Object,VertexData>, TraversableOnce<Edge<String>>> implements Function1<Tuple2<Object,VertexData>, TraversableOnce<Edge<String>>>, Serializable {
@Override
public TraversableOnce<Edge<String>> apply(Tuple2<Object, VertexData> vertex) {
String contentVertex = "path:" + ((Long)vertex._1).toString();
ListSet<Edge<String>> edges = new ListSet<Edge<String>>();
Iterator<PathOneHop> it = vertex._2.getPaths().iterator();
while(it.hasNext()) {
Iterator<PathOneHop> it2 = vertex._2.getPaths().iterator();
PathOneHop path1 = it.next();
while (it2.hasNext()) {
PathOneHop path2 = it2.next();
if(path1.sourcePerson != path2.sourcePerson) {
Edge<String> edge = new Edge<String>(path1.sourcePerson, path2.sourcePerson, contentVertex);
edges = edges.$plus(edge);
}
}
}
return edges;
}
};
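The step that builds `p2pGraph` from `edgeRdd2` and merges its parallel edges is only described in prose above. In GraphX that merging is done with `Graph.groupEdges`, which only combines edges located in the same partition, so the graph is normally repartitioned with `partitionBy` first. Independently of Spark, the counting it is meant to achieve can be sketched in plain Java: for every content vertex, emit one directed edge per ordered pair of distinct people, then count how many content items each pair shares. `PeopleToPeople` and `countSharedContent` are hypothetical names.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical plain-Java equivalent of mapToP2PRdd followed by edge grouping.
// Input: content vertex id -> people who point to it (the Pregel output).
final class PeopleToPeople {

    // Returns "src->dst" -> number of content items that both people touched.
    static Map<String, Integer> countSharedContent(Map<Long, Set<Long>> peopleByContent) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Set<Long> people : peopleByContent.values()) {
            for (Long a : people) {
                for (Long b : people) {
                    if (!a.equals(b)) {  // compare values, not Long references
                        // one directed edge per ordered pair and content item, then group by pair
                        counts.merge(a + "->" + b, 1, Integer::sum);
                    }
                }
            }
        }
        return counts;
    }
}
```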
Stream out the People to People graph to file in dot format for visualization
JavaRDD<String> p2pDotFormatRdd = p2pGraph.edges().map(new p2p2DotFormat(), stringTag).toJavaRDD();
static class p2p2DotFormat extends AbstractFunction1<Edge<String>, String> implements Function1<Edge<String>, String>, Serializable {
public String apply(Edge<String> edge) {
String sEdge = "";
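// The grouped attribute holds one "path:<content id>" marker per shared content item;
// split() also returns an empty first piece before the leading "path", so subtract 1
int numPath = edge.attr.split("path").length - 1;
if(numPath >= 3) {
sEdge = edge.srcId() + "->" + edge.dstId() + "[ label=" + numPath + "]" + "\n";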
}
return sEdge;
}
};
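The lines produced by `p2p2DotFormat` are bare edge statements; a file that Graphviz can open also needs a `digraph { ... }` wrapper. A minimal sketch of the whole output step, assuming the pair counts are already in a map keyed as `"src->dst"` (`P2PDotWriter` and its parameters are hypothetical), keeps only pairs at or above the threshold of 3 used in this post:

```java
import java.util.Map;

// Hypothetical writer: counts keyed as "src->dst" become DOT edges labeled with the count.
final class P2PDotWriter {

    static String toDot(Map<String, Integer> counts, int minPaths) {
        StringBuilder dot = new StringBuilder("digraph {\n");
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() >= minPaths) {  // keep only substantial relationships
                dot.append("  ").append(e.getKey())
                   .append(" [ label=\"").append(e.getValue()).append("\" ] ;\n");
            }
        }
        return dot.append("}\n").toString();
    }
}
```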
Visualize the result of the filtered People to People graph
We can see that the resulting graph divides the People into two groups of familiar people. This result relies on the assumption made for this exercise that only people with 3 or more connections (paths through content) are actually familiar.
Conclusion
Spark-GraphX is a simple yet powerful tool once you figure out the flow and how to translate your problem into a graph OLAP problem. The Java version, however, results in very complex and hard-to-read code, which raises the question of why not write it in Scala. Although the steps may look long and complex, all this logic runs in memory and across nodes, so I would expect it to run efficiently. I used a small data set; it would be very interesting to try larger data sets and run it on a real cluster of Spark machines.
Full Data Set
- P - stands for Person/Profile
- C - stands for Community
- W - stands for Wiki
- F - stands for File
digraph {
node [ fontname="Arial"] ;
subgraph Attributes {
node [ shape="circle" color="#0B0B61"] ;
C101 [ label="Community_101" ] ;
C102 [ label="Community_102" ] ;
C103 [ label="Community_103" ] ;
C104 [ label="Community_104" ] ;
C105 [ label="Community_105" ] ;
C106 [ label="Community_106" ] ;
C107 [ label="Community_107" ] ;
C108 [ label="Community_108" ] ;
C109 [ label="Community_109" ] ;
C110 [ label="Community_110" ] ;
F201 [ label="File_201" ] ;
F202 [ label="File_202" ] ;
F203 [ label="File_203" ] ;
F204 [ label="File_204" ] ;
F205 [ label="File_205" ] ;
F206 [ label="File_206" ] ;
W301 [ label="Wiki_301" ] ;
W302 [ label="Wiki_302" ] ;
W303 [ label="Wiki_303" ] ;
W304 [ label="Wiki_304" ] ;
W305 [ label="Wiki_305" ] ;
W306 [ label="Wiki_306" ] ;
}
subgraph Attributes {
node [ shape="doublecircle" color="#DF0101"] ;
P1 [ label="Person_1" ] ;
P2 [ label="Person_2" ] ;
P3 [ label="Person_3" ] ;
P4 [ label="Person_4" ] ;
P5 [ label="Person_5" ] ;
P6 [ label="Person_6" ] ;
P7 [ label="Person_7" ] ;
P8 [ label="Person_8" ] ;
P9 [ label="Person_9" ] ;
P10 [ label="Person_10" ] ;
}
subgraph Attributes {
edge [ style="dotted" ] ;
P1 -> C101 [ label="(owns,2015)"] ;
P1 -> C102 [ label="(owns,2015)"] ;
P2 -> C101 [ label="(owns,2015)"] ;
P2 -> C102 [ label="(owns,2015)"] ;
P3 -> C103 [ label="(owns,2015)"] ;
P3 -> C104 [ label="(owns,2015)"] ;
P4 -> C104 [ label="(owns,2015)"] ;
P4 -> C103 [ label="(owns,2015)"] ;
P1 -> C105 [ label="(owns,2015)"] ;
P2 -> C105 [ label="(owns,2015)"] ;
P3 -> C105 [ label="(owns,2015)"] ;
P4 -> C105 [ label="(owns,2015)"] ;
P5 -> C105 [ label="(owns,2015)"] ;
P5 -> C106 [ label="(owns,2015)"] ;
P6 -> C106 [ label="(owns,2015)"] ;
P6 -> C105 [ label="(owns,2015)"] ;
P7 -> C107 [ label="(owns,2015)"] ;
P7 -> C108 [ label="(owns,2015)"] ;
P8 -> C108 [ label="(owns,2015)"] ;
P10 -> C108 [ label="(owns,2015)"] ;
P8 -> C107 [ label="(owns,2015)"] ;
P9 -> C109 [ label="(owns,2015)"] ;
P9 -> C110 [ label="(owns,2015)"] ;
P10 -> C109 [ label="(owns,2015)"] ;
P10 -> C110 [ label="(owns,2015)"] ;
P1 -> F201 [ label="(read,2015)"];
P2 -> F201 [ label="(read,2015)"];
P3 -> F201 [ label="(read,2015)"];
P4 -> F201 [ label="(read,2015)"];
P5 -> F201 [ label="(read,2015)"];
P6 -> F201 [ label="(read,2015)"];
P7 -> F201 [ label="(read,2015)"];
P8 -> F201 [ label="(read,2015)"];
P2 -> F202 [ label="(read,2015)"];
P5 -> F202 [ label="(read,2015)"];
P6 -> F202 [ label="(read,2015)"];
P7 -> F203 [ label="(read,2015)"];
P8 -> F203 [ label="(read,2015)"];
P10 -> F203 [ label="(read,2015)"];
P1 -> W301 [ label="(read,2015)"];
P3 -> W301 [ label="(read,2015)"];
P9 -> W302 [ label="(read,2015)"];
P8 -> W302 [ label="(read,2015)"];
P10 -> W302 [ label="(read,2015)"];
P9 -> W303 [ label="(read,2015)"];
P10 -> W303 [ label="(read,2015)"];
P9 -> W304 [ label="(read,2015)"];
P10 -> W304 [ label="(read,2015)"];
P9 -> W303 [ label="(own,2015)"];
P10 -> W303 [ label="(own,2015)"];
P9 -> W304 [ label="(own,2015)"];
P10 -> W304 [ label="(own,2015)"];
P7 -> W305 [ label="(own,2015)"];
P8 -> W305 [ label="(own,2015)"];
P7 -> W306 [ label="(own,2015)"];
P8 -> W306 [ label="(own,2015)"];
P2 -> F204 [ label="(own,2015)"];
P5 -> F204 [ label="(read,2015)"];
P2 -> F205 [ label="(own,2015)"];
P5 -> F205 [ label="(read,2015)"];
P2 -> F206 [ label="(own,2015)"];
P5 -> F206 [ label="(read,2015)"];
}
}
The full project code can be found at https://ibm.biz/BdX82v and is accessible to IBMers.
I can share the full project, which is general code, with anyone, so just send me an email at eitans@il.ibm.com.
Can't recreate the graphviz example:
1. copied the "Full data set" to local file events.txt and to events.gv (also tried other code snippets in the page)
2. ran
\Graphviz2.38\bin\circo.exe \events.txt \events.gv
\Graphviz2.38\bin\neato.exe -n \events.gv -Tpng -O
3. got different image (same vertices, no edges, vertices overlapping on each other)