pcrud fleshing
authorLebbeous Fogle-Weekley <lebbeous@esilibrary.com>
Tue, 25 Oct 2011 21:01:04 +0000 (17:01 -0400)
committerMike Rylander <mrylander@gmail.com>
Mon, 19 Mar 2012 13:50:15 +0000 (09:50 -0400)
This allows fleshing in pcrud queries if
a) the objects you would flesh have a pcrud controller in the IDL,
b) you have permissions for all the objects you would flesh just as
   you would have to have them if you asked for them directly.

This is harder to read than it might be because of efforts to avoid
redundant perm checks for every fleshed object of the same class, and
efforts to keep results streaming out quickly.

Tests pretty well for me, but more rigor appreciated.

I received lots of help in making this happen from Mike Rylander and
Bill Erickson.

Signed-off-by: Lebbeous Fogle-Weekley <lebbeous@esilibrary.com>
Signed-off-by: Bill Erickson <berick@esilibrary.com>
Open-ILS/src/c-apps/oils_sql.c

index 99918d5..b82d71c 100644 (file)
@@ -119,7 +119,7 @@ static ClassInfo* add_joined_class( const char* alias, const char* classname );
 static void clear_query_stack( void );
 
 static const jsonObject* verifyUserPCRUD( osrfMethodContext* );
-static int verifyObjectPCRUD( osrfMethodContext*, const jsonObject*, const int );
+static int verifyObjectPCRUD( osrfMethodContext*, osrfHash*, const jsonObject*, int );
 static const char* org_tree_root( osrfMethodContext* ctx );
 static jsonObject* single_hash( const char* key, const char* value );
 
