Spark Machine Learning 1: Programming Basics (scala/java/python)
Published: 2019-06-29


Spark installation directory

/Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6
  • Basic test: run the bundled SparkPi example (the MASTER variable selects the master URL; local[20] means run locally with 20 worker threads):
./bin/run-example org.apache.spark.examples.SparkPi
MASTER=local[20] ./bin/run-example org.apache.spark.examples.SparkPi
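
run-example is a thin wrapper over spark-submit; the equivalent direct invocation looks roughly like this (a sketch; the examples jar path is assumed from the 1.4.0 binary distribution layout):

# assumed jar name under lib/ in spark-1.4.0-bin-hadoop2.6
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[20] lib/spark-examples-1.4.0-hadoop2.6.0.jar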

scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * A simple Spark app in Scala
 */
object ScalaApp {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "First Spark App")
    val data = sc.textFile("data/UserPurchaseHistory.csv")
      .map(line => line.split(","))
      .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))
    val numPurchases = data.count()
    val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()
    val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()
    val productsByPopularity = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)
    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
    sc.stop()
  }
}
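
The app expects data/UserPurchaseHistory.csv to hold one purchase per line in user,product,price order. A few hypothetical rows for illustration (the values are made up, only the three-column shape matters):

John,iPhone Cover,9.99
John,Headphones,5.49
Jack,Samsung Galaxy Cover,8.95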

build.sbt

name := "scala-spark-app"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
erichan:scala-spark-app/ $ sbt run
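
sbt run assumes the standard sbt project layout, with build.sbt at the project root and the source under src/main/scala (the source file name here is illustrative):

scala-spark-app/
  build.sbt
  src/main/scala/ScalaApp.scala
  data/UserPurchaseHistory.csv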

java 8

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.List;

public class JavaApp {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv")
                .map(s -> s.split(","));
        long numPurchases = data.count();
        long uniqueUsers = data.map(strings -> strings[0]).distinct().count();
        double totalRevenue = data.mapToDouble(strings -> Double.parseDouble(strings[2])).sum();
        List<Tuple2<String, Integer>> pairs = data.mapToPair(
                new PairFunction<String[], String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String[] strings) throws Exception {
                        return new Tuple2<>(strings[1], 1);
                    }
                }
        ).reduceByKey((i1, i2) -> i1 + i2).collect();
        pairs.sort((o1, o2) -> -(o1._2() - o2._2()));
        String mostPopular = pairs.get(0)._1();
        int purchases = pairs.get(0)._2();
        System.out.println("Total purchases: " + numPurchases);
        System.out.println("Unique users: " + uniqueUsers);
        System.out.println("Total revenue: " + totalRevenue);
        System.out.println(String.format("Most popular product: %s with %d purchases", mostPopular, purchases));
        sc.stop();
    }
}
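
Since the rest of the file already uses Java 8 lambdas, the anonymous PairFunction above can be collapsed into a lambda as well; an equivalent sketch:

// Same (product, 1) pairing as above, written as a Java 8 lambda
List<Tuple2<String, Integer>> pairs = data
        .mapToPair(strings -> new Tuple2<>(strings[1], 1))
        .reduceByKey((i1, i2) -> i1 + i2)
        .collect();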

Maven pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>java-spark-app</groupId>
    <artifactId>java-spark-app</artifactId>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
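
One way to run the Java app from the project root, assuming the exec-maven-plugin is available (Maven can resolve it by its exec: prefix, so it need not be declared in the pom):

# compile, then launch JavaApp on the Maven-managed classpath
mvn compile exec:java -Dexec.mainClass="JavaApp"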

python

from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
data = (sc.textFile("data/UserPurchaseHistory.csv")
        .map(lambda line: line.split(","))
        .map(lambda record: (record[0], record[1], record[2])))
numPurchases = data.count()
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
totalRevenue = data.map(lambda record: float(record[2])).sum()
products = (data.map(lambda record: (record[1], 1.0))
            .reduceByKey(lambda a, b: a + b)
            .collect())
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])
sc.stop()
cd /Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6/bin
./spark-submit pythonapp.py

Reprinted from: http://xsxwm.baihongyu.com/
