WordCount on KV<String, String>: counting the words in each value while preserving the link to the key?

Solution 1:

I am not sure I fully understand what you want to do, but I assume you want a separate word count for each key. If that is the case, this would work:

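    // Imports needed by this snippet. `p` (the Pipeline) and `LOG` (a logger)
    // are assumed to be defined elsewhere.
    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    final List<KV<String, String>> elements = Arrays.asList(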
            KV.of("key1", "some random text"),
            KV.of("key1", "some different text"),
            KV.of("key1", "another random line"),
            KV.of("key2", "some random text different"),
            KV.of("key2", "bla bla bla")
    );

    p
            .apply("Create Elements", Create.of(elements))
            // Emit one ("<key> <word>", 1) pair per word in the value.
            // Note: this assumes that keys contain no spaces.
            .apply("New combined KVs", ParDo.of(new DoFn<KV<String, String>, KV<String, Integer>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String[] values = c.element().getValue().split(" ");
                    for (String s: values) {
                        String key = String.format("%s %s", c.element().getKey(), s);
                        c.output(KV.of(key, 1));
                    }
                }
            }))
            // Count the occurrences of each combined "<key> <word>" key.
            .apply(Count.perKey())
            // Split the combined key back into the original key and the word.
            .apply("Separate KVs", ParDo.of(new DoFn<KV<String, Long>, KV<String, KV<String, Long>>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String[] keys = c.element().getKey().split(" ");
                    KV<String, Long> value = KV.of(keys[1], c.element().getValue());
                    c.output(KV.of(keys[0], value));
                }
            }))
            // Collect all (word, count) pairs that belong to the same original key.
            .apply(GroupByKey.create())
            .apply("LOG", ParDo.of(new DoFn<KV<String, Iterable<KV<String, Long>>>, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    LOG.info(c.element().toString());
                }
            }));

The output is (the order of the entries within each group is not guaranteed):

    KV{key1, [KV{line, 1}, KV{text, 2}, KV{another, 1}, KV{different, 1}, KV{some, 2}, KV{random, 2}]}
    KV{key2, [KV{different, 1}, KV{some, 1}, KV{bla, 3}, KV{text, 1}, KV{random, 1}]}

The general idea is to combine each key with each word into a "double key" (e.g., "key1 some", "key1 random") and aggregate on that first. Then split the double key back into the original key and the word, and group by the original key.
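To see the data flow without running a pipeline, here is a minimal sketch of the same steps using only plain Java collections. It is not Beam code: the class `PerKeyWordCount` and the method `countWordsPerKey` are hypothetical names made up for this illustration. Unlike the pipeline above, it splits the double key at the last space, so keys that contain spaces would also survive the round trip.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Apache Beam.
final class PerKeyWordCount {

    static Map<String, Map<String, Long>> countWordsPerKey(List<Map.Entry<String, String>> elements) {
        // Steps 1 and 2: build a "<key> <word>" double key for every word and
        // count each double key (the role of the first ParDo plus Count.perKey()).
        Map<String, Long> countsByDoubleKey = new TreeMap<>();
        for (Map.Entry<String, String> element : elements) {
            for (String word : element.getValue().split(" ")) {
                if (word.isEmpty()) {
                    continue; // skip empty tokens caused by repeated spaces
                }
                countsByDoubleKey.merge(element.getKey() + " " + word, 1L, Long::sum);
            }
        }

        // Steps 3 and 4: split each double key back into key and word, then
        // group the (word, count) pairs by the original key
        // (the role of the second ParDo plus GroupByKey).
        Map<String, Map<String, Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> entry : countsByDoubleKey.entrySet()) {
            String doubleKey = entry.getKey();
            // Words never contain a space, so the last space is the separator.
            int separator = doubleKey.lastIndexOf(' ');
            String key = doubleKey.substring(0, separator);
            String word = doubleKey.substring(separator + 1);
            grouped.computeIfAbsent(key, k -> new TreeMap<>()).put(word, entry.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Same sample data as in the pipeline above.
        List<Map.Entry<String, String>> elements = Arrays.asList(
                new SimpleEntry<String, String>("key1", "some random text"),
                new SimpleEntry<String, String>("key1", "some different text"),
                new SimpleEntry<String, String>("key1", "another random line"),
                new SimpleEntry<String, String>("key2", "some random text different"),
                new SimpleEntry<String, String>("key2", "bla bla bla"));
        System.out.println(countWordsPerKey(elements));
    }
}
```

For the sample data, the counts per key should match the pipeline output shown above; only the ordering differs, because the sketch uses sorted maps.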