Commit 1d7c45b

Author: Joey Echeverria
Merge pull request #6 from rogerhadoop/master
Update to Cloudera Manager 4.8 and CDH 4.5, update README.
2 parents 25705dc + 145d731 commit 1d7c45b

3 files changed: 51 additions and 46 deletions

README.md

Lines changed: 22 additions & 20 deletions
@@ -6,17 +6,17 @@ This repository contains an example application for analyzing Twitter data using
 Getting Started
 ---------------

-1. **Install Cloudera Manager 4.0 and CDH4**
+1. **Install Cloudera Manager 4.8 and CDH4**

-Before you get started with the actual application, you'll first need CDH4 installed. Specifically, you'll need Hadoop, Flume, Oozie, and Hive. The easiest way to get the core components is to use Cloudera Manager to set up your initial environment. You can download Cloudera Manager from the [Cloudera website](https://ccp.cloudera.com/display/SUPPORT/Cloudera+Manager+Downloads#ClouderaManagerDownloads-ClouderaManager4.0), or install [CDH](https://ccp.cloudera.com/display/SUPPORT/CDH+Downloads#CDHDownloads-CDH4PackagesandDownloads) manually.
+Before you get started with the actual application, you'll first need CDH4 installed. Specifically, you'll need Hadoop, Flume, Oozie, and Hive. The easiest way to get the core components is to use Cloudera Manager to set up your initial environment. You can download Cloudera Manager from the [Cloudera website](https://www.cloudera.com/content/cloudera-content/cloudera-docs/CM4Ent/latest/Cloudera-Manager-Version-and-Download-Information/Cloudera-Manager-Version-and-Download-Information.html), or install [CDH](http://www.cloudera.com/content/cloudera/en/products-and-services/cdh.html) manually.

-If you go the Cloudera Manager route, you'll still need to [install Flume manually](https://ccp.cloudera.com/display/CDH4DOC/Flume+Installation).
+If you go the Cloudera Manager route, you'll still need to [install Flume manually](http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM4Free/4.5.1/Cloudera-Manager-Free-Edition-User-Guide/cmfeug_topic_5_1.html).

 2. **Install MySQL**

 MySQL is the recommended database for the Oozie database and the Hive metastore. Click [here](http://dev.mysql.com/doc/refman/5.1/en/linux-installation-native.html) for installation documentation.

-Configuring Flume
+Configuring Flume (Cloudera Manager path)
 ------------------

 1. **Build or Download the custom Flume Source**
@@ -37,22 +37,22 @@ Configuring Flume

 2. **Add the JAR to the Flume classpath**

-<pre>$ sudo cp /etc/flume-ng/conf/flume-env.sh.template /etc/flume-ng/conf/flume-env.sh</pre>
-
-Edit the `flume-env.sh` file and uncomment the `FLUME_CLASSPATH` line, and enter the path to the JAR. If adding multiple paths, separate them with a colon.
+Copy `flume-sources-1.0-SNAPSHOT.jar` to /usr/share/cmf/lib/plugins/.

-3. **Set the Flume agent name to TwitterAgent in /etc/default/flume-ng-agent**
+3. **Configure Flume agent in Cloudera Manager Web UI flume**

-If you don't see the `/etc/default/flume-ng-agent` file, it likely means that you didn't install the `flume-ng-agent` package. In the file, you should have the following:
-
-<pre>FLUME_AGENT_NAME=TwitterAgent</pre>
-
-4. **Modify the provided Flume configuration and copy it to /etc/flume-ng/conf**
-
-There is a file called `flume.conf` in the `flume-sources` directory, which needs some minor editing. There are four fields which need to be filled in with values from Twitter. The relevant information is available on the Details page for [your Twitter app](https://dev.twitter.com/apps). Fill in the consumer key, consumer secret, access token, and access token secret. The `keywords` parameter accepts a comma-separated list of keywords to use to filter tweets and collect a relevant set of data. If the parameter is not defined, the Twitter Sample API will be used to collect a sample of the entire Twitter Firehose.
-
-<pre>$ sudo cp flume.conf /etc/flume-ng/conf</pre>
+Go to the Flume Service page (by selecting Flume service from the Services menu or from the All Services page).
+
+Pull down the `Configuration` tab, and select `View and Edit`.
+
+Select the Agent (Default) in the left hand column.
+
+Set the Agent Name property to `TwitterAgent` whose configuration is defined in flume.conf.

+Copy the contents of flume.conf file, in its entirety, into the Configuration File field.
+
+Click `Save Changes` button.
+
 Setting up Hive
 ----------------

@@ -87,7 +87,7 @@ Setting up Hive

 3. **Configure the Hive metastore**

-The Hive metastore should be configured to use MySQL. Follow these [instructions](https://ccp.cloudera.com/display/CDH4DOC/Hive+Installation#HiveInstallation-ConfiguringtheHiveMetastore) to configure the metastore. Make sure to install the MySQL JDBC driver in `/usr/lib/hive/lib`.
+The Hive metastore should be configured to use MySQL. Follow these [instructions](http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH4/4.2.0/CDH4-Installation-Guide/cdh4ig_topic_18_4.html) to configure the metastore. Make sure to install the MySQL JDBC driver in `/var/lib/hive/lib`.

 4. **Create the tweets table**

@@ -134,7 +134,7 @@ Prepare the Oozie workflow

 If using Cloudera Manager, Oozie can be reconfigured to use MySQL via the service configuration page on the Databases tab. Make sure to restart the Oozie service after reconfiguring. You will need to install the MySQL JDBC driver in `/usr/lib/oozie/libext`.

-If Oozie was installed manually, Cloudera provides [instructions](https://ccp.cloudera.com/display/CDH4DOC/Oozie+Installation#OozieInstallation-ConfiguringOozietoUseMySQL) for configuring Oozie to use MySQL.
+If Oozie was installed manually, Cloudera provides [instructions](http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH4/4.2.1/CDH4-Installation-Guide/cdh4ig_topic_17_6.html) for configuring Oozie to use MySQL.

 2. **Create a lib directory and copy any necessary external JARs into it**

@@ -166,7 +166,7 @@ Prepare the Oozie workflow
 $ sudo -u hdfs hadoop fs -chown oozie:oozie /user/oozie
 </pre>

-In order to use the Hive action, the Oozie ShareLib must be installed. Installation instructions can be found [here](https://ccp.cloudera.com/display/CDH4DOC/Oozie+Installation#OozieInstallation-InstallingtheOozieShareLibinHadoopHDFS).
+In order to use the Hive action, the Oozie ShareLib must be installed. Installation instructions can be found [here](http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH4/4.2.1/CDH4-Installation-Guide/cdh4ig_topic_17_6.html).

 Starting the data pipeline
 ------------------------
@@ -181,6 +181,8 @@ Starting the data pipeline
 $ hadoop fs -chmod -R 770 /user/flume
 $ sudo /etc/init.d/flume-ng-agent start
 </pre>
+
+If using Cloudera Manager, start Flume agent from Cloudera Manager Web UI.

 2. **Adjust the start time of the Oozie coordinator workflow in job.properties**

flume-sources/pom.xml

Lines changed: 3 additions & 3 deletions
@@ -29,8 +29,8 @@

   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <flume.version>1.3.0-cdh4.2.0</flume.version>
-    <hadoop.version>2.0.0-cdh4.2.0</hadoop.version>
+    <flume.version>1.4.0-cdh4.5.0</flume.version>
+    <hadoop.version>2.0.0-cdh4.5.0</hadoop.version>
   </properties>

   <build>
@@ -94,7 +94,7 @@
     <dependency>
       <groupId>org.twitter4j</groupId>
       <artifactId>twitter4j-stream</artifactId>
-      <version>[2.2,3)</version>
+      <version>3.0.5</version>
     </dependency>

     <!-- Hadoop Dependencies -->

flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java

Lines changed: 26 additions & 23 deletions
@@ -32,6 +32,7 @@
 import org.slf4j.LoggerFactory;

 import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
 import twitter4j.Status;
 import twitter4j.StatusDeletionNotice;
 import twitter4j.StatusListener;
@@ -48,7 +49,7 @@
  */
 public class TwitterSource extends AbstractSource
     implements EventDrivenSource, Configurable {
-
+
   private static final Logger logger =
       LoggerFactory.getLogger(TwitterSource.class);

@@ -57,14 +58,11 @@ public class TwitterSource extends AbstractSource
   private String consumerSecret;
   private String accessToken;
   private String accessTokenSecret;
-
+
   private String[] keywords;
-
+
   /** The actual Twitter stream. It's set up to collect raw JSON data */
-  private final TwitterStream twitterStream = new TwitterStreamFactory(
-      new ConfigurationBuilder()
-          .setJSONStoreEnabled(true)
-          .build()).getInstance();
+  private TwitterStream twitterStream;

   /**
    * The initialization method for the Source. The context contains all the
@@ -77,12 +75,21 @@ public void configure(Context context) {
     consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
     accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
     accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);
-
+
     String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
     keywords = keywordString.split(",");
     for (int i = 0; i < keywords.length; i++) {
       keywords[i] = keywords[i].trim();
     }
+
+    ConfigurationBuilder cb = new ConfigurationBuilder();
+    cb.setOAuthConsumerKey(consumerKey);
+    cb.setOAuthConsumerSecret(consumerSecret);
+    cb.setOAuthAccessToken(accessToken);
+    cb.setOAuthAccessTokenSecret(accessTokenSecret);
+    cb.setJSONStoreEnabled(true);
+
+    twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
   }

   /**
@@ -94,9 +101,9 @@ public void start() {
     // The channel is the piece of Flume that sits between the Source and Sink,
     // and is used to process events.
     final ChannelProcessor channel = getChannelProcessor();
-
+
     final Map<String, String> headers = new HashMap<String, String>();
-
+
     // The StatusListener is a twitter4j API, which can be added to a Twitter
     // stream, and will execute methods every time a message comes in through
     // the stream.
@@ -110,40 +117,36 @@ public void onStatus(Status status) {
         headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
         Event event = EventBuilder.withBody(
             DataObjectFactory.getRawJSON(status).getBytes(), headers);
-
+
         channel.processEvent(event);
       }
-
+
       // This listener will ignore everything except for new tweets
       public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
       public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
       public void onScrubGeo(long userId, long upToStatusId) {}
       public void onException(Exception ex) {}
+      public void onStallWarning(StallWarning warning) {}
     };
-
+
     logger.debug("Setting up Twitter sample stream using consumer key {} and" +
         " access token {}", new String[] { consumerKey, accessToken });
-    // Set up the stream's listener (defined above), and set any necessary
-    // security information.
+    // Set up the stream's listener (defined above),
     twitterStream.addListener(listener);
-    twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
-    AccessToken token = new AccessToken(accessToken, accessTokenSecret);
-    twitterStream.setOAuthAccessToken(token);
-
+
     // Set up a filter to pull out industry-relevant tweets
     if (keywords.length == 0) {
       logger.debug("Starting up Twitter sampling...");
       twitterStream.sample();
     } else {
       logger.debug("Starting up Twitter filtering...");
-      FilterQuery query = new FilterQuery()
-          .track(keywords)
-          .setIncludeEntities(true);
+
+      FilterQuery query = new FilterQuery().track(keywords);
       twitterStream.filter(query);
     }
     super.start();
   }
-
+
   /**
    * Stops the Source's event processing and shuts down the Twitter stream.
    */
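
Review note on the keyword handling visible in the context lines of `configure()` and `start()`: the keyword string defaults to `""` and is split on commas, and `start()` switches to the sample stream only when `keywords.length == 0`. In Java, `"".split(",")` returns a one-element array holding the empty string, so that check may never be true when no keywords are configured. The sketch below reproduces the split-and-trim loop in isolation and shows one possible alternative that drops blank entries. The class and method names (`KeywordParsing`, `splitAsInDiff`, `splitDroppingBlanks`) are hypothetical and are not part of this commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeywordParsing {

  /** Mirrors the split-and-trim loop shown in configure(). */
  static String[] splitAsInDiff(String keywordString) {
    String[] keywords = keywordString.split(",");
    for (int i = 0; i < keywords.length; i++) {
      keywords[i] = keywords[i].trim();
    }
    return keywords;
  }

  /**
   * Hypothetical alternative: skip blank entries, so an unset or empty
   * keyword string yields an empty array.
   */
  static String[] splitDroppingBlanks(String keywordString) {
    List<String> result = new ArrayList<String>();
    for (String keyword : keywordString.split(",")) {
      String trimmed = keyword.trim();
      if (!trimmed.isEmpty()) {
        result.add(trimmed);
      }
    }
    return result.toArray(new String[0]);
  }

  public static void main(String[] args) {
    // An empty string still splits into one (empty) element.
    System.out.println(splitAsInDiff("").length);           // prints 1
    // Dropping blanks gives the empty array the length check expects.
    System.out.println(splitDroppingBlanks("").length);     // prints 0
    System.out.println(
        Arrays.toString(splitDroppingBlanks("hadoop, big data ,")));
    // prints [hadoop, big data]
  }
}
```

If the sample-stream fallback is meant to trigger when `keywords` is left undefined, a filter along these lines (or an explicit check for an empty keyword string) would be one way to get there; whether that matches the author's intent is an assumption.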
