Transform csv file into json and ingest
Steps to transform a csv file and ingest it into an IKG.
Use case
There is a csv file for the nodes: the first column is the external_id, the other columns are properties, and the file is named after the node type wanted for the nodes.
The csv will be transformed into a valid json to ingest nodes into the IK platform.
There is a csv file for the relationships with the following columns: SourceId, SourceType, TargetId, TargetType.
The csv will be transformed into a valid json to ingest relationships into the IK platform.
Then the json files will be captured into an IKG.
Requirements
- API_TOKEN: AppAgent credentials created in the IndyKite Hub, using the REST endpoints or using Terraform for your Project / Application.
- NODES_API_URL: The capture API endpoint URL for ingesting Nodes
- RELS_API_URL: The capture API endpoint URL for ingesting relationships
- BATCH_SIZE: The number of nodes to send in each batch
- THREADS: The number of worker threads to use
Steps
1. Transform the nodes csv file into json.
2. Transform the relationships csv file into json.
3. Capture data into the IKG.
Step 1
Nodes csv file.
record_id,title
101,Hamlet
102,Othello
103,Macbeth
104,King Lear
105,Romeo and Juliet
106,A Midsummer Night's Dream
107,The Tempest
108,Twelfth Night
109,Julius Caesar
110,As you Like It
111,Much Ado About Nothing
112,The Merchant of Venice
113,Henry V
114,Richard III
115,Coriolanus
116,Taming of the Shrew
117,Antony and Cleopatra
118,Measure for Measure
119,The Winter's Tale
120,All's Well That Ends Well
Transform the nodes csv file into json.
"""
Converts a CSV file into a JSON file for IK data ingestion
Copyright (C) 2025 Indykite
Version: 1.0.0
Description:
This script reads a CSV input file containing nodes, and
outputs a JSON file with the same data formatted for ingestion.
Usage:
python script.py input.csv output.json
Author:
alex.babeanu@indykite.com
"""
import os
import csv
import json
import sys
# --- Function to convert CSV to JSON ---
def csv_to_json(csv_file, json_file, is_identity):
    """
    Convert a CSV file of nodes into a JSON file for IK data ingestion.

    The first CSV column is treated as the node external_id; every other
    column becomes a typed property. The node type is taken from the CSV
    file name (without extension). Empty cells are skipped; cell values
    are coerced to bool/int/float where possible, otherwise kept as strings.

    Args:
        csv_file (str): Path to the input CSV file containing nodes.
        json_file (str): Path to the output JSON file for IK data ingestion.
        is_identity (str|bool): Whether nodes are identities. Any value other
            than the string "false" (case-insensitive) is treated as True.
    """
    # Node type comes from the file name, e.g. "Record.csv" -> "Record".
    node_type = os.path.splitext(os.path.basename(csv_file))[0]

    def coerce_value(raw_value):
        """Coerce a raw CSV cell into None/bool/int/float/str."""
        if raw_value is None:
            return None
        value = raw_value.strip()
        if value == "":
            return None  # empty cell -> skipped by the caller
        low = value.lower()
        if low == "true":
            return True
        if low == "false":
            return False
        # Integers (optionally with a leading minus sign)
        if value.isdigit() or (value.startswith("-") and value[1:].isdigit()):
            return int(value)
        # Floats: only attempted when the text looks float-ish
        if any(ch in value for ch in (".", "e", "E")):
            try:
                return float(value)
            except ValueError:
                pass
        # Default: keep as string
        return value

    nodes = []
    with open(csv_file, newline='', encoding='utf-8') as infile:
        reader = csv.DictReader(infile)
        fieldnames = reader.fieldnames or []
        # First column holds the external_id.
        external_id_field = fieldnames[0] if fieldnames else None
        for row in reader:
            external_id = row.get(external_id_field, "") if external_id_field else ""
            properties = []
            for key, raw_value in row.items():
                if key == external_id_field:
                    continue  # the external_id column is not repeated as a property
                coerced = coerce_value(raw_value)
                if coerced is None:
                    continue  # skip empty values
                properties.append({
                    "type": key,
                    "value": coerced
                })
            nodes.append({
                "external_id": external_id,
                "type": node_type,
                "is_identity": str(is_identity).lower() != "false",
                "properties": properties
            })

    # Emit the wrapper object in one call: json.dump guarantees valid JSON,
    # unlike the previous hand-assembled output, whose string literals also
    # contained raw newlines (a SyntaxError in Python).
    with open(json_file, "w", encoding="utf-8") as outfile:
        json.dump({"nodes": nodes}, outfile, ensure_ascii=False, indent=2)
    print(f"Formatted JSON output (nodes array) saved to {json_file}")
