Skip to main content
Version: 8.0.11 (latest)

Kafka Backup and restore

To backup and restore Kafka topic data, Adobe S3 Kafka connector is used which periodically polls data from Kafka and in turn, uploads it to S3. Each chunk of data is represented as an S3 object. More details about the connector can be found here.

During Restore, topic messages are purged before the restore operation is performed. This is done to make sure that topic configuration remains the same after restoration.

Assumptions

  • A ConfigMap containing the parameters for the connector is present in the cluster.
  • Topics should be present in the Kafka cluster before taking the backup.
  • No consumer should be consuming messages from the topic during restore.

Setup Kafka Cluster

If it hasn't been done already, the strimzi Helm repository needs to be added to your local configuration:

## Add strimzi helm repo
$ helm repo add strimzi https://strimzi.io/charts/
$ helm repo update

Install the [Strimzi Cluster Operator] from the strimzi Helm repository:

## create namespace
$ kubectl create namespace kafka-test
$ helm install kafka-release strimzi/strimzi-kafka-operator --namespace kafka-test

Setup Kafka Cluster with one ZooKeeper and one Kafka broker instance:

Create a file kafka-cluster.yaml with the following contents

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.2.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
logging:
type: inline
loggers:
kafka.root.logger.level: "INFO"
template:
pod:
securityContext:
runAsUser: 0
fsGroup: 0
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
inter.broker.protocol.version: "3.1"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
zookeeper:
replicas: 1
template:
pod:
securityContext:
runAsUser: 0
fsGroup: 0
logging:
type: inline
loggers:
zookeeper.root.logger.level: "INFO"
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}

And then apply the file using:

$ kubectl --namespace kafka-test apply -f kafka-cluster.yaml

Add some data to the Kafka topic [blogs] using Kafka image strimzi/kafka:0.20.0-kafka-2.6.0 provided by strimzi:

## Create a topic on Kafka server
$ kubectl -n kafka-test run kafka-topic -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-topics.sh --create --topic blogs --bootstrap-server my-cluster-kafka-bootstrap:9092
## Create a producer to push events to blogs topic
$ kubectl -n kafka-test run kafka-producer -ti --image=strimzi/kafka:0.20.0-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic blogs
>{"userId": 1,"id": 1,"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit"}
>{"userId": 1,"id": 2,"title": "qui est esse"}
>{"userId": 1,"id": 3,"title": "ea molestias quasi exercitationem repellat qui ipsa sit aut"}
Note

To take backup of multiple topics, add comma separated topic names in adobe-s3-sink.properties

Create ConfigMap

A config map with the following configuration should be provided to the Kafka Connector:

  • Details of the S3 bucket and Kafka broker address
  • adobe-s3-sink.properties file containing properties related to [s3 sink Connector]
  • adobe-s3-source.properties file containing properties related to [s3 source Connector]
  • kafkaConfiguration.properties containing properties related to Kafka server

Create files with adobe connector configurations:

adobe-s3-sink.properties:

connector.class=com.spredfast.kafka.connect.s3.sink.S3SinkConnector
tasks.max=4
format=binary
format.include.keys=true
topics=blogs,feeds
# too many records can overwhelm the poll loop on large topics and will result in
# Connect continously rebalancing without making progress
consumer.max.poll.records=500
# Flushing to S3 can take some time, so allow for more than the default 5 seconds when shutting down.
task.shutdown.graceful.timeout.ms=30000
# The converters specify the format of data in Kafka and how to translate it into Connect data
key.converter=com.spredfast.kafka.connect.s3.AlreadyBytesConverter
value.converter=com.spredfast.kafka.connect.s3.AlreadyBytesConverter
s3.prefix=topics
s3.path_style=true
local.buffer.dir=/tmp/kafka-connect-s3.buffer

adobe-s3-source.properties:

tasks.max=4
connector.class=com.spredfast.kafka.connect.s3.source.S3SourceConnector
format=binary
format.include.keys=true
topics=blogs
# too many records can overwhelm the poll loop on large topics and will result in
# Connect continously rebalancing without making progress
consumer.max.poll.records=500
# Flushing to S3 can take some time, so allow for more than the default 5 seconds when shutting down.
task.shutdown.graceful.timeout.ms=30000
# The converters specify the format of data in Kafka and how to translate it into Connect data
key.converter=com.spredfast.kafka.connect.s3.AlreadyBytesConverter
value.converter=com.spredfast.kafka.connect.s3.AlreadyBytesConverter
s3.prefix=topics
s3.path_style=true

adobe-kafkaConfiguration.properties:

# bootstrap server points to kafka broker address which is static
bootstrap.servers=PLAINTEXT://my-cluster-kafka-bootstrap:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/
$ kubectl create configmap --namespace kafka-test s3config --from-file=adobe-s3-sink.properties=./adobe-s3-sink.properties \
--from-file=adobe-kafkaConfiguration.properties=./adobe-kafkaConfiguration.properties --from-file=adobe-s3-source.properties=./adobe-s3-source.properties \
--from-literal=timeinSeconds=1800

Create Blueprint

