Creating the Connection Logic

Last updated on Jul 18, 2025

The connection logic is the foundation of your custom connector. It involves setting up the connection, retrieving metadata, fetching data, and closing the connection once the process is complete. To build a custom connector, perform the following steps:

  1. Initializing Connection to Source: Establish a connection to your Source using the required authentication and access details. The input consists of connection details such as hostname, port, username, and password. The output is either a successful connection instance or an error if the connection fails.

  2. Fetching Objects from Source: Retrieve a list of tables or objects available in the Source. This method returns a list of objects, which could include tables, views, or collections, depending on the Source.

  3. Fetching Schema Details for Objects: Extract the schema for each selected object, defining field types, constraints, and other properties. The input is the list of objects retrieved in the previous step, and the output is an ObjectSchema list that describes the fields and data types for each object.

  4. Fetching Data from the Source: Retrieve records from the Source, transform them into Hevo’s internal format, and push them into the Pipeline. The input includes a ConnectorContext object provided by Hevo, which carries the schema of the selected object, the current offset for incremental data fetching, and details about any associated child objects. Each record is wrapped in an HStruct object and published through the ConnectorProcessor. The method itself returns an ExecutionResult containing per-object record counts and the updated offset.

  5. Closing the Connection: Once data processing is complete, terminate the connection and release any allocated resources.

The following skeleton code outlines how each of these steps is implemented within the connector class:

@Override
public void initializeConnection() {
    // Implement sample test connection
}

@Override
public List<ObjectDetails> getObjects() {
    return new LinkedList<>();
}

@Override
public List<ObjectSchema> fetchSchemaFromSource(List<ObjectDetails> objectDetails) {
    return Collections.emptyList();
}

@Override
public ExecutionResult fetchDataFromSource(
    ConnectorContext context, ConnectorProcessor processor) {
    return new ExecutionResult(
        Map.of(context.schema().objectDetail(), new ExecutionResultStats(0L)), Offset.empty());
}

@Override
public void close() {
    // Do nothing
}
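
To make the order of the five steps concrete, the following is a minimal sketch of how a runner could drive them. SketchConnector, RecordingConnector, and LifecycleSketch are hypothetical stand-ins written for this illustration. They are not Hevo SDK types, and objects and schemas are reduced to plain strings. How Hevo actually schedules these calls is not described here, so treat the ordering and the try/finally as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the connector contract; not the Hevo SDK interface. */
interface SketchConnector {
    void initializeConnection();
    List<String> getObjects();
    List<String> fetchSchemaFromSource(List<String> objects);
    long fetchDataFromSource(String objectSchema);
    void close();
}

/** Test double that records the order in which its methods are called. */
final class RecordingConnector implements SketchConnector {
    final List<String> calls = new ArrayList<>();

    @Override public void initializeConnection() { calls.add("init"); }

    @Override public List<String> getObjects() {
        calls.add("objects");
        return List.of("o1", "o2");
    }

    @Override public List<String> fetchSchemaFromSource(List<String> objects) {
        calls.add("schemas");
        return objects; // One schema per object; names are reused for brevity
    }

    @Override public long fetchDataFromSource(String objectSchema) {
        calls.add("data:" + objectSchema);
        return 1L; // Pretend one record was published
    }

    @Override public void close() { calls.add("close"); }
}

final class LifecycleSketch {
    /** Runs the five steps in order and returns the total number of records fetched. */
    static long run(SketchConnector connector) {
        long total = 0;
        try {
            connector.initializeConnection();
            List<String> objects = connector.getObjects();
            List<String> schemas = connector.fetchSchemaFromSource(objects);
            for (String schema : schemas) {
                total += connector.fetchDataFromSource(schema);
            }
        } finally {
            connector.close(); // Release resources even if an earlier step failed
        }
        return total;
    }
}
```

Placing close() in a finally block means the connection is released even when an earlier step throws.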

Initializing Connection to Source

The first step in interacting with your Source is to establish a connection. To do this, Hevo uses the initializeConnection() method. This method does the following:

  • Establishes a session with the external system.

  • Authenticates the user using API keys, OAuth, or username-password authentication.

  • Validates the connection by performing a simple query or API call.

  • Throws a ConnectorException if the connection fails.

By default, the initializeConnection() method only logs the connection attempt.

Connection Logic for SaaS Sources

For SaaS sources, Hevo provides a built-in REST API client to simplify connection logic. You have to provide the required API configuration, such as URLs, authentication tokens, headers, and parameters.

The following is a sample SaaS REST connection initialization method:

@Override
public void initializeConnection() throws ConnectorException {
    this.apiClient = RestClient.getInstance();
    apiClient.setBaseUrl("https://api.yoursaas.com/v1/");
    apiClient.setAuthToken(authToken); // Obtained via UI components
    apiClient.testConnection(); // Custom validation logic
}

In practical use, this method incorporates user-provided credentials and configurations collected through UI components such as @Property, @Auth, and @Group in the connector class. For details, read Creating the UI Components. These UI components allow users to input connection details such as:

  • Hostname and Port for databases

  • API Keys for services using token-based authentication

  • OAuth Credentials for SaaS platforms such as Salesforce or Google Analytics

  • Account types such as Production or Sandbox

  • Optional settings such as SSL or SSH

When users input these details in the UI and save the configuration, Hevo injects the values into the connector class at runtime, making them accessible within the initializeConnection() method.
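
The general technique behind this kind of runtime injection can be sketched with plain Java reflection. This is an illustration only and not Hevo's implementation. The @SketchProperty annotation, the SketchDbConnector class, and the inject() helper are all hypothetical names introduced for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

/** Hypothetical marker annotation; stands in for the SDK's UI annotations. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SketchProperty {
    String name();
}

/** Hypothetical connector with two user-configurable fields. */
final class SketchDbConnector {
    @SketchProperty(name = "host") String hostname;
    @SketchProperty(name = "port") int port;
}

final class InjectionSketch {
    /** Copies saved configuration values into fields annotated with @SketchProperty. */
    static void inject(Object target, Map<String, Object> savedConfig) {
        for (Field field : target.getClass().getDeclaredFields()) {
            SketchProperty property = field.getAnnotation(SketchProperty.class);
            if (property == null || !savedConfig.containsKey(property.name())) {
                continue; // Not a configurable field, or the user left it unset
            }
            try {
                field.setAccessible(true);
                // Field.set unwraps boxed values (such as Integer) for primitive fields
                field.set(target, savedConfig.get(property.name()));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot inject " + property.name(), e);
            }
        }
    }
}
```

After inject() runs, the fields hold the saved values, which is the state initializeConnection() can rely on when it executes.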

The actual connection logic in the method must be built using the user-provided values to authenticate and establish a session.

The following simplified example demonstrates connecting to Salesforce using OAuth:

@Override
public void initializeConnection() throws ConnectorException {
    log.info("Initializing connection to Salesforce...");

    this.httpClient = new OkHttpClientBuilder().build();
    String tokenUrl = accountType == AccountType.SANDBOX
        ? "https://test.salesforce.com/services/oauth2/token"
        : "https://login.salesforce.com/services/oauth2/token";

    RequestBody body = new FormBody.Builder()
        .add("grant_type", "refresh_token")
        .add("client_id", authCredentials.getClientId())
        .add("client_secret", authCredentials.getClientSecret())
        .add("refresh_token", authCredentials.getRefreshToken())
        .build();

    Request request = new Request.Builder()
        .url(tokenUrl)
        .post(body)
        .build();

    HttpResponse response = httpClient.execute(request);
    if (response.code() != 200) {
        throw new ConnectorException("Failed to authenticate with Salesforce");
    }

    log.info("Salesforce connection successful");
}

In this example, the connector retrieves the OAuth credentials (client ID, client secret, and refresh token) that the user provided through the UI. It uses these credentials to authenticate with Salesforce by sending an HTTP POST request to Salesforce’s OAuth token endpoint.

Ensure your custom connector uses the appropriate connection logic based on the Source’s authentication method. For example, use JDBC for databases or API keys/tokens for API-based services, based on user-provided configuration.
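
For the database case, a minimal JDBC sketch could look like the following. It assumes a PostgreSQL-style JDBC URL and a matching driver on the classpath. The class and method names are hypothetical, and IllegalStateException stands in for ConnectorException only so that the example compiles without the SDK.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

final class JdbcConnectionSketch {
    /** Builds a PostgreSQL-style JDBC URL from user-provided host, port, and database. */
    static String jdbcUrl(String host, int port, String database, boolean useSsl) {
        String url = "jdbc:postgresql://" + host + ":" + port + "/" + database;
        return useSsl ? url + "?ssl=true" : url;
    }

    /** Opens a connection and validates it; requires a JDBC driver on the classpath. */
    static Connection connect(String url, String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        try {
            Connection connection = DriverManager.getConnection(url, props);
            if (!connection.isValid(5)) { // Wait up to 5 seconds for validation
                connection.close();
                throw new IllegalStateException("Connection opened but failed validation");
            }
            return connection;
        } catch (SQLException e) {
            // In a real connector, this is where a ConnectorException would be thrown
            throw new IllegalStateException("Failed to connect to " + url, e);
        }
    }
}
```

Connection.isValid() plays the role of the "simple query" validation step described earlier, without requiring a hand-written test query.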

Once a connection is successfully initialized, you can proceed to retrieve objects and metadata from the Source.

Fetching Objects from Source

After establishing a connection, the next step is to retrieve the available objects from the Source. These objects are structured data entities such as tables or views.

To do this, Hevo uses the getObjects() method. This method retrieves a list of data objects from the Source and returns metadata about each object. The metadata allows Hevo to understand the structure and status of your data without retrieving the data itself. It provides the following details:

  • Object Name: The unique identifier of the data object.

  • Object Type: The kind of data object, such as a table or view.

  • Delimiter: The character or string that separates parts of the fully qualified name. For example, in a database with multiple schemas, a fully qualified table name might be schema.table, where . is the delimiter.

  • Object Status: The current state of the data object, which can be one of three statuses:

    • ACTIVE: The object is operational and can be accessed.

    • INACCESSIBLE: The object exists but cannot be accessed due to permissions, errors, or other issues.

    • DELETED: The object has been removed from the system.

@Override
public List<ObjectDetails> getObjects() {
    ObjectDetails o1 =
        ObjectDetails.builder()
            .tableName("o1")
            .type("TABLE")
            .delimiter(".")
            .sourceObjectStatus(SourceObjectStatus.INACCESSIBLE)
            .build();
    ObjectDetails o2 =
        ObjectDetails.builder()
            .tableName("o2")
            .type("TABLE")
            .delimiter(".")
            .sourceObjectStatus(SourceObjectStatus.ACTIVE)
            .build();

    return Arrays.asList(o1, o2);
}

Fetching Objects for SaaS Sources

For SaaS connectors with static objects, Hevo allows you to define objects in a JSON or YAML (.yaml or .yml) file, eliminating the need for explicit API calls to fetch object metadata.

The following is an example of a JSON definition:

[
  {
    "namespace": {"table": "tickets"},
    "type": "TABLE",
    "status": "ACTIVE",
    "fields": [
      {"name": "id", "logical_type": "hdut_long", "nullable": false},
      {"name": "subject", "logical_type": "hdut_varchar"}
    ]
  }
]

Hevo parses this file automatically when getObjects() delegates to the parent class implementation:

@Override
public List<ObjectDetails> getObjects() {
    return super.getObjects(); // Auto-parses defined JSON schema
}

The manually implemented getObjects() method shown at the start of this section works as follows:

  • Constructs a list of objects that the connector can interact with. Each object is represented by an ObjectDetails instance.

  • Specifies the table name, type, delimiter, and status. In that code snippet, o1 is INACCESSIBLE, while o2 is ACTIVE.

  • Returns a list containing object metadata that Hevo can use to present data sources to the user.

The following is a breakdown of the parameters used when defining an ObjectDetails instance:

Parameter | Description
catalogName | Name of the database or catalog containing the object. It can be empty or NULL if unsupported.
schemaName | Name of the schema containing the object. It can be empty or NULL.
tableName | Name of the table or view. It cannot be empty or NULL.
type | Type of the object such as TABLE, VIEW, or any other classification.
delimiter | Character or string used to concatenate catalogName, schemaName, and tableName into a unique identifier.
sourceObjectStatus | Status of the object at the Source such as ACTIVE, INACCESSIBLE, or DELETED.
inActiveReason | Reason why an object may be INACCESSIBLE.
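
The role of the delimiter can be illustrated with a small helper that joins the name parts into one identifier. This is a sketch under the assumption that empty or NULL catalog and schema names are simply skipped. The exact concatenation Hevo performs is not documented here, and QualifiedNameSketch is a hypothetical name.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class QualifiedNameSketch {
    /** Joins the non-empty name parts with the delimiter; the table name is mandatory. */
    static String qualifiedName(String catalog, String schema, String table, String delimiter) {
        if (table == null || table.isEmpty()) {
            throw new IllegalArgumentException("tableName cannot be empty or NULL");
        }
        return Stream.of(catalog, schema, table)
            .filter(part -> part != null && !part.isEmpty()) // Skip unsupported levels
            .collect(Collectors.joining(delimiter));
    }
}
```

For example, a catalog of sales_db, a schema of public, and a table of customers joined with "." give sales_db.public.customers, while a Source without catalogs yields public.customers.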

Your Source could be a database, SaaS, or a file-based storage system. Here are some examples that show how the key parameters map to a real-world Source:

For a Database Source

If your Source is a database, you can use SQL queries to retrieve tables.

@Override
public List<ObjectDetails> getObjects() {
    // Example: Fetch tables from a database source
    return List.of(
        ObjectDetails.builder()
                .catalogName("sales_db")
                .schemaName("public")
                .tableName("customers")
                .type("TABLE")
                .delimiter(".")
                .sourceObjectStatus(SourceObjectStatus.ACTIVE)
                .build(),
        ObjectDetails.builder()
                .catalogName("sales_db")
                .schemaName("public")
                .tableName("orders")
                .type("TABLE")
                .delimiter(".")
                .sourceObjectStatus(SourceObjectStatus.INACCESSIBLE)
                .inActiveReason("Permission Denied")
                .build()
    );
}

For a File-Based Source

If your Source stores files instead of structured tables, you can treat folders or CSV files as objects.

@Override
public List<ObjectDetails> getObjects() {
    // Example: Fetch objects from a file-based source
    return List.of(
        ObjectDetails.builder()
            .tableName("customer_data.csv")
            .type("FILE")
            .delimiter("/")
            .sourceObjectStatus(SourceObjectStatus.ACTIVE)
            .build(),
        ObjectDetails.builder()
            .tableName("archive")
            .type("FOLDER")
            .delimiter("/")
            .sourceObjectStatus(SourceObjectStatus.INACCESSIBLE)
            .inActiveReason("Restricted Access")
            .build()
    );
}
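
The file-based example above hardcodes its objects. A sketch of discovering them from a local directory with java.nio.file is shown below. SketchObject is a hypothetical stand-in for ObjectDetails, and mapping unreadable entries to INACCESSIBLE is an assumption made for illustration. A remote file store would need its own client in place of Files.list().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FileObjectSketch {
    /** Hypothetical stand-in for ObjectDetails, reduced to three attributes. */
    static final class SketchObject {
        final String name;
        final String type;
        final String status;

        SketchObject(String name, String type, String status) {
            this.name = name;
            this.type = type;
            this.status = status;
        }
    }

    /** Lists the direct children of root, sorted by path, as FILE or FOLDER objects. */
    static List<SketchObject> listObjects(Path root) {
        try (Stream<Path> entries = Files.list(root)) {
            return entries.sorted()
                .map(FileObjectSketch::describe)
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static SketchObject describe(Path entry) {
        String type = Files.isDirectory(entry) ? "FOLDER" : "FILE";
        String status = Files.isReadable(entry) ? "ACTIVE" : "INACCESSIBLE";
        return new SketchObject(entry.getFileName().toString(), type, status);
    }

    /** Creates a temporary directory holding one CSV file and one folder, for demonstration. */
    static Path createSampleTree() {
        try {
            Path root = Files.createTempDirectory("file-source-sketch");
            Files.createFile(root.resolve("customer_data.csv"));
            Files.createDirectory(root.resolve("archive"));
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling listObjects(createSampleTree()) returns the archive folder followed by the customer_data.csv file, mirroring the two objects in the hardcoded example.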

The custom connector can now retrieve available objects from your Source. It filters out unsupported objects and assigns attributes such as type, delimiter, and status. The retrieved objects serve as the foundation for schema extraction, ensuring only relevant tables and views are processed in the next step.

Fetching Schema Details for Objects

Once the available objects are retrieved from the Source, the next step is to fetch the schema details. The schema defines the structure of each object, including the field names, data types, and constraints.

To do this, Hevo uses the fetchSchemaFromSource() method. This method retrieves the columns or fields for each object. It then defines data types for each column and returns the schema in an ObjectSchema format, which includes a list of fields for each object. It provides the following details:

  • Object Name: Unique identifier of the data object.

  • Fields: Set of columns belonging to the object.

    • Field Name: Name of each column in the table or view.

    • Field Type: Data type of each field, such as INTEGER, VARCHAR, or TIMESTAMP.

    • Primary Key: Specifies whether a field is part of the primary key.

    • Cursor Key: Indicates whether a field is used as a cursor key for incremental data fetching.

    • Nullable: Indicates whether the column allows NULL values.

    • Field Status: State of the column in the Source.

    • Complex Types: Supports structured fields like ARRAY, JSON, and XML.

@Override
public List<ObjectSchema> fetchSchemaFromSource(List<ObjectDetails> objectDetails) {
    Set<Field> fields1 = new HashSet<>();
    HIntegerField.Builder id1 =
        new HIntegerField.Builder("id", "INTEGER", 0, FieldState.ACTIVE).isNullable(false);
    id1.pkPos(0);
    HDateTimeField.Builder ts1 =
        new HDateTimeField.Builder("updated_ts", "TIMESTAMP", 2, FieldState.ACTIVE, 9)
            .isNullable(false);
    ts1.ckOrdinal(0);
    fields1.add(id1.build());
    fields1.add(ts1.build());

    Set<Field> fields2 = new HashSet<>();
    HIntegerField.Builder id2 =
        new HIntegerField.Builder("id", "INTEGER", 1, FieldState.ACTIVE).isNullable(false);
    HDateTimeField.Builder ts2 =
        new HDateTimeField.Builder("updated_ts", "TIMESTAMP", 2, FieldState.ACTIVE, 9)
            .isNullable(false);
    ts2.ckOrdinal(1);
    fields2.add(id2.build());
    fields2.add(ts2.build());

    ObjectSchema os1 = new ObjectSchema(objectDetails.get(0), fields1);
    ObjectSchema os2 = new ObjectSchema(objectDetails.get(1), fields2);
    return Arrays.asList(os1, os2);
}

Fetching Schema for SaaS Sources

For static SaaS schemas, Hevo uses your provided JSON or YAML file to auto-generate schema details, so you do not need to define fields manually.

The following is an example of an auto-generated schema method:

@Override
public List<ObjectSchema> fetchSchemaFromSource(List<ObjectDetails> objectDetails) {
    return super.fetchSchemaFromSource(objectDetails); // Uses predefined schema file
}

The fetchSchemaFromSource() method retrieves schema details for each object in the Source and returns them as ObjectSchema instances. In the manual example at the start of this section, fields1 and fields2 are created to store column definitions for two objects.

Set<Field> fields1 = new HashSet<>();
Set<Field> fields2 = new HashSet<>();

The method defines an id column as an integer using HIntegerField.Builder. This marks the column as non-nullable and, in the case of the first object, assigns it as a primary key.

HIntegerField.Builder id1 =
    new HIntegerField.Builder("id", "INTEGER", 0, FieldState.ACTIVE).isNullable(false);
id1.pkPos(0);
fields1.add(id1.build());

It also defines an updated_ts column using HDateTimeField.Builder, specifying it as a timestamp with nanosecond precision and setting it as a cursor key, which tracks the position of the last fetched record.

HDateTimeField.Builder ts1 =
    new HDateTimeField.Builder("updated_ts", "TIMESTAMP", 2, FieldState.ACTIVE, 9)
        .isNullable(false);
ts1.ckOrdinal(0);
fields1.add(ts1.build());

If your Source contains different columns, modify the field definitions accordingly by changing column names, data types, and constraints such as primary keys, nullability, or cursor keys. The same logic is repeated for every object in the Source.

The method then creates two ObjectSchema instances, linking the object details to their corresponding fieldsets. You need to ensure that objectDetails.get(0) and objectDetails.get(1) reference the correct objects from the Source, and if more objects exist, they should follow the same pattern.

ObjectSchema os1 = new ObjectSchema(objectDetails.get(0), fields1);
ObjectSchema os2 = new ObjectSchema(objectDetails.get(1), fields2);

Finally, the method returns a list of schemas, one for each object.

return Arrays.asList(os1, os2);

Each column in the Source database is represented by a corresponding field object. When retrieving schema details, your custom connector identifies the column name and its data type and then maps it to an appropriate Hevo field type. The following is a breakdown of the different supported data types in Hevo:

Basic Data Types

Field Type | Description | Example Source Data Type
HBooleanField | Boolean values such as TRUE, FALSE, or NULL. | BOOLEAN
HVarcharField | String values stored as UTF-8 text. | VARCHAR(255), TEXT
HByteField | Single-byte integer. | TINYINT
HByteArrayField | Byte array for binary data. | BLOB, BYTEA

Number Data Types

Field Type | Description | Example Source Data Type
HDecimalField | High-precision decimal numbers. | DECIMAL(10,2), NUMERIC
HDoubleField | Double-precision floating-point numbers. | DOUBLE PRECISION
HFloatField | Floating-point numbers. | FLOAT, REAL
HIntegerField | Integer values. | INTEGER, INT
HLongField | Long integer values. | BIGINT
HShortField | Short integer values. | SMALLINT

Date and Time Data Types

Field Type | Description | Example Source Data Type
HDateField | Represents a date (YYYY-MM-DD). | DATE
HDateTimeField | Represents a timestamp (YYYY-MM-DD HH:MM:SS). | TIMESTAMP
HDateTimeTZField | Timestamp with time zone. | TIMESTAMP WITH TIME ZONE
HTimeField | Represents time (HH:MM:SS). | TIME
HTimeTZField | Represents time with a time zone offset. | TIME WITH TIME ZONE

Complex Data Types

Field Type | Description | Example Source Data Type
HArrayField | An array where all elements are of the same type. | ARRAY, JSON ARRAY
HJsonField | Stores JSON-formatted text. | JSON, JSONB
HStructField | Stores structured (nested) data. | STRUCT, COMPOSITE TYPE
HXMLField | Stores XML-formatted text. | XML

By querying the Source database’s metadata, your custom connector retrieves column details for each table, maps them to Hevo’s supported field types, and constructs the schema. With the schema now defined, the next step is to use this structure to fetch data and process it.
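
For a JDBC Source, the mapping step could be sketched as a lookup from java.sql.Types codes (the DATA_TYPE value reported by DatabaseMetaData.getColumns()) to the field type names in the tables above. The helper returns names as strings so that it compiles without the SDK. FieldTypeMappingSketch is a hypothetical name, and the pairing of TEXT with LONGVARCHAR and BYTEA with BINARY is an assumption that varies by driver. JSON has no dedicated JDBC type code, so it is left unmapped here.

```java
import java.sql.Types;

final class FieldTypeMappingSketch {
    /** Returns the name of the Hevo field type for a java.sql.Types code. */
    static String hevoFieldTypeFor(int jdbcType) {
        switch (jdbcType) {
            case Types.BOOLEAN:                 return "HBooleanField";
            case Types.VARCHAR:
            case Types.LONGVARCHAR:             return "HVarcharField";   // VARCHAR, TEXT
            case Types.TINYINT:                 return "HByteField";
            case Types.BLOB:
            case Types.BINARY:
            case Types.VARBINARY:               return "HByteArrayField"; // BLOB, BYTEA
            case Types.DECIMAL:
            case Types.NUMERIC:                 return "HDecimalField";
            case Types.DOUBLE:                  return "HDoubleField";
            case Types.FLOAT:
            case Types.REAL:                    return "HFloatField";
            case Types.INTEGER:                 return "HIntegerField";
            case Types.BIGINT:                  return "HLongField";
            case Types.SMALLINT:                return "HShortField";
            case Types.DATE:                    return "HDateField";
            case Types.TIMESTAMP:               return "HDateTimeField";
            case Types.TIMESTAMP_WITH_TIMEZONE: return "HDateTimeTZField";
            case Types.TIME:                    return "HTimeField";
            case Types.TIME_WITH_TIMEZONE:      return "HTimeTZField";
            case Types.ARRAY:                   return "HArrayField";
            case Types.STRUCT:                  return "HStructField";
            case Types.SQLXML:                  return "HXMLField";
            default:
                // Fail loudly so unmapped columns are noticed during development
                throw new IllegalArgumentException("No mapping for JDBC type code " + jdbcType);
        }
    }
}
```

Throwing on unknown codes is one possible design. A connector could instead fall back to a string type, at the cost of hiding unmapped columns.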

Fetching Data from the Source

Once the schema details are retrieved, the next step is to fetch actual data from the Source. The fetched data is then processed and sent forward for further transformations or loading.

To do this, Hevo uses the fetchDataFromSource() method. This method retrieves records for each object and maps them to their respective columns. It then constructs the data as a list of HDatum objects, where each HDatum represents a field in the object. The method processes and publishes this data using ConnectorProcessor, ensuring that it is pushed into the Pipeline for further use. It provides the following details:

  • Object Name: Unique identifier of the data object.

  • Fields: Collection of data points belonging to the object.

    • Field Name: Name of each column in the record.

    • Field Value: Value retrieved for the field.

    • Field Type: Data type of the field, such as INTEGER, VARCHAR, or TIMESTAMP.

  • Processing Status: Indicates if the data has been successfully read and published.

  • Offset Tracking: Last processed record to support incremental data fetching.

Optionally, you can add support for parent-child relationships. When fetching data from your Source, you can simultaneously process a parent object, such as Zendesk articles, and its associated child objects, such as comments or attachments. If your Source does not have related child objects, this functionality can be safely omitted.

@Override
public ExecutionResult fetchDataFromSource(
    ConnectorContext connectorContext, ConnectorProcessor connectorProcessor)
    throws ConnectorException {

  Map<ObjectDetails, ExecutionResultStats> recordStats = new HashMap<>();
  Offset.Builder currentRecordOffset = Offset.builder();

  // Fetch and process parent object data
  List<HDatum> parentRow = new ArrayList<>(connectorContext.schema().fields().size());
  for (Field col : connectorContext.schema().fields()) {
    HDataType hDataType = HDataType.fromLogicalType(col.logicalType());
    if (hDataType.equals(HDataType.INTEGER)) {
      parentRow.add(new HInteger(1));
    } else if (hDataType.equals(HDataType.DATE_TIME)) {
      parentRow.add(new HDateTime(LocalDateTime.now()));
    }
    // Add additional data type handling here as needed
  }

  // Publish parent object data
  connectorProcessor.publish(
      new HStruct(parentRow),
      ConnectorMeta.builder().opType(OpType.READ).build(),
      connectorContext.schema().objectDetail()
  );
  recordStats.put(connectorContext.schema().objectDetail(), new ExecutionResultStats(1));

  // Optional: Process child objects, if present
  if (connectorContext.childSchemas() != null && !connectorContext.childSchemas().isEmpty()) {
    for (ObjectSchema childSchema : connectorContext.childSchemas()) {
      // Process each child object separately
      List<HDatum> childRow = new ArrayList<>(childSchema.fields().size());
      for (Field childCol : childSchema.fields()) {
        HDataType childDataType = HDataType.fromLogicalType(childCol.logicalType());
        if (childDataType.equals(HDataType.INTEGER)) {
          childRow.add(new HInteger(1));
        } else if (childDataType.equals(HDataType.DATE_TIME)) {
          childRow.add(new HDateTime(LocalDateTime.now()));
        }
        // Additional data type handling as needed
      }

      // Publish child object data
      connectorProcessor.publish(
          new HStruct(childRow),
          ConnectorMeta.builder().opType(OpType.READ).build(),
          childSchema.objectDetail()
      );

      // Update recordStats for child object
      recordStats.put(childSchema.objectDetail(), new ExecutionResultStats(1));
    }
  }

  return new ExecutionResult(recordStats, currentRecordOffset.build());
}

Fetching Data for SaaS Sources

Hevo provides helper methods for SaaS connectors to fetch data via REST API and map it directly into HStruct objects:

Response apiResponse = apiClient.request(request);
Map<String, Object> refinedData = DataConverterUtils.convertResponse(apiResponse, objectMapper, new TypeReference<>() {});
HStruct row = DataConverterUtils.convertToHStruct(refinedData, connectorContext.schema());
connectorProcessor.publish(row, ConnectorMeta.builder().opType(OpType.READ).build(), connectorContext.schema().objectDetail());

Customize the convertResponse() method if needed.


The fetchDataFromSource() method begins by creating a list to store the fetched data, where each value corresponds to a column in the schema. In the sample implementation, it assigns placeholder values: 1 for integers and the current timestamp for date-time fields. To fetch real data, replace the hardcoded assignments with values retrieved from your Source. For example, if you are fetching data from a database, execute a query and populate the row from the result set:

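if (hDataType.equals(HDataType.INTEGER)) {
    // getInt() returns 0 for SQL NULL; check resultSet.wasNull() afterwards for nullable columns
    int value = resultSet.getInt(col.name()); // Fetch actual value from result set
    row.add(new HInteger(value));
} else if (hDataType.equals(HDataType.DATE_TIME)) {
    // getTimestamp() returns null for SQL NULL; guard before toLocalDateTime() on nullable columns
    LocalDateTime timestamp = resultSet.getTimestamp(col.name()).toLocalDateTime();
    row.add(new HDateTime(timestamp));
}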

An Offset.Builder object is also initialized to track the last processed record, which helps manage incremental fetching. As the method loops through each field in the schema, it determines the field’s data type using HDataType.fromLogicalType(). Ensure these mappings match your Source’s actual data types.
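
The idea behind offset tracking can be sketched independently of the SDK. In the following example, the offset is the largest updated_ts value seen so far, and each run fetches only rows newer than it. Row, Batch, and fetchSince() are hypothetical names, and the in-memory list stands in for a query against the Source. How the value is stored in Hevo's Offset object is not shown here.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

final class IncrementalFetchSketch {
    /** Hypothetical source row with the id and updated_ts columns from the schema example. */
    static final class Row {
        final int id;
        final LocalDateTime updatedTs;

        Row(int id, LocalDateTime updatedTs) {
            this.id = id;
            this.updatedTs = updatedTs;
        }
    }

    /** Rows fetched in one run plus the offset to store for the next run. */
    static final class Batch {
        final List<Row> rows;
        final LocalDateTime nextOffset;

        Batch(List<Row> rows, LocalDateTime nextOffset) {
            this.rows = rows;
            this.nextOffset = nextOffset;
        }
    }

    /** Returns rows updated after lastOffset; a null offset means a full first fetch. */
    static Batch fetchSince(List<Row> source, LocalDateTime lastOffset) {
        List<Row> fresh = new ArrayList<>();
        LocalDateTime next = lastOffset;
        for (Row row : source) {
            boolean isNew = lastOffset == null || row.updatedTs.isAfter(lastOffset);
            if (!isNew) {
                continue; // Already processed in an earlier run
            }
            fresh.add(row);
            if (next == null || row.updatedTs.isAfter(next)) {
                next = row.updatedTs; // Advance the cursor to the newest row seen
            }
        }
        return new Batch(fresh, next);
    }
}
```

When no new rows exist, the returned offset equals the previous one, so repeated runs are safe and do not re-emit old records.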

Hevo supports mapping to the following data types:

  • HBoolean

  • HVarchar

  • HByte

  • HInteger

  • HDecimal

  • HDouble

  • HFloat

  • HLong

  • HShort

  • HDate

  • HDateTime

  • HDateTimeTZ

  • HTime

  • HTimeTZ

  • HArray

  • HJsonField

  • HStructField

  • HXMLField

For a detailed explanation of these data types, refer to the tables in Step 3, Fetching Schema Details for Objects.

Once the data is retrieved, it is wrapped inside an HStruct object and published using the connectorProcessor.publish() method along with the relevant ObjectDetails. This explicitly associates the processed data with its corresponding Source object and pushes it into Hevo’s Pipeline. Finally, the method returns an ExecutionResult containing a detailed mapping of each Source object (ObjectDetails) to its respective count of processed records (ExecutionResultStats), along with the updated offset. While testing your custom connector, you can validate that the returned result accurately reflects the number of records processed per Source object.
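
The sample implementation hardcodes a count of 1 per object. When publishing real data, the per-object counts could be accumulated as records are published, as in this sketch. RecordStatsSketch is a hypothetical helper that keys counts by object name. In a connector, the keys would be ObjectDetails instances and each count would be wrapped in ExecutionResultStats.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class RecordStatsSketch {
    private final Map<String, Long> counts = new LinkedHashMap<>();

    /** Call once for every record published for the given object. */
    void recordPublished(String objectName) {
        counts.merge(objectName, 1L, Long::sum); // Starts at 1, then increments
    }

    /** Returns a read-only copy of the counts collected so far. */
    Map<String, Long> snapshot() {
        return Collections.unmodifiableMap(new LinkedHashMap<>(counts));
    }
}
```

Comparing the snapshot with the number of rows you expect per object is a simple way to validate the returned result during testing.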

Closing the Connection

After fetching data, it is important to properly terminate the connection to release resources and avoid memory leaks.

To do this, Hevo uses the close() method. This method ensures that any open connections or resources are properly released. By default, the method is empty, meaning no cleanup is performed unless explicitly implemented. You must update this method to terminate database connections, file streams, or other resources your custom connector uses.

@Override
public void close() {
    // Clean resources
}
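
One way to organize the cleanup is to register each resource as it is opened and release them in reverse order inside close(). The following sketch uses only the JDK. ResourceCloserSketch is a hypothetical helper, not part of the Hevo SDK, and making close() idempotent is a design choice assumed here rather than a documented requirement.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ResourceCloserSketch implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();
    private boolean closed;

    /** Registers a resource, such as a JDBC connection or file stream, for later cleanup. */
    void register(AutoCloseable resource) {
        resources.push(resource);
    }

    /** Closes resources in reverse order of registration; later calls do nothing. */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        RuntimeException failure = null;
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                // Keep closing the remaining resources and report all failures at the end
                if (failure == null) {
                    failure = new IllegalStateException("Failed to release resources", e);
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```

A connector's close() method could then delegate to such a helper, so that one failing resource does not prevent the others from being released.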
