Reading a Kafka Stream

Lasair publishes Kafka streams: machine-readable packets of data that can trigger action at your site. See the FAQ article on how to create a stream using the Lasair web environment; this page describes how to read one on your side. We have used two installs to work with Kafka:

Confluent Python

Installation is described at https://pypi.org/project/confluent-kafka/ and uses the usual Python installation tools (for example pip). The following stub program is built with the Confluent Python client. It should work with server:port set to lasair.roe.ac.uk:9092.

import sys
from confluent_kafka import Consumer

if len(sys.argv) < 2:
    print('Usage: Consumer.py server:port <topic> ')
    sys.exit()
kafka_server = sys.argv[1]

# the server remembers the read position (offset) per group id
group_id = 'LASAIR4'
conf = {
    'bootstrap.servers': kafka_server,
    'group.id': group_id,
    # start from the oldest available message when the group has no stored offset
    'auto.offset.reset': 'earliest'
}
streamReader = Consumer(conf)

if len(sys.argv) < 3:
    # the topics that this server has
    t = list(streamReader.list_topics().topics.keys())
    print('Topics are ', t)
else:
    # content of given topic
    topic = sys.argv[2]
    streamReader.subscribe([topic])
    while True:
        msg = streamReader.poll(timeout=20)
        # stop when nothing arrives within 20 seconds
        if msg is None:
            break
        # report error events instead of printing them as data
        if msg.error():
            print('Consumer error:', msg.error())
            continue
        print(msg.value())
streamReader.close()
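
The stub above prints each message as raw bytes. As a minimal sketch, assuming the stream delivers UTF-8 encoded JSON records (check this against the content of your own stream), the payload could be turned into a Python dictionary before acting on it. The helper name decode_alert and the sample fields are hypothetical, chosen only for illustration.

```python
import json

def decode_alert(raw):
    """Decode one message payload into a dict, or return None if that fails.

    Assumption: payloads are UTF-8 encoded JSON objects.
    """
    if raw is None:
        return None
    try:
        record = json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # accept only JSON objects, not bare lists or numbers
    return record if isinstance(record, dict) else None

# hypothetical payload, shaped like the bytes returned by msg.value()
sample = b'{"objectId": "example", "mag": 18.2}'
print(decode_alert(sample))
```

In the loop of the stub program, print(msg.value()) would then become something like record = decode_alert(msg.value()), followed by whatever action your site takes on the record.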

Apache Kafka

This toolbox is available from https://kafka.apache.org/downloads and includes a full server environment as well as the simple client tools we describe below. Once it is installed, there will be a kafka directory in your home space. Go to its bin subdirectory and run ls for the list of tools.

  • The following fetches a list of topics:
    ./kafka-topics.sh --bootstrap-server lasair.roe.ac.uk:9092 --list

  • The following fetches all the events in a topic, from the beginning:
    ./kafka-console-consumer.sh --bootstrap-server lasair.roe.ac.uk:9092 --from-beginning --topic 2TNScrossmatch
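
A Kafka consumer group remembers how far it has read in a topic, so rerunning the Python stub with the same group.id continues from where it stopped rather than replaying the topic. As a rough sketch of how the --from-beginning behaviour might be approximated with the Confluent client, a never-used group id can be combined with auto.offset.reset set to earliest. The function name fresh_consumer_conf and the LASAIR prefix are hypothetical; the configuration keys are standard client settings.

```python
import uuid

def fresh_consumer_conf(kafka_server, prefix='LASAIR'):
    """Build a consumer configuration with a group id that has not been used before."""
    return {
        'bootstrap.servers': kafka_server,
        # a new group has no stored offsets on the server...
        'group.id': '%s-%s' % (prefix, uuid.uuid4().hex),
        # ...so this setting makes it start at the oldest available message
        'auto.offset.reset': 'earliest',
    }

conf = fresh_consumer_conf('lasair.roe.ac.uk:9092')
print(conf['group.id'])
```

The resulting dictionary can be passed to Consumer(conf) in place of the conf used in the stub program. Keeping a fixed group id is still the better choice when you want each event delivered only once across runs.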