Use EventBus and Trigger
This document gives an example of how to use EventBus and Trigger.
Prerequisites
- You need to create a function as the target function to be triggered. Please refer to Create a function for more details.
- You need to create a Kafka cluster. Please refer to Create a Kafka cluster for more details.
Deploy an NATS streaming server
Run the following commands to deploy an NATS streaming server. This document uses nats://nats.default:4222
as the access address of the NATS streaming server and stan
as the cluster ID. For more information, see NATS Streaming (STAN).
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222
Create an OpenFuncAsync Runtime Function
Use the following content to create a configuration file (for example,
openfuncasync-function.yaml
) for the target function, which is triggered by the Trigger CRD and prints the received message.apiVersion: core.openfunction.io/v1beta1 kind: Function metadata: name: trigger-target spec: version: "v1.0.0" image: openfunctiondev/v1beta1-trigger-target:latest port: 8080 serving: runtime: "async" scaleOptions: keda: scaledObject: pollingInterval: 15 minReplicaCount: 0 maxReplicaCount: 10 cooldownPeriod: 30 triggers: - type: stan metadata: natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222" queueGroup: "grp1" durableName: "ImDurable" subject: "metrics" lagThreshold: "10" inputs: - name: autoscaling-pubsub component: eventbus topic: metrics pubsub: eventbus: type: pubsub.natsstreaming version: v1 metadata: - name: natsURL value: "nats://nats.default:4222" - name: natsStreamingClusterID value: "stan" - name: subscriptionType value: "queue" - name: durableSubscriptionName value: "ImDurable" - name: consumerID value: "grp1"
Run the following command to apply the configuration file.
kubectl apply -f openfuncasync-function.yaml
Create an EventBus and an EventSource
Use the following content to create a configuration file (for example,
eventbus.yaml
) for an EventBus.apiVersion: events.openfunction.io/v1alpha1 kind: EventBus metadata: name: default spec: natsStreaming: natsURL: "nats://nats.default:4222" natsStreamingClusterID: "stan" subscriptionType: "queue" durableSubscriptionName: "ImDurable"
Use the following content to create a configuration file (for example,
eventsource.yaml
) for an EventSource.Note
Set the name of the event bus throughspec.eventBus
.apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: my-eventsource spec: logLevel: "2" eventBus: "default" kafka: sample-two: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false
Run the following commands to apply these configuration files.
kubectl apply -f eventbus.yaml kubectl apply -f eventsource.yaml
Run the following commands to check the results.
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS my-eventsource default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 6m53s $ kubectl get components NAME AGE serving-6r5dl-component-eventbus-jlpqf 11m serving-9689d-component-ebfes-my-eventsource-cmcbw 6m57s serving-9689d-component-esc-kafka-sample-two-l99cg 6m57s serving-k6zw8-component-cron-9x8hl 61m serving-k6zw8-component-kafka-server-sjrzs 61m $ kubectl get deployments.apps NAME READY UP-TO-DATE AVAILABLE AGE serving-6r5dl-deployment-v100-m7nq2 0/0 0 0 12m serving-9689d-deployment-v100-5qdvk 1/1 1 1 7m17s
Note
In the case of using the event bus, the workflow of the EventSource controller is described as follows:
- Create an EventSource custom resource named
my-eventsource
. - Retrieve and reorganize the configuration of the EventBus, including the EventBus name (
default
in this example) and the name of the Dapr component associated with the EventBus. - Create a Dapr component named
serving-xxxxx-component-ebfes-my-eventsource-xxxxx
to enable the EventSource to associate with the event bus. - Create a Dapr component named
serving-xxxxx-component-esc-kafka-sample-two-xxxxx
to enable the EventSource to associate with the event source. - Create a Deployment named
serving-xxxxx-deployment-v100-xxxxx
for processing events.
- Create an EventSource custom resource named
Create a Trigger
Use the following content to create a configuration file (for example,
trigger.yaml
) for a Trigger.Note
- Set the event bus associated with the Trigger through
spec.eventBus
. - Set the event input source through
spec.inputs
. - This is a simple trigger that collects events from the EventBus named
default
. When it retrieves asample-two
event from the EventSourcemy-eventsource
, it triggers a Knative service namedfunction-sample-serving-qrdx8-ksvc-fwml8
and sends the event to the topicmetrics
of the event bus at the same time.
apiVersion: events.openfunction.io/v1alpha1 kind: Trigger metadata: name: my-trigger spec: logLevel: "2" eventBus: "default" inputs: inputDemo: eventSource: "my-eventsource" event: "sample-two" subscribers: - condition: inputDemo topic: "metrics"
- Set the event bus associated with the Trigger through
Run the following command to apply the configuration file.
kubectl apply -f trigger.yaml
Run the following commands to check the results.
$ kubectl get triggers.events.openfunction.io NAME EVENTBUS STATUS my-trigger default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 62m $ kubectl get components NAME AGE serving-9689d-component-ebfes-my-eventsource-cmcbw 46m serving-9689d-component-esc-kafka-sample-two-l99cg 46m serving-dxrhd-component-eventbus-t65q7 13m serving-zwlj4-component-ebft-my-trigger-4925n 100s
Note
In the case of using the event bus, the workflow of the Trigger controller is as follows:
- Create a Trigger custom resource named
my-trigger
. - Retrieve and reorganize the configuration of the EventBus, including the EventBus name (
default
in this example) and the name of the Dapr component associated with the EventBus. - Create a Dapr component named
serving-xxxxx-component-ebft-my-trigger-xxxxx
to enable the Trigger to associatie with the event bus. - Create a Deployment named
serving-xxxxx-deployment-v100-xxxxx
for processing trigger tasks.
- Create a Trigger custom resource named
Create an Event Producer
Use the following content to create an event producer configuration file (for example,
events-producer.yaml
).apiVersion: core.openfunction.io/v1beta1 kind: Function metadata: name: events-producer spec: version: "v1.0.0" image: openfunctiondev/v1beta1-bindings:latest serving: template: containers: - name: function imagePullPolicy: Always runtime: "async" inputs: - name: cron component: cron outputs: - name: target component: kafka-server operation: "create" bindings: cron: type: bindings.cron version: v1 metadata: - name: schedule value: "@every 2s" kafka-server: type: bindings.kafka version: v1 metadata: - name: brokers value: "kafka-server-kafka-brokers:9092" - name: topics value: "events-sample" - name: consumerGroup value: "bindings-with-output" - name: publishTopic value: "events-sample" - name: authRequired value: "false"
Run the following command to apply the configuration file.
kubectl apply -f events-producer.yaml
Run the following commands to observe changes of the target asynchronous function.
$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE trigger-target Skipped Running serving-dxrhd 20m $ kubectl get po --watch NAME READY STATUS RESTARTS AGE serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 2s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 2/2 Running 0 4s
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.