fromquestdb.ingressimportSenderfromconfluent_kafka.adminimportAdminClient,NewTopicimportjsonfromabcimportABC,abstractmethod# TODO can we dynamically import packages? it may be annoying to install all dependencies if you only need 1
@abstractmethod
def insert(self, table_name: str, column_value_dict: dict):
    """
    Store one record in the given table.

    Abstract hook: concrete backends supply the actual write.

    :param table_name: Name of the table the record goes into
    :param column_value_dict: dict mapping each column name to the value
        to insert for it. Example: {"col1": "Test"}
    :return: None
    """
@abstractmethod
def commit(self):
    """
    Commit pending changes to the database.

    Abstract hook: concrete backends supply the actual commit.

    :return: None
    """
@abstractmethod
def close_connection(self):
    """
    Close the connection to the database.

    Abstract hook: concrete backends supply the actual teardown.

    :return: None
    """
class ManPyKafkaConnection(ManPyDatabase):
    """
    ManPyDatabase backend that publishes records to Kafka topics.
    """

    def __init__(self, producer, topics, bootstrap_server_address, *, num_partitions=1, replication_factor=1):
        """
        Store the producer and request creation of the given topics.

        :param producer: Kafka producer object; kept as ``self.producer``
        :param topics: Iterable of topic names to create
        :param bootstrap_server_address: Value used for the admin client's
            ``bootstrap.servers`` setting
        :param num_partitions: Partition count for every created topic (default 1)
        :param replication_factor: Replication factor for every created topic (default 1)
        """
        self.producer = producer
        self.__create_topics(topics, bootstrap_server_address, num_partitions, replication_factor)

    def __create_topics(self, names, bootstrap_server_address, num_partitions=1, replication_factor=1):
        """
        Ask the Kafka cluster to create one topic per entry in ``names``.

        :param names: Iterable of topic names
        :param bootstrap_server_address: Value used for the admin client's
            ``bootstrap.servers`` setting
        :param num_partitions: Partition count for every created topic (default 1)
        :param replication_factor: Replication factor for every created topic (default 1)
        :return: None
        """
        admin_client = AdminClient({'bootstrap.servers': bootstrap_server_address})
        topics_to_create = [NewTopic(n, num_partitions, replication_factor) for n in names]
        # NOTE(review): the return value of create_topics is discarded, so a
        # failed creation (e.g. topic already exists) is not reported here.
        admin_client.create_topics(topics_to_create)
def insert(self, table_name: str, column_value_dict: dict):
    """
    Send one record to Kafka as a UTF-8 encoded JSON object.

    :param table_name: Topic to write to
    :param column_value_dict: dict of column-value pairs that is serialised
        to JSON and used as the message value. Example: {"col1": "Test"}
    :return: None
    """
    # todo idempotence
    payload = json.dumps(column_value_dict).encode("utf-8")
    # Non-blocking poll before producing; per the confluent-kafka docs this
    # is what triggers delivery callbacks for previously sent messages.
    self.producer.poll(0)
    self.producer.produce(table_name, payload, callback=self.__on_delivery)
def __on_delivery(self, err, msg):
    """
    Delivery callback passed to the producer; reports failed sends.

    :param err: Error reported for the message, or None when there is none
    :param msg: The message the report refers to (unused)
    :return: None
    """
    if err is None:
        return
    print(f">>>>> Error while sending data to Kafka!: {err}")