The Story So Far…
You can read part 1 here.
A few weeks ago I started working out which items each GP practice prescribes most often. Once again I turned to Sparkling and Clojure to do the grunt work for me.
The Practice List
What I didn’t have at the time was the practice list. You can download that from the HSC Business Services Organisation.
I’m going to create another PairRDD with the practice number as the key and the practice record as its value.
(def practice-fields
  [:pracno :partnershipno :practicename :address1 :address2
   :address3 :postcode :telno :lcg])

(defn load-practices [sc filepath]
  (->> (spark/text-file sc filepath)
       (spark/map #(->> (csv/read-csv %) first))
       (spark/filter #(not= (first %) "PracNo"))
       (spark/map #(zipmap practice-fields %))
       (spark/map-to-pair (fn [rec]
                            (let [practice-id (:pracno rec)]
                              (spark/tuple practice-id rec))))))
It’s very similar to the original function I used to load the prescription CSV.
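To see what the header filter and the zipmap step do to each line, here is a minimal sketch in plain Clojure with no Spark or CSV library involved. The sample rows, every header name other than "PracNo", and the helper name rows->keyed-records are made up for illustration, and a plain vector stands in for the Spark tuple.

```clojure
;; Illustrative only: plain Clojure, no Spark or CSV library required.
(def practice-fields
  [:pracno :partnershipno :practicename :address1 :address2
   :address3 :postcode :telno :lcg])

;; Hypothetical rows, shaped as they might be after CSV parsing.
;; Only the "PracNo" header name comes from the real file; the rest is invented.
(def sample-rows
  [["PracNo" "PartnershipNo" "PracticeName" "Address1" "Address2"
    "Address3" "Postcode" "TelNo" "LCG"]
   ["1" "10" "Example Surgery" "1 Main Street" ""
    "Belfast" "BT1 1AA" "000 0000 0000" "Belfast"]])

(defn rows->keyed-records
  "Drops the header row, turns each remaining row into a map keyed by
  practice-fields, and pairs that map with its practice number."
  [rows]
  (->> rows
       (filter #(not= (first %) "PracNo"))
       (map #(zipmap practice-fields %))
       (map (fn [rec] [(:pracno rec) rec]))))

(rows->keyed-records sample-rows)
```

Each result is a [practice-number record] pair, which is the same key and value layout that load-practices hands to Spark.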
Joining The RDDs
With two pair RDDs keyed on the practice number I can safely perform the join.
(defn join-practice-prescriptions [practices prescriptions]
  (spark/join practices prescriptions))
This will give me a Pair RDD with a [key, [value1, value2]] form. I’ll need to tweak the function that works out the frequencies so it takes into account this new data structure.
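The shape of a joined record can be sketched without Spark. The example below imitates an inner join over two plain Clojure maps. The data and the inner-join helper are hypothetical, and it assumes the prescription value for each practice is already a collection of records, which is how the frequency function treats it.

```clojure
;; Illustrative only: an inner join over plain maps, standing in for spark/join.
;; All data here is made up.
(def practices
  {"1" {:practicename "Example Surgery" :postcode "BT1 1AA"}
   "2" {:practicename "Another Surgery" :postcode "BT2 2BB"}})

(def prescriptions
  {"1" [{:vmp_nm "Drug A"} {:vmp_nm "Drug B"}]
   "3" [{:vmp_nm "Drug C"}]})

(defn inner-join
  "Returns [key [left-value right-value]] for every key present in both maps."
  [left right]
  (for [[k lv] left
        :when (contains? right k)]
    [k [lv (get right k)]]))

(inner-join practices prescriptions)
```

Only practice "1" appears in both maps, so it is the only key in the result. Its value is a two-element vector holding the practice record first and the prescription records second, matching the order of the arguments to the join. The reworked frequency function follows.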
(defn def-practice-prescription-freq [prescriptiondata]
  (->> prescriptiondata
       (spark/map-to-pair
        (s-de/key-val-val-fn
         (fn [k v pr]
           (let [freqmap (map (fn [rec] (:vmp_nm rec)) pr)]
             (spark/tuple (str (:practicename v) " " (:postcode v))
                          (apply list (take 10 (reverse (sort-by val (frequencies freqmap))))))))))))
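The ranking at the heart of that function is ordinary Clojure and can be tried on its own. This is a small sketch with invented item names; top-n is a hypothetical helper, not something from the project.

```clojure
;; Illustrative only: the count, sort, reverse, take chain on its own.
(defn top-n
  "Returns the n most frequent items as [item count] pairs, most frequent first."
  [n items]
  (->> (frequencies items)   ; map of item -> number of occurrences
       (sort-by val)         ; ascending by count
       reverse               ; flip to descending
       (take n)))

(top-n 2 ["Drug A" "Drug B" "Drug A" "Drug C" "Drug B" "Drug A"])
;; => (["Drug A" 3] ["Drug B" 2])
```

In the Spark job the same chain runs once per practice, with 10 in place of 2 and the :vmp_nm values of that practice's prescription records as the items.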
The last thing to do is update the process-data function so it loads the practice list as well.
(defn process-data [sc filepath practicefile outputpath]
  (let [prescription-rdd (load-prescription-data sc filepath)
        practice-rdd (load-practices sc practicefile)]
    (->> (join-practice-prescriptions practice-rdd prescription-rdd)
         (def-practice-prescription-freq)
         (spark/coalesce 1)
         (spark/save-as-text-file outputpath))))
Testing From The REPL
Testing these changes from the REPL is fairly trivial.
; CIDER 0.8.1 (package: 20141120.1746) (Java 1.7.0_40, Clojure 1.6.0, nREPL 0.2.10)
nipresciptions.core> (def c (-> (conf/spark-conf)
                                (conf/master "local[3]")
                                (conf/app-name "niprescriptions-sparkjob")))
(def sc (spark/spark-context c))
#'nipresciptions.core/c
#'nipresciptions.core/sc
nipresciptions.core> (def practice-file "/Users/Jason/work/data/Northern_Ireland_Practice_List_0107151.csv")
#'nipresciptions.core/practice-file
nipresciptions.core> (def prescription-path "/Users/Jason/work/data/niprescriptions")
#'nipresciptions.core/prescription-path
nipresciptions.core> (process-data sc prescription-path practice-file "/Users/Jason/testoutput")
The Spark job will take a bit of time as there are 347 practices and a lot of prescription data.
References:
Detail Data Prescription CSVs: http://data.nicva.org/dataset/prescribing-statistics
Github Repo for this project: https://github.com/jasebell/niprescriptiondata
Practice List: http://www.hscbusiness.hscni.net/services/1816.htm