@@ -510,7 +510,7 @@ void userDataFree( void* blob ) {
        that it will free whatever else needs freeing.
 */
 static void sessionDataFree( char* key, void* item ) {
-       if( !strcmp( key, "xact_id" ) || !strcmp( key, "authkey" ) ) 
+       if( !strcmp( key, "xact_id" ) || !strcmp( key, "authkey" ) || !strncmp( key, "rs_size_", 8) 
                free( item );
        else if( !strcmp( key, "user_login" ) )
                jsonObjectFree( (jsonObject*) item );
@@ -523,6 +523,19 @@ static void pcacheFree( char* key, void* item ) {
 }
 
 /**
+       @brief Initialize session cache.
+       @param ctx Pointer to the method context.
+
+       Create a cache for the session by making the session's userData member point
+       to an osrfHash instance.
+*/
+static void initSessionCache( osrfMethodContext* ctx ) {
+       ctx->session->userData = osrfNewHash();
+       osrfHashSetCallback( (osrfHash*) ctx->session->userData, &sessionDataFree );
+       ctx->session->userDataFree = &userDataFree;
+}
+
+/**
        @brief Save a transaction id.
        @param ctx Pointer to the method context.
 
@@ -536,11 +549,8 @@ static void setXactId( osrfMethodContext* ctx ) {
 
                // If the session doesn't already have a hash, create one.  Make sure
                // that the application session frees the hash when it terminates.
-               if( NULL == cache ) {
-                       session->userData = cache = osrfNewHash();
-                       osrfHashSetCallback( cache, &sessionDataFree );
-                       ctx->session->userDataFree = &userDataFree;
-               }
+               if( NULL == cache )
+                       initSessionCache( ctx );
 
                // Save the transaction id in the hash, with the key "xact_id"
                osrfHashSet( cache, strdup( session->session_id ), "xact_id" );
@@ -586,11 +596,8 @@ static void setPermLocationCache( osrfMethodContext* ctx, const char* perm, osrf
 
                // If the session doesn't already have a hash, create one.  Make sure
                // that the application session frees the hash when it terminates.
-               if( NULL == cache ) {
-                       session->userData = cache = osrfNewHash();
-                       osrfHashSetCallback( cache, &sessionDataFree );
-                       ctx->session->userDataFree = &userDataFree;
-               }
+               if( NULL == cache )
+                       initSessionCache( ctx );
 
                osrfHash* pcache = osrfHashGet(cache, "pcache");
 
@@ -641,11 +648,8 @@ static void setUserLogin( osrfMethodContext* ctx, jsonObject* user_login ) {
 
                // If the session doesn't already have a hash, create one.  Make sure
                // that the application session frees the hash when it terminates.
-               if( NULL == cache ) {
-                       session->userData = cache = osrfNewHash();
-                       osrfHashSetCallback( cache, &sessionDataFree );
-                       ctx->session->userDataFree = &userDataFree;
-               }
+               if( NULL == cache )
+                       initSessionCache( ctx );
 
                if( user_login )
                        osrfHashSet( cache, user_login, "user_login" );
@@ -683,11 +687,8 @@ static void setAuthkey( osrfMethodContext* ctx, const char* authkey ) {
 
                // If the session doesn't already have a hash, create one.  Make sure
                // that the application session frees the hash when it terminates.
-               if( NULL == cache ) {
-                       session->userData = cache = osrfNewHash();
-                       osrfHashSetCallback( cache, &sessionDataFree );
-                       ctx->session->userDataFree = &userDataFree;
-               }
+               if( NULL == cache )
+                       initSessionCache( ctx );
 
                // Save the transaction id in the hash, with the key "xact_id"
                if( authkey && *authkey )
@@ -763,6 +764,10 @@ static int reset_timeout( const char* authkey, time_t now ) {
 static const char* getAuthkey( osrfMethodContext* ctx ) {
        if( ctx && ctx->session && ctx->session->userData ) {
                const char* authkey = osrfHashGet( (osrfHash*) ctx->session->userData, "authkey" );
+        // LFW recent changes mean the userData hash gets set up earlier, but
+        // doesn't necessarily have an authkey yet
+        if (!authkey)
+            return NULL;
 
                // Possibly reset the authentication timeout to keep the login alive.  We do so
                // no more than once per method call, and not at all if it has been only a short
@@ -1200,14 +1205,17 @@ int doSearch( osrfMethodContext* ctx ) {
                return -1;
        }
 
-       // Return each row to the client (except that some may be suppressed by PCRUD)
-       jsonObject* cur = 0;
-       unsigned long res_idx = 0;
-       while((cur = jsonObjectGetIndex( obj, res_idx++ ) )) {
-               if( enforce_pcrud && !verifyObjectPCRUD( ctx, cur, obj->size ))
-                       continue;
-               osrfAppRespond( ctx, cur );
-       }
+       // doFieldmapperSearch() now takes care of our responding for us
+//     // Return each row to the client
+//     jsonObject* cur = 0;
+//     unsigned long res_idx = 0;
+//
+//     while((cur = jsonObjectGetIndex( obj, res_idx++ ) )) {
+//             // We used to discard based on perms here, but now that's
+//             // inside doFieldmapperSearch()
+//             osrfAppRespond( ctx, cur );
+//     }
+
        jsonObjectFree( obj );
 
        osrfAppRespondComplete( ctx, NULL );
@@ -1301,11 +1309,10 @@ int doIdList( osrfMethodContext* ctx ) {
        jsonObject* cur;
        unsigned long res_idx = 0;
        while((cur = jsonObjectGetIndex( obj, res_idx++ ) )) {
-               if( enforce_pcrud && !verifyObjectPCRUD( ctx, cur, obj->size ))
-                       continue;        // Suppress due to lack of permission
-               else
-                       osrfAppRespond( ctx,
-                               oilsFMGetObject( cur, osrfHashGet( class_meta, "primarykey" ) ) );
+               // We used to discard based on perms here, but now that's
+               // inside doFieldmapperSearch()
+               osrfAppRespond( ctx,
+                       oilsFMGetObject( cur, osrfHashGet( class_meta, "primarykey" ) ) );
        }
 
        jsonObjectFree( obj );
@@ -1350,7 +1357,7 @@ static int verifyObjectClass ( osrfMethodContext* ctx, const jsonObject* param )
        }
 
        if( enforce_pcrud )
-               return verifyObjectPCRUD( ctx, param, 1 );
+               return verifyObjectPCRUD( ctx, class, param, 1 );
        else
                return 1;
 }
@@ -1417,20 +1424,28 @@ static const jsonObject* verifyUserPCRUD( osrfMethodContext* ctx ) {
 /**
        @brief For PCRUD: Determine whether the current user may access the current row.
        @param ctx Pointer to the method context.
+       @param class Same as ctx->method->userData's item for key "class" except when called in recursive doFieldmapperSearch
        @param obj Pointer to the row being potentially accessed.
        @return 1 if access is permitted, or 0 if it isn't.
 
        The @a obj parameter points to a JSON_HASH of column values, keyed on column name.
 */
-static int verifyObjectPCRUD (  osrfMethodContext* ctx, const jsonObject* obj, const int rs_size ) {
+static int verifyObjectPCRUD ( osrfMethodContext* ctx, osrfHash *class, const jsonObject* obj, int rs_size ) {
 
        dbhandle = writehandle;
 
        // Figure out what class and method are involved
        osrfHash* method_metadata = (osrfHash*) ctx->method->userData;
-       osrfHash* class = osrfHashGet( method_metadata, "class" );
        const char* method_type = osrfHashGet( method_metadata, "methodtype" );
 
+       if (!rs_size) {
+               int *rs_size_from_hash = osrfHashGetFmt( (osrfHash *) ctx->session->userData, "rs_size_req_%d", ctx->request );
+               if (rs_size_from_hash) {
+                       rs_size = *rs_size_from_hash;
+                       osrfLogDebug(OSRF_LOG_MARK, "used rs_size from request-scoped hash: %d", rs_size);
+               }
+       }
+
        // Set fetch to 1 in all cases except for inserts, meaning that for local or foreign
        // contexts we will do another lookup of the current row, even if we already have a
        // previously fetched row image, because the row image in hand may not include the
@@ -1480,7 +1495,11 @@ static int verifyObjectPCRUD (  osrfMethodContext* ctx, const jsonObject* obj, c
        // Get a list of permissions from the permacrud entry.
        osrfStringArray* permission = osrfHashGet( pcrud, "permission" );
        if( permission->size == 0 ) {
-               osrfLogDebug( OSRF_LOG_MARK, "No permissions required for this action, passing through" );
+               osrfLogDebug(
+                       OSRF_LOG_MARK,
+                       "No permissions required for this action (class %s), passing through",
+                       osrfHashGet(class, "classname")
+               );
                return 1;
        }
 
@@ -1555,9 +1574,11 @@ static int verifyObjectPCRUD (  osrfMethodContext* ctx, const jsonObject* obj, c
 
                if( fetch ) {
                        // Fetch the row so that we can look at the foreign key(s)
+                       osrfHashSet((osrfHash*) ctx->session->userData, "1", "inside_verify");
                        jsonObject* _tmp_params = single_hash( pkey, pkey_value );
                        jsonObject* _list = doFieldmapperSearch( ctx, class, _tmp_params, NULL, &err );
                        jsonObjectFree( _tmp_params );
+                       osrfHashSet((osrfHash*) ctx->session->userData, "0", "inside_verify");
 
                        param = jsonObjectExtractIndex( _list, 0 );
                        jsonObjectFree( _list );
@@ -1647,8 +1668,11 @@ static int verifyObjectPCRUD (  osrfMethodContext* ctx, const jsonObject* obj, c
 
                                        // Look up the row to which the foreign key points
                                        jsonObject* _tmp_params = single_hash( foreign_pkey, foreign_pkey_value );
+
+                                       osrfHashSet((osrfHash*) ctx->session->userData, "1", "inside_verify");
                                        jsonObject* _list = doFieldmapperSearch(
                                                ctx, osrfHashGet( oilsIDL(), class_name ), _tmp_params, NULL, &err );
+                                       osrfHashSet((osrfHash*) ctx->session->userData, "0", "inside_verify");
 
                                        jsonObject* _fparam = NULL;
                                        if( _list && JSON_ARRAY == _list->type && _list->size > 0 )
@@ -2359,7 +2383,7 @@ int doRetrieve( osrfMethodContext* ctx ) {
 
        if( enforce_pcrud ) {
                // no result, skip this entirely
-               if(NULL != obj && !verifyObjectPCRUD( ctx, obj, 1 )) {
+               if(NULL != obj && !verifyObjectPCRUD( ctx, class_def, obj, 1 )) {
                        jsonObjectFree( obj );
 
                        growing_buffer* msg = buffer_init( 128 );
@@ -2376,7 +2400,10 @@ int doRetrieve( osrfMethodContext* ctx ) {
                }
        }
 
-       osrfAppRespondComplete( ctx, obj );
+       // doFieldmapperSearch() now does the responding for us
+       //osrfAppRespondComplete( ctx, obj );
+       osrfAppRespondComplete( ctx, NULL );
+
        jsonObjectFree( obj );
        return 0;
 }
@@ -5529,9 +5556,32 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
        dbhandle = writehandle;
 
        char* core_class = osrfHashGet( class_meta, "classname" );
+       osrfLogDebug( OSRF_LOG_MARK, "entering doFieldmapperSearch() with core_class %s", core_class );
+
        char* pkey = osrfHashGet( class_meta, "primarykey" );
 
-       const jsonObject* _tmp;
+       if (!ctx->session->userData)
+               initSessionCache( ctx );
+
+       char *methodtype = osrfHashGet( (osrfHash *) ctx->method->userData, "methodtype" );
+       char *inside_verify = osrfHashGet( (osrfHash*) ctx->session->userData, "inside_verify" );
+       int need_to_verify = (inside_verify ? !atoi(inside_verify) : 1);
+
+       int i_respond_directly = 0;
+       int flesh_depth = 0;
+
+       // XXX This can be redundant with another instance of the same test that happens
+       // within the functions that call doFieldmapperSearch(), but we have it here to
+       // prevent any non-pcrud-controlled classes from being fleshed on.
+       //
+       // TODO To avoid redundancy, move this block to right before we recurse,
+       // and change the class we're checking to the one we're /about/ to search for,
+       // not the one we're currently searching for.
+       if (!osrfStringArrayContains(osrfHashGet(class_meta, "controller"), modulename)) {
+               osrfLogInfo(OSRF_LOG_MARK, "%s is not listed as a controller for %s, moving on",
+                       modulename, core_class);
+               return jsonNewObjectType( JSON_ARRAY ); /* empty */
+       }
 
        char* sql = buildSELECT( where_hash, query_hash, class_meta, ctx );
        if( !sql ) {
@@ -5569,6 +5619,30 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
        jsonObject* res_list = jsonNewObjectType( JSON_ARRAY );
        jsonObject* row_obj = NULL;
 
+       // The following two steps are for verifyObjectPCRUD()'s benefit.
+       // 1. get the flesh depth
+       const jsonObject* _tmp = jsonObjectGetKeyConst( query_hash, "flesh" );
+       if( _tmp ) {
+               flesh_depth = (int) jsonObjectGetNumber( _tmp );
+               if( flesh_depth == -1 || flesh_depth > max_flesh_depth )
+                       flesh_depth = max_flesh_depth;
+       }
+
+       // 2. figure out one consistent rs_size for verifyObjectPCRUD to use
+       // over the whole life of this request.  This means if we've already set
+       // up a rs_size_req_%d, do nothing.
+       //      a. Incidentally, we can also use this opportunity to set i_respond_directly
+       int *rs_size = osrfHashGetFmt( (osrfHash *) ctx->session->userData, "rs_size_req_%d", ctx->request );
+       if( !rs_size ) {        // pointer null, so value not set in hash
+               // i_respond_directly can only be true at the /top/ of a recursive search, if even that.
+               i_respond_directly = ( *methodtype == 'r' || *methodtype == 'i' || *methodtype == 's' );
+
+               rs_size = (int *) safe_malloc( sizeof(int) );   // will be freed by sessionDataFree()
+               unsigned long long result_count = dbi_result_get_numrows( result );
+               *rs_size = (int) result_count * (flesh_depth + 1);      // yes, we could lose some bits, but come on
+               osrfHashSet( (osrfHash *) ctx->session->userData, rs_size, "rs_size_req_%d", ctx->request );
+       }
+
        if( dbi_result_first_row( result )) {
 
                // Convert each row to a JSON_ARRAY of column values, and enclose those objects
@@ -5583,8 +5657,11 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
                                jsonObjectFree( row_obj );
                                free( pkey_val );
                        } else {
-                               osrfHashSet( dedup, pkey_val, pkey_val );
-                               jsonObjectPush( res_list, row_obj );
+                               if( !enforce_pcrud || !need_to_verify ||
+                                               verifyObjectPCRUD( ctx, class_meta, row_obj, 0 /* means check user data for rs_size */ )) {
+                                       osrfHashSet( dedup, pkey_val, pkey_val );
+                                       jsonObjectPush( res_list, row_obj );
+                               }
                        }
                } while( dbi_result_next_row( result ));
                osrfHashFree( dedup );
@@ -5599,25 +5676,24 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
        free( sql );
 
        // If we're asked to flesh, and there's anything to flesh, then flesh it
-       // (but not for PCRUD, lest the user to bypass permissions by fleshing
-       // something that he has no permission to look at).
-       if( res_list->size && query_hash && ! enforce_pcrud ) {
-               _tmp = jsonObjectGetKeyConst( query_hash, "flesh" );
-               if( _tmp ) {
-                       // Get the flesh depth
-                       int flesh_depth = (int) jsonObjectGetNumber( _tmp );
-                       if( flesh_depth == -1 || flesh_depth > max_flesh_depth )
-                               flesh_depth = max_flesh_depth;
-
-                       // We need a non-zero flesh depth, and a list of fields to flesh
-                       const jsonObject* temp_blob = jsonObjectGetKeyConst( query_hash, "flesh_fields" );
+       // (formerly we would skip fleshing if in pcrud mode, but now we support
+       // fleshing even in PCRUD).
+       if( res_list->size ) {
+               jsonObject* temp_blob;  // We need a non-zero flesh depth, and a list of fields to flesh
+               jsonObject* flesh_fields; 
+               jsonObject* flesh_blob = NULL;
+               osrfStringArray* link_fields = NULL;
+               osrfHash* links = NULL;
+               int want_flesh = 0;
+
+               if( query_hash ) {
+                       temp_blob = jsonObjectGetKey( query_hash, "flesh_fields" );
                        if( temp_blob && flesh_depth > 0 ) {
 
-                               jsonObject* flesh_blob = jsonObjectClone( temp_blob );
-                               const jsonObject* flesh_fields = jsonObjectGetKeyConst( flesh_blob, core_class );
+                               flesh_blob = jsonObjectClone( temp_blob );
+                               flesh_fields = jsonObjectGetKey( flesh_blob, core_class );
 
-                               osrfStringArray* link_fields = NULL;
-                               osrfHash* links = osrfHashGet( class_meta, "links" );
+                               links = osrfHashGet( class_meta, "links" );
 
                                // Make an osrfStringArray of the names of fields to be fleshed
                                if( flesh_fields ) {
@@ -5638,210 +5714,227 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
                                                jsonIteratorFree( _i );
                                        }
                                }
+                               want_flesh = link_fields ? 1 : 0;
+                       }
+               }
 
-                               osrfHash* fields = osrfHashGet( class_meta, "fields" );
-
-                               // Iterate over the JSON_ARRAY of rows
-                               jsonObject* cur;
-                               unsigned long res_idx = 0;
-                               while((cur = jsonObjectGetIndex( res_list, res_idx++ ) )) {
+               osrfHash* fields = osrfHashGet( class_meta, "fields" );
 
-                                       int i = 0;
-                                       const char* link_field;
+               // Iterate over the JSON_ARRAY of rows
+               jsonObject* cur;
+               unsigned long res_idx = 0;
+               while((cur = jsonObjectGetIndex( res_list, res_idx++ ) )) {
 
-                                       // Iterate over the list of fleshable fields
-                                       while( (link_field = osrfStringArrayGetString(link_fields, i++)) ) {
+                       int i = 0;
+                       const char* link_field;
 
-                                               osrfLogDebug( OSRF_LOG_MARK, "Starting to flesh %s", link_field );
+                       // Iterate over the list of fleshable fields
+                       if ( want_flesh ) {
+                               while( (link_field = osrfStringArrayGetString(link_fields, i++)) ) {
 
-                                               osrfHash* kid_link = osrfHashGet( links, link_field );
-                                               if( !kid_link )
-                                                       continue;     // Not a link field; skip it
+                                       osrfLogDebug( OSRF_LOG_MARK, "Starting to flesh %s", link_field );
 
-                                               osrfHash* field = osrfHashGet( fields, link_field );
-                                               if( !field )
-                                                       continue;     // Not a field at all; skip it (IDL is ill-formed)
+                                       osrfHash* kid_link = osrfHashGet( links, link_field );
+                                       if( !kid_link )
+                                               continue;     // Not a link field; skip it
 
-                                               osrfHash* kid_idl = osrfHashGet( oilsIDL(),
-                                                       osrfHashGet( kid_link, "class" ));
-                                               if( !kid_idl )
-                                                       continue;   // The class it links to doesn't exist; skip it
+                                       osrfHash* field = osrfHashGet( fields, link_field );
+                                       if( !field )
+                                               continue;     // Not a field at all; skip it (IDL is ill-formed)
 
-                                               const char* reltype = osrfHashGet( kid_link, "reltype" );
-                                               if( !reltype )
-                                                       continue;   // No reltype; skip it (IDL is ill-formed)
+                                       osrfHash* kid_idl = osrfHashGet( oilsIDL(),
+                                               osrfHashGet( kid_link, "class" ));
+                                       if( !kid_idl )
+                                               continue;   // The class it links to doesn't exist; skip it
 
-                                               osrfHash* value_field = field;
+                                       const char* reltype = osrfHashGet( kid_link, "reltype" );
+                                       if( !reltype )
+                                               continue;   // No reltype; skip it (IDL is ill-formed)
 
-                                               if(    !strcmp( reltype, "has_many" )
-                                                       || !strcmp( reltype, "might_have" ) ) { // has_many or might_have
-                                                       value_field = osrfHashGet(
-                                                               fields, osrfHashGet( class_meta, "primarykey" ) );
-                                               }
+                                       osrfHash* value_field = field;
 
-                                               osrfStringArray* link_map = osrfHashGet( kid_link, "map" );
+                                       if(    !strcmp( reltype, "has_many" )
+                                               || !strcmp( reltype, "might_have" ) ) { // has_many or might_have
+                                               value_field = osrfHashGet(
+                                                       fields, osrfHashGet( class_meta, "primarykey" ) );
+                                       }
 
-                                               if( link_map->size > 0 ) {
-                                                       jsonObject* _kid_key = jsonNewObjectType( JSON_ARRAY );
-                                                       jsonObjectPush(
-                                                               _kid_key,
-                                                               jsonNewObject( osrfStringArrayGetString( link_map, 0 ) )
-                                                       );
+                                       osrfStringArray* link_map = osrfHashGet( kid_link, "map" );
 
-                                                       jsonObjectSetKey(
-                                                               flesh_blob,
-                                                               osrfHashGet( kid_link, "class" ),
-                                                               _kid_key
-                                                       );
-                                               };
+                                       if( link_map->size > 0 ) {
+                                               jsonObject* _kid_key = jsonNewObjectType( JSON_ARRAY );
+                                               jsonObjectPush(
+                                                       _kid_key,
+                                                       jsonNewObject( osrfStringArrayGetString( link_map, 0 ) )
+                                               );
 
-                                               osrfLogDebug(
-                                                       OSRF_LOG_MARK,
-                                                       "Link field: %s, remote class: %s, fkey: %s, reltype: %s",
-                                                       osrfHashGet( kid_link, "field" ),
+                                               jsonObjectSetKey(
+                                                       flesh_blob,
                                                        osrfHashGet( kid_link, "class" ),
-                                                       osrfHashGet( kid_link, "key" ),
-                                                       osrfHashGet( kid_link, "reltype" )
+                                                       _kid_key
                                                );
+                                       };
 
-                                               const char* search_key = jsonObjectGetString(
-                                                       jsonObjectGetIndex( cur,
-                                                               atoi( osrfHashGet( value_field, "array_position" ) )
-                                                       )
-                                               );
+                                       osrfLogDebug(
+                                               OSRF_LOG_MARK,
+                                               "Link field: %s, remote class: %s, fkey: %s, reltype: %s",
+                                               osrfHashGet( kid_link, "field" ),
+                                               osrfHashGet( kid_link, "class" ),
+                                               osrfHashGet( kid_link, "key" ),
+                                               osrfHashGet( kid_link, "reltype" )
+                                       );
 
-                                               if( !search_key ) {
-                                                       osrfLogDebug( OSRF_LOG_MARK, "Nothing to search for!" );
-                                                       continue;
-                                               }
+                                       const char* search_key = jsonObjectGetString(
+                                               jsonObjectGetIndex( cur,
+                                                       atoi( osrfHashGet( value_field, "array_position" ) )
+                                               )
+                                       );
 
-                                               osrfLogDebug( OSRF_LOG_MARK, "Creating param objects..." );
+                                       if( !search_key ) {
+                                               osrfLogDebug( OSRF_LOG_MARK, "Nothing to search for!" );
+                                               continue;
+                                       }
 
-                                               // construct WHERE clause
-                                               jsonObject* where_clause  = jsonNewObjectType( JSON_HASH );
-                                               jsonObjectSetKey(
-                                                       where_clause,
-                                                       osrfHashGet( kid_link, "key" ),
-                                                       jsonNewObject( search_key )
-                                               );
+                                       osrfLogDebug( OSRF_LOG_MARK, "Creating param objects..." );
+
+                                       // construct WHERE clause
+                                       jsonObject* where_clause  = jsonNewObjectType( JSON_HASH );
+                                       jsonObjectSetKey(
+                                               where_clause,
+                                               osrfHashGet( kid_link, "key" ),
+                                               jsonNewObject( search_key )
+                                       );
+
+                                       // construct the rest of the query, mostly
+                                       // by copying pieces of the previous level of query
+                                       jsonObject* rest_of_query = jsonNewObjectType( JSON_HASH );
+                                       jsonObjectSetKey( rest_of_query, "flesh",
+                                               jsonNewNumberObject( flesh_depth - 1 + link_map->size )
+                                       );
 
-                                               // construct the rest of the query, mostly
-                                               // by copying pieces of the previous level of query
-                                               jsonObject* rest_of_query = jsonNewObjectType( JSON_HASH );
-                                               jsonObjectSetKey( rest_of_query, "flesh",
-                                                       jsonNewNumberObject( flesh_depth - 1 + link_map->size )
+                                       if( flesh_blob )
+                                               jsonObjectSetKey( rest_of_query, "flesh_fields",
+                                                       jsonObjectClone( flesh_blob ));
+
+                                       if( jsonObjectGetKeyConst( query_hash, "order_by" )) {
+                                               jsonObjectSetKey( rest_of_query, "order_by",
+                                                       jsonObjectClone( jsonObjectGetKeyConst( query_hash, "order_by" ))
                                                );
+                                       }
 
-                                               if( flesh_blob )
-                                                       jsonObjectSetKey( rest_of_query, "flesh_fields",
-                                                               jsonObjectClone( flesh_blob ));
+                                       if( jsonObjectGetKeyConst( query_hash, "select" )) {
+                                               jsonObjectSetKey( rest_of_query, "select",
+                                                       jsonObjectClone( jsonObjectGetKeyConst( query_hash, "select" ))
+                                               );
+                                       }
 
-                                               if( jsonObjectGetKeyConst( query_hash, "order_by" )) {
-                                                       jsonObjectSetKey( rest_of_query, "order_by",
-                                                               jsonObjectClone( jsonObjectGetKeyConst( query_hash, "order_by" ))
-                                                       );
-                                               }
+                                       // do the query, recursively, to expand the fleshable field
+                                       jsonObject* kids = doFieldmapperSearch( ctx, kid_idl,
+                                               where_clause, rest_of_query, err );
 
-                                               if( jsonObjectGetKeyConst( query_hash, "select" )) {
-                                                       jsonObjectSetKey( rest_of_query, "select",
-                                                               jsonObjectClone( jsonObjectGetKeyConst( query_hash, "select" ))
-                                                       );
-                                               }
+                                       jsonObjectFree( where_clause );
+                                       jsonObjectFree( rest_of_query );
 
-                                               // do the query, recursively, to expand the fleshable field
-                                               jsonObject* kids = doFieldmapperSearch( ctx, kid_idl,
-                                                       where_clause, rest_of_query, err );
+                                       if( *err ) {
+                                               osrfStringArrayFree( link_fields );
+                                               jsonObjectFree( res_list );
+                                               jsonObjectFree( flesh_blob );
+                                               return NULL;
+                                       }
 
-                                               jsonObjectFree( where_clause );
-                                               jsonObjectFree( rest_of_query );
+                                       osrfLogDebug( OSRF_LOG_MARK, "Search for %s return %d linked objects",
+                                               osrfHashGet( kid_link, "class" ), kids->size );
 
-                                               if( *err ) {
-                                                       osrfStringArrayFree( link_fields );
-                                                       jsonObjectFree( res_list );
-                                                       jsonObjectFree( flesh_blob );
-                                                       return NULL;
-                                               }
+                                       // Traverse the result set
+                                       jsonObject* X = NULL;
+                                       if( link_map->size > 0 && kids->size > 0 ) {
+                                               X = kids;
+                                               kids = jsonNewObjectType( JSON_ARRAY );
 
-                                               osrfLogDebug( OSRF_LOG_MARK, "Search for %s return %d linked objects",
-                                                       osrfHashGet( kid_link, "class" ), kids->size );
-
-                                               // Traverse the result set
-                                               jsonObject* X = NULL;
-                                               if( link_map->size > 0 && kids->size > 0 ) {
-                                                       X = kids;
-                                                       kids = jsonNewObjectType( JSON_ARRAY );
-
-                                                       jsonObject* _k_node;
-                                                       unsigned long res_idx = 0;
-                                                       while((_k_node = jsonObjectGetIndex( X, res_idx++ ) )) {
-                                                               jsonObjectPush(
-                                                                       kids,
-                                                                       jsonObjectClone(
-                                                                               jsonObjectGetIndex(
-                                                                                       _k_node,
-                                                                                       (unsigned long) atoi(
+                                               jsonObject* _k_node;
+                                               unsigned long res_idx = 0;
+                                               while((_k_node = jsonObjectGetIndex( X, res_idx++ ) )) {
+                                                       jsonObjectPush(
+                                                               kids,
+                                                               jsonObjectClone(
+                                                                       jsonObjectGetIndex(
+                                                                               _k_node,
+                                                                               (unsigned long) atoi(
+                                                                                       osrfHashGet(
                                                                                                osrfHashGet(
                                                                                                        osrfHashGet(
                                                                                                                osrfHashGet(
-                                                                                                                       osrfHashGet(
-                                                                                                                               oilsIDL(),
-                                                                                                                               osrfHashGet( kid_link, "class" )
-                                                                                                                       ),
-                                                                                                                       "fields"
+                                                                                                                       oilsIDL(),
+                                                                                                                       osrfHashGet( kid_link, "class" )
                                                                                                                ),
-                                                                                                               osrfStringArrayGetString( link_map, 0 )
+                                                                                                               "fields"
                                                                                                        ),
-                                                                                                       "array_position"
-                                                                                               )
+                                                                                                       osrfStringArrayGetString( link_map, 0 )
+                                                                                               ),
+                                                                                               "array_position"
                                                                                        )
                                                                                )
                                                                        )
-                                                               );
-                                                       } // end while loop traversing X
-                                               }
-
-                                               if(    !strcmp( osrfHashGet( kid_link, "reltype" ), "has_a" )
-                                                       || !strcmp( osrfHashGet( kid_link, "reltype" ), "might_have" )) {
-                                                       osrfLogDebug(OSRF_LOG_MARK, "Storing fleshed objects in %s",
-                                                               osrfHashGet( kid_link, "field" ));
-                                                       jsonObjectSetIndex(
-                                                               cur,
-                                                               (unsigned long) atoi( osrfHashGet( field, "array_position" ) ),
-                                                               jsonObjectClone( jsonObjectGetIndex( kids, 0 ))
+                                                               )
                                                        );
-                                               }
+                                               } // end while loop traversing X
+                                       }
 
-                                               if( !strcmp( osrfHashGet( kid_link, "reltype" ), "has_many" )) {
-                                                       // has_many
-                                                       osrfLogDebug( OSRF_LOG_MARK, "Storing fleshed objects in %s",
-                                                               osrfHashGet( kid_link, "field" ) );
-                                                       jsonObjectSetIndex(
-                                                               cur,
-                                                               (unsigned long) atoi( osrfHashGet( field, "array_position" ) ),
-                                                               jsonObjectClone( kids )
-                                                       );
-                                               }
+                                       if(    !strcmp( osrfHashGet( kid_link, "reltype" ), "has_a" )
+                                               || !strcmp( osrfHashGet( kid_link, "reltype" ), "might_have" )) {
+                                               osrfLogDebug(OSRF_LOG_MARK, "Storing fleshed objects in %s",
+                                                       osrfHashGet( kid_link, "field" ));
+                                               jsonObjectSetIndex(
+                                                       cur,
+                                                       (unsigned long) atoi( osrfHashGet( field, "array_position" ) ),
+                                                       jsonObjectClone( jsonObjectGetIndex( kids, 0 ))
+                                               );
+                                       }
 
-                                               if( X ) {
-                                                       jsonObjectFree( kids );
-                                                       kids = X;
-                                               }
+                                       if( !strcmp( osrfHashGet( kid_link, "reltype" ), "has_many" )) {
+                                               // has_many
+                                               osrfLogDebug( OSRF_LOG_MARK, "Storing fleshed objects in %s",
+                                                       osrfHashGet( kid_link, "field" ) );
+                                               jsonObjectSetIndex(
+                                                       cur,
+                                                       (unsigned long) atoi( osrfHashGet( field, "array_position" ) ),
+                                                       jsonObjectClone( kids )
+                                               );
+                                       }
 
+                                       if( X ) {
                                                jsonObjectFree( kids );
+                                               kids = X;
+                                       }
 
-                                               osrfLogDebug( OSRF_LOG_MARK, "Fleshing of %s complete",
-                                                       osrfHashGet( kid_link, "field" ) );
-                                               osrfLogDebug( OSRF_LOG_MARK, "%s", jsonObjectToJSON( cur ));
+                                       jsonObjectFree( kids );
+
+                                       osrfLogDebug( OSRF_LOG_MARK, "Fleshing of %s complete",
+                                               osrfHashGet( kid_link, "field" ) );
+                                       osrfLogDebug( OSRF_LOG_MARK, "%s", jsonObjectToJSON( cur ));
 
-                                       } // end while loop traversing list of fleshable fields
-                               } // end while loop traversing res_list
-                               jsonObjectFree( flesh_blob );
-                               osrfStringArrayFree( link_fields );
+                               } // end while loop traversing list of fleshable fields
                        }
-               }
+
+                       if( i_respond_directly ) {
+                               if ( *methodtype == 'i' ) {
+                                       osrfAppRespond( ctx,
+                                               oilsFMGetObject( cur, osrfHashGet( class_meta, "primarykey" ) ) );
+                               } else {
+                                       osrfAppRespond( ctx, cur );
+                               }
+                       }
+               } // end while loop traversing res_list
+               jsonObjectFree( flesh_blob );
+               osrfStringArrayFree( link_fields );
        }
 
-       return res_list;
+       if( i_respond_directly ) {
+               jsonObjectFree( res_list );
+               return jsonNewObjectType( JSON_ARRAY );
+       } else {
+               return res_list;
+       }
 }
 
 
@@ -6136,7 +6229,7 @@ int doDelete( osrfMethodContext* ctx ) {
 
                id = oilsFMGetString( jsonObjectGetIndex(ctx->params, _obj_pos), pkey );
        } else {
-               if( enforce_pcrud && !verifyObjectPCRUD( ctx, NULL, 1 )) {
+               if( enforce_pcrud && !verifyObjectPCRUD( ctx, meta, NULL, 1 )) {
                        osrfAppRespondComplete( ctx, NULL );
                        return -1;
                }