# -------------
# Main function
# -------------
if __name__ == "__main__":
    # Three positional arguments are required: input CSV, output JSON and the
    # is_identity flag. The previous check (len < 3) allowed a run with only
    # two arguments, which then crashed with IndexError on sys.argv[3].
    if len(sys.argv) < 4:
        print("Usage: python script.py input.csv output.json is_identity")
    else:
        csv_to_json(sys.argv[1], sys.argv[2], sys.argv[3])
Nodes json file.
{
"nodes": [
{
"external_id": "101",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 101
},
{
"type": "title",
"value": "Hamlet"
}
]
},
{
"external_id": "102",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 102
},
{
"type": "title",
"value": "Othello"
}
]
},
{
"external_id": "103",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 103
},
{
"type": "title",
"value": "Macbeth"
}
]
},
{
"external_id": "104",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 104
},
{
"type": "title",
"value": "King Lear"
}
]
},
{
"external_id": "105",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 105
},
{
"type": "title",
"value": "Romeo and Juliet"
}
]
},
{
"external_id": "106",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 106
},
{
"type": "title",
"value": "A Midsummer Night's Dream"
}
]
},
{
"external_id": "107",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 107
},
{
"type": "title",
"value": "The Tempest"
}
]
},
{
"external_id": "108",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 108
},
{
"type": "title",
"value": "Twelfth Night"
}
]
},
{
"external_id": "109",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 109
},
{
"type": "title",
"value": "Julius Caesar"
}
]
},
{
"external_id": "110",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 110
},
{
"type": "title",
"value": "As you Like It"
}
]
},
{
"external_id": "111",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 111
},
{
"type": "title",
"value": "Much Ado About Nothing"
}
]
},
{
"external_id": "112",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 112
},
{
"type": "title",
"value": "The Merchant of Venice"
}
]
},
{
"external_id": "113",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 113
},
{
"type": "title",
"value": "Henry V"
}
]
},
{
"external_id": "114",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 114
},
{
"type": "title",
"value": "Richard III"
}
]
},
{
"external_id": "115",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 115
},
{
"type": "title",
"value": "Coriolanus"
}
]
},
{
"external_id": "116",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 116
},
{
"type": "title",
"value": "Taming of the Shrew"
}
]
},
{
"external_id": "117",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 117
},
{
"type": "title",
"value": "Antony and Cleopatra"
}
]
},
{
"external_id": "118",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 118
},
{
"type": "title",
"value": "Measure for Measure"
}
]
},
{
"external_id": "119",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 119
},
{
"type": "title",
"value": "The Winter's Tale"
}
]
},
{
"external_id": "120",
"type": "Record",
"is_identity": false,
"properties": [
{
"type": "record_id",
"value": 120
},
{
"type": "title",
"value": "All's Well That Ends Well"
}
]
}
]
}

