Accessing a mapper's counter from a reducer

I need to access the counters from my mapper in my reducer. Is this possible? If so, how is it done?

As an example, my mapper is:

public class CounterMapper extends Mapper<Text,Text,Text,Text> {

    static enum TestCounters { TEST }

    @Override
    protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
        context.getCounter(TestCounters.TEST).increment(1);
        context.write(key, value);
    }
}

My reducer is:

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));
    }
}

counterValue is always 0. Am I doing something wrong, or is this just not possible?


Solution 1:

In the Reducer's configure(JobConf), you can use the JobConf object to look up the reducer's own job ID. With that, the reducer can create its own JobClient (i.e. a connection to the jobtracker) and query the counters for this job, or for any other job for that matter.

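// in the Reducer class (old org.apache.hadoop.mapred API)...
private long mapperCounter;

@Override
public void configure(JobConf conf) {
    try {
        // Connect to the jobtracker and look up this reducer's own job.
        JobClient client = new JobClient(conf);
        RunningJob parentJob =
            client.getJob(JobID.forName(conf.get("mapred.job.id")));
        // MAP_COUNTER_NAME is the enum constant the mapper increments.
        mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
    } catch (IOException e) {
        // configure() cannot declare checked exceptions, so rethrow unchecked.
        throw new RuntimeException("Could not read the counters for this job", e);
    }
}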

Now you can use mapperCounter inside the reduce() method itself.

The JobClient calls throw IOException, which configure() cannot declare, so they have to be wrapped in a try-catch. I'm using the old API, but it shouldn't be hard to adapt this to the new API.

Note that mappers' counters should all be finalized before any reducer starts, so contrary to Justin Thomas's comment, I believe you should get accurate values (as long as the reducers aren't incrementing the same counter!)
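
To make the difference concrete, here is a minimal plain-Java sketch of why the question's reducer sees 0 while a lookup through the job sees the total. It does not use Hadoop at all: CounterModel, TaskCounters and JobCounters are hypothetical names invented for this illustration, and the model assumes only that every task starts with its own zeroed counters and that the framework sums them per job once tasks complete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of per-task counters. Hypothetical names, not Hadoop's API. */
final class CounterModel {

    /** Counters owned by a single task; every task starts from zero. */
    static final class TaskCounters {
        private final Map<String, Long> values = new HashMap<>();

        void increment(String name, long amount) {
            values.merge(name, amount, Long::sum);
        }

        long get(String name) {
            return values.getOrDefault(name, 0L);
        }
    }

    /** Stand-in for the job-wide view that the framework keeps. */
    static final class JobCounters {
        private final List<TaskCounters> completedTasks = new ArrayList<>();

        /** Called when a task finishes and reports its counters. */
        void taskCompleted(TaskCounters counters) {
            completedTasks.add(counters);
        }

        /** Sums one counter over every completed task. */
        long total(String name) {
            long sum = 0;
            for (TaskCounters task : completedTasks) {
                sum += task.get(name);
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        JobCounters job = new JobCounters();

        // Two map tasks that process 3 and 4 records respectively.
        int[] recordsPerMapTask = {3, 4};
        for (int records : recordsPerMapTask) {
            TaskCounters mapTask = new TaskCounters();
            for (int i = 0; i < records; i++) {
                mapTask.increment("TEST", 1);
            }
            job.taskCompleted(mapTask);
        }

        // The reduce task has its own counters, which nobody has incremented.
        TaskCounters reduceTask = new TaskCounters();
        System.out.println("reducer-local TEST = " + reduceTask.get("TEST")); // prints 0
        System.out.println("job-wide TEST = " + job.total("TEST"));           // prints 7
    }
}
```

In this model, reading the reducer's own counter corresponds to calling context.getCounter(...) in the question, and job.total(...) corresponds to asking the jobtracker for the job's counters as in the snippet above.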

Solution 2:

Here is Jeff G's solution implemented with the new API:

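    // in the Reducer class (new org.apache.hadoop.mapreduce API)...
    private long mapperCounter;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // Cluster plays the role that JobClient plays in the old API.
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        // COUNTER_NAME is assumed to be the enum constant the mapper increments.
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
    }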

Solution 3:

The whole point of map/reduce is to parallelize the work. There will be many separate mapper and reducer tasks, so a counter value read inside one task wouldn't be correct for the job as a whole; it would only reflect that particular run of the map/reduce pair.

They have a word count example:

http://wiki.apache.org/hadoop/WordCount

In that example, you could change context.write(word,one) to context.write(line,one).