The ComparableWatermarkSelector relies on an abstract method, comparableQualifier(), whose return value depends on which selector is being used (1 for MAX, -1 for MIN). It compares two values and, based on how the result relates to this qualifier, updates the watermark value. However, this implementation is incorrect because it assumes that the compareTo method of Comparable returns exactly -1, 0, or 1. This is not the case: the Comparable contract only specifies a negative integer, zero, or a positive integer, of any magnitude.
Take the following example: a.compareTo(b). A positive integer is returned if "a" is the larger of the two, a negative integer is returned if "b" is larger, and 0 is returned if both are equal.
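To illustrate the failure mode, here is a minimal sketch. It is not the actual ComparableWatermarkSelector code: the class CompareToSignSketch and the methods selectBuggy and selectFixed are hypothetical names, and the equality check against the qualifier is an assumption about how the comparison is done. It uses String.compareTo, which returns the difference between the first differing characters, so the result is often not -1 or 1.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not the real selector implementation.
class CompareToSignSketch {

    // Assumed buggy logic: treats the compareTo result as exactly -1, 0, or 1.
    static <T extends Comparable<T>> T selectBuggy(List<T> values, int qualifier) {
        T selected = values.get(0);
        for (T candidate : values) {
            if (candidate.compareTo(selected) == qualifier) {
                selected = candidate;
            }
        }
        return selected;
    }

    // Possible fix: normalize the result to -1, 0, or 1 before comparing.
    static <T extends Comparable<T>> T selectFixed(List<T> values, int qualifier) {
        T selected = values.get(0);
        for (T candidate : values) {
            if (Integer.signum(candidate.compareTo(selected)) == qualifier) {
                selected = candidate;
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "d", "b");
        int max = 1; // qualifier for MAX, as described above

        System.out.println("a".compareTo("d"));       // prints -3, not -1
        System.out.println(selectBuggy(values, max)); // prints b (misses "d")
        System.out.println(selectFixed(values, max)); // prints d
    }
}
```

In the buggy variant, "d".compareTo("a") returns 3, which is not equal to the qualifier 1, so "d" is skipped and "b" is selected instead. Checking only the sign of the result, for example with Integer.signum, avoids depending on the magnitude.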
As a result of this issue, records are sometimes processed twice because the watermark selector fails to select the largest value in the retrieved list.
I have attached a project that replicates this issue. Run TestWatermarkBehaviour.java in src/test/java.