Publication channels

The publication channels make it possible to feed and maintain a queue of entities to be processed.

A channel is defined in the administration and has the following fields:

  • A name (Display name)
  • An identifier (unique identifier of the channel used by the connectors)
  • A catalog identifier (allows you to indicate a catalog associated with the channel) [optional]
  • A configuration JSON [optional]
  • A configuration file used by the associated connector [optional]

On each entity, the "Publishing Channels" list indicates the channels to publish the entity to.

Each connector is responsible for feeding its channel. Once a connector has been executed, the following information appears in the administration:

  • Batch identifier
  • Start/end time
  • Duration
  • The status of the last batch
  • The identifier of the last successful batch
  • The number of processed/failed entities on the last batch
  • Any batch errors

A successful batch means that the batch ran to completion, not that every entity in it was processed successfully.

The last date is the date used to select the entities returned for the batch. It is fed by the connector and can be modified manually.

The action field allows you to:

  • Stop the connector ("STOP")
  • Replay entities in error ("RETRY")
  • Resynchronize all entities ("RESET")

On each entity, we find:

  • Its date of publication in the channel
  • The identifier of the last batch
  • The status of the last batch and any errors
  • The modification date

If a catalog is associated with the channel, the modification date is only updated when fields audited in the catalog change. If no catalog is associated, this date is populated with the earliest of the formulation, modification, and creation dates.

The "force" field is used to force this entity to be processed during the next batch.

By default, a channel returns channel member entities whose modified date is less than or equal to the last date.

Setup

Search

It is possible to configure a specific search on a channel using the Configuration field.

The following JSON values are possible:

Example: Retrieves all products, clients, and suppliers even if they do not belong to the channel:

{
  "query": "(@cm\\:created:[%s TO MAX] OR @cm\\:modified:[%s TO MAX]) AND (TYPE:\"bcpg:product\" OR TYPE:\"bcpg:client\" OR TYPE:\"bcpg:supplier\")",
  "isFilter": false
}

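isFilter indicates whether the query filters the channel member entities (true) or whether the channel is of search type (false), in which case entities are returned even if they do not belong to the channel. %s is replaced by the value of the last date field.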
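The placeholder substitution described above can be sketched as follows. This is only an illustration of the idea, not the beCPG implementation: the class name ChannelQueryTemplate is hypothetical, and the ISO-8601 date string is an assumption, since the date format actually injected by the server is not stated here.

```java
import java.time.Instant;

/** Sketch: substitutes the channel's last date into a configured query. */
class ChannelQueryTemplate {

    /**
     * Replaces every %s placeholder with the given date string.
     * String.replace is used rather than String.format so that any other
     * '%' character in a query cannot cause a format exception.
     */
    static String resolve(String queryTemplate, String lastDate) {
        return queryTemplate.replace("%s", lastDate);
    }

    public static void main(String[] args) {
        String template = "(@cm\\:created:[%s TO MAX] OR @cm\\:modified:[%s TO MAX])"
                + " AND (TYPE:\"bcpg:product\")";
        // Assumption: an ISO-8601 instant is used purely for illustration.
        String lastDate = Instant.ofEpochMilli(1672320540452L).toString();
        System.out.println(resolve(template, lastDate));
    }
}
```

Both occurrences of %s receive the same value, which matches the example query where the created and modified dates are compared to the same last date.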

Example: Filters channel member entities by including only validated products:

{
  "query": "TYPE:\"bcpg:product\" AND @bcpg\\:productState:\"Valid\"",
  "isFilter": true
}

It is also possible to filter channel member entities using the following syntax. The comma-separated strings list the accepted values; use only one of them:

{
 "dateFilter": {
    "dateFilterField": "cm:modified",
    "dateFilterType": "Before, After, From, To, Equals",
    "dateFilterDelay": 1,
    "dateFilterDelayUnit": "Min, Hour, Day"
  },
  "versionFilter": {
    "versionFilterType": "MAJOR, MINOR, NONE"
  },
  "entityFilter": {
    "entityType": "bcpg:semiFinishedProduct",
    "criteria": {
      "assoc_bcpg_plants_added": "nodeRef"
    }
  }
}

The channel's last date field is used as the reference date in dateFilter.

Connector

It is possible to pass properties to the connector using the properties parameter of the configuration JSON:

{
  "properties": {
    "connector.notify.enabled": true,
    "connector.notify.from": "support@becpg.fr",
    "connector.notify.to": "support@becpg.fr",
    "remote.extra.fields": "cm:titled,cm:description",
    "remote.extra.lists": "bcpg:compoList"
  }
}

These parameters are specific to the channel. The configuration of the connector can also be done using configuration files:

Channel Configuration Parameters

connector.channel.id=id1,id2,id3 // Channels configured for this connector

Actions to be performed on the entity after the connector's execution:

connector.channel.onSuccess.action=STOP (Default empty)
connector.channel.onError.action=STOP (Default value of the channel)

JSON to be applied to the entity after the connector's execution, in remoteEntity format:

connector.channel.onSuccess.remote= {"attributes": {"bcpg:productState": "Simulation"}}
connector.channel.onError.remote=
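The listener reproduced at the end of this page looks up these properties per channel first (connector.channel.{channelId}.key) and then falls back to the generic key (connector.channel.key). A minimal sketch of that lookup with java.util.Properties is shown below; the class name ChannelPropertyResolver is hypothetical, and the real connector reads its values from its own execution context rather than from a Properties object.

```java
import java.util.Properties;

/** Sketch: channel-specific property lookup with a generic fallback. */
class ChannelPropertyResolver {

    private static final String PREFIX = "connector.channel.";

    /**
     * Returns the channel-specific value if present, otherwise the generic
     * value, otherwise the supplied default.
     */
    static String resolve(Properties props, String channelId, String key, String defaultValue) {
        String specific = props.getProperty(PREFIX + channelId + "." + key);
        if (specific != null) {
            return specific;
        }
        return props.getProperty(PREFIX + key, defaultValue);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("connector.channel.onSuccess.action", "STOP");
        props.setProperty("connector.channel.id1.onSuccess.action", "RETRY");

        // id1 has its own value; id2 falls back to the generic one.
        System.out.println(resolve(props, "id1", "onSuccess.action", ""));
        System.out.println(resolve(props, "id2", "onSuccess.action", ""));
    }
}
```

With this order of precedence, a single configuration file can define a default action for all channels and override it for one channel only.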

File Sending:

connector.send.protocol=remote,sftp,ftp,ftps // One or more protocols separated by commas
connector.send.deleteOnSuccess=true // Delete files from the server after successful transmission

Remote Protocol:

Allows depositing files in a folder in beCPG

connector.send.remote.destNodeRef=9ffb8192-6a9a-4921-94fa-22d83ad43d64 // Destination

SFTP / FTP / FTPS Protocols:

Allows depositing files via SFTP

connector.send.sftp.host=
connector.send.sftp.port=
connector.send.sftp.login=
connector.send.sftp.password=
connector.send.sftp.destFolder=

For multichannel use, use the following property prefix:

connector.send.{channelId}.

Notification Parameters:

Email Notification:

connector.notify.enabled=true // Active
connector.notify.from=support@becpg.fr // Email sender
connector.notify.to=support@becpg.fr // Email recipient

API

The list of entities is obtained with the LIST query.

/becpg/remote/channel/list?channelId={id}
/becpg/remote/channel/list?channelNodeRef={nodeRef}

The maxResults parameter specifies the maximum number of results (-1 returns all results).

The format parameter sets the format of the response:

  • format=xml (default)
  • format=json

The fields parameter specifies the fields or associations to extract in the results.
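As an illustration, the LIST URL and its parameters could be assembled as in the sketch below. The class name ChannelListUrl, the base URL used in main, and the field name bcpg:code are hypothetical; passing several fields as a comma-separated value is also an assumption.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch: builds the URL of the LIST query with its optional parameters. */
class ChannelListUrl {

    static String build(String baseUrl, String channelId, int maxResults, String format, String fields) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("channelId", channelId);
        params.put("maxResults", String.valueOf(maxResults));
        params.put("format", format);
        if (fields != null && !fields.isEmpty()) {
            params.put("fields", fields);
        }
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return baseUrl + "/becpg/remote/channel/list?" + query;
    }

    public static void main(String[] args) {
        // Hypothetical server address; -1 asks for all results.
        System.out.println(build("http://localhost:8080/alfresco/service",
                "sample-channel", -1, "json", "cm:name,bcpg:code"));
    }
}
```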

Architecture

The caller is responsible for updating the publication channels and the entities, following this sequence:

sequenceDiagram
title: Channel info retrieval and update sequence

actor Connector
participant beCPG-PLM

beCPG-PLM->+Connector: Retrieve channel info
Connector->Connector: Start batch
Connector->beCPG-PLM: Reset channel info
beCPG-PLM->Connector: List channel entities
beCPG-PLM->+Connector: Retrieve entity info
Connector->Connector: Process entity
Connector->-beCPG-PLM: Update channel info on entity
Connector->beCPG-PLM: Update channel
Connector->-Connector: End batch
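The sequence above can be summarized as a simple loop on the connector side. The sketch below is only a skeleton under stated assumptions: ChannelClient and EntityProcessor are hypothetical interfaces, each ChannelClient method standing for one of the HTTP calls detailed in the following sections, and no real network access is performed.

```java
import java.util.List;

/** Sketch: connector-side batch loop following the sequence described above. */
class ChannelBatchSketch {

    /** Hypothetical client; each method stands for one remote call. */
    interface ChannelClient {
        void startBatch(String channelId, long batchId);

        List<String> listEntities(String channelId);

        void updateEntityStatus(String entityRef, long batchId, String status, String error);

        void endBatch(String channelId, long batchId, int readCount, int failCount);
    }

    /** Hypothetical per-entity processing step. */
    interface EntityProcessor {
        void process(String entityRef) throws Exception;
    }

    /** Runs one batch and returns the number of failed entities. */
    static int run(ChannelClient client, EntityProcessor processor, String channelId, long batchId) {
        // 1. Start the batch and reset the channel information.
        client.startBatch(channelId, batchId);

        // 2. List the entities of the channel.
        List<String> refs = client.listEntities(channelId);

        int failed = 0;
        for (String ref : refs) {
            try {
                // 3. Process the entity, then report its status on the entity.
                processor.process(ref);
                client.updateEntityStatus(ref, batchId, "COMPLETED", "");
            } catch (Exception e) {
                failed++;
                client.updateEntityStatus(ref, batchId, "FAILED", String.valueOf(e.getMessage()));
            }
        }

        // 4. Update the channel once the batch is finished.
        client.endBatch(channelId, batchId, refs.size(), failed);
        return failed;
    }
}
```

A failure on one entity is recorded on that entity and does not interrupt the loop, which is consistent with a batch being reported as finished even when some entities failed.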

Fetch information from the publication channel

To retrieve information from a publication channel, make an HTTP GET request to the following URL:

/alfresco/service/becpg/remote/entity?format=json&query=+TYPE:%22bp:pubChannel%22%20AND%20%3Dbp%5C:pubChannelId:%22sample-channel%22%20
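The query parameter in the URL above is a percent-encoded search query. A minimal sketch of how a client might build this request with the JDK's java.net.http API follows. The class name ChannelInfoRequest and the base URL are hypothetical, the request is only built (not sent), and the channel identifier is assumed not to contain double quotes.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Sketch: builds the GET request that retrieves a channel by identifier. */
class ChannelInfoRequest {

    /** Percent-encodes a query, using %20 for spaces as in the URL above. */
    static String encodeQuery(String query) {
        // URLEncoder encodes a literal '+' as %2B and a space as '+',
        // so every remaining '+' is a space and can be rewritten as %20.
        return URLEncoder.encode(query, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static HttpRequest build(String baseUrl, String channelId) {
        String query = "+TYPE:\"bp:pubChannel\" AND =bp\\:pubChannelId:\"" + channelId + "\"";
        URI uri = URI.create(baseUrl + "/becpg/remote/entity?format=json&query=" + encodeQuery(query));
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        // Hypothetical server address.
        HttpRequest request = build("http://localhost:8080/alfresco/service", "sample-channel");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

This sketch encodes more characters than the URL shown above (for example the colons); the decoded query is the same.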

This query returns the publication channel information in the following form:

{
   "entity": {
     "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
     "cm:name": "Sample channel",
     "attributes": {
       "bp:pubChannelId": "sample-channel",
       "bp:pubChannelConfig": "..."
     }
   }
}

Batch start and reset of the publication channel information

To start a batch and reset the publication channel information, make an HTTP PUT request to the following URL:

/alfresco/service/becpg/remote/entity?format=json

Send the following information in the request body:

{
   "entity": {
     "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
     "type": "bp:pubChannel",
     "attributes": {
       "bp:pubChannelBatchDuration": null,
       "bp:pubChannelBatchId": 1,
       "bp:pubChannelStatus": "STARTED",
       "bp:pubChannelBatchEndTime": null,
       "bp:pubChannelBatchStartTime": 1672320000652
     },
     "bp:pubChannelId": "sample-channel"
   }
}

Note:

The dates are in timestamp format (the number of milliseconds since January 1, 1970, as in the 13-digit sample values above).
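The sample values on this page have 13 digits, which corresponds to a number of milliseconds since the epoch. Under that assumption, the conversion to and from java.time.Instant can be sketched as follows; the class name ChannelTimestamps is hypothetical.

```java
import java.time.Instant;

/** Sketch: converts between Instant and the numeric timestamps of the channel API. */
class ChannelTimestamps {

    /** Assumption: the API expects epoch milliseconds, as the sample values suggest. */
    static long toChannelTimestamp(Instant instant) {
        return instant.toEpochMilli();
    }

    static Instant fromChannelTimestamp(long timestamp) {
        return Instant.ofEpochMilli(timestamp);
    }

    public static void main(String[] args) {
        // Sample start time taken from the request body above.
        System.out.println(fromChannelTimestamp(1672320000652L)); // 2022-12-29T13:20:00.652Z
        System.out.println(toChannelTimestamp(Instant.now()));
    }
}
```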

List of publication channel entities

To get the list of entities associated with a publication channel, make an HTTP GET request to the following URL:

/alfresco/service/becpg/remote/channel/list?format=json&channelId=sample-channel

Retrieval of information from each entity

To retrieve information for a given entity, make an HTTP GET request to the following URL:

/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}

Where {entityNodeRef} is the nodeRef of the entity to retrieve.

Update of the channel information on an entity

To update channel information on a given entity, make an HTTP POST request to the following URL:

/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}

Send the following information in the request body:

{
   "entity": {
     "datalists": {
       "bp:pubChannelList": [
         {
           "type": "bp:pubChannelList",
           "attributes": {
             "bp:pubChannelListStatus": "COMPLETED",
             "bp:pubChannelListError": "",
             "bp:pubChannelListAction": "STOP",
             "bp:pubChannelListBatchId": 1
           },
           "bp:pubChannelListChannel": {
             "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
             "type": "bp:pubChannel",
             "bp:pubChannelId": "sample-channel"
           }
         }
       ]
     }
   }
}

Note:

  • bp:pubChannelListStatus can have the value "COMPLETED" or "FAILED".
  • bp:pubChannelListAction can be an empty string, "STOP", or "RETRY".
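The attribute values sent for a processed entity depend on whether processing succeeded. The sketch below mirrors the logic of the sendSuccess and sendError methods of the listener reproduced at the end of this page, using plain maps; the class name ChannelListItemAttributes is hypothetical and the surrounding entity and datalist structure is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: attributes of the bp:pubChannelList item for a processed entity. */
class ChannelListItemAttributes {

    /** Attributes for a successfully processed entity; the error is cleared. */
    static Map<String, Object> success(long batchId, String onSuccessAction) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("bp:pubChannelListBatchId", batchId);
        attributes.put("bp:pubChannelListStatus", "COMPLETED");
        // An empty string clears the action when none is configured.
        attributes.put("bp:pubChannelListAction", onSuccessAction == null ? "" : onSuccessAction);
        attributes.put("bp:pubChannelListError", "");
        return attributes;
    }

    /** Attributes for a failed entity; the action is only sent when one is configured. */
    static Map<String, Object> failure(long batchId, String error, String onErrorAction) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("bp:pubChannelListBatchId", batchId);
        attributes.put("bp:pubChannelListStatus", "FAILED");
        attributes.put("bp:pubChannelListError", error);
        if (onErrorAction != null) {
            attributes.put("bp:pubChannelListAction", onErrorAction);
        }
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(success(1L, "STOP"));
        System.out.println(failure(1L, "Timeout while sending file", null));
    }
}
```

Leaving the action out on failure keeps whatever action is already set on the entity, whereas a success overwrites it.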

Update of the publication channel once the batch is complete

After the batch completes, update the publication channel by making an HTTP PUT request to the following URL:

/alfresco/service/becpg/remote/entity?format=json

Send the following information in the request body:

{
   "entity": {
     "docs": [],
     "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
     "type": "bp:pubChannel",
     "attributes": {
       "bp:pubChannelLastDate": 1672320540452,
       "bp:pubChannelAction": null,
       "bp:pubChannelBatchDuration": 74,
       "bp:pubChannelBatchId": 1,
       "bp:pubChannelStatus": "COMPLETED",
       "bp:pubChannelFailCount": 1,
       "bp:pubChannelBatchEndTime": 1672320614749,
       "bp:pubChannelError": "",
       "bp:pubChannelLastSuccessBatchId": 1,
       "bp:pubChannelReadCount": 1
     },
     "bp:pubChannelId": "sample-channel"
   }
}

Note:

  • pubChannelBatchDuration is in seconds
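As a hedged illustration, the numeric end-of-batch attributes can be derived from the start and end instants as shown below. The class name EndOfBatchAttributes is hypothetical; timestamps are assumed to be epoch milliseconds, and the status, error, and last date attributes are left out for brevity.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: numeric channel attributes sent when a batch ends. */
class EndOfBatchAttributes {

    static Map<String, Object> build(long batchId, Instant start, Instant end, int readCount, int failCount) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("bp:pubChannelBatchId", batchId);
        // Assumption: timestamps are sent as epoch milliseconds.
        attributes.put("bp:pubChannelBatchEndTime", end.toEpochMilli());
        // The duration is expressed in whole seconds.
        attributes.put("bp:pubChannelBatchDuration", Duration.between(start, end).getSeconds());
        attributes.put("bp:pubChannelReadCount", readCount);
        attributes.put("bp:pubChannelFailCount", failCount);
        return attributes;
    }

    public static void main(String[] args) {
        Instant start = Instant.ofEpochMilli(1672320540452L);
        Instant end = Instant.ofEpochMilli(1672320614749L);
        System.out.println(build(1L, start, end, 1, 1));
    }
}
```

With the sample values above (a start at 1672320540452 and an end at 1672320614749), the computed duration is 74 seconds, as in the request body.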

Example using Java and the Spring Batch APIs


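import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterProcess;
import org.springframework.batch.core.annotation.AfterWrite;
import org.springframework.batch.core.annotation.OnProcessError;
import org.springframework.batch.core.annotation.OnReadError;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.item.Chunk;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import fr.becpg.api.model.RemoteEntityRef;
import fr.becpg.connector.model.RemoteEntityItemContext;
import fr.becpg.connector.service.ConnectorExecutionContext;
// The other beCPG classes used below (BatchContextHolder, BatchContext, BatchConfiguration,
// ChannelAPIModel, ConnectorService, RemoteEntity, RemoteNodeInfo, RemoteAPIException)
// must also be imported; their packages are not shown on this page.

/**
 * <p>StandardBatchExecutionListener class.</p>
 *
 * @author matthieu
 * @version $Id: $Id
 */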
public class StandardBatchExecutionListener extends BatchContextHolder {

    private static final String PROP_PREFIX = "connector.channel.";
    private static final String PROP_PREFIX_ON_SUCCESS = "onSuccess.";
    private static final String PROP_PREFIX_ON_ERROR = "onError.";
    private static final String PROP_ACTION = "action";
    private static final String PROP_REMOTE = "remote";

    private static final Logger logger = LoggerFactory.getLogger(StandardBatchExecutionListener.class);

    private ConnectorExecutionContext executionContext;
    private JobExecution jobExecution;

    /**
     * <p>Constructor for StandardBatchExecutionListener.</p>
     *
     * @param executionContext a {@link fr.becpg.connector.service.ConnectorExecutionContext} object
     */
    public StandardBatchExecutionListener(ConnectorExecutionContext executionContext) {
        super();
        this.executionContext = executionContext;
    }

    /** {@inheritDoc} */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        super.beforeJob(jobExecution);
        this.jobExecution = jobExecution;

        if (executionContext.destPath() != null) {
            File destFolder = new File(executionContext.destPath());
            if (!destFolder.exists()) {
                destFolder.mkdirs();
            }
        }

        if (executionContext.channelId() != null) {
            RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(executionContext.channelId());

            Map<String, Object> attributes = new HashMap<>();
            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHSTARTTIME, asDate(jobExecution.getStartTime()));
            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, null);
            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION, null);

            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
            attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, jobExecution.getStatus());

            channelEntity.setAttributes(attributes);

            executionContext.getEntityAPI().update(channelEntity);
        }

    }

    /** {@inheritDoc} */
    @Override
    public void afterJob(JobExecution jobExecution) {
        super.afterJob(jobExecution);
        BatchContext batchContext = get();

        Map<String, Object> attributes = new HashMap<>();
        attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, asDate(jobExecution.getEndTime()));
        attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION,
                jobExecution.getStartTime().until(jobExecution.getEndTime(), ChronoUnit.SECONDS));

        attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
        BatchStatus status = jobExecution.getStatus();

        if (!jobExecution.getStepExecutions().isEmpty()) {

            StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
            attributes.put(ChannelAPIModel.PROP_CHANNEL_READCOUNT, stepExecution.getReadCount());
        }

        if (!ChannelAPIModel.ACTION_RETRY.equals(jobExecution.getJobParameters().getString(ConnectorService.ACTION_PARAM))) {
            if (ExitStatus.COMPLETED.equals(jobExecution.getExitStatus())) {
                attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTSUCCESSBATCHID, jobExecution.getId());
                attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTDATE, asDate(jobExecution.getStartTime()));

                if (executionContext.destPath() != null) {

                    LocalDateTime startDate = jobExecution.getStartTime();

                    try (BufferedWriter bw = new BufferedWriter(new FileWriter(executionContext.getLastImportDateFile()))) {
                        bw.write(DateTimeFormatter.ofPattern(BatchConfiguration.UTC_DATE_FORMAT).format(startDate));
                    } catch (IOException e) {
                        logger.error("Cannot write last import data", e);
                        jobExecution.addFailureException(e);

                    }
                }
            }
        }

        attributes.put(ChannelAPIModel.PROP_CHANNEL_ACTION, null);
        attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, status);

        StringBuilder errors = new StringBuilder();

        if (batchContext.hasErrors() && !batchContext.getErrors().isEmpty()) {
            status = BatchStatus.FAILED;
            for (String error : batchContext.getErrors()) {
                errors.append(error);
                errors.append("\n");
            }
        }

        List<Throwable> exceptions = jobExecution.getFailureExceptions();

        for (Throwable exception : exceptions) {
            errors.append(formatExceptionMessage(exception));
            errors.append("\n");
        }

        attributes.put(ChannelAPIModel.PROP_CHANNEL_ERROR, errors.toString());
        if (batchContext.getFailedEntities() != null) {
            attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, batchContext.getFailedEntities().size());
        } else {
            attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, 0);
        }

        if (executionContext.channelId() != null) {
            RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(executionContext.channelId());
            channelEntity.setAttributes(attributes);
            executionContext.getEntityAPI().update(channelEntity);
        }
    }

    private Date asDate(LocalDateTime localDateTime) {
        return localDateTime!=null ? Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()) : null;
    }

    private String formatExceptionMessage(Throwable exception) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        exception.printStackTrace(new PrintStream(baos));
        return baos.toString();
    }

    /**
     * <p>onReadError.</p>
     *
     * @param ex a {@link java.lang.Exception} object
     */
    @OnReadError
    public void onReadError(Exception ex) {
        get().addError(ex.getMessage());
    }

    /**
     * <p>onProcessError.</p>
     *
     * @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
     * @param ex a {@link java.lang.Exception} object
     */
    @OnProcessError
    public void onProcessError(RemoteEntityRef item, Exception ex) {
        sendError(item.getEntity().getId(), ex.getMessage());
    }

    /**
     * <p>afterProcess.</p>
     *
     * @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
     * @param root a {@link fr.becpg.connector.model.RemoteEntityItemContext} object
     */
    @AfterProcess
    public void afterProcess(RemoteEntityRef item, RemoteEntityItemContext root) {

        if (logger.isDebugEnabled()) {
            ObjectMapper objectMapper = new ObjectMapper();

            try {
                logger.debug(objectMapper.setSerializationInclusion(Include.NON_NULL).writerWithDefaultPrettyPrinter()
                        .writeValueAsString(root.getEntity()));
            } catch (JsonProcessingException e) {
                logger.error("Invalid RemoteEntity", e);
            }
        }

    }

    /**
     * <p>onWriteError.</p>
     *
     * @param ex a {@link java.lang.Exception} object
     * @param items a {@link org.springframework.batch.item.Chunk} object
     */
    @OnWriteError
    public void onWriteError(Exception ex, Chunk<? extends RemoteEntityItemContext> items) {
        for (RemoteEntityItemContext item : items) {
            sendError(item.getEntity().getId(), ex.getMessage());
        }
    }

    /**
     * <p>afterWrite.</p>
     *
     * @param items a {@link org.springframework.batch.item.Chunk} object
     */
    @AfterWrite
    public void afterWrite(Chunk<? extends RemoteEntityItemContext> items) {
        BatchContext batchContext = get();
        for (RemoteEntityItemContext item : items) {
            if (batchContext.getFailedEntities().containsKey(item.getEntity().getId())) {
                sendError(item.getEntity().getId(), batchContext.getFailedEntities().get(item.getEntity().getId()));
            } else {
                sendSuccess(item.getEntity().getId());
            }
        }

        if (logger.isInfoEnabled()) {

            Integer totalCount = (Integer) jobExecution.getExecutionContext().get(ConnectorService.TOTAL_COUNT_PARAM);

            if (totalCount != null) {
                Long writes = 0L;
                for (StepExecution step : jobExecution.getStepExecutions()) {
                    writes = writes + step.getWriteCount();
                }

                logger.info("Write {}/{} entities - [ {} %]", writes, totalCount, Math.round(((writes * 1d) / (totalCount * 1d)) * 100));
            }
        }

    }

    private void sendSuccess(String id) {
        if (executionContext.channelId() != null) {
            try {
                RemoteEntity entity = new RemoteEntity();
                entity.setId(id);
                RemoteEntity publicationChannelListItem = new RemoteEntity();
                publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);

                Map<String, Object> identifiers = new HashMap<>();
                identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(executionContext.channelId()));
                publicationChannelListItem.setOptionalIdentifiers(identifiers);

                Map<String, Object> attributes = new HashMap<>();
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_PUBLISHEDDATE, new Date());
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_COMPLETED);

                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ACTION, getChannelProperty(PROP_PREFIX_ON_SUCCESS + PROP_ACTION, ""));
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, "");

                publicationChannelListItem.setAttributes(attributes);

                Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();

                datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));

                entity.setDatalists(datalists);

                String remoteJsonString = getChannelProperty(PROP_PREFIX_ON_SUCCESS + PROP_REMOTE, null);
                if (remoteJsonString != null) {
                    ObjectMapper objectMapper = new ObjectMapper();
                    entity.merge(objectMapper.readValue(remoteJsonString, RemoteEntity.class));
                }

                executionContext.getEntityAPI().update(entity);
            } catch (JsonProcessingException | RemoteAPIException e) {
                get().addError(id + " - " + e.getMessage());
                logger.error("sendSuccess error: {} -  {}", id, e.getMessage());
            }
        } else if (executionContext.errorsPath() != null) {
            File errorFile = new File(executionContext.errorsPath() + File.separator + id + ".error");
            if (errorFile.exists()) {
                try {
                    FileUtils.delete(errorFile);
                } catch (IOException e) {
                    get().addError(id + " - " + e.getMessage());
                    logger.error("Cannot delete error file for: {} - {}", id, e.getMessage());
                }
            }

        }

        get().addVisitedEntity(id);
    }

    private void sendError(String id, String error) {
        if (logger.isInfoEnabled()) {
            logger.info("Error: {}", error);
        }

        if (executionContext.channelId() != null) {
            try {
                RemoteEntity entity = new RemoteEntity();
                entity.setId(id);
                RemoteEntity publicationChannelListItem = new RemoteEntity();
                publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);

                Map<String, Object> identifiers = new HashMap<>();
                identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(executionContext.channelId()));
                publicationChannelListItem.setOptionalIdentifiers(identifiers);

                Map<String, Object> attributes = new HashMap<>();
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_FAILED);
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, error);

                String action = getChannelProperty(PROP_PREFIX_ON_ERROR + PROP_ACTION, null);

                if (action != null) {
                    attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ACTION, action);
                }

                publicationChannelListItem.setAttributes(attributes);

                Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();

                datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));

                entity.setDatalists(datalists);

                String remoteJsonString = getChannelProperty(PROP_PREFIX_ON_ERROR + PROP_REMOTE, null);
                if (remoteJsonString != null) {
                    ObjectMapper objectMapper = new ObjectMapper();
                    entity.merge(objectMapper.readValue(remoteJsonString, RemoteEntity.class));
                }

                executionContext.getEntityAPI().update(entity);
            } catch (RemoteAPIException | JsonProcessingException e) {
                get().addError(id + " - " + error);
                logger.error("sendError error: {} - {} - {}", id, e.getMessage(), error);
            }
        } else if (executionContext.errorsPath() != null) {
            File destFolder = new File(executionContext.errorsPath());
            if (!destFolder.exists()) {
                destFolder.mkdirs();
            }

            try (BufferedWriter writer = new BufferedWriter(new FileWriter(executionContext.errorsPath() + File.separator + id + ".error"))) {
                writer.write(error);
            } catch (IOException e) {
                get().addError(id + " - " + error);
                logger.error("Cannot write error: {} - {} - {}", id, e.getMessage(), error);
            }

        }
        get().addFailedEntity(id, error);

    }

    private String getChannelProperty(String key, String defaultValue) {
        String ret = defaultValue;

        if (executionContext.getProperty(PROP_PREFIX + executionContext.channelId() + "." + key) != null) {
            ret = executionContext.getProperty(PROP_PREFIX + executionContext.channelId() + "." + key);
        } else if (executionContext.getProperty(PROP_PREFIX + key) != null) {
            ret = executionContext.getProperty(PROP_PREFIX + key);
        }
        return ret;
    }

}
