LP#1847805: PCRUD can miss valid rows in LIMIT/OFFSET queries
authorMike Rylander <mrylander@gmail.com>
Fri, 11 Oct 2019 22:29:10 +0000 (18:29 -0400)
committerMike Rylander <mrylander@gmail.com>
Mon, 21 Oct 2019 18:10:13 +0000 (14:10 -0400)
Because we check authz for rows retrieved by open-ils.pcrud calls after
the underlying query is executed, including any limit/offset-based
paging, we can fail to return results when the first "page" is full of
rows that the calling user cannot see.

With this commit, we switch to SQL cursors and page through results in
the application logic, which guarantees that the situation above will
not impact legitimate row visibility.

Additionally, this may prove to increase speed of some queries,
especially because Postgres has the option to optimize for "fast-start"
plans.

There is a small amount over overhead added to single-row retrieval
requests, on the order of 0.25ms-0.5ms total for cursor declaration and
then cursor closing.

Signed-off-by: Mike Rylander <mrylander@gmail.com>
Open-ILS/src/c-apps/oils_sql.c

index ac2f054..c4fc6aa 100644 (file)
@@ -74,6 +74,7 @@ static void setXactId( osrfMethodContext* ctx );
 static inline const char* getXactId( osrfMethodContext* ctx );
 static inline void clearXactId( osrfMethodContext* ctx );
 
+static char* random_cursor_name();
 static jsonObject* doFieldmapperSearch ( osrfMethodContext* ctx, osrfHash* class_meta,
                jsonObject* where_hash, jsonObject* query_hash, int* err );
 static jsonObject* oilsMakeFieldmapperFromResult( dbi_result, osrfHash* );
@@ -92,7 +93,7 @@ static char* searchPredicate ( const ClassInfo*, osrfHash*, jsonObject*, osrfMet
 static char* searchJOIN ( const jsonObject*, const ClassInfo* left_info );
 static char* searchWHERE ( const jsonObject* search_hash, const ClassInfo*, int, osrfMethodContext* );
 static char* buildSELECT( const jsonObject*, jsonObject* rest_of_query,
-       osrfHash* meta, osrfMethodContext* ctx );
+       osrfHash* meta, osrfMethodContext* ctx, char** cursor_name );
 static char* buildOrderByFromArray( osrfMethodContext* ctx, const jsonObject* order_array );
 
 char* buildQuery( osrfMethodContext* ctx, jsonObject* query, int flags );
