本Intelij Idea工程是针对TBDS组件认证而开发,包含的demo程序如下:
- HBaseDemo:简单的HBase二次开发程序,传入表名,如果不存在,则创建;然后列出所有的表
- HDFSDemo:列出根目录/下的所有文件与文件夹
- HiveDemo:连接hiveserver,并列出所有的数据库
- KafkaProducerDemo:向指定topic发送数据
- KafkaConsumerDemo:消费指定topic的数据
- MapReduceDemo:wordCount程序
- SparkDemo: spark版本的wordCount,使用spark-submit提交
- SparkLauncherDemo: spark版本的wordCount,使用java直接提交
- ReadHiveTableDemo:使用spark读取hive表的数据
- WriteHiveTableDemo:使用spark输出数据到hive表
- FlinkKafkaDemo:flink消费kafka数据
- Spark输入到Hbase:spark往Hbase写数据的3种方式
使用maven编译出 dev-demo-.jar,使用java命令直接执行,各功能模块执行方式如下。
假定在dev-demo-.jar所在目录执行
java -cp dev-demo-<version>.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/* com.tencent.tbds.demo.hbase.HBaseDemo --auth-id <id> --auth-key <key> --zk-host <host1,host2> --table-name <table name>
参数解释:
auth-id: 认证ID
auth-key: 认证key
zk-host: zookeeper主机列表
table-name: hbase表名
假定在dev-demo-.jar所在目录执行
hadoop jar dev-demo-<version>.jar com.tencent.tbds.demo.hdfs.HDFSDemo --auth-user <username> --auth-id <id> --auth-key <key>
参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key
准备
代码中默认采用高可用连接方式,因此在运行程序时需要传入zk地址、用户名、密码
运行
假定在dev-demo-.jar所在目录执行
java -cp dev-demo-<version>.jar:$(echo /usr/hdp/2.2.0.0-2041/hive/lib/*.jar | tr ' ' ':'):/usr/hdp/2.2.0.0-2041/hadoop/hadoop-common.jar com.tencent.tbds.demo.hive.HiveDemo --zk-list <host1:port1,host2:port2> --user <user name> --password <password>
参数解释:
zk-list: zookeeper地址列表
user: 连接hive的用户名
password: 连接hive的密码
假定在dev-demo-.jar所在目录执行
java -cp dev-demo-<version>.jar:/usr/hdp/2.2.0.0-2041/kafka/libs/* com.tencent.tbds.demo.kafka.KafkaProducerDemo --auth-id <id> --auth-key <key> --kafka-brokers <broker1:6667,broker2:6667> --topic <topic name>
参数解释:
auth-id: 认证ID
auth-key: 认证key
kafka-brokers: kafka brokers列表
topic: 指定数据发送到哪个topic
假定在dev-demo-.jar所在目录执行
java -cp dev-demo-<version>.jar:/usr/hdp/2.2.0.0-2041/kafka/libs/* com.tencent.tbds.demo.kafka.KafkaConsumerDemo --auth-id <id> --auth-key <key> --kafka-brokers <broker1:6667,broker2:6667> --topic <topic name>
参数解释:
auth-id: 认证ID
auth-key: 认证key
kafka-brokers: kafka brokers列表
topic: 指定消费哪个topic的数据
offset-reset: 可选参数,默认值是latest
假定在dev-demo-.jar所在目录执行
hadoop jar dev-demo-<version>.jar com.tencent.tbds.demo.mapreduce.MapReduceDemo --auth-user <username> --auth-id <id> --auth-key <key> --input <input path> --output <output path>
参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key
input: 数据输入目录
output: 数据输出目录
准备
运行spark程序当前仅支持export认证的方式,不支持在代码中直接注入认证信息
- 在运行前请export 认证信息:
即从路径/opt/cluster_conf/hadoop/ 中读取配置信息,所以请从集群中获取该配置文件并放置到对应的路径中
export hadoop_security_authentication_tbds_secureid=g9q06icsbwYWjQ4i2wbjz3MWNpo8DXqAZxzZ export hadoop_security_authentication_tbds_securekey=qbQyCiWaCJ0HmgiVpc5qofcKd8kVsJgj export hadoop_security_authentication_tbds_username=bhchen
运行
假定在dev-demo-.jar所在目录执行
- 运行方式1:采用spark-submit方式
其中 /tmp/wordcount/input/readme.txt 为输入数据,/tmp/wordcount/output为输出目录
/usr/hdp/2.2.0.0-2041/spark/bin/spark-submit --master yarn-cluster --executor-memory 3g --driver-memory 1g --num-executors 2 --executor-cores 2 --class com.tencent.tbds.demo.spark.SparkDemo dev-demo-<version>.jar /tmp/pyspark_demo/pyspark_demo.csv /tmp/spark_wordcount/
- 运行方式2:采用java的方式,该方式需要使用spark Launcher进程来把主任务调度起来:
原理
执行
SparkAppHandle handler = new SparkLauncher() .setAppName("spark-wordcount") .setSparkHome("/usr/hdp/2.2.0.0-2041/spark ") .setMaster("yarn") .setConf("spark.driver.memory", "1g") .setConf("spark.executor.memory", "3g") .setConf("spark.executor.cores", "2") .setConf("spark.executor.instances", "2") .setAppResource("dev-demo-<version>.jar") .setMainClass("com.tencent.tbds.demo.spark.SparkDemo") .addAppArgs(args[0], args[1]) .setDeployMode("cluster")
java -Djava.ext.dirs=/usr/hdp/2.2.0.0-2041/spark/jars:/usr/hdp/2.2.0.0-2041/hadoop -cp dev-demo-<version>.jar com.tencent.tbds.demo.spark.SparkLauncherDemo /usr/hdp/2.2.0.0-2041/spark cluster /tmp/pyspark_demo/pyspark_demo.csv /tmp/spark_wordcount/
这是一个使用spark读取hive表的示例程序
运行步骤:
- 使用maven命令打包项目:mvn clean compile package
- 将target下的zip包上传到服务器
- 解压zip包,进入解压目录
- 执行
./bin/spark_read_hive_table_demo.sh --auth-user <user name> --auth-id <secure id> --auth-key <secure key> --hive-metastore-uris <hive metastore address> --hive-db <hive database> --hive-table <hive table name>
参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key
hive-metastore-uris: hive metastore的地址
hive-db: hive数据库
hive-table: hive表
这是一个使用spark往hive表写数据的示例程序
运行步骤:
- 使用maven命令打包项目:mvn clean compile package
- 将target下的zip包上传到服务器
- 解压zip包,进入解压目录
- 执行
./bin/spark_write_hive_table_demo.sh --auth-user <user name> --auth-id <secure id> --auth-key <secure key> --hive-metastore-uris <hive metastore address> --hive-db <hive database> --hive-table <hive table name> --hdfs-path <hdfs path>
参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key
hive-metastore-uris: hive metastore的地址
hive-db: hive数据库
hive-table: hive表
hdfs-path: 读取数据的路径
这是一个使用flink实时消费kafka数据的示例程序,需要上传到oceanus运行
参数解释:
kafka-brokers: kafka broker地址列表
group-id: 消费者组ID
topic: kafka topic名称
auth-id: 认证ID
auth-key: 认证key
这是一个使用flink实时消费kafka数据,并输出到HDFS的示例程序,需要上传到oceanus运行
参数解释:
kafka-brokers: kafka broker地址列表
group-id: 消费者组ID
topic: kafka topic名称
auth-id: 认证ID
auth-key: 认证key
hdfs-path: 数据输出到HDFS的路径
这是一个使用Spark Streaming实时消费kafka数据的示例程序
spark-submit --class com.tencent.tbds.demo.spark.SparkStreamKafkaDemo --master yarn --deploy-mode client dev-demo-<version>.jar --kafka-brokers <kafka brokers> --group-id <group id> --auth-id <auth id> --auth-key <auth key> --topic <topic name>
参数解释:
kafka-brokers: kafka broker地址列表
group-id: 消费者组ID
auth-id: 认证ID
auth-key: 认证key
topic: kafka topic名称
Spark输出到HBase总体来说有3种方式,分别对应不同的应用场景
- 通过常规HBase API,这种方式吞吐量低,适用于数据量较小的场景,如流计算。
- 调用saveAsNewAPIHadoopDataset,底层实现方式是批量+异步,吞吐量高,但容易对region server造成太大压力,适用于中等规模数据的场景
- 生成HFile文件,再将此文件导入HBase,适用于大规模数据场景
这3种方式的例子请分别参考:SparkWriteHBaseDirectDemo、SparkWriteHBaseBatchDemo、SparkWriteHBaseBulkLoadDemo,其运行的命令如下:
spark-submit --class com.tencent.tbds.demo.spark.SparkWriteHBaseDirectDemo --jars $(echo /usr/hdp/2.2.0.0-2041/hbase/lib/*.jar | tr ' ' ',') dev-demo-<version>.jar --auth-id <auth id> --auth-key <auth key> --zk-host <host1,host2...> --table-name <tableName>