TIL: Converting Parquet Files to Vespa Documents

Published: at 03:42 AM

Today I learned how to convert Parquet files into Vespa documents using a Java program. Here’s a step-by-step guide on how this process works.

Introduction

Parquet is a columnar storage file format optimized for big data processing, while Vespa is an open-source engine for search and other serving workloads over large data sets. Vespa ingests documents as JSON, so Parquet data has to be converted to Vespa's document format before it can be fed, indexed, and searched.

The Java Program

The Java program described below reads Parquet files, maps each row to a Vespa document, and writes the documents to one JSON file per folder.

Main Steps

  1. Setup Base Directory: Define the base directory containing the Parquet files.
  2. Directory Validation: Check if the base directory exists and is valid.
  3. Iterate Through Folders: For each subfolder, create a corresponding JSON output file.
  4. Read Parquet Files: Use CarpetReader, from the Carpet library, to read each Parquet file.
  5. Convert Records: Convert each record into a Vespa document.
  6. Handle Data Formatting: Parse date strings into epoch seconds; missing or unparseable dates are stored as null.
  7. Write to JSON: Serialize the Vespa documents into a JSON file.
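
Steps 1 to 3 are not shown in the snippets further down, so here is a minimal sketch of what they could look like. The class name BaseDirScanner and both method names are hypothetical and do not come from the original program; only the "folder path plus .json" naming is taken from the post.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating steps 1-3; names are illustrative only.
class BaseDirScanner {

    /** Returns the subfolders of baseDir sorted by pathname, or throws if baseDir is not a usable directory. */
    static List<File> listSubfolders(File baseDir) {
        // Step 2: reject a missing path or a path that points at a regular file
        if (!baseDir.exists() || !baseDir.isDirectory()) {
            throw new IllegalArgumentException("Not a directory: " + baseDir.getAbsolutePath());
        }
        // listFiles returns null on an I/O error, so check before using the result
        File[] subfolders = baseDir.listFiles(File::isDirectory);
        if (subfolders == null) {
            throw new IllegalStateException("Could not list: " + baseDir.getAbsolutePath());
        }
        Arrays.sort(subfolders); // deterministic processing order
        return List.of(subfolders);
    }

    /** Step 3: the output file is a sibling of the folder, named after it with a .json suffix. */
    static File outputFileFor(File parentFolder) {
        return new File(parentFolder.getAbsolutePath() + ".json");
    }
}
```

Failing fast on an invalid base directory gives a clearer message than the NullPointerException that Objects.requireNonNull would raise when listFiles returns null.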

Code Snippets

1. Reading Parquet Files Using Carpet Library

The following snippet reads each Parquet file in a folder with CarpetReader, from the Carpet library, and collects the rows into a list of IngestedRecord objects.

for (File parquetFile : Objects.requireNonNull(parentFolder.listFiles((dir, name) -> name.endsWith(".parquet")))) {
    System.out.println("Processing file: " + parquetFile.getAbsolutePath());
    try {
        List<IngestedRecord> records = new CarpetReader<>(parquetFile, IngestedRecord.class)
                .withFailOnNullForPrimitives(true)
                .toList();
        // Processing the records will be done in the next step
    } catch (com.jerolba.carpet.RecordTypeConversionException e) {
        System.err.println("Type conversion issue: " + e.getMessage());
    } catch (IOException e) {
        System.err.println("IO issue: " + e.getMessage());
    } catch (Exception e) {
        System.err.println("General issue: " + e.getMessage());
    }
}
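
The post does not show IngestedRecord itself. Since Carpet maps Parquet columns onto Java record components by name, a plausible shape can be inferred from the accessor calls used in the next step. This is a sketch under assumptions: the component names come from those accessors, but the types (all String here) are guesses.

```java
// Assumed shape of IngestedRecord, inferred from record.reference_id(), record.title(), etc.
// The String types are an assumption; the real Parquet schema may use other types.
record IngestedRecord(
        String reference_id,  // used to build the Vespa document id
        String title,
        String url,
        String article_body,  // stored in Vespa under the field name "body"
        String date           // ISO-8601 text, or the literal "None" when absent
) {}
```

With only String components, withFailOnNullForPrimitives(true) would have nothing to check. It presumably starts to matter once a component is a primitive such as long, where a null column value cannot be represented.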

2. Creating Vespa Document Representation

After reading the Parquet records, this snippet converts each IngestedRecord into a VespaDocument. The date field needs care: ISO-8601 strings are converted to epoch seconds, and missing, "None", or unparseable values are stored as null.

for (IngestedRecord record : records) {
    VespaDocument document = new VespaDocument();
    Map<String, Object> fields = new HashMap<>();
    document.setPut("id:pitem:pitem::" + record.reference_id());
    fields.put("reference_id", record.reference_id());
    fields.put("title", record.title());
    fields.put("url", record.url());
    fields.put("body", record.article_body());

    // Vespa gets the date as epoch seconds; a missing, "None", or unparseable value is stored as null
    String dateString = record.date();
    if (dateString != null && !"None".equalsIgnoreCase(dateString)) {
        try {
            // Instant.parse expects an ISO-8601 instant such as 2024-01-01T00:00:00Z
            fields.put("date", Instant.parse(dateString).getEpochSecond());
        } catch (DateTimeParseException e) {
            System.err.println("Date parsing issue for value: " + dateString + " - " + e.getMessage());
            fields.put("date", null); // Already logged above; keep the document with a null date
        }
    } else {
        fields.put("date", null); // No usable date in the source row
    }
    document.setFields(fields);
    vespaDocuments.add(document);
}
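
The date handling above can also be pulled out into a small helper, which keeps the conversion loop shorter and makes the rules easy to check in isolation. The class name DateFields and the method name toEpochSeconds are hypothetical; the logic mirrors the snippet.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper; same rules as the inline date handling in the loop.
class DateFields {

    /** Returns epoch seconds for an ISO-8601 instant, or null for null, "None", or unparseable input. */
    static Long toEpochSeconds(String dateString) {
        if (dateString == null || "None".equalsIgnoreCase(dateString)) {
            return null;
        }
        try {
            return Instant.parse(dateString).getEpochSecond();
        } catch (DateTimeParseException e) {
            System.err.println("Date parsing issue for value: " + dateString + " - " + e.getMessage());
            return null;
        }
    }
}
```

For example, DateFields.toEpochSeconds("1970-01-01T00:01:00Z") returns 60, while "2024-01-01" returns null because Instant.parse requires a full instant with a time and zone offset, not a bare date.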

3. Writing the Vespa Documents to Output JSON

Finally, this snippet handles writing the accumulated VespaDocuments list to a JSON file for each processed parent directory.

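String outputJsonFile = parentFolder.getAbsolutePath() + ".json";
// Write UTF-8 explicitly (java.nio.charset.StandardCharsets); before Java 18,
// FileWriter falls back to the platform default charset
try (FileWriter writer = new FileWriter(outputJsonFile, StandardCharsets.UTF_8)) {
    ObjectMapper objectMapper = new ObjectMapper();
    // Serializes the whole list as a single JSON array
    objectMapper.writeValue(writer, vespaDocuments);
} catch (IOException e) {
    System.err.println("Writing JSON issue: " + e.getMessage());
}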
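
The VespaDocument class is not shown in the post either. A minimal sketch consistent with the setPut and setFields calls might look like the following. The property names put and fields are an assumption, chosen because they match the keys of Vespa's JSON put operation and because Jackson's default bean serialization derives JSON keys from public getters.

```java
import java.util.HashMap;
import java.util.Map;

// Assumed shape of VespaDocument; only setPut and setFields appear in the original code.
class VespaDocument {
    private String put;                                  // document id, e.g. "id:pitem:pitem::<reference_id>"
    private Map<String, Object> fields = new HashMap<>(); // field name -> value

    public String getPut() { return put; }
    public void setPut(String put) { this.put = put; }

    public Map<String, Object> getFields() { return fields; }
    public void setFields(Map<String, Object> fields) { this.fields = fields; }
}
```

In the id string, the first pitem is the namespace and the second is the document type, followed by an empty key/value section and the user-specified id.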

Full Picture

Combining the snippets, here’s how they fit within the context:

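// Imports assumed by this fragment: java.io.*, java.time.Instant, java.time.format.DateTimeParseException,
// java.util.*, com.jerolba.carpet.CarpetReader, com.fasterxml.jackson.databind.ObjectMapper
for (File parentFolder : Objects.requireNonNull(baseDir.listFiles(File::isDirectory))) {
    String outputJsonFile = parentFolder.getAbsolutePath() + ".json";
    // One list per folder: documents from all of its Parquet files end up in the same JSON file
    List<VespaDocument> vespaDocuments = new ArrayList<>();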

    for (File parquetFile : Objects.requireNonNull(parentFolder.listFiles((dir, name) -> name.endsWith(".parquet")))) {
        System.out.println("Processing file: " + parquetFile.getAbsolutePath());
        try {
            List<IngestedRecord> records = new CarpetReader<>(parquetFile, IngestedRecord.class)
                    .withFailOnNullForPrimitives(true)
                    .toList();

            for (IngestedRecord record : records) {
                VespaDocument document = new VespaDocument();
                Map<String, Object> fields = new HashMap<>();
                document.setPut("id:pitem:pitem::" + record.reference_id());
                fields.put("reference_id", record.reference_id());
                fields.put("title", record.title());
                fields.put("url", record.url());
                fields.put("body", record.article_body());

                // Vespa gets the date as epoch seconds; a missing, "None", or unparseable value is stored as null
                String dateString = record.date();
                if (dateString != null && !"None".equalsIgnoreCase(dateString)) {
                    try {
                        // Instant.parse expects an ISO-8601 instant such as 2024-01-01T00:00:00Z
                        fields.put("date", Instant.parse(dateString).getEpochSecond());
                    } catch (DateTimeParseException e) {
                        System.err.println("Date parsing issue for value: " + dateString + " - " + e.getMessage());
                        fields.put("date", null); // Already logged above; keep the document with a null date
                    }
                } else {
                    fields.put("date", null); // No usable date in the source row
                }
                document.setFields(fields);
                vespaDocuments.add(document);
            }
        } catch (com.jerolba.carpet.RecordTypeConversionException e) {
            System.err.println("Type conversion issue: " + e.getMessage());
        } catch (IOException e) {
            System.err.println("IO issue: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("General issue: " + e.getMessage());
        }
    }

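    // Write UTF-8 explicitly (java.nio.charset.StandardCharsets); before Java 18,
    // FileWriter falls back to the platform default charset
    try (FileWriter writer = new FileWriter(outputJsonFile, StandardCharsets.UTF_8)) {
        ObjectMapper objectMapper = new ObjectMapper();
        // Serializes the whole list as a single JSON array
        objectMapper.writeValue(writer, vespaDocuments);
    } catch (IOException e) {
        System.err.println("Writing JSON issue: " + e.getMessage());
    }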
}

Key Considerations

Conclusion

Converting Parquet files to Vespa documents comes down to three steps: reading the rows, mapping the fields (with extra care for dates), and serializing the result to JSON. The program handles failures per file by logging the error and moving on to the next file, and it stores null for any date it cannot parse.