Apache Kafka
Type: pubsub.kafka
Status: stable
Reference: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/
Example
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
name: <name>
spec:
type: pubsub.kafka
version: v1
metadata:
# Authentication type. This must be set to "password" for this authentication profile.
- name: authType
value: "password"
# The SASL authentication mechanism to use.
- name: saslMechanism
value: "PLAINTEXT"
# The SASL password.
- name: saslPassword
value: "mypassword"
# The SASL username.
- name: saslUsername
value: "myuser"
# A comma-separated list of Kafka brokers.
- name: brokers
value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
# The number of events to buffer in internal and external channels. (Optional)
#- name: channelBufferSize
# value: "256"
# The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. (Optional)
#- name: clientConnectionKeepAliveInterval
# value: "0"
# The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration. (Optional)
#- name: clientConnectionTopicMetadataRefreshInterval
# value: "9m"
# A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. (Optional)
#- name: clientID
# value: "sarama"
# Disables consumer retry by setting this to "false". (Optional)
#- name: consumeRetryEnabled
# value: "false"
# The interval between retries when attempting to consume topics. (Optional)
#- name: consumeRetryInterval
# value: "100ms"
# The default number of message bytes to fetch from the broker in each request. (Optional)
#- name: consumerFetchDefault
# value: "1048576"
# The minimum number of message bytes to fetch in a request. (Optional)
#- name: consumerFetchMin
# value: "1"
# A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. (Optional)
#- name: consumerGroup
# value: "group1"
# Disable TLS for transport security. This is potentially insecure and not recommended for use in production. (Optional)
#- name: disableTls
# value: "false"
# Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. (Optional)
#- name: escapeHeaders
# value: "false"
# The interval between heartbeats to the consumer coordinator. (Optional)
#- name: heartbeatInterval
# value: "3s"
# The initial offset to use if no offset was previously committed. (Optional)
#- name: initialOffset
# value: "newest"
# The maximum size in bytes allowed for a single Kafka message. (Optional)
#- name: maxMessageBytes
# value: "1024"
# Enables caching for schemas. (Optional)
#- name: schemaCachingEnabled
# value: "true"
# The TTL for schema caching when publishing a message with latest schema available. (Optional)
#- name: schemaLatestVersionCacheTTL
# value: "5m"
# The Schema Registry credentials API Key. (Optional)
#- name: schemaRegistryAPIKey
# value: "XYAXXAZ"
# The Schema Registry credentials API Secret. (Optional)
#- name: schemaRegistryAPISecret
# value: "ABCDEFGMEADFF"
# The Schema Registry URL. (Optional)
#- name: schemaRegistryURL
# value: "http://mycompany.com:8081"
# The maximum time between heartbeats before the consumer is considered inactive and will timeout. (Optional)
#- name: sessionTimeout
# value: "10s"
# Skip TLS verification. This is potentially insecure and not recommended for use in production. (Optional)
#- name: skipVerify
# value: "false"
# Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka. (Optional)
#- name: version
# value: "2.0.0.0"
Authentication profiles
Available authentication profiles:
-
OIDC Authentication
-
SASL Authentication
-
mTLS Authentication
-
No Authentication
-
AWS: Access Key ID and Secret Access Key
-
AWS: Assume IAM Role
-
AWS: IAM Roles Anywhere
OIDC Authentication
Authenticate using OpenID Connect.
authType
(string)
Required - Authentication type. This must be set to "oidc" for this authentication profile.
Example value: oidc
Allowed values:
- oidc
oidcClientID
(string)
Required - The OAuth2 client ID that has been provisioned in the identity provider.
Example value: my-client-id
oidcClientSecret
(string)
Required - The OAuth2 client secret that has been provisioned in the identity provider.
Example value: KeFg23!
oidcTokenEndpoint
(string)
Required - URL of the OAuth2 identity provider access token endpoint.
Example value: https://identity.example.com/v1/token
oidcExtensions
(string)
String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.
Example value: {"cluster":"kafka","poolid":"kafkapool"}
oidcScopes
(string)
Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Although not required, this field is recommended.
Default value: openid
Example value: openid,kafka-prod
SASL Authentication
Authenticate using SASL.
authType
(string)
Required - Authentication type. This must be set to "password" for this authentication profile.
Example value: password
Allowed values:
- password
saslMechanism
(string)
Required - The SASL authentication mechanism to use.
Default value: PLAINTEXT
Example value: SHA-512
Allowed values:
-
SHA-512
-
SHA-256
-
PLAINTEXT
saslPassword
(string)
Required - The SASL password.
Example value: mypassword
saslUsername
(string)
Required - The SASL username.
Example value: myuser
mTLS Authentication
Authenticate using mTLS.
authType
(string)
Required - Authentication type. This must be set to "mtls" for this authentication profile.
Example value: mtls
Allowed values:
- mtls
caCert
(string)
Required - Certificate authority certificate.
Example value:
-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----
clientCert
(string)
Required - Client certificate.
Example value:
-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----
clientKey
(string)
Required - Client key.
Example value:
-----BEGIN RSA PRIVATE KEY-----
<base64-encoded DER>
-----END RSA PRIVATE KEY-----
No Authentication
Do not perform authentication.
authType
(string)
Required - Authentication type. This must be set to "none" for this authentication profile.
Example value: none
Allowed values:
- none
AWS: Access Key ID and Secret Access Key
Authenticate using an Access Key ID and Secret Access Key included in the metadata
authType
(string)
Required - Authentication type. This must be set to "awsiam" for this authentication profile.
Example value: awsiam
Allowed values:
- awsiam
accessKey
AWS access key associated with an IAM account
Example value: AKIAIOSFODNN7EXAMPLE
awsAccessKey
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'accessKey' instead. If both fields are set, then 'accessKey' value will be used. AWS access key associated with an IAM account.
Example value: AKIAIOSFODNN7EXAMPLE
awsIamRoleArn
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, then 'assumeRoleArn' value will be used. IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
Example value: arn:aws:iam::123456789:role/mskRole
awsRegion
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'region' instead. The AWS Region where the AWS resource is deployed to.
Example value: us-east-1
awsSecretKey
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'secretKey' instead. If both fields are set, then 'secretKey' value will be used. The secret key associated with the access key.
Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
awsSessionToken
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead. If both fields are set, then 'sessionToken' value will be used. AWS session token to use. A session token is only required if you are using temporary security credentials.
Example value: TOKEN
awsStsSessionName
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, then 'sessionName' value will be used. Represents the session name for assuming a role.
Default value: DaprDefaultSession
Example value: MyAppSession
region
(string)
The AWS Region where the AWS resource is deployed to. This will be marked required in Dapr 1.17.
Example value: us-east-1
secretKey
The secret key associated with the access key
Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
sessionToken
(string)
AWS session token to use. A session token is only required if you are using temporary security credentials.
Example value: TOKEN
AWS: Assume IAM Role
Assume a specific IAM role. Note: This is only supported for Kafka and PostgreSQL.
authType
(string)
Required - Authentication type. This must be set to "awsiam" for this authentication profile.
Example value: awsiam
Allowed values:
- awsiam
region
(string)
Required - The AWS Region where the AWS resource is deployed to.
Example value: us-east-1
assumeRoleArn
(string)
IAM role that has access to AWS resource. This is another option to authenticate with MSK and RDS Aurora aside from the AWS Credentials. This will be marked required in Dapr 1.17.
Example value: arn:aws:iam::123456789:role/mskRole
awsIamRoleArn
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, then 'assumeRoleArn' value will be used. IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
Example value: arn:aws:iam::123456789:role/mskRole
awsStsSessionName
(string)
This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, then 'sessionName' value will be used. Represents the session name for assuming a role.
Default value: DaprDefaultSession
Example value: MyAppSession
sessionName
(string)
The session name for assuming a role.
Default value: DaprDefaultSession
Example value: MyAppSession
AWS: IAM Roles Anywhere
Use AWS IAM Roles Anywhere to establish trust between your AWS account and Diagrid.
assumeRoleArn
Required - ARN of the AWS IAM role to assume in the trusting AWS account.
Example value: arn:aws:iam:012345678910:role/exampleIAMRoleName
authType
(string)
Required - Authentication type. This must be set to "awsiam" for this authentication profile.
Example value: awsiam
Allowed values:
- awsiam
trustAnchorArn
Required - ARN of the AWS Trust Anchor in the AWS account granting trust to the Dapr Certificate Authority.
Example value: arn:aws:rolesanywhere:us-west-1:012345678910:trust-anchor/01234568-0123-0123-0123-012345678901
trustProfileArn
Required - ARN of the AWS IAM Profile in the trusting AWS account.
Example value: arn:aws:rolesanywhere:us-west-1:012345678910:profile/01234568-0123-0123-0123-012345678901
Metadata
brokers
(string)
Required - A comma-separated list of Kafka brokers.
Example value: mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093
channelBufferSize
(number)
The number of events to buffer in internal and external channels.
Default value: 256
Example value: 128
clientConnectionKeepAliveInterval
(duration)
The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely.
Default value: 0
Example value: 4m
clientConnectionTopicMetadataRefreshInterval
(duration)
The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration.
Default value: 9m
Example value: 4m
clientID
(string)
A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.
Default value: sarama
Example value: my-dapr-app
consumeRetryEnabled
(bool)
Disables consumer retry by setting this to "false".
Default value: false
Example value: true
consumeRetryInterval
(duration)
The interval between retries when attempting to consume topics.
Default value: 100ms
Example value: 200ms
consumerFetchDefault
(number)
The default number of message bytes to fetch from the broker in each request.
Default value: 1048576
Example value: 2097152
consumerFetchMin
(number)
The minimum number of message bytes to fetch in a request.
Default value: 1
Example value: 4
consumerGroup
(string)
A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic.
Example value: group1
disableTls
(bool)
Disable TLS for transport security. This is potentially insecure and not recommended for use in production.
Default value: false
Example value: true
escapeHeaders
(bool)
Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers.
Default value: false
Example value: true
heartbeatInterval
(duration)
The interval between heartbeats to the consumer coordinator.
Default value: 3s
Example value: 5s
initialOffset
(string)
The initial offset to use if no offset was previously committed.
Default value: newest
Example value: oldest
Allowed values:
-
newest
-
oldest
maxMessageBytes
(number)
The maximum size in bytes allowed for a single Kafka message.
Default value: 1024
Example value: 2048
schemaCachingEnabled
(bool)
Enables caching for schemas.
Default value: true
Example value: true
schemaLatestVersionCacheTTL
(duration)
The TTL for schema caching when publishing a message with latest schema available.
Default value: 5m
Example value: 5m
schemaRegistryAPIKey
(string)
The Schema Registry credentials API Key.
Example value: XYAXXAZ
schemaRegistryAPISecret
(string)
The Schema Registry credentials API Secret.
Example value: ABCDEFGMEADFF
schemaRegistryURL
(string)
The Schema Registry URL.
Example value: http://mycompany.com:8081
sessionTimeout
(duration)
The maximum time between heartbeats before the consumer is considered inactive and will timeout.
Default value: 10s
Example value: 20s
skipVerify
(bool)
Skip TLS verification. This is potentially insecure and not recommended for use in production.
Default value: false
Example value: true
version
(string)
Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka.
Default value: 2.0.0.0
Example value: 0.10.2.0