Integration API Reference
Complete Python API reference for repoindex integrations. Use these APIs to build custom workflows, integrations, and automation tools.
Overview
The repoindex integration API provides programmatic access to all clustering, workflow, and analysis features. This allows you to:
- Build custom analysis tools
- Integrate repoindex into larger applications
- Create specialized workflows
- Extend repoindex functionality
- Automate repository management
Installation
# Install repoindex with all integration features
pip install repoindex[all]
# Or install specific integrations
pip install repoindex[clustering]
pip install repoindex[workflows]
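A quick smoke test that the optional modules are importable after installation:
# Both integration packages should import cleanly if the extras installed
import repoindex.integrations.clustering
import repoindex.integrations.workflow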
Core Integration Classes
Base Integration Class
All integrations extend the base Integration class:
from typing import Generator, List
import click
from repoindex.integrations.base import Integration
class CustomIntegration(Integration):
    """Example integration extending the repoindex Integration base class."""
def __init__(self, config: dict):
"""
Initialize integration with configuration.
Args:
config: Integration configuration dictionary
"""
super().__init__(config)
self.config = config
def validate(self) -> bool:
"""
Validate integration requirements.
Returns:
True if validation passes, False otherwise
Raises:
IntegrationError: If required dependencies are missing
"""
        raise NotImplementedError
def execute(self, data: Generator) -> Generator:
"""
Process data stream and return results.
Args:
data: Generator yielding repository data
Yields:
Processed results as dictionaries
"""
        raise NotImplementedError
def get_commands(self) -> List[click.Command]:
"""
Return CLI commands for this integration.
Returns:
List of Click command objects
"""
        raise NotImplementedError
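As a concrete illustration, here is a minimal integration that fills in all three methods. It relies only on the interface above; the loc_report action name and the metrics keys are illustrative, not part of the repoindex API:
from typing import Generator, List
import click
from repoindex.integrations.base import Integration
class LocReportIntegration(Integration):
    """Illustrative integration that reports lines of code per repository."""
    def validate(self) -> bool:
        # No optional dependencies are required for this example.
        return True
    def execute(self, data: Generator) -> Generator:
        # Consume repository records and emit one summary event per repo.
        for repo in data:
            yield {
                'action': 'loc_report',
                'repo': repo.get('name'),
                'loc': repo.get('metrics', {}).get('loc', 0),
            }
    def get_commands(self) -> List[click.Command]:
        @click.command(name='loc-report')
        def loc_report():
            click.echo('loc-report integration installed')
        return [loc_report]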
Clustering API
ClusterAnalyzer
Main class for repository clustering:
from repoindex.integrations.clustering import ClusterAnalyzer
from repoindex.integrations.clustering.algorithms import (
KMeansClustering,
DBSCANClustering,
HierarchicalClustering,
NetworkClustering
)
# Initialize analyzer
analyzer = ClusterAnalyzer(
algorithm='kmeans',
n_clusters=5,
features=['tech-stack', 'size', 'activity'],
config={
'min_cluster_size': 2,
'max_iterations': 300,
'random_state': 42
}
)
# Analyze repositories
repos = ['/path/to/repo1', '/path/to/repo2', '/path/to/repo3']
results = analyzer.analyze(repos)
# Iterate through results
for result in results:
if result['action'] == 'cluster_result':
cluster = result['cluster']
print(f"Cluster {cluster['cluster_id']}: {len(cluster['repositories'])} repos")
print(f" Coherence: {cluster['coherence_score']:.2f}")
print(f" Language: {cluster['primary_language']}")
Feature Extractors
Extract features from repositories for clustering:
from repoindex.integrations.clustering.features import (
TechStackExtractor,
SizeExtractor,
ActivityExtractor,
ComplexityExtractor
)
# Tech stack features
tech_extractor = TechStackExtractor()
tech_features = tech_extractor.extract('/path/to/repo')
# Returns: {'languages': [...], 'frameworks': [...], 'dependencies': [...]}
# Size features
size_extractor = SizeExtractor()
size_features = size_extractor.extract('/path/to/repo')
# Returns: {'loc': 1500, 'file_count': 45, 'avg_file_size': 33.3}
# Activity features
activity_extractor = ActivityExtractor()
activity_features = activity_extractor.extract('/path/to/repo')
# Returns: {'commit_count': 150, 'days_since_last_commit': 5, 'contributor_count': 3}
# Complexity features
complexity_extractor = ComplexityExtractor()
complexity_features = complexity_extractor.extract('/path/to/repo')
# Returns: {'cyclomatic_complexity': 2.5, 'max_depth': 4, 'coupling': 0.3}
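The extractors return plain dictionaries, while the clustering algorithms in the next section expect a numeric matrix. A sketch of the glue code, using only the keys documented above (the min-max scaling is one reasonable choice, not something repoindex mandates):
import numpy as np
from repoindex.integrations.clustering.features import SizeExtractor, ActivityExtractor
def build_feature_matrix(repo_paths):
    """Assemble and scale a numeric feature matrix from extractor output."""
    size_extractor = SizeExtractor()
    activity_extractor = ActivityExtractor()
    rows = []
    for path in repo_paths:
        size = size_extractor.extract(path)
        activity = activity_extractor.extract(path)
        rows.append([
            size['loc'],
            size['file_count'],
            activity['commit_count'],
            activity['days_since_last_commit'],
        ])
    matrix = np.asarray(rows, dtype=float)
    # Min-max scale each column so no single feature dominates the distance metric.
    span = matrix.max(axis=0) - matrix.min(axis=0)
    return (matrix - matrix.min(axis=0)) / np.where(span == 0.0, 1.0, span)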
Clustering Algorithms
Use specific clustering algorithms directly:
from repoindex.integrations.clustering.algorithms import KMeansClustering
import numpy as np
# Create feature matrix
feature_matrix = np.array([
[1.0, 0.5, 0.8], # repo1 features
[0.9, 0.6, 0.7], # repo2 features
[0.1, 0.2, 0.3], # repo3 features
])
# K-means clustering
kmeans = KMeansClustering(n_clusters=2, random_state=42)
labels = kmeans.fit_predict(feature_matrix)
print(f"Cluster assignments: {labels}")
# Get cluster centroids
centroids = kmeans.get_centroids()
print(f"Centroids shape: {centroids.shape}")
# Calculate silhouette score
score = kmeans.silhouette_score(feature_matrix)
print(f"Silhouette score: {score:.2f}")
DBSCAN Clustering
from repoindex.integrations.clustering.algorithms import DBSCANClustering
dbscan = DBSCANClustering(eps=0.5, min_samples=2, metric='euclidean')
labels = dbscan.fit_predict(feature_matrix)
# -1 indicates outliers
outliers = np.where(labels == -1)[0]
print(f"Found {len(outliers)} outliers")
# Get cluster metrics
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
print(f"Found {n_clusters} clusters")
Hierarchical Clustering
from repoindex.integrations.clustering.algorithms import HierarchicalClustering
hierarchical = HierarchicalClustering(
n_clusters=3,
linkage='ward',
affinity='euclidean'
)
labels = hierarchical.fit_predict(feature_matrix)
# Get dendrogram data
dendrogram_data = hierarchical.get_dendrogram_data(feature_matrix)
# Plot with matplotlib
import matplotlib.pyplot as plt
from scipy.cluster.hierarchy import dendrogram
plt.figure(figsize=(10, 6))
dendrogram(dendrogram_data['linkage_matrix'], labels=dendrogram_data['labels'])
plt.title('Repository Clustering Dendrogram')
plt.show()
DuplicationAnalyzer
Detect duplicate code across repositories:
from repoindex.integrations.clustering import DuplicationAnalyzer
analyzer = DuplicationAnalyzer(
min_block_size=10,
languages=['python', 'javascript'],
ignore_patterns=['test_*', '*_test.py']
)
# Analyze repositories for duplicates
repos = ['/path/to/repo1', '/path/to/repo2']
results = analyzer.find_duplicates(repos)
for result in results:
if result['action'] == 'duplication_found':
print(f"Duplication: {result['repo1']} ↔ {result['repo2']}")
print(f" Similarity: {result['similarity']:.0%}")
print(f" Shared lines: {result['shared_lines']}")
print(f" Recommendation: {result['recommendation']}")
print()
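The stream yields one event per repository pair. To see which repositories carry the most duplicated code overall, a small aggregation over the result keys shown above works:
from collections import defaultdict
shared_by_repo = defaultdict(int)
for result in analyzer.find_duplicates(repos):
    if result['action'] == 'duplication_found':
        shared_by_repo[result['repo1']] += result['shared_lines']
        shared_by_repo[result['repo2']] += result['shared_lines']
for repo, lines in sorted(shared_by_repo.items(), key=lambda kv: -kv[1]):
    print(f"{repo}: {lines} duplicated lines")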
ConsolidationAdvisor
Get intelligent consolidation suggestions:
from repoindex.integrations.clustering import ConsolidationAdvisor
advisor = ConsolidationAdvisor(
min_confidence=0.7,
consider_dependencies=True,
consider_activity=True
)
# Get consolidation suggestions
repos = ['/path/to/repo1', '/path/to/repo2', '/path/to/repo3']
suggestions = advisor.analyze(repos)
for suggestion in suggestions:
if suggestion['action'] == 'consolidation_suggestion':
print(f"Consolidate: {', '.join(suggestion['repositories'])}")
print(f" Confidence: {suggestion['confidence']:.0%}")
print(f" Effort: {suggestion['estimated_effort']}")
print(f" Rationale: {suggestion['rationale']}")
print(f" Benefits:")
for benefit in suggestion['benefits']:
print(f" - {benefit}")
print()
Workflow API
Workflow Class
Create and manage workflows programmatically:
from repoindex.integrations.workflow import Workflow, Task
# Create workflow
workflow = Workflow(
name="My Workflow",
description="Automated repository management",
version="1.0.0"
)
# Set global configuration
workflow.set_config({
'max_parallel': 4,
'continue_on_error': False,
'timeout': 3600
})
# Define variables
workflow.set_variables({
'environment': 'production',
'notification_email': 'dev@example.com'
})
# Add tasks
workflow.add_task(Task(
id='list_repos',
name='List all repositories',
action='repoindex.list',
parameters={'format': 'json'},
output_var='repos'
))
workflow.add_task(Task(
id='filter_python',
name='Filter Python repositories',
action='repoindex.query',
parameters={'query': 'language == "Python"'},
depends_on=['list_repos'],
output_var='python_repos'
))
workflow.add_task(Task(
id='analyze',
name='Analyze Python repos',
action='repoindex.cluster',
parameters={'algorithm': 'kmeans', 'n_clusters': 3},
depends_on=['filter_python']
))
# Save to file
workflow.save('my-workflow.yaml')
# Or execute directly
from repoindex.integrations.workflow import WorkflowRunner
runner = WorkflowRunner()
result = runner.run(workflow)
if result.success:
print(f"Workflow completed in {result.duration:.2f}s")
for task_id, task_result in result.tasks.items():
print(f" {task_id}: {task_result.status}")
else:
print(f"Workflow failed: {result.error}")
Task Types
Shell Task
shell_task = Task(
id='git_pull',
type='shell',
name='Update repository',
command='git pull origin main',
cwd='/path/to/repo',
timeout=60,
retry={'attempts': 3, 'delay': 5}
)
Python Task
python_task = Task(
id='process_data',
type='python',
name='Process repository data',
code="""
processed = []
for repo in context['repos']:
if repo['language'] == 'Python':
processed.append(repo['name'])
context['python_repos'] = processed
""",
depends_on=['list_repos']
)
HTTP Task
http_task = Task(
id='api_call',
type='http',
name='Call external API',
method='POST',
url='https://api.example.com/webhooks',
headers={'Content-Type': 'application/json'},
body={'event': 'workflow_complete', 'data': '{{ results }}'},
timeout=30
)
Git Task
git_task = Task(
id='commit_changes',
type='git',
name='Commit and push changes',
operation='commit',
message='Automated update from workflow',
push=True,
cwd='/path/to/repo'
)
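These task types chain together like any other tasks via depends_on. For example, a pull-then-commit pipeline might look like the sketch below (the marker-file step is purely illustrative):
pipeline = Workflow(
    name='Update Pipeline',
    description='Pull, record, and commit in sequence',
    version='1.0.0'
)
pipeline.add_task(Task(
    id='git_pull',
    type='shell',
    name='Update repository',
    command='git pull origin main',
    cwd='/path/to/repo'
))
pipeline.add_task(Task(
    id='record_update',
    type='shell',
    name='Record the update time',
    command='date > .last-updated',
    cwd='/path/to/repo',
    depends_on=['git_pull']
))
pipeline.add_task(Task(
    id='commit_changes',
    type='git',
    name='Commit and push changes',
    operation='commit',
    message='Automated update from workflow',
    push=True,
    cwd='/path/to/repo',
    depends_on=['record_update']
))
result = WorkflowRunner().run(pipeline)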
Custom Actions
Create custom workflow actions:
import json
from repoindex.integrations.workflow import Action, ActionRegistry
class CustomAnalysisAction(Action):
"""Custom action for specialized analysis."""
def __init__(self):
super().__init__(
name='custom.analysis',
description='Perform custom analysis on repositories'
)
def validate_parameters(self, parameters: dict) -> bool:
"""Validate action parameters."""
required = ['repositories', 'output_file']
return all(param in parameters for param in required)
def execute(self, parameters: dict, context: dict) -> dict:
"""
Execute the custom analysis.
Args:
parameters: Action parameters
context: Workflow execution context
Returns:
Result dictionary with status and output
"""
repos = parameters['repositories']
output_file = parameters['output_file']
# Perform analysis
results = self.analyze_repositories(repos)
# Save results
with open(output_file, 'w') as f:
json.dump(results, f, indent=2)
return {
'status': 'success',
'repos_analyzed': len(repos),
'output_file': output_file,
'results': results
}
def analyze_repositories(self, repos: list) -> dict:
"""Custom analysis logic."""
# Your analysis code here
return {'analysis': 'complete'}
# Register custom action
ActionRegistry.register(CustomAnalysisAction())
# Use in workflow
workflow.add_task(Task(
id='custom_analysis',
action='custom.analysis',
parameters={
'repositories': '{{ python_repos }}',
'output_file': 'analysis-results.json'
}
))
Workflow Execution
Advanced workflow execution control:
from repoindex.integrations.workflow import WorkflowRunner, ExecutionContext
# Create execution context
context = ExecutionContext(
variables={
'env': 'production',
'debug': False
},
cwd='/path/to/workspace',
dry_run=False
)
# Configure runner
runner = WorkflowRunner(
max_parallel=4,
log_level='INFO',
output_dir='/path/to/logs'
)
# Execute workflow
result = runner.run(workflow, context=context)
# Access detailed results
print(f"Status: {result.status}")
print(f"Duration: {result.duration:.2f}s")
print(f"Tasks executed: {len(result.tasks)}")
# Access individual task results
for task_id, task_result in result.tasks.items():
print(f"\nTask: {task_id}")
print(f" Status: {task_result.status}")
print(f" Duration: {task_result.duration:.2f}s")
if task_result.output:
print(f" Output: {task_result.output}")
if task_result.error:
print(f" Error: {task_result.error}")
# Export execution report
runner.export_report(result, format='json', output='workflow-report.json')
Utility Functions
Repository Analysis
from repoindex.core import analyze_repository
# Analyze a single repository
repo_info = analyze_repository('/path/to/repo')
print(f"Name: {repo_info['name']}")
print(f"Language: {repo_info['language']}")
print(f"Stars: {repo_info.get('stars', 0)}")
print(f"LOC: {repo_info['metrics']['loc']}")
Query Engine
from repoindex.query import QueryEngine
# Create query engine
engine = QueryEngine()
# Parse and execute queries
repos = [
{'name': 'repo1', 'language': 'Python', 'stars': 100},
{'name': 'repo2', 'language': 'JavaScript', 'stars': 50},
{'name': 'repo3', 'language': 'Python', 'stars': 200},
]
# Simple query
results = engine.query(repos, "language == 'Python'")
print(f"Found {len(list(results))} Python repos")
# Complex query with fuzzy matching
results = engine.query(repos, "language ~= 'pyton' and stars > 50")
print(f"Found {len(list(results))} repos")
# Query with nested fields
repos_with_github = [
{'name': 'repo1', 'github': {'stars': 100, 'forks': 10}},
{'name': 'repo2', 'github': {'stars': 50, 'forks': 5}},
]
results = engine.query(repos_with_github, "github.stars > 75")
print(f"Found {len(list(results))} popular repos")
Export Utilities
from repoindex.export import ExportManager
# Initialize export manager
exporter = ExportManager()
# Export to Markdown
repos = [...] # Your repository data
exporter.export_markdown(
repos,
output_file='portfolio.md',
template='default',
group_by='language'
)
# Export to HTML
exporter.export_html(
repos,
output_file='portfolio.html',
template='interactive',
include_search=True,
include_filters=True
)
# Export to JSON
exporter.export_json(
repos,
output_file='repos.json',
pretty=True,
include_metadata=True
)
# Export to CSV
exporter.export_csv(
repos,
output_file='repos.csv',
columns=['name', 'language', 'stars', 'last_commit']
)
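The exporters pair naturally with the query engine above; for example, exporting only popular Python repositories uses nothing beyond the APIs already shown:
popular = list(engine.query(repos, "language == 'Python' and stars > 100"))
exporter.export_json(popular, output_file='popular-python.json', pretty=True)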
Error Handling
Exception Classes
from repoindex.exceptions import (
    RepoindexError,
IntegrationError,
ClusteringError,
WorkflowError,
ConfigurationError
)
try:
analyzer = ClusterAnalyzer(algorithm='invalid')
except ClusteringError as e:
print(f"Clustering error: {e}")
print(f"Details: {e.details}")
try:
workflow = Workflow.load('missing-workflow.yaml')
except WorkflowError as e:
print(f"Workflow error: {e}")
if e.validation_errors:
print("Validation errors:")
for error in e.validation_errors:
print(f" - {error}")
Error Recovery
from repoindex.integrations.workflow import WorkflowRunner, ErrorHandler
# Create custom error handler
class CustomErrorHandler(ErrorHandler):
def handle_task_error(self, task, error, context):
"""Handle task execution errors."""
print(f"Task {task.id} failed: {error}")
# Log error
self.log_error(task, error)
# Attempt recovery
if task.retry and task.retry['attempts'] > 0:
return 'retry'
elif task.continue_on_error:
return 'continue'
else:
return 'stop'
# Use custom error handler
runner = WorkflowRunner(error_handler=CustomErrorHandler())
result = runner.run(workflow)
Configuration
Integration Configuration
from repoindex.config import Config
# Load configuration
config = Config.load('~/.repoindex/config.json')
# Access integration settings
clustering_config = config.get('integrations.clustering', {})
workflow_config = config.get('integrations.workflow', {})
# Update configuration
config.set('integrations.clustering.default_algorithm', 'dbscan')
config.save()
# Get with defaults
max_parallel = config.get('integrations.workflow.max_parallel', default=4)
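Settings read this way can feed directly into the constructors documented above, for example (reusing the keys from earlier in this reference):
algorithm = config.get('integrations.clustering.default_algorithm', default='kmeans')
analyzer = ClusterAnalyzer(
    algorithm=algorithm,
    n_clusters=5,
    features=['tech-stack', 'size', 'activity'],
    config=clustering_config
)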
Environment Variables
import os
from repoindex.config import apply_env_overrides
# Configuration with environment variables
config = {
'github': {
'token': os.getenv('REPOINDEX_GITHUB_TOKEN'),
'api_url': os.getenv('REPOINDEX_GITHUB_API', 'https://api.github.com')
},
'integrations': {
'clustering': {
'enabled': os.getenv('REPOINDEX_CLUSTERING_ENABLED', 'true').lower() == 'true'
}
}
}
# Apply environment overrides
config = apply_env_overrides(config)
Best Practices
Performance Optimization
# Use generators for large datasets
def process_repos_streaming(repo_paths):
"""Process repositories as a stream."""
for path in repo_paths:
yield analyze_repository(path)
# Don't do this (loads everything in memory)
# repos = [analyze_repository(path) for path in repo_paths]
# Do this (streams data)
repos = process_repos_streaming(repo_paths)
for repo in repos:
process(repo)
# Use multiprocessing for CPU-intensive tasks
from multiprocessing import Pool
def analyze_parallel(repo_paths, n_processes=4):
"""Analyze repositories in parallel."""
with Pool(n_processes) as pool:
results = pool.map(analyze_repository, repo_paths)
return results
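pool.map buffers the complete result list before returning; for very large collections you can combine both tips and stream results as workers finish:
def analyze_parallel_streaming(repo_paths, n_processes=4):
    """Yield analysis results as they complete, without buffering everything."""
    with Pool(n_processes) as pool:
        # imap_unordered yields each result as soon as a worker finishes it.
        for result in pool.imap_unordered(analyze_repository, repo_paths):
            yield result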
Error Handling
# Always handle errors gracefully
def safe_analysis(repo_path):
"""Safely analyze a repository."""
try:
return analyze_repository(repo_path)
except Exception as e:
return {
'action': 'error',
'repo': repo_path,
'error': str(e),
'type': type(e).__name__
}
# Use context managers for resource cleanup
from contextlib import contextmanager
@contextmanager
def workflow_execution(workflow):
"""Context manager for workflow execution."""
runner = WorkflowRunner()
try:
yield runner
finally:
runner.cleanup()
# Usage
with workflow_execution(workflow) as runner:
result = runner.run(workflow)
Testing
import unittest
from repoindex.exceptions import ClusteringError
from repoindex.integrations.clustering import ClusterAnalyzer
class TestClustering(unittest.TestCase):
def setUp(self):
self.analyzer = ClusterAnalyzer(algorithm='kmeans', n_clusters=2)
def test_basic_clustering(self):
"""Test basic clustering functionality."""
repos = ['/path/to/repo1', '/path/to/repo2', '/path/to/repo3']
results = list(self.analyzer.analyze(repos))
# Check that we got results
self.assertGreater(len(results), 0)
# Check result structure
cluster_results = [r for r in results if r['action'] == 'cluster_result']
self.assertEqual(len(cluster_results), 2) # 2 clusters
def test_invalid_algorithm(self):
"""Test error handling for invalid algorithm."""
with self.assertRaises(ClusteringError):
ClusterAnalyzer(algorithm='invalid')
if __name__ == '__main__':
unittest.main()
Examples
See the examples directory for complete working examples:
- basic_clustering.py: Simple clustering example
- advanced_workflow.py: Complex workflow with multiple stages
- custom_integration.py: Building a custom integration
- batch_analysis.py: Analyzing large repository collections
- visualization_dashboard.py: Creating interactive dashboards