ADLStream.data.stream.KafkaStream
Kafka Stream.
Stream that consumes messages from a Kafka server.
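For more references, see Apache Kafka (https://kafka.apache.org/) and the kafka-python library (https://github.com/dpkp/kafka-python).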
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic | str | Topic to subscribe to. | required |
group_id | str | The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. | None |
bootstrap_servers | str | 'host[:port]' string (or list of 'host[:port]' strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. | 'localhost:9092' |
auto_offset_reset | str | A policy for resetting offsets on OffsetOutOfRange errors: "earliest" will move to the oldest available message, "latest" will move to the most recent. Any other value will raise an exception. | 'latest' |
value_deserializer | callable | Any callable that takes a raw message value and returns a deserialized value. The default decodes the value as ASCII and parses it as JSON. | lambda m: json.loads(m.decode("ascii")) |
timeout | int | Number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). To block forever, use float('inf'). | 30000 |
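As a minimal sketch of what a `value_deserializer` callable looks like, the block below spells out a function equivalent to the default (ASCII decode, then JSON parse) next to a hypothetical alternative for comma-separated numeric payloads. The function names and the CSV payload format are assumptions made for illustration; they are not part of ADLStream.

```python
import json


def json_ascii_deserializer(raw: bytes):
    """Equivalent to the default: decode the bytes as ASCII, then parse JSON."""
    return json.loads(raw.decode("ascii"))


def csv_float_deserializer(raw: bytes):
    """Hypothetical alternative: turn b"1.0,2.5,3" into a list of floats."""
    return [float(field) for field in raw.decode("utf-8").split(",")]


json_message = json_ascii_deserializer(b"[1, 2, 3]")
csv_message = csv_float_deserializer(b"1.0,2.5,3")
print(json_message)
print(csv_message)
```

Either function could be passed as `value_deserializer`. Note that the ASCII-based default raises `UnicodeDecodeError` on payloads containing non-ASCII bytes, so a UTF-8 variant may be preferable for such data.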
Source code in ADLStream/data/stream/kafka_stream.py
import json

from kafka import KafkaConsumer

from ADLStream.data.stream import BaseStream


class KafkaStream(BaseStream):
    """Kafka Stream.

    Stream that consumes messages from a *Kafka* server.

    For more references check:

    * [Apache Kafka](https://kafka.apache.org/)
    * [kafka-python library](https://github.com/dpkp/kafka-python)

    Arguments:
        topic (str): Topic to subscribe to.
        group_id (str, optional): The name of the consumer group to join for dynamic
            partition assignment (if enabled), and to use for fetching and committing
            offsets. If None, auto-partition assignment (via group coordinator) and
            offset commits are disabled.
            Defaults to None.
        bootstrap_servers (str, optional): 'host[:port]' string (or list of
            'host[:port]' strings) that the consumer should contact to bootstrap initial
            cluster metadata. This does not have to be the full node list. It just needs
            to have at least one broker that will respond to a Metadata API Request.
            Default port is 9092.
            Defaults to `"localhost:9092"`.
        auto_offset_reset (str, optional): A policy for resetting offsets on
            OffsetOutOfRange errors: "earliest" will move to the oldest available
            message, "latest" will move to the most recent. Any other value will raise
            an exception.
            Defaults to "latest".
        value_deserializer (callable, optional): Any callable that takes a raw message
            value and returns a deserialized value.
            Defaults to decoding the value as ASCII and parsing it as JSON.
        timeout (int, optional): Number of milliseconds to block during message
            iteration before raising StopIteration (i.e., ending the iterator).
            To block forever, use `float('inf')`.
            Defaults to 30000.
    """

    def __init__(
        self,
        topic,
        group_id=None,
        bootstrap_servers="localhost:9092",
        auto_offset_reset="latest",
        value_deserializer=lambda m: json.loads(m.decode("ascii")),
        timeout=30000,
        **kwargs
    ):
        # Forward the timeout and any extra options to the base stream.
        super().__init__(timeout=timeout, **kwargs)
        self.topic = topic
        self.group_id = group_id
        self.bootstrap_servers = bootstrap_servers
        self.auto_offset_reset = auto_offset_reset
        self.value_deserializer = value_deserializer
        # The consumer is created lazily in start().
        self.kafka_consumer = None

    def start(self):
        super().start()
        # Connect to the brokers and subscribe to the topic.
        self.kafka_consumer = KafkaConsumer(
            self.topic,
            group_id=self.group_id,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset=self.auto_offset_reset,
            consumer_timeout_ms=self.timeout,
            value_deserializer=self.value_deserializer,
        )

    def get_message(self):
        # Return the deserialized value of the next record; StopIteration
        # propagates once the consumer timeout elapses.
        message = next(self.kafka_consumer).value
        return message
get_message(self)
The function that contains the logic to generate a new message. It must return the message as an array. Every custom stream must override this function. In KafkaStream, it returns the deserialized value of the next record read by the consumer.
Exceptions:
Type | Description |
---|---|
NotImplementedError | Raised when the abstract function has not been overridden. |
Returns:
Type | Description |
---|---|
list | message |
Source code in ADLStream/data/stream/kafka_stream.py
def get_message(self):
    message = next(self.kafka_consumer).value
    return message
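Because `get_message` calls `next()` on the consumer, the `StopIteration` raised when the timeout elapses propagates to the caller. The sketch below illustrates one way a caller might drain messages until that happens. `FakeConsumer` and `drain` are hypothetical stand-ins written for this example so that it runs without a Kafka broker; they are not part of ADLStream or kafka-python.

```python
from types import SimpleNamespace


class FakeConsumer:
    """Stand-in for a consumer iterator: yields records that expose `.value`."""

    def __init__(self, values):
        self._records = iter([SimpleNamespace(value=v) for v in values])

    def __iter__(self):
        return self

    def __next__(self):
        # Raises StopIteration when exhausted, as a timed-out consumer would.
        return next(self._records)


def get_message(consumer):
    """Same logic as KafkaStream.get_message, applied to any iterator."""
    return next(consumer).value


def drain(consumer):
    """Collect messages until the consumer signals the end with StopIteration."""
    messages = []
    while True:
        try:
            messages.append(get_message(consumer))
        except StopIteration:
            break
    return messages


received = drain(FakeConsumer([[0.1, 0.2], [0.3, 0.4]]))
print(received)
```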
start(self)
Function to be called before requesting any message. It creates the underlying KafkaConsumer.
Source code in ADLStream/data/stream/kafka_stream.py
def start(self):
    super().start()
    self.kafka_consumer = KafkaConsumer(
        self.topic,
        group_id=self.group_id,
        bootstrap_servers=self.bootstrap_servers,
        auto_offset_reset=self.auto_offset_reset,
        consumer_timeout_ms=self.timeout,
        value_deserializer=self.value_deserializer,
    )
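The reason `start` has to run first is that the consumer attribute stays `None` until then, so `next()` has nothing to iterate over. The following sketch reproduces that lifecycle with a hypothetical `LazyStream` class that swaps the Kafka consumer for a plain Python iterator; it mirrors the pattern only and is not the ADLStream implementation.

```python
class LazyStream:
    """Hypothetical stand-in mirroring the start()/get_message() pattern."""

    def __init__(self, values):
        self._values = values
        self.consumer = None  # nothing is connected until start() is called

    def start(self):
        # In KafkaStream this is where the KafkaConsumer is created.
        self.consumer = iter(self._values)

    def get_message(self):
        return next(self.consumer)


stream = LazyStream([[1.0], [2.0]])

# Asking for a message before start() fails: next(None) raises TypeError.
try:
    stream.get_message()
    failed_before_start = False
except TypeError:
    failed_before_start = True

stream.start()
first = stream.get_message()
print(failed_before_start, first)
```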