Wednesday, August 14, 2013

Connecting to multiple clusters from libcouchbase

The other day I talked to a user who wanted to utilize multiple Couchbase clusters from the same application using libcouchbase. That is pretty simple to do, but it also gave me an idea to a blog post describing a couple of options you have.

If you've used libcouchbase you probably know that you need to supply an lcb_t to all of the functions in libcouchbase. The reason for that is that we don't use global variables in libcouchbase, so in fact the absolute easiest way to communicate with two different clusters can be done as simple as:

lcb_t cluster1, cluster2;
struct lcb_create_st create_options;

/* Create instance to the first cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster1";
lcb_create(&cluster1, &create_options);

/* Create instance to the second cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster2";
lcb_create(&cluster2, &create_options);

So far so good, but what if I want to access the two clusters concurrently? Using multiple threads seems like the "easiest" solution to this problem, and that is fine as long as you don't use the same lcb_t from multiple threads at the same time. libcouchbase is built for scalability, so we won't put any limitations inside the library that you could be better off solved outside the library. With this in mind the previous example could just as easy be rewritten as (using pthreads):

static void *my_cluster_worker(void *arg) {
    lcb_t instance;
    struct lcb_create_st create_options;
    memset(&create_options, 0, sizeof(create_options));
    create_options.v.v0.host = arg;
    lcb_create(&instance, &create_options);

...

/* Spin up the different threads */
pthread_create(tid, NULL, my_cluster_worker, "cluster1");
pthread_create(tid, NULL, my_cluster_worker, "cluster2");

You could of course just protect the different lcb_t with a lock and ensure that you're using them excluselively:

pthread_mutex_lock(&cluster1_mutex);
lcb_get(cluster1,  ... )
lcb_wait(cluster1);
pthread_mutex_unlock(&cluster1_mutex);

Given that libcouchbase is asynchronous we can also utilize multiple clusters from the same thread by utilizing the same IO instance. This isn't "true multitasking", but in most cases we'd be waiting on the command flying over the network anyway:

lcb_io_opt_t iops;
lcb_t cluster1, cluster2;
struct lcb_create_st create_options;

/* Create io instance */
lcb_create_io_ops(&iops, NULL);

/* Create instance to the first cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster1";
create_options.v.v0.io = iops;
lcb_create(&cluster1, &create_options);

/* Create instance to the second cluster */
memset(&create_options, 0, sizeof(create_options));
create_options.v.v0.host = "cluster2";
create_options.v.v0.io = iops;
lcb_create(&cluster2, &create_options);

All operations you try to execute will use the same event loop, so that if you do lcb_wait it will drive the event loop until all operations scheduled for that lcb_t instance is executed (but this will also execute commands scheduled for all other lcb_t instances using the same io instance).

Lets drag the example a bit further and imagine that we're using multiple Couchbase clusters for a high availability cache. I'm not going to look into cache consistency here (trying to limit the scope of the post). Whenever we want to store an item we try to store it on all of the servers, and whenever we want to retrieve an object we'll just use the fastest response (we could of course return the value returned from the quorum of the clusters etc, but you'll figure out how to tweak the code to do so.

You'll find the entire source code in the example directory for libcouchbase, so I'll just comment inline in the code here (making it harder for people doing copy'n'paste).

So let's go ahead and make a MultiClusterClient with the following API:

class MultiClusterClient {
public:
    MultiClusterClient(std::list<std::string> clusters);
    lcb_error_t store(const std::string &key, const std::string &value);
    lcb_error_t get(const std::string &key, std::string &value);
};

The user can then use the client like:

std::vector<std::string> clusters;
clusters.push_back("cluster1");
clusters.push_back("cluster2");
clusters.push_back("cluster3");
clusters.push_back("cluster4");

MultiClusterClient mcc(clusters);
mcc.store("foo", "bar");
...

The way the client works is that instead of using lcb_wait to wait for completion of operations, it starts and stops the event machine whenever it is needed through the io operations interface.

So let's show the entire signature for the MultiClusterClient class:

class MultiClusterClient {
public:
    MultiClusterClient(std::list<std::string> clusters);
    lcb_error_t store(const std::string &key, const std::string &value);
    lcb_error_t get(const std::string &key, std::string &value);

private:
    void wait(void) {
        switch (iops->version) {
        case 0:
            iops->v.v0.run_event_loop(iops);
            break;
        case 1:
            iops->v.v1.run_event_loop(iops);
            break;
        default:
            std::cerr << "Unknown io version " << iops->version << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    void resume(void) {
        switch (iops->version) {
        case 0:
            iops->v.v0.stop_event_loop(iops);
            break;
        case 1:
            iops->v.v1.stop_event_loop(iops);
            break;
        default:
            std::cerr << "Unknown io version " << iops->version << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    lcb_io_opt_t iops;
    std::list<lcb_t> instances;
};

Now that we've got an idea on how the class looks like, lets go ahead and write the constructor. In the constructor I'm going to create all of the instances used to connect to the various nodes, and I'm going to simplify the error handling to terminate the program instead of obfuscating the code with a ton of recovery code/logic.

MultiClusterClient(std::list<std::string> clusters) {
    lcb_error_t err;
    if ((err = lcb_create_io_ops(&iops, &backend)) != LCB_SUCCESS) {
        std::cerr <<"Failed to create io ops: "
                  << lcb_strerror(NULL, err)
                  << std::endl;
        exit(1);
    }

    // Create an lcb_t instance to all of the clusters
    for (std::list<std::string>::iterator iter = clusters.begin();
         iter != clusters.end();
         ++iter) {
        lcb_create_st options(iter->c_str(), NULL, NULL, NULL, iops);
        lcb_t instance;
        if ((err = lcb_create(&instance, &options)) != LCB_SUCCESS) {
            std::cerr <<"Failed to create instance: "
                      << lcb_strerror(NULL, err)
                      << std::endl;
            exit(1);
        }

        lcb_set_error_callback(instance, error_callback);
        lcb_set_get_callback(instance, get_callback);
        lcb_set_store_callback(instance, storage_callback);

        lcb_connect(instance);
        lcb_wait(instance);
        instances.push_back(instance);
    }
}

To summarize the effect of the code above, we've now got list of lcb_t instances connected to all of the requested clusters where all of them is bound to the same event base.

With the list of the instances all set up I guess its time to implement the store method and start discussing that:

lcb_error_t store(const std::string &key, const std::string &value) {
    const lcb_store_cmd_t *commands[1];
    lcb_store_cmd_t cmd;
    commands[0] = &cmd;
    memset(&cmd, 0, sizeof(cmd));
    cmd.v.v0.key = key.c_str();
    cmd.v.v0.nkey = key.length();
    cmd.v.v0.bytes = value.c_str();
    cmd.v.v0.nbytes = value.length();
    cmd.v.v0.operation = LCB_SET;

    lcb_error_t error;
    Operation *oper = new Operation(this);

    // Send the operation to all of the clusters
    for (std::list<lcb_t>::iterator iter = instances.begin();
         iter != instances.end();
         ++iter) {

        if ((error = lcb_store(*iter, oper, 1, commands)) != LCB_SUCCESS) {
            oper->response(error, "");
        }
    }

    wait();
    lcb_error_t ret = oper->getErrorCode();
    oper->release();
    return ret;
}

lcb_error_t get(const std::string &key, std::string &value) {
    lcb_get_cmd_t cmd;
    const lcb_get_cmd_t *commands[1];

    commands[0] = &cmd;
    memset(&cmd, 0, sizeof(cmd));
    cmd.v.v0.key = key.c_str();
    cmd.v.v0.nkey = key.length();

    Operation *oper = new Operation(this);
    lcb_error_t error;
    for (std::list<lcb_t>::iterator iter = instances.begin();
         iter != instances.end();
         ++iter) {

        if ((error = lcb_get(*iter, oper, 1, commands)) != LCB_SUCCESS) {
            oper->response(error, "");
        }
    }

    wait();
    value = oper->getValue();
    lcb_error_t ret = oper->getErrorCode();
    oper->release();
    return ret;
}

This looks pretty much like how you would have done with just a single cluster except for the Operation class and that we're calling wait() instead of lcb_wait(). So what is the Operation class and what is its purpose? As I said earlier we're not going to wait for a response from all of the clusters before responding. This means that the next time I wait for an response I get a response for the previous request I sent out (which should be "silently" ignored). I'm aware of that I really don't need to create a separate class for this (I could have used a counter and assigned a sequence number to each command, but this was just as easy). Given that I don't know the life-time for each request I use "reference-counting" on the object to figure out when to destory the object.

So let take a look at the Operation class:

class Operation {
public:
    Operation(MultiClusterClient *r) :
        root(r),
        error(LCB_SUCCESS),
        numReferences(r->instances.size() + 1),
        numResponses(0)
    {
    }

    void response(lcb_error_t err, const std::string &value) {
        if (err == LCB_SUCCESS) {
            values.push_back(value);
        } else {
            error = err;
        }

        // @todo Currently we're going to proceed at the first
        // response.. you might want more ;-)
        // the wait to resume
        if (++numResponses == 1) {
            root->resume();
        }

        maybeNukeMe();
    }

    lcb_error_t getErrorCode(void) {
        // You might want to do this in a quorum fasion of all the
        // responses
        return error;
    }

    std::string getValue(void) {
        // You might want to do this in a quorum fasion of all the
        // responses
        return values[0];
    }

    void release(void) {
        maybeNukeMe();
    }

private:
    void maybeNukeMe(void) {
        if (--numReferences == 0) {
            delete this;
        }
    }

    MultiClusterClient *root;
    lcb_error_t error;
    int numReferences;
    int numResponses;
    std::vector<std::string> values;
};

As you see the code makes a few shortcuts. For once I let one error mark the entire operation fail (if the first cluster don't have the key but the second does etc you'll get that the key wasn't found), and the error checking should do more retries etc. Anyway you'll figure out how it works.

The last "missing pieces" is the callbacks called from libcouchbase:

static void storage_callback(lcb_t, const void *cookie,
                             lcb_storage_t, lcb_error_t error,
                             const lcb_store_resp_t *)
{
    MultiClusterClient::Operation *o;
    o = (MultiClusterClient::Operation *)cookie;
    o->response(error, "");
}

static void get_callback(lcb_t, const void *cookie, lcb_error_t error,
                         const lcb_get_resp_t *resp)
{
    MultiClusterClient::Operation *o;
    o = (MultiClusterClient::Operation *)cookie;
    if (error == LCB_SUCCESS) {
        std::string value((char*)resp->v.v0.bytes, resp->v.v0.nbytes);
        o->response(error, value);
    } else {
        o->response(error, "");
    }
}

static void error_callback(lcb_t instance,
                           lcb_error_t error,
                           const char *errinfo)
{
    std::cerr << "An error occurred: " << lcb_strerror(instance, error);
    if (errinfo) {
        std::cerr << " (" << errinfo << ")";
    }
    std::cerr << std::endl;
    exit(EXIT_FAILURE);
}

Happy hacking

No comments:

Post a Comment