Published on 00/00/0000
Last updated on 00/00/0000
This post is part of the Debug 101 series. If you missed the previous post in this series, check it out here:
We're in the middle of deploying Apache Kafka to Kubernetes the cloud-native way: by removing the Zookeeper dependency entirely and using etcd instead. This means that service registry/discovery and the other internal Kafka-to-Zookeeper operations will be dispatched to a pre-existing etcd cluster. Sweet, isn't it? No need for yet another third-party system, because we already have etcd as part of Kubernetes, out of the box.
In this post we don't want to go into detail about why we chose to remove Zookeeper entirely, or why it's considerably better to rely on etcd when deploying to Kubernetes. Once we've completed this project and pushed the PR upstream, we'll revisit that topic. Meanwhile, if you'd like to consider one single point on this matter - performance - we recommend you read this blog.
To recap, etcd is a distributed key/value store which relies on the Raft consensus algorithm and is used internally by Kubernetes. The Java library that interacts with it is called jetcd; it is currently under development and in beta. We are using this library to remove the Zookeeper dependency, and it has worked pretty well so far. Earlier this week we reached a point where we were running a large number of Kafka tests, and a few of them were failing. The problem we encountered was related to Transactions. A simple jetcd transaction looks like this:
val client: Client = Client.builder().endpoints("http://localhost:2379").build()

// If `foo` has a version greater than 0 (it exists), read it; otherwise create it.
val test = Try {
  client.getKVClient.txn().
    If(new Cmp(ByteSequence.fromString("foo"), Cmp.Op.GREATER, CmpTarget.version(0))).
    Then(Op.get(ByteSequence.fromString("foo"), GetOption.DEFAULT)).
    Else(Op.put(ByteSequence.fromString("foo"), ByteSequence.fromString("bar"), PutOption.DEFAULT)).
    commit().get()
}
It checks whether the key foo exists, that is, whether its version is greater than 0. If it does, the transaction gets its value; otherwise it creates foo with the value bar.
if (test.get.isSucceeded)
  println(s"Key `foo` exists and the value is: ${test.get.getGetResponses.get(0).getKvs.get(0).getValue}")
else
  println("Key `foo` does not exists, creating new one with value `bar`")

client.close()
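To make the If/Then/Else flow concrete, here is a minimal in-memory sketch of the same decision. It does not use jetcd at all: the TxnSketch class and its getOrCreate method are hypothetical names introduced purely for illustration, and the only etcd behavior assumed is that a key which does not exist compares as version 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for etcd, used only to illustrate the
// If/Then/Else flow of the transaction above. This is not the jetcd API.
class TxnSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    // A key that does not exist is treated as having version 0.
    long version(String key) {
        return versions.getOrDefault(key, 0L);
    }

    // If version(key) > 0, Then "get" (nothing to change), Else put.
    // Returns true when the comparison succeeded, i.e. the Then branch ran.
    boolean getOrCreate(String key, String defaultValue) {
        if (version(key) > 0) {
            return true; // Then: the caller can read the existing value
        }
        values.put(key, defaultValue);      // Else: create the key
        versions.merge(key, 1L, Long::sum); // every modification bumps the version
        return false;
    }

    String get(String key) {
        return values.get(key);
    }

    public static void main(String[] args) {
        TxnSketch store = new TxnSketch();
        System.out.println(store.getOrCreate("foo", "bar")); // first run: Else branch
        System.out.println(store.getOrCreate("foo", "bar")); // second run: Then branch
        System.out.println(store.get("foo"));
    }
}
```

The point of the sketch is that the same call takes a different branch on the first and second run, which is why running the Scala program twice exercises two different code paths.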
Before we can try this code we need a running etcd cluster. To start one, run the following Docker command:
docker run --rm -d -p 2379:2379 -p 2380:2380 --name etcd \
  quay.io/coreos/etcd:v3.2.9 /usr/local/bin/etcd \
  --data-dir=/etcd-data --name node1 \
  --initial-advertise-peer-urls http://127.0.0.1:2380 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --advertise-client-urls http://127.0.0.1:2379 \
  --listen-client-urls http://0.0.0.0:2379 \
  --initial-cluster node1=http://127.0.0.1:2380
Back to our Scala code. As anybody might rightfully assume, our first run produces the following output: "Key `foo` does not exists, creating new one with value `bar`". But what happens if we run the code again? Unfortunately, the results are less predictable:
Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at Main$.delayedEndpoint$Main$1(Main.scala:21)
at Main$delayedInit$body.apply(Main.scala:9)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:389)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at Main$.main(Main.scala:9)
at Main.main(Main.scala)
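The top two frames of this trace are what any call to get(0) on an empty ArrayList produces. As a minimal, self-contained sketch (no jetcd involved; FirstElementSketch and its first helper are hypothetical names introduced here), the following reproduces that failure and shows one defensive way to read the first element of a list that may be empty:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class FirstElementSketch {
    // Hypothetical helper: the first element, or Optional.empty() for an empty list.
    static <T> Optional<T> first(List<T> items) {
        return items.stream().findFirst();
    }

    public static void main(String[] args) {
        List<String> responses = new ArrayList<>();

        // Direct indexing fails on an empty list, as in the stack trace above.
        try {
            responses.get(0);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("get(0) threw " + e.getClass().getSimpleName());
        }

        // The Optional-based variant reports absence instead of throwing.
        System.out.println("first element present: " + first(responses).isPresent());
    }
}
```

Guarding the access like this would turn the crash into a visible "no response" case, but it would not explain why the list is empty in the first place.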
Let's take a closer look at test.get.getGetResponses.get(0).getKvs.get(0).getValue. The test variable holds a Try[TxnResponse]. A TxnResponse can contain three different kinds of responses: GetResponse, PutResponse, and DeleteResponse.
In our case we have one GetResponse, because inside the transaction (Then(Op.get(ByteSequence.fromString("foo"), GetOption.DEFAULT))) we issued one get operation. But, of course, put, get and delete operations can be mixed in one transaction, so to get back every GetResponse you have to call getGetResponses. Now take a closer look at the jetcd API implementation, especially the TxnResponse.java file:
/**
 * returns a list of GetResponse; empty list if none.
 */
public synchronized List<GetResponse> getGetResponses() {
  if (getResponses == null) {
    getResponses = getResponse().getResponsesList().stream()
        .filter((responseOp) -> responseOp.getResponseCase() != RESPONSE_RANGE)
        .map(responseOp -> new GetResponse(responseOp.getResponseRange()))
        .collect(Collectors.toList());
  }
  return getResponses;
}
In the exception above, we clearly received an empty list. How did this happen? It turns out that this method's implementation is wrong: for a transaction that contains only get operations, it cannot return anything but an empty list. Moreover, getPutResponses and getDeleteResponses are also affected by this bug. The bug is inside the filter call. RESPONSE_RANGE is an enum constant which identifies one of the various response types, and it is the one that corresponds to a GetResponse. The filter therefore discards exactly the responses it is supposed to keep, so == is needed instead of !=.
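The effect of the inverted comparison can be reproduced without jetcd. The sketch below is an illustration only: the ResponseCase enum and both methods are hypothetical stand-ins that mimic the shape of the filter above, not the library's actual classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterBugSketch {
    // Hypothetical stand-in for the response-case enum used by the real code.
    enum ResponseCase { RESPONSE_RANGE, RESPONSE_PUT, RESPONSE_DELETE_RANGE }

    // Same shape as the buggy filter: keeps everything that is NOT a range response.
    static List<ResponseCase> buggyGetResponses(List<ResponseCase> ops) {
        return ops.stream()
                .filter(op -> op != ResponseCase.RESPONSE_RANGE)
                .collect(Collectors.toList());
    }

    // Corrected predicate: keeps only the range (get) responses.
    static List<ResponseCase> fixedGetResponses(List<ResponseCase> ops) {
        return ops.stream()
                .filter(op -> op == ResponseCase.RESPONSE_RANGE)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A transaction whose Then branch issued a single get, as in the post.
        List<ResponseCase> onlyGet = Arrays.asList(ResponseCase.RESPONSE_RANGE);
        System.out.println("buggy: " + buggyGetResponses(onlyGet));
        System.out.println("fixed: " + fixedGetResponses(onlyGet));
    }
}
```

With a single range response as input, the buggy variant yields an empty list, which is exactly the condition that makes the subsequent get(0) call fail.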
Luckily, this bug was fixed on the master branch by this PR. We assume there will be a new release soon. Until then, if Txn is essential for your work, you need to build the master branch locally with mvn clean install and point your project at the local Maven repository.