Path: blob/master/spark_wordcount/scripts/read_files.py
import os
import re
from operator import add

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

datafile_json = "../sample-data/description.json"
datafile_csv = "../sample-data/description.csv"
datafile_psv = "../sample-data/description.psv"


def get_keyval(row):

    # get the text from the row entry
    text = row.text

    # remove unwanted chars
    text = re.sub("\\W", " ", text)

    # lower-case the text and split on spaces to get the words
    words = text.lower().split(" ")

    # for each word, send back a count of 1
    return [[w, 1] for w in words]


def get_counts(df):
    # just for the heck of it, show 2 results without truncating the fields
    df.show(2, False)

    # for each text entry, split it into tokens and assign a count of 1
    # we need flatMap because we are going from 1 entry to many
    mapped_rdd = df.rdd.flatMap(lambda row: get_keyval(row))

    # for each identical token (i.e. key) add the counts
    # this gets the count of each word
    counts_rdd = mapped_rdd.reduceByKey(add)

    # get the final output into a list
    word_count = counts_rdd.collect()

    # print the counts
    for e in word_count:
        print(e)


def process_json(abspath, sparkcontext):

    # create an SQL context so that we can query data files with SQL-like syntax
    sqlContext = SQLContext(sparkcontext)

    # read the JSON data file and select only the field labeled "text"
    # this returns a spark data frame
    df = sqlContext.read.json(os.path.join(
        abspath, datafile_json)).select("text")

    # use the data frame to get counts of the text field
    get_counts(df)


def process_csv(abspath, sparkcontext):

    # create an SQL context so that we can query data files with SQL-like syntax
    sqlContext = SQLContext(sparkcontext)

    # read the CSV data file and select only the field labeled "text"
    # this returns a spark data frame
    df = sqlContext.read.load(os.path.join(abspath, datafile_csv),
                              format='com.databricks.spark.csv',
                              header='true',
                              inferSchema='true').select("text")

    # use the data frame to get counts of the text field
    get_counts(df)


def process_psv(abspath, sparkcontext):
    """Process the pipe-separated file."""

    # read the psv file into an rdd of lines
    rdd = sparkcontext.textFile(os.path.join(abspath, datafile_psv))

    # we only want the "text" field, so keep only item[1] after
    # tokenizing each line on the pipe symbol
    text_field_rdd = rdd.map(lambda line: [re.split(r'\|', line)[1]])

    # create a data frame from the above rdd
    # and label the column as 'text'
    df = text_field_rdd.toDF(["text"])

    # use the data frame to get counts of the text field
    get_counts(df)


if __name__ == "__main__":

    # absolute path to the directory containing this file
    abspath = os.path.abspath(os.path.dirname(__file__))

    # create a spark configuration with 20 local threads;
    # the job runs locally on this machine
    conf = (SparkConf()
            .setMaster("local[20]")
            .setAppName("sample app for reading files")
            .set("spark.executor.memory", "2g"))

    sc = SparkContext(conf=conf)

    # process the json data file
    process_json(abspath, sc)

    # process the csv data file
    process_csv(abspath, sc)

    # process the pipe-separated data file
    process_psv(abspath, sc)
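Each reader expects the sample files to expose a field named "text": a text key in every JSON record, a text column (with a header row) in the CSV, and the second pipe-delimited field of each line in the .psv file. Depending on the Spark version, the CSV path may also need the external com.databricks spark-csv package on the classpath (for example via spark-submit --packages). The snippet below is a minimal, Spark-free sketch of the same tokenize-and-count logic, handy for sanity-checking get_keyval without a cluster; the Row stand-in and the sample sentence are made up for illustration.

# Spark-free sketch of the word-count logic above; the input row is hypothetical.
import re
from collections import Counter, namedtuple

Row = namedtuple("Row", ["text"])          # stand-in for a Spark Row
row = Row(text="Spark makes word counts easy. Word counts!")

# mimic get_keyval: strip non-word chars, lower-case, emit [word, 1] pairs
text = re.sub("\\W", " ", row.text)
pairs = [[w, 1] for w in text.lower().split(" ")]

# mimic reduceByKey(add): sum the counts for each identical key
counts = Counter()
for word, n in pairs:
    counts[word] += n

# note: consecutive spaces yield an empty-string "word", just as in the Spark job
print(counts.most_common())   # e.g. [('word', 2), ('counts', 2), ('', 2), ('spark', 1), ...]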