Back to all tools
Capture

Transform csv file into json and ingest

Transform csv file into json and ingest into the IK platform.

Transform csv file into json and ingest

Steps to transform a csv file and ingest it into an IKG.

Use case

There is a csv file for the nodes: the first column is the external_id, the other column are properties and the file is named after the node type wanted for the nodes.

The csv will be transformed into a valid json to ingest nodes into the IK platform.

There is a csv file for the relationships with the following columns: SourceId, SourceType, TargetId, TargetType.

The csv will be transformed into a valid json to ingest relationships into the IK platform.

Then the json files will be captured into an IKG.

Requirements

- API_TOKEN: AppAgent credentials created in the IndyKite Hub, using the REST endpoints or using Terraform for your Project / Application.

- NODES_API_URL: The capture API endpoint URL for ingesting Nodes

- RELS_API_URL: The capture API endpoint URL for ingesting relationships

- BATCH_SIZE: The number of nodes to send in each batch

- THREADS: The number of worker threads to use

Steps

1. Transform the nodes csv file into json.

2. Transform the relationships csv file into json.

3. Capture data into the IKG.

Step 1

Nodes csv file.

csv records.csv

record_id,title
101,Hamlet
102,Othello
103,Macbeth
104,King Lear
105,Romeo and Juliet
106,A Midsummer Night's Dream
107,The Tempest
108,Twelfth Night
109,Julius Caesar
110,As you Like It
111,Much Ado About Nothing
112,The Merchant of Venice
113,Henry V
114,Richard III
115,Coriolanus
116,Taming of the Shrew
117,Antony and Cleopatra
118,Measure for Measure
119,The Winter's Tale
120,All's Well That Ends Well

Transform the nodes csv file into json.

python convert2NodeJSON.py

"""""
Converts a CSV file into a JSON file for IK data ingestion

Copyright (C) 2025 Indykite
Version: 1.0.0

Description:
This script reads a CSV input file containing nodes, and
outputs a JSON file with the same data formatted for ingestion.

Usage:
    python script.py input.csv output.json

Author:
    alex.babeanu@indykite.com
"""

import os
import csv
import json
import sys

