Publication channels

The publication channels make it possible to feed and maintain a queue of entities to be processed.

A channel is defined in the administration and has the following fields:

  • A name (Display name)
  • An identifier (unique identifier of the channel used by the connectors)
  • A catalog identifier (allows you to indicate a catalog associated with the channel) [optional]
  • A configuration JSON [optional]
  • A configuration file used by the associated connector [optional]

On each entity, the "Publishing Channels" list indicates the channels to publish the entity to.

Each connector is responsible for powering its channel. Once a connector is executed, the following information appears in the administration:

  • Batch identifier
  • Start/end time
  • Duration
  • The status of the last batch
  • The identifier of the last successful batch
  • The number of processed/failed entities on the last batch
  • Any batch errors

A successful batch does not mean that all entities have been successfully imported into this batch, but that it went to the end.

The last date is the date used to return the entities of the batch, it is fed by the connector and can be modified manually.

The action field allows you to:

  • Stop the connector ("STOP")
  • Replay entities in error ("RETRY")
  • Resynchronize all entities ("RESET")

On each entity, we find:

  • Its date of publication in the channel
  • The identifier of the last batch
  • The status of the last batch and any errors
  • The modification date

If a catalog is associated with the channel, the modified date is only updated for audited fields in the catalog. If no catalog is associated, this date is populated by the earliest date between the date of formulation, modification and creation.

The "force" field is used to force this entity during the next batch.

By default, a channel returns channel member entities whose modified date is less than or equal to the last date.

Setup

Research

It is possible to configure a specific search on a channel using the Configuration field.

The following JSON values are possible:

  {
   "query": "(@cm\\: created: [% s TO MAX] OR @cm\\: modified: [% s TO MAX]) AND(TYPE: \"bcpg:product\" OR TYPE:\" bcpg:customer\" OR TYPE:\"bcpg:supplier\" )",
}

%s is replaced by the last date field

Or

{
  "dateFilter": {
     "dateFilterField": "cm:modified",
     "dateFilterType": "Before, After, From, To, Equals",
     "dateFilterDelay": 1,
     "dateFilterDelayUnit": "Min, Hour, Day"
   },
   "versionFilter": {
     "versionFilterType": "MAJOR, MINOR, NONE"
   },
   "entityFilter": {
     "entityType": "bcpg:semiFinishedProduct",
     "criteria": {
       "assoc_bcpg_plants_added": "nodeRef"
     }
   }
}

The last date field is used in dateFilter

Connector

It is possible to pass properties to the connector using the properties parameter of the configuration JSON

{
  "properties": {
"connector.notify.enabled": true,
"connector.notify.from":"support@becpg.fr",
"connector.notify.to": "support@becpg.fr",
"remote.extra.fields": "cm:titled,cm:description",
"remote.extra.lists": "bcpg:compoList"
  }
}

API

The list of entities is obtained with the LIST query.

/becpg/remote/channel/list?channelId={id} /becpg/remote/channel/list?channelNodeRef={nodeRef}

The maxResults parameter allows you to specify a number of results (-1 to have all the results)

The format parameter is used to modify the XML format of the response

format=xml (Default) format=json (Json format) The fields parameter allows to specify fields or associations to extract in the results

Architecture

The caller is responsible for updating the channels of publications and entities by following the following kinematics:

                          +-------------------+
                          |                   |
                          |  Retrieve         |
                          |  channel info     |
                          |                   |
                          +--------+----------+
                                   |
                                   |
                          +--------v----------+
                          |                   |
                          |  Start batch and  |
                          |  reset channel    |
                          |  info             |
                          |                   |
                          +--------+----------+
                                   |
                                   |
                          +--------v----------+
                          |                   |
                          |  List channel     |
                          |  entities         |
                          |                   |
                          +--------+----------+
                                   |
                                   |
                  +----------------v--------------------------------+
                  |                                                 |
                  |                                                 |
                  |  +-------------------+   +-------------------+  |
                  |  |                   |   |                   |  |
                  |  |  Retrieve         |   |  Retrieve         |  |
                  |  |  entity info      |   |  entity info      |  |
                  |  |                   |   |                   |  |
                  |  +--------+----------+   +--------+----------+  |
                  |           |                       |             |
                  |           |                       |             |
                  |  +--------v----------+   +--------v----------+  |
                  |  |                   |   |                   |  |
                  |  |  Process entity   |   |  Process entity   |  |
                  |  |                   |   |                   |  |
                  |  +-------------------+   +-------------------+  |
                  |           |                      |              |
                  |           |                      |              |
                  |  +--------v----------+   +--------v----------+  |
                  |  |                   |   |                   |  |
                  |  |  Update channel   |   |  Update channel   |  |
                  |  |  info on entity   |   |  info on entity   |  |
                  |  |                   |   |                   |  |
                  |  +-------------------+   +-------------------+  |
                  |                                                 |
                  |                                                 |
                  -----------------^------------------------------- +
                                   |
                                   |
                          +--------v----------+
                          |                   |
                          |  Update channel   |
                          |                   |
                          +-------------------+

