Publication channels
Publication channels feed and maintain a queue of entities to be processed.
A channel is defined in the administration and has the following fields:
- A name (Display name)
- An identifier (unique identifier of the channel used by the connectors)
- A catalog identifier (allows you to indicate a catalog associated with the channel) [optional]
- A configuration JSON [optional]
- A configuration file used by the associated connector [optional]
On each entity, the "Publishing Channels" list indicates the channels to publish the entity to.
Each connector is responsible for feeding its channel. Once a connector has run, the following information appears in the administration:
- Batch identifier
- Start/end time
- Duration
- The status of the last batch
- The identifier of the last successful batch
- The number of processed/failed entities on the last batch
- Any batch errors
A successful batch does not mean that every entity in the batch was imported successfully, only that the batch ran to completion.
The last date is the date used to select the entities returned for a batch. It is set by the connector and can be modified manually.
The action field allows you to:
- Stop the connector ("STOP")
- Replay entities in error ("RETRY")
- Resynchronize all entities ("RESET")
On each entity, the following information is available:
- Its date of publication in the channel
- The identifier of the last batch
- The status of the last batch and any errors
- The modification date
If a catalog is associated with the channel, the modification date is only updated when a field audited in the catalog changes. If no catalog is associated, this date is populated with the earliest of the formulation, modification and creation dates.
The "force" field forces the entity to be processed during the next batch.
By default, a channel returns channel member entities whose modification date is later than or equal to the last date.
Setup
Search
It is possible to configure a specific search on a channel using the Configuration field.
The following JSON values are possible:
Example: Retrieves all products, clients, and suppliers even if they do not belong to the channel:
{
"query": "(@cm\\:created:[%s TO MAX] OR @cm\\:modified:[%s TO MAX]) AND (TYPE:\"bcpg:product\" OR TYPE:\"bcpg:client\" OR TYPE:\"bcpg:supplier\")",
"isFilter": false
}
isFilter indicates whether the query filters the channel member entities (true) or whether the channel is a search-type channel that returns every match, member or not (false). %s is replaced by the value of the last date field.
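The placeholder substitution described above can be sketched as follows. This is only an illustration: the class and method names are hypothetical, and rendering the last date as an ISO-8601 instant is an assumption, since the date format expected by the search engine is not stated here. Escaping or quoting of the date inside the query is not handled.

```java
import java.time.Instant;

// Hypothetical helper, not part of beCPG: illustrates the %s substitution only.
final class ChannelQueryTemplate {

    // Replaces every "%s" placeholder with the channel's last date.
    // Assumption: the date is rendered as an ISO-8601 instant.
    static String resolve(String queryTemplate, Instant lastDate) {
        return queryTemplate.replace("%s", lastDate.toString());
    }

    public static void main(String[] args) {
        String template = "(@cm\\:created:[%s TO MAX] OR @cm\\:modified:[%s TO MAX]) AND TYPE:\"bcpg:product\"";
        System.out.println(resolve(template, Instant.parse("2022-12-29T13:20:00Z")));
    }
}
```

Both placeholders receive the same value, which matches the sample query where the creation and modification dates are compared against the same last date.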
Example: Filters channel member entities by including only validated products:
{
"query": "TYPE:\"bcpg:product\" AND @bcpg\\:productState:\"Valid\" ",
"isFilter": true
}
It is also possible to filter channel member entities using this syntax:
{
"dateFilter": {
"dateFilterField": "cm:modified",
"dateFilterType": "Before, After, From, To, Equals",
"dateFilterDelay": 1,
"dateFilterDelayUnit": "Min, Hour, Day"
},
"versionFilter": {
"versionFilterType": "MAJOR, MINOR, NONE"
},
"entityFilter": {
"entityType": "bcpg:semiFinishedProduct",
"criteria": {
"assoc_bcpg_plants_added": "nodeRef"
}
}
}
The last date field is used in dateFilter.
Connector
It is possible to pass properties to the connector using the properties parameter of the configuration JSON:
{
"properties": {
"connector.notify.enabled": true,
"connector.notify.from":"support@becpg.fr",
"connector.notify.to": "support@becpg.fr",
"remote.extra.fields": "cm:titled,cm:description",
"remote.extra.lists": "bcpg:compoList"
}
}
These parameters are specific to the channel. The connector can also be configured using configuration files:
Channel Configuration Parameters
connector.channel.id=id1,id2,id3 // Channels configured for this connector
Actions to be performed on the entity after the connector's execution:
connector.channel.onSuccess.action=STOP (Default empty)
connector.channel.onError.action=STOP (Default value of the channel)
JSON to be applied to the entity after the connector's execution, in remoteEntity format:
connector.channel.onSuccess.remote= {"attributes": {"bcpg:productState": "Simulation"}}
connector.channel.onError.remote=
File Sending:
connector.send.protocol=remote,sftp,ftp,ftps // One or more protocols separated by commas
connector.send.deleteOnSuccess=true // Delete files from the server after successful transmission
Remote Protocol:
Allows depositing files in a folder in beCPG
connector.send.remote.destNodeRef=9ffb8192-6a9a-4921-94fa-22d83ad43d64 // Destination
SFTP / FTP / FTPS Protocols:
Allows depositing files via SFTP
connector.send.sftp.host=
connector.send.sftp.port=
connector.send.sftp.login=
connector.send.sftp.password=
connector.send.sftp.destFolder=
For multichannel use, prefix the sending keys with the channel identifier:
connector.send.{channelId}.
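As a rough sketch of how a channel-specific key could be resolved, the helper below looks up connector.send.{channelId}.<suffix> and falls back to connector.send.<suffix>. The helper, the host names and the fallback behaviour are all assumptions made for illustration; how the connector actually resolves these keys is not described here.

```java
import java.util.Properties;

// Hypothetical lookup helper; the fallback to the generic key is an assumption.
final class ChannelSendProperties {

    // Looks up "connector.send.{channelId}.<suffix>" first,
    // then "connector.send.<suffix>" if no channel-specific value exists.
    static String get(Properties props, String channelId, String suffix) {
        String channelKey = "connector.send." + channelId + "." + suffix;
        String value = props.getProperty(channelKey);
        return value != null ? value : props.getProperty("connector.send." + suffix);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("connector.send.sftp.host", "default.example.org");
        props.setProperty("connector.send.id1.sftp.host", "id1.example.org");
        System.out.println(get(props, "id1", "sftp.host"));
        System.out.println(get(props, "id2", "sftp.host"));
    }
}
```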
Notification Parameters:
Email Notification:
connector.notify.enabled=true // Active
connector.notify.from=support@becpg.fr // Email sender
connector.notify.to=support@becpg.fr // Email recipient
API
The list of entities is obtained with the LIST query.
The maxResults parameter specifies the maximum number of results (-1 returns all results).
The format parameter sets the format of the response:
format=xml (Default)
format=json (JSON format)
The fields parameter specifies the fields or associations to extract in the results.
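The parameters above can be combined into a list URL as in the following sketch. The class name and the base URL are hypothetical, and passing fields as a comma-separated list is an assumption; the path is the channel list endpoint used elsewhere in this page.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical URL builder for the channel list call.
final class ChannelListUrl {

    // Builds the list URL; "fields" is optional and may be null or empty.
    // Assumption: several fields are passed as one comma-separated value.
    static String build(String baseUrl, String channelId, int maxResults, String fields) {
        StringBuilder url = new StringBuilder(baseUrl)
                .append("/alfresco/service/becpg/remote/channel/list?format=json")
                .append("&channelId=").append(encode(channelId))
                .append("&maxResults=").append(maxResults);
        if (fields != null && !fields.isEmpty()) {
            url.append("&fields=").append(encode(fields));
        }
        return url.toString();
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // -1 asks for all results
        System.out.println(build("http://localhost:8080", "sample-channel", -1, "bcpg:code"));
    }
}
```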
Architecture
The caller is responsible for updating the publication channel and its entities by following this sequence:
sequenceDiagram
title: Channel info retrieval and update sequence
actor Connector
participant beCPG-PLM
beCPG-PLM->+Connector: Retrieve channel info
Connector->Connector: Start batch
Connector->beCPG-PLM: Reset channel info
beCPG-PLM->Connector: List channel entities
beCPG-PLM->+Connector: Retrieve entity info
Connector->Connector: Process entity
Connector->-beCPG-PLM: Update channel info on entity
Connector->beCPG-PLM: Update channel
Connector->-Connector: End batch
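The sequence above can be summarized by the following skeleton. ChannelClient is a hypothetical abstraction over the remote calls, not a beCPG interface, and the "FAILED" status value is an assumption ("SUCCESS" is the value shown in the entity update payload of this page).

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical abstraction of the remote calls made by a connector.
interface ChannelClient {
    String retrieveChannelInfo(String channelId);
    void resetChannel(String channelId, long batchId);
    List<String> listEntities(String channelId);
    String retrieveEntity(String nodeRef);
    void updateEntityStatus(String nodeRef, long batchId, String status, String error);
    void completeChannel(String channelId, long batchId, int readCount, int failCount);
}

final class ChannelBatchRunner {

    // Runs one batch in the order of the diagram and returns the number of failed entities.
    static int run(ChannelClient client, String channelId, long batchId, Consumer<String> processor) {
        client.retrieveChannelInfo(channelId);          // retrieve channel info
        client.resetChannel(channelId, batchId);        // start batch, reset channel info
        List<String> nodeRefs = client.listEntities(channelId);
        int failed = 0;
        for (String nodeRef : nodeRefs) {
            try {
                processor.accept(client.retrieveEntity(nodeRef));   // connector-specific processing
                client.updateEntityStatus(nodeRef, batchId, "SUCCESS", "");
            } catch (RuntimeException e) {
                failed++;
                // "FAILED" is an assumed status value
                client.updateEntityStatus(nodeRef, batchId, "FAILED", String.valueOf(e.getMessage()));
            }
        }
        client.completeChannel(channelId, batchId, nodeRefs.size(), failed);   // end batch
        return failed;
    }
}
```

A failing entity does not stop the loop, which is consistent with a batch being reported as complete even when some entities are in error.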
Fetch information from the publication channel
To retrieve information from a publication channel, make an HTTP GET request to the following URL:
/alfresco/service/becpg/remote/entity?format=json&query=+TYPE:%22bp:pubChannel%22%20AND%20%3Dbp%5C:pubChannelId:%22sample-channel%22%20
This query returns the publication channel information in the following form:
{
"entity": {
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"cm:name": "Sample channel",
"attributes": {
"bp:pubChannelId": "sample-channel",
"bp:pubChannelConfig": "..."
}
}
}
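A minimal sketch of building this GET request with the JDK HTTP client follows. The class name and base URL are hypothetical, authentication is left out, and the channel identifier is assumed to contain no characters that need escaping inside the search query. URLEncoder produces a form-style encoding (spaces become +), so the resulting URL differs textually from the one above while carrying the same query.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical request builder for the channel information lookup.
final class ChannelInfoRequest {

    // Builds (but does not send) the GET request that looks up a channel by its identifier.
    static HttpRequest build(String baseUrl, String channelId) {
        String query = "TYPE:\"bp:pubChannel\" AND =bp\\:pubChannelId:\"" + channelId + "\"";
        String url = baseUrl + "/alfresco/service/becpg/remote/entity?format=json&query="
                + URLEncoder.encode(query, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(url)).GET().build();
    }

    public static void main(String[] args) {
        System.out.println(build("http://localhost:8080", "sample-channel").uri());
    }
}
```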
Batch start and publication channel information reset
To start a batch and reset the publication channel information, make an HTTP PUT request to the following URL:
/alfresco/service/becpg/remote/entity?format=json
Send the following information in the request body:
{
"entity": {
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"type": "bp:pubChannel",
"attributes": {
"bp:pubChannelBatchDuration": null,
"bp:pubChannelBatchId": 1,
"bp:pubChannelStatus": "STARTED",
"bp:pubChannelBatchEndTime": null,
"bp:pubChannelBatchStartTime": 1672320000652
},
"bp:pubChannelId": "sample-channel"
}
}
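The request above could be prepared as in the sketch below, which mirrors the sample body. The class name, the base URL and the Content-Type header are assumptions, authentication is omitted, and the channel identifier is inserted without JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Locale;

// Hypothetical builder for the "start batch" PUT request.
final class BatchStartRequest {

    // Produces a compact JSON body with the same fields as the sample payload.
    static String body(String channelId, long batchId, long startTimeMillis) {
        return String.format(Locale.ROOT,
                "{\"entity\":{"
                + "\"path\":\"/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels\","
                + "\"type\":\"bp:pubChannel\","
                + "\"attributes\":{"
                + "\"bp:pubChannelBatchDuration\":null,"
                + "\"bp:pubChannelBatchId\":%d,"
                + "\"bp:pubChannelStatus\":\"STARTED\","
                + "\"bp:pubChannelBatchEndTime\":null,"
                + "\"bp:pubChannelBatchStartTime\":%d},"
                + "\"bp:pubChannelId\":\"%s\"}}",
                batchId, startTimeMillis, channelId);
    }

    // Builds (but does not send) the PUT request.
    static HttpRequest build(String baseUrl, String channelId, long batchId, long startTimeMillis) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/alfresco/service/becpg/remote/entity?format=json"))
                .header("Content-Type", "application/json") // assumed header
                .PUT(HttpRequest.BodyPublishers.ofString(body(channelId, batchId, startTimeMillis)))
                .build();
    }

    public static void main(String[] args) {
        System.out.println(body("sample-channel", 1L, System.currentTimeMillis()));
    }
}
```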
List of publication channel entities
To get the list of entities associated with a publication channel, make an HTTP GET request to the following URL:
/alfresco/service/becpg/remote/channel/list?format=json&channelId=sample-channel
Retrieval of information from each entity
To retrieve information for a given entity, make an HTTP GET request to the following URL:
/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}
Where {entityNodeRef} is the nodeRef of the entity to retrieve.
Update entity channel information
To update channel information on a given entity, make an HTTP POST request to the following URL:
/alfresco/service/becpg/remote/entity?format=json&nodeRef={entityNodeRef}
Send the following information in the request body:
{
"entity": {
"datalists": {
"bp:pubChannelList": [
{
"type": "bp:pubChannelList",
"attributes": {
"bp:pubChannelListStatus": "SUCCESS",
"bp:pubChannelListError": "",
"bp:pubChannelListBatchId": 1
},
"bp:pubChannelListChannel": {
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"type": "bp:pubChannel",
"bp:pubChannelId": "sample-channel"
}
}
]
}
}
}
Update the publication channel once the batch completes
After the batch completes, update the publication channel by making an HTTP PUT request to the following URL:
/alfresco/service/becpg/remote/entity?format=json
Send the following information in the request body:
{
"entity": {
"docs": [],
"path": "/app:company_home/cm:System/cm:Characts/bcpg:entityLists/cm:PubChannels",
"type": "bp:pubChannel",
"attributes": {
"bp:pubChannelLastDate": 1672320540452,
"bp:pubChannelAction": null,
"bp:pubChannelBatchDuration": 74,
"bp:pubChannelBatchId": 1,
"bp:pubChannelStatus": "COMPLETED",
"bp:pubChannelFailCount": 1,
"bp:pubChannelBatchEndTime": 1672320614749,
"bp:pubChannelError": "",
"bp:pubChannelLastSuccessBatchId": 1,
"bp:pubChannelReadCount": 1
},
"bp:pubChannelId": "sample-channel"
}
}
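The figures in this sample are consistent with a duration expressed in whole seconds and a last date set to the batch start time, which is also what the Java listener in this page does. The small sketch below reproduces the arithmetic; the class name is hypothetical.

```java
// Hypothetical helper reproducing the duration arithmetic of the sample payload.
final class BatchCompletion {

    // Duration in whole seconds between batch start and batch end (both in epoch milliseconds).
    static long durationSeconds(long startMillis, long endMillis) {
        return (endMillis - startMillis) / 1000;
    }

    public static void main(String[] args) {
        long start = 1672320540452L; // bp:pubChannelLastDate in the sample (batch start time)
        long end = 1672320614749L;   // bp:pubChannelBatchEndTime in the sample
        // 74297 ms elapsed, i.e. 74 whole seconds, matching bp:pubChannelBatchDuration
        System.out.println(durationSeconds(start, end));
    }
}
```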
Example using Java and Spring Batch APIs
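import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterProcess;
import org.springframework.batch.core.annotation.AfterWrite;
import org.springframework.batch.core.annotation.OnProcessError;
import org.springframework.batch.core.annotation.OnReadError;
import org.springframework.batch.core.annotation.OnWriteError;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.becpg.api.model.RemoteEntityRef;
import fr.becpg.connector.model.RemoteEntityItemContext;
import fr.becpg.connector.service.BatchConfiguration;
// The other beCPG classes used below (BatchContextHolder, BatchContext, ChannelAPIModel,
// BeCPGConnectorService, RemoteEntity, RemoteNodeInfo, RemoteAPIException) must also be
// imported from the beCPG connector libraries.
/**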
* <p>StandardBatchExecutionListener class.</p>
*
* @author matthieu
* @version $Id: $Id
*/
public class StandardBatchExecutionListener extends BatchContextHolder {
private static final Logger logger = LoggerFactory.getLogger(StandardBatchExecutionListener.class);
private final BatchConfiguration batchConfiguration;
private JobExecution jobExecution;
/**
* <p>Constructor for StandardBatchExecutionListener.</p>
*
* @param batchConfiguration a {@link fr.becpg.connector.service.BatchConfiguration} object
*/
public StandardBatchExecutionListener(BatchConfiguration batchConfiguration) {
super();
this.batchConfiguration = batchConfiguration;
}
/** {@inheritDoc} */
@Override
public void beforeJob(JobExecution jobExecution) {
super.beforeJob(jobExecution);
this.jobExecution = jobExecution;
if (batchConfiguration.destPath() != null) {
File destFolder = new File(batchConfiguration.destPath());
if (!destFolder.exists()) {
destFolder.mkdirs();
}
}
if (batchConfiguration.channelId() != null) {
RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(batchConfiguration.channelId());
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHSTARTTIME, jobExecution.getStartTime());
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, null);
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION, null);
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, jobExecution.getStatus());
channelEntity.setAttributes(attributes);
batchConfiguration.getEntityAPI().update(channelEntity);
}
}
/** {@inheritDoc} */
@Override
public void afterJob(JobExecution jobExecution) {
super.afterJob(jobExecution);
BatchContext batchContext = get();
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHENDTIME, jobExecution.getEndTime());
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHDURATION, (jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime())/1000);
attributes.put(ChannelAPIModel.PROP_CHANNEL_BATCHID, jobExecution.getId());
BatchStatus status = jobExecution.getStatus();
if (!jobExecution.getStepExecutions().isEmpty()) {
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
attributes.put(ChannelAPIModel.PROP_CHANNEL_READCOUNT, stepExecution.getReadCount());
}
if(!ChannelAPIModel.ACTION_RETRY.equals(jobExecution.getJobParameters().getString(BeCPGConnectorService.ACTION_PARAM))) {
if (ExitStatus.COMPLETED.equals(jobExecution.getExitStatus())) {
attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTSUCCESSBATCHID, jobExecution.getId());
attributes.put(ChannelAPIModel.PROP_CHANNEL_LASTDATE, jobExecution.getStartTime());
if ( batchConfiguration.destPath() != null) {
DateFormat dateFormat = new SimpleDateFormat(BatchConfiguration.UTC_DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Date startDate = jobExecution.getStartTime();
try (BufferedWriter bw = new BufferedWriter(new FileWriter(batchConfiguration.getLastImportDateFile()))) {
bw.write(dateFormat.format(startDate));
} catch (IOException e) {
logger.error("Cannot write last import data", e);
jobExecution.addFailureException(e);
}
}
}
}
attributes.put(ChannelAPIModel.PROP_CHANNEL_ACTION, null);
StringBuilder errors = new StringBuilder();
if (batchContext.hasErrors() && !batchContext.getErrors().isEmpty()) {
status = BatchStatus.FAILED;
for (String error : batchContext.getErrors()) {
errors.append(error);
errors.append("\n");
}
}
// Record the status only after the error check, so that a FAILED status is actually published
attributes.put(ChannelAPIModel.PROP_CHANNEL_STATUS, status);
List<Throwable> exceptions = jobExecution.getFailureExceptions();
for (Throwable exception : exceptions) {
errors.append(formatExceptionMessage(exception));
errors.append("\n");
}
attributes.put(ChannelAPIModel.PROP_CHANNEL_ERROR, errors.toString());
if (batchContext.getFailedEntities() != null) {
attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, batchContext.getFailedEntities().size());
} else {
attributes.put(ChannelAPIModel.PROP_CHANNEL_FAILCOUNT, 0);
}
if (batchConfiguration.channelId() != null) {
RemoteEntity channelEntity = ChannelAPIModel.createChannelEntity(batchConfiguration.channelId());
channelEntity.setAttributes(attributes);
batchConfiguration.getEntityAPI().update(channelEntity);
}
}
private String formatExceptionMessage(Throwable exception) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
return baos.toString();
}
/**
* <p>onReadError.</p>
*
* @param ex a {@link java.lang.Exception} object
*/
@OnReadError
public void onReadError(Exception ex) {
get().addError(ex.getMessage());
}
/**
* <p>onProcessError.</p>
*
* @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
* @param ex a {@link java.lang.Exception} object
*/
@OnProcessError
public void onProcessError(RemoteEntityRef item, Exception ex) {
sendError(item.getEntity().getId(), ex.getMessage());
}
/**
* <p>afterProcess.</p>
*
* @param item a {@link fr.becpg.api.model.RemoteEntityRef} object
* @param root a {@link fr.becpg.connector.model.RemoteEntityItemContext} object
*/
@AfterProcess
public void afterProcess(RemoteEntityRef item, RemoteEntityItemContext root) {
if (logger.isDebugEnabled()) {
ObjectMapper objectMapper = new ObjectMapper();
try {
logger.debug(objectMapper.setSerializationInclusion(Include.NON_NULL).writerWithDefaultPrettyPrinter()
.writeValueAsString(root.getEntity()));
} catch (JsonProcessingException e) {
logger.error("Invalid RemoteEntity", e);
}
}
}
/**
* <p>onWriteError.</p>
*
* @param ex a {@link java.lang.Exception} object
* @param items a {@link java.util.List} object
*/
@OnWriteError
public void onWriteError(Exception ex, List<? extends RemoteEntityItemContext> items) {
for (RemoteEntityItemContext item : items) {
sendError(item.getEntity().getId(), ex.getMessage());
}
}
/**
* <p>afterWrite.</p>
*
* @param items a {@link java.util.List} object
*/
@AfterWrite
public void afterWrite(List<? extends RemoteEntityItemContext> items) {
BatchContext batchContext = get();
for (RemoteEntityItemContext item : items) {
if (batchContext.getFailedEntities().containsKey(item.getEntity().getId())) {
sendError(item.getEntity().getId(), batchContext.getFailedEntities().get(item.getEntity().getId()));
} else {
sendSuccess(item.getEntity().getId());
}
}
if (logger.isInfoEnabled()) {
Integer totalCount = (Integer) jobExecution.getExecutionContext().get(BeCPGConnectorService.TOTAL_COUNT_PARAM);
if (totalCount != null) {
Integer reads = 0;
for (StepExecution step : jobExecution.getStepExecutions()) {
reads = reads + step.getReadCount();
}
logger.info("Write {}/{} entities - [ {} %]", reads,totalCount, Math.round((reads * 1d) / (totalCount * 1d) * 100) );
}
}
}
private void sendSuccess(String id) {
if (batchConfiguration.channelId() != null) {
try {
RemoteEntity entity = new RemoteEntity();
entity.setId(id);
RemoteEntity publicationChannelListItem = new RemoteEntity();
publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);
Map<String, Object> identifiers = new HashMap<>();
identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(batchConfiguration.channelId()));
publicationChannelListItem.setOptionalIdentifiers(identifiers);
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_PUBLISHEDDATE, new Date());
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_COMPLETED);
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, "");
publicationChannelListItem.setAttributes(attributes);
Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();
datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));
entity.setDatalists(datalists);
batchConfiguration.getEntityAPI().update(entity);
} catch (RemoteAPIException e) {
get().addError(id + " - " + e.getMessage());
logger.error("sendSuccess error: {} - {}" ,id, e.getMessage());
}
}
get().addVisitedEntity(id);
}
private void sendError(String id, String error) {
if (logger.isInfoEnabled()) {
logger.info("Error: {}" , error);
}
if (batchConfiguration.channelId() != null) {
try {
RemoteEntity entity = new RemoteEntity();
entity.setId(id);
RemoteEntity publicationChannelListItem = new RemoteEntity();
publicationChannelListItem.setType(ChannelAPIModel.TYPE_CHANNEL_LIST);
Map<String, Object> identifiers = new HashMap<>();
identifiers.put(ChannelAPIModel.ASSOC_CHANNELLIST_CHANNEL, ChannelAPIModel.createChannelEntity(batchConfiguration.channelId()));
publicationChannelListItem.setOptionalIdentifiers(identifiers);
Map<String, Object> attributes = new HashMap<>();
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_BATCHID, get().getBatchId());
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_STATUS, ChannelAPIModel.STATUS_FAILED);
attributes.put(ChannelAPIModel.PROP_CHANNELLIST_ERROR, error);
publicationChannelListItem.setAttributes(attributes);
Map<String, List<RemoteNodeInfo>> datalists = new HashMap<>();
datalists.put(ChannelAPIModel.TYPE_CHANNEL_LIST, Arrays.asList(publicationChannelListItem));
entity.setDatalists(datalists);
batchConfiguration.getEntityAPI().update(entity);
} catch (RemoteAPIException e) {
get().addError(id + " - " + error);
logger.error("sendError error: {} - {} - {}" ,id, e.getMessage(), error);
}
}
get().addFailedEntity(id, error);
}
}