- Blog

Keep it simple

Exporting Beam metrics on Flink with JmxReporter and Prometheus JMX Exporter

written on 23 May 2018
Beam allows to define metrics, which are then forwarded to the specific runner (e.g. Flink). The underlying execution engine then handles reporting of those metrics. In my case, I tried to report the metrics with the `GraphiteReporter` first. However Beam includes dots in some parts of the metric name (e.g. the namespace), which makes the Graphite paths generated by Flink almost unusable. Instead, I opted for the `JmxReporter`, which properly handles these cases. As an example, say we create a metric in a Beam DoFn: ```java package org.example; public class MyOperator extends DoFn<...> { private Counter myCounter = Metrics.counter(MyOperator.class, "myCounter"); ... @ProcessElement public void processElement(ProcessContext ctx) {; ... } } ``` Now, let's configure the JmxReporter in `flink-conf.yaml`: ```yaml metrics.reporters: jmx metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter metrics.reporter.jmx.port: 8789 ``` To be able to view the exported metrics with jconsole, we have to instruct the JVM to publish the jmxremote service in `flink-conf.yaml`: ```yaml ``` Note that when you're running your Flink in a docker container and want to access it from outside the docker host, you also have to specify the **Docker Host** hostname and open port `8789` (configured above): ``` ... -Djava.rmi.server.hostname=flink ``` We can now examine the exported metrics on the "MBeans" tab in jconsole when connecting to port `8789`. Here is how the domain of a bean could look like that contains our counter metric: ```java org.apache.flink.taskmanager.job.task.operator.__counter__XYZ/ParMultiDo(XYZ)__org.example.MyOperator__myCounter ``` I now wanted to collect these metrics in a Prometheus system. For this, I used the <a href="">Prometheux JMX Exporter</a>. This exporter runs as a javaagent and can be configured with a regex pattern to match beans. Here are the rules that should match all Flink Jobmanager, Taskmanager and Beam operator metrics: ```yaml rules: # Pattern Format: domain<beanpropertyName1=beanPropertyValue1, beanpropertyName2=beanPropertyValue2, ...><key1, key2, ...>attrName: value - pattern: org.apache.flink.taskmanager.job.task.operator.([^:]+)<job_id=(\w+), job_name=([^,]+), tm_id=(\w+), task_attempt_id=(\w+), task_attempt_num=(\d+), subtask_index=(\d+), task_id=(\w+), operator_name=([^,]+), task_name=([^,]+), operator_id=(\w+), host=(\w+)><>(Count|Value) name: flink_operator_metric labels: metric: $1 ... (add whatever you need) ... - pattern: org.apache.flink.taskmanager.(\w+)<host=(\w+), tm_id=(\w+)><>(Count|Value) name: flink_taskmanager_metric labels: host: $2 tm_id: $3 metric: $1 - pattern: org.apache.flink.jobmanager.(\w+)<host=(\w+)><>(Count|Value) name: flink_jobmanager_metric labels: host: $2 metric: $1 ``` To run the JMX Reporter agent with the Flink JVM, add this option to your `` in `flink-conf.yaml`: ```java -javaagent:/path/to/jmx_prometheus_javaagent.jar=8097:/path/to/jmx_config.yaml ``` All that's left is to add a prometheus scrape target: ```yaml scrape_configs: - job_name: flink static_configs: - target: ['flink:8097'] ``` You can now query the operator metrics, e.g. in Grafana: ```java flink_operator_metrics{metric=~".*MyOperator__myCounter"} ```