Step 2
Relationships csv file.
SourceId,SourceType,TargetId,TargetType
101,record,102,record
103,record,104,record
105,record,106,record
107,record,108,record
109,record,110,record
111,record,112,record
Transform the relationships csv file into json.
"""
Converts a CSV file into a JSON file for IK data ingestion
Specifically for relationships
Copyright (C) 2025 Indykite
Version: 1.0.0
Description:
This script reads a CSV input file containing Relationships, and
outputs a JSON file with the same data formatted for ingestion.
The input CSV file is expected to have the following columns:
SourceId,SourceType,TargetId,TargetType
Usage:
python script.py input.csv output.json REL_Label
Author:
alex.babeanu@indykite.com
"""
import csv
import json
import sys
# --- Function to convert CSV to JSON ---
def convert_csv_to_json(input_file, output_file, rel_label):
    """
    Convert a CSV file of relationships into a JSON file for IK data ingestion.

    The input CSV must contain the columns:
    SourceId, SourceType, TargetId, TargetType.
    Header names are stripped of surrounding whitespace, so a header such as
    "SourceId, SourceType, ..." (with spaces after the commas, as in the
    sample data) no longer raises KeyError.

    Args:
        input_file (str): Path to the input CSV file.
        output_file (str): Path to the output JSON file.
        rel_label (str): Relationship label used as the "type" of every entry.

    Output format:
    {
      "relationships": [
        {
          "source": {"external_id": "...", "type": "..."},
          "target": {"external_id": "...", "type": "..."},
          "type": "<rel_label>",
          "properties": []
        }
      ]
    }
    """
    relationships = []
    with open(input_file, mode='r', newline='', encoding='utf-8') as infile:
        reader = csv.DictReader(infile)
        # Normalize header names: a ", "-separated header would otherwise
        # produce keys like " SourceType" and break the row lookups below.
        if reader.fieldnames:
            reader.fieldnames = [name.strip() for name in reader.fieldnames]
        for row in reader:
            relationships.append({
                "source": {
                    "external_id": row["SourceId"].strip(),
                    "type": row["SourceType"].strip()
                },
                "target": {
                    "external_id": row["TargetId"].strip(),
                    "type": row["TargetType"].strip()
                },
                "type": rel_label,
                "properties": []
            })

    output_wrapper = {"relationships": relationships}
    with open(output_file, mode='w', encoding='utf-8') as outfile:
        json.dump(output_wrapper, outfile, ensure_ascii=False, indent=2)
# --- Main function ---
if __name__ == "__main__":
    # Exactly three positional arguments are expected:
    # input CSV, output JSON, and the relationship label.
    if len(sys.argv) != 4:
        print("Usage: python convert2RelJSON.py <input_csv> <output_json> <REL_Label>")
        sys.exit(1)
    convert_csv_to_json(sys.argv[1], sys.argv[2], sys.argv[3])
Relationships json file.
{
"relationships": [
{
"source": {
"external_id": "101",
"type": "record"
},
"target": {
"external_id": "102",
"type": "record"
},
"type": "LIVES_WITH",
"properties": []
},
{
"source": {
"external_id": "103",
"type": "record"
},
"target": {
"external_id": "104",
"type": "record"
},
"type": "LIVES_WITH",
"properties": []
},
{
"source": {
"external_id": "105",
"type": "record"
},
"target": {
"external_id": "106",
"type": "record"
},
"type": "LIVES_WITH",
"properties": []
},
{
"source": {
"external_id": "107",
"type": "record"
},
"target": {
"external_id": "108",
"type": "record"
},
"type": "LIVES_WITH",
"properties": []
},
{
"source": {
"external_id": "109",
"type": "record"
},
"target": {
"external_id": "110",
"type": "record"
},
"type": "LIVES_WITH",
"properties": []
},
{
"source": {
"external_id": "111",
"type": "record"
},
"target": {
"external_id": "112",
"type": "record"
},
"type": "LIVES_WITH",
"properties": []
}
]
}

Step 3
Ingest the nodes needed for this use case.
"""
JSON Nodes IngestionProcessor
Copyright (C) 2025 Indykite
Version: 1.0.0
Description:
This script reads a JSON file containing nodes or relationships,
processes them in batches and sends them as POST requests to a
specified Indykite API endpoint concurrently.
Usage:
python ingest_batchedNodes.py Mode input.json error.file
- "Mode:" can be either "nodes" or "relationships"
- "input.json" is the JSON file containing nodes or relationships
- "error.file" is the output file for failed payloads
Environment Variables:
NODES_API_URL - The API endpoint URL for ingesting Nodes
RELS_API_URL - The API endpoint URL for ingesting relationships
API_TOKEN - The authentication token for API requests
BATCH_SIZE - The number of nodes to send in each batch
THREADS - The number of worker threads to use
Authors:
zain.rizvi@indykite.com
alex.babeanu@indykite.com
"""
import os
import json
import sys
import requests
import concurrent.futures
import time
from dotenv import load_dotenv
# Load environment variables from a local .env file, if present.
load_dotenv()

