First Contact with Spark


Apache Spark™ is a unified analytics engine for large-scale data processing.
In other words, Apache Spark is a fast, general-purpose compute engine designed for large-scale data processing.

Running spark-shell ^1

$ ./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/02/21 16:34:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/21 16:34:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/02/21 16:35:03 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://127.0.1.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1487666083866).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Importing data from a CSV file ^2

scala> val flightData = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/home/itaken/1998.csv")
flightData: org.apache.spark.sql.DataFrame = [Year: string, Month: string ... 27 more fields]

Note: some versions report that sqlContext is not defined; in that case, create it manually. ^3

scala> val flightData = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/home/itaken/1998.csv")
<console>:23: error: not found: value sqlContext
       val flightData = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/home/itaken/1998.csv")
                        ^
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@351d725c
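
The startup banner above reports that a Spark session is available as 'spark', so on Spark 2.x the same file can probably be loaded through that session without creating an SQLContext by hand. A minimal sketch, assuming the same file path as above; the name flightDataViaSession is only illustrative and does not appear in the original session:

```scala
// Paste into spark-shell (Spark 2.x), where `spark` is the predefined SparkSession.
// flightDataViaSession is an illustrative name; the path is the one used above.
val flightDataViaSession = spark.read
  .option("header", "true")        // treat the first line as column names
  .csv("/home/itaken/1998.csv")    // built-in CSV reader in Spark 2.x

// Every column is read as a string unless a schema is supplied or inferred.
flightDataViaSession.printSchema()
```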

Creating a temporary table

scala> flightData.registerTempTable("flights")
warning: there was one deprecation warning; re-run with -deprecation for details
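
The deprecation warning above most likely comes from registerTempTable itself, which Spark 2.0 deprecated in favor of createOrReplaceTempView. A minimal sketch of the replacement call, assuming flightData was loaded as shown earlier:

```scala
// Spark 2.x replacement for the deprecated registerTempTable.
// Assumes flightData is the DataFrame loaded from the CSV file above.
flightData.createOrReplaceTempView("flights")

// The view can then be looked up by name through the session.
spark.table("flights").printSchema()
```

The view is scoped to the current session, so it disappears when the shell exits.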

Running a query

scala> val queryFlightNumResult = sqlContext.sql("SELECT COUNT(FlightNum) FROM flights WHERE DepTime BETWEEN 0 AND 600")
queryFlightNumResult: org.apache.spark.sql.DataFrame = [count(FlightNum): bigint]
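
The same question can also be asked without SQL, through the DataFrame API. The sketch below is meant to mirror the query above rather than reproduce the original session; earlyFlights is an illustrative name, and the explicit cast is an assumption made because DepTime was loaded as a string:

```scala
import org.apache.spark.sql.functions.{col, count}

// Filter and aggregate with the DataFrame API instead of a SQL string.
// DepTime is a string column here, so it is cast to an integer explicitly;
// values that cannot be parsed become null and are dropped by the filter.
val earlyFlights = flightData
  .filter(col("DepTime").cast("int").between(0, 600))
  .agg(count("FlightNum"))
```

Like the SQL version, this only builds a query plan; nothing is computed until an action such as take or show is called.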

Printing the result

scala> queryFlightNumResult.take(1)
17/02/21 16:56:09 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
res1: Array[org.apache.spark.sql.Row] = Array([95609])
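
take(1) returns an array of Row objects, which is why the count appears wrapped in brackets. To work with the number itself, it can be read out of the row; a minimal sketch, where flightCount is an illustrative name:

```scala
// first() returns the single result Row; column 0 holds the bigint count,
// which maps to a Scala Long.
val flightCount: Long = queryFlightNumResult.first().getLong(0)
println(s"Flights with DepTime between 0 and 600: $flightCount")
```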

References


Author: Itaken
Reprint policy: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. If you reproduce this article, please credit the source: Itaken.