Spark3.0中pandas支持及其与DataFrame相互转换的示例分析-成都快上网建站

Spark3.0中pandas支持及其与DataFrame相互转换的示例分析

这篇文章主要为大家展示了“Spark 3.0中pandas支持及其与DataFrame相互转换的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Spark 3.0中pandas支持及其与DataFrame相互转换的示例分析”这篇文章吧。

创新互联是专业的五常网站建设公司,五常接单;提供成都网站制作、网站设计,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行五常网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

pandas是python用户广泛使用的数据分析库,Spark 3.0已经能较好滴支持pandas接口,从而弥补pandas不能跨机进行大数据处理的不足。pandas还能够与Spark原来的DataFrame相互转换,方便Spark和Python的库相互调用。

1、Koalas: pandas API on Apache Spark

Koalas(https://koalas.readthedocs.io/en/latest/)项目使数据科学家在处理大数据时能够更有效率,通过在Spark的上层实现一套pandas DataFrame API。pandas 是python数据处理事实上的标准,而Spark是大数据处理的事实上的标准。通过Koalas,可以:

  • 通过 Spark 立即提升大数据处理生产力,如果熟悉pandas不用学习任何新的知识。

  • 在pandas (tests, smaller datasets) 和 Spark (distributed datasets)只需要一套数据分析代码,方便从研究环境扩展到生产环节。

1.1 安装指南

Koalas要求PySpark,需要首先安装PySpark。

Koalas安装的多种方式包括:

  • Conda

  • PyPI

  • Installation from source

安装PySpark,可以使用:

  • Installation with the official release channel

  • Conda

  • PyPI

  • Installation from source

1.2 Python 支持的版本

建议Python 3.5 及以上版本。

1.3 安装 Koalas
  • 通过 Conda 安装

首先需要安装 Conda ,然后创建一个conda环境。如下:

conda create --name koalas-dev-env

将创建一个只有 Python的最小环境,激活当前环境:

conda activate koalas-dev-env

安装 Koalas:

conda install -c conda-forge koalas

安装Koalas的特定版本:

conda install -c conda-forge koalas=0.19.0
  • 从 PyPI 安装

Koalas 可以使用 pip 从 PyPI 安装:

pip install koalas
  • 从源码安装

查看 Contribution Guide 获得更多指南。

1.4 安装 PySpark
  • 采用官方频道安装:

安装PySpark,从 the official release channel 下载。下载后,解包:

tar xzvf spark-2.4.4-bin-hadoop2.7.tgz

设置 SPARK_HOME 环境变量:

cd spark-2.4.4-bin-hadoop2.7
export SPARK_HOME=`pwd`

确保 PYTHONPATH 可以被 PySpark 和 Py4J找到,在 $SPARK_HOME/python/lib

export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
  • 从Conda安装:

PySpark 也可以从 Conda 安装:

conda install -c conda-forge pyspark
  • 从PyPI安装:

PySpark 可以从 PyPI 安装:

pip install pyspark

2、Koalas快速使用

首先,import Koalas 如下:

import pandas as pdimport numpy as npimport databricks.koalas as ksfrom pyspark.sql import SparkSession

数据对象创建

创建 Koalas Series,创建一个整数序列值:

s = ks.Series([1, 3, 5, np.nan, 6, 8])
s
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

创建 Koalas DataFrame,导入词典对象,转为一个序列:

kdf = ks.DataFrame({'a': [1, 2, 3, 4, 5, 6], 'b': [100, 200, 300, 400, 500, 600], 'c': ["one", "two", "three", "four", "five", "six"]},index=[10, 20, 30, 40, 50, 60])
kdf
 abc
101100one
202200two
303300three
404400four
505500five
606600six

创建 pandas DataFrame,导入 numpy array,带datetime index 和 labeled columns:

dates = pd.date_range('20130101', periods=6)
dates
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf
 ABCD
2013-01-01-0.4072910.066551-0.0731490.648219
2013-01-02-0.8487350.4372770.6326570.312861
2013-01-03-0.415537-1.7870720.2422210.125543
2013-01-04-1.6372711.1348100.2825320.133995
2013-01-05-1.230477-1.9257340.736288-0.547677
2013-01-061.092894-1.0712810.318752-0.477591

现在,把pandas DataFrame 转为 Koalas DataFrame:

kdf = ks.from_pandas(pdf)
type(kdf)
databricks.koalas.frame.DataFrame

看起来与 pandas DataFrame几乎一样。

  • 更多例程:https://koalas.readthedocs.io/en/latest/getting_started/10min.html

3、pandas与dataframe的转换

pandas与dataframe、koalas都可以相互转换。注意pandas与dataframe的转换效率较低,而且pandas原生接口是单机的,建议使用Koalas。

3.1 pandas的dataframe转spark的dataframe
from pyspark.sql import SparkSession# 初始化spark会话spark = SparkSession \
    .builder \
    .getOrCreate()

spark_df = spark.createDataFrame(pandas_df)
3.2 spark的dataframe转pandas的dataframe
import pandas as pdpandas_df = spark_df.toPandas()

由于pandas的方式是单机版的,即toPandas()的方式是单机版的,所以参考breeze_lsw改成分布式版本:

import pandas as pddef _map_to_pandas(rdds):return [pd.DataFrame(list(rdds))]    
def topas(df, n_partitions=None):if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columnsreturn df_pand
	
pandas_df = topas(spark_df)

以上是“Spark 3.0中pandas支持及其与DataFrame相互转换的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!


当前文章:Spark3.0中pandas支持及其与DataFrame相互转换的示例分析
文章分享:http://kswjz.com/article/pepgcg.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流