Context flows through data systems as queryable metadata that validates decisions at each execution point.
A pipeline filters user events to exclude incomplete records. An agent queries the same table to analyze behavior. A dashboard displays event metrics. Each system needs to know what "valid event data" means. When quality rules change, all three systems must adapt identically or produce inconsistent results.
1. Define rules (once)
↓
2. Store as structured metadata
↓
3. Query at decision points
↓
4. Validate before execution
↓
5. Update rules without code changes
1. Define:
rule: user_event_quality
version: "1.3"
valid_from: "2024-03-01"
required_fields:
- session_id
- user_id
- event_type
- timestamp
completeness_threshold: 0.95
max_null_percentage: 52. Query in Pipeline:
def process_user_events():
rules = context.get("user_event_quality")
# Filter records that meet quality thresholds
return db.query(f"""
SELECT * FROM raw_events
WHERE {' AND '.join([f"{f} IS NOT NULL" for f in rules.required_fields])}
""")3. Query in Agent:
def agent_analyze_events(query):
# Agent generates SQL
sql = llm.generate_sql(query)
# Validate SQL includes quality filters
rules = context.get("user_event_quality")
if not rules.validate_sql(sql):
raise ValidationError("Query missing required quality filters")
return db.query(sql)4. Query in Dashboard:
def dashboard_event_metrics():
rules = context.get("user_event_quality")
# Dashboard uses same quality rules as pipeline and agent
return metric_widget(
source="raw_events",
required_fields=rules.required_fields,
completeness_threshold=rules.completeness_threshold
)Pipeline, agent, and dashboard use identical logic
Update rule once, all systems adapt
No SQL changes needed when business rules change
Agents validate they're using current definitions
# Context provider
class Context:
def get(self, rule_name, date=None):
# Fetch current or historical rule
rule = self.store.get(rule_name, effective_date=date)
return ExecutableRule(rule)
# Executable rule
class ExecutableRule:
def validate_sql(self, sql):
# Check if SQL follows current rules
pass
def build_conditions(self):
# Generate SQL WHERE clause from rules
pass
def is_valid_for_period(self, start, end):
# Check if rule applies to date range
passContext becomes infrastructure that pipelines, agents, and applications query before making decisions. This prevents the problems that occur when orchestration tools execute tasks without validating current business state.
Related: What is a context-aware workflow?