Sunday, July 21, 2024

Hive for an Iceberg test suite

I'm currently having trouble changing my catalog implementation from hadoop to hive for my test suite. The tests work fine as they are, but they're backed by a local filesystem, which is not atomic. For the purposes of testing this seemed OK, but the fact that they were not really realistic was nagging me.

Spark can use numerous catalogs. You just need to set them up in SparkConf by giving each catalog its own namespace of config keys under spark.sql.catalog. A call to writeTo resolves against the default catalog and namespace unless the table identifier names another.
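
For example, a test suite might register a Hive-backed and a Hadoop-backed Iceberg catalog side by side. This is only a sketch: the catalog names, thrift URI and warehouse path below are placeholders of mine, not the actual suite's configuration; the point is the spark.sql.catalog.* key structure.

import org.apache.spark.sql.SparkSession

// Sketch only: "my_hive", "my_hadoop", the thrift URI and the warehouse
// path are placeholders.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  // An Iceberg catalog backed by a Hive metastore
  .config("spark.sql.catalog.my_hive", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.my_hive.type", "hive")
  .config("spark.sql.catalog.my_hive.uri", "thrift://localhost:9083")
  // An Iceberg catalog backed by the local file system
  .config("spark.sql.catalog.my_hadoop", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.my_hadoop.type", "hadoop")
  .config("spark.sql.catalog.my_hadoop.warehouse", "/tmp/iceberg-warehouse")
  .getOrCreate()

// writeTo resolves its identifier against the default catalog unless it is
// fully qualified, e.g. df.writeTo("my_hadoop.database.my_table").create()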

If you use a Hive catalog, concurrent creation of a table will blow up with:

org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `database`.`concurrentwritespec` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects.
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:255)

While backed by a local file system, the test correctly encounters an error, but this time it's:

org.apache.spark.util.Utils - Aborting task
org.apache.iceberg.exceptions.CommitFailedException: Version 1 already exists: /tmp/SparkForTesting5181526160591714739/ConcurrentWriteSpec/metadata/v1.metadata.json

The behaviour in both cases is entirely expected, but the actual exceptions differ.
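
In both cases the shape of the test is the same: two writers race to create the same table and the suite expects exactly one to win. A rough sketch (the names are mine, not the actual suite's), reusing the session configured above:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Two writers race to create the same table; exactly one should succeed.
val table = "my_hive.database.concurrentwritespec"  // placeholder identifier
val df    = spark.range(100).toDF("id")

val attempts = Seq(
  Future(Try(df.writeTo(table).create())),
  Future(Try(df.writeTo(table).create()))
)
val results = Await.result(Future.sequence(attempts), 1.minute)

// The loser sees TableAlreadyExistsException on a Hive catalog and
// CommitFailedException on a Hadoop (local file system) catalog.
assert(results.count(_.isSuccess) == 1)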

Meanwhile, a test that merely appended data (see this test) blew up when using hive rather than hadoop, failing with:

Cannot write into v1 table: `spark_catalog`.`database`.`spark_file_test_writeto`.

The reason for this currently eludes me but it seems I am not the only one.
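
For reference, the write itself is nothing exotic; a minimal sketch of its shape, with placeholder data:

// Roughly what the failing test does: a plain DataFrameWriterV2 append.
// The data is a placeholder; the identifier is the one from the error above,
// which Spark has resolved through the session catalog (`spark_catalog`)
// rather than the Iceberg one.
val moreRows = spark.range(10).toDF("id")
moreRows.writeTo("database.spark_file_test_writeto").append()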

Spark Datasource API V1 and V2

Amongst the problems with V1 was that "Partitioning and Sorting is not propagated from the data sources, and thus, not used in the Spark optimizer" [Qorela]. V2 gives much greater freedom to developers of external stores, like various cloud solutions. For example, Russell Spitzer says:
"Now V2 Predicate function pushdown is allowed (or at least should be coming soon) but for that you must use the datasource functions and not the spark wones" [Slack]

"Best practice for partitions is partition by low cardinality and sort for high cardanality. This may reduce the level of partitions you need" [Slack]
Another key difference between the two is atomicity. "Spark ships with two default Hadoop commit algorithms — version 1, which moves staged task output files to their final locations at the end of the job, and version 2, which moves files as individual job tasks complete...  the v2 Hadoop commit protocol is almost five times faster than v1. This is why in the latest Hadoop release, v2 is the default commit protocol... while v2 is faster, it also leaves behind partial results on job failures, breaking transactionality requirements. In practice, this means that with chained ETL jobs, a job failure — even if retried successfully — could duplicate some of the input data for downstream jobs." [Databricks]
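
For what it's worth, that Hadoop commit algorithm is selected by a single setting, which Spark hands to Hadoop via its spark.hadoop prefix. Pinning a session back to v1 (continuing the sketch above) looks something like this:

// mapreduce.fileoutputcommitter.algorithm.version selects the Hadoop commit
// algorithm: v1 moves task output at job commit (slower, transactional),
// v2 moves it as each task finishes (faster, but can leave partial output
// behind on failure).
val sessionWithV1Commits = SparkSession.builder()
  .master("local[*]")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()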

Remediation attempts

Altering the table's properties made no difference to the error:

      spark.sql(
        s"""
          |ALTER TABLE $fqn
          |SET TBLPROPERTIES ('format-version'='2')
      """.stripMargin)

So, I tried creating the table with:

... USING ICEBERG TBLPROPERTIES ('format-version'='2')

only for it to then complain upon table creation:

ERROR | o.a.h.h.metastore.RetryingHMSHandler - MetaException(message:Unable to update transaction database java.sql.SQLSyntaxErrorException: Table/View 'NEXT_LOCK_ID' does not exist.

coming from Hive when it calls Derby. "The Derby catalog doesn't support the interfaces (or concurrent access) required for iceberg to work" [GitHub issue]
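
For completeness, the create in question was of roughly this shape (the column list here is invented):

// Roughly the create that triggers the error above; the column list is
// invented, and $fqn is the fully-qualified table name used earlier.
spark.sql(
  s"""
    |CREATE TABLE $fqn (id BIGINT, label STRING)
    |USING ICEBERG
    |TBLPROPERTIES ('format-version'='2')
  """.stripMargin)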

So, for the time being at least, it seems that my tests will need to be somewhat unrealistic when it comes to atomicity.