This article walks through how to write a Spark UDAF (user-defined aggregate function) that computes the median of an integer column. The approach is straightforward and practical; the full code and a small test program follow.
package com.frank.sparktest.java;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MedianUdaf extends UserDefinedAggregateFunction {

    private StructType inputSchema;
    private StructType bufferSchema;

    public MedianUdaf() {
        // Input: a single nullable integer column.
        List<StructField> inputFields = new ArrayList<>();
        inputFields.add(DataTypes.createStructField("nums", DataTypes.IntegerType, true));
        inputSchema = DataTypes.createStructType(inputFields);

        // Buffer: all values seen so far, stored as a comma-separated string.
        List<StructField> bufferFields = new ArrayList<>();
        bufferFields.add(DataTypes.createStructField("datas", DataTypes.StringType, true));
        bufferSchema = DataTypes.createStructType(bufferFields);
    }

    @Override
    public StructType inputSchema() {
        return inputSchema;
    }

    @Override
    public StructType bufferSchema() {
        return bufferSchema;
    }

    @Override
    public DataType dataType() {
        return DataTypes.DoubleType;
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        // The buffer has a single string field; start with an empty string.
        buffer.update(0, "");
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        if (!input.isNullAt(0)) {
            String current = buffer.getString(0);
            String value = String.valueOf(input.getInt(0));
            buffer.update(0, current.isEmpty() ? value : current + "," + value);
        }
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        // Both buffers hold comma-separated strings; concatenate them.
        String left = buffer1.getString(0);
        String right = buffer2.getString(0);
        if (left.isEmpty()) {
            buffer1.update(0, right);
        } else if (!right.isEmpty()) {
            buffer1.update(0, left + "," + right);
        }
    }

    @Override
    public Object evaluate(Row buffer) {
        List<Integer> list = new ArrayList<>();
        for (String s : Arrays.asList(buffer.getString(0).split(","))) {
            if (!s.isEmpty()) {
                list.add(Integer.valueOf(s));
            }
        }
        if (list.isEmpty()) {
            return null;
        }
        Collections.sort(list);
        int size = list.size();
        if (size % 2 == 1) {
            // Odd count: the middle element (0-based index size / 2).
            return (double) list.get(size / 2);
        }
        // Even count: average of the two middle elements.
        return (list.get(size / 2 - 1) + list.get(size / 2)) / 2.0;
    }
}
The code above can be used directly.
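One detail worth double-checking in evaluate is the indexing into the sorted list: with 0-based indices, the middle of an odd-length list sits at size / 2, and an even-length list averages the elements at size / 2 - 1 and size / 2. A minimal standalone check of that indexing, in plain Java and independent of Spark:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MedianCheck {
    // Same indexing as MedianUdaf.evaluate: size / 2 for odd counts,
    // the average of size / 2 - 1 and size / 2 for even counts.
    static double median(List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int size = sorted.size();
        if (size % 2 == 1) {
            return sorted.get(size / 2);
        }
        return (sorted.get(size / 2 - 1) + sorted.get(size / 2)) / 2.0;
    }

    public static void main(String[] args) {
        System.out.println(median(Arrays.asList(3, 1, 2)));    // 2.0
        System.out.println(median(Arrays.asList(1, 2, 3, 4))); // 2.5
    }
}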
Below is a test program.
package com.frank.sparktest.java;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import java.io.IOException;
import java.util.stream.IntStream;

public class DemoUDAF {
    public static void main(String[] args) throws IOException {
        SQLContext sqlContext = SparkSession.builder()
                .master("local")
                .getOrCreate()
                .sqlContext();

        // Helper UDF that generates an integer array covering [start, end].
        sqlContext.udf().register("generate",
                (Integer start, Integer end) -> IntStream.range(start, end + 1).boxed().toArray(),
                DataTypes.createArrayType(DataTypes.IntegerType));

        // Register the median UDAF defined above.
        sqlContext.udf().register("median", new MedianUdaf());

        sqlContext.sql("select generate(1,10)").show();
    }
}
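Note that the SQL statement above only exercises the generate helper; the registered UDAF is never actually invoked. As a rough sketch (assuming the registration name median used above), one way to feed the generated values into the UDAF is to explode the array into one row per value and aggregate those rows. The lines below could be appended to the main method:

// Hypothetical follow-up query: explode the generated array into rows,
// then aggregate those rows with the median UDAF registered above.
sqlContext.sql(
        "SELECT median(n) AS median_value "
      + "FROM (SELECT explode(generate(1, 10)) AS n) t").show();
// For the values 1..10 the expected median is (5 + 6) / 2.0 = 5.5.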
At this point you should have a clearer picture of how to write a Spark UDAF that computes the median. Give it a try yourself.