Skip to content
Data > Feed Handlers

Kafka

AMI currently supports JSON, Avro, and Protobuf message serialization.

Overview

  1. Configure AMI properties to the Kafka server.

  2. Restart AMI and start streaming data using Kafka.

Properties

Required

# Must be exact
ami.relay.fh.active=ssocket,kafka
ami.relay.fh.kafka.class=com.f1.ami.relay.fh.kafka.AmiKafkaFH
ami.relay.fh.kafka.props.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# insert the hostname of your kafka server here
ami.relay.fh.kafka.props.bootstrap.servers=<HOSTNAME>:9092

# insert the comma delimited topic names being used for kafka e.g. topic-1,topic-2,topic-3
ami.relay.fh.kafka.props.topics=<TOPIC_NAME(S)>

# insert a consumer group id string (in case other processes are consuming from the same topics additionally, use that group id).E.g. test-group
ami.relay.fh.kafka.props.group.id=<GROUP ID>

# Select one of the deserializers below depending on your message format
ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.KafkaJsonDeserializer
ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
ami.relay.fh.kafka.props.value.deserializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer

# Alternatively, use a custom deserializer class and helper factory
ami.relay.fh.kafka.props.value.deserializer=<fully_qualified_class_name>
ami.relay.fh.kafka.props.helper.factory.class=<fully_qualified_class_name>

Optional

# auto commit
ami.relay.fh.kafka.props.enable.auto.commit=true

# insert the hostname of your kafka server running schema registry here
ami.relay.fh.kafka.props.schema.registry=http://<HOSTNAME>:8081

# kafka id to identify your consumer
ami.relay.fh.kafka.props.client.id=<CLIENTID>

# What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Options include earliest, latest, none. Where earliest consumes from beginning of the topic partition while latest consumes from end of topic partition which is default.
ami.relay.fh.kafka.props.auto.offset.reset=<OPTION>

# SASL mechanism used for client connections - defaults to GSSAPI
ami.relay.fh.kafka.props.sasl.mechanism=<SASL_MECHANISM>

# Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL - defaults to PLAINTEXT
ami.relay.fh.kafka.props.security.protocol=<PROTOCOL>

# JAAS login context parameters for SASL connections in the format used by JAAS configuration files - defaults to null
ami.relay.fh.kafka.props.sasl.jaas.config=<SASL_JAAS_CONFIG>

# Fully-qualified class name for custom sasl login callback handler
ami.relay.fh.kafka.props.sasl.login.callback.handler.class=<CLASSNAME>

# AMI uses kafka record keys as AMI message keys by default (same key Kafka records get upserted). Set to false to ignore Kafka record keys and send to AMI without key - default true
ami.relay.fh.kafka.props.use.record.key=<true/false>

# Enable debug logging for consumer polls - defaults to false)
ami.relay.fh.kafka.props.enable.debug.log=<true/false>

# Custom properties can also be passed to the Kafka Consumer Client that you want to connect to using the following format: (Refer to Kafka Consumer Configuration documentation on available properties)
ami.relay.fh.kafka.props.<CUSTOM_PROPERTY>=<CUSTOM_VALUE>

# Max number of records per poll - defaults to 500
ami.relay.fh.kafka.props.max.poll.records=5000

Using in AMI

When AMI starts up, it will automatically connect to the kafka server.

To see the data being streamed, a Realtime Table/Visualization needs to be created:

  1. Create a new window: Windows -> New Window

  2. Create a new Realtime Table: Click on the green button on the panel just created and select Create Realtime Table/Visualization.

  3. Create a Table: Under AMIDB Tables, select the desired table(s) (with the same topic name as defined in the properties file. Each topic has its separate table) and click Next >>.

  4. Select the desired columns you want to display in the table and select Finish

  5. The created Realtime Table:

  1. Create a new window: Windows -> New Window

  2. Create a new Realtime Table: Click on the green button on the panel just created and select Create Realtime Table/Visualization.

  3. Create a Table: Under AMIDB Tables, select the desired table(s) (with the same topic name as defined in the properties file. Each topic has its separate table) and click Next >>.

  4. Select the desired columns you want to display in the table and select Finish

  5. The created Realtime Table:

  1. Create a new window: Windows -> New Window

  2. Create a new Realtime Table: Click on the green button on the panel just created and select Create Realtime Table/Visualization.

  3. Create a Table: Under AMIDB Tables, select the desired table(s) (with the same topic name as defined in the properties file. Each topic has its separate table) and click Next >>.

  4. Select the desired columns you want to display in the table and select Finish

  5. The created Realtime Table:

The Data Modeler can also be used to create data models for the realtime feeds for more customizability.