Inferensys


AI Integration with WSO2 Stream Processor

Embed AI inference directly into real-time event streams using WSO2 Stream Processor. Add instant fraud scoring, sentiment analysis, and dynamic pricing to API traffic, IoT data, and transaction flows without batch processing delays.
ARCHITECTING REAL-TIME INTELLIGENCE

Where AI Fits in WSO2 Stream Processor

Integrate AI inference directly into your event streams to detect anomalies, enrich data, and trigger actions within milliseconds.

WSO2 Stream Processor (WSO2 SP) is built on Siddhi, a streaming SQL engine that processes events in flight. AI integration fits at three key architectural points:

  • within Siddhi queries, as custom function extensions;
  • as external microservices, called through HTTP sinks and sources;
  • as part of stream enrichment, where a Siddhi app joins a live stream with a vector database lookup.

Common integration surfaces include the http-request sink (paired with an http-response source) for calling RESTful model endpoints, Kafka topics for publishing enriched events to downstream consumers, and WebSocket sources for bidirectional communication with stateful AI agents.
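To make the HTTP surface concrete, here is a minimal Python stand-in for a model endpoint that an http-request sink could POST to. It is a sketch under stated assumptions. The /predict path, the {"features": [...]} request body, the {"score": ...} response, and the amount-based scoring rule are all illustrative placeholders, not a WSO2, KServe, or vendor contract.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def score_features(features: list) -> float:
    """Placeholder scoring rule; a real service would call a trained model here."""
    amount = float(features[0]) if features else 0.0  # assumes amount is the first feature
    return min(1.0, max(0.0, amount / 10_000.0))


class PredictHandler(BaseHTTPRequestHandler):
    """Accepts POST /predict with {"features": [...]} and replies {"score": <float>}."""

    def do_POST(self):
        if self.path != "/predict":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length) or b"{}")
        reply = json.dumps({"score": score_features(body.get("features", []))}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(reply)))
        self.end_headers()
        self.wfile.write(reply)


def serve(port: int = 8080) -> None:
    """Blocks forever, serving predictions on the given port (hypothetical default)."""
    HTTPServer(("0.0.0.0", port), PredictHandler).serve_forever()
```

Keeping the scoring rule in a plain function separates it from the HTTP plumbing, so you can swap in a real model without touching the handler.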

High-value use cases leverage this real-time layer for operational intelligence. For example, a Siddhi app can monitor API traffic logs streamed from the WSO2 API Manager, pass payload snippets to a sentiment analysis model, and immediately tag abusive or frustrated API consumers for review. In financial services, transaction streams can be analyzed for fraud patterns, triggering a log() alert or pushing a blocking signal back to the core banking API gateway within the same execution cycle. For dynamic pricing, a stream of inventory levels, competitor feeds, and demand signals can be processed, with an AI model recommending price adjustments that are injected directly into the eCommerce platform's pricing API.

A production rollout requires careful governance of the AI inference loop. Implement circuit breakers and fallback logic within your Siddhi queries to handle model endpoint latency or downtime. Use WSO2 SP's event tables to maintain short-term context windows for models requiring conversational memory. For audit and compliance, ensure all AI-enriched events are routed to a dedicated audit stream. Start by deploying a single Siddhi app that performs a non-critical enrichment (e.g., tagging traffic) before moving to models that trigger financial or security actions. This phased approach lets you validate accuracy and performance impact within your specific stream topology.
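The circuit-breaker-and-fallback idea can be sketched in Python, for example inside a thin proxy between the Siddhi app's HTTP calls and the model endpoint. The class name, failure count, cooldown, and fallback value below are hypothetical. This shows the pattern, not a WSO2 SP feature.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after consecutive failures and allows a trial call after a cooldown."""

    def __init__(self, max_failures: int = 3, cooldown_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_failures = max_failures
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, fn: Callable[[], float], fallback: float) -> float:
        # While open and still cooling down, skip the model and return the fallback.
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.cooldown_s:
                return fallback
            self.opened_at = None  # half-open: let one trial call through
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = self.clock()  # trip (or re-trip) the breaker
            return fallback
        self.failures = 0  # a success closes the breaker
        return result
```

Returning a sentinel such as -1.0 lets a downstream Siddhi filter treat the event as unscored. That is safer than silently treating it as low risk.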

This integration turns WSO2 SP from a passive event router into an active, intelligent nervous system for your APIs and microservices. For a deeper dive into connecting vector databases for real-time retrieval or patterns for canarying AI model endpoints, see our guide on AI Integration for Microservices API Gateways.

REAL-TIME EVENT PROCESSING FOR AI

AI Integration Surfaces in WSO2 Stream Processor

Embed AI Inference Directly in Stream Logic

The Siddhi Query Language (SiddhiQL) defines the processing logic that the Siddhi engine executes. You can integrate AI by calling external model endpoints or by embedding lightweight inference within stream queries.

Key Integration Points:

  • HTTP request/response connectors: Invoke HTTP/REST endpoints of hosted AI models (OpenAI, Azure AI, custom) from a Siddhi app. Events go out through an http-request sink and predictions come back on a correlated http-response source, enriching events in flight.
  • UDFs (User Defined Functions): Write custom Java function extensions that wrap local TensorFlow/PyTorch models for ultra-low-latency inference (e.g., fraud scoring) without network hops.
  • Windowed Analytics: Apply AI models over sliding windows of events, either time-based or length-based. For example, use a length window to average the sentiment scores of the last 100 social media posts, then trigger an alert if the trend turns negative.
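The windowed-analytics example amounts to a length window over per-post scores. Here is a minimal Python sketch. It assumes a hypothetical upstream sentiment model that emits scores in [-1, 1], and it uses an illustrative alert threshold of -0.2.

```python
from collections import deque


class SentimentTrend:
    """Keeps the last `size` sentiment scores, similar to a Siddhi length window."""

    def __init__(self, size: int = 100, alert_below: float = -0.2):
        self.scores = deque(maxlen=size)  # oldest score drops out automatically
        self.alert_below = alert_below

    def add(self, score: float) -> bool:
        """Add one post's score; return True when the window mean is too negative."""
        self.scores.append(score)
        if len(self.scores) < self.scores.maxlen:
            return False  # wait until the window is full before alerting
        return sum(self.scores) / len(self.scores) < self.alert_below
```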

Example Workflow:

  1. Ingest API traffic logs as a stream.
  2. Use a Siddhi query to call a fraud detection model for each transaction.
  3. Filter and route high-risk transactions to a separate alert stream in real-time.
WSO2 STREAM PROCESSOR INTEGRATIONS

High-Value AI Use Cases for Streaming Data

WSO2 Stream Processor transforms raw event streams into actionable intelligence. By integrating AI models directly into your Siddhi processing pipelines, you can move from passive monitoring to proactive, intelligent response. Below are key patterns for injecting AI into real-time workflows.

01

Real-Time API Fraud Detection

Analyze API traffic patterns (IP, geo, headers, payload size) in-flight to score transaction risk. A Siddhi app passes key attributes to a lightweight fraud model, flagging high-risk calls in milliseconds. Flagged events can trigger an immediate block via the connected WSO2 API Manager gateway or route to a step-up authentication flow.

Typical Workflow: HTTP Stream → Siddhi Query (Feature Extraction) → AI Model Inference → High-Risk Alert/Block Action

Millisecond
Detection Latency
02

Dynamic Pricing & Rate Limiting

Use real-time demand signals (concurrent users, error rates, upstream latency) to adjust API quotas or service pricing dynamically. A Siddhi query aggregates metrics and calls a pricing model, which outputs a new rate limit or price tier. This decision is pushed back to the gateway policy in near real-time.

Integration Point: Update Kong rate-limiting plugin settings, Apigee quota policies, or WSO2 API Manager throttling policies via their admin APIs.

Batch → Real-time
Policy Updates
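The function below sketches this card's decision step: it maps aggregated load signals to a new requests-per-minute limit. The thresholds and multipliers are invented for illustration and stand in for a trained pricing model. Pushing the result to Kong, Apigee, or WSO2 API Manager is left out because each admin API differs.

```python
from dataclasses import dataclass


@dataclass
class LoadSignals:
    concurrent_users: int
    error_rate: float      # fraction of failed calls, 0.0 to 1.0
    p95_latency_ms: float  # upstream latency


def recommend_rate_limit(signals: LoadSignals, base_limit: int = 1000) -> int:
    """Return a requests-per-minute limit; a rule-based stand-in for a model."""
    factor = 1.0
    if signals.error_rate > 0.05 or signals.p95_latency_ms > 800:
        factor = 0.5  # upstream is struggling: tighten quotas
    elif signals.concurrent_users < 100 and signals.p95_latency_ms < 200:
        factor = 1.5  # plenty of headroom: relax quotas
    return int(base_limit * factor)
```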
03

Sentiment-Based Routing & Alerting

Process customer support chat, email, or social media event streams to detect sentiment and urgency. Use a sentiment analysis model within a Siddhi window to score messages. Route high-negative-sentiment events to a priority queue or trigger immediate alerts to a manager's dashboard.

Operational Value: Reduces escalations by prioritizing distressed customers within the same operational window.

Same day
Escalation Triage
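A heap is one way to sketch routing high-negative-sentiment events to a priority queue. The priority formula (sentiment minus a fixed urgency bonus) is a hypothetical choice, and the sentiment scores are assumed to come from an upstream model.

```python
import heapq
import itertools


class PriorityInbox:
    """Orders support events so the most negative, most urgent messages come first."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker keeps FIFO order for equal priority

    def push(self, message_id: str, sentiment: float, urgent: bool) -> None:
        # Lower (more negative) sentiment and urgency both raise priority.
        priority = sentiment - (1.0 if urgent else 0.0)
        heapq.heappush(self._heap, (priority, next(self._seq), message_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]
```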
04

IoT Predictive Maintenance Alerts

Ingest high-frequency sensor data (temperature, vibration, pressure) from manufacturing equipment. A Siddhi pipeline performs rolling aggregations (mean, std dev) and feeds features into an anomaly detection model. Output triggers a work order in a connected CMMS like Fiix or UpKeep before a failure occurs.

Architecture: MQTT/Kafka Stream → Siddhi (Sliding Window) → Anomaly Model → REST Call to CMMS

Pre-failure
Alert Timing
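A z-score detector over a sliding window illustrates the rolling-aggregation step. It is a simple statistical stand-in for the anomaly detection model the card describes. The window size and 3-sigma threshold are assumptions to tune per sensor.

```python
import math
from collections import deque


class RollingZScore:
    """Flags readings far from the mean of a sliding window of recent values."""

    def __init__(self, window: int = 60, threshold: float = 3.0):
        self.values = deque(maxlen=window)
        self.threshold = threshold

    def update(self, x: float) -> bool:
        anomalous = False
        if len(self.values) >= 2:
            mean = sum(self.values) / len(self.values)
            var = sum((v - mean) ** 2 for v in self.values) / (len(self.values) - 1)
            std = math.sqrt(var)  # sample standard deviation of the window
            if std > 0 and abs(x - mean) / std > self.threshold:
                anomalous = True
        self.values.append(x)  # the reading joins the window after being scored
        return anomalous
```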
05

Intelligent Log & Event Enrichment

Stream application logs, security events, or audit trails. Use an LLM via a Siddhi function to categorize, summarize, or extract entities (e.g., person, account number) from unstructured log messages. Enriched events are sent to SIEMs like Splunk or to data lakes with structured metadata, which makes them easier to search and correlate.

Impact: Turns opaque log blobs into query-ready, structured events for analysts.

Hours → Minutes
Investigation Setup
06

Real-Time Customer Journey Scoring

Unify clickstream, cart activity, and support event streams to compute a real-time engagement or churn risk score. A Siddhi query joins streams by user ID and calls a scoring model. High-risk scores trigger personalized offers or retention workflows in connected systems like Braze or Salesforce Marketing Cloud.

Key Integration: The score and context are pushed to a CDP or marketing automation platform via webhook.

In-session
Intervention
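The join-and-score step can be sketched in Python as a batch over one window of events. The event shapes, the "abandon" action name, and the hand-picked weights are hypothetical. A production setup would join the streams in Siddhi and call a trained model.

```python
from collections import defaultdict


def churn_risk_scores(clicks, cart_events, support_events):
    """Join three event lists by user_id and compute a toy 0-1 churn-risk score."""
    stats = defaultdict(lambda: {"clicks": 0, "abandoned_carts": 0, "tickets": 0})
    for e in clicks:
        stats[e["user_id"]]["clicks"] += 1
    for e in cart_events:
        if e["action"] == "abandon":
            stats[e["user_id"]]["abandoned_carts"] += 1
    for e in support_events:
        stats[e["user_id"]]["tickets"] += 1

    scores = {}
    for user, s in stats.items():
        # Hand-picked weights stand in for a trained scoring model.
        raw = 0.3 * s["abandoned_carts"] + 0.4 * s["tickets"] - 0.05 * s["clicks"]
        scores[user] = max(0.0, min(1.0, raw))  # clamp to [0, 1]
    return scores
```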
WSO2 STREAM PROCESSOR INTEGRATION PATTERNS

Example Real-Time AI Workflows

These workflows demonstrate how to embed AI inference directly into WSO2 Stream Processor's real-time event processing pipelines. Each pattern connects streaming API traffic, logs, or telemetry to AI models for immediate analysis and automated action.

Trigger: An incoming API request passes through the WSO2 API Gateway.

Context/Data Pulled: The Stream Processor consumes a Siddhi event stream containing the request payload, headers (e.g., User-Agent, IP), and contextual metadata (e.g., user ID, endpoint, timestamp).

Model or Agent Action: A pre-processing Siddhi query extracts and formats relevant features (e.g., request velocity from this IP, anomaly score from previous behavior). This feature vector is sent via an HTTP sink to a low-latency fraud detection model (e.g., a lightweight classifier hosted on KServe). The model returns a fraud probability score and a reason code.

System Update or Next Step: Based on a configurable threshold:

  • If score < 0.3: The request is allowed to proceed; a log event is emitted for auditing.
  • If score >= 0.3: The Siddhi app immediately triggers a WSO2 Gateway Policy via an event sink to inject a custom header (X-AI-Fraud-Risk: HIGH). The gateway's policy executes a conditional flow, potentially blocking the request, challenging with a CAPTCHA, or routing to a high-security endpoint.

Human Review Point: All requests scoring above 0.7 are also pushed to a Kafka topic consumed by a security team dashboard for manual investigation.
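The threshold logic above fits in a small routing function. The action names are hypothetical labels for the gateway header, the audit log, and the Kafka review topic. Only the 0.3 and 0.7 thresholds come from the workflow.

```python
from typing import List

ALLOW_BELOW = 0.3   # thresholds from the workflow
REVIEW_ABOVE = 0.7


def route(score: float) -> List[str]:
    """Return the (hypothetical) actions for a fraud probability score."""
    if score < ALLOW_BELOW:
        return ["allow", "audit_log"]
    actions = ["flag_gateway"]  # e.g. set X-AI-Fraud-Risk: HIGH
    if score > REVIEW_ABOVE:
        actions.append("kafka_security_review")
    return actions
```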

REAL-TIME AI INFERENCE PIPELINE

Implementation Architecture & Data Flow

Integrating AI models directly into WSO2 Stream Processor's event processing pipelines enables instant, context-aware decisions on live API traffic.

The integration is architected as a Siddhi extension or a custom sink connector within your WSO2 Stream Processor (SP) deployment. Incoming API traffic events—captured as JSON or Avro payloads from Kong, Apigee, or native applications—flow through Siddhi query windows. At the point of inference, the stream processor calls an external AI model endpoint via a secure HTTP client, passing relevant event attributes (e.g., userId, endpoint, payload_size, geo_ip, response_code) as a structured prompt or feature vector. The AI service—hosted on your cloud or a managed platform like Azure AI or AWS SageMaker—returns a prediction (e.g., fraud score, sentiment label, dynamic price) within milliseconds, which is then appended to the event stream.
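The call-and-append step can be sketched as follows. The sketch assumes events arrive as Python dicts with the attribute names listed above, and that predict is any callable wrapping the model endpoint (for instance, an HTTP POST). The ai_prediction key is a hypothetical name.

```python
import json
from typing import Any, Callable, Dict

FEATURES = ["userId", "endpoint", "payload_size", "geo_ip", "response_code"]


def enrich(event: Dict[str, Any], predict: Callable[[str], Dict[str, Any]]) -> Dict[str, Any]:
    """Send selected attributes to a model and return a copy of the event with the prediction."""
    request_body = json.dumps({k: event[k] for k in FEATURES})
    prediction = predict(request_body)  # e.g. an HTTP POST to the model endpoint
    return {**event, "ai_prediction": prediction}  # original event is left unchanged
```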

This real-time pattern supports several high-value workflows:

  • Instant Fraud Scoring: Analyze each API transaction for anomalous patterns (unusual location, high velocity, mismatched user-agent) and tag high-risk events for immediate quarantine or step-up authentication.
  • Dynamic Pricing & Quotas: Adjust API rate limits or compute costs in real-time based on predicted customer intent, historical usage, and current system load.
  • Sentiment-Aware Routing: Classify support API call content (from attached logs or error messages) to route frustrated users to high-touch support channels automatically.

The processed stream, now enriched with AI inferences, can be sunk to Kafka topics for downstream analytics, written to a data lake for model retraining, or used to trigger real-time alerts and webhooks back to your API gateway for policy enforcement.

Rollout follows a phased, event-sourced approach. Start by deploying a shadow-mode pipeline that runs AI inference in parallel without affecting live traffic, logging predictions alongside actual outcomes to validate model accuracy. Monitor latency and throughput with WSO2 SP's built-in metrics (for example, through its Status Dashboard) or with metrics exported to a tool such as Grafana. For governance, log every AI model call with a full audit trail (input payload hash, model version, timestamp, inference result). Implement circuit breakers and fallback logic within the Siddhi app so the stream keeps flowing if the AI service is unavailable. This architecture keeps your core stream processing logic declarative and manageable while adding intelligent decision points where they have the most immediate operational impact.
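An audit entry with the fields listed above might look like this sketch. Hashing a canonical JSON encoding (sorted keys, fixed separators) means the same payload always yields the same hash regardless of key order. The field names and the shadow_mode flag are assumptions, not a WSO2 SP log format.

```python
import hashlib
import json
from datetime import datetime, timezone


def audit_record(payload: dict, model_version: str, result: dict, shadow: bool = True) -> dict:
    """Build an audit entry; the raw payload is stored only as a SHA-256 hash."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return {
        "input_sha256": hashlib.sha256(canonical.encode("utf-8")).hexdigest(),
        "model_version": model_version,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "inference_result": result,
        "shadow_mode": shadow,  # True: prediction logged but not acted on
    }
```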

AI-ENHANCED REAL-TIME PROCESSING

Code Examples: Siddhi Queries & Extensions

Pattern Matching for Anomalous API Traffic

Use Siddhi's CEP engine to detect fraud patterns in API call streams before feeding suspect events to an AI model for final scoring. This reduces inference costs by pre-filtering.

```sql
-- Stream and table definitions (attribute names assumed for illustration)
define stream LoginStream (ipAddress string, userId string);
define table IPGeoTable (ipAddress string, country string, isp string);

-- Detect rapid-fire login attempts from a single IP over a sliding 1-minute window
@info(name = 'HighVelocityLogins')
from LoginStream#window.time(1 min)
select ipAddress, count(userId) as attemptCount
group by ipAddress
having attemptCount > 20
insert into SuspectLoginStream;

-- Enrich suspect events with geo-context via a stream-table join
@info(name = 'EnrichAndScore')
from SuspectLoginStream as s join IPGeoTable as g
    on s.ipAddress == g.ipAddress
select s.ipAddress, s.attemptCount, g.country, g.isp
insert into EnrichedSuspectStream;
```

EnrichedSuspectStream can then be sent to a fraud detection AI endpoint, for example through an http-request sink or a custom Siddhi extension. The returned risk score can drive downstream alerting or blocking via a Kong plugin.

AI-ENHANCED STREAM PROCESSING

Realistic Operational Impact & Time Savings

This table illustrates the tangible operational improvements achievable by integrating AI models with WSO2 Stream Processor for real-time event analysis. It compares manual or rule-based processes against AI-assisted workflows.

| Metric | Before AI | After AI | Notes |
|---|---|---|---|
| Fraudulent transaction detection | Batch review next day | Real-time scoring (<100 ms) | AI flags high-risk events; human analysts review only exceptions. |
| API traffic anomaly identification | Static threshold alerts | Dynamic behavioral profiling | Reduces false positives by learning normal baseline patterns. |
| Customer sentiment from chat logs | Manual sampling & reporting | Continuous real-time sentiment stream | Enables immediate intervention for negative sentiment spikes. |
| Dynamic pricing rule adjustment | Weekly manual updates | Near-real-time model recalibration | AI suggests pricing adjustments; business owner approves via workflow. |
| Real-time data enrichment for downstream systems | ETL jobs on hourly schedule | Streaming enrichment with AI context | Downstream CRM/ERP receives AI-augmented data instantly. |
| Operational dashboard refresh rate | 5-15 minute latency | Sub-second event visibility | Leadership and ops teams see AI-derived insights as they happen. |
| Initial integration & model deployment | Custom coding (4-6 weeks) | Configured pipeline (1-2 weeks) | Leverages WSO2 Siddhi extensions and pre-built connectors to AI services. |

OPERATIONALIZING AI IN REAL-TIME STREAMS

Governance, Security, and Phased Rollout

Integrating AI with WSO2 Stream Processor requires a deliberate approach to data governance, model security, and controlled deployment to ensure reliable, compliant operations.

Governance starts with the Siddhi query layer. Define clear policies for which event streams are eligible for AI processing, such as API traffic logs, transaction payloads, or sensor telemetry. Before events are sent to an external AI model, drop or mask sensitive fields (e.g., PII, API keys) using Siddhi's projection (the select clause), filters, and string functions. This keeps raw sensitive values inside your controlled environment and supports compliance with data residency and privacy regulations such as GDPR or HIPAA. Audit trails should log the stream source, the transformed payload sent for inference, and the AI-generated output, creating a lineage from source event to AI action.
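The redaction step is shown here in Python, for example for a pre-processing service or for unit-testing the rules before expressing them in Siddhi. The regular expressions and dropped field names are illustrative only, and together they are not a complete PII detector.

```python
import re

# Illustrative patterns only; real deployments need vetted, locale-aware PII detection.
REDACTIONS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"), "<EMAIL>"),
    (re.compile(r"\b\d{13,19}\b"), "<CARD>"),
    (re.compile(r"(?i)(api[_-]?key\s*[=:]\s*)\S+"), r"\1<REDACTED>"),
]
DROP_FIELDS = {"password", "authorization"}  # hypothetical secret field names


def redact_event(event: dict) -> dict:
    """Drop secret fields and mask PII in string values before external inference."""
    clean = {}
    for key, value in event.items():
        if key.lower() in DROP_FIELDS:
            continue
        if isinstance(value, str):
            for pattern, repl in REDACTIONS:
                value = pattern.sub(repl, value)
        clean[key] = value
    return clean
```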

Security is enforced at the integration point. When a Siddhi query calls an external AI service (e.g., for fraud scoring or sentiment detection), the call should meet three requirements:

  • use a dedicated service account with scoped permissions;
  • pass through a secure API gateway such as Kong or WSO2 API Manager for rate limiting and threat protection;
  • communicate over mutually authenticated TLS.

For internally deployed models, the Stream Processor can invoke them via a secure gRPC or REST endpoint. Validate inference requests and responses against a schema to reduce the risk that malformed output, including output manipulated through prompt injection, disrupts downstream workflows. Consider implementing a lightweight model output validator as a Siddhi function to catch anomalies or confidence scores below a defined threshold before the result triggers an alert or pricing adjustment.
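A lightweight output validator of the kind described above could look like this Python sketch. The label set, field names, and 0.6 confidence floor are hypothetical. The point is to reject anything that is not exactly the expected shape before it can trigger an action.

```python
from typing import Any, Dict, Optional

ALLOWED_LABELS = {"fraud", "legit"}  # hypothetical label set for a fraud model


def validate_model_output(output: Any, min_confidence: float = 0.6) -> Optional[Dict[str, Any]]:
    """Return a cleaned result if the output matches the expected shape, else None."""
    if not isinstance(output, dict):
        return None
    label, confidence = output.get("label"), output.get("confidence")
    if not isinstance(label, str) or label not in ALLOWED_LABELS:
        return None  # unexpected or free-text output
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        return None
    if not (0.0 <= confidence <= 1.0) or confidence < min_confidence:
        return None  # out of range, or too uncertain to act on
    return {"label": label, "confidence": float(confidence)}
```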

A phased rollout mitigates risk. Start with a shadow mode deployment: run your AI-enhanced Siddhi queries in parallel with the existing logic, comparing outputs without taking action. This validates model performance against real-world data streams. Next, move to a human-in-the-loop phase, where AI recommendations (e.g., "flag this transaction for review") are routed to a dashboard or ticketing system like ServiceNow for analyst approval. Finally, graduate to controlled automation for specific, high-confidence use cases—such as dynamic pricing for non-critical SKUs or filtering obvious spam API traffic—while maintaining circuit breakers that can disable AI-driven actions if error rates spike or downstream systems become unresponsive. This iterative approach builds operational confidence and isolates the blast radius of any model drift or integration failure.
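A single decision function can summarize the three rollout phases plus the error-rate breaker. Mode names, action strings, and the 5% error-rate limit are assumptions for illustration.

```python
from enum import Enum


class Mode(Enum):
    SHADOW = "shadow"
    HUMAN_IN_LOOP = "human_in_loop"
    AUTOMATED = "automated"


def decide(mode: Mode, ai_says_flag: bool, recent_error_rate: float,
           max_error_rate: float = 0.05) -> str:
    """Map an AI recommendation to what the pipeline actually does in each phase."""
    if mode is Mode.SHADOW:
        return "log_only"  # compare with existing logic, take no action
    if recent_error_rate > max_error_rate:
        return "legacy_path"  # breaker tripped: disable AI-driven actions
    if not ai_says_flag:
        return "pass"
    if mode is Mode.HUMAN_IN_LOOP:
        return "send_to_review_queue"  # e.g. a ServiceNow ticket
    return "auto_action"
```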

IMPLEMENTATION AND WORKFLOW DETAILS

FAQ: AI Integration with WSO2 Stream Processor

Practical questions and workflow walkthroughs for engineering teams adding real-time AI inference to WSO2 Stream Processor (SP) for fraud detection, sentiment analysis, and dynamic pricing.

This workflow uses WSO2 SP to process live API event streams, enrich them with context, and call a fraud detection model.

  1. Trigger: An event is published to a Kafka topic (e.g., payment-api-events) by Kong or WSO2 API Manager after a payment API call.
  2. Context/Data Pulled: WSO2 SP consumes the event and executes a Siddhi query to join it with reference data from a Redis store (e.g., user's historical transaction count, device ID).
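  3. Model/Agent Action: The enriched payload is sent via an HTTP sink to a low-latency fraud scoring model endpoint (e.g., a deployed XGBoost model on KServe). In Siddhi this call is asynchronous: an http-request sink sends the request and a correlated http-response source receives the score. Timeout and fallback handling therefore has to be designed explicitly; it does not happen inside a single query.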
  4. System Update/Next Step: Based on the returned risk score (e.g., score > 0.85), the event is routed:
    • High Risk: To a high-risk-payments Kafka topic for immediate human review and potential block via a webhook back to the API gateway.
    • Low Risk: To the normal payments-approved topic for downstream processing.
  5. Human Review Point: Events in the high-risk-payments topic are consumed by a case management system where analysts can confirm or override the AI's scoring, creating a feedback loop to retrain the model.
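The feedback loop in step 5 can be sketched as turning analyst decisions into labeled rows and tracking how often analysts override the model. The Review record and the override-rate metric are hypothetical. Only the 0.85 threshold comes from the workflow.

```python
from dataclasses import dataclass
from typing import List, Tuple

HIGH_RISK = 0.85  # threshold used in the workflow


@dataclass
class Review:
    transaction_id: str
    score: float
    analyst_confirmed_fraud: bool


def to_training_rows(reviews: List[Review]) -> Tuple[List[Tuple[str, int]], float]:
    """Turn analyst decisions into (transaction_id, label) rows and report the override rate."""
    rows = [(r.transaction_id, int(r.analyst_confirmed_fraud)) for r in reviews]
    # An override is any case where the analyst disagrees with the model's high-risk call.
    overrides = sum(1 for r in reviews if (r.score > HIGH_RISK) != r.analyst_confirmed_fraud)
    rate = overrides / len(reviews) if reviews else 0.0
    return rows, rate
```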

Example Siddhi Snippet for HTTP Call:

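```sql
-- Definitions are illustrative; attribute names and Redis settings are assumptions
define stream PaymentEventStream (transactionId string, userId string, amount double, ipAddress string);

@store(type='redis', host='localhost', port='6379', table.name='user_features')
@PrimaryKey('userId')
define table RedisTable (userId string, txnCountLastHour long, isDeviceNew bool);

-- Stream-table join: each payment event looks up its user's reference features
@info(name = 'EnrichAndScore')
from PaymentEventStream as e join RedisTable as r
    on e.userId == r.userId
select e.transactionId, e.amount, e.ipAddress, r.txnCountLastHour, r.isDeviceNew
insert into EnrichedStream;

-- The http-request sink POSTs each enriched event to the model (remaining features elided)
@sink(type='http-request', publisher.url='http://fraud-model.kserve:8080/predict', method='POST',
      sink.id='fraud-model', @map(type='json', @payload("""{"features": [{{amount}}, {{txnCountLastHour}}, ...]}""")))
define stream EnrichedStream (transactionId string, amount double, ipAddress string, txnCountLastHour long, isDeviceNew bool);

-- Responses arrive on the correlated http-response source; assumes the model returns {"score": <number>}
@source(type='http-response', sink.id='fraud-model', http.status.code='200',
        @map(type='json', @attributes(transactionId='trp:transactionId', score='$.score')))
define stream ScoredStream (transactionId string, score double);
```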

About the author

Prasad Kumkar

CEO & MD, Inference Systems

Prasad Kumkar is the CEO & MD of Inference Systems and writes about AI systems architecture, LLM infrastructure, model serving, evaluation, and production deployment. Over more than five years, he has worked across computer vision models, L5 autonomous vehicle systems, and LLM research, with a focus on taking complex AI ideas into real-world engineering systems.

His work and writing cover AI systems, large language models, AI agents, multimodal systems, autonomous systems, inference optimization, RAG, evaluation, and production AI engineering.