Fetch information from release channel

To retrieve information from a publish channel, make an HTTP GET request to the following URL:

/alfresco/service/becpg/remote/entity?format=json&query=+TYPE:%22bp:pubChannel%22%20AND%20%3Dbp%5C:pubChannelId:%22sample-canal%22%20

This query will return the release channel information in the following form:

{
   "entity": {
     "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
     "cm:name": "Sample channel",
     "attributes": {
       "bp:pubChannelId": "sample-channel",
       "bp:pubChannelConfig": "..."
     }
   }
}

Batch start and release channel information reset

To start a batch and reset the release channel information, make an HTTP PUT request to the following URL:

/alfresco/service/becpg/remote/entity?format=json&format=json

Send the following information in the request body:

{
   "entity": {
     "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
     "type": "bp:pubChannel",
     "attributes": {
       "bp:pubChannelBatchDuration": null,
       "bp:pubChannelBatchId": 1,
       "bp:pubChannelStatus": "STARTED",
       "bp:pubChannelBatchEndTime": null,
       "bp:pubChannelBatchStartTime": 1672320000652
     },
     "bp:pubChannelId": "sample-channel"
   }
}

List of publish channel entities

To get the list of entities associated with a publish channel, make an HTTP GET request to the following URL:

/alfresco/service/becpg/remote/channel/list?format=json&channelId=sample-channel

Retrieval of information from each entity

To retrieve information for a given entity, make an HTTP GET request to the following URL:

/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}

Where {entityNodeRef} is the nodeRef of the entity to retrieve.

Updated Entity Channel Information

To update channel information on a given entity, make an HTTP POST request to the following URL:

/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}

Send the following information in the request body:

{
   "entity": {
     "datalists": {
       "bp:pubChannelList": [
         {
           "type": "bp:pubChannelList",
           "attributes": {
             "bp:pubChannellListStatus": "SUCCESS",
             "bp:pubChannellListError": "",
             "bp:pubChannelListBatchId": 1
           },
           "bp:pubChannelListChannel": {
             "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
             "type": "bp:pubChannel",
             "bp:pubChannelId": "sample-channel"
           }
         }
       ]
     }
   }
}

Updated release channel once batch complete

After the batch completes, update the release channel by making an HTTP PUT request to the following URL:

/alfresco/service/becpg/remote/entity?format=json

Send the following information in the request body:

{
   "entity": {
     "docs": [],
     "path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
     "type": "bp:pubChannel",
     "attributes": {
       "bp:pubChannelLastDate": 1672320540452,
       "bp:pubChannelAction": null,
       "bp:pubChannelBatchDuration": 74,
       "bp:pubChannelBatchId": 1,
       "bp:pubChannelStatus": "COMPLETED",
       "bp:pubChannelFailCount": 1,
       "bp:pubChannelBatchEndTime": 1672320614749,
       "bp:pubChannelError": "",
       "bp:pubChannelLastSuccessBatchId": 1,
       "bp:pubChannelReadCount": 1
     },
     "bp:pubChannelId": "sample-channel"
   }
}

Example using Java and Spring batch APIs


/**
 * <p>StandardBatchExecutionListener class.</p>
 *
 * @author matthieu
 * @version $Id: $Id
 */
public class StandardBatchExecutionListener extends BatchContextHolder {

    private static Logger logger = LoggerFactory.getLogger(StandardBatchExecutionListener.class);

    private BatchConfiguration batchConfiguration;
    private JobExecution jobExecution;

