How to perform LO (large object) replication with pgEdge

This article explains how to use a multi-node pgEdge cluster and the LOLOR extension to perform LO replication.

Replication of large objects isn't supported by PostgreSQL core logical replication. Attempting to replicate a large object this way returns the error "Large objects aren't supported by logical replication".


pgEdge has developed an extension named LargeObjectLOgicalReplication (LOLOR) that provides support for replicating large objects. Its primary goal is seamless replication of large objects using pgEdge Spock replication. Large objects are accessed and manipulated in PostgreSQL through the following client interface functions:


  • lo_create
  • lo_import
  • lo_import_with_oid
  • lo_open
  • lo_export
  • lo_read
  • lo_write
  • lo_seek
  • lo_seek64

The pgEdge extension continues to support these same LO functions, so existing applications that use them will continue to work seamlessly.


You need to download the pgEdge platform in order to use this extension. We are going to create a two-node pgEdge cluster on localhost to demonstrate large object replication. This post first walks through a native psql example of replicating large objects with the extension, and then a JDBC example showing how to use it from a Java program.


In any directory owned by your non-root user, install pgEdge on all nodes of the cluster:

python3 -c "$(curl -fsSL https://pgedge-upstream.s3.amazonaws.com/REPO/install.py)"


Node 1 setup

Navigate into the pgedge directory on node 1 and perform the following steps:

Run the command below to set up the pgEdge platform; this installs PG 16 and the pgEdge extensions.

./pgedge setup -U demo -P pgedge110 -d testdb -p 5432


  • -U — database superuser name
  • -P — password for the database user
  • -d — name of the database
  • -p — port number (5432 is the default PostgreSQL port)
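The same setup step runs on each node with only the port changing, so it can be wrapped in a small per-node script. This is just a sketch using the values from this walkthrough; the `echo` makes it a dry run.

```shell
# Dry-run sketch of the per-node setup call; the credentials, database name,
# and ports are the ones used in this walkthrough, not pgEdge defaults.
DB_USER=demo
DB_PASS=pgedge110
DB_NAME=testdb
PORT=${1:-5432}   # pass 5444 when running this on node 2
echo ./pgedge setup -U "$DB_USER" -P "$DB_PASS" -d "$DB_NAME" -p "$PORT"
# Drop the leading `echo` to execute the command for real.
```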


Run the following command to create a Spock node. In this example we are creating a node named n1. Please note that the pgedge user in the command below must be an OS user:


./pgedge spock node-create n1 'host=localhost user=pgedge dbname=testdb' testdb


The following command creates the subscription from node 1 to node 2. Please note that it should be run only after the initial pgEdge setup on node 2 is complete.


./pgedge spock sub-create sub_n1n2 'host=localhost port=5444 user=pgedge dbname=testdb' testdb


Now we are ready to install the LOLOR extension:


./pgedge install lolor


Now log in with psql and run the following statement:


./psql -U demo testdb -p 5432

create extension lolor;


The following configuration parameter must be set before using LOLOR. lolor.node identifies the node and is used when generating new large object OIDs; its value can range from 1 to 2^28.



lolor.node=1


Please restart the server after adding the above configuration parameter to postgresql.conf.
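A sketch of that configuration step: range-check the chosen node id (the allowed range is 1 to 2^28) before appending it to postgresql.conf. The PGDATA location below is an assumption, defaulted purely so the sketch is runnable; point it at your real data directory.

```shell
# Sketch: validate lolor.node and append it to postgresql.conf.
# PGDATA is an assumed environment variable; /tmp is a stand-in default.
NODE_ID=${NODE_ID:-1}
MAX=$((1 << 28))                      # upper bound for lolor.node (2^28)
CONF="${PGDATA:-/tmp}/postgresql.conf"
if [ "$NODE_ID" -ge 1 ] && [ "$NODE_ID" -le "$MAX" ]; then
  echo "lolor.node=$NODE_ID" >> "$CONF"
else
  echo "lolor.node must be between 1 and $MAX" >&2
  exit 1
fi
```

Remember that the server still has to be restarted afterwards for the setting to take effect.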


The large object catalog tables need to be part of the replication set; the following commands add them to the default replication set.


./pgedge spock repset-add-table default 'lolor.pg_largeobject' testdb

./pgedge spock repset-add-table default 'lolor.pg_largeobject_metadata' testdb



The following commands enable automatic DDL replication:


./pgedge db guc-set spock.enable_ddl_replication on


./pgedge db guc-set spock.allow_ddl_from_functions on


./pgedge db guc-set spock.include_ddl_repset on


Please restart the server after setting the above configuration parameters.
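The three GUCs above differ only in name, so they can also be set in one loop. This is merely a convenience sketch over the commands already shown, echoed as a dry run:

```shell
# Dry-run sketch: iterate over the three Spock auto-DDL GUCs shown above.
for guc in spock.enable_ddl_replication \
           spock.allow_ddl_from_functions \
           spock.include_ddl_repset; do
  echo ./pgedge db guc-set "$guc" on   # drop `echo` to apply for real
done
```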




Node 2 setup

Go into the pgedge directory on node 2 and perform the following steps:

Run the command below to set up the pgEdge platform; this installs PG 16 and the pgEdge extensions.

./pgedge setup -U demo -P pgedge110 -d testdb -p 5444


  • -U — database superuser name
  • -P — password for the database user
  • -d — name of the database
  • -p — port number (5444)


Run the following command to create a Spock node; in this case we are creating a node named n2. Please note that the pgedge user in the command below must be an OS user:


./pgedge spock node-create n2 'host=localhost user=pgedge port=5444 dbname=testdb' testdb


The following command creates the subscription from node 2 to node 1.


./pgedge spock sub-create sub_n2n1 'host=localhost port=5432 user=pgedge dbname=testdb' testdb


Now we are ready to install the LOLOR extension:


./pgedge install lolor


Now log in with psql and run the following statement:


./psql -U demo testdb -p 5444

create extension lolor;



The following configuration parameter must be set before using LOLOR. lolor.node identifies the node and is used when generating new large object OIDs; its value can range from 1 to 2^28.



lolor.node=2


Please restart the server after adding the above configuration parameter to postgresql.conf.


The large object catalog tables need to be part of the replication set; the following commands add them to the default replication set.


./pgedge spock repset-add-table default 'lolor.pg_largeobject' testdb

./pgedge spock repset-add-table default 'lolor.pg_largeobject_metadata' testdb



The following commands enable automatic DDL replication:


./pgedge db guc-set spock.enable_ddl_replication on


./pgedge db guc-set spock.allow_ddl_from_functions on


./pgedge db guc-set spock.include_ddl_repset on


Please restart the server after setting the above configuration parameters.

PSQL Example code

Now we are going to run a short test of large object replication using psql, the native PostgreSQL client built on the libpq driver.


We are going to run the following commands on node 1 and then check whether the large object is replicated to node 2.



create table test_lolor(id int primary key, lo_object oid);


INSERT INTO test_lolor VALUES
(8, lo_from_bytea(0, '\xaced0005774d0a060805100418005243080010001a3918002000320608011000180042121a080a044d41494e100022060a044d41494e52090a0744454641554c545a0c0a0a0805320608001000180122026800'));


testdb=# select id, lo_get(lo_object) from test_lolor;

 id |                                  lo_get
----+------------------------------------------------------------------------
  8 | \xaced0005774d0a060805100418005243080010001a3918002000320608011000180042121a080a044d41494e100022060a044d41494e52090a0744454641554c545a0c0a0a0805320608001000180122026800
(1 row)



With auto DDL replication enabled, the table itself is also replicated to the other node. We now run the same select statement on node 2 to confirm that the large object was replicated.



testdb=# select id, lo_get(lo_object) from test_lolor;

 id |                                  lo_get
----+------------------------------------------------------------------------
  8 | \xaced0005774d0a060805100418005243080010001a3918002000320608011000180042121a080a044d41494e100022060a044d41494e52090a0744454641554c545a0c0a0a0805320608001000180122026800
(1 row)
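To check both nodes programmatically rather than by eye, the same query can be run against each port and the outputs diffed. A dry-run sketch, using psql's -At flags (unaligned, tuples-only output) and the ports and credentials from this walkthrough:

```shell
# Dry-run sketch: emit the per-node queries whose outputs you would diff.
QUERY="SELECT lo_get(lo_object) FROM test_lolor WHERE id = 8"
for port in 5432 5444; do
  echo "./psql -U demo -p $port -At -c \"$QUERY\" testdb"
done
# Redirect each command's real output to a file and `diff` the two files;
# no differences means the large object replicated byte-for-byte.
```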


JDBC Example code


The following program connects to a pgEdge node, loads the `/etc/os-release` file into the database as a large object, and then reads it back.


Example.java


package lolor;


import java.sql.*;

import java.io.*;

import java.util.Properties;

import java.nio.charset.StandardCharsets;


import org.postgresql.PGConnection;

import org.postgresql.largeobject.LargeObject;

import org.postgresql.largeobject.LargeObjectManager;


public class Example {

private static Connection pgconn = null;

private static Properties dbProps;

private final static String dbPropsFile = "app.properties";


/*

  * load property file

  */

public static void loadDBPropertiesFile() throws Exception {


     dbProps = new Properties();

     InputStream in = new FileInputStream(dbPropsFile);

     dbProps.load(in);

     in.close();

}


/*

  * Connect with PG

  */

public static void connectPG()

         throws Exception {

     try {

         // Set the search_path to pick lolor schema first

         dbProps.setProperty("options", "-c search_path=lolor,\"$user\",public,pg_catalog");


         pgconn = DriverManager.getConnection(dbProps.getProperty("url"), dbProps);

         pgconn.setAutoCommit(false);

     } catch (SQLException e) {

         throw new RuntimeException(e);

     }

}


/*

  * Close the connection

  */

public static void disconnectPG()

         throws Exception {

     try {

         pgconn.close();

     } catch (SQLException e) {

         throw new RuntimeException(e);

     }

}


/*

  * Run query and return results

  * Perform commit if asked

  */

public static String executeSQL(String sql, boolean doCommit)

         throws Exception {

     String result = "";

     try {

         StringBuilder sbResult = new StringBuilder();

         PreparedStatement ps = pgconn.prepareStatement(sql);

         ResultSet rs = ps.executeQuery();

         ResultSetMetaData rsmd = rs.getMetaData();

         int columnsNumber = rsmd.getColumnCount();

         for (int i = 1; i <= columnsNumber; i++) {

             if (i > 1)

                 sbResult.append(",");

             sbResult.append(rsmd.getColumnName(i));

         }

         sbResult.append("\n");

         while (rs.next()) {

             for (int i = 1; i <= columnsNumber; i++) {

                 if (i > 1)

                     sbResult.append(",");

                 String columnValue = rs.getString(i);

                 sbResult.append(columnValue);

             }

             sbResult.append("\n");

         }

         if (doCommit) {

             pgconn.commit();

         }

         result = sbResult.toString();

         return result;

     } catch (SQLException e) {

         // 02000 = no_data

         if (e.getSQLState().compareTo("02000") == 0) {

             return result;

         } else {

             pgconn.rollback();

             throw new RuntimeException(e);

         }

     }

}


/*

  * Initialize database

  */

public static void initDB()

         throws Exception {

             executeSQL("DROP TABLE IF EXISTS pglotest_blobs;", true);

             String createTableSql = "CREATE TABLE pglotest_blobs (\n" +

             "    fname       text PRIMARY KEY,\n" +

             "    blob        oid\n" +

             ");";

             executeSQL(createTableSql, true);

             executeSQL("CREATE EXTENSION IF NOT EXISTS lolor;", true);

         }


   /*

  * Perform insert operation

  * It internally calls lo_create, lo_open, lo_write, lo_close

  * */

static byte[] do_insert(String fname)

         throws Exception {

     File file;

     FileInputStream fis;

     LargeObjectManager lom;

     long oid;

     LargeObject lo;

     byte[] buf = new byte[10];

     int n;

     ByteArrayOutputStream byteArrayOutStr = new ByteArrayOutputStream();


     // Open the input file as InputStream

     file = new File(fname);

     fis = new FileInputStream(file);


     // Create the LO

     lom = ((PGConnection) pgconn).getLargeObjectAPI();

     oid = lom.createLO();

     lo = lom.open(oid, LargeObjectManager.WRITE);

     while ((n = fis.read(buf, 0, buf.length)) > 0) {

         lo.write(buf, 0, n);

         byteArrayOutStr.write(buf, 0, n);

     }

     lo.close();


     // Create the entry in the pglotest_blobs table

     PreparedStatement ps = pgconn.prepareStatement("INSERT INTO pglotest_blobs VALUES (?, ?)");

     ps.setString(1, fname);

     ps.setLong(2, oid);

     ps.execute();

     ps.close();


     // Close the input file and commit the transaction

     fis.close();

     pgconn.commit();

     return byteArrayOutStr.toByteArray();

}


/*

  * Perform read operation

  * It internally calls lo_open, loread, lo_close

  */

static byte[] do_select(String fname)

         throws Exception {

     LargeObjectManager lom;

     long oid;

     LargeObject lo;

     byte[] buf = new byte[10];

     int n;

     ByteArrayOutputStream byteArrayOutStr = new ByteArrayOutputStream();


     // Get the Oid of the LO with that filename

     PreparedStatement ps = pgconn.prepareStatement("SELECT blob FROM pglotest_blobs WHERE fname = ?");

     ps.setString(1, fname);

     ResultSet rs = ps.executeQuery();

     if (rs.next()) {

         // Open the LO and read its content

         oid = rs.getLong(1);

         lom = ((PGConnection) pgconn).getLargeObjectAPI();

         lo = lom.open(oid, LargeObjectManager.READ);

         while ((n = lo.read(buf, 0, buf.length)) > 0) {

             byteArrayOutStr.write(buf, 0, n);

         }

         lo.close();

     } else {

         throw new Exception("Entry for " + fname + " not found");

     }


     // Rollback the transaction

     pgconn.rollback();

     return byteArrayOutStr.toByteArray();

}


public static void main(String[] args) throws Exception {

     // Pick a sample file

     String textFile1 = "/etc/os-release";


     // Initialization

     loadDBPropertiesFile();

     connectPG();

     initDB();


     // Perform LO operations

     byte[] bufInput = do_insert(textFile1);

     byte[] bufRetrieved = do_select(textFile1);


     // Verify the results

     String input = new String(bufInput, StandardCharsets.UTF_8);

     String retrieved = new String(bufRetrieved, StandardCharsets.UTF_8);


     System.out.println("-----------------------");

     System.out.println("Text Input: ");

     System.out.println("-----------------------\n" + input);

     System.out.println("-----------------------");

     System.out.println("Text Retrieved: ");

     System.out.println("-----------------------\n" + retrieved);

}

}





Connection information is specified in the following properties file:


app.properties


# JDBC

jdbc.drivers=org.postgresql.Driver

url=jdbc:postgresql://localhost:5432/lolordb

user=asif

password=password


References:

https://jdbc.postgresql.org/documentation/publicapi/org/postgresql/largeobject/LargeObjectManager.html

https://jdbc.postgresql.org/documentation/publicapi/org/postgresql/largeobject/LargeObject.html