Example: dag_visualization_demo.py
DAG NetworkX Analysis: Critical Path, Parallel Groups, Export
Version: {VERSION} File: examples/dag_visualization_demo.py
Overview
This demo showcases the DAG visualization and analysis capabilities introduced in Taskiq-Flow v0.4.5: NetworkX-based analysis, Mermaid diagram generation, and multiple export formats.
- Build a DAG from a DataflowPipeline
- NetworkX-based analysis (critical path, parallel groups)
- Export to JSON, Mermaid, DOT, Cytoscape
- Terminal ASCII visualization
- NiceGUI integration for interactive view
What This Example Shows
- Using DAGVisualizer for rich DAG analysis
- Critical path detection (longest execution chain)
- Parallel task group identification (execution levels)
- Mermaid diagram generation for documentation
- DOT export for Graphviz rendering
- Cytoscape JSON for interactive web visualization
Code Walkthrough
1. Pipeline Definition
from taskiq import InMemoryBroker
from taskiq_flow import DataflowPipeline, pipeline_task

broker = InMemoryBroker(await_inplace=True)

@broker.task
@pipeline_task(output="audio_features")
def extract_features(audio_path: str) -> dict:
    return {"duration": 180.0, "tempo": 120.0, "sample_rate": 44100}

@broker.task
@pipeline_task(output="tags")
def generate_tags(audio_features: dict) -> list[str]:
    return ["electronic", "dance", "upbeat"]

@broker.task
@pipeline_task(output="embedding")
def compute_embedding(audio_features: dict) -> list[float]:
    return [0.1, 0.2, 0.3, 0.4, 0.5]

@broker.task
@pipeline_task(output="metadata")
def create_metadata(audio_features: dict, tags: list[str], embedding: list[float]) -> dict:
    return {
        "features": audio_features,
        "tags": tags,
        "embedding": embedding,
    }

pipeline = DataflowPipeline.from_tasks(
    broker,
    [extract_features, generate_tags, compute_embedding, create_metadata],
)
pipeline.pipeline_id = "audio_analysis_demo"
DAG structure:
- extract_features runs first (no dependencies)
- generate_tags and compute_embedding run in parallel (both depend on audio_features)
- create_metadata runs last (depends on all three predecessors)
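The structure above follows from matching each task's parameter names against the declared output names. The sketch below illustrates that idea with the standard library only; it is an assumption about how name-based wiring could work, not Taskiq-Flow's implementation. The stand-in functions, `OUTPUTS`, and `infer_dependencies` are hypothetical names introduced for illustration.

```python
import inspect


# Plain stand-ins for the decorated tasks; only names and parameters matter here.
def extract_features(audio_path): ...
def generate_tags(audio_features): ...
def compute_embedding(audio_features): ...
def create_metadata(audio_features, tags, embedding): ...


# Output name each task declares via pipeline_task(output=...) in the demo.
OUTPUTS = {
    extract_features: "audio_features",
    generate_tags: "tags",
    compute_embedding: "embedding",
    create_metadata: "metadata",
}


def infer_dependencies(outputs):
    """Map each task name to the tasks that produce its parameters."""
    producer = {out: fn.__name__ for fn, out in outputs.items()}
    deps = {}
    for fn in outputs:
        params = inspect.signature(fn).parameters
        # Parameters with no producer (e.g. audio_path) are external inputs.
        deps[fn.__name__] = [producer[p] for p in params if p in producer]
    return deps


dependencies = infer_dependencies(OUTPUTS)
for task, needs in dependencies.items():
    print(f"{task}: {needs}")
```

A task with an empty dependency list can start immediately; here that is only extract_features, since audio_path is supplied by the caller.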
2. NetworkX Analysis with DAGVisualizer
from taskiq_flow.visualization.dag_visualizer import DAGVisualizer
dag = pipeline.build_dag()
visualizer = DAGVisualizer(dag)
# JSON export
json_data = visualizer.to_json()
print(f"Nodes: {len(json_data['nodes'])}")
print(f"Edges: {len(json_data['edges'])}")
# Critical path
critical_path = visualizer.detect_critical_path()
print(f"Critical path: {' -> '.join(critical_path)}")
# Parallel groups
parallel_groups = visualizer.find_parallelizable_groups()
print(f"Parallel groups: {len(parallel_groups)} levels")
for i, group in enumerate(parallel_groups):
    print(f" Level {i}: {group}")
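Critical Path: The longest dependency chain in the DAG. Its length is a lower bound on total execution time, even with unlimited parallelism.
Parallel Groups: Tasks on the same level have no dependencies on each other and can run concurrently once the previous level has finished.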
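To make the two definitions concrete, here is a minimal standard-library sketch that computes both for this demo's graph. It is not how DAGVisualizer is implemented (that is not shown here); the edge list is copied from the Mermaid output in this walkthrough, every task is assumed to cost one unit, and `topological_order`, `parallel_levels`, and `critical_path` are hypothetical helper names.

```python
from collections import defaultdict

# Edge list copied from the Mermaid output in this walkthrough.
EDGES = [
    ("extract_features", "generate_tags"),
    ("extract_features", "compute_embedding"),
    ("generate_tags", "create_metadata"),
    ("compute_embedding", "create_metadata"),
]


def topological_order(edges):
    """Kahn's algorithm; ties keep first-seen order."""
    nodes = list(dict.fromkeys(n for edge in edges for n in edge))
    successors = defaultdict(list)
    indegree = {n: 0 for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
    ready = [n for n in nodes if indegree[n] == 0]
    order = []
    while ready:
        node = ready.pop(0)
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


def predecessors_of(edges):
    preds = defaultdict(list)
    for src, dst in edges:
        preds[dst].append(src)
    return preds


def parallel_levels(edges):
    """Level of a node = 1 + the highest level among its predecessors."""
    preds = predecessors_of(edges)
    level = {}
    for node in topological_order(edges):
        level[node] = 1 + max((level[p] for p in preds[node]), default=-1)
    groups = [[] for _ in range(max(level.values()) + 1)]
    for node, lvl in level.items():
        groups[lvl].append(node)
    return groups


def critical_path(edges):
    """Longest path by node count; ties go to the first predecessor seen."""
    preds = predecessors_of(edges)
    best_len, best_prev = {}, {}
    for node in topological_order(edges):
        best_len[node], best_prev[node] = 1, None
        for p in preds[node]:
            if best_len[p] + 1 > best_len[node]:
                best_len[node], best_prev[node] = best_len[p] + 1, p
    node = max(best_len, key=best_len.get)
    path = []
    while node is not None:
        path.append(node)
        node = best_prev[node]
    return path[::-1]


levels = parallel_levels(EDGES)
path = critical_path(EDGES)
print(levels)
print(" -> ".join(path))
```

In this graph two chains tie for the longest path (through generate_tags or through compute_embedding). The sketch breaks the tie by edge order; with real task durations as weights, the slower branch would be the critical one.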
3. Mermaid Diagrams
from taskiq_flow.visualization.mermaid import MermaidGenerator
mermaid_gen = MermaidGenerator(dag)
mermaid_code = mermaid_gen.to_mermaid_with_styling(orientation="LR")
print(mermaid_code)
Output:
flowchart LR
A[extract_features] --> B[generate_tags]
A --> C[compute_embedding]
B --> D[create_metadata]
C --> D
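For readers curious how such flowchart text can be produced from an edge list, here is a minimal sketch. It is not MermaidGenerator's implementation; `edges_to_mermaid` is a hypothetical helper, and the letter IDs and four-space indentation are assumptions chosen to mirror the output above.

```python
from string import ascii_uppercase

EDGES = [
    ("extract_features", "generate_tags"),
    ("extract_features", "compute_embedding"),
    ("generate_tags", "create_metadata"),
    ("compute_embedding", "create_metadata"),
]


def edges_to_mermaid(edges, orientation="LR"):
    """Emit a Mermaid flowchart, labelling each node on first use only."""
    ids = {}
    declared = set()

    def ref(name):
        if name not in ids:
            ids[name] = ascii_uppercase[len(ids)]  # sketch supports up to 26 nodes
        node_id = ids[name]
        if node_id in declared:
            return node_id
        declared.add(node_id)
        return f"{node_id}[{name}]"

    lines = [f"flowchart {orientation}"]
    for src, dst in edges:
        left = ref(src)
        right = ref(dst)
        lines.append(f"    {left} --> {right}")
    return "\n".join(lines)


mermaid_text = edges_to_mermaid(EDGES)
print(mermaid_text)
```

Passing orientation="TD" yields a top-down layout instead of left-to-right.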
4. ASCII Art (Terminal)
ascii_art = visualizer.visualize_ascii()
print(ascii_art)
Output:
extract_features
|
+--> generate_tags
|
+--> compute_embedding
|
+--> create_metadata
5. Graphviz DOT Export
dot = visualizer.to_graphviz()
# Save and render: dot -Tpng pipeline.dot -o pipeline.png
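As a rough picture of what the DOT text contains, the sketch below builds a digraph from an edge list by hand. It only illustrates the DOT format, not what to_graphviz() does internally; `edges_to_dot` is a hypothetical helper and the exact layout of the library's output may differ.

```python
EDGES = [
    ("extract_features", "generate_tags"),
    ("extract_features", "compute_embedding"),
    ("generate_tags", "create_metadata"),
    ("compute_embedding", "create_metadata"),
]


def quote(identifier):
    """Wrap an ID in double quotes, escaping embedded quotes for DOT."""
    return '"' + identifier.replace('"', '\\"') + '"'


def edges_to_dot(graph_name, edges):
    """Return DOT source for a directed graph with one statement per edge."""
    lines = [f"digraph {quote(graph_name)} {{"]
    for src, dst in edges:
        lines.append(f"    {quote(src)} -> {quote(dst)};")
    lines.append("}")
    return "\n".join(lines)


dot_text = edges_to_dot("audio_analysis_demo", EDGES)
print(dot_text)
```

Writing the resulting string to pipeline.dot makes it usable with the dot command shown in the comment above.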
6. Cytoscape JSON (Web)
cytoscape = visualizer.to_cytoscape_json()
# Contains nodes[] and edges[] ready for Cytoscape.js
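Cytoscape.js accepts elements grouped into nodes and edges, where each element carries a data object (an id for nodes; id, source, and target for edges). The sketch below builds that shape from an edge list. The exact keys emitted by to_cytoscape_json() are not shown here, so treat the label field and the edge-id scheme as assumptions; `edges_to_cytoscape` is a hypothetical helper.

```python
import json

EDGES = [
    ("extract_features", "generate_tags"),
    ("extract_features", "compute_embedding"),
    ("generate_tags", "create_metadata"),
    ("compute_embedding", "create_metadata"),
]


def edges_to_cytoscape(edges):
    """Build a Cytoscape.js-style elements dict from (source, target) pairs."""
    node_ids = list(dict.fromkeys(n for edge in edges for n in edge))
    return {
        "nodes": [{"data": {"id": n, "label": n}} for n in node_ids],
        "edges": [
            {"data": {"id": f"{src}->{dst}", "source": src, "target": dst}}
            for src, dst in edges
        ],
    }


elements = edges_to_cytoscape(EDGES)
print(f"Elements: {len(elements['nodes'])} nodes, {len(elements['edges'])} edges")
print(json.dumps(elements, indent=2))
```

The serialized dict can be passed as the elements option when constructing a Cytoscape.js instance in the browser.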
Expected Output
=== Taskiq-Flow DAG Visualization Demo ===
DAG has 4 nodes and 4 edges
1. NetworkX DAG Analysis
----------------------------------------
Nodes: 4
Edges: 4
Critical path: extract_features -> generate_tags -> create_metadata
Parallel groups: 3 levels
Level 0: ['extract_features']
Level 1: ['generate_tags', 'compute_embedding']
Level 2: ['create_metadata']
2. Mermaid Diagram
----------------------------------------
flowchart LR
extract_features --> generate_tags
...
3. ASCII Art
----------------------------------------
extract_features
|
+--> generate_tags
...
4. Graphviz DOT
----------------------------------------
digraph "audio_analysis_demo" {
"extract_features" -> "generate_tags";
...
}
5. Cytoscape JSON
----------------------------------------
Elements: 4 nodes, 4 edges
=== Demo Complete ===
Key Points
DAGVisualizer Methods
| Method | Returns | Use Case |
|---|---|---|
| to_json() | dict | API responses, web UIs |
| detect_critical_path() | list[str] | Find bottleneck tasks |
| find_parallelizable_groups() | list[list[str]] | Optimize parallelism |
| to_graphviz() | str | Graphviz rendering |
| to_cytoscape_json() | dict | Interactive web viz |
| visualize_ascii() | str | Terminal debugging |
MermaidGenerator Methods
| Method | Description |
|---|---|
| to_mermaid(orientation) | Basic flowchart |
| to_mermaid_with_styling(orientation) | Colored nodes by type |
| to_mermaid_interactive() | With click handlers |
NiceGUI Integration
from taskiq_flow.visualization.mermaid import MermaidGenerator
mermaid_gen = MermaidGenerator(dag)
mermaid_code = mermaid_gen.to_mermaid_with_styling()
# Embed in a NiceGUI page:
# ui.markdown(f"```mermaid\n{mermaid_code}\n```")
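MermaidGenerator produces Mermaid.js source, which NiceGUI renders in the browser. ui.markdown() renders Mermaid fences only when its mermaid extra is enabled (extras=["mermaid"]); ui.mermaid(mermaid_code) is a direct alternative.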
Learning Path
After this example:
- Visualization Guide — Full DAG visualization features
- Performance Guide — Use DAG analysis for optimization
- NiceGUI Integration — Build interactive dashboards
This example covers all major visualization output formats. Use DAGVisualizer for programmatic analysis and MermaidGenerator for documentation.