GitHub Repository: kavgan/nlp-in-practice
Path: blob/master/spark_wordcount/scripts/read_files.py
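"""Word-count example for kavgan/nlp-in-practice.

Reads the sample JSON, CSV, and pipe-separated (PSV) data files, extracts
the "text" field from each, and prints per-word counts using Spark RDD
transformations (flatMap + reduceByKey).
"""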
import os
import re
from operator import add

from pyspark import SparkConf, SparkContext, SQLContext

datafile_json = "../sample-data/description.json"
datafile_csv = "../sample-data/description.csv"
datafile_psv = "../sample-data/description.psv"


def get_keyval(row):

    # get the text from the row entry
    text = row.text

    # remove unwanted (non-word) characters
    text = re.sub(r"\W", " ", text)

    # lower-case the text and split on whitespace to get the words
    words = text.lower().split()

    # for each word, send back a count of 1,
    # e.g. "Hello, world!" -> [["hello", 1], ["world", 1]]
    return [[w, 1] for w in words]


def get_counts(df):
    # show 2 results without truncating the fields
    df.show(2, False)

    # for each text entry, break it into tokens and assign a count of 1;
    # flatMap is used because each row maps to many (word, 1) pairs
    mapped_rdd = df.rdd.flatMap(lambda row: get_keyval(row))

    # for each identical token (i.e. key) add the counts;
    # this gives the count of each word
    counts_rdd = mapped_rdd.reduceByKey(add)

    # collect the final output into a list on the driver
    word_count = counts_rdd.collect()

    # print the counts
    for e in word_count:
        print(e)


def process_json(abspath, sparkcontext):

    # create an SQL context so that we can query data files with SQL-like syntax
    sqlContext = SQLContext(sparkcontext)

    # read the JSON data file and select only the field labeled "text";
    # this returns a Spark DataFrame
    df = sqlContext.read.json(os.path.join(abspath, datafile_json)).select("text")

    # use the DataFrame to get counts of the text field
    get_counts(df)


def process_csv(abspath, sparkcontext):

    # create an SQL context so that we can query data files with SQL-like syntax
    sqlContext = SQLContext(sparkcontext)

    # read the CSV data file and select only the field labeled "text";
    # this returns a Spark DataFrame
    # ('com.databricks.spark.csv' is the external spark-csv package needed on
    # Spark 1.x; on Spark 2+ the built-in format='csv' can be used instead)
    df = sqlContext.read.load(os.path.join(abspath, datafile_csv),
                              format='com.databricks.spark.csv',
                              header='true',
                              inferSchema='true').select("text")

    # use the DataFrame to get counts of the text field
    get_counts(df)


def process_psv(abspath, sparkcontext):
    """Process the pipe-separated file."""

    # read the psv file into an RDD of lines
    rdd = sparkcontext.textFile(os.path.join(abspath, datafile_psv))

    # we only want the "text" field, so keep only item[1] after
    # splitting each line on the pipe symbol
    text_field_rdd = rdd.map(lambda line: [re.split(r'\|', line)[1]])

    # create a data frame from the above rdd,
    # labeling the single column as 'text'
    df = text_field_rdd.toDF(["text"])

    # use the DataFrame to get counts of the text field
    get_counts(df)


if __name__ == "__main__":

    # absolute path to the directory containing this file
    abspath = os.path.abspath(os.path.dirname(__file__))

    # create a Spark configuration that runs locally on the master with 20 threads
    conf = (SparkConf()
            .setMaster("local[20]")
            .setAppName("sample app for reading files")
            .set("spark.executor.memory", "2g"))

    sc = SparkContext(conf=conf)

    # process the json data file
    process_json(abspath, sc)

    # process the csv data file
    process_csv(abspath, sc)

    # process the pipe separated data file
    process_psv(abspath, sc)

    # shut down the Spark context cleanly
    sc.stop()
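
# How one might run this script (an assumption; the repo does not document it):
#
#   spark-submit read_files.py
#
# or, with the pyspark package installed:
#
#   python read_files.py
#
# The sample-data paths are resolved relative to this script's directory
# (via abspath), so the working directory does not matter.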