# --- Function to convert CSV to JSON ---
def csv_to_json(csv_file, json_file, is_identity):
    """
        Converts a CSV file containing nodes into a JSON file for IK data ingestion.
        The input CSV file is expected to have the first column as the external_id and the rest as properties.
    Args:
        csv_file (String): The input CSV file containing nodes.
        json_file (String): The output JSON file for IK data ingestion.
        is_identity (String|Bool): Whether nodes are identities ("true"/"false").
    """
    node_type = os.path.splitext(os.path.basename(csv_file))[0]  # Get filename without extension

    def coerce_value(raw_value):
        # Trim whitespace
        if raw_value is None:
            return None
        value = raw_value.strip()
        # Empty string -> None (skip later)
        if value == "":
            return None
        # Booleans
        low = value.lower()
        if low == "true":
            return True
        if low == "false":
            return False
        # Integers
        try:
            if value.isdigit() or (value.startswith("-") and value[1:].isdigit()):
                return int(value)
        except Exception:
            pass
        # Floats
        try:
            if any(ch in value for ch in [".", "e", "E"]):
                return float(value)
        except Exception:
            pass
        # Default to string
        return value

    with open(csv_file, newline='', encoding='utf-8') as file, open(json_file, "w", encoding="utf-8") as outfile:
        reader = csv.DictReader(file)
        # Start object with nodes array
        outfile.write("{
	"nodes": [")
        first = True

        for row in reader:
            if not first:
                outfile.write(",
")  # Add comma before each new node (except the first)
            first = False

            keys = list(row.keys())
            external_id_field = keys[0] if keys else None
            external_id = row.get(external_id_field, "") if external_id_field else ""

            properties = []
            for key, raw_value in row.items():
                if key == external_id_field:
                    continue  # Skip external_id column from properties
                coerced = coerce_value(raw_value)
                if coerced is None:
                    continue  # skip empty values
                properties.append({
                    "type": key,
                    "value": coerced
                })

            node = {
                "external_id": external_id,
                "type": node_type,
                "is_identity": str(is_identity).lower() != "false",
                "properties": properties
            }

            outfile.write(json.dumps(node, ensure_ascii=False))

        outfile.write("]
}")  # Close array and object

    print(f"Formatted JSON output (nodes array) saved to {json_file}")

# -------------
# Main function
# -------------
if __name__ == "__main__":
    # Three arguments are required: input CSV, output JSON, is_identity flag.
    # The original check (< 3) allowed sys.argv[3] to raise IndexError.
    if len(sys.argv) < 4:
        print("Usage: python script.py input.csv output.json is_identity")
        sys.exit(1)
    csv_to_json(sys.argv[1], sys.argv[2], sys.argv[3])

Nodes json file.

json nodes
{
  "nodes": [
    {
      "external_id": "101",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 101
        },
        {
          "type": "title",
          "value": "Hamlet"
        }
      ]
    },
    {
      "external_id": "102",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 102
        },
        {
          "type": "title",
          "value": "Othello"
        }
      ]
    },
    {
      "external_id": "103",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 103
        },
        {
          "type": "title",
          "value": "Macbeth"
        }
      ]
    },
    {
      "external_id": "104",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 104
        },
        {
          "type": "title",
          "value": "King Lear"
        }
      ]
    },
    {
      "external_id": "105",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 105
        },
        {
          "type": "title",
          "value": "Romeo and Juliet"
        }
      ]
    },
    {
      "external_id": "106",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": {
            "integerValue": 106
          }
        },
        {
          "type": "title",
          "value": "A Midsummer Night's Dream"
        }
      ]
    },
    {
      "external_id": "107",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 107
        },
        {
          "type": "title",
          "value": "The Tempest"
        }
      ]
    },
    {
      "external_id": "108",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 108
        },
        {
          "type": "title",
          "value": "Twelfth Night"
        }
      ]
    },
    {
      "external_id": "109",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 109
        },
        {
          "type": "title",
          "value": "Julius Caesar"
        }
      ]
    },
    {
      "external_id": "110",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 110
        },
        {
          "type": "title",
          "value": "As you Like It"
        }
      ]
    },
    {
      "external_id": "111",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 111
        },
        {
          "type": "title",
          "value": "Much Ado About Nothing"
        }
      ]
    },
    {
      "external_id": "112",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 112
        },
        {
          "type": "title",
          "value": "The Merchant of Venice"
        }
      ]
    },
    {
      "external_id": "113",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": {
            "integerValue": 113
          }
        },
        {
          "type": "title",
          "value": "Henry V"
        }
      ]
    },
    {
      "external_id": "114",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 114
        },
        {
          "type": "title",
          "value": "Richard III"
        }
      ]
    },
    {
      "external_id": "115",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 115
        },
        {
          "type": "title",
          "value": "Coriolanus"
        }
      ]
    },
    {
      "external_id": "116",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 116
        },
        {
          "type": "title",
          "value": "Taming of the Shrew"
        }
      ]
    },
    {
      "external_id": "117",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 117
        },
        {
          "type": "title",
          "value": "Antony and Cleopatra"
        }
      ]
    },
    {
      "external_id": "118",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 118
        },
        {
          "type": "title",
          "value": "Measure for Measure"
        }
      ]
    },
    {
      "external_id": "119",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 119
        },
        {
          "type": "title",
          "value": "The Winter's Tale"
        }
      ]
    },
    {
      "external_id": "120",
      "type": "Record",
      "is_identity": false,
      "properties": [
        {
          "type": "record_id",
          "value": 120
        },
        {
          "type": "title",
          "value": "All's Well That Ends Well"
        }
      ]
    }
  ]
}

Step 2

Relationships csv file.

csv relationships

SourceId,SourceType,TargetId,TargetType
101,record,102,record
103,record,104,record
105,record,106,record
107,record,108,record
109,record,110,record
111,record,112,record

Transform the relationships csv file into json.

python convert2RelJSON.py

"""""
Converts a CSV file into a JSON file for IK data ingestion
Specifically for relationships

Copyright (C) 2025 Indykite
Version: 1.0.0

Description:
This script reads a CSV input file containing Relationships, and
outputs a JSON file with the same data formatted for ingestion.

The input CSV file is expected to have the following columns:

    SourceId,SourceType,TargetId,TargetType

Usage:
    python script.py input.csv output.json REL_Label

Author:
    alex.babeanu@indykite.com
"""
import csv
import json
import sys

