How to do it...

In this example, we will learn how to specify the schema programmatically:

import pyspark.sql.types as typ

# Explicit schema for the sample CSV. Each entry below is
# (column name, Spark data type, nullable); only 'Id' is declared
# non-nullable.
sch = typ.StructType([
    typ.StructField(column, dtype, nullable)
    for column, dtype, nullable in [
        ('Id', typ.LongType(), False),
        ('Model', typ.StringType(), True),
        ('Year', typ.IntegerType(), True),
        ('ScreenSize', typ.StringType(), True),
        ('RAM', typ.StringType(), True),
        ('HDD', typ.StringType(), True),
        ('W', typ.DoubleType(), True),
        ('D', typ.DoubleType(), True),
        ('H', typ.DoubleType(), True),
        ('Weight', typ.DoubleType(), True),
    ]
])

def _optional(cast, text):
    """Return ``cast(text)``, or ``None`` when the field is blank.

    Used for the numeric columns that the schema declares nullable, so a
    missing value becomes a null instead of raising ``ValueError``.
    """
    text = text.strip()
    return cast(text) if text else None


def _parse_row(line):
    """Split one comma-separated line and type its first ten fields.

    Returns a 10-tuple ordered as: Id, Model, Year, ScreenSize, RAM, HDD,
    W, D, H, Weight. String fields are passed through untouched.

    Raises:
        ValueError: if Id is blank, or any numeric field is not a number.
        IndexError: if the line has fewer than ten fields.
    """
    fields = line.split(',')
    return (
        int(fields[0]),  # Id is non-nullable, so a blank must fail loudly
        fields[1],
        _optional(int, fields[2]),
        fields[3],
        fields[4],
        fields[5],
        _optional(float, fields[6]),
        _optional(float, fields[7]),
        _optional(float, fields[8]),
        _optional(float, fields[9]),
    )


sample_data_rdd = sc.textFile('../Data/DataFrames_sample.csv')

# The first line is treated as the header; it is kept so it can be
# filtered out of the data below.
header = sample_data_rdd.first()

sample_data_rdd = (
    sample_data_rdd
    .filter(lambda row: row != header)  # drops every line equal to the header
    .map(_parse_row)
)

# Build the DataFrame from the parsed RDD using the explicit schema `sch`,
# then print a preview of the result.
sample_data_schema = spark.createDataFrame(
    data=sample_data_rdd,
    schema=sch,
)
sample_data_schema.show()
..................Content has been hidden....................

You can't read the whole page of this ebook; please log in to view the full page.
Reset