Kafka, Java and ZFS
I recently investigated why our latest Kafka deployment behaved erratically when enforcing the configured retention window. The behavior was strange: as long as the broker remained online, the configured retention window (say, 7 days) was the rule enforced. However, if the broker was restarted, it would immediately identify every existing log segment on disk as expired and schedule it for deletion for having breached the retention window configuration.
Why are log segments being deleted when Kafka starts up?
Found deletable segments with base offsets [11610665,12130396,12650133] due to retention time 86400000ms breach
For all intents and purposes, it looks like the log segments are ready for deletion, but knowing the retention window is set to 86400000ms (24 hours) and the data had been sent only minutes prior, this cannot be correct.
Theorizing that the cause was producers failing to set an appropriate timestamp on sent messages, or otherwise corrupting the timeindex, we set log.message.timestamp.type to LogAppendTime to ensure the broker was in charge of setting valid timestamps in the index. This had no effect and the issue persisted.
I noticed another useful log message:
Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
Scheduling segments for deletion List(LogSegment(baseOffset=11610665, size=1073731621, lastModifiedTime=1592532125000, largestTime=0), LogSegment(baseOffset=12130396, size=1073727967, lastModifiedTime=1592532462000, largestTime=0), LogSegment(baseOffset=12650133, size=235891971, lastModifiedTime=1592532531000, largestTime=0))
Wow, this looks terrifying! Although the log segments report a lastModifiedTime only minutes in the past, the largestTime reported within each segment is 0.
This explains why the retention check fails and the segment is deleted, but why is that largestTime ZERO? I searched for similar issues and found only a couple of results. One was caused by file permissions, but our broker was running as root with no errors. The other was in a mailing list archive and had no solution.
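To make the failure mode concrete, here is a simplified model of a time-based retention check. It is not Kafka's code: the class and method names are hypothetical, and it assumes a segment counts as expired when the current time minus its largest timestamp exceeds the retention window. The retention value and lastModifiedTime come from the log lines above; the five-minute gap is an illustrative assumption.

```java
final class RetentionCheck {
    // Hypothetical helper: a segment breaches retention when its newest
    // record is older than retentionMs.
    static boolean isExpired(long nowMs, long largestTimeMs, long retentionMs) {
        return nowMs - largestTimeMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 86_400_000L;          // 24 hours, from the broker log
        long lastModified = 1_592_532_531_000L;  // lastModifiedTime of the newest segment
        long now = lastModified + 5 * 60_000L;   // assumption: broker restarts five minutes later

        System.out.println(isExpired(now, lastModified, retentionMs)); // false
        System.out.println(isExpired(now, 0L, retentionMs));           // true
    }
}
```

With a largest timestamp of 0, the segment appears to be about 50 years old, so any realistic retention window is breached.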
The next thing to do was dig deep into Kafka and figure it out ourselves. Pulling the Kafka repo and getting a local build/dev/debug environment set up was a breeze. I started adding additional logging around the timeIndex being loaded and quickly realized there was sufficient debug logging available that we could enable.
Enabling debug logging was extremely useful (and verbose):
DEBUG Loaded index file /mnt/kafka-logs/topic-0/00000000000017221277.timeindex with maxEntries = 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = TimestampOffset(0,17221277), file position = 10485756 (kafka.log.TimeIndex)
This information is a little puzzling: the timeindex reported it was full, with 873813 entries, but the timestamp extracted was 0. It also escaped me at first that the reported offset was the base offset, which would indicate that both the offset and the timestamp were missing from the index.
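The numbers in that debug line are what you would expect from a file that still has its full preallocated size. A small sketch of the arithmetic, assuming 12-byte time index entries (an 8-byte timestamp followed by a 4-byte relative offset); the class and method names are made up for illustration:

```java
final class IndexCapacity {
    static final int ENTRY_SIZE = 12; // bytes per time index entry

    // How many whole entries fit into a file of the given size.
    static int maxEntries(int maxIndexSizeBytes) {
        return maxIndexSizeBytes / ENTRY_SIZE;
    }

    // Byte position just past the last entry.
    static long bytesUsed(int entries) {
        return (long) entries * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        int maxEntries = maxEntries(10_485_760);   // maxIndexSize from the log line
        System.out.println(maxEntries);            // 873813
        System.out.println(bytesUsed(maxEntries)); // 10485756
    }
}
```

In other words, "entries = 873813" and "file position = 10485756" follow directly from the file length, not from how many entries were actually written.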
I used the Kafka utilities to inspect the index:
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files 00000000000017221277.timeindex | tail -n 3
timestamp: 1593018531468 offset: 17741701
timestamp: 1593018531604 offset: 17742128
timestamp: 1593018531712 offset: 17742575
... and was even more confused. The data is well-structured and looks valid, as Kafka's DumpLogSegments successfully parses the timestamps and offsets within. What next?
Local Debugging
I was surprised how easy it was to get Kafka set up in my local environment and debug unit tests. I pulled an index file from the broker exhibiting the issue and was able to load it up and debug it very quickly:
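import java.io.File
import kafka.log.TimeIndex
import org.junit.Test

@Test
def testLoadIndex(): Unit = {
  // Index file copied from the affected broker; the base offset matches the file name
  val file = new File("/tmp/00000000000017221277.timeindex")
  // Open read-only with the same 10MB maximum index size the broker reported
  val idx = new TimeIndex(file, 17221277, 10485760, false)
  idx.lastEntry // inspect this value in the debugger
}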
This yielded the root cause. lastEntry reported the same value as on the struggling broker. It looks like when inspecting lastEntry, the mmapped index file is opened and the last record (the final 12 bytes) is used as the result. I also realized the index file was 10MB (the initial allocation size), and only the first 12kb contained useful index entries; a properly written file would have consisted of those entries and nothing else. This explains why the timestamp is being read back as 0 and why the offset isn't found either (so the base offset is assumed).
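The effect can be reproduced without Kafka. The sketch below is not Kafka's implementation; it assumes the 12-byte entry layout (8-byte timestamp, 4-byte offset relative to the base offset) and uses a hypothetical lastSlot helper that decodes whatever sits in the final slot of the buffer. The 100-slot buffer is an arbitrary stand-in for the 10MB preallocation.

```java
import java.nio.ByteBuffer;

final class TimeIndexTail {
    static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset

    /** Decodes the final 12-byte slot as {timestamp, absoluteOffset}. */
    static long[] lastSlot(ByteBuffer index, long baseOffset) {
        int pos = index.limit() - ENTRY_SIZE;
        long timestamp = index.getLong(pos);
        int relativeOffset = index.getInt(pos + 8);
        return new long[] { timestamp, baseOffset + relativeOffset };
    }

    public static void main(String[] args) {
        long baseOffset = 17_221_277L;

        // Preallocated (zero-filled) index with a single real entry at the front.
        ByteBuffer untrimmed = ByteBuffer.allocate(100 * ENTRY_SIZE);
        untrimmed.putLong(0, 1_593_018_531_712L);
        untrimmed.putInt(8, (int) (17_742_575L - baseOffset));

        long[] bad = lastSlot(untrimmed, baseOffset);
        System.out.println(bad[0] + " " + bad[1]);   // 0 17221277

        // The same data viewed as a file trimmed to the entries actually written.
        ByteBuffer trimmed = untrimmed.duplicate();
        trimmed.limit(ENTRY_SIZE);
        long[] good = lastSlot(trimmed, baseOffset);
        System.out.println(good[0] + " " + good[1]); // 1593018531712 17742575
    }
}
```

The untrimmed case yields exactly the TimestampOffset(0,17221277) pattern from the debug log: a zero timestamp and a relative offset of zero, which resolves to the base offset.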
Given this discovery, it appears the issue isn't caused when reading the data, but instead it is being written improperly. But surely a bug of this magnitude wouldn't be present in 2.3.1 and 2.4.1 of Kafka, and surely there would be some news about it had this issue been a bug within Kafka... right?
I started comparing the timeindex files written by different deployments of Kafka we use internally. It is now obvious that the indexes are not being trimmed when they are rolled.
Good:
-rw-r--r-- 1 0 0 53K Jun 23 20:28 00000000000015110795.timeindex
-rw-r--r-- 1 0 0 40K Jun 24 15:51 00000000000015681291.timeindex
Bad:
-rw-r--r-- 1 0 0 10M Jun 24 17:18 00000000000017221277.timeindex
-rw-r--r-- 1 0 0 10M Jun 24 17:18 00000000000017742576.timeindex
-rw-r--r-- 1 0 0 10M Jun 24 17:19 00000000000018264295.timeindex
-rw-r--r-- 1 0 0 10M Jun 24 17:19 00000000000018785990.timeindex
-rw-r--r-- 1 0 0 10M Jun 24 17:19 00000000000019307214.timeindex
Inspecting the raw file (not using Kafka parsers) reveals that this is indeed what is happening:
$ xxd 00000000000017221277.timeindex | vim -
But Kafka does explicitly clean up files when a log segment is rolled, so how is this happening?
File Systems
The mount being used for Kafka was zfs with lz4 compression on an nvme drive in AWS. All of our deployments use this, so surely zfs cannot be to blame. To rule zfs out, I changed the Kafka logs directory on one of the brokers to point to an EBS volume, which used ext4. Upon restarting Kafka and loading more data onto the test topic, the log segments rolled and, lo and behold, the timeindex was properly truncated to the 14kb of records written.
-rw-r--r-- 1 0 0 14K Jun 24 23:57 00000000000021417339.timeindex
OK, ext4 behaved where the zfs mount didn't. Do other filesystems behave as well? I destroyed the zpool on another broker and set it up as a btrfs mount with LZO compression enabled. Like the ext4 mount, it performed as expected. It was also using the nvme drive, so we could rule the drive out. We then took the first broker, remounted the nvme drive as ext4, and stopped using the EBS volume, as it was a tad slow.
I don't know why, but ZFS is at fault.
This made it look like a zfs issue, and we went down the path of trying some different configurations, including some that were shared at a Confluent presentation.
-O ashift=9
-O atime=off
-O compression=lz4
-O redundant_metadata=most
We applied the configuration above to another broker using zfs and were extremely disappointed when it solved none of our problems. I also tried adding an extra flush to log segments when they rotate, to really make sure they were being flushed to disk. I got this deployed to the broker fairly quickly, but it had no effect, and the issue persisted.
What about the other deployments?
We have other (identical-ish) deployments of Kafka using the same jvm, kernel and zfs version that do not suffer from this issue. What the heck is different about them?
I watched one of the brokers that allegedly did not have this issue - a staging instance - and sure enough, I saw log segments rolling and the timeindex was sized appropriately. What is going on?? I noted the kernel version, jvm version and zfs versions. Kernel and zfs versions matched across all deployments, however the java version on this broker was different than others. 1.8.0_252 was reported, a fair bit newer than 1.8.0_191 being reported on brokers having this issue.
Whoops, not actually a ZFS problem
I took to using asdf-vm (thank you Rex!) to set up a parallel jvm installation on the broker so that I could easily swap between them to verify this issue. I ended up having to modify the /bin/kafka-env.sh file to include an export JAVA_HOME=/path/to/.asdf/java so that the init.d scripts would pick it up, but after doing that, a magical thing happened:
-rw-r--r-- 1 0 0 1023M Jun 25 18:58 00000000000024194196.log
-rw-r--r-- 1 0 0 3.5K Jun 25 18:58 00000000000024194196.index
-rw-r--r-- 1 0 0 5.2K Jun 25 18:58 00000000000024194196.timeindex
The log segments were trimmed successfully when rolling to a new segment. Restarting the broker was also successful, with no incorrectly deleted data.
Root Cause
My understanding of this issue is that when reducing the length of a memory-mapped file stored on a zfs mount, the write back to disk fails silently. https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8202261 seems to be directly related to this, and the problem appears to be present from Java 8u181 until 8u202.
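For anyone who wants to probe their own JVM and filesystem combination, here is a minimal sketch of the preallocate, map, write, trim sequence described in this post. It is not Kafka's code; the class name, sizes, and entry values are illustrative assumptions, and it assumes a Linux-like platform where a file can be shortened while it is still mapped. On a healthy setup the file should end up at the trimmed size. I have not verified that this exact snippet reproduces the problem on an affected JVM with zfs.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

final class MmapTrim {
    static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset

    /** Preallocates and maps the file, writes entries, trims it, and returns the on-disk size. */
    static long writeAndTrim(Path file, int preallocatedBytes, int entries) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            raf.setLength(preallocatedBytes); // preallocate, like a fresh index
            MappedByteBuffer mmap =
                raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, preallocatedBytes);
            for (int i = 0; i < entries; i++) {
                mmap.putLong(1_593_018_531_000L + i); // made-up timestamps
                mmap.putInt(i);                       // made-up relative offsets
            }
            mmap.force();                               // flush mapped pages
            raf.setLength((long) entries * ENTRY_SIZE); // trim to the bytes actually used
        }
        return Files.size(file);
    }

    public static void main(String[] args) throws IOException {
        // Point this at the filesystem under test instead of the default temp directory.
        Path file = Files.createTempFile("demo", ".timeindex");
        try {
            long size = writeAndTrim(file, 10 * 1024 * 1024, 1_000);
            System.out.println(size == 1_000L * ENTRY_SIZE
                ? "trimmed to " + size + " bytes"
                : "NOT trimmed: " + size + " bytes");
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```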
Too Long, Didn't Read
Kafka can prematurely expire and remove data when starting up if the timeindex was not properly trimmed when it was flushed to disk. This happens because the changes made in KIP-263 removed index sanity checking (except for the transaction index) from brokers.
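If you suspect a broker is affected, a quick external check can flag suspicious index files before a restart. The sketch below is a hypothetical helper, not the sanity check that Kafka used to run: it assumes the 12-byte entry layout with a big-endian 8-byte timestamp first, and simply flags a file whose last slot carries an older timestamp than its first slot, which is what zero padding looks like.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

final class TimeIndexSanity {
    static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset

    /** True when the last slot's timestamp is older than the first slot's. */
    static boolean looksUntrimmed(Path timeIndex) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(timeIndex.toFile(), "r")) {
            long length = raf.length();
            if (length < 2L * ENTRY_SIZE) {
                return false; // zero or one entry: nothing to compare
            }
            long firstTimestamp = raf.readLong();
            raf.seek((length / ENTRY_SIZE - 1) * ENTRY_SIZE); // start of the last whole slot
            long lastTimestamp = raf.readLong();
            return lastTimestamp < firstTimestamp;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("demo", ".timeindex");
        try {
            // One real entry followed by nine zeroed slots, mimicking an untrimmed index.
            ByteBuffer buf = ByteBuffer.allocate(10 * ENTRY_SIZE);
            buf.putLong(0, 1_593_018_531_712L).putInt(8, 42);
            Files.write(file, buf.array());
            System.out.println(looksUntrimmed(file)); // true
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Run it against copies of the index files rather than the live log directory, since the active segment's index is expected to be preallocated while it is still being written.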