Downloads cannot tolerate a shut-down HDFS data node #43

@MattBlissett

Description

Downloads fail when a single HDFS data node has been cleanly shut down:

2025-03-28T10:53:29,336 WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 13.0 in stage 0.0 (TID 13) (192.168.196.69 executor 4): org.apache.iceberg.exceptions.RuntimeIOException: Failed to open Parquet file: hdfs://gbif-hdfs/uat2/occurrence/data/00031-10031-9c188ddd-688e-4bf2-a67d-ee211410a0f7-0-00001.parquet
	at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:240)
	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:71)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:91)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:37)
	at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:95)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:43)
	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.NullPointerException): java.lang.NullPointerException
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
	at org.apache.hadoop.ipc.Client.call(Client.java:1558)
	at org.apache.hadoop.ipc.Client.call(Client.java:1455)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
	at com.sun.proxy.$Proxy47.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:333)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy48.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:900)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:889)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:878)
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1046)
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:353)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:796)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
	at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:238)
	... 38 more

This has happened both when a node was shut down while downloads were running, and when downloads were started with a single node already shut down.

fsck queries suggested that in the first case the data block had probably been replicated to other nodes, and that in the second case the block must already have been on a running node:

/uat2/occurrence/data/00112-10112-f3f5babf-8dfa-49a2-814a-5fde3ae57b85-0-00001.parquet 38706181 bytes, replicated: replication=3, 1 block(s):  OK
0. BP-681748575-192.168.196.80-1731409307563:blk_1172648732_107654816 len=38706181 Live_repl=3  [/f_rack_3/c6machines/130.225.43.73:31821(LIVE), /f_rack_3/c6machines/130.225.43.76:31331(LIVE), /f_rack_2/default/130.225.43.207:31552(LIVE)]
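
For reference, block placement like the above can be queried with fsck's location flags; a minimal example using the same path (the flags are standard HDFS fsck options, not taken from this issue):

hdfs fsck /uat2/occurrence/data/00112-10112-f3f5babf-8dfa-49a2-814a-5fde3ae57b85-0-00001.parquet -files -blocks -locations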

The HDFS command-line client also returned a NullPointerException:

bash-4.4$ hdfs dfs -ls /uat2/occurrence/data/00112-10112-f3f5babf-8dfa-49a2-814a-5fde3ae57b85-0-00001.parquet
-rw-r--r--   3 stackable supergroup   38706181 2025-03-27 05:03 /uat2/occurrence/data/00112-10112-f3f5babf-8dfa-49a2-814a-5fde3ae57b85-0-00001.parquet

bash-4.4$ hdfs dfs -cat /uat2/occurrence/data/00112-10112-f3f5babf-8dfa-49a2-814a-5fde3ae57b85-0-00001.parquet
cat: java.lang.NullPointerException
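
To confirm which data nodes the name node considers live or dead at that point, the standard admin report may help (generic HDFS commands, not part of the original report):

hdfs dfsadmin -report
hdfs dfsadmin -report -dead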

This is important: if a machine fails, all large downloads could start failing until HDFS is (perhaps) reconfigured.

I suggest trying to reproduce this on Dev, e.g. creating 100 files, shutting down a node, and seeing whether those files can still be retrieved with hdfs dfs -cat (see the sketch below). If it's not a straightforward fix, we need to find and document a workaround (e.g. using HDFS tools to mark a node as removed from the cluster?) in case we lose a node unexpectedly.
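
A minimal reproduction sketch, assuming a scratch directory /tmp/npe-test on Dev (the path and file count are illustrative, not from this issue):

# Create 100 small test files (hypothetical path)
hdfs dfs -mkdir -p /tmp/npe-test
for i in $(seq 1 100); do
  echo "test file $i" | hdfs dfs -put - /tmp/npe-test/file-$i.txt
done

# After cleanly shutting down one data node, check whether every file is still readable
for i in $(seq 1 100); do
  hdfs dfs -cat /tmp/npe-test/file-$i.txt > /dev/null || echo "FAILED: file-$i.txt"
done

If this reproduces the bug, the standard decommissioning procedure might serve as the documented workaround: add the node to the file referenced by dfs.hosts.exclude and run hdfs dfsadmin -refreshNodes, so the name node stops offering that node's replicas to clients.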
