
Kafka Comprehensive Tutorial – Part 3

1. Objective

Today, in this Kafka article, we will look at Kafka Cluster Setup. This Kafka Cluster tutorial provides simple steps to set up a Kafka cluster. In simple words, for high availability of the Kafka service, we need to set up Kafka in cluster mode. So, in this Kafka Cluster document, we will learn Kafka multi-node cluster setup and Kafka multi-broker cluster setup. Also, we will see Kafka ZooKeeper cluster setup.
So, let’s start Kafka Cluster Setup.

2. Kafka Cluster Setup

In order to gain better reliability and high availability of the Kafka service, we need to set up Kafka in cluster mode. First:

  • From Apache’s site, download Kafka and extract the archive.
  • Further, make two more copies of the extracted folder, and add the suffixes _1, _2 and _3 to the three folder names. Hence, if the extracted folder was named kafka_2.11-1.1.0, you will have the folders kafka_2.11-1.1.0_1, kafka_2.11-1.1.0_2 and kafka_2.11-1.1.0_3.
  • Go to the kafka_2.11-1.1.0_1 folder.

3. Steps to Setup Kafka Cluster

Now, follow these steps to set up the Kafka cluster:

  1. Make a folder named “logs”. All the Kafka log files will be stored in this folder.
  2. Then, go to the config directory and open the server.properties file. This file contains the Kafka broker configurations.
  3. Further, set broker.id to 1. This is the id of the broker in a Kafka cluster, so it must be unique for each broker.
  4. Then, uncomment the listeners configuration and set it to PLAINTEXT://localhost:9091. This means the Kafka broker will listen on port 9091 for connection requests.
  5. Moreover, set the log.dirs configuration to the path of the logs folder we created in step 1.
  6. Also, set the Apache ZooKeeper address in the zookeeper.connect configuration. If ZooKeeper is running as a cluster, make sure to give the addresses as a comma-separated list, i.e. localhost:2181,localhost:2182.

Basically, these are the general configurations that we need to set up for a development environment; a sample configuration file is sketched below.
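For reference, a minimal server.properties for the first broker might look like the following (the log.dirs path is an illustrative assumption; point it at the logs folder you created in step 1):

broker.id=1
listeners=PLAINTEXT://localhost:9091
log.dirs=/path/to/kafka_2.11-1.1.0_1/logs
zookeeper.connect=localhost:2181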
In this way, our first Kafka broker configuration is ready. Now, follow the same steps with the following changes, for the other two folders or brokers.

  • Now, in step 3, change broker.id to 2 and 3, respectively.
  • And, in step 4, change the ports used to 9092 and 9093, respectively. Any available port number can be used.

Therefore, our configuration is ready for all brokers. Now, go to the home directory of each Kafka folder and run the command ./bin/kafka-server-start.sh config/server.properties.
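For example, assuming the three folders created above, each broker would be started from its own folder (one command per terminal):

cd kafka_2.11-1.1.0_1 && ./bin/kafka-server-start.sh config/server.properties
cd kafka_2.11-1.1.0_2 && ./bin/kafka-server-start.sh config/server.properties
cd kafka_2.11-1.1.0_3 && ./bin/kafka-server-start.sh config/server.properties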

  • Execute the following command (all as one line):

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 50 --topic demo

Here, 50 partitions are created, each with a replication factor of three. With a replication factor of three, each partition has one leader and two followers. Also, when a message or record is sent to the leader, it is copied to the followers.

  • Execute this command:

./bin/kafka-topics.sh --describe --topic demo --zookeeper localhost:2181

It tells us which broker is the leader and which brokers are the followers for each partition.

  • Output:

Topic: demo    PartitionCount: 50    ReplicationFactor: 3    Configs:
    Topic: demo    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: demo    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: demo    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: demo    Partition: 3    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
    Topic: demo    Partition: 4    Leader: 3    Replicas: 3,2,1    Isr: 3,2,1
    Topic: demo    Partition: 5    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
    Topic: demo    Partition: 6    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    …
Now, we can see that Broker 2 is the leader for partition 0 and Broker 3 is the leader for partition 1. Here, ISR refers to in-sync replicas.

Apache Kafka Producer

Today, we will discuss the Kafka Producer with an example. Moreover, we will see the KafkaProducer API and the Producer API. Also, we will learn the configuration settings for a Kafka producer. At last, we will build a simple producer application in this Kafka Producer tutorial. In order to publish messages to an Apache Kafka topic, we use a Kafka producer.
So, let’s explore Apache Kafka Producer in detail.

Kafka Producer for Beginners

1. What is Kafka Producer?

Basically, an application that is the source of a data stream is what we call a producer. In order to generate tokens or messages and publish them to one or more topics in the Kafka cluster, we use the Apache Kafka Producer. Also, the Producer API from Kafka helps to pack the message or token and deliver it to the Kafka server.

Kafka Producer – Apache Kafka Producer Working

There are several APIs available in the Kafka producer client.

2. KafkaProducer API

The KafkaProducer API permits an application to publish a stream of records to one or more Kafka topics. Its central part is the KafkaProducer class, which offers an option to connect to a Kafka broker in its constructor, along with the following methods:

  • In order to send messages asynchronously to a topic, the KafkaProducer class provides the send method. So, the signature of send() is:
  1. producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
  • ProducerRecord − The record to be sent. Generally, the producer manages a buffer of records waiting to be sent.
  • Callback − A user-supplied callback to execute when the record has been acknowledged by the server.
  • Moreover, to ensure all previously sent messages have actually completed, the KafkaProducer class provides a flush method. So, the syntax of the flush method is −
  1. public void flush()
  • Also, to get the partition metadata for a given topic, the KafkaProducer class provides the partitionsFor method. Moreover, we can use it for custom partitioning. So, the signature of this method is:
  1. public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
  • It also provides a metrics method, whose signature is:
  1. public java.util.Map<MetricName,? extends Metric> metrics()

This method returns the map of internal metrics maintained by the producer.

  • public void close() − It also offers a close method that blocks until all previously sent requests are completed.
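To make the asynchronous send concrete, below is a minimal sketch that sends a single record with a callback and then flushes. The class name, topic name and broker address are illustrative assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // send() returns immediately; the callback fires once the broker acknowledges the record
        producer.send(new ProducerRecord<>("demo", "key1", "value1"), (metadata, exception) -> {
            if (exception != null)
                exception.printStackTrace(); // the send failed
            else
                System.out.println("partition=" + metadata.partition()
                        + ", offset=" + metadata.offset());
        });
        producer.flush(); // block until all previously sent records have completed
        producer.close(); // also blocks until outstanding requests finish
    }
}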

3. Producer API

The Producer class is the central part of the Kafka Producer API. It offers an option to connect to the Kafka broker in its constructor, with the following methods.

a. Kafka Producer Class

Basically, to send messages to either a single topic or multiple topics, the Producer class offers a send method. The following are the signatures we can use for it.

  1. public void send(KeyedMessage<K,V> message)

 – sends the data to a single topic, partitioned by key, using either the sync or async producer.

  1. public void send(List<KeyedMessage<K,V>> messages)

– sends data to multiple topics.

  1. Properties prop = new Properties();
  2. prop.put("producer.type", "async");
  3. ProducerConfig config = new ProducerConfig(prop);

However, there are two types of producers: Sync and Async.
The same API configuration applies to the Sync producer as well. The difference between the two is:

A Sync producer sends messages directly, while the Async producer works in the background; when we want higher throughput, we prefer the Async producer. However, in previous releases like 0.8, the Async producer did not have a callback for send() to register error handlers; this is only available from release 0.9.

b. Public Void Close()

In order to close the producer pool connections to all Kafka brokers, the Producer class offers a public void close() method.

4. Configuration Settings For Kafka Producer API

Here, we are listing the main configuration settings of the Kafka Producer API:
a. client.id
It identifies the producer application.
b. producer.type
Either sync or async.
c. acks
Basically, it controls the criteria under which producer requests are considered complete.
d. retries
If a producer request fails, the producer automatically retries up to the specified number of times.
e. bootstrap.servers
The bootstrap list of brokers.
f. linger.ms
Basically, we can set linger.ms to a value greater than zero if we want to reduce the number of requests by batching records together.
g. key.serializer
The serializer class for the record key.
h. value.serializer
The serializer class for the record value.
i. batch.size
Simply, the batch buffer size, in bytes.
j. buffer.memory
“buffer.memory” controls the total amount of memory available to the producer for buffering.

5. ProducerRecord API

A ProducerRecord is a key/value pair that is sent to the Kafka cluster. The following ProducerRecord class constructor creates a record with a partition, key and value pair:
public ProducerRecord(String topic, Integer partition, K key, V value)

  1. Topic − user-defined topic name that will be appended to the record.
  2. Partition − the partition number.
  3. Key − the key that will be included in the record.
  4. Value − the record contents.

public ProducerRecord(String topic, K key, V value)
This ProducerRecord class constructor creates a record with the key and value pairs but without a partition.

  1. Topic − the topic to assign the record to.
  2. Key − the key for the record.
  3. Value − the record contents.

public ProducerRecord(String topic, V value)
Moreover, this ProducerRecord class constructor creates a record without a partition and key.

  1. Topic − the topic to assign the record to.
  2. Value − the record contents.

Now, here we are listing the ProducerRecord class methods −
1. public String topic()
The topic that will be appended to the record.
2. public K key()
The key that will be included in the record. If there is no such key, null is returned.
3. public V value()
The record contents.
4. public Integer partition()
The partition number for the record.
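As a brief sketch, the three constructors can be used as follows; the topic name, keys and values here are assumptions:

import org.apache.kafka.clients.producer.ProducerRecord;

// partition 0 is given explicitly
ProducerRecord<String, String> r1 = new ProducerRecord<>("demo", 0, "key1", "value1");
// no partition: the partition is derived by hashing the key
ProducerRecord<String, String> r2 = new ProducerRecord<>("demo", "key1", "value1");
// no partition and no key: partitions are assigned in a round-robin fashion
ProducerRecord<String, String> r3 = new ProducerRecord<>("demo", "value1");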

6. Simple Kafka Producer Application

Make sure to first start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. After that, create a Java class named SimpleProducer.java and proceed with the following code:

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {

    public static void main(String[] args) throws Exception{

        // Check arguments length value
        if(args.length == 0){
            System.out.println("Enter topic name");
            return;
        }

        //Assign topicName to string variable
        String topicName = args[0].toString();

        // create instance for properties to access producer configs
        Properties props = new Properties();

        //Assign localhost id
        props.put("bootstrap.servers", "localhost:9092");

        //Set acknowledgements for producer requests.
        props.put("acks", "all");

        //If the request fails, the producer can automatically retry
        props.put("retries", 0);

        //Specify buffer size in config
        props.put("batch.size", 16384);

        //Set linger.ms to 1 to batch records together and reduce the number of requests
        props.put("linger.ms", 1);

        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        props.put("buffer.memory", 33554432);

        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer
                <String, String>(props);

        for(int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>(topicName,
                    Integer.toString(i), Integer.toString(i)));
        System.out.println("Message sent successfully");
        producer.close();
    }
}

a. Compilation

By using the following command we can compile the application.

  1. javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" *.java

b. Execution

Further, using the following command, we can execute the application.

  1. java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. SimpleProducer <topic-name>

c. Output

Message sent successfully
To check the above output, open a new terminal and type the consumer CLI command to receive the messages:

~/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic <topic-name> \
--from-beginning

0
1
2
3
4
5
6
7
8
9

Apache Kafka Consumer | Kafka Consumer Group

Firstly, we will see what a Kafka consumer is, with an example. Afterward, we will learn about Kafka Consumer Groups. Moreover, we will see the ConsumerRecord API and the configuration settings for a Kafka consumer.
In the previous section, we created a Kafka producer to send messages to the Kafka cluster. Now, we will create a Kafka consumer to consume messages from the Kafka cluster.

2. What is Kafka Consumer?

An application that reads data from Kafka topics is what we call a consumer. Basically, a Kafka consumer subscribes to one or more topics in the Kafka cluster and then feeds on the tokens or messages from those topics.

A heartbeat is set up at the consumer to let ZooKeeper or the broker coordinator know whether the consumer is still connected to the cluster.

So, if the heartbeat is absent, the Kafka consumer is no longer connected to the cluster, and the broker coordinator has to re-balance the load. Moreover, the heartbeat is an overhead to the cluster. Also, keeping the data throughput and overhead in consideration, we can configure the interval at which the consumer sends the heartbeat, as sketched below.
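For instance, a minimal sketch of the two standard consumer settings involved, assuming a Properties object named props as in the consumer example later in this section (the values are illustrative, not recommendations):

// consumer is considered dead if the coordinator receives no heartbeat within this window
props.put("session.timeout.ms", "30000");
// how often the consumer sends heartbeats to the coordinator
props.put("heartbeat.interval.ms", "3000");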

What is Apache Kafka Consumer

Moreover, we can group consumers, and the consumers in a Kafka Consumer Group share the partitions of the Kafka topics they subscribed to.

To understand this: if there are N partitions in a topic and N consumers in a Kafka Consumer Group that has subscribed to the topic, each consumer reads data from one partition of the topic. This is just a heads-up that consumers can be organized in groups.

To be specific, the Consumer API from Kafka helps to connect to the Kafka cluster and consume the data streams.
Below is the picture showing Apache Kafka Consumer:

Working of Apache Kafka Consumer

To subscribe to one or more topics and process the stream of records produced to them in an application, we use this Kafka Consumer API. In other words, we use the KafkaConsumer API to consume messages from the Kafka cluster. Moreover, see the KafkaConsumer class constructor below.

  1. public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs − a map of consumer configurations.


Methods of the KafkaConsumer class:
1. public java.util.Set<TopicPartition> assignment()
To get the set of partitions currently assigned to the consumer.
2. public java.util.Set<java.lang.String> subscription()
To get the current topic subscription of the consumer.
3. public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener)
To subscribe to the given list of topics to get dynamically assigned partitions.
4. public void unsubscribe()
To unsubscribe from the currently subscribed topics.
5. public void subscribe(java.util.List<java.lang.String> topics)
To subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe().
6. public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener)
Here, the argument pattern refers to the subscribing pattern in the format of a regular expression, and the listener argument gets notifications from the subscribing pattern.
7. public void assign(java.util.List<TopicPartition> partitions)
To manually assign a list of partitions to the consumer.
8. poll()
Fetches data for the topics or partitions specified using one of the subscribe/assign APIs. This will return an error if the topics were not subscribed to before polling for data.
9. public void commitSync()
To commit the offsets returned by the last poll() for all the subscribed topics and partitions. The same operation applies to commitAsync().
10. public void seek(TopicPartition partition, long offset)
To override the offset that the consumer will use in the next poll(), as shown in the sketch after this list.
11. public void resume()
To resume the paused partitions.
12. public void wakeup()
To wake up the consumer.
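For illustration, here is a minimal sketch of assign() and seek(), assuming an already-configured KafkaConsumer named consumer and a topic named demo:

import java.util.Arrays;
import org.apache.kafka.common.TopicPartition;

// manually take over partition 0 of the "demo" topic (no group re-balancing happens)
TopicPartition partition = new TopicPartition("demo", 0);
consumer.assign(Arrays.asList(partition));
// the next poll() will start fetching from offset 10
consumer.seek(partition, 10L);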

3. ConsumerRecord API

Basically, to receive records from the Kafka cluster, we use the ConsumerRecord API. A consumer record includes the topic name and partition number from which the record is being received, as well as an offset that points to the record in a Kafka partition. Moreover, to create a consumer record with a specific topic name, partition number and <key, value> pair, we use the ConsumerRecord class. Its signature is:

  1. public ConsumerRecord(String topic, int partition, long offset, K key, V value)
  • Topic − The topic name for the consumer record received from the Kafka cluster.
  • Partition − The partition for the topic.
  • Key − The key of the record; if no key exists, null is returned.
  • Value − The record contents.

4. ConsumerRecords API

Basically, it is a container for ConsumerRecord instances. To keep the list of ConsumerRecord per partition for a particular topic, we use this API. Its constructor is:

  1. public ConsumerRecords(java.util.Map<TopicPartition, java.util.List<ConsumerRecord<K, V>>> records)
  • TopicPartition − The partition of a topic; the map has one entry per partition.
  • Records − The list of ConsumerRecord for that partition.

Methods of the ConsumerRecords class:
1. public int count()
The number of records for all the topics.
2. public Set<TopicPartition> partitions()
The set of partitions with data in this record set (if no data was returned, the set is empty).
3. public Iterator iterator()
Generally, the iterator enables you to cycle through a collection, obtaining or removing elements.
4. public List<ConsumerRecord<K, V>> records(TopicPartition partition)
Basically, gets the list of records for the given partition, as illustrated in the sketch below.
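As a small sketch of these methods, assuming an already-subscribed KafkaConsumer named consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println(records.count() + " records across "
        + records.partitions().size() + " partitions");
for (TopicPartition tp : records.partitions()) {
    // records(tp) returns the per-partition record list, in offset order
    for (ConsumerRecord<String, String> record : records.records(tp))
        System.out.println(tp + " @ " + record.offset() + ": " + record.value());
}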

5. ConsumerRecord API vs ConsumerRecords API

a. ConsumerRecord API
The ConsumerRecord API is a key/value pair to be received from Kafka. It contains the topic name and partition number from which the record is being received, and an offset that points to the record in a Kafka partition.
b. ConsumerRecords API
Whereas, the ConsumerRecords API is a container that holds the list of ConsumerRecord per partition for a particular topic. Basically, there is one ConsumerRecord list for every topic partition returned by a Consumer.poll(long) operation.

6. Configuration Settings

Here, we are listing the configuration settings for the consumer client API −
1. bootstrap.servers
The bootstrap list of brokers.
2. group.id
Assigns an individual consumer to a group.
3. enable.auto.commit
Basically, it enables auto-commit for offsets if the value is true; otherwise offsets are not committed automatically.
4. auto.commit.interval.ms
Basically, it defines how often updated consumed offsets are written to ZooKeeper.
5. session.timeout.ms
It indicates how many milliseconds Kafka will wait for ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.

7. SimpleConsumer Application

Make sure the steps from the producer application remain the same here. Here also, start your ZooKeeper and Kafka broker. Further, create a SimpleConsumer application with the Java class named SimpleConsumer.java. Then type the following code:

import java.util.Properties;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Enter topic name");
            return;
        }
        //Kafka consumer configuration settings
        String topicName = args[0].toString();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //Kafka Consumer subscribes to a list of topics here.
        consumer.subscribe(Arrays.asList(topicName));
        //print the topic name
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                // print the offset, key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
        }
    }
}

 a. Compilation

By using the following command we can compile the application.

  1. javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" *.java

 b. Execution

Moreover, using the following command we can execute the application.

  1. java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. SimpleConsumer <topic-name>

 c. Input

Further, open the producer CLI and send some messages to the topic. We can use a simple input such as ‘Hello Consumer’.

d. Output

The output is

  1. Subscribed to topic Hello-Kafka
  2. offset = 3, key = null, value = Hello Consumer

8. Kafka Consumer Group

Basically, a consumer group in Kafka enables multi-threaded or multi-machine consumption from Kafka topics.

Kafka Consumer – Kafka Consumer Group
  • By using the same group.id, consumers can join a group.
  • The maximum parallelism of a group is reached when the number of consumers in the group equals the number of partitions; more consumers than partitions leaves some consumers idle.
  • Moreover, Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.
  • Also, Kafka guarantees that a message is only ever read by a single consumer in the group.
  • Consumers see the messages in the order they were stored in the log.


a. Re-balancing of a Consumer
Basically, adding more processes/threads will cause Kafka to re-balance. Also, if any consumer or broker fails to send a heartbeat to ZooKeeper, it can be re-configured via the Kafka cluster. During this re-balance, Kafka will assign the available partitions to the available threads, possibly moving a partition to another process. The following ConsumerGroup example joins a consumer group and prints the consumed records:

 
import java.util.Properties;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
    public static void main(String[] args) throws Exception {
//        if (args.length < 2) {
//            System.out.println("Usage: consumer <topic> <groupname>");
//            return;
//        }
//        String topic = args[0].toString();
//        String group = args[1].toString();

        String topic = "myTopic";
        String group = "testGroup";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
        }
    }
}

b. Compilation

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

c. Execution

>>java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group

Run the same command in a second terminal, so that two consumers join the group:

>>java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group

Hence, we have created a sample group named my-group with two consumers.
d. Input
Now, after opening the producer CLI, send some messages like:

  1. Test consumer group 01
  2. Test consumer group 02

e. Output of the First Process

  1. Subscribed to topic Hello-kafka
  2. offset = 3, key = null, value = Test consumer group 01

f. Output of the Second Process

  1. Subscribed to topic Hello-kafka
  2. offset = 3, key = null, value = Test consumer group 02

