PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
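As a minimal sketch of this (the paths and host names below are placeholders), the same textFile() call works against any of these sources; only the URI scheme changes. S3 access additionally requires the appropriate Hadoop AWS libraries and credentials.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("storage-sources").getOrCreate()
sc = spark.sparkContext

local_rdd = sc.textFile("file:///tmp/data.txt")           # local file system
hdfs_rdd = sc.textFile("hdfs://namenode:8020/data.txt")   # HDFS
s3_rdd = sc.textFile("s3a://my-bucket/data.txt")          # Amazon S3 (needs hadoop-aws on the classpath)

print(local_rdd.count())
spark.stop()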
There are two deploy modes that can be used to launch Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
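The deploy mode is chosen at submit time. Below is a small sketch (the script name my_app.py is a placeholder): the comments show the two spark-submit invocations, and the application itself can read back which mode it was launched in from the spark.submit.deployMode configuration property.

# Submitted with one of:
#   spark-submit --master yarn --deploy-mode cluster my_app.py   # driver runs in the YARN application master
#   spark-submit --master yarn --deploy-mode client my_app.py    # driver runs in the local client process
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("deploy-mode-check").getOrCreate()

# spark.submit.deployMode is set by spark-submit; fall back to "client" if it is absent.
print(spark.sparkContext.getConf().get("spark.submit.deployMode", "client"))

spark.stop()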
from pyspark.sql import SparkSession, Row
# Import data types
from pyspark.sql.types import *


def base_test(spark):
    # spark is an existing SparkSession
    df = spark.read.json("file:///home/sunyh/app/spark-2.4.5-bin-hadoop2.6/examples/src/main/resources/people.json")
    # Displays the content of the DataFrame to stdout
    df.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    # spark, df are from the previous example
    # Print the schema in a tree format
    df.printSchema()
    # root
    # |-- age: long (nullable = true)
    # |-- name: string (nullable = true)

    # Select only the "name" column
    df.select("name").show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+

    # Select everybody, but increment the age by 1
    df.select(df['name'], df['age'] + 1).show()
    # +-------+---------+
    # |   name|(age + 1)|
    # +-------+---------+
    # |Michael|     null|
    # |   Andy|       31|
    # | Justin|       20|
    # +-------+---------+

    # Select people older than 21
    df.filter(df['age'] > 21).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 30|Andy|
    # +---+----+

    # Count people by age
    df.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |  19|    1|
    # |null|    1|
    # |  30|    1|
    # +----+-----+


def schema_inference_example(spark):
    """
    RDD --> DataFrame
    Infer the schema automatically
    :param spark:
    """
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("file:///home/sunyh/app/spark-2.4.5-bin-hadoop2.6/examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.printSchema()
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are DataFrame objects.
    # rdd returns the content as a pyspark.RDD of Row.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin


def schema_programmatic_example(spark):
    """
    RDD --> DataFrame
    Specify the schema programmatically
    :param spark:
    """
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("file:///home/sunyh/app/spark-2.4.5-bin-hadoop2.6/examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    # Each line is converted to a tuple.
    people = parts.map(lambda p: (p[0], p[1].strip()))

    # The schema is encoded in a string.
    schemaString = "name age"

    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Apply the schema to the RDD.
    schemaPeople = spark.createDataFrame(people, schema)

    # Creates a temporary view using the DataFrame
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    results = spark.sql("SELECT name FROM people")
    results.show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+


if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    # base_test(spark=spark)
    # schema_inference_example(spark)
    schema_programmatic_example(spark)

    spark.stop()