
Simple Knative Eventing in Kubernetes with a Java Listener and Sender

By Lightphos. Published in actual-tech, Aug 19, 2020 (7 min read).

Previously we ran a service using Knative Serving and kicked it off with an HTTP request.

Here we will write our own Knative services that act on events: a listener and a sender, both in Java using Spring Boot.

Set up

Docker Desktop (macos) 2.3.03
K8s 1.16.5
Istio 1.6.4

Install Eventing

From https://knative.dev/development/install/any-kubernetes-cluster/#installing-the-eventing-component

Install the eventing components:

kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.16.0/eventing-crds.yaml

customresourcedefinition.apiextensions.k8s.io/apiserversources.sources.knative.dev created
customresourcedefinition.apiextensions.k8s.io/brokers.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/channels.messaging.knative.dev created
etc

kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.16.0/eventing-core.yaml

namespace/knative-eventing created
serviceaccount/eventing-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-resolver created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-source-observer created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-sources-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-manipulator created
serviceaccount/pingsource-mt-adapter created
clusterrolebinding.rbac.authorization.k8s.io/knative-eventing-pingsource-mt-adapter created
serviceaccount/eventing-webhook created

etc

Istio Cluster Gateway

A cluster-local gateway is needed for service discovery to work correctly when Knative Eventing services start up.

cat << EOF > ./istio-minimal-operator.yaml
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
spec:
  values:
    global:
      proxy:
        autoInject: disabled
      useMCP: false
      # The third-party-jwt is not enabled on all k8s.
      # See: https://istio.io/docs/ops/best-practices/security/#configure-third-party-service-account-tokens
      jwtPolicy: first-party-jwt

  addonComponents:
    pilot:
      enabled: true
    prometheus:
      enabled: false

  components:
    ingressGateways:
      - name: istio-ingressgateway
        enabled: true
      - name: cluster-local-gateway
        enabled: true
        label:
          istio: cluster-local-gateway
          app: cluster-local-gateway
        k8s:
          service:
            type: ClusterIP
            ports:
              - port: 15020
                name: status-port
              - port: 80
                name: http2
              - port: 443
                name: https
EOF

istioctl manifest apply -f istio-minimal-operator.yaml

Event Display and Ping Source

First we will use a pre-defined event display Service and Ping source.

cat <<EOF | kubectl create -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
EOF

kn service list
NAME            URL                                        LATEST   AGE   CONDITIONS   READY     REASON
event-display   http://event-display.default.example.com            7s    0 OK / 3     Unknown   RevisionMissing : Configuration "event-display" is waiting for a Revision to become ready.

Let’s create a standard Ping Source

cat <<EOF | kubectl create -f -
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *"
  jsonData: '{"message": "Hello world!"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF

This will send the message “Hello world!” to the event-display service every two minutes.
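The schedule string is ordinary five-field cron syntax, and `*/2` in the first (minute) field means every minute divisible by two. A minimal sketch of how that step expression resolves; the `CronMinuteField` class is hypothetical and handles only `*`, `*/n` and a single number, not the full cron grammar:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: expands the minute field of a cron expression.
// Only "*", "*/n" and a single number are handled; real cron syntax
// also allows ranges and lists.
final class CronMinuteField {

    // Returns every minute of the hour (0-59) that the field matches.
    static List<Integer> matchingMinutes(String field) {
        List<Integer> minutes = new ArrayList<>();
        for (int minute = 0; minute < 60; minute++) {
            if (matches(field, minute)) {
                minutes.add(minute);
            }
        }
        return minutes;
    }

    static boolean matches(String field, int minute) {
        if (field.equals("*")) {
            return true;
        }
        if (field.startsWith("*/")) {
            int step = Integer.parseInt(field.substring(2));
            return minute % step == 0;
        }
        return Integer.parseInt(field) == minute;
    }

    public static void main(String[] args) {
        String schedule = "*/2 * * * *";
        String minuteField = schedule.split(" ")[0];
        // Prints the even minutes 0, 2, 4, ... 58
        System.out.println(matchingMinutes(minuteField));
    }
}
```

With `*` in the minute field, as used by the cron job later on, every minute matches.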

kn source list
NAME TYPE RESOURCE SINK READY
test-ping-source PingSource pingsources.sources.knative.dev ksvc:event-display True

Check if the event display service is receiving events.

kubectl logs -l serving.knative.dev/service=event-display -c user-container --since=10m

specversion: 1.0
type: dev.knative.sources.ping
source: /apis/v1/namespaces/default/pingsources/test-ping-source
id: c9270430-a649-4980-87a1-f9a7a985d2ac
time: 2020-08-05T00:32:00.003686838Z
datacontenttype: application/json
Data,
{
  "message": "Hello world!"
}

You will see the pod being created and deleted with each ping (every 2 mins).

All good: our Knative eventing services have been installed and are working.

Clean up

kubectl delete pingsources.sources.knative.dev test-ping-source

or

kn source ping delete test-ping-source

And also delete the event-display

kn service delete event-display

Java Based Event Listener

Let’s write our own event listener in Java, in the kn-listener sub project.

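package uk.co.ac.vadalg;

import java.time.LocalDateTime;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@SpringBootApplication
public class KnEventListener {

    public static void main(String[] args) {
        SpringApplication.run(KnEventListener.class, args);
    }

    // Simple status endpoint, used to check the service is up
    @GetMapping(value = "/", produces = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<String> status() {
        return ResponseEntity.ok("{\"status\": \"UP\"}");
    }

    // Events arrive as an HTTP POST to the root path; log the raw body with a timestamp
    @PostMapping(value = "/", consumes = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<Void> event(@RequestHeader Map<String, Object> headers, @RequestBody String body) throws Exception {
        log.info(LocalDateTime.now() + ", " + body);
        return ResponseEntity.accepted().build();
    }
}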

build.gradle (parent)

plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id "io.freefair.lombok" version "5.1.1"
}

subprojects {

    group 'uk.co.vadalg'
    version '1.0-SNAPSHOT'

    apply plugin: 'application'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'io.freefair.lombok'

    repositories {
        jcenter()
    }

}

build.gradle (listener)

mainClassName = "uk.co.ac.vadalg.KnEventListener"

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    // JUnit is only needed on the test classpath
    testImplementation 'org.junit.jupiter:junit-jupiter:5.4.2'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

test {
    useJUnitPlatform()
}

bootBuildImage {
    imageName = 'localhost:5007/kn-listener'
}

Create docker image (using gradle)

gradle bootBuildImage

Then push to the local docker registry and create the service (see previous blog):

docker push localhost:5007/kn-listener

kn service create knlistener --image localhost:5007/kn-listener

If you have docker image connection issues, check that you have edited the configmap (see previous blog).

Test if the service is up:

curl -i -H "Host: knlistener.default.example.com" localhost
HTTP/1.1 200 OK
content-length: 16
content-type: application/json
date: Thu, 06 Aug 2020 22:27:39 GMT
x-envoy-upstream-service-time: 5
server: istio-envoy

{"status": "UP"}

Start the ping again but this time referencing knlistener

cat <<EOF | kubectl create -f -
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *"
  jsonData: '{"message": "Hi earth!"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: knlistener
EOF

Check the logs

kubectl logs -l serving.knative.dev/service=knlistener -c user-container --since=10m

2020-08-06 22:32:03.778  INFO 1 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-08-06 22:32:03.778 INFO 1 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1073 ms
2020-08-06 22:32:04.052 INFO 1 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-06 22:32:04.271 INFO 1 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-08-06 22:32:04.329 INFO 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-06 22:32:04.342 INFO 1 --- [ main] uk.co.ac.vadalg.KnEventListener : Started KnEventListener in 2.058 seconds (JVM running for 2.479)
2020-08-06 22:32:04.544 INFO 1 --- [nio-8080-exec-3] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-08-06 22:32:04.545 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-08-06 22:32:04.550 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Completed initialization in 5 ms
2020-08-06 22:32:04.641 INFO 1 --- [nio-8080-exec-3] uk.co.ac.vadalg.KnEventListener : 2020-08-06T22:32:04.641, {"message":"Hi earth!"}

Boing, spring event listener. Nice.

Delete the test source

kn source ping delete test-ping-source

You can also test the knlistener service from inside the cluster with a curl pod:

kubectl run curl \
--image=curlimages/curl --rm=true --restart=Never -ti -- \
-X POST -v \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: http://curl-command" \
-H "ce-type: curl.demo" \
-H "ce-id: ekbetron" \
-d '{"name":"lightphos"}' \
http://knlistener.default.svc
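The `ce-` headers in the curl command are CloudEvents attributes in HTTP binary content mode: each attribute travels as a header prefixed with `ce-`, and the body carries only the event data. The listener accepts the request headers but only logs the body. A sketch of how the attributes could be pulled out of that header map, using a hypothetical `CloudEventHeaders` helper that is not part of the original project:

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the original project.
final class CloudEventHeaders {

    private static final String PREFIX = "ce-";

    // Returns attribute name -> value for every header that starts with "ce-".
    // Header names are compared case-insensitively; the prefix is stripped.
    static Map<String, String> attributes(Map<String, ?> headers) {
        Map<String, String> attrs = new TreeMap<>();
        for (Map.Entry<String, ?> header : headers.entrySet()) {
            String name = header.getKey().toLowerCase(Locale.ROOT);
            if (name.startsWith(PREFIX)) {
                attrs.put(name.substring(PREFIX.length()), String.valueOf(header.getValue()));
            }
        }
        return attrs;
    }

    public static void main(String[] args) {
        // Same headers as the curl command above
        Map<String, String> headers = Map.of(
            "content-type", "application/json",
            "ce-specversion", "1.0",
            "ce-source", "http://curl-command",
            "ce-type", "curl.demo",
            "ce-id", "ekbetron");
        System.out.println(attributes(headers));
    }
}
```

Inside the listener's `event` method this could be called as `CloudEventHeaders.attributes(headers)` and logged next to the body.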

Java Based Event Sender

Let’s write our own Kn event source in Java.

We need three things: a sender, a sink binding and a cronjob. We will keep the existing knlistener we wrote above to receive the events.

Simple Java Sender

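package uk.co.ac.vadalg;

import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@Slf4j
@RestController
@SpringBootApplication
public class KnEventSender {

    public static void main(String[] args) {
        SpringApplication.run(KnEventSender.class, args);
    }

    // Address of the sink, resolved from the K_SINK environment variable
    @Value("${ksink}")
    private String sink;

    // Send one event as soon as the bean is ready, then exit so the Job completes
    @PostConstruct
    public void fire() {
        try {
            log.info("Fire {} ", send());
            System.exit(0);
        }
        catch (Exception e) {
            log.error("sink {}", sink);
            log.error("error ", e);
        }
    }

    @GetMapping(value = "/", produces = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<String> send() {
        log.info("Sink {}", sink);
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        // Note: the single-quoted body is not strict JSON; the listener only logs it as a raw string
        HttpEntity<String> request = new HttpEntity<>("{ 'test': 'from kn sender " + Math.random() + "'}", headers);

        return restTemplate.postForEntity(sink, request, String.class);
    }

}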
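The sender posts a bare JSON body, so as far as the code shows the listener receives no CloudEvents attributes with it. A sketch of a request that also carries the four required attributes (`specversion`, `id`, `type`, `source`) as `ce-` headers. It is built with the JDK's `java.net.http` API rather than the `RestTemplate` the sender uses, and the `CloudEventRequest` class and the `ce-type` and `ce-source` values are made up for illustration:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.UUID;

// Hypothetical alternative to the RestTemplate call; requires Java 11+.
final class CloudEventRequest {

    // Builds (but does not send) a binary-mode CloudEvents POST request.
    static HttpRequest build(URI sink, String jsonBody) {
        return HttpRequest.newBuilder(sink)
            .header("Content-Type", "application/json")
            .header("ce-specversion", "1.0")
            .header("ce-id", UUID.randomUUID().toString())
            .header("ce-type", "uk.co.vadalg.knsender.demo") // illustrative value
            .header("ce-source", "/knsender")                 // illustrative value
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build(
            URI.create("http://knlistener.default.svc"),
            "{\"test\": \"from kn sender\"}");
        System.out.println(request.method() + " " + request.uri());
        request.headers().map().forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

Sending it would be a matter of `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, with the sink URI taken from `K_SINK`.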

application.yml (note the K_SINK environment variable)

spring.application.name: kneventsender 
ksink: ${K_SINK}
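The `${K_SINK}` placeholder is filled from the `K_SINK` environment variable, which Knative sets on the container once the sink binding described further down is in place. If the variable is missing, for example when running the jar locally, Spring cannot resolve the placeholder and the application fails at startup. A sketch of validating the value explicitly, using a hypothetical `SinkResolver` class that takes the environment as a map so it can be exercised without a cluster:

```java
import java.net.URI;
import java.util.Map;

// Hypothetical helper, not part of the original project; requires Java 11+.
final class SinkResolver {

    // Reads K_SINK from the given environment and checks it is an http(s) URL.
    static URI resolve(Map<String, String> env) {
        String sink = env.get("K_SINK");
        if (sink == null || sink.isBlank()) {
            throw new IllegalStateException("K_SINK is not set; has the SinkBinding been applied?");
        }
        URI uri = URI.create(sink.trim());
        String scheme = uri.getScheme();
        if (scheme == null || !(scheme.equals("http") || scheme.equals("https"))) {
            throw new IllegalStateException("K_SINK is not an http(s) URL: " + sink);
        }
        return uri;
    }

    public static void main(String[] args) {
        // In the pod this would be System.getenv(); the URL below is an illustrative value.
        Map<String, String> env = Map.of("K_SINK", "http://knlistener.default.svc.cluster.local");
        System.out.println(resolve(env));
    }
}
```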

build.gradle (kn-sender)

mainClassName = "uk.co.ac.vadalg.KnEventSender"

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    // JUnit is only needed on the test classpath
    testImplementation 'org.junit.jupiter:junit-jupiter:5.4.2'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

test {
    useJUnitPlatform()
}

bootBuildImage {
    imageName = 'localhost:5007/kn-sender'
}

Build and push to local docker registry

gradle bootBuildImage

docker push localhost:5007/kn-sender

No need to create a sender service as we will use a cronjob to do that and fire events periodically.

Sink Binding

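This selects the Jobs created by the cronjob (those labelled app: knsender-cron) and binds them to our knlistener: Knative injects the listener's address into each matching Job's containers as the K_SINK environment variable.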

apiVersion: sources.knative.dev/v1alpha1
kind: SinkBinding
metadata:
  name: knsender-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    selector:
      matchLabels:
        app: knsender-cron

  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: knlistener
  ceOverrides:
    extensions:
      sink: bound

kubectl apply -f <above>

CronJob Trigger

This will set up a cron job that runs every minute and kicks off our Java sender, in concert with the sink binding above.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: knsender-cron
spec:
  # Run every minute
  schedule: "* * * * *"
  jobTemplate:
    metadata:
      labels:
        app: knsender-cron
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: knsender-heartbeat
              image: localhost:5007/kn-sender

kubectl apply -f <above>

Check out Jobs

kubectl get cronjob
NAME SCHEDULE SUSPEND ACTIVE LAST SCHEDULE AGE
knsender-cron * * * * * False 0 28s 30m

kubectl get job
NAME COMPLETIONS DURATION AGE
knsender-cron-1597849380 1/1 4s 3m2s
knsender-cron-1597849440 1/1 4s 2m2s
knsender-cron-1597849500 1/1 9s 62s
knsender-cron-1597849560 0/1 2s 2s
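The job names in this listing are 60 apart, which fits the one-minute schedule. A small sketch that decodes the suffix, assuming it is the job's scheduled time in Unix epoch seconds (as the CronJob controller in Kubernetes versions of this era appears to name jobs); the `CronJobName` class is hypothetical:

```java
import java.time.Instant;

// Hypothetical helper for reading the timestamp out of a CronJob-created Job name.
final class CronJobName {

    // Assumes names of the form <cronjob-name>-<epoch-seconds>.
    static Instant scheduledTime(String jobName) {
        String suffix = jobName.substring(jobName.lastIndexOf('-') + 1);
        return Instant.ofEpochSecond(Long.parseLong(suffix));
    }

    public static void main(String[] args) {
        // Prints 2020-08-19T15:03:00Z
        System.out.println(scheduledTime("knsender-cron-1597849380"));
    }
}
```

Under that assumption the four jobs listed were scheduled at 15:03, 15:04, 15:05 and 15:06 UTC on the day the post was published.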

Listener

The listener should be getting invoked and receiving events:

kubectl logs -l serving.knative.dev/service=knlistener -c user-container --since=10m

2020-08-19 15:00:07.106 INFO 1 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1064 ms
2020-08-19 15:00:07.432 INFO 1 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-19 15:00:07.632 INFO 1 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-08-19 15:00:07.700 INFO 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-19 15:00:07.733 INFO 1 --- [ main] uk.co.ac.vadalg.KnEventListener : Started KnEventListener in 2.14 seconds (JVM running for 2.508)
2020-08-19 15:00:07.753 INFO 1 --- [nio-8080-exec-3] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-08-19 15:00:07.753 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-08-19 15:00:07.758 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Completed initialization in 5 ms
2020-08-19 15:00:07.834 INFO 1 --- [nio-8080-exec-3] uk.co.ac.vadalg.KnEventListener : 2020-08-19T15:00:07.834, { 'test': 'from kn sender 0.9763648332730366'}
2020-08-19 15:01:03.744 INFO 1 --- [nio-8080-exec-9] uk.co.ac.vadalg.KnEventListener : 2020-08-19T15:01:03.744, { 'test': 'from kn sender 0.8349076066212481'}

There you are: a Java sender and listener via Knative.

Delete

kubectl delete cronjob knsender-cron
cronjob.batch "knsender-cron" deleted

kubectl get po
NAME READY STATUS RESTARTS AGE
knlistener-rzcpv-1-deployment-789df7845b-bgrmn 2/2 Terminating 0 4m3s

Conclusion

We installed the Knative Eventing dependencies, tested the installation using standard event resources, then created our own Java listener and Java sender to receive and send JSON-based events in a ‘serverless’ manner (the pods are removed after 30 secs of inactivity).

Event Sources

Knative comes ready with core sources of events such as:

  • ApiServerSource, fired when a k8s resource is created, updated or deleted
  • PingSource (cronjob)
  • ContainerSource, container images as sources
  • KafkaSource

See https://knative.dev/docs/eventing/sources/

Source Code

All source code can be found here:

https://gitlab.com/lightphos/spring/vadalg

Originally published at https://blog.ramjee.uk on August 19, 2020.