# --- Function to convert CSV to JSON ---
def convert_csv_to_json(input_file, output_file, rel_label):
    """
    Convert a relationships CSV file into a JSON file for IK data ingestion.

    The input CSV must provide the columns: SourceId, SourceType, TargetId,
    TargetType. Every row becomes one relationship labelled with rel_label.

    Output format:
    {
      "relationships": [
        {
          "source": {"external_id": "...", "type": "..."},
          "target": {"external_id": "...", "type": "..."},
          "type": "REL",
          "properties": []
        }
      ]
    }
    """
    # Build the relationship entries straight from the CSV rows.
    with open(input_file, mode='r', newline='', encoding='utf-8') as infile:
        relationships = [
            {
                "source": {
                    "external_id": rec["SourceId"],
                    "type": rec["SourceType"],
                },
                "target": {
                    "external_id": rec["TargetId"],
                    "type": rec["TargetType"],
                },
                "type": rel_label,
                "properties": [],
            }
            for rec in csv.DictReader(infile)
        ]

    # Wrap in the expected top-level object and write pretty-printed JSON.
    with open(output_file, mode='w', encoding='utf-8') as outfile:
        json.dump({"relationships": relationships}, outfile,
                  ensure_ascii=False, indent=2)

# --- Main function ---
if __name__ == "__main__":
    # Expect exactly three arguments: input CSV, output JSON, relationship label.
    if len(sys.argv) != 4:
        print("Usage: python convert2RelJSON.py <input_csv> <output_json> <REL_Label>")
        sys.exit(1)

    convert_csv_to_json(sys.argv[1], sys.argv[2], sys.argv[3])

Relationships json file.

json relationships
{
  "relationships": [
    {
      "source": {
        "external_id": "101",
        "type": "record"
      },
      "target": {
        "external_id": "102",
        "type": "record"
      },
      "type": "LIVES_WITH",
      "properties": []
    },
    {
      "source": {
        "external_id": "103",
        "type": "record"
      },
      "target": {
        "external_id": "104",
        "type": "record"
      },
      "type": "LIVES_WITH",
      "properties": []
    },
    {
      "source": {
        "external_id": "105",
        "type": "record"
      },
      "target": {
        "external_id": "106",
        "type": "record"
      },
      "type": "LIVES_WITH",
      "properties": []
    },
    {
      "source": {
        "external_id": "107",
        "type": "record"
      },
      "target": {
        "external_id": "108",
        "type": "record"
      },
      "type": "LIVES_WITH",
      "properties": []
    },
    {
      "source": {
        "external_id": "109",
        "type": "record"
      },
      "target": {
        "external_id": "110",
        "type": "record"
      },
      "type": "LIVES_WITH",
      "properties": []
    },
    {
      "source": {
        "external_id": "111",
        "type": "record"
      },
      "target": {
        "external_id": "112",
        "type": "record"
      },
      "type": "LIVES_WITH",
      "properties": []
    }
  ]
}

Step 3

Ingest the nodes needed for this use case.

python ingest_batchData.py

"""""
JSON Nodes IngestionProcessor

Copyright (C) 2025 Indykite
Version: 1.0.0

Description:
This script reads a JSON file containing nodes or relationships,
processes them in batches and sends them as POST requests to a
specified Indykite API endpoint concurrently.

Usage:
    python ingest_batchData.py Mode input.json error.file

    - "Mode:" can be either "nodes" or "relationships"
    - "input.json" is the JSON file containing nodes or relationships
    - "error.file" is the output file for failed payloads

Environment Variables:
    NODES_API_URL  - The API endpoint URL for ingesting Nodes
    RELS_API_URL  - The API endpoint URL for ingesting relationships
    API_TOKEN - The authentication token for API requests
    BATCH_SIZE - The number of nodes to send in each batch
    THREADS - The number of worker threads to use

Authors:
    zain.rizvi@indykite.com
    alex.babeanu@indykite.com
"""
import os
import json
import sys
import requests
import concurrent.futures
import time
from dotenv import load_dotenv

