Skip to main content

Python SDK

The official Python SDK for Klyntos allows you to execute workflows programmatically from your Python applications using the official Python SDK.

info

The Python SDK supports Python 3.8+ and provides synchronous workflow execution. All workflow executions are currently synchronous.

Installation

Install the SDK using pip:

pip install klyntos-sdk

Quick Start

Here's a simple example to get you started:

from klyntos import KlyntosClient

# Initialize the client
client = KlyntosClient(
api_key="your-api-key-here",
base_url="https://klyntos.com" # optional, defaults to https://klyntos.com
)

# Execute a workflow
try:
result = client.execute_workflow("workflow-id")
print("Workflow executed successfully:", result)
except Exception as error:
print("Workflow execution failed:", error)

API Reference

KlyntosClient

Constructor

KlyntosClient(api_key: str, base_url: str = "https://klyntos.com")

Parameters:

  • api_key (str): Your Klyntos API key
  • base_url (str, optional): Base URL for the Klyntos API

Methods

execute_workflow()

Execute a workflow with optional input data.

result = client.execute_workflow(
"workflow-id",
input_data={"message": "Hello, world!"},
timeout=30.0 # 30 seconds
)

Parameters:

  • workflow_id (str): The ID of the workflow to execute
  • input_data (dict, optional): Input data to pass to the workflow
  • timeout (float, optional): Timeout in seconds (default: 30.0)

Returns: WorkflowExecutionResult

get_workflow_status()

Get the status of a workflow (deployment status, etc.).

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

Parameters:

  • workflow_id (str): The ID of the workflow

Returns: WorkflowStatus

validate_workflow()

Validate that a workflow is ready for execution.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Workflow is deployed and ready
pass

Parameters:

  • workflow_id (str): The ID of the workflow

Returns: bool

execute_workflow_sync()
info

Currently, this method is identical to execute_workflow() since all executions are synchronous. This method is provided for future compatibility when asynchronous execution is added.

Execute a workflow (currently synchronous, same as execute_workflow()).

result = client.execute_workflow_sync(
"workflow-id",
input_data={"data": "some input"},
timeout=60.0
)

Parameters:

  • workflow_id (str): The ID of the workflow to execute
  • input_data (dict, optional): Input data to pass to the workflow
  • timeout (float): Timeout for the initial request in seconds

Returns: WorkflowExecutionResult

set_api_key()

Update the API key.

client.set_api_key("new-api-key")
set_base_url()

Update the base URL.

client.set_base_url("https://my-custom-domain.com")
close()

Close the underlying HTTP session.

client.close()

Data Classes

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = None

WorkflowStatus

@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
is_published: bool = False
needs_redeployment: bool = False

KlyntosError

class KlyntosError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = status

Examples

Basic Workflow Execution

Set up the KlyntosClient with your API key.

Check if the workflow is deployed and ready for execution.

Run the workflow with your input data.

Process the execution result and handle any errors.

import os
from klyntos import KlyntosClient

client = KlyntosClient(api_key=os.getenv("KLYNTOS_API_KEY"))

def run_workflow():
try:
# Check if workflow is ready
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Workflow is not deployed or ready")

# Execute the workflow
result = client.execute_workflow(
"my-workflow-id",
input_data={
"message": "Process this data",
"user_id": "12345"
}
)

if result.success:
print("Output:", result.output)
print("Duration:", result.metadata.get("duration") if result.metadata else None)
else:
print("Workflow failed:", result.error)

except Exception as error:
print("Error:", error)

run_workflow()

Error Handling

Handle different types of errors that may occur during workflow execution:

from klyntos import KlyntosClient, KlyntosError
import os

client = KlyntosClient(api_key=os.getenv("KLYNTOS_API_KEY"))

def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except KlyntosError as error:
if error.code == "UNAUTHORIZED":
print("Invalid API key")
elif error.code == "TIMEOUT":
print("Workflow execution timed out")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Usage limit exceeded")
elif error.code == "INVALID_JSON":
print("Invalid JSON in request body")
else:
print(f"Workflow error: {error}")
raise
except Exception as error:
print(f"Unexpected error: {error}")
raise

Context Manager Usage

Use the client as a context manager to automatically handle resource cleanup:

from klyntos import KlyntosClient
import os

# Using context manager to automatically close the session
with KlyntosClient(api_key=os.getenv("KLYNTOS_API_KEY")) as client:
result = client.execute_workflow("workflow-id")
print("Result:", result)
# Session is automatically closed here

Batch Workflow Execution

Execute multiple workflows efficiently:

from klyntos import KlyntosClient
import os

client = KlyntosClient(api_key=os.getenv("KLYNTOS_API_KEY"))

def execute_workflows_batch(workflow_data_pairs):
"""Execute multiple workflows with different input data."""
results = []

for workflow_id, input_data in workflow_data_pairs:
try:
# Validate workflow before execution
if not client.validate_workflow(workflow_id):
print(f"Skipping {workflow_id}: not deployed")
continue

result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})

except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})

return results

# Example usage
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

Environment Configuration

Configure the client using environment variables:

Getting Your API Key

Navigate to Klyntos and log in to your account.

Navigate to the workflow you want to execute programmatically.

Click on "Deploy" to deploy your workflow if it hasn't been deployed yet.

During the deployment process, select or create an API key.

Copy the API key to use in your Python application.

info

Keep your API key secure and never commit it to version control. Use environment variables or secure configuration management.

Requirements

  • Python 3.8+
  • requests >= 2.25.0

License

Apache-2.0