Exporting Beam metrics on Flink with JmxReporter and Prometheus JMX Exporter
written on 23 May 2018
Beam lets you define metrics, which are then forwarded to the runner in use (e.g. Flink). The underlying execution engine then handles reporting of those metrics. In my case, I first tried to report the metrics with the `GraphiteReporter`. However, Beam includes dots in some parts of the metric name (e.g. the namespace), and since Graphite uses dots as path separators, the Graphite paths generated by Flink become almost unusable.
Instead, I opted for the `JMXReporter`, which handles these cases properly. As an example, say we create a metric in a Beam `DoFn`:
```java
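package org.example;

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

public class MyOperator extends DoFn<...> {
  // The namespace is the class name, the metric name is "myCounter"
  private final Counter myCounter = Metrics.counter(MyOperator.class, "myCounter");
  ...
  @ProcessElement
  public void processElement(ProcessContext ctx) {
    myCounter.inc();
    ...
  }
}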
```
Now, let's configure the `JMXReporter` in `flink-conf.yaml`:
```yaml
metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789
```
To be able to view the exported metrics with jconsole, we have to instruct the JVM to publish the jmxremote service in `flink-conf.yaml`:
```yaml
env.java.opts: -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
```
Note that if you run Flink in a Docker container and want to access it from outside the Docker host, you also have to specify the **Docker host** hostname and open port `8789` (configured above):
```yaml
env.java.opts: ... -Djava.rmi.server.hostname=flink
```
We can now examine the exported metrics on the "MBeans" tab in jconsole when connecting to port `8789`. Here is what the domain of a bean containing our counter metric could look like:
```java
org.apache.flink.taskmanager.job.task.operator.__counter__XYZ/ParMultiDo(XYZ)__org.example.MyOperator__myCounter
```
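If you prefer the command line over jconsole, the same beans can be listed with a few lines of plain `javax.management` code. This is only a minimal sketch: the class name `ListFlinkBeans` is made up, and it assumes the reporter is reachable under the standard `jmxrmi` RMI URL on `localhost:8789`. Adjust host and port to your setup.

```java
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListFlinkBeans {
    /** Returns the sorted names of all beans whose domain starts with "org.apache.flink.". */
    static Set<String> flinkBeanNames(MBeanServerConnection conn) throws Exception {
        Set<String> names = new TreeSet<>();
        // The domain part of an ObjectName pattern may contain wildcards.
        for (ObjectName name : conn.queryNames(new ObjectName("org.apache.flink.*:*"), null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Assumed defaults; pass host and port as arguments to override them.
        String host = args.length > 0 ? args[0] : "localhost";
        String port = args.length > 1 ? args[1] : "8789";
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            flinkBeanNames(connector.getMBeanServerConnection()).forEach(System.out::println);
        }
    }
}
```

Each printed line is a full bean name (domain plus key properties), which is handy when writing match patterns against those beans later on.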
I now wanted to collect these metrics in a Prometheus system. For this, I used the <a href="https://github.com/prometheus/jmx_exporter">Prometheus JMX Exporter</a>. This exporter runs as a javaagent and can be configured with regex patterns that match beans. Here are the rules that should match all Flink JobManager, TaskManager and Beam operator metrics:
```yaml
rules:
  # Pattern format: domain<beanPropertyName1=beanPropertyValue1, beanPropertyName2=beanPropertyValue2, ...><key1, key2, ...>attrName: value
  - pattern: org.apache.flink.taskmanager.job.task.operator.([^:]+)<job_id=(\w+), job_name=([^,]+), tm_id=(\w+), task_attempt_id=(\w+), task_attempt_num=(\d+), subtask_index=(\d+), task_id=(\w+), operator_name=([^,]+), task_name=([^,]+), operator_id=(\w+), host=(\w+)><>(Count|Value)
    name: flink_operator_metric
    labels:
      metric: $1
      # ... (add whatever you need) ...
  - pattern: org.apache.flink.taskmanager.(\w+)<host=(\w+), tm_id=(\w+)><>(Count|Value)
    name: flink_taskmanager_metric
    labels:
      host: $2
      tm_id: $3
      metric: $1
  - pattern: org.apache.flink.jobmanager.(\w+)<host=(\w+)><>(Count|Value)
    name: flink_jobmanager_metric
    labels:
      host: $2
      metric: $1
```
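Since these patterns are plain regular expressions, it can save some restarts to try them out in isolation first. The sketch below runs the TaskManager rule against a made-up input line and prints the labels it would yield. The class name `RulePatternCheck` and the sample line are hypothetical; real bean names and property values will differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RulePatternCheck {
    // The TaskManager rule from the config above, written as a Java string literal.
    static final Pattern TASKMANAGER_RULE = Pattern.compile(
        "org.apache.flink.taskmanager.(\\w+)<host=(\\w+), tm_id=(\\w+)><>(Count|Value)");

    /** Returns the labels the rule would produce, or an empty map if it does not match. */
    static Map<String, String> labelsFor(String beanLine) {
        Map<String, String> labels = new LinkedHashMap<>();
        Matcher m = TASKMANAGER_RULE.matcher(beanLine);
        // find() instead of matches(): the exporter does not anchor its patterns.
        if (m.find()) {
            labels.put("host", m.group(2));
            labels.put("tm_id", m.group(3));
            labels.put("metric", m.group(1));
        }
        return labels;
    }

    public static void main(String[] args) {
        // Hypothetical input in the "domain<properties><keys>attrName: value" format.
        String sample =
            "org.apache.flink.taskmanager.someMetric<host=localhost, tm_id=abc123><>Value: 42";
        // prints {host=localhost, tm_id=abc123, metric=someMetric}
        System.out.println(labelsFor(sample));
    }
}
```

One thing this makes easy to spot: `\w` does not match hyphens, so a property value such as `my-host` would make the rule miss the bean entirely. If your hosts or IDs contain such characters, a group like `([^,]+)` is the more forgiving choice.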
To run the JMX Exporter agent with the Flink JVM, add this option to your `env.java.opts` in `flink-conf.yaml`:
```java
-javaagent:/path/to/jmx_prometheus_javaagent.jar=8097:/path/to/jmx_config.yaml
```
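Once the JVM is up with the agent, the exporter serves its output in the Prometheus text format over HTTP, so you can check which Flink metrics made it through the rules. A minimal sketch, assuming Java 11+ and that the exporter is reachable at `http://flink:8097/metrics`; the class name `ExporterCheck` and the `flink_` prefix filter are just for illustration:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;

public class ExporterCheck {
    /** Keeps only sample lines (no "#" comment lines) that start with the given prefix. */
    static List<String> samplesWithPrefix(String body, String prefix) {
        return body.lines()
            .filter(line -> !line.startsWith("#"))
            .filter(line -> line.startsWith(prefix))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Assumed address; pass a different URL as the first argument if needed.
        String url = args.length > 0 ? args[0] : "http://flink:8097/metrics";
        HttpResponse<String> response = HttpClient.newHttpClient().send(
            HttpRequest.newBuilder(URI.create(url)).GET().build(),
            HttpResponse.BodyHandlers.ofString());
        samplesWithPrefix(response.body(), "flink_").forEach(System.out::println);
    }
}
```

If nothing is printed, the rules probably did not match any bean, and it is worth comparing the patterns against the bean names shown in jconsole.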
All that's left is to add a Prometheus scrape target:
```yaml
scrape_configs:
  - job_name: flink
    static_configs:
      - targets: ['flink:8097']
```
You can now query the operator metrics, e.g. in Grafana:
```java
flink_operator_metric{metric=~".*MyOperator__myCounter"}
```