// Answer a topic-metadata request: look up metadata for the requested topics,
// pair it with the currently alive brokers, and send both back to the client.
// The request object is downcast without a check; the enclosing dispatch is not
// visible here, so this assumes the request really is a TopicMetadataRequest.
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
// Requested topics are passed as a Set, so duplicate names are collapsed.
val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
val brokers = metadataCache.getAliveBrokers
// NOTE(review): the message is formatted eagerly via String.format; whether
// trace() takes its argument by name is not visible here - confirm.
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
// The response carries the request's correlation id.
val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
// Answer a join-group request: collect every partition of the requested topics
// for which the log manager has a log, bump the generation id, and build a
// response listing those partitions.
// The request object is downcast without a check; the enclosing dispatch is not
// visible here, so this assumes the request really is a JoinGroupRequestAndHeader.
val joinGroupReq = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
// Diff pair: the removed line converted the topics collection explicitly via
// JavaConversions.asScalaIterable; the added line calls .toSet directly.
// NOTE(review): presumably the added line relies on an implicit Java-to-Scala
// conversion being in scope - confirm the matching import exists.
- val topics = JavaConversions.asScalaIterable(joinGroupReq.body.topics()).toSet
+ val topics = joinGroupReq.body.topics().toSet
// Keep only the logs whose topic is one of the requested topics.
val partitions = this.replicaManager.logManager.allLogs.filter(log => topics.contains(log.topicAndPartition.topic))
// Re-wrap each topic/partition pair as an org.apache.kafka.common.TopicPartition.
val partitionList = partitions.map(_.topicAndPartition).map(tp => new org.apache.kafka.common.TopicPartition(tp.topic, tp.partition)).toBuffer
// Every join-group request handled here increments the generation id.
// NOTE(review): this is an unsynchronized read-modify-write in the visible code;
// confirm whether callers serialize access to this handler.
this.consumerGroupGenerationId += 1
// Diff pair: the removed line converted partitionList with
// JavaConversions.asJavaList; the added line passes the Scala buffer as-is.
// NOTE(review): presumably relies on an implicit Scala-to-Java conversion - confirm.
- val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, JavaConversions.asJavaList(partitionList))
+ val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, partitionList)
// Pair the response with the request's correlation id and wrap it for sending;
// the actual dispatch of `send` is not within the visible lines.
val send = new BoundedByteBufferSend(new JoinGroupResponseAndHeader(joinGroupReq.correlationId, response))