Apache Kafka
Type: bindings.kafka
Status: stable
Reference: https://docs.dapr.io/reference/components-reference/supported-bindings/kafka/
Example
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
name: <name>
spec:
type: bindings.kafka
version: v1
metadata:
# Authentication type. This must be set to "password" for this authentication profile.
- name: authType
value: "password"
# The SASL authentication mechanism to use.
- name: saslMechanism
value: "PLAINTEXT"
# The SASL password.
- name: saslPassword
value: "mypassword"
# The SASL username.
- name: saslUsername
value: "myuser"
# A comma-separated list of Kafka brokers.
- name: brokers
value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
# Indicates the direction of the binding component.
- name: direction
value: "input,output"
# The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. (Optional)
#- name: clientConnectionKeepAliveInterval
# value: "0"
# The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration. (Optional)
#- name: clientConnectionTopicMetadataRefreshInterval
# value: "9m"
# A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. (Optional)
#- name: clientID
# value: "sarama"
# Disables consumer retry by setting this to "false". (Optional)
#- name: consumeRetryEnabled
# value: "false"
# The interval between retries when attempting to consume topics. (Optional)
#- name: consumeRetryInterval
# value: "100ms"
# A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. (Optional)
#- name: consumerGroup
# value: "group1"
# Disable TLS for transport security. This is potentially insecure and not recommended for use in production. (Optional)
#- name: disableTls
# value: "false"
# The interval between heartbeats to the consumer coordinator. (Optional)
#- name: heartbeatInterval
# value: "3s"
# The initial offset to use if no offset was previously committed. (Optional)
#- name: initialOffset
# value: "newest"
# The maximum size in bytes allowed for a single Kafka message. (Optional)
#- name: maxMessageBytes
# value: "1024"
# The topic to publish to. (Optional)
#- name: publishTopic
# value: "mytopic"
# Specifies a custom route for incoming events. (Optional)
#- name: route
# value: "/custom-path"
# Enables caching for schemas. (Optional)
#- name: schemaCachingEnabled
# value: "true"
# The TTL for schema caching when publishing a message with latest schema available. (Optional)
#- name: schemaLatestVersionCacheTTL
# value: "5m"
# The Schema Registry credentials API Key. (Optional)
#- name: schemaRegistryAPIKey
# value: "XYAXXAZ"
# The Schema Registry credentials API Secret. (Optional)
#- name: schemaRegistryAPISecret
# value: "ABCDEFGMEADFF"
# The Schema Registry URL. (Optional)
#- name: schemaRegistryURL
# value: "http://mycompany.com:8081"
# The maximum time between heartbeats before the consumer is considered inactive and will timeout. (Optional)
#- name: sessionTimeout
# value: "10s"
# Skip TLS verification. This is potentially insecure and not recommended for use in production. (Optional)
#- name: skipVerify
# value: "false"
# A comma-separated list of topics to subscribe to. (Optional)
#- name: topics
# value: "mytopic1,topic2"
# Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka. (Optional)
#- name: version
# value: "2.0.0.0"
Binding information
Input Binding: yes
Output Binding: yes
Output Binding operations:
create
: Publish a new message in the topic.
Authentication profiles
Available authentication profiles:
-
OIDC Authentication
-
SASL Authentication
-
mTLS Authentication
-
No Authentication
-
AWS IAM
OIDC Authentication
Authenticate using OpenID Connect.
authType
(string)
Required - Authentication type. This must be set to "oidc" for this authentication profile.
Example value: oidc
Allowed values:
- oidc
oidcClientID
(string)
Required - The OAuth2 client ID that has been provisioned in the identity provider.
Example value: my-client-id
oidcClientSecret
(string)
Required - The OAuth2 client secret that has been provisioned in the identity provider.
Example value: KeFg23!
oidcTokenEndpoint
(string)
Required - URL of the OAuth2 identity provider access token endpoint.
Example value: https://identity.example.com/v1/token
oidcExtensions
(string)
String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.
Example value: {"cluster":"kafka","poolid":"kafkapool"}
oidcScopes
(string)
Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Although not required, this field is recommended.
Default value: openid
Example value: openid,kafka-prod
SASL Authentication
Authenticate using SASL.
authType
(string)
Required - Authentication type. This must be set to "password" for this authentication profile.
Example value: password
Allowed values:
- password
saslMechanism
(string)
Required - The SASL authentication mechanism to use.
Default value: PLAINTEXT
Example value: SHA-512
Allowed values:
-
SHA-512
-
SHA-256
-
PLAINTEXT
saslPassword
(string)
Required - The SASL password.
Example value: mypassword
saslUsername
(string)
Required - The SASL username.
Example value: myuser
mTLS Authentication
Authenticate using mTLS.
authType
(string)
Required - Authentication type. This must be set to "mtls" for this authentication profile.
Example value: mtls
Allowed values:
- mtls
caCert
(string)
Required - Certificate authority certificate.
Example value:
-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----
clientCert
(string)
Required - Client certificate.
Example value:
-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----
clientKey
(string)
Required - Client key.
Example value:
-----BEGIN RSA PRIVATE KEY-----
<base64-encoded DER>
-----END RSA PRIVATE KEY-----
No Authentication
Do not perform authentication.
authType
(string)
Required - Authentication type. This must be set to "none" for this authentication profile.
Example value: none
Allowed values:
- none
AWS IAM
Authenticate using AWS IAM credentials or role for AWS MSK
authType
(string)
Required - Authentication type. This must be set to "awsiam" for this authentication profile.
Example value: awsiam
Allowed values:
- awsiam
awsAccessKey
(string)
Required - AWS access key associated with an IAM account.
Example value: AKIAIOSFODNN7EXAMPLE
awsIamRoleArn
(string)
Required - IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
Example value: arn:aws:iam::123456789:role/mskRole
awsRegion
(string)
Required - The AWS Region where the MSK Kafka broker is deployed to.
Example value: us-east-1
awsSecretKey
(string)
Required - The secret key associated with the access key.
Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
awsSessionToken
(string)
AWS session token to use. A session token is only required if you are using temporary security credentials.
Example value: TOKEN
awsStsSessionName
(string)
Represents the session name for assuming a role.
Default value: MSKSASLDefaultSession
Example value: MyAppSession
Metadata
brokers
(string)
Required - A comma-separated list of Kafka brokers. Used for input binding direction. Used for output binding direction.
Example value: mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093
direction
(string)
Required - Indicates the direction of the binding component.
Example value: input,output
Allowed values:
-
input
-
output
-
input,output
Documentation: https://docs.dapr.io/reference/api/bindings_api/#binding-direction-optional
clientConnectionKeepAliveInterval
(duration)
The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely.
Default value: 0
Example value: 4m
clientConnectionTopicMetadataRefreshInterval
(duration)
The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration.
Default value: 9m
Example value: 4m
clientID
(string)
A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.
Default value: sarama
Example value: my-dapr-app
consumeRetryEnabled
(bool)
Disables consumer retry by setting this to "false".
Default value: false
Example value: true
consumeRetryInterval
(duration)
The interval between retries when attempting to consume topics.
Default value: 100ms
Example value: 200ms
consumerGroup
(string)
A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. Used for input binding direction.
Example value: group1
disableTls
(bool)
Disable TLS for transport security. This is potentially insecure and not recommended for use in production.
Default value: false
Example value: true
heartbeatInterval
(duration)
The interval between heartbeats to the consumer coordinator.
Default value: 3s
Example value: 5s
initialOffset
(string)
The initial offset to use if no offset was previously committed. Used for input binding direction.
Default value: newest
Example value: oldest
Allowed values:
-
newest
-
oldest
maxMessageBytes
(number)
The maximum size in bytes allowed for a single Kafka message.
Default value: 1024
Example value: 2048
publishTopic
(string)
The topic to publish to. Used for output binding direction.
Example value: mytopic
route
(string)
Specifies a custom route for incoming events.
Example value: /custom-path
Documentation: https://docs.dapr.io/developing-applications/building-blocks/bindings/howto-triggers/#specify-a-custom-route
schemaCachingEnabled
(bool)
Enables caching for schemas.
Default value: true
Example value: true
schemaLatestVersionCacheTTL
(duration)
The TTL for schema caching when publishing a message with latest schema available.
Default value: 5m
Example value: 5m
schemaRegistryAPIKey
(string)
The Schema Registry credentials API Key.
Example value: XYAXXAZ
schemaRegistryAPISecret
(string)
The Schema Registry credentials API Secret.
Example value: ABCDEFGMEADFF
schemaRegistryURL
(string)
The Schema Registry URL.
Example value: http://mycompany.com:8081
sessionTimeout
(duration)
The maximum time between heartbeats before the consumer is considered inactive and will timeout.
Default value: 10s
Example value: 20s
skipVerify
(bool)
Skip TLS verification. This is potentially insecure and not recommended for use in production.
Default value: false
Example value: true
topics
(string)
A comma-separated list of topics to subscribe to. Used for input binding direction.
Example value: mytopic1,topic2
version
(string)
Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka.
Default value: 2.0.0.0
Example value: 0.10.2.0