ETL: Python
1. Description
This driver executes code with a Python interpreter. Python has to be installed and visible to the OS-level user running Graphlytic.
2. Connection
Creates a Python environment used in the job.
Use “url” to specify the path to the Python binary that should be used. If no “url” is defined, a global Python binary is assumed.
On job startup, “python --version” is executed to check that the path to the binary is correct. If an “Incorrect python binary defined.”
error occurs, check that the “url” is set correctly and that Python is installed correctly.
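The startup check is equivalent to running the configured binary with “--version” yourself. A minimal sketch using the standard subprocess module; sys.executable stands in for the configured “url” purely for illustration:

```python
import subprocess
import sys

# Mimic the driver's startup check: run "<binary> --version" and make sure
# it exits cleanly. sys.executable stands in for the configured "url".
binary = sys.executable
result = subprocess.run([binary, "--version"], capture_output=True, text=True)
if result.returncode != 0:
    raise RuntimeError(f"Incorrect python binary defined: {binary}")
print(result.stdout.strip() or result.stderr.strip())
```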
All parameters defined in the <connection> body are set as environment variables for the Python interpreter, so they can be used in the Python script.
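A short sketch of how a script run by this driver could pick those variables up, assuming the parameter names from the connection example below:

```python
import os

# Connection-body parameters such as OPENAI_API_KEY or OTHER_ENV arrive as
# ordinary environment variables; os.environ.get avoids a KeyError when a
# parameter was not defined.
api_key = os.environ.get("OPENAI_API_KEY", "")
other = os.environ.get("OTHER_ENV", "<not set>")
print(f"OTHER_ENV={other}")
```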
Parameters
| Name | Description | Example |
|---|---|---|
| url | Path to a Python binary in the OS. If no “url” is defined, a global Python binary is assumed. | /path/to/python/binary/bin/python |
Example
<connection id="python_connection" driver="python" url="/path/to/python/binary/bin/python">
    OPENAI_API_KEY=...
    OTHER_ENV=some value
</connection>
3. Query
Runs the Python script and returns data written to the output using “print()”.
Properties defined in the <properties> part of the job or returned by the previous job step can be referenced in the script (see example below).
Please use proper indentation in the Python script, with the first line starting at the beginning of the line;
otherwise, the Python interpreter will throw an indentation error.
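The requirement is easy to see with the built-in compile(): a single leading space before the first statement is enough to trigger the error.

```python
# A script whose first line is indented fails before it even runs;
# compile() raises the same IndentationError the interpreter would.
try:
    compile(" print('hello')", "<job-script>", "exec")
except IndentationError as err:
    print(f"IndentationError: {err.msg}")
```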
Example
<properties>
    prop1=some value
</properties>

<query connection-id="python_connection">
print("this value will be returned for the next job step: $prop1")
</query>
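Property references like $prop1 are replaced in the script text before the interpreter sees it. string.Template uses the same $name syntax, so it serves as a rough illustration of the substitution (an approximation, not the driver's actual mechanism):

```python
from string import Template

# The script text as written in the job, with a $prop1 placeholder.
script = 'print("this value will be returned for the next job step: $prop1")'

# Substitute the property value, as the driver does before execution.
rendered = Template(script).substitute(prop1="some value")
print(rendered)
```

The interpreter then executes the rendered line, so the substituted value ends up on standard output and is passed to the next job step.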
4. Script
Not implemented for Python driver. Use Query instead.
5. Examples
Example 1: This example loads nodes, calculates embeddings with LangChain, and sets the embeddings as a node property
<!DOCTYPE etl SYSTEM "https://scriptella.org/dtd/etl.dtd">
<etl>
    <description>Graphlytic job</description>

    <!-- CONNECTIONS -->
    <connection id="logInfo" driver="log">
        level=ERROR
    </connection>
    <connection id="lc_example" driver="python" url="/path/to/python/binary/bin/python">
        OPENAI_API_KEY=...
    </connection>
    <connection id="graphdb" driver="graphConnection">
        project_id=1
    </connection>

    <!-- JOB STEPS -->
    <query connection-id="graphdb">
        MATCH (n) RETURN id(n) as id, n.title as title, n.description as description, n.year as year
        <query connection-id="lc_example">
import json
from langchain_openai import OpenAIEmbeddings

embeddings_model = OpenAIEmbeddings()
embeddings = embeddings_model.embed_documents([
    "$title",
    "$description",
    "$year"
])
print(json.dumps(embeddings))
            <script connection-id="logInfo">Result: $result</script>
            <script connection-id="graphdb">
                MATCH (n) WHERE id(n)=$id SET n.openAiEmbedding='$result'
            </script>
        </query>
    </query>
</etl>
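Since the Python step prints the embeddings as JSON, the $result consumed by the follow-up <script> steps is a plain JSON string. A sketch of that round trip with placeholder vectors (embed_documents returns one vector per input text):

```python
import json

# Placeholder vectors standing in for what embed_documents() would return:
# one vector per input text (title, description, year).
embeddings = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]

result = json.dumps(embeddings)  # what print() hands to the next job step
assert json.loads(result) == embeddings
print(result)
```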
Example 2: This example asks an OpenAI chat model a question and logs the response. The question is defined in the job properties and dynamically inserted into the Python script.
<!DOCTYPE etl SYSTEM "https://scriptella.org/dtd/etl.dtd">
<etl>
    <description>Graphlytic job</description>

    <properties>
        langChainRequest=How to use LangChain with graphlytic?
    </properties>

    <!-- CONNECTIONS -->
    <connection id="logInfo" driver="log">
        level=ERROR
    </connection>
    <connection id="lc_example" driver="python" url="/path/to/binary/bin/python">
        OPENAI_API_KEY=...
    </connection>

    <!-- JOB STEPS -->
    <query connection-id="lc_example">
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(openai_api_key="...")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are world class technical documentation writer."),
    ("user", "{input}")
])
chain = prompt | llm
response = chain.invoke({"input": "$langChainRequest"})
print(response)
        <script connection-id="logInfo">Result: $result</script>
    </query>
</etl>