# REST API endpoints for the two ingestion modes.
# NOTE(review): these are None when the variables are unset — the run then
# fails inside send_post_request; confirm whether fail-fast here is wanted.
NODES_API_URL = os.getenv("NODES_API_URL")
RELS_API_URL = os.getenv("RELS_API_URL")

# Common headers for every POST request; bearer token comes from API_TOKEN.
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {os.getenv('API_TOKEN')}"
}

# Number of nodes/relationships per POST request.
# NOTE(review): int(os.getenv(...)) raises TypeError when the variable is
# unset — confirm whether a default value should be supplied.
BATCH_SIZE = int(os.getenv("BATCH_SIZE"))

# Number of concurrent worker threads used by the ThreadPoolExecutor.
THREADS = int(os.getenv("THREADS"))

# Path of the file where failed payloads are appended; set in __main__.
errors_file = ""
# ---------- Function to send batched POST requests ------------
def send_post_request(batch, mode, retries=3):
    """
    Send one batch of nodes or relationships to the Ingest API, with retries.

    Args:
        batch (list): The nodes or relationships to send in this request.
        mode (str): Either "nodes" or "relationships"; selects the endpoint.
        retries (int): Maximum number of attempts before giving up.

    Returns:
        dict: {"status_code": int, "response": dict|None} on success, or
              {"error": str} after all retries fail (the payload is then
              appended to the global errors_file).

    Raises:
        ValueError: If mode is neither "nodes" nor "relationships".
    """
    # Pick the API URL based on the mode.
    api_url = {
        "nodes": NODES_API_URL,
        "relationships": RELS_API_URL
    }.get(mode)
    if not api_url:
        raise ValueError(f"Invalid mode: {mode}. Expected 'nodes' or 'relationships'.")

    for attempt in range(1, retries + 1):
        try:
            response = requests.post(api_url, headers=HEADERS, json={mode: batch})
            if response.status_code == 200:
                return {
                    "status_code": response.status_code,
                    "response": response.json() if response.content else None
                }
            print(f"Error response for batch: {response.status_code} - {response.text}")
        except Exception as e:
            print(f"Exception for batch on attempt {attempt}: {e}")

    # All retries failed: record the payload so it can be replayed later.
    # (The previous version had raw newlines inside the f-string literal,
    # which is a SyntaxError in Python.)
    with open(errors_file, "a") as error_file:
        error_file.write(f"--- Failed Batch:\n{batch}\n")
    return {"error": f"Failed after {retries} attempts"}
# ------------ Main function ------------
if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Usage: python ingest_batchData.py <mode> <input.json> <error.file>")
        sys.exit(1)
    mode = sys.argv[1]
    json_file = sys.argv[2]
    errors_file = sys.argv[3]

    # Record start time in milliseconds
    start_time = int(time.time() * 1000)

    # Read the input JSON file. The converter scripts emit a wrapper object
    # ({"nodes": [...]} or {"relationships": [...]}), so unwrap it by mode;
    # a bare JSON array is still accepted as-is. (The previous version
    # batched the wrapper dict directly, which fails: dicts cannot be sliced.)
    with open(json_file, "r", encoding="utf-8") as file:
        data = json.load(file)
    entries = data[mode] if isinstance(data, dict) else data

    total_entries = len(entries)
    print(f"Total nodes: {total_entries}")

    # Split entries into batches of BATCH_SIZE
    batches = [entries[i:i + BATCH_SIZE] for i in range(0, total_entries, BATCH_SIZE)]
    print(f"Total batches: {len(batches)}")

    # Send the batches concurrently
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as executor:
        future_to_batch = {executor.submit(send_post_request, batch, mode): batch for batch in batches}
        for future in concurrent.futures.as_completed(future_to_batch):
            batch = future_to_batch[future]
            try:
                result = future.result()
                if "error" in result:
                    print(f"Error for batch: {result['error']}")
                else:
                    print(f"Batch sent successfully: {result['status_code']}")
            except Exception as e:
                print(f"Unexpected error for batch: {e}")

    # Record end time and report the elapsed duration in milliseconds
    end_time = int(time.time() * 1000)
    duration = end_time - start_time
    print(f"Total execution time: {duration} ms")
    print(f"Total number of entries processed: {total_entries}")
## Rest endpoints definitions: https://openapi.indykite.com.