Skip to content

Commit

Permalink
Custom Flume source and Flume configuration
Browse files Browse the repository at this point in the history
This adds a Flume source which will access the Twitter Streaming API and stream
raw JSON tweets through Flume. The configuration defines a setup which will
take the JSON data and load it into HDFS via a Memory Channel.
  • Loading branch information
Jon Natkins committed Aug 30, 2012
1 parent 41c7dbd commit 2b93087
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 0 deletions.
46 changes: 46 additions & 0 deletions flume-sources/flume.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = Channel-1
TwitterAgent.sources.Twitter.consumerKey = <required>
TwitterAgent.sources.Twitter.consumerSecret = <required>
TwitterAgent.sources.Twitter.accessToken = <required>
TwitterAgent.sources.Twitter.accessTokenSecret = <required>

TwitterAgent.sinks.HDFS.channel = Channel-1
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 10000
116 changes: 116 additions & 0 deletions flume-sources/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.cloudera</groupId>
<artifactId>flume-sources</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>flume-sources</name>
<url>http://www.cloudera.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flume.version>1.1.0-cdh4.0.1</flume.version>
<hadoop.version>2.0.0-cdh4.0.1</hadoop.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<buildOutputDirectory>eclipse-classes</buildOutputDirectory>
<downloadSources>true</downloadSources>
<downloadJavadocs>false</downloadJavadocs>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.7.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>

<!-- For the Twitter API -->
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>[2.2,)</version>
</dependency>

<!-- Hadoop Dependencies -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>${flume.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>${flume.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.cloudera.flume.source;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

/**
* A Flume Source, which pulls data from Twitter's streaming API. Currently,
* this only supports pulling from the sample API, and only gets new status
* updates.
*/
public class TwitterSource extends AbstractSource
implements EventDrivenSource, Configurable {

private static final Logger logger =
LoggerFactory.getLogger(TwitterSource.class);

/** Information necessary for accessing the Twitter API */
private String consumerKey;
private String consumerSecret;
private String accessToken;
private String accessTokenSecret;

/** Size of event batches */
private long batchSize;

/** The actual Twitter stream. It's set up to collect raw JSON data */
private final TwitterStream twitterStream = new TwitterStreamFactory(
new ConfigurationBuilder()
.setJSONStoreEnabled(true)
.build()).getInstance();

/**
* The initialization method for the Source. The context contains all the
* Flume configuration info, and can be used to retrieve any configuration
* values necessary to set up the Source.
*/
@Override
public void configure(Context context) {
consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);

batchSize = context.getLong(TwitterSourceConstants.BATCH_SIZE_KEY,
TwitterSourceConstants.DEFAULT_BATCH_SIZE);
}

/**
* Start processing events. This uses the Twitter Streaming API to sample
* Twitter, and process tweets.
*/
@Override
public void start() {
// The channel is the piece of Flume that sits between the Source and Sink,
// and is used to process events.
final ChannelProcessor channel = getChannelProcessor();

final List<Event> eventList = new ArrayList<Event>();
final Map<String, String> headers = new HashMap<String, String>();

// The StatusListener is a twitter4j API, which can be added to a Twitter
// stream, and will execute methods every time a message comes in through
// the stream.
StatusListener listener = new StatusListener() {
// The onStatus method is executed every time a new tweet comes in.
public void onStatus(Status status) {
// The EventBuilder is used to build an event using the headers and
// the raw JSON of a tweet
eventList.add(EventBuilder.withBody(
DataObjectFactory.getRawJSON(status).getBytes(), headers));

// When we've filled up a batch, we use the channel to process the
// list of events.
if (eventList.size() >= batchSize) {
logger.debug("Processing a batch of {} events", eventList.size());
channel.processEventBatch(eventList);
eventList.clear();
}
}

// This listener will ignore everything except for new tweets
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
public void onScrubGeo(long userId, long upToStatusId) {}
public void onException(Exception ex) {}
};

logger.debug("Setting up Twitter sample stream using consumer key {} and" +
" access token {}", new String[] { consumerKey, accessToken });
// Set up the stream's listener (defined above), and set any necessary
// security information.
twitterStream.addListener(listener);
twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
AccessToken token = new AccessToken(accessToken, accessTokenSecret);
twitterStream.setOAuthAccessToken(token);

// Start sampling Twitter!
twitterStream.sample();
super.start();
}

/**
* Stops the Source's event processing and shuts down the Twitter stream.
*/
@Override
public void stop() {
logger.debug("Shutting down Twitter sample stream...");
twitterStream.shutdown();
super.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.cloudera.flume.source;

public class TwitterSourceConstants {

public static final String CONSUMER_KEY_KEY = "consumerKey";
public static final String CONSUMER_SECRET_KEY = "consumerSecret";
public static final String ACCESS_TOKEN_KEY = "accessToken";
public static final String ACCESS_TOKEN_SECRET_KEY = "accessTokenSecret";

public static final String BATCH_SIZE_KEY = "batchSize";
public static final long DEFAULT_BATCH_SIZE = 1000L;

}

0 comments on commit 2b93087

Please sign in to comment.