    /**
     * <p>Constructor for StandardBatchExecutionListener.</p>
     *
     * @param batchConfiguration a {@link fr.becpg.connector.service.BatchConfiguration} object
     */
    public StandardBatchExecutionListener(BatchConfiguration batchConfiguration) {
        super();
        this.batchConfiguration = batchConfiguration;
    }

    /** {@inheritDoc} */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        super.beforeJob(jobExecution);
        this.jobExecution = jobExecution;

        if (batchConfiguration.destPath() != null) {
            File destFolder = new File(batchConfiguration.destPath());
            if (!destFolder.exists()) {
                destFolder.mkdirs();
            }
        }

        if (batchConfiguration.channelId() != null) {
            RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(batchConfiguration.channelId());

            Map<String, Object> attributes = new HashMap<>();
            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHSTARTTIME, jobExecution.getStartTime());
            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, null);
            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION, null);

            attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
            attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, jobExecution.getStatus());

            channelEntity.setAttributes(attributes);

            batchConfiguration.getEntityAPI().update(channelEntity);
        }

    }

    /** {@inheritDoc} */
    @Override
    public void afterJob(JobExecution jobExecution) {
        super.afterJob(jobExecution);
        BatchContext batchContext = get();

        Map<String, Object> attributes = new HashMap<>();
        attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, jobExecution.getEndTime());
        attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION, (jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime())/1000);
        attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
        BatchStatus status = jobExecution.getStatus();

        if (!jobExecution.getStepExecutions().isEmpty()) {

            StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
            attributes.put(ChannelAPIModel.PROP_CHANNEL_READCOUNT, stepExecution.getReadCount());
        }

        if(!ChannelAPIModel.ACTION_RETRY.equals(jobExecution.getJobParameters().getString(BeCPGConnectorService.ACTION_PARAM))) {
            if (ExitStatus.COMPLETED.equals(jobExecution.getExitStatus())) {
                attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTSUCCESSBATCHID, jobExecution.getId());
                attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTDATE, jobExecution.getStartTime());

                if ( batchConfiguration.destPath() != null) {

                    DateFormat dateFormat = new SimpleDateFormat(BatchConfiguration.UTC_DATE_FORMAT);
                    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

                    Date startDate = jobExecution.getStartTime();

                    try (BufferedWriter bw = new BufferedWriter(new FileWriter(batchConfiguration.getLastImportDateFile()))) {
                        bw.write(dateFormat.format(startDate));
                    } catch (IOException e) {
                        logger.error("Cannot write last import data", e);
                        jobExecution.addFailureException(e);

                    }
                }
            }
        }

        attributes.put(ChannelAPIModel.PROP_CHANNEL_ACTION, null);
        attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, status);

        StringBuilder errors = new StringBuilder();

        if (batchContext.hasErrors()) {
            if (!batchContext.getErrors().isEmpty()) {
                status = BatchStatus.FAILED;
                for (String error : batchContext.getErrors()) {
                    errors.append(error);
                    errors.append("\n");
                }
            }
        }
        List<Throwable> exceptions = jobExecution.getFailureExceptions();

        for (Throwable exception : exceptions) {
            errors.append(formatExceptionMessage(exception));
            errors.append("\n");
        }

        attributes.put(ChannelAPIModel.PROP_CHANNEL_ERROR, errors.toString());
        if (batchContext.getFailedEntities() != null) {
            attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, batchContext.getFailedEntities().size());
        } else {
            attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, 0);
        }

        if (batchConfiguration.channelId() != null) {

            RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(batchConfiguration.channelId());
            channelEntity.setAttributes(attributes);

            batchConfiguration.getEntityAPI().update(channelEntity);
        }
    }

    private String formatExceptionMessage(Throwable exception) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        exception.printStackTrace(new PrintStream(baos));
        return baos.toString();
    }

    /**
     * <p>onReadError.</p>
     *
     * @param ex a {@link java.lang.Exception} object
     */
    @OnReadError
    public void onReadError(Exception ex) {
        get().addError(ex.getMessage());
    }

    /**
     * <p>onProcessError.</p>
     *
     * @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
     * @param ex a {@link java.lang.Exception} object
     */
    @OnProcessError
    public void onProcessError(RemoteEntityRef item, Exception ex) {
        sendError(item.getEntity().getId(), ex.getMessage());
    }

    /**
     * <p>afterProcess.</p>
     *
     * @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
     * @param root a {@link fr.becpg.connector.model.RemoteEntityItemContext} object
     */
    @AfterProcess
    public void afterProcess(RemoteEntityRef item, RemoteEntityItemContext root) {

        if (logger.isDebugEnabled()) {
            ObjectMapper objectMapper = new ObjectMapper();

            try {
                logger.debug(objectMapper.setSerializationInclusion(Include.NON_NULL).writerWithDefaultPrettyPrinter()
                        .writeValueAsString(root.getEntity()));
            } catch (JsonProcessingException e) {
                logger.error("Invalid RemoteEntity", e);
            }
        }

    }

    /**
     * <p>onWriteError.</p>
     *
     * @param ex a {@link java.lang.Exception} object
     * @param items a {@link java.util.List} object
     */
    @OnWriteError
    public void onWriteError(Exception ex, List<? extends RemoteEntityItemContext> items) {
        for (RemoteEntityItemContext item : items) {
            sendError(item.getEntity().getId(), ex.getMessage());
        }
    }

    /**
     * <p>afterWrite.</p>
     *
     * @param items a {@link java.util.List} object
     */
    @AfterWrite
    public void afterWrite(List<? extends RemoteEntityItemContext> items) {
        BatchContext batchContext = get();
        for (RemoteEntityItemContext item : items) {
            if (batchContext.getFailedEntities().containsKey(item.getEntity().getId())) {
                sendError(item.getEntity().getId(), batchContext.getFailedEntities().get(item.getEntity().getId()));
            } else {
                sendSuccess(item.getEntity().getId());
            }
        }

        if (logger.isInfoEnabled()) {

            Integer totalCount = (Integer) jobExecution.getExecutionContext().get(BeCPGConnectorService.TOTAL_COUNT_PARAM);

            if (totalCount != null) {
                Integer reads = 0;
                for (StepExecution step : jobExecution.getStepExecutions()) {
                    reads = reads + step.getReadCount();
                }

                logger.info("Write {}/{} entities - [ {} %]", reads,totalCount, Math.round((reads * 1d) / (totalCount * 1d) * 100) );
            }
        }

    }

    private void sendSuccess(String id) {
        if (batchConfiguration.channelId() != null) {
            try {
                RemoteEntity entity = new RemoteEntity();
                entity.setId(id);
                RemoteEntity publicationChannelListItem = new RemoteEntity();
                publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);

                Map<String, Object> identifiers = new HashMap<>();
                identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(batchConfiguration.channelId()));
                publicationChannelListItem.setOptionalIdentifiers(identifiers);

                Map<String, Object> attributes = new HashMap<>();
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_PUBLISHEDDATE, new Date());
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_COMPLETED);
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, "");

                publicationChannelListItem.setAttributes(attributes);

                Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();

                datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));

                entity.setDatalists(datalists);

                batchConfiguration.getEntityAPI().update(entity);
            } catch (RemoteAPIException e) {
                get().addError(id + " - " + e.getMessage());
                logger.error("sendSuccess error: {} -  {}" ,id, e.getMessage());
            }
        }

        get().addVisitedEntity(id);

    }

    private void sendError(String id, String error) {
        if (logger.isInfoEnabled()) {
            logger.info("Error: {}" , error);
        }

        if (batchConfiguration.channelId() != null) {
            try {
                RemoteEntity entity = new RemoteEntity();
                entity.setId(id);
                RemoteEntity publicationChannelListItem = new RemoteEntity();
                publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);

                Map<String, Object> identifiers = new HashMap<>();
                identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(batchConfiguration.channelId()));
                publicationChannelListItem.setOptionalIdentifiers(identifiers);

                Map<String, Object> attributes = new HashMap<>();
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_FAILED);
                attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, error);

                publicationChannelListItem.setAttributes(attributes);

                Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();

                datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));

                entity.setDatalists(datalists);

                batchConfiguration.getEntityAPI().update(entity);
            } catch (RemoteAPIException e) {
                get().addError(id + " - " + error);
                logger.error("sendError error: {} - {} - {}" ,id, e.getMessage(), error);
            }
        }
        get().addFailedEntity(id, error);

    }

}

results matching ""

    No results matching ""