WordCount on KV<String, String> counting words on value and preserving link to the key?
Solution 1:
I'm not sure I understand exactly what you want to do, but I guess you want to do a word count on each key separately. If that's the case, this would work:
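```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

// Assumes `p` is an existing Pipeline and `LOG` is an SLF4J Logger in the enclosing class.
final List<KV<String, String>> elements = Arrays.asList(
    KV.of("key1", "some random text"),
    KV.of("key1", "some different text"),
    KV.of("key1", "another random line"),
    KV.of("key2", "some random text different"),
    KV.of("key2", "bla bla bla")
);

p
    .apply("Create Elements", Create.of(elements))
    // Emit ("<key> <word>", 1) for every word, so each word keeps a link to its key.
    .apply("New combined KVs", ParDo.of(new DoFn<KV<String, String>, KV<String, Integer>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (String word : c.element().getValue().split("\\s+")) {
                if (!word.isEmpty()) { // leading whitespace yields an empty first token
                    c.output(KV.of(String.format("%s %s", c.element().getKey(), word), 1));
                }
            }
        }
    }))
    // Count occurrences of each "<key> <word>" combination.
    .apply(Count.perKey())
    // Split the combined key back into (key, (word, count)).
    .apply("Separate KVs", ParDo.of(new DoFn<KV<String, Long>, KV<String, KV<String, Long>>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String combined = c.element().getKey();
            // Split at the LAST space: words never contain whitespace, but the original key might.
            int sep = combined.lastIndexOf(' ');
            KV<String, Long> value = KV.of(combined.substring(sep + 1), c.element().getValue());
            c.output(KV.of(combined.substring(0, sep), value));
        }
    }))
    // Gather all (word, count) pairs under their original key.
    .apply(GroupByKey.create())
    .apply("LOG", ParDo.of(new DoFn<KV<String, Iterable<KV<String, Long>>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            LOG.info(c.element().toString());
        }
    }));
```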
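The output is (`GroupByKey` does not guarantee any ordering, so the order of keys and of the pairs inside each group may differ between runs):

```
KV{key1, [KV{line, 1}, KV{text, 2}, KV{another, 1}, KV{different, 1}, KV{some, 2}, KV{random, 2}]}
KV{key2, [KV{different, 1}, KV{some, 1}, KV{bla, 3}, KV{text, 1}, KV{random, 1}]}
```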
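To cross-check the logged counts without running a pipeline, here is a minimal plain-Java sketch. It assumes Java 9+ (for `List.of` and `Map.entry`), and the names `PerKeyWordCountCheck` and `countPerKey` are hypothetical. It does not use the double-key trick; it simply groups by key and then by word with `Collectors.groupingBy`, which gives the reference result the pipeline should match:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PerKeyWordCountCheck {
    // Reference result: key -> (word -> number of occurrences in that key's values).
    static Map<String, Map<String, Long>> countPerKey(List<Map.Entry<String, String>> elements) {
        return elements.stream()
            // Pair every word with the key of the line it came from.
            .flatMap(e -> Arrays.stream(e.getValue().split("\\s+"))
                .filter(w -> !w.isEmpty())
                .map(w -> Map.entry(e.getKey(), w)))
            // Group by key, then count each word inside that key's group.
            .collect(Collectors.groupingBy(Map.Entry::getKey,
                Collectors.groupingBy(Map.Entry::getValue, Collectors.counting())));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> elements = List.of(
            Map.entry("key1", "some random text"),
            Map.entry("key1", "some different text"),
            Map.entry("key1", "another random line"),
            Map.entry("key2", "some random text different"),
            Map.entry("key2", "bla bla bla"));
        // The maps are HashMap-backed, so the printed order is not fixed.
        System.out.println(countPerKey(elements));
    }
}
```

For the sample input this yields `some -> 2` and `text -> 2` under `key1` and `bla -> 3` under `key2`, matching the log lines above.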
The general idea is to combine each key with each word into a "double key" (e.g., `key1 some`, `key2 random`) and aggregate on that first. Then split the double key back apart and group by the original key.
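The same four steps can be sketched in plain Java to make the double-key idea concrete. This is an illustration, not Beam code: the names `DoubleKeyWordCount` and `run` are hypothetical, it assumes Java 9+, and it uses a `Map.Entry` as the double key instead of a formatted `"key word"` string, so nothing has to be re-split afterwards:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoubleKeyWordCount {
    static Map<String, Map<String, Long>> run(List<Map.Entry<String, String>> elements) {
        // Steps 1 + 2 (first ParDo + Count.perKey()): count each (key, word) double key.
        // The double key is a Map.Entry, not a "key word" string, so it never needs parsing.
        Map<Map.Entry<String, String>, Long> perDoubleKey = new HashMap<>();
        for (Map.Entry<String, String> element : elements) {
            for (String word : element.getValue().split("\\s+")) {
                if (!word.isEmpty()) {
                    perDoubleKey.merge(Map.entry(element.getKey(), word), 1L, Long::sum);
                }
            }
        }
        // Steps 3 + 4 ("Separate KVs" ParDo + GroupByKey): split the double key back into
        // (key, (word, count)) and collect all words under their original key.
        Map<String, Map<String, Long>> grouped = new HashMap<>();
        for (Map.Entry<Map.Entry<String, String>, Long> counted : perDoubleKey.entrySet()) {
            String key = counted.getKey().getKey();
            String word = counted.getKey().getValue();
            grouped.computeIfAbsent(key, k -> new HashMap<>()).put(word, counted.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> elements = List.of(
            Map.entry("key1", "some random text"),
            Map.entry("key1", "another random line"),
            Map.entry("my key", "a b a")); // a key containing a space is handled safely
        System.out.println(run(elements));
    }
}
```

Using a structured key this way also sidesteps the splitting problem when a key contains spaces. In Beam, the analogous choice would presumably be a `KV<String, String>` key for `Count.perKey()`; that should be accepted because a `KvCoder` over two `StringUtf8Coder`s is deterministic, but treat this as an assumption to verify against your Beam version.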