# Load environment variables from a local .env file, if present
load_dotenv()
# Capture API endpoints (REST definitions: https://openapi.indykite.com)
NODES_API_URL = os.getenv("NODES_API_URL")
RELS_API_URL = os.getenv("RELS_API_URL")
# Common headers for every request; API_TOKEN is the AppAgent credential
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {os.getenv('API_TOKEN')}"
}
# Ingest batch size — default 250 when BATCH_SIZE is unset
# (int(None) would otherwise raise TypeError at import time)
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "250"))
# Number of worker threads — default 4 when THREADS is unset
THREADS = int(os.getenv("THREADS", "4"))
# Output file for failed payloads; overwritten from argv in main
errors_file = ""

# ---------- Function to send batched POST requests ------------
def send_post_request(batch, mode, retries=3):
    """
    Sends a batch of nodes or relationships to the Ingest API with retries.

    Args:
        batch (list): The entries to send in this request.
        mode (str): Either "nodes" or "relationships"; selects the endpoint
            and the key the payload is wrapped under.
        retries (int): Maximum number of attempts before giving up.

    Returns:
        dict: {"status_code", "response"} on success, or {"error": ...} after
            all retries fail (the payload is also appended to errors_file).

    Raises:
        ValueError: If mode is neither "nodes" nor "relationships".
    """
    attempt = 0

    # Pick the API URL matching the requested mode
    API_URL = {
        "nodes": NODES_API_URL,
        "relationships": RELS_API_URL
    }.get(mode)
    if not API_URL:
        raise ValueError(f"Invalid mode: {mode}. Expected 'nodes' or 'relationships'.")

    while attempt < retries:
        try:
            response = requests.post(API_URL, headers=HEADERS, json={mode: batch})
            if response.status_code == 200:
                return {
                    "status_code": response.status_code,
                    "response": response.json() if response.content else None
                }
            print(f"Error response for batch: {response.status_code} - {response.text}")
        except Exception as e:
            # Network errors etc. — retry up to the limit
            print(f"Exception for batch on attempt {attempt + 1}: {e}")
        attempt += 1

    # Persist the failed payload so it can be inspected / replayed later
    with open(errors_file, "a", encoding="utf-8") as error_file:
        error_file.write(f"--- Failed Batch:\n {batch}\n")

    return {"error": f"Failed after {retries} attempts"}

# ------------ Main function ------------
if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Usage: python ingest_batchData.py <mode> <input.json> <error.file>")
        sys.exit(1)

    mode = sys.argv[1]
    json_file = sys.argv[2]
    errors_file = sys.argv[3]

    # Record start time in milliseconds
    start_time = int(time.time() * 1000)

    # Read the input JSON file. The converter scripts produce a wrapper
    # object ({"nodes": [...]} or {"relationships": [...]}); slicing that
    # dict directly would raise TypeError, so extract the list under `mode`.
    # A bare top-level JSON array is accepted as-is.
    with open(json_file, "r", encoding="utf-8") as file:
        payload = json.load(file)
    nodes = payload.get(mode, []) if isinstance(payload, dict) else payload

    # Total nb of entries to ingest
    total_nodes = len(nodes)
    print(f"Total nodes: {total_nodes}")

    # Split entries into batches of BATCH_SIZE
    batches = [nodes[i:i + BATCH_SIZE] for i in range(0, total_nodes, BATCH_SIZE)]
    print(f"Total batches: {len(batches)}")

    # Use ThreadPoolExecutor to send batches concurrently
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as executor:
        future_to_batch = {executor.submit(send_post_request, batch, mode): batch for batch in batches}

        for future in concurrent.futures.as_completed(future_to_batch):
            try:
                result = future.result()
                if "error" in result:
                    print(f"Error for batch: {result['error']}")
                else:
                    print(f"Batch sent successfully: {result['status_code']}")
            except Exception as e:
                print(f"Unexpected error for batch: {e}")

    # Record end time in milliseconds and report duration
    end_time = int(time.time() * 1000)
    duration = end_time - start_time
    print(f"Total execution time: {duration} ms")
    print(f"Total number of entries processed: {total_nodes}")

    ## Rest endpoints definitions: https://openapi.indykite.com.