@@ -149,12 +150,26 @@ int writeAuditInfo( osrfMethodContext* ctx, const char* user_id, const char* ws_
 static char* _sanitize_tz_name( const char* tz );
 static char* _sanitize_savepoint_name( const char* sp );
 
+const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_";
+static char* random_cursor_name() {
+    char* str = safe_malloc(sizeof(char) * (ALIAS_STORE_SIZE + 1)); // 16 character cursor name + \0
+    int n;
+    for(n = 0; n < ALIAS_STORE_SIZE; n++) {
+        int key = rand() % (int) (sizeof charset - 1);
+        str[n] = charset[key];
+    }
+    str[ALIAS_STORE_SIZE] = '\0';
+    return str;
+}
+
 /**
        @brief Connect to the database.
        @return A database connection if successful, or NULL if not.
 */
 dbi_conn oilsConnectDB( const char* mod_name ) {
 
+    srand(time(NULL)); // In case we need some randomness, just once per db connection.
+
        osrfLogDebug( OSRF_LOG_MARK, "Attempting to initialize libdbi..." );
        if( dbi_initialize( NULL ) == -1 ) {
                osrfLogError( OSRF_LOG_MARK, "Unable to initialize libdbi" );
@@ -487,6 +502,12 @@ void userDataFree( void* blob ) {
                };
        }
        if( writehandle ) {
+               if( !dbi_conn_query( writehandle, "CLOSE ALL;" ) ) {
+                       const char* msg;
+                       int errnum = dbi_conn_error( writehandle, &msg );
+                       osrfLogWarning( OSRF_LOG_MARK, "Unable to release cursors: %d %s",
+                               errnum, msg ? msg : "(No description available)" );
+               }
                if( !dbi_conn_query( writehandle, "SELECT auditor.clear_audit_info();" ) ) {
                        const char* msg;
                        int errnum = dbi_conn_error( writehandle, &msg );
@@ -5471,7 +5492,7 @@ static char* buildOrderByFromArray( osrfMethodContext* ctx, const jsonObject* or
        The SELECT statements built here are distinct from those built for the json_query method.
 */
 static char* buildSELECT ( const jsonObject* search_hash, jsonObject* rest_of_query,
-       osrfHash* meta, osrfMethodContext* ctx ) {
+       osrfHash* meta, osrfMethodContext* ctx, char** cursor_name ) {
 
        const char* locale = osrf_message_get_last_locale();
 
@@ -5586,7 +5607,9 @@ static char* buildSELECT ( const jsonObject* search_hash, jsonObject* rest_of_qu
        if( !table )
                table = strdup( "(null)" );
 
-       buffer_fadd( sql_buf, "SELECT %s FROM %s AS \"%s\"", col_list, table, core_class );
+    *cursor_name = random_cursor_name();
+
+       buffer_fadd( sql_buf, "DECLARE \"%s\" NO SCROLL CURSOR WITH HOLD FOR SELECT %s FROM %s AS \"%s\"", *cursor_name, col_list, table, core_class );
        free( col_list );
        free( table );
 
@@ -5642,7 +5665,7 @@ static char* buildSELECT ( const jsonObject* search_hash, jsonObject* rest_of_qu
                free( pred );
        }
 
-       // Add the ORDER BY, LIMIT, and/or OFFSET clauses, if present
+       // Add the ORDER BY, if present
        if( rest_of_query ) {
                const jsonObject* order_by = NULL;
                if( ( order_by = jsonObjectGetKeyConst( rest_of_query, "order_by" )) ){
@@ -5798,30 +5821,6 @@ static char* buildSELECT ( const jsonObject* search_hash, jsonObject* rest_of_qu
 
                        free( order_by_list );
                }
-
-               const jsonObject* limit = jsonObjectGetKeyConst( rest_of_query, "limit" );
-               if( limit ) {
-                       const char* str = jsonObjectGetString( limit );
-                       if (str) {
-                               buffer_fadd(
-                                       sql_buf,
-                                       " LIMIT %d",
-                                       atoi(str)
-                               );
-                       }
-               }
-
-               const jsonObject* offset = jsonObjectGetKeyConst( rest_of_query, "offset" );
-               if( offset ) {
-                       const char* str = jsonObjectGetString( offset );
-                       if (str) {
-                               buffer_fadd(
-                                       sql_buf,
-                                       " OFFSET %d",
-                                       atoi( str )
-                               );
-                       }
-               }
        }
 
        if( defaultselhash )
@@ -5943,8 +5942,12 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
 
        int i_respond_directly = 0;
        int flesh_depth = 0;
+    int offset = 0;
+    int limit = 0;
+    int cursor_page_size = 5;
+       char* cursor_name;
 
-       char* sql = buildSELECT( where_hash, query_hash, class_meta, ctx );
+       char* sql = buildSELECT( where_hash, query_hash, class_meta, ctx, &cursor_name );
        if( !sql ) {
                osrfLogDebug( OSRF_LOG_MARK, "Problem building query, returning NULL" );
                *err = -1;
@@ -5987,6 +5990,25 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
        }
 
 
+       const jsonObject* limit_o = jsonObjectGetKeyConst( query_hash, "limit" );
+       if( limit_o ) {
+               limit = (int) jsonObjectGetNumber( limit_o );
+       } else {
+        limit = -1;
+    }
+
+       const jsonObject* offset_o = jsonObjectGetKeyConst( query_hash, "offset" );
+       if( offset_o ) {
+               offset = (int) jsonObjectGetNumber( offset_o );
+       } else {
+        offset = -1;
+    }
+
+    // set cursor_page_size to the largest of the three
+    if (cursor_page_size < offset) cursor_page_size = offset;
+    if (cursor_page_size < limit) cursor_page_size = limit;
+
+    // Declares a cursor
        dbi_result result = dbi_conn_query( dbhandle, sql );
 
        if( NULL == result ) {
@@ -6035,11 +6057,36 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
                i_respond_directly = ( *methodtype == 'r' || *methodtype == 'i' || *methodtype == 's' );
 
                rs_size = (int *) safe_malloc( sizeof(int) );   // will be freed by sessionDataFree()
-               unsigned long long result_count = dbi_result_get_numrows( result );
+               unsigned long long result_count = cursor_page_size;
                *rs_size = (int) result_count * (flesh_depth + 1);      // yes, we could lose some bits, but come on
                osrfHashSet( (osrfHash *) ctx->session->userData, rs_size, "rs_size_req_%d", ctx->request );
        }
 
+    
+       dbi_result_free( result ); // toss the declare result
+       result = dbi_conn_queryf( dbhandle, "FETCH %d FROM \"%s\";", cursor_page_size, cursor_name );
+
+       if( NULL == result ) {
+               const char* msg;
+               int errnum = dbi_conn_error( dbhandle, &msg );
+               osrfLogError(OSRF_LOG_MARK, "%s: Error retrieving %s with query [%s]: %d %s",
+                       modulename, osrfHashGet( class_meta, "fieldmapper" ), sql, errnum,
+                       msg ? msg : "(No description available)" );
+               if( !oilsIsDBConnected( dbhandle ))
+                       osrfAppSessionPanic( ctx->session );
+               osrfAppSessionStatus(
+                       ctx->session,
+                       OSRF_STATUS_INTERNALSERVERERROR,
+                       "osrfMethodException",
+                       ctx->request,
+                       "Severe query error -- see error log for more details"
+               );
+               *err = -1;
+               free( sql );
+               return NULL;
+
+       }
+
        if( dbi_result_first_row( result )) {
 
                // Convert each row to a JSON_ARRAY of column values, and enclose those objects
@@ -6047,7 +6094,9 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
                // eliminate the duplicates.
                osrfLogDebug( OSRF_LOG_MARK, "Query returned at least one row" );
                osrfHash* dedup = osrfNewHash();
+        int cursor_page_remaining = cursor_page_size;
                do {
+            cursor_page_remaining--;
                        row_obj = oilsMakeFieldmapperFromResult( result, class_meta );
                        char* pkey_val = oilsFMGetString( row_obj, pkey );
                        if( osrfHashGet( dedup, pkey_val ) ) {
@@ -6056,11 +6105,46 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
                        } else {
                                if( !enforce_pcrud || !need_to_verify ||
                                                verifyObjectPCRUD( ctx, class_meta, row_obj, 0 /* means check user data for rs_size */ )) {
-                                       osrfHashSet( dedup, pkey_val, pkey_val );
-                                       jsonObjectPush( res_list, row_obj );
+                    if (offset > -1 && offset) {
+                        offset--;
+                    } else {
+                                           if (limit > -1 && limit) limit--;
+                        osrfHashSet( dedup, pkey_val, pkey_val );
+                                           jsonObjectPush( res_list, row_obj );
+                    }
                                }
                        }
-               } while( dbi_result_next_row( result ));
+            if (!cursor_page_remaining) { // fetch next page
+                cursor_page_remaining = cursor_page_size;
+                dbi_result_free( result );
+                result = dbi_conn_queryf( dbhandle, "FETCH %d FROM \"%s\";", cursor_page_size, cursor_name );
+
+               if( NULL == result ) {
+                       const char* msg;
+                       int errnum = dbi_conn_error( dbhandle, &msg );
+                       osrfLogError(OSRF_LOG_MARK, "%s: Error retrieving %s with query [%s]: %d %s",
+                               modulename, osrfHashGet( class_meta, "fieldmapper" ), sql, errnum,
+                               msg ? msg : "(No description available)" );
+                       if( !oilsIsDBConnected( dbhandle ))
+                               osrfAppSessionPanic( ctx->session );
+                       osrfAppSessionStatus(
+                               ctx->session,
+                               OSRF_STATUS_INTERNALSERVERERROR,
+                               "osrfMethodException",
+                               ctx->request,
+                               "Severe query error -- see error log for more details"
+                       );
+                       *err = -1;
+                       free( sql );
+                       return NULL;
+               }
+            }
+               } while(
+            limit && ( // limit is -1 (none) or not yet decremented to 0
+                (cursor_page_remaining == cursor_page_size && dbi_result_first_row(result)) || // on a new page, and there's a row
+                (cursor_page_remaining != cursor_page_size && dbi_result_next_row(result)) // on the same page, and there's a row
+            )
+        );
                osrfHashFree( dedup );
 
        } else {
@@ -6070,6 +6154,8 @@ static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_
 
        /* clean up the query */
        dbi_result_free( result );
+    result = dbi_conn_queryf( dbhandle, "CLOSE \"%s\";", cursor_name );
+       dbi_result_free( result );
        free( sql );
 
        // If we're asked to flesh, and there's anything to flesh, then flesh it