Mizan

Download Mizan source code from Google Code or GitHub
Configure your local Ubuntu to run Mizan
Run Mizan on EC2
Download Mizan’s paper or presentation

EuroSys Video Presentation

What is Mizan?

Mizan is an advanced clone of Google’s graph processing system Pregel that uses online graph vertex migration to dynamically optimize the execution of graph algorithms. You can use Mizan to develop any vertex-centric graph algorithm and run it in parallel over a local cluster or over cloud infrastructure. Mizan is compatible with Pregel’s API, is written in C++, and uses MPICH2 for communication. You can download a copy of Mizan and start using it today on your local machine, or try Mizan on EC2. We also welcome programmers who want to dig deeper into the Mizan code to optimize or tweak it.
Mizan was published at EuroSys 13 as “Mizan: A System for Dynamic Load Balancing in Large-scale Graph Processing”. An earlier work of ours was titled “Mizan: Optimizing Graph Mining in Large Parallel Systems”; we recently renamed that system to Libra to avoid confusion. Below we show the abstract of Mizan’s EuroSys 13 paper, followed by Mizan’s general architecture and the API available to users.

Abstract

Pregel was recently introduced as a scalable graph mining system that can provide significant performance improvements over traditional MapReduce implementations. Existing implementations focus primarily on graph partitioning as a preprocessing step to balance computation across compute nodes. In this paper, we examine the runtime characteristics of a Pregel system. We show that graph partitioning alone is insufficient for minimizing end-to-end computation. Especially where data is very large or the runtime behavior of the algorithm is unknown, an adaptive approach is needed. To this end, we introduce Mizan, a Pregel system that achieves efficient load balancing to better adapt to changes in computing needs. Unlike known implementations of Pregel, Mizan does not assume any a priori knowledge of the structure of the graph or behavior of the algorithm. Instead, it monitors the runtime characteristics of the system. Mizan then performs efficient fine-grained vertex migration to balance computation and communication. We have fully implemented Mizan; using extensive evaluation we show that—especially for highly-dynamic workloads— Mizan provides up to 84% improvement over techniques leveraging static graph pre-partitioning.

Architecture

Each Mizan worker contains three main components: a bulk synchronous parallel (BSP) processor, a distributed hash table (DHT) communicator, and a storage manager. Together, the three components serve as a Pregel-clone graph processor. Each component communicates with the others through a clear API, which makes it easier to improve the code of one component without affecting the others.

The independent components of Mizan

The BSP processor takes the user’s code and applies it to all graph vertices stored by the storage manager in each worker. The BSP processor synchronizes with other workers through command messages delivered by the communicator. The communicator keeps track of vertex ownership through the DHT and exchanges command and algorithm messages with other workers. Each storage manager stores its assigned sub-graph in memory, along with any data related to the sub-graph, such as received messages and vertex data. The storage manager can access both local disks and HDFS to read the input graph partitions and write the algorithm results. The migration planner is a fourth component that interacts with all three main components during the BSP barrier to provide fine-grained dynamic graph repartitioning.
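To make the idea of tracking vertex ownership through a DHT more concrete, here is a minimal single-process sketch. It is not Mizan’s code: the class name `OwnershipTable`, its methods, and the "home worker" scheme (a fixed worker chosen by hashing the vertex ID, which records where the vertex currently lives) are assumptions made for illustration only.

```cpp
#include <cstddef>
#include <functional>
#include <unordered_map>
#include <vector>

// Hypothetical sketch, not Mizan's implementation: every vertex has a fixed
// "home" worker chosen by hashing its ID, and the home worker's table records
// which worker currently owns the vertex.
class OwnershipTable {
public:
    // workerCount must be greater than zero.
    explicit OwnershipTable(std::size_t workerCount) : homes_(workerCount) {}

    // The home worker never changes, even if the vertex migrates.
    std::size_t homeOf(long vertexId) const {
        return std::hash<long>{}(vertexId) % homes_.size();
    }

    // Record that the vertex now lives on `worker` (for example after a migration).
    void setOwner(long vertexId, std::size_t worker) {
        homes_[homeOf(vertexId)][vertexId] = worker;
    }

    // Look up the current owner in the home worker's table; a vertex that was
    // never moved is assumed to live on its home worker.
    std::size_t ownerOf(long vertexId) const {
        const auto& table = homes_[homeOf(vertexId)];
        auto it = table.find(vertexId);
        return it == table.end() ? homeOf(vertexId) : it->second;
    }

private:
    // One lookup table per worker, all held in one process for this sketch.
    std::vector<std::unordered_map<long, std::size_t>> homes_;
};
```

In a real deployment each per-worker table would live on a different machine and a lookup would be a network request; the sketch keeps them in one vector only to show the bookkeeping.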

Mizan’s BSP flow with the Migration Planner component
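As a rough illustration of the kind of decision a migration planner could take at the BSP barrier, the sketch below pairs the slowest worker with the fastest one when the imbalance is large enough. This is a simplified assumption, not Mizan’s actual planning algorithm: the function `planMigration`, the `MigrationPair` struct, the use of per-worker superstep time as the load metric, and the `tolerance` parameter are all hypothetical.

```cpp
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical result type: move some vertices from worker `from` to worker `to`.
struct MigrationPair {
    std::size_t from;
    std::size_t to;
};

// Returns true and fills `plan` when the slowest worker's superstep time
// exceeds the mean by more than `tolerance` (0.2 means 20%).
// Returns false when the load is considered balanced.
bool planMigration(const std::vector<double>& superstepSeconds,
                   double tolerance, MigrationPair& plan) {
    if (superstepSeconds.size() < 2) {
        return false;  // nothing to balance with a single worker
    }
    std::size_t slowest = 0;
    std::size_t fastest = 0;
    for (std::size_t i = 1; i < superstepSeconds.size(); ++i) {
        if (superstepSeconds[i] > superstepSeconds[slowest]) slowest = i;
        if (superstepSeconds[i] < superstepSeconds[fastest]) fastest = i;
    }
    const double mean =
        std::accumulate(superstepSeconds.begin(), superstepSeconds.end(), 0.0) /
        static_cast<double>(superstepSeconds.size());
    if (superstepSeconds[slowest] <= mean * (1.0 + tolerance)) {
        return false;  // imbalance is within the tolerated range
    }
    plan.from = slowest;
    plan.to = fastest;
    return true;
}
```

For example, with superstep times of 1, 1 and 4 seconds and a tolerance of 0.2, the mean is 2 seconds, so the third worker is flagged and paired with the first.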

User’s API

Implementing the interface “IsuperStep.h” enables Mizan to run the user’s vertex-centric graph algorithm. Function initialize() is called once for each vertex, passed through the class pointer “userVertexObject<K,V1,M,A> *”, before the first superstep starts; it can be used to initialize vertex and/or edge values. Function compute() is called once for each vertex at each superstep; the “messageIterator<M> *” pointer iterates over the messages received by the vertex pointed to by “userVertexObject<K,V1,M,A> *”, while the class pointer “messageManager<K,V1,M,A> *” is used to send messages to other vertices. The whole user API relies on template classes so that the user can choose the vertex ID class <K>, vertex value class <V1>, message value class <M> and aggregation class <A>. The following are some of the predefined Mizan-specific data types that can be used with the template classes:

  1. Basic data types:
    1. mDouble
    2. mInt
    3. mLong
  2. Array data types
    1. mCharArray
    2. mCharArrayNoCpy
    3. mDoubleArray
    4. mLongArray

Mizan also contains complex data types, and users can define their own data types. Please refer to the source code for more information.

IsuperStep.h

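template<class K, class V1, class M, class A>
class IsuperStep {
	int curSuperStep;
public:
	// Records the index of the current superstep.
	void setCurSuperStep(int curSS){
		curSuperStep = curSS;
	}
	int getCurSuperStep(){
		return curSuperStep;
	}
	// Called for each vertex before the first superstep.
	virtual void initialize(userVertexObject<K,V1,M,A> * data)=0;
	// Called once for each vertex at each superstep.
	virtual void compute(messageIterator<M> * messages,userVertexObject<K,V1,M,A> * data,messageManager<K,V1,M,A> * mManager)=0;
	virtual ~IsuperStep(){};
};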
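The calling order described above (initialize() once per vertex, then compute() once per vertex per superstep) can be sketched with a small single-process driver. Everything in this sketch is a hypothetical stand-in: `SketchVertex`, `SketchProgram`, `runSupersteps` and `Countdown` are not part of Mizan, messages are left out entirely, and supersteps are assumed to be numbered from 1.

```cpp
#include <vector>

// Hypothetical, simplified vertex; it only holds what this sketch needs.
struct SketchVertex {
    long id;
    double value;
    bool halted;
};

// Hypothetical, message-free stand-in for a vertex program interface.
class SketchProgram {
public:
    virtual void initialize(SketchVertex& v) = 0;
    virtual void compute(int superstep, SketchVertex& v) = 0;
    virtual ~SketchProgram() {}
};

// Calls initialize() once per vertex, then compute() once per active vertex
// per superstep, until every vertex has halted or the limit is reached.
// Returns the number of supersteps that were executed.
int runSupersteps(SketchProgram& program, std::vector<SketchVertex>& vertices,
                  int maxSupersteps) {
    for (SketchVertex& v : vertices) {
        program.initialize(v);
    }
    int superstep = 0;
    while (superstep < maxSupersteps) {
        bool anyActive = false;
        for (const SketchVertex& v : vertices) {
            if (!v.halted) anyActive = true;
        }
        if (!anyActive) break;
        ++superstep;  // supersteps are numbered from 1 in this sketch
        for (SketchVertex& v : vertices) {
            if (!v.halted) program.compute(superstep, v);
        }
    }
    return superstep;
}

// Example program: each vertex counts down from its ID and halts at zero.
class Countdown : public SketchProgram {
public:
    void initialize(SketchVertex& v) override {
        v.value = static_cast<double>(v.id);
        v.halted = false;
    }
    void compute(int /*superstep*/, SketchVertex& v) override {
        v.value -= 1.0;
        if (v.value <= 0.0) v.halted = true;
    }
};
```

Running `Countdown` over vertices with IDs 1, 2 and 3 takes three supersteps, because the vertex with ID 3 needs three decrements before it halts. A distributed runtime would additionally deliver messages between supersteps and synchronize the workers at a barrier.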

The user can create a custom message combiner, which merges the messages addressed to a destination vertex with an ID of type <K>, by implementing the interface “Icombiner.h”.

Icombiner.h

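template<class K, class V1, class M, class A>
class Icombiner {
public:
	Icombiner(){};
	// Merges the messages addressed to vertex dst; the result is sent through mManager.
	virtual void combineMessages(K dst,messageIterator<M> * messages,messageManager<K,V1,M,A> * mManager)=0;
	virtual ~Icombiner(){};
};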
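The effect of a combiner can be shown without any Mizan types. The sketch below is an assumption-laden illustration, not Mizan’s code: `Message` and `combineBySum` are hypothetical names, and summation is only one possible combine operation (it suits algorithms such as PageRank, where a vertex only needs the total of its incoming values).

```cpp
#include <map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a queued message: (destination vertex ID, value).
using Message = std::pair<long, double>;

// Collapses all messages that share a destination into a single message
// carrying their sum. The result is ordered by destination ID.
std::vector<Message> combineBySum(const std::vector<Message>& outgoing) {
    std::map<long, double> sums;
    for (const Message& m : outgoing) {
        sums[m.first] += m.second;  // a missing key starts at 0.0
    }
    return std::vector<Message>(sums.begin(), sums.end());
}
```

Three queued messages `(7, 0.25)`, `(3, 0.5)` and `(7, 0.25)` collapse into two: one for vertex 3 and one for vertex 7, each carrying 0.5. Fewer messages then need to cross the network.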

The user can also implement a data aggregator by implementing “IAggregator.h”; the aggregated value is globally accessible to all vertices and is synchronized at the end of each superstep.

IAggregator.h

template<class A>
class IAggregator {
protected:
	A aggValue;
public:
	IAggregator(){}
	virtual void aggregate(A value)=0;
	A getValue(){
		return aggValue;
	}
	void setValue(A value){
		this->aggValue = value;
	}
	virtual ~IAggregator(){}
};
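As a small illustration of how the interface can be specialized, the sketch below implements a sum aggregator. To keep it compilable on its own, it repeats the interface and assumes that its template header is `template<class A>`; it also uses a plain `long` instead of a Mizan data type. The class name `sumAggregator` is hypothetical and does not come from the Mizan source.

```cpp
// Copy of the interface so that the sketch compiles on its own.
// Assumption: the interface takes a single template parameter, the aggregation class A.
template<class A>
class IAggregator {
protected:
    A aggValue;
public:
    IAggregator() {}
    virtual void aggregate(A value) = 0;
    A getValue() {
        return aggValue;
    }
    void setValue(A value) {
        this->aggValue = value;
    }
    virtual ~IAggregator() {}
};

// Hypothetical aggregator that adds up every value it is given.
class sumAggregator : public IAggregator<long> {
public:
    sumAggregator() {
        aggValue = 0;  // start from the identity element of addition
    }
    void aggregate(long value) override {
        aggValue += value;
    }
};
```

Calling `aggregate(3)` and then `aggregate(4)` leaves `getValue()` at 7; `setValue(0)` resets the running total, which is the kind of reset a runtime might perform between supersteps.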

Programming Example

The following is an implementation of PageRank on Mizan, along with an example “max” aggregator. More details are available in the Mizan source code.

Implementing IsuperStep.h

#include <cmath>
#include "../IsuperStep.h"
#include "../dataManager/dataStructures/data/mLong.h"
#include "../dataManager/dataStructures/data/mDouble.h"

class pageRank: public IsuperStep<mLong, mDouble, mDouble, mLong> {
private:
	int vertexTotal;   // global vertex count, cached on the first initialize() call
	double error;      // convergence threshold
	int maxSuperStep;  // superstep limit

public:

	pageRank(int maxSS) {
		vertexTotal = 0;
		error = 0;
		maxSuperStep = maxSS;
	}
	// Start every vertex with rank 1/N, where N is the global vertex count.
	void initialize(userVertexObject<mLong, mDouble, mDouble, mLong> * data) {
		if (vertexTotal == 0) {
			vertexTotal = data->getGlobalVertexCount();
			error = 1.0 / (double) vertexTotal / 10.0;
		}
		data->setVertexValue(mDouble(1.0 / (double) vertexTotal));
	}
	void compute(messageIterator<mDouble> * messages,
			userVertexObject<mLong, mDouble, mDouble, mLong> * data,
			messageManager<mLong, mDouble, mDouble, mLong> * comm) {

		double currVal = data->getVertexValue().getValue();
		double newVal = 0;
		double c = 0.85;  // damping factor

		// After the first superstep, fold the incoming rank shares into a new rank.
		if (data->getCurrentSS() > 1) {
			while (messages->hasNext()) {
				double tmp = messages->getNext().getValue();
				newVal = newVal + tmp;
			}
			newVal = newVal * c + (1.0 - c) / ((double) vertexTotal);
			data->setVertexValue(mDouble(newVal));
		} else newVal = currVal;

		// Send an equal share of the rank along every out-edge until the superstep limit.
		if (data->getCurrentSS() <= maxSuperStep) {
			mDouble outVal(newVal / ((double) data->getOutEdgeCount()));
			for (int i = 0; i < data->getOutEdgeCount(); i++) {
				comm->sendMessage(data->getOutEdgeID(i), outVal);
			}
		}else data->voteToHalt();
		// Halt early once the rank changes by less than the threshold.
		if (std::fabs(currVal - newVal) < error && data->getCurrentSS() > 1) {
			data->voteToHalt();
		}
	}
};

Implementing Icombiner.h


#include "../Icombiner.h"
#include "../dataManager/dataStructures/data/mLong.h"
#include "../dataManager/dataStructures/data/mDouble.h"

class pageRankCombiner: public Icombiner<mLong, mDouble, mDouble, mLong> {

	// Sums all messages addressed to dst and forwards the total as one message.
	void combineMessages(mLong dst, messageIterator<mDouble> * messages,
			messageManager<mLong, mDouble, mDouble, mLong> * mManager) {
		double newVal = 0;
		while (messages->hasNext()) {
			double tmp = messages->getNext().getValue();
			newVal = newVal + tmp;
		}
		mDouble messageOut(newVal);
		mManager->sendMessage(dst,messageOut);
	}
};

Implementing IAggregator.h

#include "../IAggregator.h"
#include "../dataManager/dataStructures/data/mLong.h"

class maxAggregator: public IAggregator<mLong> {
public:
	maxAggregator() {
		aggValue.setValue(0);
	}
	// Keeps the largest value seen so far.
	void aggregate(mLong value) {
		if (value > aggValue) {
			aggValue = value;
		}
	}
	mLong getValue() {
		return aggValue;
	}
	void setValue(mLong value) {
		this->aggValue = value;
	}
	virtual ~maxAggregator() {
	}
};