Create a file kafka-blueprint.yaml with the following contents

apiVersion: cr.kanister.io/v1alpha1
kind: Blueprint
metadata:
name: kafka-blueprint
actions:
backup:
outputArtifacts:
s3Dump:
keyValue:
s3path: '{{ .Phases.setupPhase.Output.s3path }}'
backupDetail: '{{ .Phases.setupPhase.Output.backupDetail }}'
phases:
- func: KubeTask
name: setupPhase
args:
namespace: "{{ .Object.metadata.namespace }}"
podOverride:
containers:
- name: container
imagePullPolicy: IfNotPresent
restartPolicy: Never
image: ghcr.io/kanisterio/kafka-adobe-s3-sink-connector
command:
- bash
- -o
- errexit
- -o
- pipefail
- -c
- |
mkdir /tmp/config
{{- if .Profile.Credential.KeyPair }}
export AWS_SECRET_KEY="{{ .Profile.Credential.KeyPair.Secret }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.KeyPair.ID }}"
{{- else }}
export AWS_SECRET_KEY="{{ .Profile.Credential.Secret.Data.aws_secret_access_key | toString }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.Secret.Data.aws_access_key_id | toString }}"
{{- end }}
REGION="{{ .Profile.Location.Region }}"
BUCKET="{{ .Profile.Location.Bucket }}"
export CONNECTORNAME=$HOSTNAME
S3CONFIG="{{ index .Object.data "adobe-s3-sink.properties" | toString }}"
echo -e "${S3CONFIG}\ns3.region=${REGION}\ns3.bucket=${BUCKET}\nname=${CONNECTORNAME}\n" > /tmp/config/s3config.properties
S3FOLDER=`cat /tmp/config/s3config.properties | grep "s3.prefix=" | awk -F "=" '{print $2}'`
S3_TOPIC_PATH="${S3FOLDER}_{{ .Time | date "2006-01-02T15:04:05" }}"
sed -i "/^s3.prefix/d" /tmp/config/s3config.properties
echo -e "\ns3.prefix=${S3_TOPIC_PATH}\n" >> /tmp/config/s3config.properties

export S3_PATH="s3://{{ .Profile.Location.Bucket }}/${S3_TOPIC_PATH}"
KAFKACONFIG="{{ index .Object.data "adobe-kafkaConfiguration.properties" | toString }}"
echo "$KAFKACONFIG" > /tmp/config/kafkaConfig.properties

export TIMEINSECONDS="{{ index .Object.data "timeinSeconds" | toString }}"

export BOOTSTRAPSERVER=`cat /tmp/config/kafkaConfig.properties | grep "bootstrap.servers=" | awk -F "=" '{print $2}'`

echo "============ENV variable set====================="
/bin/connect-standalone /tmp/config/kafkaConfig.properties /tmp/config/s3config.properties &
export PID=$!
# script to monitors sink connector backup all topic and stops the connector when lag is zero
sh monitorconnect.sh
exit 0
restore:
inputArtifactNames:
- s3Dump
phases:
- func: KubeTask
name: restorePreHookPhase
args:
namespace: "{{ .Object.metadata.namespace }}"
podOverride:
containers:
- name: container
imagePullPolicy: IfNotPresent
image: ghcr.io/kanisterio/kafka-adobe-s3-source-connector
command:
- bash
- -o
- errexit
- -o
- pipefail
- -c
- |
mkdir /tmp/config
{{- if .Profile.Credential.KeyPair }}
export AWS_SECRET_KEY="{{ .Profile.Credential.KeyPair.Secret }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.KeyPair.ID }}"
{{- else }}
export AWS_SECRET_KEY="{{ .Profile.Credential.Secret.Data.aws_secret_access_key | toString }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.Secret.Data.aws_access_key_id | toString }}"
{{- end }}
export REGION="{{ .Profile.Location.Region }}"
export BUCKET="{{ .Profile.Location.Bucket }}"

KAFKACONFIG="{{ index .Object.data "adobe-kafkaConfiguration.properties" | toString }}"
echo "$KAFKACONFIG" > /tmp/config/kafkaConfig.properties

S3CONFIG="{{ index .Object.data "adobe-s3-source.properties" | toString }}"
echo "${S3CONFIG}" > /tmp/config/s3config.properties

export BOOTSTRAPSERVER=`cat /tmp/config/kafkaConfig.properties | grep "bootstrap.servers=" | awk -F "=" '{print $2}'`

