- 解决了批次计算延迟后出现的任务append导致整体恢复后 计算消费还是跟不上的问题
- 支持动态调节 streaming 的 批次间隔时间 (不同于sparkstreaming 的 定长的批次间隔,StructuredStreaming中使用trigger实现了。)
- 支持在streaming过程中 重设 topics,用于生产中动态地增加删减数据源
- 添加了速率控制,KafkaRateController。用来控制读取速率,由于不是用的sparkstreaming,所有速率控制的一些参数拿不到,得自己去计算。
- 提供spark-streaming-kafka-0-10_2.10 spark 1.6 来支持 kafka的ssl
- 支持rdd.updateOffset 来管理偏移量。
- 只是结合了 sparkstreaming 1.6 和 kafka 010 。 使低版本的spark能够使用kafka的ssl验证
- 支持 SSL
- 支持spark 1.6 和 kafka 0.10 的结合
- 支持管理offset
- 解决了批次计算延迟后出现的任务append导致整体恢复后 计算消费还是跟不上的问题
- 支持动态调节 streaming 的 批次间隔时间 (不同于sparkstreaming 的 定长的批次间隔,StructuredStreaming中使用trigger实现了。)
- 支持在streaming过程中 重设 topics,用于生产中动态地增加删减数据源
- 提供spark-streaming-kafka-0-10_2.10 spark 1.6 来支持 kafka的ssl
- 支持rdd.updateOffset 来管理偏移量。
- 由于kakfa-010 的api的变化,之前的 kafka-08 版本的 spark-kafka 虽然能用,但是他依赖于spark-streaming-kafka-0-8_2.10
.(可能会导致一些版本问题);所以这次重新写了一个 kafka010 & spark-2.x 版本 ;但是使用方法还是跟之前的差不多,- kafka010有两种来管理offset的方式,一种是旧版的用zookeeper来管理,一种是本身自带的。现只提供zookeeper的管理方式
- 要确保编译的kafka-client的版本和服务器端的版本一致,否则会报 Error reading string of length 27489, only 475 bytes available 等错误
- 添加了速率控制,KafkaRateController。用来控制读取速率,由于不是用的sparkstreaming,所有速率控制的一些参数拿不到,得自己去计算。
- spark与其他组件的封装api
- 支持动态调节 streaming 的 批次间隔时间 (不同于sparkstreaming 的 定长的批次间隔,在Structed Streaming中使用trigger触发机制实现);不使用streamingContext 来实现流式计算,因为streamingContext是严格的时间间隔执行job任务,当job时间远小于batchtime时,会有大量的时间是在sleep等待下一个批次执行的到来(具体可以看看streamingContext的源码);StreamingDynamicContext 的设计借鉴了streamingContext的设计。但是在Job的提交上不使用Queue队列来appending堆积的job。当job执行完后,用户可以自己选择是否立刻执行下一个批次的计算,还是选择继续等待指定时长。
- 支持在streaming过程中 重设 topics,用于生产中动态地增加删减数据源
- 添加了速率控制,KafkaRateController。用来控制读取速率,由于不是用的sparkstreaming,所有速率控制的一些参数拿不到,得自己去计算。
- 提供spark-streaming-kafka-0-10_2.10 spark 1.6 来支持 kafka的ssl
- 支持rdd.updateOffset 来管理偏移量。
- 封装 StreamingKafkaContext :你依然可以用 streamingContext来实现流式计算,词Api封装了读取kafka数据。
scala version | Kafka version | hbase 1.0+ | es 2.3.0 | kudu 1.3.0 | SSL | |
---|---|---|---|---|---|---|
spark 1.3.x | 2.10 | 0.8 | 👌 | 🌟 | 🍆 | NO |
spark 1.6.x | 2.10 | 0.8 | 🐤 | 🎅 | 🌽 | NO |
spark 1.6.x | 2.10 | 0.10+ | 🐤 | 🎅 | 🌽 | YES |
spark 2.0.x | 2.10/2.11 | 0.10+ | 😃 | 🍒 | 🍑 | YES |
- Spark kafka /sparkstreaming kafka
- Spark Hbase
- Spark ES Util
- Spark Kudu
- Flink kafka
- Kafka Util
- Hbase Util
- Database util
- Elasticserach shade
- Rabbitmq util
- Splunk
- 封装了StreamingDynamicContext 。动态地调整 streaming的批次间隔时间,不像sparkstreaming的批次间隔时间是固定的(Streaming Kafka DynamicContext is encapsulated. Dynamically adjust the batch interval of streaming, unlike sparkstreaming, where the batch interval is fixed)
- 使用StreamingDynamicContext 可以让你在流式程序的执行过程中动态的调整你的topic和获取kafkardd的方式。而不需要重新启动程序
- 添加了 sparkStreaming 1.6 -> kafka 010 的 spark-streaming-kafka-0-10_2.10 。用以支持ssl 。
- 封装了spark/sparkstreaming direct读取kafka数据的方式;提供rdd.updateOffset方法来手动管理偏移量到zk; 提供配置参数。
(Encapsulated spark/sparkstreaming to read Kafka with Low level integration (offset in zookeeper)。Provides many configuration parameters to control the way to read Kafka data) - 支持topic新增分区
(Support topic to add new partition) - 支持rdd数据写入kafka 的算子
(Supporting RDD data to write to Kafka) - 支持 Kafka SSL (提供spark 1.6 + Kafka 010 的整合api)(sparkstreaming 1.6 with kafka 010 )
(Support Kafka SSL (0.10+,spark 1.6+)) - Add parameters : 'kafka.consumer.from' To dynamically decide whether to get Kafka data from last or from consumption point
- The version support of spark2.x Kafka 0.10+ is provided.(0.8, there is a big change compared to the 0.10 version.)
- https://github.com/LinMingQiang/spark-util/tree/spark-kafka-0-8_1.6 或者 https://github.com/LinMingQiang/spark-kafka
val kp = SparkKafkaContext.getKafkaParam(brokers,groupId,"consum","earliest")
val skc = new SparkKafkaContext(kp,sparkconf)
val kafkadataRdd = skc.kafkaRDD(topics,last,msgHandle)
//...do something
kafkadataRdd.updateOffsets(groupId)//update offset to zk
- 根据scan条件扫描hbase数据成RDD
(spark scan hbase data to RDD)
scan -> RDD[T] - 根据RDD的数据来批量gethbase
(spark RDD[T] get from hbase to RDD[U])
RDD[T] -> Get -> RDD[U] - 根据RDD的数据来批量 写入
spark RDD[T] write to hbase
RDD[T] -> Put -> Hbase - 根据RDD的数据来批量更新rdd数据
spark RDD[T] update with hbase data
RDD[T] -> Get -> Combine -> RDD[U] - 根据RDD的数据来批量更新rdd数据并写回hbase
spark RDD[T] update with hbase data then put return to hbase
RDD[T] -> Get -> Combine -> Put -> Hbase
val conf = new SparkConf().setMaster("local").setAppName("tets")
val sc = new SparkContext(conf)
val hc = new SparkHBaseContext(sc, zk)
hc.hbaseRDD(tablename, f).foreach { println }
hc.scanHbaseRDD(tablename, new Scan(), f)
- spark集成es
ElasticSearch integration for Apache Spark - scan es数据为rdd
Scanning es data into RDD - https://github.com/LinMingQiang/spark-util/tree/spark-es
sc.esRDD("testindex/testtype", query)
- 读取kudu的数据为rdd
Read kudu data into RDD - 讲rdd数据写入kudu
Write RDD data to kudu - draw lessons from: https://github.com/tmalaska/SparkOnKudu
- https://github.com/LinMingQiang/spark-util/tree/spark-kudu
- 这是一个简单的例子。读取卡夫卡数据,实现WordCount统计并写入HBase
- This is a simple example. Read Kafka data, implement WordCount statistics and write to HBase
- Splunk是一个日志显示和监视系统
(Splunk is a log display and monitoring system.) - Splunk的安装和使用
(Installation and use of Splunk)
- 操作kafka工具类,提供每天记录主题的偏移量,主要用于日重新计算、小时重新计算等功能。
Operate the tool class of kafka, provide offset to record topic by day, mainly used for day recalculation, hour recalculation and other functions
- 操作Hbase的工具类,查询HBase表的region信息,用于手动分割过大的region
The tool class that operates Hbase, inquires the region information of HBase table, used for manual split some excessive region
- Provides a connection tool for each database. include: es,hbase,mysql,mongo
- Resolving conflicts between ES and spark and Hadoop related packages
- A tool class for sending and consuming MQ messages
https://github.com/LinMingQiang/spark-util/tree/rabbitmq-util