In this entry we're going to fix our get and store methods so that they don't block the engine API. As I've said earlier, the intention of this tutorial is to focus on the engine API. That means I'm not going to try to make an efficient design, because that could distract from what I'm trying to explain. If people are interested in how we could optimize this, I could add a second part of the tutorial in the future... Just let me know.

In order to implement asynchronous operations in our engine, we need to make use of the API the server makes available to us in create_instance. Let's extend our engine structure to keep track of the server API:

    struct fs_engine {
        ENGINE_HANDLE_V1 engine;
        SERVER_HANDLE_V1 sapi;
    };
    ...
    MEMCACHED_PUBLIC_API
    ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                      GET_SERVER_API get_server_api,
                                      ENGINE_HANDLE **handle)
    {
        ...
        h->engine.item_set_cas = fs_item_set_cas;
        h->engine.get_item_info = fs_get_item_info;
        h->sapi = *get_server_api();
        ...

To implement an asynchronous function in the engine interface, the engine needs to dispatch the request to another thread before it returns ENGINE_EWOULDBLOCK from the engine function. When the backend is done processing the request, it notifies the memcached core by using the notify_io_complete function in the server interface. If an error occurred while processing the request, the memcached core will return the error message to the client. If your engine called notify_io_complete with ENGINE_SUCCESS, the memcached core will call the engine interface function once more with the same arguments as the first time.
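
To make the handshake concrete, here is a small stand-alone simulation of it. Nothing in it is the real memcached API: every mock_ name is made up for illustration, and the "core" simply blocks on a condition variable, which is only a simplification so the sketch fits in one file. What it shows is the order of events: the first call dispatches and returns "would block", the backend parks its result and notifies, and the core then calls the same function again with the same arguments.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-ins for the real engine API types. */
typedef enum { MOCK_SUCCESS, MOCK_EWOULDBLOCK, MOCK_FAILED } mock_status;

struct mock_cookie {
    pthread_mutex_t lock;
    pthread_cond_t done;
    pthread_t worker;        /* backend thread handling the request */
    int notified;            /* set once the backend has reported in */
    mock_status status;      /* status passed with the notification */
    void *engine_specific;   /* result parked by the backend */
};

static void mock_cookie_init(struct mock_cookie *c) {
    pthread_mutex_init(&c->lock, NULL);
    pthread_cond_init(&c->done, NULL);
    c->notified = 0;
    c->status = MOCK_FAILED;
    c->engine_specific = NULL;
}

/* Plays the role of notify_io_complete: wake up the waiting core. */
static void mock_notify_io_complete(struct mock_cookie *c, mock_status s) {
    pthread_mutex_lock(&c->lock);
    c->status = s;
    c->notified = 1;
    pthread_cond_signal(&c->done);
    pthread_mutex_unlock(&c->lock);
}

/* Backend thread: produce a result, park it on the cookie, notify. */
static void *mock_backend(void *arg) {
    static char value[] = "hello";
    struct mock_cookie *c = arg;
    pthread_mutex_lock(&c->lock);
    c->engine_specific = value;
    pthread_mutex_unlock(&c->lock);
    mock_notify_io_complete(c, MOCK_SUCCESS);
    return NULL;
}

/* Engine function: the first call dispatches, the second picks up the result. */
static mock_status mock_engine_get(struct mock_cookie *c, void **out) {
    assert(out != NULL);
    pthread_mutex_lock(&c->lock);
    void *res = c->engine_specific;
    c->engine_specific = NULL;
    pthread_mutex_unlock(&c->lock);
    if (res != NULL) {
        *out = res;
        return MOCK_SUCCESS;
    }
    if (pthread_create(&c->worker, NULL, mock_backend, c) != 0) {
        return MOCK_FAILED;
    }
    return MOCK_EWOULDBLOCK;
}

/* Core: call the engine, wait for the notification, then call again. */
static mock_status mock_core_get(struct mock_cookie *c, void **out) {
    mock_status ret = mock_engine_get(c, out);
    if (ret != MOCK_EWOULDBLOCK) {
        return ret;
    }
    pthread_mutex_lock(&c->lock);
    while (!c->notified) {
        pthread_cond_wait(&c->done, &c->lock);
    }
    ret = c->status;
    pthread_mutex_unlock(&c->lock);
    pthread_join(c->worker, NULL);
    if (ret != MOCK_SUCCESS) {
        return ret;                   /* error goes straight back to the client */
    }
    return mock_engine_get(c, out);   /* same arguments as the first call */
}
```

Calling mock_cookie_init on a cookie and then mock_core_get(&cookie, &out) walks through both engine calls and leaves out pointing at the backend's result.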
If you look at the server API, you'll see that it has an interface for storing an engine-specific pointer. This will make our life easier when we want to implement async I/O. So let's go ahead and update our fs_get method:

    static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                    const void* cookie,
                                    item** item,
                                    const void* key,
                                    const int nkey,
                                    uint16_t vbucket)
    {
        struct fs_engine *engine = (struct fs_engine *)handle;
        void *res = engine->sapi.cookie->get_engine_specific(cookie);
        if (res != NULL) {
            *item = res;
            engine->sapi.cookie->store_engine_specific(cookie, NULL);
            return ENGINE_SUCCESS;
        }
        ...

The next thing we need to do is to create a function that runs asynchronously and stores the result in the engine_specific setting for the cookie. Since we're going to use async tasks for all of the engine methods, let's go ahead and create a function to run tasks in another thread:

    static ENGINE_ERROR_CODE execute(struct task *task)
    {
        pthread_attr_t attr;
        pthread_t tid;

        if (pthread_attr_init(&attr) != 0 ||
            pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
            pthread_create(&tid, &attr, task->callback, task) != 0) {
            return ENGINE_FAILED;
        }

        return ENGINE_EWOULDBLOCK;
    }

As you can see from the code, I'm going to create a new thread to execute each operation. This isn't very efficient, because creating a new thread has substantial overhead. In your design you would probably want a pool of threads to run your tasks.
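
For readers wondering what such a pool might look like, here is a minimal sketch using plain pthreads. It is not part of the tutorial's engine: the pool_ names, the fixed POOL_THREADS size, and the pool_demo_mark callback are all assumptions made for illustration, and error handling is kept to the bare minimum.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define POOL_THREADS 4   /* hypothetical, fixed pool size */

struct pool_job {
    void *(*callback)(void *arg);
    void *arg;
    struct pool_job *next;
};

struct pool {
    pthread_mutex_t lock;
    pthread_cond_t wakeup;
    struct pool_job *head, *tail;   /* FIFO queue of pending jobs */
    int shutdown;
    pthread_t threads[POOL_THREADS];
};

/* Worker loop: sleep until a job is queued, run it, repeat. */
static void *pool_worker(void *arg) {
    struct pool *p = arg;
    for (;;) {
        pthread_mutex_lock(&p->lock);
        while (p->head == NULL && !p->shutdown) {
            pthread_cond_wait(&p->wakeup, &p->lock);
        }
        if (p->head == NULL) {      /* shutting down and queue drained */
            pthread_mutex_unlock(&p->lock);
            return NULL;
        }
        struct pool_job *job = p->head;
        p->head = job->next;
        if (p->head == NULL) {
            p->tail = NULL;
        }
        pthread_mutex_unlock(&p->lock);
        job->callback(job->arg);    /* run the job outside the lock */
        free(job);
    }
}

/* Start the workers; returns 0 on success, -1 on failure.
 * A fuller version would tear down already-started threads on failure. */
static int pool_start(struct pool *p) {
    p->head = p->tail = NULL;
    p->shutdown = 0;
    if (pthread_mutex_init(&p->lock, NULL) != 0) return -1;
    if (pthread_cond_init(&p->wakeup, NULL) != 0) return -1;
    for (int i = 0; i < POOL_THREADS; i++) {
        if (pthread_create(&p->threads[i], NULL, pool_worker, p) != 0) return -1;
    }
    return 0;
}

/* Queue a job; returns 0 on success, -1 if out of memory. */
static int pool_submit(struct pool *p, void *(*callback)(void *), void *arg) {
    assert(callback != NULL);
    struct pool_job *job = malloc(sizeof(*job));
    if (job == NULL) return -1;
    job->callback = callback;
    job->arg = arg;
    job->next = NULL;
    pthread_mutex_lock(&p->lock);
    if (p->tail != NULL) {
        p->tail->next = job;
    } else {
        p->head = job;
    }
    p->tail = job;
    pthread_cond_signal(&p->wakeup);
    pthread_mutex_unlock(&p->lock);
    return 0;
}

/* Let the workers drain the queue, then join them. */
static void pool_stop(struct pool *p) {
    pthread_mutex_lock(&p->lock);
    p->shutdown = 1;
    pthread_cond_broadcast(&p->wakeup);
    pthread_mutex_unlock(&p->lock);
    for (int i = 0; i < POOL_THREADS; i++) {
        pthread_join(p->threads[i], NULL);
    }
    pthread_mutex_destroy(&p->lock);
    pthread_cond_destroy(&p->wakeup);
}

/* Example job for trying the pool out: flag the int that arg points to. */
static void *pool_demo_mark(void *arg) {
    *(int *)arg = 1;
    return NULL;
}
```

With something like this in place, execute could hand the task to pool_submit(&pool, task->callback, task) instead of calling pthread_create, and return ENGINE_EWOULDBLOCK when the submit succeeds. The callbacks themselves would not need to change.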
The newly created thread runs the specialized callback with a pointer to the task as its only argument. So what does this task structure look like?

    struct task {
        struct fs_engine *engine; /* Pointer to the engine */
        const void *cookie; /* The cookie requesting the operation */
        void *(*callback)(void *arg);
        union {
            struct {
                char key[PATH_MAX];
                size_t nkey;
            } get; /* Data used by the get operation */
            struct {
                item *item;
                ENGINE_STORE_OPERATION operation;
            } store; /* Data used by the store operation */
        } data;
    };

So let's finish up fs_get:

    static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                    const void* cookie,
                                    item** item,
                                    const void* key,
                                    const int nkey,
                                    uint16_t vbucket)
    {
        struct fs_engine *engine = (struct fs_engine *)handle;

        /* Check to see if this is the callback from an earlier ewouldblock */
        void *res = engine->sapi.cookie->get_engine_specific(cookie);
        if (res != NULL) {
            *item = res;
            engine->sapi.cookie->store_engine_specific(cookie, NULL);
            return ENGINE_SUCCESS;
        }

        /* We don't support keys longer than PATH_MAX */
        if (nkey >= PATH_MAX) {
            return ENGINE_FAILED;
        }

        /* Set up the callback struct */
        struct task *task = calloc(1, sizeof(*task));
        if (task == NULL) {
            return ENGINE_ENOMEM;
        }

        task->engine = (struct fs_engine *)handle;
        task->cookie = cookie;
        task->callback = fs_get_callback;
        memcpy(task->data.get.key, key, nkey);
        task->data.get.nkey = nkey;

        ENGINE_ERROR_CODE ret = execute(task);
        if (ret != ENGINE_EWOULDBLOCK) {
            free(task);
        }
        return ret;
    }

If you look at the code above, you'll see that we specify fs_get_callback as the function to execute. So let's go ahead and implement the callback:

    static void *fs_get_callback(void *arg)
    {
        struct task *task = arg;
        char *fname = task->data.get.key;
        task->data.get.key[task->data.get.nkey] = '\0';

        struct stat st;
        if (stat(fname, &st) == -1) {
            task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                          ENGINE_KEY_ENOENT);
            free(task);
            return NULL;
        }

        struct fs_item* it = NULL;
        ENGINE_ERROR_CODE ret = fs_allocate((ENGINE_HANDLE*)task->engine,
                                            task->cookie, (void**)&it,
                                            task->data.get.key,
                                            task->data.get.nkey,
                                            st.st_size, 0, 0);
        if (ret != ENGINE_SUCCESS) {
            task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                          ENGINE_ENOMEM);
            free(task);
            return NULL;
        }

        FILE *fp = fopen(fname, "r");
        if (fp == NULL) {
            fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
            task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                          ENGINE_FAILED);
            free(task);
            return NULL;
        }

        size_t nr = fread(it->data, 1, it->ndata, fp);
        fclose(fp);
        if (nr != it->ndata) {
            fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
            task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                          ENGINE_FAILED);
            free(task);
            return NULL;
        }

        task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
        task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                      ENGINE_SUCCESS);
        return NULL;
    }

As you see, it's quite easy to add asynchronous support for the engine functions. Let's go ahead and do the same for fs_store:

    static ENGINE_ERROR_CODE fs_store(ENGINE_HANDLE* handle,
                                      const void *cookie,
                                      item* item,
                                      uint64_t *cas,
                                      ENGINE_STORE_OPERATION operation,
                                      uint16_t vbucket)
    {
        struct fs_engine *engine = (struct fs_engine *)handle;

        /* Check to see if this is the callback from an earlier ewouldblock */
        void *res = engine->sapi.cookie->get_engine_specific(cookie);
        if (res != NULL) {
            *cas = 0;
            engine->sapi.cookie->store_engine_specific(cookie, NULL);
            return ENGINE_SUCCESS;
        }

        /* Set up the callback struct */
        struct task *task = calloc(1, sizeof(*task));
        if (task == NULL) {
            return ENGINE_ENOMEM;
        }

        task->engine = (struct fs_engine *)handle;
        task->cookie = cookie;
        task->callback = fs_store_callback;
        task->data.store.item = item;
        task->data.store.operation = operation;

        ENGINE_ERROR_CODE ret = execute(task);
        if (ret != ENGINE_EWOULDBLOCK) {
            free(task);
        }
        return ret;
    }

And fs_store_callback looks like the following:

    static void *fs_store_callback(void *arg)
    {
        struct task *task = arg;
        struct fs_item* it = task->data.store.item;
        char fname[it->nkey + 1];
        memcpy(fname, it->key, it->nkey);
        fname[it->nkey] = '\0';

        FILE *fp = fopen(fname, "w");
        if (fp == NULL) {
            task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                          ENGINE_NOT_STORED);
            free(task);
            return NULL;
        }

        size_t nw = fwrite(it->data, 1, it->ndata, fp);
        fclose(fp);
        if (nw != it->ndata) {
            remove(fname);
            task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                          ENGINE_NOT_STORED);
            free(task);
            return NULL;
        }

        task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
        task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                      ENGINE_SUCCESS);
        return NULL;
    }

If you look closely at the code above you'll see that we still don't differentiate between add/set/replace, but we'll fix that in the next session.
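
As a teaser, here is one way a file-backed store could tell the three operations apart. It is only a sketch and not necessarily what the next entry will do: the sketch_ names and the enum are made up for illustration, and it uses POSIX open flags instead of fopen so that the "does the key exist?" check and the file creation happen in a single step.

```c
#include <assert.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical stand-in for ENGINE_STORE_OPERATION. */
typedef enum { SKETCH_ADD, SKETCH_SET, SKETCH_REPLACE } sketch_store_op;

/* Pick open(2) flags that enforce each operation's precondition. */
static int sketch_open_flags(sketch_store_op op) {
    switch (op) {
    case SKETCH_ADD:      /* must not exist yet: O_EXCL makes open fail if it does */
        return O_WRONLY | O_CREAT | O_EXCL;
    case SKETCH_REPLACE:  /* must already exist: without O_CREAT open fails if missing */
        return O_WRONLY | O_TRUNC;
    case SKETCH_SET:      /* always store */
        break;
    }
    return O_WRONLY | O_CREAT | O_TRUNC;
}

/* Returns 0 on success, -1 if the precondition or the I/O fails. */
static int sketch_store(const char *fname, sketch_store_op op,
                        const void *data, size_t ndata) {
    assert(fname != NULL);
    int fd = open(fname, sketch_open_flags(op), 0644);
    if (fd == -1) {
        return -1;
    }
    const char *p = data;
    size_t left = ndata;
    while (left > 0) {
        ssize_t nw = write(fd, p, left);
        if (nw <= 0) {
            /* Same cleanup as the callback above: drop the partial file */
            close(fd);
            unlink(fname);
            return -1;
        }
        p += nw;
        left -= (size_t)nw;
    }
    return close(fd) == 0 ? 0 : -1;
}
```

A store callback could map a -1 from a helper like this to ENGINE_NOT_STORED, which is what the callback above already reports when the file can't be written.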
Hi,

    if (ret != ENGINE_EWOULDBLOCK) {
        free(task);
    }
    return ret;

If ret == ENGINE_EWOULDBLOCK, is there a memory leak?
Good catch. The _intention_ here was to release it right before all of the "return NULL" in the callbacks, but it seems that I forgot to put it in the code examples. You'll see that it gets freed in all error situations in the example. I'll fix that up in the next blog post :)
Hi Trond,
Thanks for the reply, and looking forward to the next post.
I checked out the engine-pu branch from git, and it seems only the binary protocol is ready for implementing an asynchronous engine. Is that by design?
wooba: the engine-pu branch isn't up to date yet.. check out my engine branch from git://github.com/trondn/memcached.git
Cheers,
Trond