# This code comes with the Hands-On for the Introduction to Spark
# - some examples are adaptations of Advanced Analytics with Spark (Sandy Ryza et al.)
# - some examples are adaptations of the Databricks API examples
#
# AI and Predictive Analytics in Datacenter Environments
# http://dcai.bsc.es

# Download datasets:
# wget http://bit.ly/1Aoywaq --output-document=donation.zip
# mkdir linkage
# mv donation.zip linkage/
# cd linkage/
# unzip donation.zip
# unzip 'block_*.zip'
# cat block_*.csv > tmp.csv
# mv tmp.csv block_all.csv


# BASIC MAP REDUCE

# Load the concatenated record-linkage CSV as an RDD of text lines
rawblocks = sc.textFile("file:///home/vagrant/linkage/block_all.csv")
rawblocks.count()
rawblocks.first()

# take() brings the first records back to the driver as a local list
head = rawblocks.take(10)
len(head)
head

# Split each line into fields and keep only the header lines (first field is 'id_1')
rawblocks_yh = rawblocks.map(lambda s: s.split(',')).filter(lambda x: 'id_1' in x[0])
rawblocks_yh.count()

# Same split, but keep only the data lines (drop the headers)
rawblocks_nh = rawblocks.map(lambda s: s.split(',')).filter(lambda x: 'id_1' not in x[0])
rawblocks_nh.count()

# Sample a tiny fraction of the records (without replacement, seed 1)
sblock = rawblocks.sample(False, 1 / 400000, 1)
sblock.count()
sblock.foreach(print)   # print runs on the executors; in local mode it shows in this console

sblock_nh = sblock.map(lambda s: s.split(',')).filter(lambda x: 'id_1' not in x[0])
sblock_nh.map(lambda x: x[0]).map(lambda i: int(i)).foreach(print)

# collect() materializes the sampled records on the driver
sample_down = sblock_nh.collect()
sample_down


# Parse each record: two integer ids, nine float scores ('?' means missing) and a match flag
def toFloat(s):
    return float('nan') if s == '?' else float(s)

def parseLine(fields):
    id1 = int(fields[0])
    id2 = int(fields[1])
    scores = [toFloat(elem) for elem in fields[2:11]]
    is_match = (fields[11] == 'TRUE')   # compare the string; bool('FALSE') would always be True
    return (id1, id2, scores, is_match)

aux1 = sblock_nh.map(lambda x: parseLine(x))
aux1.foreach(print)

# Same parsing, but returning a small class instead of a tuple
class MatchData:
    def __init__(self, id1, id2, scores, matched):
        self.id1 = id1
        self.id2 = id2
        self.scores = scores
        self.matched = matched

def parseLine(fields):
    id1 = int(fields[0])
    id2 = int(fields[1])
    scores = [toFloat(elem) for elem in fields[2:11]]
    is_match = (fields[11] == 'TRUE')
    return MatchData(id1, id2, scores, is_match)

# countByValue() returns a dict with the number of matches and non-matches
mapMatch1 = sblock_nh.map(lambda m: parseLine(m)).map(lambda e: e.matched)
mapMatch1.countByValue()
mapMatch1.collect()

mapMatch2 = rawblocks_nh.map(lambda m: parseLine(m)).map(lambda e: e.matched)
mapMatch2.countByValue()

mapMatch3 = rawblocks_nh.map(lambda m: parseLine(m)).map(lambda e: e.id2 > 50000)
mapMatch3.countByValue()


# WORD COUNT

textFile = sc.textFile("../spark/README.md")
textFlatMap = textFile.flatMap(lambda line: line.split(' '))
words = textFlatMap.map(lambda word: (word, 1))
counts = words.reduceByKey(lambda x, y: x + y)
counts.take(5)

# Sort by count, descending
ranking = counts.sortBy(lambda x: x[1], False)
ranking.take(5)
local_result = ranking.collect()

# Repeat the count after filtering out the empty strings produced by split(' ')
words2 = textFlatMap.filter(lambda word: word != '')
filteredwords = words2.map(lambda word: (word, 1))
counts2 = filteredwords.reduceByKey(lambda x, y: x + y)
ranking2 = counts2.sortBy(lambda x: x[1], False)
ranking2.take(5)
local_result2 = ranking2.collect()
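
# Optional extra (not part of the original hands-on): persist the ranked word
# counts to disk with RDD.saveAsTextFile(). The output path below is only an
# example and must not exist yet; Spark writes one part-* file per partition.
ranking2.saveAsTextFile("file:///home/vagrant/wordcount_ranking")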
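
# Optional follow-up to the BASIC MAP REDUCE section above (a sketch, not part
# of the original hands-on): summary statistics for the first match-score field,
# skipping missing values. RDD.stats() returns a StatCounter with count, mean,
# stdev, min and max. It reuses sblock_nh and parseLine defined earlier.
import math

score0 = sblock_nh.map(lambda x: parseLine(x)) \
                  .map(lambda m: m.scores[0]) \
                  .filter(lambda v: not math.isnan(v))
print(score0.stats())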