cat /tmp/config/s3config.properties | grep "topics=" | awk -F "=" '{print $2}' | tr , "\n" > /tmp/config/topics.txt
while IFS= read -r TOPIC
do
# getting topic name from configuration file
echo "purging topic $TOPIC"
# getting retention period as set for the topic
export RETENTION_PERIOD="$(/bin/kafka-configs --describe --bootstrap-server "$BOOTSTRAPSERVER" --entity-type topics --entity-name "$TOPIC" --all | grep -m1 retention.ms= | sed 's/[^0-9]*//; s/ .*//')"
# purging topic by setting retention to 1ms
/bin/kafka-configs --bootstrap-server "$BOOTSTRAPSERVER" --entity-type topics --entity-name "$TOPIC" --alter --add-config retention.ms=1
echo "retention set to 1"
# Verifying Purging is complete
export START_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -1)"
export END_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -2)"
until [ "$START_OFFSET" = "$END_OFFSET" ]
do
echo "purging in process"
START_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -1)"
END_OFFSET="$(/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAPSERVER" --topic "$TOPIC" --time -2)"
sleep 1
done
echo "purging complete for topic $TOPIC"
echo "resetting the retention to previous value"
# reset the retention period to previous value
/bin/kafka-configs --bootstrap-server "$BOOTSTRAPSERVER" --entity-type topics --entity-name "$TOPIC" bin/kafka-configs.sh --alter --add-config retention.ms="$RETENTION_PERIOD"
done < "/tmp/config/topics.txt"
- func: KubeTask
name: restorePhase
args:
namespace: "{{ .Object.metadata.namespace }}"
podOverride:
containers:
- name: container
imagePullPolicy: IfNotPresent
image: ghcr.io/kanisterio/kafka-adobe-s3-source-connector
command:
- bash
- -o
- errexit
- -o
- pipefail
- -c
- |
mkdir /tmp/config
{{- if .Profile.Credential.KeyPair }}
export AWS_SECRET_KEY="{{ .Profile.Credential.KeyPair.Secret }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.KeyPair.ID }}"
{{- else }}
export AWS_SECRET_KEY="{{ .Profile.Credential.Secret.Data.aws_secret_access_key | toString }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.Secret.Data.aws_access_key_id | toString }}"
{{- end }}
REGION="{{ .Profile.Location.Region }}"
BUCKET="{{ .Profile.Location.Bucket }}"
export CONNECTORNAME=$HOSTNAME
S3CONFIG="{{ index .Object.data "adobe-s3-source.properties" | toString }}"
echo -e "${S3CONFIG}\ns3.region=${REGION}\ns3.bucket=${BUCKET}\nname=${CONNECTORNAME}\n" > /tmp/config/s3config.properties
sed -i "/^s3.prefix/d" /tmp/config/s3config.properties

S3_PATH="{{ .ArtifactsIn.s3Dump.KeyValue.s3path }}"
echo $S3_PATH

export TOPICS_DIR="$(echo $S3_PATH | awk -F "/" '{print $(NF)}')"
echo -e "\ns3.prefix=${TOPICS_DIR}\n" >> /tmp/config/s3config.properties

KAFKACONFIG="{{ index .Object.data "adobe-kafkaConfiguration.properties" | toString }}"
echo "$KAFKACONFIG" > /tmp/config/kafkaConfig.properties

TOPIC_DETAIL="{{ .ArtifactsIn.s3Dump.KeyValue.backupDetail }}"
export BOOTSTRAPSERVER=`cat /tmp/config/kafkaConfig.properties | grep "bootstrap.servers=" | awk -F "=" '{print $2}'`
export TOPIC_LIST=`cat /tmp/config/s3config.properties | grep "topics=" | awk -F "=" '{print $2}'`

echo "============ENV variable set====================="

# start kafka source connector
sh /bin/connect-standalone /tmp/config/kafkaConfig.properties /tmp/config/s3config.properties &
export PID=$!
# script to monitors source connector to restore all topic and stops the connector successfully
sh monitorconnect.sh
exit 0
delete:
inputArtifactNames:
- s3Dump
phases:
- func: KubeTask
name: deleteFromBlobStore
args:
podOverride:
containers:
- name: container
imagePullPolicy: IfNotPresent
image: ghcr.io/kanisterio/kafka-adobe-s3-source-connector
namespace: "{{ .Namespace.Name }}"
command:
- bash
- -o
- errexit
- -o
- pipefail
- -c
- |
{{- if .Profile.Credential.KeyPair }}
export AWS_SECRET_KEY="{{ .Profile.Credential.KeyPair.Secret }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.KeyPair.ID }}"
{{- else }}
export AWS_SECRET_KEY="{{ .Profile.Credential.Secret.Data.aws_secret_access_key | toString }}"
export AWS_ACCESS_KEY="{{ .Profile.Credential.Secret.Data.aws_access_key_id | toString }}"
{{- end }}
export S3PATH="{{ .ArtifactsIn.s3Dump.KeyValue.s3path }}"
export REGION="{{ .Profile.Location.Region }}"
export BUCKET="{{ .Profile.Location.Bucket }}"
# script to clean the s3 path
python3 cleanup.py

And then apply the file using:

$ kubectl --namespace kasten-io apply -f kafka-blueprint.yaml

Alternatively, use the Blueprints page on Veeam Kasten Dashboard to create the Blueprint resource.

Once the Blueprint gets created, annotate the ConfigMap with the below annotations to instruct Veeam Kasten to use this Blueprint while performing backup and restore operations on the Kafka instance.

$ kubectl -n kafka-test annotate configmaps/s3config kanister.kasten.io/blueprint=kafka-blueprint

Finally, use Veeam Kasten to backup and restore the application.