Tuesday, July 12, 2016

KafkaConsumer is not safe for multi-threaded access

I'm working on a Scala project that uses the Play framework, with Guice for dependency injection. It's very nice!

Yesterday, I ran into this exception:

2016-07-11 08:56:22,936 [ERROR] from in - KafkaConsumer is not safe for multi-threaded access
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire( ~[org.apache.kafka.kafka-clients-]
    at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe( ~[org.apache.kafka.kafka-clients-]
    at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe( ~[org.apache.kafka.kafka-clients-]
    at services.kafka.KafkaServiceImpl.consume(KafkaService.scala:73) ~[]

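Reason: I run two consumers on different threads, but there was only one KafkaService, because I had bound it as a singleton in my Google Guice module. Both threads therefore ended up calling the same underlying KafkaConsumer, and they conflicted with each other.

How to solve it? Bind a separate instance for each consumer, and use the @Named annotation to tell the bindings apart.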
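
To make the failure mode concrete, here is a minimal sketch that uses only the JDK. GuardedConsumer, poll and SharedConsumerDemo are hypothetical names, not Kafka classes; the guard only imitates, in simplified form, the kind of single-owner check that produces the message in the stack trace above.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Hypothetical stand-in for a consumer that may only be used by one thread at a time.
// Simplified on purpose: it does not support re-entrant calls from the owning thread.
final class GuardedConsumer {
  private val NoOwner = -1L
  private val owner = new AtomicLong(NoOwner)

  // Claim ownership for the calling thread, or fail if another thread holds it.
  private def acquire(): Unit = {
    val me = Thread.currentThread().getId
    if (me != owner.get() && !owner.compareAndSet(NoOwner, me))
      throw new ConcurrentModificationException("GuardedConsumer is not safe for multi-threaded access")
  }

  private def release(): Unit = owner.set(NoOwner)

  // Runs `body` while the calling thread owns the consumer.
  def poll(body: () => Unit): Unit = {
    acquire()
    try body() finally release()
  }
}

object SharedConsumerDemo {
  // Returns true if the second thread is rejected while the first is still inside poll().
  def secondThreadRejected(): Boolean = {
    val consumer = new GuardedConsumer // one shared instance, like a singleton service
    val firstInside = new CountDownLatch(1)
    val secondDone = new CountDownLatch(1)
    val rejected = new AtomicBoolean(false)

    val first = new Thread(new Runnable {
      def run(): Unit = consumer.poll { () =>
        firstInside.countDown() // signal that this thread now owns the consumer
        secondDone.await()      // keep ownership until the second thread has tried
      }
    })
    val second = new Thread(new Runnable {
      def run(): Unit = {
        firstInside.await()
        try consumer.poll(() => ())
        catch { case _: ConcurrentModificationException => rejected.set(true) }
        finally secondDone.countDown()
      }
    })

    first.start(); second.start()
    first.join(); second.join()
    rejected.get()
  }
}
```

Calling SharedConsumerDemo.secondThreadRejected() shows the same pattern as my bug: two threads, one shared instance, and the second caller fails instead of waiting.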

For example, this is my module file:

class KafkaModule extends AbstractModule with ScalaModule with AkkaGuiceSupport {

  override def configure(): Unit = {
    // Each named binding is scoped separately, so each name gets its own KafkaServiceImpl instance.
    bind[services.kafka.KafkaService[String, String]].annotatedWith(Names.named("kafka-Coccoc")).to[services.kafka.KafkaServiceImpl].in[Singleton]
    bind[services.kafka.KafkaService[String, String]].annotatedWith(Names.named("kafka-Report")).to[services.kafka.KafkaServiceImpl].in[Singleton]
  }
}
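
If you want to check that two named singleton bindings really give two separate objects, a small sketch like the following may help. It assumes Guice is on the classpath and uses the plain Guice Java API rather than ScalaModule. Service, ServiceImpl, NamedModule and NamedBindingCheck are hypothetical stand-ins for illustration, not the classes from my project.

```scala
import com.google.inject.{AbstractModule, Guice, Key, Singleton}
import com.google.inject.name.Names

// Hypothetical stand-ins for KafkaService and KafkaServiceImpl.
trait Service
class ServiceImpl extends Service

class NamedModule extends AbstractModule {
  override def configure(): Unit = {
    // Same implementation class, two differently named singleton bindings.
    bind(classOf[Service]).annotatedWith(Names.named("kafka-Coccoc")).to(classOf[ServiceImpl]).in(classOf[Singleton])
    bind(classOf[Service]).annotatedWith(Names.named("kafka-Report")).to(classOf[ServiceImpl]).in(classOf[Singleton])
  }
}

object NamedBindingCheck {
  private val injector = Guice.createInjector(new NamedModule)

  // Look up the Service bound under the given name.
  private def lookup(name: String): Service =
    injector.getInstance(Key.get(classOf[Service], Names.named(name)))

  // Different names should resolve to different objects.
  def distinctAcrossNames: Boolean = lookup("kafka-Coccoc") ne lookup("kafka-Report")

  // The same name should always resolve to the same object.
  def stableWithinName: Boolean = lookup("kafka-Report") eq lookup("kafka-Report")
}
```

One caveat: this relies on the scope being attached to each named binding. If the implementation class itself were annotated with @Singleton, both names would resolve to the same shared instance and the conflict would come back.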

Then I rewrote my consumer code so that each consumer asks for its own named instance. For example:

class ReportConsumerActor @Inject() (@Named("kafka-Report") val kafkaService: KafkaService[String, String])
                                    (implicit ec: ExecutionContext) extends Actor {

  def receive: Receive = {
    // ... (message handling that uses kafkaService goes here)
  }
}

That's all! Hope this helps.
