

In earlier articles we covered Spark RDD and PairRDD programming. This article works through a simple example to deepen that understanding.

1. Prerequisites

Development environment: Windows 7 + Eclipse + JDK 1.7

Deployment environment: Linux + ZooKeeper + Kafka + Hadoop + Spark

This example assumes the development and deployment environments above are already set up. If they are not, please refer to my earlier blog posts on big data development.

2. Concepts

Spark Streaming is an extension of the core Spark API that enables processing of real-time data streams with high throughput and fault tolerance. It can ingest data from many sources, including Kafka, Flume, Kinesis, and TCP sockets. Once ingested, the data can be processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window. Finally, the processed results can be pushed out to HDFS, databases, or live dashboards. You can even apply Spark's machine learning and graph processing algorithms to the data streams.

Figure: Spark Streaming data flow

Internally, Spark Streaming receives live input data streams and divides the data into batches; the batches are then processed by the Spark engine to produce the final stream of results, also in batches.

Figure: How Spark Streaming works internally

Spark Streaming provides a high-level abstraction called a discretized stream (DStream), which represents a continuous stream of data. Internally, a DStream is represented as a sequence of RDDs, and any operation applied to a DStream is translated into operations on the underlying RDDs.
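
To make the relationship between DStream operations and RDD operations concrete, here is a minimal, self-contained sketch. It is not part of the order example: it assumes a local master and a plain TCP text source on localhost:9999, and simply computes the length of each incoming line per batch.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamSketch {
    public static void main(String[] args) throws Exception {
        // local mode with 2 threads: one for the receiver, one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // hypothetical TCP text source; Kafka, Flume, etc. are consumed in the same way
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // this map() is re-applied to the RDD generated for every 1-second batch
        JavaDStream<Integer> lineLengths = lines.map(new Function<String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(String line) throws Exception {
                return line.length();
            }
        });

        lineLengths.print(); // prints a few elements of each batch's RDD
        jssc.start();
        jssc.awaitTermination();
    }
}

The single map() call above is applied again and again, once to the RDD that Spark Streaming generates for each batch, which is exactly the DStream-to-RDD translation described in the paragraph above.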

3. Requirements

Using Spark Streaming together with Kafka, compute the total number of orders and the total price of all orders in real time.

4. Implementation

4.1 Order entity: Order

package com.lm.sparkLearning.orderexmaple;

import java.io.Serializable;

/**
 * Simple order entity
 *
 * @author liangming.deng
 */
public class Order implements Serializable {
    private static final long serialVersionUID = 1L;
    // order item name
    private String name;
    // order price
    private Float price;

    public Order() {
        super();
    }

    public Order(String name, Float price) {
        super();
        this.name = name;
        this.price = price;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Float getPrice() {
        return price;
    }

    public void setPrice(Float price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Order [name=" + name + ", price=" + price + "]";
    }
}

4.2 Kafka order producer: OrderProducer

The Kafka producer sends a random number of order messages at a fixed interval.

package com.lm.sparkLearning.orderexmaple;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.RandomUtils;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Order Kafka message producer
 *
 * @author liangming.deng
 */
public class OrderProducer {
    private static Logger logger = LoggerFactory.getLogger(OrderProducer.class);

    public static void main(String[] args) throws IOException {
        // set up the producer
        Producer<String, String> producer = null;
        ObjectMapper mapper = new ObjectMapper();

        try {
            Properties props = new Properties();
            // Kafka cluster
            props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
            // serializer class for the message value
            props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
            // serializer class for the message key
            props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);

            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);

            // list holding the messages to publish
            List<KeyedMessage<String, String>> messages = new ArrayList<>();

            // produce a random number of order messages every 10 seconds
            while (true) {
                int random = RandomUtils.getRandomNum(20);
                if (random == 0) {
                    continue;
                }
                messages.clear();
                for (int i = 0; i < random; i++) {
                    int orderRandom = RandomUtils.getRandomNum(random * 10);
                    Order order = new Order("name" + orderRandom, Float.valueOf("" + orderRandom));
                    // order message: topic and JSON message body
                    KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                            ConstantUtils.ORDER_TOPIC, mapper.writeValueAsString(order));
                    messages.add(message);
                }

                producer.send(messages);
                logger.warn("orderNum:" + random + ",message:" + messages.toString());
                Thread.sleep(10000);
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("-------------:" + e.getStackTrace());
        } finally {
            // avoid a NullPointerException if the producer was never created
            if (producer != null) {
                producer.close();
            }
        }
    }
}
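
The producer above references two helper classes, ConstantUtils and RandomUtils, that are not listed in this post. A minimal sketch of what they might contain follows; the broker address, topic name, serializer class, and offset-reset value are placeholder assumptions, not the author's actual configuration (see the repository linked in section 5 for the real versions).

package com.lm.sparkLearning.utils;

// Hypothetical sketch of the constants referenced by OrderProducer and
// OrderSparkStreaming; the values are placeholders, not the author's real setup.
public class ConstantUtils {
    // assumed Kafka broker list (host:port of the deployed broker)
    public static final String METADATA_BROKER_LIST_VALUE = "192.168.0.100:9092";
    // Kafka 0.8.x string encoder, used here for both key and value serialization
    public static final String SERIALIZER_CLASS_VALUE = "kafka.serializer.StringEncoder";
    // topic the orders are published to and consumed from
    public static final String ORDER_TOPIC = "order";
    // start reading from the earliest available offset when no offset is stored
    public static final String AUTO_OFFSET_RESET_VALUE = "smallest";
}

package com.lm.sparkLearning.utils;

import java.util.Random;

// Hypothetical sketch of the random helper used by OrderProducer.
public class RandomUtils {
    private static final Random RANDOM = new Random();

    // returns a random int in [0, bound)
    public static int getRandomNum(int bound) {
        return RANDOM.nextInt(bound);
    }
}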

4.3 Spark Streaming + Kafka real-time order statistics: OrderSparkStreaming

package com.lm.sparkLearning.orderexmaple;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.SparkUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * Spark Streaming: count orders and sum their total price
 *
 * @author liangming.deng
 */
public class OrderSparkStreaming {
    private static Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);
    private static AtomicLong orderCount = new AtomicLong(0);
    private static AtomicDouble totalPrice = new AtomicDouble(0);

    public static void main(String[] args) {

        // Create context with a 20-second batch interval
        JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("JavaDirectKafkaWordCount",
                "local[2]", null, Durations.seconds(20));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
        kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> orderMsgStream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
                topicsSet);

        // mapper between JSON messages and Order objects
        final ObjectMapper mapper = new ObjectMapper();
        JavaDStream<Order> orderDStream = orderMsgStream
                .map(new Function<Tuple2<String, String>, Order>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Order call(Tuple2<String, String> t2) throws Exception {
                        Order order = mapper.readValue(t2._2, Order.class);
                        return order;
                    }
                }).cache();

        // operate on every RDD in the DStream
        orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
                long count = orderJavaRDD.count();
                if (count > 0) {
                    // accumulate the total number of orders
                    orderCount.addAndGet(count);
                    // first map each order in the RDD to its price, producing a new RDD of prices,
                    // then reduce that RDD to the sum of all order prices in this batch
                    Float sumPrice = orderJavaRDD.map(new Function<Order, Float>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Float call(Order order) throws Exception {
                            return order.getPrice();
                        }
                    }).reduce(new Function2<Float, Float, Float>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Float call(Float a, Float b) throws Exception {
                            return a + b;
                        }
                    });
                    // add this batch's price total to the running total of all orders
                    totalPrice.getAndAdd(sumPrice);

                    // total order count and total price; in production these could be written to a database
                    logger.warn("-------Total order count : " + orderCount.get()
                            + " with total price : " + totalPrice.get());
                }
            }
        });
        orderDStream.print();

        jssc.start();            // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
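
SparkUtils.getJavaStreamingContext(...) is another helper that is not shown in the post. Judging only from how it is called above (app name, master, an optional checkpoint directory, and a batch duration), a plausible minimal sketch is the following; the author's actual version may set additional Spark configuration.

package com.lm.sparkLearning.utils;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Hypothetical sketch of the SparkUtils helper used above; the real version
// may configure more (checkpoint directory handling, Kryo serializer, etc.).
public class SparkUtils {
    public static JavaStreamingContext getJavaStreamingContext(String appName, String master,
            String checkpointDir, Duration batchDuration) {
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);
        // the example passes null, so only set a checkpoint directory when one is given
        if (checkpointDir != null) {
            jssc.checkpoint(checkpointDir);
        }
        return jssc;
    }
}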

4.4 Real-time results

Figure: OrderProducer message generator

Figure: OrderSparkStreaming real-time statistics

5. Code Repository

http://git.oschina.net/a123demi/sparklearning

