Saturday, October 30, 2010

Running Moxi on Solaris

I have been working on getting membase up'n'running on OpenSolaris as a side project. Most of it is already in place, but there are still some Makefile issues to sort out. While we're waiting for that work to be completed, I thought I could show you how to easily run moxi as a service controlled by SMF.

I've created some scripts to make it easier for you to build and install everything, so the first thing we need to do is to check out (or update) a clone of my tools repository:

trond@opensolaris> git clone git://github.com/trondn/tools.git
trond@opensolaris> cd tools/membase

Next up we need to create some new ZFS datasets for our moxi installation. I've created a script that creates the ZFS datasets and sets up the mountpoints:

trond@opensolaris> ./smf/moxi/setup.sh -u -z rpool

The -u option tells the script to create the authorizations, profiles, users, and groups we need, and the -z option tells it to create the ZFS filesystems in the ZFS pool named rpool.

Next up we need to compile (and install) the source code. The directory /opt/membase isn't writable by our user, so let's change the ownership so we can install files there:

trond@opensolaris> pfexec chown trond:staff /opt/membase
trond@opensolaris> ./setup.sh -d /opt/membase moxi
Download commit hook - Ok.
Checking out libmemcached (Bazaar) - Ok.
Checking out bucket_engine (git) - Ok.
Checking out ep-engine (git) - Ok.
Checking out libconflate (git) - Ok.
Checking out libvbucket (git) - Ok.
Checking out memcached (git) - Ok.
Checking out moxi (git) - Ok.
Checking out vbucketmigrator (git) - Ok.
Checking out membase-cli (git) - Ok.
Checking out ns_server (git) - Ok.
Checking out memcachetest (git) - Ok.
Configure build for SunOS
trond@opensolaris> cd moxi/SunOS
trond@opensolaris> make all install

Now we've got everything installed to /opt/membase, so let's change the ownership to membase:membase and install the SMF script to manage moxi:

trond@opensolaris> chown -R membase:membase /opt/membase
trond@opensolaris> cd ../../smf/moxi
trond@opensolaris> ./setup.sh -s
moxi installed as /lib/svc/method/moxi
moxi.xml installed as /var/svc/manifest/application/moxi.xml

So let's check out the configuration options for our new SMF service:

trond@opensolaris> svccfg
svc:> select moxi
svc:/application/database/moxi> listprop
manifestfiles                                        framework
manifestfiles/var_svc_manifest_application_moxi_xml  astring  /var/svc/manifest/application/moxi.xml
general                                              framework
general/action_authorization                         astring  solaris.smf.manage.moxi
general/entity_stability                             astring  Unstable
general/single_instance                              boolean  true
general/value_authorization                          astring  solaris.smf.value.moxi
multi-user-server                                    dependency
multi-user-server/entities                           fmri     svc:/milestone/multi-user-server
multi-user-server/grouping                           astring  require_all
multi-user-server/restart_on                         astring  none
multi-user-server/type                               astring  service
moxi                                                 application
moxi/corepattern                                     astring  /var/opt/membase/cores/core.%f.%p
moxi/downstream_max                                  astring  8
moxi/port                                            astring  11211
moxi/threads                                         astring  4
moxi/url                                             astring  http://membase:8091/pools/default/bucketStreaming/default
moxi/version                                         astring  1.6.0
tm_common_name                                       template
tm_common_name/C                                     ustring  Membase
tm_man_moxi                                          template
tm_man_moxi/manpath                                  astring  /opt/membase/share/man
tm_man_moxi/section                                  astring  1
tm_man_moxi/title                                    astring  moxi

You will most likely want to set the URL parameter to point to the bucket you want to use:

svc:/application/database/moxi> setprop moxi/url=http://myserver:8091/pools/default/bucketStreaming/default

Let's refresh the configuration and start the service:

trond@opensolaris> svccfg refresh moxi
trond@opensolaris> svcadm enable moxi
trond@opensolaris> svcs moxi
STATE          STIME    FMRI
online          9:45:41 svc:/application/database/moxi:moxi

Installing Python scripts from automake, fixup :)

In my previous blog post I added a wrapper script to start the Python scripts, but it turns out that this script doesn't work unless you pass --libdir=something to configure. I didn't catch that originally because I always specify the library directory, since I'm building both 32-bit and 64-bit binaries on my Solaris machine.

The following script should address the problem:

#! /bin/sh
prefix=@prefix@
exec_prefix=@exec_prefix@
root=@libdir@/python

if test -z "${PYTHONPATH}"; then
   PYTHONPATH=$root
else
   PYTHONPATH=$root:${PYTHONPATH}
fi
export PYTHONPATH
exec $root/`basename $0`.py "$@"

Thursday, October 28, 2010

Installing Python scripts from automake...

I've been working on making it easier for developers to compile and install membase, and today I learned some more automake magic. I'm one of those developers who don't want to spend a lot of time working on the build system; I want to spend my time working on the code. At the same time, I don't want to do unnecessary, boring manual work that the build system should do for me.

Parts of membase are implemented in Python, and I've been trying to figure out how to install those pieces. I don't like to mess up the /bin directory with "library" files, so I needed a way to package the Python bits better. I've previously been using a wrapper script that sets the PYTHONPATH variable, but I've never tried to integrate that into an automake-generated makefile.

As always I started out asking Google for help, but I didn't find a good, simple example, so I ended up reading through the automake manual. It turns out that it's fairly easy to do exactly what I want, so I decided to share the knowledge in a blog post :-)

We don't want to hardcode the path to our binary anywhere, so the first thing we need to do is to update configure.ac to also generate our wrapper script:

AC_CONFIG_FILES(Makefile python_wrapper)

I've got multiple programs implemented with Python, and I don't want to create a ton of wrappers, so my python_wrapper.in looks like:

#! /bin/sh
if test -z "${PYTHONPATH}"; then
   PYTHONPATH=@libdir@/python
else
   PYTHONPATH=@libdir@/python:${PYTHONPATH}
fi
export PYTHONPATH
exec @libdir@/python/`basename $0`.py "$@"

This means that if I install this script as /opt/membase/bin/stats, it will try to execute /opt/membase/lib/python/stats.py with the same arguments. So let's go ahead and add a rule to Makefile.am to generate the scripts with the correct names:

PYTHON_TOOLS=stats
${PYTHON_TOOLS}: python_wrapper
	cp $< $@

BUILT_SOURCES += ${PYTHON_TOOLS}
CLEANFILES+= ${PYTHON_TOOLS}
bin_SCRIPTS+= ${PYTHON_TOOLS}

Now we've got the wrapper script in place, and we've generated all of the scripts needed to start our programs. The next thing is to create the destination directory for the Python bits and install all of them there. To do so we need to create a variable that ends with "dir" to hold the name of the directory. Let's name ours "pythonlibdir" and put it in a subdirectory named python under the specified libdir:

pythonlibdir=$(libdir)/python


Finally we need to list all of the files we want to go there:

pythonlib_DATA= \
                mc_bin_client.py \
                mc_bin_server.py \
                memcacheConstants.py
  
pythonlib_SCRIPTS= \
                stats.py

The reason I use pythonlib_SCRIPTS for the last one is that I want the execute bit set on the file.

Sunday, October 17, 2010

Writing your own storage engine for Memcached, part 3

Right now we've got an engine capable of handling get and set load, but it is doing synchronous filesystem IO. We can't serve the client faster than we can read the item from disk, but we could be serving other connections while we wait for the disk.

In this entry we're going to fix our get and store methods so that they don't block in the engine API. As I've said earlier, the intention of this tutorial is to focus on the engine API. That means I'm not going to try to make an efficient design, because that could distract from what I'm trying to explain. If people are interested in how we could optimize this, I could add a follow-up part to the tutorial in the future... Just let me know.

In order to implement asynchronous operations in our engine, we need to make use of the API the server makes available to us in create_instance. Let's extend our engine structure to keep track of the server API:

struct fs_engine {
   ENGINE_HANDLE_V1 engine;
   SERVER_HANDLE_V1 sapi;
};

...

MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                  GET_SERVER_API get_server_api,
                                  ENGINE_HANDLE **handle)
{

...

   h->engine.item_set_cas = fs_item_set_cas;
   h->engine.get_item_info = fs_get_item_info;
   h->sapi = *get_server_api();

...

      

To implement an asynchronous function in the engine interface, the engine needs to dispatch the request to another thread before it returns ENGINE_EWOULDBLOCK from the engine function. When the backend is done processing the request, it notifies the memcached core by calling the notify_io_complete function in the server interface. If an error occurred while processing the request, the memcached core will return the error message to the client. If your engine called notify_io_complete with ENGINE_SUCCESS, the memcached core will call the engine interface function once more with the same arguments as the first time.

If you look at the server API, you'll see that it provides an interface for storing an engine-specific pointer for each cookie. This will make our life easier when we want to implement asynchronous IO. So let's go ahead and update our fs_get method:

static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{
   struct fs_engine *engine = (struct fs_engine *)handle;
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   if (res != NULL) {
      *item = res;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      return ENGINE_SUCCESS;
   }

...
      

The next thing we need to do is to create a function that runs asynchronously and stores the result in the engine_specific setting for the cookie. Since we're going to use async tasks for all of the engine methods, let's go ahead and create a function to run tasks in another thread:

static ENGINE_ERROR_CODE execute(struct task *task)
{
   pthread_attr_t attr;
   pthread_t tid;

   if (pthread_attr_init(&attr) != 0 ||
       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
       pthread_create(&tid, &attr, task->callback, task) != 0) {
      return ENGINE_FAILED;
   }

   return ENGINE_EWOULDBLOCK;
}
      

As you can see from the code, I'm going to create a new thread to execute each operation. This isn't very efficient, because creating a new thread has a substantial overhead. In your design you would probably want a pool of threads to run your tasks, along the lines of the sketch below.
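
To give an idea of what that could look like, here is a minimal sketch of a fixed-size worker pool with a simple linked-list queue. None of this is part of the engine API; the names pool_init, pool_submit and pool_worker (and the choice of four workers) are made up for this example, and a real implementation would also need shutdown handling and error reporting:

#include <pthread.h>
#include <stdlib.h>

struct pool_task {
   void *(*callback)(void *arg);
   void *arg;
   struct pool_task *next;
};

struct pool {
   pthread_mutex_t lock;
   pthread_cond_t cond;
   struct pool_task *head;
   struct pool_task *tail;
   pthread_t threads[4];
};

static void *pool_worker(void *arg)
{
   struct pool *pool = arg;
   for (;;) {
      pthread_mutex_lock(&pool->lock);
      while (pool->head == NULL) {
         pthread_cond_wait(&pool->cond, &pool->lock);
      }
      struct pool_task *t = pool->head;
      pool->head = t->next;
      if (pool->head == NULL) {
         pool->tail = NULL;
      }
      pthread_mutex_unlock(&pool->lock);
      /* Run the task outside the lock so the other workers can proceed */
      t->callback(t->arg);
      free(t);
   }
   return NULL;
}

static int pool_init(struct pool *pool)
{
   pool->head = pool->tail = NULL;
   pthread_mutex_init(&pool->lock, NULL);
   pthread_cond_init(&pool->cond, NULL);
   for (int ii = 0; ii < 4; ++ii) {
      if (pthread_create(&pool->threads[ii], NULL, pool_worker, pool) != 0) {
         return -1;
      }
   }
   return 0;
}

/* execute() would call this instead of spawning a thread, and would
 * still return ENGINE_EWOULDBLOCK to the core after a successful submit */
static int pool_submit(struct pool *pool, void *(*callback)(void *), void *arg)
{
   struct pool_task *t = malloc(sizeof(*t));
   if (t == NULL) {
      return -1;
   }
   t->callback = callback;
   t->arg = arg;
   t->next = NULL;
   pthread_mutex_lock(&pool->lock);
   if (pool->tail == NULL) {
      pool->head = pool->tail = t;
   } else {
      pool->tail->next = t;
      pool->tail = t;
   }
   pthread_cond_signal(&pool->cond);
   pthread_mutex_unlock(&pool->lock);
   return 0;
}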

The newly created thread runs the specialized callback with a pointer to the task as its only argument. So what does this task structure look like?

struct task {
   struct fs_engine *engine; /* Pointer to the engine */
   const void *cookie; /* The cookie requesting the operation */
   void *(*callback)(void *arg);
   union {
      struct {
         char key[PATH_MAX];
         size_t nkey;
      } get; /* Data used by the get operation */
      struct {
         item *item;
         ENGINE_STORE_OPERATION operation;
      } store; /* Data used by the store operation */
   } data;
};
      
So let's finish up fs_get:

static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{
   struct fs_engine *engine = (struct fs_engine *)handle;
   /* Check to see if this is the callback from an earlier ewouldblock */
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   if (res != NULL) {
      *item = res;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      return ENGINE_SUCCESS;
   }

   /* We don't support keys longer than PATH_MAX */
   if (nkey >= PATH_MAX) {
      return ENGINE_FAILED;
   }

   /* Set up the callback struct */
   struct task *task = calloc(1, sizeof(*task));
   if (task == NULL) {
      return ENGINE_ENOMEM;
   }

   task->engine = (struct fs_engine *)handle;
   task->cookie = cookie;
   task->callback = fs_get_callback;
   memcpy(task->data.get.key, key, nkey);
   task->data.get.nkey = nkey;

   ENGINE_ERROR_CODE ret = execute(task);
   if (ret != ENGINE_EWOULDBLOCK) {
      free(task);
   }
   return ret;
}
      

If you look at the code above, you'll see that we specify fs_get_callback as the function to execute. So let's go ahead and implement the callback:

static void *fs_get_callback(void *arg)
{
   struct task *task = arg;
   char *fname = task->data.get.key;
   task->data.get.key[task->data.get.nkey] = '\0';

   struct stat st;
   if (stat(fname, &st) == -1) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_KEY_ENOENT);
      free(task);
      return NULL;
   }

   struct fs_item* it = NULL;
   ENGINE_ERROR_CODE ret = fs_allocate((ENGINE_HANDLE*)task->engine,
                                       task->cookie, (void**)&it,
                                       task->data.get.key,
                                       task->data.get.nkey,
                                       st.st_size, 0, 0);
   if (ret != ENGINE_SUCCESS) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_ENOMEM);
      free(task);
      return NULL;
   }

   FILE *fp = fopen(fname, "r");
   if (fp == NULL) {
      fs_item_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_FAILED);
      free(task);
      return NULL;
   }

   size_t nr = fread(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nr != it->ndata) {
      fs_item_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_FAILED);
      free(task);
      return NULL;
   }

   task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
   task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                 ENGINE_SUCCESS);
   return NULL;
}
      

As you can see, it's quite easy to add asynchronous support to the engine functions. Let's go ahead and do the same for fs_store:

static ENGINE_ERROR_CODE fs_store(ENGINE_HANDLE* handle,
                                  const void *cookie,
                                  item* item,
                                  uint64_t *cas,
                                  ENGINE_STORE_OPERATION operation,
                                  uint16_t vbucket)
{
   struct fs_engine *engine = (struct fs_engine *)handle;
   /* Check to see if this is the callback from an earlier ewouldblock */
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   if (res != NULL) {
      *cas = 0;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      return ENGINE_SUCCESS;
   }


   /* Set up the callback struct */
   struct task *task = calloc(1, sizeof(*task));
   if (task == NULL) {
      return ENGINE_ENOMEM;
   }

   task->engine = (struct fs_engine *)handle;
   task->cookie = cookie;
   task->callback = fs_store_callback;
   task->data.store.item = item;
   task->data.store.operation = operation;

   ENGINE_ERROR_CODE ret = execute(task);
   if (ret != ENGINE_EWOULDBLOCK) {
      free(task);
   }
   return ret;
}

And fs_store_callback looks like the following:

static void *fs_store_callback(void *arg)
{
   struct task *task = arg;
   struct fs_item* it = task->data.store.item;
   char fname[it->nkey + 1];
   memcpy(fname, it->key, it->nkey);
   fname[it->nkey] = '\0';

   FILE *fp = fopen(fname, "w");
   if (fp == NULL) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_NOT_STORED);
      free(task);
      return NULL;
   }

   size_t nw = fwrite(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nw != it->ndata) {
      remove(fname);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_NOT_STORED);
      free(task);
      return NULL;
   }

   task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
   task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                 ENGINE_SUCCESS);
   return NULL;
}

If you look closely at the code above you'll see that we still don't differentiate between add/set/replace, but we'll fix that in the next installment. The sketch below gives an idea of what such a check could look like.
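
As a teaser, one possible approach (not the final implementation) is a small helper that fs_store_callback could call before opening the file, using stat() to decide whether add or replace should fail. The helper name check_store_operation is made up for this sketch, and it ignores the locking needed to make the check-and-write atomic:

#include <stdbool.h>
#include <sys/stat.h>

static ENGINE_ERROR_CODE check_store_operation(const char *fname,
                                               ENGINE_STORE_OPERATION operation)
{
   struct stat st;
   bool exists = stat(fname, &st) == 0;

   switch (operation) {
   case OPERATION_ADD:
      /* add must fail if the key (file) is already there */
      return exists ? ENGINE_KEY_EEXISTS : ENGINE_SUCCESS;
   case OPERATION_REPLACE:
      /* replace must fail if the key (file) isn't there */
      return exists ? ENGINE_SUCCESS : ENGINE_KEY_ENOENT;
   default:
      /* set (and, for now, everything else) just overwrites */
      return ENGINE_SUCCESS;
   }
}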

Tuesday, October 12, 2010

Building membase from the sources...

I thought I should share some information about my personal development model for membase.

I've set up a "sandbox" where I do all of my development, using the following commands:

trond@opensolaris> pfexec zfs create -o mountpoint=/source rpool/source
trond@opensolaris> pfexec chown trond:staff /source
trond@opensolaris> mkdir /source/membase
trond@opensolaris> cd /source/membase
trond@opensolaris> git clone git://github.com/trondn/tools.git
trond@opensolaris> cd tools/membase

I like to keep my changes as separate as possible, to reduce the dependencies between them. Whenever I'm fixing a bug report I do something like:

trond@opensolaris> mkdir bugnnn
trond@opensolaris> cd bugnnn
trond@opensolaris> ln -s ../Makefile
trond@opensolaris> make

That would build the entire Membase stack and put the files in /tmp/membase-build. I would then change my working directory to the module where I'm going to fix a bug and (hopefully) fix it.

After fixing the bug (and writing a test case!) I would commit the change and push it for review with the following commands:

trond@opensolaris> git add -p   (and select the changes to include)
trond@opensolaris> git commit -m "bugnnn: blah blah blah"
trond@opensolaris> git for-review

The last command there would push the change to our review system, so that Dustin or someone else can read through the diffs and accept the patch if they like it.

The workflow above looks pretty easy; there is, however, one little thing that is really annoying: Membase is a cross-platform project, so I need to ensure that the code compiles and works on all of our platforms. With the method above I would have to log into another system, set everything up, and copy my change over to see that it works. For simple changes that only touch one module I could always use buildbot or Hudson to test it on all platforms, but that doesn't work if I make an interface change that affects all of our modules.

I'm kind of lazy and don't want to do such boring work all of the time, so instead I wrote a script that sets up the sources and creates makefiles so that I can easily build the same source tree on all of my platforms.

In order for it to work you need to set up sharing on your filesystem:

trond@opensolaris> pfexec zfs set sharenfs=on rpool/source
trond@opensolaris> pfexec zfs set sharesmb=name=source rpool/source

To set up a tree for, let's say, bug 9999 I would run something like:

trond@opensolaris> ./setup.sh bug_9999
Download commit hook - Ok.
Checking out libmemcached (Bazaar) - Ok.
  Generate configure script - Ok.
Checking out bucket_engine (git) - Ok.
Checking out ep-engine (git) - Ok.
  Generate configure script - Ok.
Checking out libconflate (git) - Ok.
  Generate configure script - Ok.
Checking out libvbucket (git) - Ok.
  Generate configure script - Ok.
Checking out memcached (git) - Ok.
  Generate configure script - Ok.
Checking out moxi (git) - Ok.
  Generate configure script - Ok.
Checking out vbucketmigrator (git) - Ok.
  Generate configure script - Ok.
Checking out membase-cli (git) - Ok.
Checking out ns_server (git) - Ok.
Checking out memcachetest (git) - Ok.
  Generate configure script - Ok.
Configure build for SunOS

This will set up a build environment for Solaris that builds membase as a "multi ISA" (both 32- and 64-bit) stack in /tmp/membase. But let's add support for my Mac OS X, Ubuntu, and Debian boxes as well. Since all of the code is located on my OpenSolaris box, I need to use the -s option to tell the script where the source is located:

trond@opensolaris> ./setup.sh -s /net/opensolaris/source/membase/tools/membase -p Ubuntu bug_9999
Configure build for Ubuntu
trond@opensolaris> ./setup.sh -s /net/opensolaris/source/membase/tools/membase -p Darwin bug_9999
Configure build for Darwin
trond@opensolaris> ./setup.sh -s /net/opensolaris/source/membase/tools/membase -p Debian bug_9999
Configure build for Debian

So let's look inside the bug_9999 directory:

trond@opensolaris> ls -l bug_9999
total 15
drwxr-xr-x  13 trond    staff         14 Oct 12 13:35 Darwin
drwxr-xr-x  13 trond    staff         14 Oct 12 13:35 Debian
drwxr-xr-x  13 trond    staff         14 Oct 12 13:33 src
drwxr-xr-x   4 trond    staff          5 Oct 12 13:33 SunOS
drwxr-xr-x  13 trond    staff         14 Oct 12 13:35 Ubuntu

All of the sources are located in the src directory, and all of the makefiles for the various platforms will reference that code.

To build on all of my platforms I'm just executing:

trond@opensolaris> ssh ubuntu "cd /net/opensolaris/source/membase/tools/membase/bug_9999/Ubuntu && make" > ubuntu.log 2>&1 &
trond@opensolaris> ssh debian "cd /net/opensolaris/source/membase/tools/membase/bug_9999/Debian && make" > debian.log 2>&1 &
trond@opensolaris> ssh darwin "cd /net/opensolaris/source/membase/tools/membase/bug_9999/Darwin && make" > darwin.log 2>&1 &

but I've got that in a script of course:

trond@opensolaris> cat bug_9999/build.sh
#! /bin/ksh
cd SunOS && gmake > sunos.log 2>&1 &
ssh ubuntu "cd /net/opensolaris/source/membase/tools/membase/bug_9999/Ubuntu && make" > ubuntu.log 2>&1 &
ssh debian "cd /net/opensolaris/source/membase/tools/membase/bug_9999/Debian && make" > debian.log 2>&1 &
ssh darwin "cd /net/opensolaris/source/membase/tools/membase/bug_9999/Darwin && make" > darwin.log 2>&1 &
xterm -T SunOS -e tail -f sunos.log &
xterm -T Ubuntu -e tail -f ubuntu.log &
xterm -T Debian -e tail -f debian.log &
xterm -T MacOS -e tail -f darwin.log &

Unfortunately you can't start the membase we just installed in /tmp/membase, but I'm working on it!

Friday, October 8, 2010

Writing your own storage engine for Memcached, part 2

In the previous blog post I described the engine initialization and destruction. This blog post will cover the memory allocation model in the engine interface.

The memcached core is responsible for allocating all of the memory it needs for its connections (send / receive buffers etc.), and the engine is responsible for allocating (and freeing) all of the memory it needs to keep track of the items. The engine shouldn't have to care about the memory the core allocates (and uses), but the core will access the memory managed by the engine.

When the memcached core is about to store a new item it needs a (as of today, contiguous) buffer to store the data for the item. The core will try to allocate this buffer by calling the allocate function in the API. So let's start extending our example code by adding our own implementation of the allocate function. The first thing we need to do is add it to the engine descriptor we return from create_instance. We're going to add a number of functions in today's entry, so let's just map all of them while we're at it:

MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                  GET_SERVER_API get_server_api,
                                  ENGINE_HANDLE **handle)
{
   [ ... cut ... ]
  /*
   * Map the API entry points to our functions that implement them.
   */
   h->engine.initialize = fs_initialize;
   h->engine.destroy = fs_destroy;
   h->engine.get_info = fs_get_info;
   h->engine.allocate = fs_allocate;
   h->engine.remove = fs_item_delete;
   h->engine.release = fs_item_release;
   h->engine.get = fs_get;
   h->engine.get_stats = fs_get_stats;
   h->engine.reset_stats = fs_reset_stats;
   h->engine.store = fs_store;
   h->engine.flush = fs_flush;
   h->engine.unknown_command = fs_unknown_command;
   h->engine.item_set_cas = fs_item_set_cas;
   h->engine.get_item_info = fs_get_item_info;
      
The next thing we need to do is to create a data structure to keep the information we need. The purpose of this tutorial isn't to create a memory efficient implementation, but to exercise the API. So let's just create the following struct:

struct fs_item {
   void *key;
   size_t nkey;
   void *data;
   size_t ndata;
   int flags;
   rel_time_t exptime;
};
      
Our implementation of allocate would then look like:

static ENGINE_ERROR_CODE fs_allocate(ENGINE_HANDLE* handle,
                                     const void* cookie,
                                     item **item,
                                     const void* key,
                                     const size_t nkey,
                                     const size_t nbytes,
                                     const int flags,
                                     const rel_time_t exptime)
{
   struct fs_item *it = malloc(sizeof(struct fs_item));
   if (it == NULL) {
      return ENGINE_ENOMEM;
   }
   it->flags = flags;
   it->exptime = exptime;
   it->nkey = nkey;
   it->ndata = nbytes;
   it->key = malloc(nkey);
   it->data = malloc(nbytes);
   if (it->key == NULL || it->data == NULL) {
      free(it->key);
      free(it->data);
      free(it);
      return ENGINE_ENOMEM;
   }
   memcpy(it->key, key, nkey);
   *item = it;
   return ENGINE_SUCCESS;
}
      
If you look at the implementation above you'll see that we didn't return a pointer to the actual memory for the data storage to the memcached core. To get that address memcached will call get_item_info in the API. So let's implement that:

static bool fs_get_item_info(ENGINE_HANDLE *handle, const void *cookie,
                             const item* item, item_info *item_info)
{
   struct fs_item* it = (struct fs_item*)item;
   if (item_info->nvalue < 1) {
      return false;
   }

   item_info->cas = 0; /* Not supported */
   item_info->clsid = 0; /* Not supported */
   item_info->exptime = it->exptime;
   item_info->flags = it->flags;
   item_info->key = it->key;
   item_info->nkey = it->nkey;
   item_info->nbytes = it->ndata; /* Total length of the items data */
   item_info->nvalue = 1; /* Number of fragments used */
   item_info->value[0].iov_base = it->data; /* pointer to fragment 1 */
   item_info->value[0].iov_len = it->ndata; /* Length of fragment 1 */

   return true;
}
      
The get_item_info function is important and deserves some more explanation. In the engine API the "item" is defined as a void pointer, and we defined our own item structure to keep track of the information we need on a per-item basis. The memcached core will, however, need to know where to read / write the memory for the key and the data going to / coming from a client. To do so it will invoke get_item_info. If you look closely at our implementation of fs_get_item_info you will see that the first thing I do is check that item_info->nvalue contains at least one element. Right now it will always be one, but the intention is that we're going to support scattered IO.
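
To illustrate the contract from the caller's side, here is a sketch (not the actual memcached core code) of how a caller could consume the item_info our engine fills in. The function name dump_item_payload is made up for this example:

#include <stdbool.h>
#include <stdio.h>

/* Not the real memcached core, just an illustration of the contract */
static bool dump_item_payload(ENGINE_HANDLE_V1 *v1, ENGINE_HANDLE *handle,
                              const void *cookie, const item *it)
{
   item_info info = { .nvalue = 1 }; /* we can accept one fragment */
   if (!v1->get_item_info(handle, cookie, it, &info)) {
      return false;
   }

   /* With scattered IO the engine may fill in several fragments */
   for (int ii = 0; ii < info.nvalue; ++ii) {
      fwrite(info.value[ii].iov_base, 1, info.value[ii].iov_len, stdout);
   }

   return true;
}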

When the core is done moving the data it received over the wire into the item, it will try to store the item in our engine by calling store. So let's go ahead and create a simple implementation (we'll extend it later on in the tutorial):

static ENGINE_ERROR_CODE fs_store(ENGINE_HANDLE* handle,
                                  const void *cookie,
                                  item* item,
                                  uint64_t *cas,
                                  ENGINE_STORE_OPERATION operation,
                                  uint16_t vbucket)
{
   struct fs_item* it = item;
   char fname[it->nkey + 1];
   memcpy(fname, it->key, it->nkey);
   fname[it->nkey] = '\0';
   FILE *fp = fopen(fname, "w");
   if (fp == NULL) {
      return ENGINE_NOT_STORED;
   }
   size_t nw = fwrite(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nw != it->ndata) {
      remove(fname);
      return ENGINE_NOT_STORED;
   }

   *cas = 0;
   return ENGINE_SUCCESS;
}
      
If you look at the implementation above you will see that it doesn't implement the correct semantics for add/replace/set etc., and it will block memcached while we're doing file IO. Don't worry about that right now; we'll get back to it.

When the core is done using the item it allocated, it will release the item by calling the release function in the API. The engine may reuse the item's storage for something else at this point. So let's hook up our release implementation:

static void fs_item_release(ENGINE_HANDLE* handle,
                            const void *cookie,
                            item* item)
{
   struct fs_item *it = item;
   free(it->key);
   free(it->data);
   free(it);
}
      
Now we've created all of the code needed to successfully store items in our engine, but we can't read any of them back. So let's implement get:

static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{

   char fname[nkey + 1];
   memcpy(fname, key, nkey);
   fname[nkey] = '\0';

   struct stat st;
   if (stat(fname, &st) == -1) {
      return ENGINE_KEY_ENOENT;
   }

   struct fs_item* it = NULL;
   ENGINE_ERROR_CODE ret = fs_allocate(handle, cookie, (void**)&it, key, nkey,
                                       st.st_size, 0, 0);
   if (ret != ENGINE_SUCCESS) {
      return ENGINE_ENOMEM;
   }

   FILE *fp = fopen(fname, "r");
   if (fp == NULL) {
      fs_item_release(handle, cookie, it);
      return ENGINE_FAILED;
   }

   size_t nr = fread(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nr != it->ndata) {
      fs_item_release(handle, cookie, it);
      return ENGINE_FAILED;
   }

   *item = it;
   return ENGINE_SUCCESS;
}
      
Let's add a dummy implementation for the rest of the API and try to load and test the engine:

static const engine_info* fs_get_info(ENGINE_HANDLE* handle)
{
   static engine_info info = {
      .description = "Filesystem engine v0.1",
      .num_features = 0
   };

   return &info;
}

static ENGINE_ERROR_CODE fs_item_delete(ENGINE_HANDLE* handle,
                                        const void* cookie,
                                        const void* key,
                                        const size_t nkey,
                                        uint64_t cas,
                                        uint16_t vbucket)
{
   return ENGINE_KEY_ENOENT;
}

static ENGINE_ERROR_CODE fs_get_stats(ENGINE_HANDLE* handle,
                                      const void* cookie,
                                      const char* stat_key,
                                      int nkey,
                                      ADD_STAT add_stat)
{
   return ENGINE_SUCCESS;
}

static ENGINE_ERROR_CODE fs_flush(ENGINE_HANDLE* handle,
                                  const void* cookie, time_t when)
{

   return ENGINE_SUCCESS;
}

static void fs_reset_stats(ENGINE_HANDLE* handle, const void *cookie)
{

}

static ENGINE_ERROR_CODE fs_unknown_command(ENGINE_HANDLE* handle,
                                            const void* cookie,
                                            protocol_binary_request_header *request,
                                            ADD_RESPONSE response)
{
   return ENGINE_ENOTSUP;
}

static void fs_item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
                            item* item, uint64_t val)
{
}

      
So let's go ahead and try our engine:

trond@opensolaris> /opt/memcached/bin/memcached -E .libs/fs_engine.so
      
From another terminal I'm typing in:

trond@opensolaris> telnet localhost 11211
Trying ::1...
Connected to opensolaris.
Escape character is '^]'.
add test 0 0 4
test
STORED
get test
VALUE test 0 4
test
END
quit
Connection to storm closed by foreign host.
      
Terminate memcached by pressing ctrl-c, and look in the current directory:

trond@opensolaris> ls -l test
-rw-r--r--   1 trond    users          6 Oct  8 12:56 test
trond@opensolaris> cat test
test
      
That's all for this time.

Monday, October 4, 2010

Writing your own storage engine for Memcached

I am working full time on membase, which utilizes the "engine interface" we're adding to Memcached. Being the one who designed the API and wrote the documentation, I can say without insulting anyone that we need more (and better) documentation. This blog entry will be the first in a mini-tutorial on how to write your own storage engine. I will try to cover all aspects of the engine interface while we build an engine that stores all of the keys as files on the server.

This entry will cover the basic steps of setting up your development environment and cover the lifecycle of the engine.

Set up the development environment

The easiest way to get "up'n'running" is to install my development branch of the engine interface. Just execute the following commands:

$ git clone git://github.com/trondn/memcached.git
$ cd memcached
$ git checkout -b engine origin/engine
$ ./config/autorun.sh
$ ./configure --prefix=/opt/memcached
$ make all install
     

Let's verify that the server works by executing the following commands:

$ /opt/memcached/bin/memcached -E default_engine.so &
$ echo version | nc localhost 11211
VERSION 1.3.3_433_g82fb476     <-- you may get another output string....
$ fg
$ ctrl-C
     

Creating the filesystem engine

You might want to use autoconf to build your engine, but setting up autoconf is way beyond the scope of this tutorial. Let's just use the following Makefile instead.

ROOT=/opt/memcached
INCLUDE=-I${ROOT}/include

#CC = gcc
#CFLAGS=-std=gnu99 -g -DNDEBUG -fno-strict-aliasing -Wall \
# -Wstrict-prototypes -Wmissing-prototypes -Wmissing-declarations \
# -Wredundant-decls \
# ${INCLUDE} -DHAVE_CONFIG_H
#LDFLAGS=-shared

CC=cc
CFLAGS=-I${ROOT}/include -m64 -xldscope=hidden -mt -g \
      -errfmt=error -errwarn -errshort=tags  -KPIC
LDFLAGS=-G -z defs -m64 -mt

all: .libs/fs_engine.so

install: all
	cp .libs/fs_engine.so ${ROOT}/lib

SRC = fs_engine.c
OBJS = ${SRC:%.c=.libs/%.o}

.libs/fs_engine.so: .libs $(OBJS)
	${LINK.c} -o $@ ${OBJS}

.libs:; -@mkdir $@

.libs/%.o: %.c
	${COMPILE.c} $< -o $@

clean:
	$(RM) .libs/fs_engine.so $(OBJS)

I am doing most of my development on Solaris using the Sun Studio compilers, but I have added a section with settings for gcc in case you're using gcc. Just comment out the Sun Studio lines for CC, CFLAGS, and LDFLAGS and remove the # from the gcc alternatives.

In order for memcached to utilize your storage engine it first needs to load your module, and then create an instance of the engine. You use the -E option to memcached to specify the name of the module memcached should load. Once the module is loaded, memcached will look for a symbol named create_instance in the module and use it to create a handle memcached can use to communicate with the engine. This is the first function we need to create, and it should have the following signature:

MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface, GET_SERVER_API get_server_api, ENGINE_HANDLE **handle);
     

The purpose of this function is to provide the server with a handle to our module, but we should not perform any kind of initialization of our engine yet. The reason is that the memcached server may not support the version of the API we provide. The intention is that the server notifies the engine of the "highest" interface version it supports through interface, and the engine must return a descriptor for one of those interfaces through handle. If the engine doesn't support any of those interfaces it should return ENGINE_ENOTSUP.

So let's go ahead and define an engine descriptor for our example engine and create an implementation of create_instance:

struct fs_engine {
  ENGINE_HANDLE_V1 engine;
  /* We're going to extend this structure later on */
};

MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                 GET_SERVER_API get_server_api,
                                 ENGINE_HANDLE **handle) {
  /*
   * Verify that the interface from the server is one we support. Right now
   * there is only one interface, so we would accept all of them (and it would
   * be up to the server to refuse us... I'm adding the test here so you
   * get the picture..
   */
  if (interface == 0) {
     return ENGINE_ENOTSUP;
  }

  /*
   * Allocate memory for the engine descriptor. I'm no big fan of using
   * global variables, because that might create problems later on if
   * we later on decide to create multiple instances of the same engine.
   * Better to be on the safe side from day one...
   */
  struct fs_engine *h = calloc(1, sizeof(*h));
  if (h == NULL) {
     return ENGINE_ENOMEM;
  }

  /*
   * We're going to implement the first version of the engine API, so
   * we need to inform the memcached core what kind of structure it should
   * expect
   */
  h->engine.interface.interface = 1;

  /*
   * Map the API entry points to our functions that implement them.
   */
  h->engine.initialize = fs_initialize;
  h->engine.destroy = fs_destroy;

  /* Pass the handle back to the core */
  *handle = (ENGINE_HANDLE*)h;

  return ENGINE_SUCCESS;
}
     

If the interface we return from create_instance isn't among the interfaces the memcached core supports, the core will call destroy() immediately. The memcached core guarantees that it will never use any pointers returned from the engine once destroy() has been called.

So let's go ahead and implement our destroy() function. If you look at our implementation of create_instance you will see that we mapped destroy() to a function named fs_destroy():

static void fs_destroy(ENGINE_HANDLE* handle) {
  /* Release the memory allocated for the engine descriptor */
  free(handle);
}
     

If the core supports the interface we specify, it will call the initialize() method. This is the time to do all sorts of initialization in your engine (like connecting to a database, initializing mutexes, etc.). The initialize function is called only once per instance returned from create_instance (even if the memcached core uses multiple threads). The core will not call any other functions in the API before the initialization method returns.

We don't need any kind of initialization at this moment, so we can use the following initialization code:

static ENGINE_ERROR_CODE fs_initialize(ENGINE_HANDLE* handle,
                                      const char* config_str) {
  return ENGINE_SUCCESS;
}
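
If we did need some state, a slightly more elaborate initialize might look like the sketch below. It assumes we had extended struct fs_engine with a pthread_mutex_t named cache_lock and a char *datadir member, and that the configuration string looks like "datadir=/some/path"; none of that is part of the engine we've defined so far (it needs <pthread.h> and <string.h>):

static ENGINE_ERROR_CODE fs_initialize(ENGINE_HANDLE* handle,
                                       const char* config_str) {
   struct fs_engine *engine = (struct fs_engine *)handle;

   /* Set up the (hypothetical) lock protecting our internal state */
   if (pthread_mutex_init(&engine->cache_lock, NULL) != 0) {
      return ENGINE_FAILED;
   }

   /* Pick up an (imaginary) "datadir=..." setting if one was provided */
   if (config_str != NULL && strncmp(config_str, "datadir=", 8) == 0) {
      engine->datadir = strdup(config_str + 8);
      if (engine->datadir == NULL) {
         pthread_mutex_destroy(&engine->cache_lock);
         return ENGINE_ENOMEM;
      }
   }

   return ENGINE_SUCCESS;
}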
     

If the engine returns anything other than ENGINE_SUCCESS, the memcached core will refuse to use the engine and will call destroy().

In the next blog entry we will start adding functionality so that we can load our engine and handle commands from the client.