Lasair's Kafka streams deliver machine-readable packets of data that can trigger action at your site. See the FAQ article on how to create a stream using the Lasair web environment; this page is about how to read the stream on your side. We have used two installs to work with Kafka:

Confluent Python

...

import sys
from confluent_kafka import Consumer

if len(sys.argv) < 2:
    print('Usage: Consumer.py server:port <topic> ')
    sys.exit()
kafka_server = sys.argv[1]

group_id = 'LASAIR4'
conf = {
    'bootstrap.servers': kafka_server,
    'group.id': group_id,
    # 'earliest' is the current spelling of the older 'smallest' setting
    'auto.offset.reset': 'earliest',
}
streamReader = Consumer(conf)

if len(sys.argv) < 3:
    # the topics that this server has
    t = list(streamReader.list_topics().topics.keys())
    print('Topics are ', t)
else:
    # content of given topic
    topic = sys.argv[2]
    streamReader.subscribe([topic])
    while True:
        # stop once nothing has arrived for 20 seconds
        msg = streamReader.poll(timeout=20)
        if msg is None:
            break
        print(msg.value())
streamReader.close()

Apache Kafka

This toolbox is available from https://kafka.apache.org/downloads. It includes a full server environment as well as the simple client tools we describe below. Once it is installed, there will be a kafka directory in your home space. Go to its bin subdirectory and run ls to see the list of tools.

...

The following fetches a list of topics:
./kafka-topics.sh --bootstrap-server http://lasair.roe.ac.uk:9092 --list

...

There is a blog post about why Kafka is a good way to deal with streaming data.

  • We recommend Confluent Kafka; install the Python client with pip install confluent_kafka.

  • You will be connecting to kafka.lsst.ac.uk on port 9092.

  • For coding details, please see the accompanying notebook.
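The points above can be put together in a short sketch. This is a minimal illustration, not the code from the accompanying notebook: the helper names build_consumer_config and read_alerts, the max_alerts limit, and the 20-second timeout are assumptions made for this example. It also assumes that the alerts should be read from the oldest cached one when the GroupID is new.

```python
def build_consumer_config(group_id, server="kafka.lsst.ac.uk:9092"):
    """Return a configuration dictionary for confluent_kafka.Consumer."""
    return {
        "bootstrap.servers": server,
        "group.id": group_id,
        # Assumption: start from the oldest cached alert for a new GroupID.
        "auto.offset.reset": "earliest",
    }


def read_alerts(topic, group_id, max_alerts=10, timeout=20):
    """Yield up to max_alerts raw message payloads from one topic."""
    # Imported here so the config helper works without the package installed.
    from confluent_kafka import Consumer

    consumer = Consumer(build_consumer_config(group_id))
    consumer.subscribe([topic])
    try:
        for _ in range(max_alerts):
            msg = consumer.poll(timeout=timeout)
            if msg is None:       # nothing arrived within the timeout
                break
            if msg.error():       # skip error events reported by the client
                continue
            yield msg.value()
    finally:
        consumer.close()


# Example use (needs network access and the confluent_kafka package):
# for alert in read_alerts("lasair_2SN-likecandidates", "Susan3456"):
#     print(alert)
```

Keeping the configuration in its own function makes it easy to check the server and GroupID before any connection is attempted.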

You will need to understand two concepts: Topic and GroupID.

  • The Topic is a string that identifies which stream of alerts you want. It derives from the name of a Lasair streaming query. For example, the query defined here is named “SN-like candidates”, and its output is collected here. Its Kafka topic is “lasair_2SN-likecandidates”.

  • The GroupID tells Kafka where to start delivery to you. It is just a string that you can make up, for example “Susan3456”. The Kafka server remembers which GroupIDs it has seen before, and the last alert it delivered to each. When you start your code again with the same GroupID, you only get alerts that have arrived since the last time you used that GroupID. If you use a new GroupID, you get the alerts from the start of the Kafka cache, which holds about 7 days of alerts.
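The GroupID behaviour can be illustrated with a toy in-memory model. This is only a sketch of the bookkeeping described above, not how Kafka is implemented: the class ToyAlertCache and its methods are hypothetical, and the 7-day expiry of the cache is left out.

```python
class ToyAlertCache:
    """Toy stand-in for a topic that remembers what each GroupID has seen."""

    def __init__(self):
        self.alerts = []       # cached alerts, oldest first
        self.delivered = {}    # GroupID -> number of alerts already delivered

    def publish(self, alert):
        self.alerts.append(alert)

    def fetch(self, group_id):
        # An unknown GroupID starts at the beginning of the cache.
        start = self.delivered.get(group_id, 0)
        new_alerts = self.alerts[start:]
        self.delivered[group_id] = len(self.alerts)
        return new_alerts


cache = ToyAlertCache()
cache.publish("alert-1")
cache.publish("alert-2")
first = cache.fetch("Susan3456")    # new GroupID: everything in the cache
cache.publish("alert-3")
second = cache.fetch("Susan3456")   # same GroupID: only what arrived since
third = cache.fetch("Susan7890")    # another new GroupID: whole cache again
print(first, second, third)
```

Reusing “Susan3456” returns only the third alert on the second call, while the new GroupID “Susan7890” receives all three.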