# How to run: python MST.py testData.txt
"""Compute a spanning forest of an edge-list graph with mrjob and networkx.

The job runs three MapReduce rounds:

1. ``Mapper`` assigns every edge ``(u, v)`` to the bucket
   ``k * (u % k) + (v % k)`` and ``Reducer`` gathers each bucket's edges
   into one list.
2. ``Mapperround2`` computes a minimum spanning forest of every bucket; only
   those edges survive, all re-keyed under the single key ``"edge"``.
3. ``Mapperround2`` runs once more over the surviving edges to build the
   final forest, which ``Reducer`` emits as one ``("edge", [...])`` record.

Input lines are whitespace-separated ``u v`` integer vertex-id pairs; lines
containing ``#`` and blank lines are skipped.
"""
from mrjob.job import MRJob
from mrjob.step import MRStep
import networkx as nx


class MST(MRJob):
    """MapReduce job emitting a minimum spanning forest of the input graph."""

    # Number of vertex partitions; edges are spread over k * k buckets.
    k = 2

    def Mapper(self, key, value):
        """Route one input edge to the bucket E_ij, encoded as k*i + j.

        ``value`` is one raw input line; ``key`` is not used. Lines that
        contain ``#`` and blank lines produce no output. Lines with fewer
        than two fields are skipped and counted under the job counter
        ``MST / malformed_lines`` (previously they raised ``IndexError``).
        Non-integer vertex ids still raise ``ValueError``.
        """
        # Vertices are partitioned by id modulo k; a random partition of
        # the vertices could be substituted here.
        if "#" in value:
            return
        fields = value.split()
        if not fields:
            return
        if len(fields) < 2:
            self.increment_counter("MST", "malformed_lines", 1)
            return
        u, v = int(fields[0]), int(fields[1])
        yield (u % self.k) * self.k + v % self.k, (u, v)

    def Reducer(self, key, value):
        """Collect every value emitted under ``key`` into a single list.

        mrjob passes ``value`` as an iterator over all values for ``key``.
        It is materialized with ``list`` because a generator cannot be
        serialized by mrjob's default JSON protocol.
        """
        yield key, list(value)

    def Mapperround2(self, key, value):
        """Emit the spanning-forest edges of one edge list under ``"edge"``.

        ``value`` is a list of ``[u, v]`` pairs produced by ``Reducer``; the
        incoming ``key`` is not used. Edges carry no weight, so networkx
        treats each as weight 1 and the result is simply a spanning forest
        of the given edges. Duplicate edges collapse into one because
        ``nx.Graph`` is a simple graph.
        """
        graph = nx.Graph()
        graph.add_edges_from((edge[0], edge[1]) for edge in value)
        forest = nx.minimum_spanning_tree(graph)
        for u, v in forest.edges():
            yield "edge", (u, v)

    def steps(self):
        """Return the three rounds: bucket edges, filter per bucket, final forest."""
        return [
            MRStep(mapper=self.Mapper, reducer=self.Reducer),
            MRStep(mapper=self.Mapperround2, reducer=self.Reducer),
            MRStep(mapper=self.Mapperround2, reducer=self.Reducer),
        ]


if __name__ == '__main__':
    MST.run()