-
Notifications
You must be signed in to change notification settings - Fork 343
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Custom Flume source and Flume configuration
This adds a Flume source which will access the Twitter Streaming API and stream raw JSON tweets through Flume. The configuration defines a setup which will take the JSON data and load it into HDFS via a Memory Channel.
- Loading branch information
Jon Natkins
committed
Aug 30, 2012
1 parent
41c7dbd
commit 2b93087
Showing
4 changed files
with
325 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
|
||
# The configuration file needs to define the sources, | ||
# the channels and the sinks. | ||
# Sources, channels and sinks are defined per agent, | ||
# in this case called 'TwitterAgent' | ||
|
||
TwitterAgent.sources = Twitter | ||
TwitterAgent.channels = MemChannel | ||
TwitterAgent.sinks = HDFS | ||
|
||
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource | ||
TwitterAgent.sources.Twitter.channels = Channel-1 | ||
TwitterAgent.sources.Twitter.consumerKey = <required> | ||
TwitterAgent.sources.Twitter.consumerSecret = <required> | ||
TwitterAgent.sources.Twitter.accessToken = <required> | ||
TwitterAgent.sources.Twitter.accessTokenSecret = <required> | ||
|
||
TwitterAgent.sinks.HDFS.channel = Channel-1 | ||
TwitterAgent.sinks.HDFS.type = hdfs | ||
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/ | ||
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream | ||
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text | ||
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10000 | ||
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 | ||
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0 | ||
|
||
TwitterAgent.channels.MemChannel.type = memory | ||
TwitterAgent.channels.MemChannel.capacity = 10000 | ||
TwitterAgent.channels.MemChannel.transactionCapacity = 10000 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>com.cloudera</groupId> | ||
<artifactId>flume-sources</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
<packaging>jar</packaging> | ||
|
||
<name>flume-sources</name> | ||
<url>http://www.cloudera.com</url> | ||
|
||
<properties> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
<flume.version>1.1.0-cdh4.0.1</flume.version> | ||
<hadoop.version>2.0.0-cdh4.0.1</hadoop.version> | ||
</properties> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-eclipse-plugin</artifactId> | ||
<version>2.9</version> | ||
<configuration> | ||
<buildOutputDirectory>eclipse-classes</buildOutputDirectory> | ||
<downloadSources>true</downloadSources> | ||
<downloadJavadocs>false</downloadJavadocs> | ||
</configuration> | ||
</plugin> | ||
|
||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-shade-plugin</artifactId> | ||
<version>1.7.1</version> | ||
<executions> | ||
<execution> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>shade</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
|
||
<pluginManagement> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
<version>2.3.2</version> | ||
<configuration> | ||
<source>1.6</source> | ||
<target>1.6</target> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</pluginManagement> | ||
</build> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.8.2</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
<version>1.6.1</version> | ||
</dependency> | ||
|
||
<!-- For the Twitter API --> | ||
<dependency> | ||
<groupId>org.twitter4j</groupId> | ||
<artifactId>twitter4j-stream</artifactId> | ||
<version>[2.2,)</version> | ||
</dependency> | ||
|
||
<!-- Hadoop Dependencies --> | ||
<dependency> | ||
<groupId>org.apache.flume</groupId> | ||
<artifactId>flume-ng-core</artifactId> | ||
<version>${flume.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.flume</groupId> | ||
<artifactId>flume-ng-sdk</artifactId> | ||
<version>${flume.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-common</artifactId> | ||
<version>${hadoop.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<repositories> | ||
<repository> | ||
<id>cloudera</id> | ||
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url> | ||
<releases> | ||
<enabled>true</enabled> | ||
</releases> | ||
<snapshots> | ||
<enabled>false</enabled> | ||
</snapshots> | ||
</repository> | ||
</repositories> | ||
</project> |
150 changes: 150 additions & 0 deletions
150
flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.cloudera.flume.source; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import org.apache.flume.Context; | ||
import org.apache.flume.Event; | ||
import org.apache.flume.EventDrivenSource; | ||
import org.apache.flume.channel.ChannelProcessor; | ||
import org.apache.flume.conf.Configurable; | ||
import org.apache.flume.event.EventBuilder; | ||
import org.apache.flume.source.AbstractSource; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import twitter4j.Status; | ||
import twitter4j.StatusDeletionNotice; | ||
import twitter4j.StatusListener; | ||
import twitter4j.TwitterStream; | ||
import twitter4j.TwitterStreamFactory; | ||
import twitter4j.auth.AccessToken; | ||
import twitter4j.conf.ConfigurationBuilder; | ||
import twitter4j.json.DataObjectFactory; | ||
|
||
/** | ||
* A Flume Source, which pulls data from Twitter's streaming API. Currently, | ||
* this only supports pulling from the sample API, and only gets new status | ||
* updates. | ||
*/ | ||
public class TwitterSource extends AbstractSource | ||
implements EventDrivenSource, Configurable { | ||
|
||
private static final Logger logger = | ||
LoggerFactory.getLogger(TwitterSource.class); | ||
|
||
/** Information necessary for accessing the Twitter API */ | ||
private String consumerKey; | ||
private String consumerSecret; | ||
private String accessToken; | ||
private String accessTokenSecret; | ||
|
||
/** Size of event batches */ | ||
private long batchSize; | ||
|
||
/** The actual Twitter stream. It's set up to collect raw JSON data */ | ||
private final TwitterStream twitterStream = new TwitterStreamFactory( | ||
new ConfigurationBuilder() | ||
.setJSONStoreEnabled(true) | ||
.build()).getInstance(); | ||
|
||
/** | ||
* The initialization method for the Source. The context contains all the | ||
* Flume configuration info, and can be used to retrieve any configuration | ||
* values necessary to set up the Source. | ||
*/ | ||
@Override | ||
public void configure(Context context) { | ||
consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY); | ||
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY); | ||
accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY); | ||
accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY); | ||
|
||
batchSize = context.getLong(TwitterSourceConstants.BATCH_SIZE_KEY, | ||
TwitterSourceConstants.DEFAULT_BATCH_SIZE); | ||
} | ||
|
||
/** | ||
* Start processing events. This uses the Twitter Streaming API to sample | ||
* Twitter, and process tweets. | ||
*/ | ||
@Override | ||
public void start() { | ||
// The channel is the piece of Flume that sits between the Source and Sink, | ||
// and is used to process events. | ||
final ChannelProcessor channel = getChannelProcessor(); | ||
|
||
final List<Event> eventList = new ArrayList<Event>(); | ||
final Map<String, String> headers = new HashMap<String, String>(); | ||
|
||
// The StatusListener is a twitter4j API, which can be added to a Twitter | ||
// stream, and will execute methods every time a message comes in through | ||
// the stream. | ||
StatusListener listener = new StatusListener() { | ||
// The onStatus method is executed every time a new tweet comes in. | ||
public void onStatus(Status status) { | ||
// The EventBuilder is used to build an event using the headers and | ||
// the raw JSON of a tweet | ||
eventList.add(EventBuilder.withBody( | ||
DataObjectFactory.getRawJSON(status).getBytes(), headers)); | ||
|
||
// When we've filled up a batch, we use the channel to process the | ||
// list of events. | ||
if (eventList.size() >= batchSize) { | ||
logger.debug("Processing a batch of {} events", eventList.size()); | ||
channel.processEventBatch(eventList); | ||
eventList.clear(); | ||
} | ||
} | ||
|
||
// This listener will ignore everything except for new tweets | ||
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {} | ||
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {} | ||
public void onScrubGeo(long userId, long upToStatusId) {} | ||
public void onException(Exception ex) {} | ||
}; | ||
|
||
logger.debug("Setting up Twitter sample stream using consumer key {} and" + | ||
" access token {}", new String[] { consumerKey, accessToken }); | ||
// Set up the stream's listener (defined above), and set any necessary | ||
// security information. | ||
twitterStream.addListener(listener); | ||
twitterStream.setOAuthConsumer(consumerKey, consumerSecret); | ||
AccessToken token = new AccessToken(accessToken, accessTokenSecret); | ||
twitterStream.setOAuthAccessToken(token); | ||
|
||
// Start sampling Twitter! | ||
twitterStream.sample(); | ||
super.start(); | ||
} | ||
|
||
/** | ||
* Stops the Source's event processing and shuts down the Twitter stream. | ||
*/ | ||
@Override | ||
public void stop() { | ||
logger.debug("Shutting down Twitter sample stream..."); | ||
twitterStream.shutdown(); | ||
super.stop(); | ||
} | ||
} |
13 changes: 13 additions & 0 deletions
13
flume-sources/src/main/java/com/cloudera/flume/source/TwitterSourceConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package com.cloudera.flume.source; | ||
|
||
public class TwitterSourceConstants { | ||
|
||
public static final String CONSUMER_KEY_KEY = "consumerKey"; | ||
public static final String CONSUMER_SECRET_KEY = "consumerSecret"; | ||
public static final String ACCESS_TOKEN_KEY = "accessToken"; | ||
public static final String ACCESS_TOKEN_SECRET_KEY = "accessTokenSecret"; | ||
|
||
public static final String BATCH_SIZE_KEY = "batchSize"; | ||
public static final long DEFAULT_BATCH_SIZE = 1000L; | ||
|
||
} |