Comment: Migrated to Confluence 5.3

General

Can I run two instances of the flume node on the same unix machine?

Yes. Run flume with the -n option.

Code Block
flume node &
flume node -n <physicalnodename> &

Can I run two instances of the flume node on the same windows machine?

Yes. Install two instances of Flume Node for Windows, changing the installation path for the second installation (for example, flume-a instead of flume).
You also need to change the default port numbers (35862) in the conf file in the FlumeNode Properties, and change the Java options that set the flume.home path.

TODO: Add way to specify physical node name in configuration file.

I'm having problems with feature xxx.

First check to see if this is an experimental feature or a recommended feature in the Feature Status page. We'll focus on bugs in recommended or beta features before we focus on the experimental features.

I'm having a hard time with the argument syntax in the catalog section of the manual. Help!

The general form is

Code Block
sinkName(reqArg1, reqArg2[, optArg1="default" [optArg2=0]]{, kwarg1="default", kwarg2=0})

reqArg1 and reqArg2 are positional arguments and are required in all instances. [ ] chars enclose optional positional arguments. All optional arguments have a default value and must be enumerated in order. Thus optArg1 and optArg2 are optional positional arguments, and have defaults that get filled in if they are not present. { } chars enclose optional keyword arguments. All keyword arguments are optional, have a default value, and can be enumerated in any order. Thus kwarg1 and kwarg2 are keyword arguments with defaults.
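
The binding rules above can be sketched in a few lines of Python. This is only an illustration of the rules as described, not Flume's actual parser: the function name bind_args is hypothetical, and collecting unrecognized keyword names into a separate "ignored" result is an assumption of the sketch.

```python
def bind_args(required, optional, keywords, args, kwargs):
    """Bind call arguments using the rules described above (illustrative only).

    required: list of required positional names
    optional: list of (name, default) pairs, in declaration order
    keywords: dict mapping keyword name -> default
    args:     positional values as written in the call
    kwargs:   keyword values as written in the call
    """
    if len(args) < len(required):
        raise ValueError("missing required positional argument(s)")
    if len(args) > len(required) + len(optional):
        raise ValueError("too many positional arguments")

    # Required positionals are bound first, in order.
    bound = dict(zip(required, args))

    # Optional positionals are filled in order; missing ones get defaults.
    extra = args[len(required):]
    for i, (name, default) in enumerate(optional):
        bound[name] = extra[i] if i < len(extra) else default

    # Keyword arguments start at their defaults and may come in any order.
    bound.update(keywords)
    ignored = {}
    for name, value in kwargs.items():
        if name in keywords:
            bound[name] = value
        else:
            # Assumption: a name that is not a declared keyword is not bound.
            ignored[name] = value
    return bound, ignored


# The general form shown above.
REQUIRED = ["reqArg1", "reqArg2"]
OPTIONAL = [("optArg1", "default"), ("optArg2", 0)]
KEYWORDS = {"kwarg1": "default", "kwarg2": 0}

bound, ignored = bind_args(REQUIRED, OPTIONAL, KEYWORDS, ["a", "b", "x"], {"kwarg2": 5})
print(bound)
```

Here optArg2 and kwarg1 fall back to their defaults, while optArg1 and kwarg2 take the supplied values.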

Let's take tailDir as an example. Here's the definition in the manual.

Code Block
tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]]{,delim="regex", delimMode="exclude|prev|next"}) 

Here are some valid examples:

Code Block
tailDir("/var/log/app")            // all files 
tailDir("/var/log/app",".*\.log")  // all files with names that match the ".*\.log" regex (in shell this is *.log)
tailDir("/var/log/app",".*\.log", false, 1)  // all files with names that match the ".*\.log" regex, starting from beginning of file, with one level of recursion depth.
tailDir("/var/log/app", delim="\n\n", delimMode="exclude")  // all files, with entries that end with double new lines, excluding the double new lines
tailDir("/var/log/app",".*\.log", false, 1, delim="\n\n", delimMode="exclude")  // all files with names that match the ".*\.log" regex, starting from beginning of file, with one level of recursion depth, that end with double new lines, excluding the double new lines

Here are some invalid examples (should fail):

Code Block
tailDir()                                            // must have at least one arg
tailDir("/var/log/app", ".*", startFromEnd=true, 1)  // positional args by default cannot be used as kwargs

Here are some examples that are currently valid but likely do not do what you want:

Code Block
tailDir("/var/log/app", ".*", startFromEnd=true, recurseDepth=1)  // positional args by default cannot be used as kwargs

I'm new and I'm having a problem using dfs, customDfs/formatDfs, or escapedCustomDfs/escapedFormatDfs sinks.

You should use the collectorSink. It is sufficient for most users and greatly simplifies configuration. The sinks mentioned above are "low-level" and exposed for advanced users. HDFS files are not durable until they close or are synced, and these sinks do not automatically do this. The collectorSink is smarter and handles periodic closing of files.

Agent Side

I'm generating events from my application and sending it to a flume agent listening for Thrift/Avro RPCs and my timestamps seem to be in the 1970s.

Generated events are expected to carry unix time in milliseconds. If the data is being generated by an external application, that application must generate timestamps in milliseconds.

For example, 1305680461000 should result in 5/18/11 01:01:01 GMT, but 1305680461 will result in something like 1/16/70 2:41:20 GMT.
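
The difference is easy to reproduce. The following Python sketch (the helper name event_time is made up for this illustration) interprets both values as milliseconds since the unix epoch, and shows the fix of multiplying a seconds-based timestamp by 1000 before sending it:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_time(ts_millis):
    """Interpret a timestamp as milliseconds since the unix epoch."""
    return EPOCH + timedelta(milliseconds=ts_millis)


seconds = 1305680461           # what time() style APIs usually return
millis = seconds * 1000        # what the event timestamp should contain

print(event_time(millis))      # 2011-05-18 01:01:01+00:00
print(event_time(seconds))     # 1970-01-16 02:41:20.461000+00:00
```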

Collector Side

I already use syslog/thrift/scribe and want to just have a collector that spools to disk on failure. Can I do this?

Yes. The current solution is complex but seems to work.

Code Block
< mask("rolltag") roll(1500) { escapedCustomDfs("hdfs://...", "prefix-%{rolltag}") } ? mask("rolltag") diskFailover insistentAppend stubbornAppend insistentOpen mask("rolltag") roll(1500) { escapedCustomDfs("hdfs://...", "prefix-%{rolltag}") } >

The roll wrappers add a "rolltag" attribute and the mask("rolltag") removes it before going into another roller. Otherwise the roller may try to overwrite another "rolltag" which is currently not allowed.

Can I control the level of HDFS replication / block size / other client HDFS property?

Yes. HDFS block size and replication level are HDFS client parameters, so they are set by the client. The values you get probably come from the hadoop-core.*.jar file (it usually contains hdfs-default.xml and friends). To override the defaults, set dfs.block.size and dfs.replication in your hdfs-site.xml or flume-site.xml file.

What is a good amount of time for collector rolling?

Agents and Collectors

How do end-to-end acks work and where can I add a "filter" decorator to drop events?

The acks are generated from checksums of the bodies of events. So if you augment your events with new attributes (regex, value), the acks will still work. However, if you filter out events between the agentSink and the collectorSink, the checksums won't add up.

You can, however, put filtering "after" the collector or do filtering "before" the agent.

Ok because value adds attributes and does not modify the body.
node : <source> | agentE2ESink("ip of collector");
collector: collectorSource | value("newattr","newvalue") collectorSink("hdfs://xxxx", ...);

Ok because filter is before checksums are calculated.
node : <source> | filterOutEvents agentE2ESink("ip of collector");
collector: collectorSource | collectorSink("hdfs://xxxx", ...);

Ok because filter is after checksums are validated.
node : <source> | agentE2ESink("ip of collector");
collector: collectorSource | collector(xxx) { filterOutEvents escapedFormatDfs("hdfs://xxxx", ...) } ;

Not ok – checksums won't work out because filtered events that the agent included in its checksum never reach the collector's checksum calculation.
node : <source> | agentE2ESink("ip of collector");
collector: collectorSource | filterOutEvents collectorSink("hdfs://xxxx", ...);
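
The four cases above can be illustrated with a toy model in Python. This is a sketch under stated assumptions, not Flume's implementation: it assumes a batch checksum formed by XOR-ing a CRC32 of each event body, and the event layout and the name batch_checksum are made up for the example. The only property it relies on is the one described above, namely that the checksum covers event bodies and not attributes.

```python
import zlib
from functools import reduce


def batch_checksum(events):
    """XOR together a CRC32 of each event body; attributes are ignored."""
    return reduce(lambda acc, e: acc ^ zlib.crc32(e["body"]), events, 0)


# Events as the agent sends them, and the checksum the agent reports.
sent = [
    {"body": b"line one", "attrs": {}},
    {"body": b"line two", "attrs": {}},
    {"body": b"line three", "attrs": {}},
]
agent_sum = batch_checksum(sent)

# A value-style decorator adds an attribute but leaves bodies untouched.
augmented = [dict(e, attrs={**e["attrs"], "newattr": "newvalue"}) for e in sent]

# A filter placed between the agent and the collector's checksum drops an event.
filtered = [e for e in sent if b"two" not in e["body"]]

acks_match_after_augment = batch_checksum(augmented) == agent_sum
acks_match_after_filter = batch_checksum(filtered) == agent_sum

print(acks_match_after_augment)  # True
print(acks_match_after_filter)   # False
```

Filtering before the agent computes its checksum, or after the collector has validated it, keeps both sides working over the same set of bodies.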

Plugins

I have a plugin that uses version xxx of Thrift and Flume is using version yyy.

Thrift versions have been wire-compatible from 0.5.0 to 0.6.0. Thus an application with a Thrift 0.5.0 server should accept data from a Thrift 0.6.0 client and vice versa. I believe it has been wire-compatible since 0.2.0 (NEEDS verification). However, the code generated by the Thrift compiler and the Java runtime library jars do break API compatibility between versions, so the Thrift-generated code must be regenerated. We suggest modifying the plugin as opposed to modifying Flume or the target application.

Trivia

Why do the flume services have crazy port numbers?

The initial flume ports were the telephone numbers corresponding to F-L-U-M-E. F=3, L=5, U=8, M=6, E=3 => 35863. After this decision we picked arbitrary ports near that number. Maybe in a future release we'll pick ports that are easier.
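
For the curious, the mapping can be reproduced with a standard telephone keypad table. The function name word_to_port is just for illustration:

```python
# Letters printed on each key of a standard telephone keypad.
KEYPAD = {
    "2": "ABC", "3": "DEF", "4": "GHI", "5": "JKL",
    "6": "MNO", "7": "PQRS", "8": "TUV", "9": "WXYZ",
}
LETTER_TO_DIGIT = {
    letter: digit for digit, letters in KEYPAD.items() for letter in letters
}


def word_to_port(word):
    """Spell a word on the keypad and read the digits as a port number."""
    return int("".join(LETTER_TO_DIGIT[c] for c in word.upper()))


print(word_to_port("flume"))  # 35863
```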

Where did the name Flume come from?

The name Flume is the result of a wordplay. Flume collects log data. A log is also a large tree or branch that has been cut down. A log flume is a narrow stream of water that carries logs. Get it? Told you it was bad.

Aaron Newton, a Cloudera Alum, actually suggested the name for the Flume project and it just seemed to fit.
