Axeda Machine Streams: Data Relay Project (Axeda Platform v6.8 and later)

    Axeda Machine Streams enables external Platform integrators to access the current, raw data from connected assets. The Platform can stream the data item, alarm, mobile location, and registration messages from connected assets to an ActiveMQ server or Azure Service Bus endpoint. Streamed data can be used for data analytics or reporting, or simply for storage.

    This article explains the Machine Streams Data Relay project that Axeda provides. This sample project illustrates how stream consumers can create their own projects to relay Machine Stream messages from ActiveMQ or Azure Service Bus into their environments.

    The Machine Streams Data Relay project was created using Apache Maven. The project operates by dispatching messages to a log message processor. Each machine streams message is logged to stdout.

    Note: The "Axeda Features Guide" provides a high-level introduction to the Axeda Machine Streams feature. That PDF is available from PTC Support (http://support.ptc.com/).

    Downloading and Installing the Project

    The machine-streams-data-relay project is provided as a tar.gz archive for Linux users and a .zip archive for Windows users. Each archive contains the Maven project with full source code. The Data Relay project files are available from here.

    Prerequisites

    To download, build, and compile the machine-streams-data-relay project, you will need the following:

    • Access to an Axeda Platform instance configured to stream asset data (for an ActiveMQ endpoint, this includes the Axeda-provided ActiveMQ machine-streams plugin/overlay, axeda-jms-plugin-r<SVN_REVISION>-machine-streams.zip, which is provided here)
    • ActiveMQ or Azure Service Bus server configured for Machine Streams. Instructions for configuring an ActiveMQ or Azure Service Bus server for Machine Streams are provided in the “Axeda® Machine Streams: A Guide to Setting Up Broker Endpoints”, available with all documentation from PTC Support (http://support.ptc.com/).
    • At least one machine stream (Axeda Artisan Machine Streams Archetype) configured to stream data to the ActiveMQ or Azure Service Bus server for your assets. (Complete information about creating machine streams and adding machine stream support to the Axeda Platform is provided in the “Axeda v2 API/Services Developers Reference Guide” available from PTC Support (http://support.ptc.com/).)
    • Access to the ActiveMQ or Azure Service Bus server configured as the endpoint for streamed Machine Streams content
    • Oracle Java JDK 1.7 or greater and java and javac installed and available in your PATH (if you need instructions for this, see http://www.oracle.com/technetwork/java/javase/downloads/index.html)
    • Maven 3.0.4 or greater and mvn installed and available in your PATH (if you need instructions for this, see http://maven.apache.org/download.cgi)

    Note: For the Machine Streams Data Relay project to work successfully, the Axeda Platform instance and the ActiveMQ or Azure Service Bus server instance must be configured with support for Axeda Machine Streams, and at least one machine stream must be configured to stream data. Complete information about configuring Axeda Platform for Axeda Machine Streams, including the data format for the resulting streams (XML or JSON) is available in the “Axeda v2 API/Services Developers Reference Guide.” Instructions for configuring an ActiveMQ or Azure Service Bus server for Machine Streams are provided in the “Axeda® Machine Streams: A Guide to Setting Up Broker Endpoints” Reference Guide (available from PTC Support (http://support.ptc.com/)).

    Building the Project

    This page provides instructions for building the Data Relay project for Linux and for Windows environments.
    1. Download and uncompress the project for your environment
    Linux: Click here for the machine-streams-data-relay-1.0.3-project.tar.gz

     

    # tar -zxvf machine-streams-data-relay-1.0.3-project.tar.gz


    # cd machine-streams-data-relay-1.0.3

     

    Windows: Click here for the machine-streams-data-relay-1.0.3-project.zip

    Unzip the project to the following directory:

     

    C:\machine-streams-data-relay-1.0.3


    2. Edit the ActiveMQ or Azure Service Bus configuration file (configAMQ.properties or configASB.properties) in src\main\scripts\ as needed.

    Sample configuration properties files for the MachineStreamsDataRelay component

    For ActiveMQ broker endpoints - configAMQ.properties


    # The ActiveMQ broker URL.

    brokerURL=tcp://localhost:62000

     

    # The ActiveMQ queue name to process messages from.

    # It can be a single queue: MachineStream.stream01

    # Or a wildcard queue: MachineStream.>

    queueName=MachineStream.>

     

    # The username used to connect to the ActiveMQ queue

    username=axedaadmin

     

    # The password used to connect to the ActiveMQ queue

    password=zQXuLzhQgcyRZ25JCDXYEPBCT2kx48

     

    # The number of ActiveMQ broker connections.

    numConnections=10


    # The number of sessions per connection. Note that each session will create a separate thread.

    numSessionsPerConnection=5

     

    # The number of concurrent threads used for processing machine streams messages.

    numProcessingThreads=100


    # The type of message listener container.

    # default = single queue name per connection.

    # multiDestination = supports multiple queue names per connection

    messageListenerContainerType=default


    For Azure Service Bus broker endpoints - configASB.properties

    # The ASB broker URL.

    brokerURL=amqps://your-azure-service-bus-namespace.servicebus.windows.net

     

    # The ASB queues to process messages from.

    # It can be a single queue: MachineStream.stream01

    # Or multiple queues separated by a comma: MachineStream.stream01,MachineStream.stream02

    # Or a queue range defined by the following syntax: MachineStream.stream[01-20]

    queueName=MachineStream.stream[01-50]

     

    # The username used to connect to the ASB queue(s)

    username=your-azure-service-bus-username

     

    # The password used to connect to the ASB queue(s)

    password=the-password-for-your-azure-service-bus-username

     

    # The max number of ASB broker connections.

    numConnections=10

     

    # The number of concurrent threads used for processing machine streams messages.

    numProcessingThreads=100

     

    # The type of message listener container.

    # default = single queue name per connection.

    # multiDestination = supports multiple queue names per connection

    messageListenerContainerType=multiDestination

    Note: messageListenerContainerType is provided because Azure Service Bus does not support wildcard queue names.
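    The sample properties above can be read with the standard java.util.Properties class. The following is a minimal sketch, not code from the Data Relay project: the class name RelayConfigSketch and its helper methods are hypothetical, and the fallback values are the defaults listed in this article. It also shows the thread arithmetic implied by the comments in configAMQ.properties: because each session creates a separate thread, 10 connections with 5 sessions each result in roughly 50 listener threads, in addition to the 100 processing threads.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Data Relay project.
class RelayConfigSketch {

    // Parse properties text. In practice the text would come from
    // configAMQ.properties or configASB.properties on disk.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("could not parse properties", e);
        }
        return props;
    }

    // Read an integer key, falling back to the given default when the key is absent.
    static int intValue(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)).trim());
    }

    // Each session creates its own thread, so listener threads = connections x sessions.
    static int listenerThreads(Properties props) {
        return intValue(props, "numConnections", 10)
                * intValue(props, "numSessionsPerConnection", 5);
    }

    public static void main(String[] args) {
        Properties props = parse(
                "brokerURL=tcp://localhost:62000\n"
              + "queueName=MachineStream.>\n"
              + "numConnections=10\n"
              + "numSessionsPerConnection=5\n"
              + "numProcessingThreads=100\n");
        System.out.println("broker: " + props.getProperty("brokerURL"));
        System.out.println("listener threads: " + listenerThreads(props));
        System.out.println("processing threads: " + intValue(props, "numProcessingThreads", 100));
    }
}
```

    A calculation like this can help when estimating how many threads a given configuration will start before the relay is deployed.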

     

    The configuration details are as follows:

    Name: brokerURL
    Description: Location of the ActiveMQ or Azure Service Bus server (broker)
    Value: Location of the ActiveMQ or Azure Service Bus server (broker)

    Name: queueName
    Description: Name of the ActiveMQ or Azure Service Bus queue from which you want to process messages
    Value:
    For ActiveMQ, to define a single queue: MachineStream.<insert single queue name here>
    For ActiveMQ, to define a wildcard queue name for multiple queues: MachineStream.>
    For ASB, it can be a single queue: MachineStream.stream01
    Or multiple queues separated by a comma: MachineStream.stream01,MachineStream.stream02
    Or a queue range defined by the following syntax: MachineStream.stream[01-20], for example queueName=MachineStream.stream[01-50]
    (If you have multiple queues and you want to use ASB, then you have to use multiDestination and use the range.)

    Name: username
    Description: Username used to connect to the ActiveMQ or Azure Service Bus queue
    Value:
    For ActiveMQ: username=axedaadmin
    For ASB: username=your-azure-service-bus-username

    Name: password
    Description: Password used to connect to the ActiveMQ or Azure Service Bus queue
    Value: Password used to connect to the ActiveMQ or ASB queue(s)

    Name: numConnections
    Description: Number of ActiveMQ or Azure Service Bus broker connections
    Value: Default is 10 broker connections

    Name: numSessionsPerConnection
    Description: The number of sessions per connection. Note that each session will create a separate thread. (This key is used infrequently.) APPLICABLE TO ACTIVEMQ ONLY.
    Value: Default is 5 sessions per connection. APPLICABLE TO ACTIVEMQ ONLY.

    Name: numProcessingThreads
    Description: The number of concurrent threads used for processing machine streams messages
    Value: Default is 100 concurrent threads

    Name: messageListenerContainerType
    Description: The type of message listener container
    Value:
    default: single queue name per connection (this is the default)
    multiDestination: supports multiple queue names per connection
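    To make the three queueName forms concrete (single queue, comma-separated list, and bracketed range), the following sketch expands a queueName value into individual queue names. It is an illustration only: QueueNameSketch and expand() are hypothetical names, and the assumption that range numbers keep the zero-padded width of the lower bound (01, 02, ... 50) is inferred from the examples in this article rather than taken from the project source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the Data Relay project.
class QueueNameSketch {

    // Matches names such as MachineStream.stream[01-20].
    private static final Pattern RANGE = Pattern.compile("^(.*)\\[(\\d+)-(\\d+)\\]$");

    // Expand a queueName value into the individual queue names it denotes.
    static List<String> expand(String queueName) {
        List<String> names = new ArrayList<String>();
        for (String part : queueName.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            Matcher m = RANGE.matcher(trimmed);
            if (!m.matches()) {
                // A plain queue name (or an ActiveMQ wildcard) is passed through unchanged.
                names.add(trimmed);
                continue;
            }
            // Assumption: numbers are zero-padded to the width of the lower bound.
            int width = m.group(2).length();
            int first = Integer.parseInt(m.group(2));
            int last = Integer.parseInt(m.group(3));
            for (int i = first; i <= last; i++) {
                names.add(m.group(1) + String.format("%0" + width + "d", i));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(expand("MachineStream.stream[01-03]"));
        System.out.println(expand("MachineStream.stream01,MachineStream.stream02"));
        System.out.println(expand("MachineStream.stream[01-50]").size() + " queues");
    }
}
```

    Under these assumptions, MachineStream.stream[01-50] stands for 50 explicitly named queues, which is why a multi-queue ASB setup needs the multiDestination container rather than a wildcard.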

     

    3. Build code using Maven.  Use -DskipTests option if you want to skip tests.  This will build all source code and produce a bin archive in the target directory.

    For Linux:

    # mvn package -DskipTests


    For Windows:

    c:\> mvn package -DskipTests

     

    4. Change to the target directory, uncompress the *bin.tar.gz archive (Linux) or the bin .zip archive (Windows), and change to the extracted directory.


    For Linux:

    # cd target


    # tar -zxvf machine-streams-data-relay-1.0.3-bin.tar.gz


    # cd machine-streams-data-relay-1.0.3

    For Windows:

    c:\> cd target


    c:\> unzip machine-streams-data-relay-1.0.3.bin.zip


    c:\> cd machine-streams-data-relay-1.0.3

    5. Start the application.


    For Linux:

    # ./machineStreamsDataRelay.sh <config properties file>

    For example: ./machineStreamsDataRelay.sh configOfYourChoice.properties

     

    For Windows:

    c:\> machineStreamsDataRelay.bat <config properties file>

    For example: machineStreamsDataRelay.bat configOfYourChoice.properties

     

    See the two example config files included within the project: configASB.properties (for Azure Service Bus) and configAMQ.properties (for ActiveMQ).

     

    6. Scan the output.

     

    If your ActiveMQ configuration is correct, output similar to the following should appear, and no ERRORS should be shown:

    2014-03-26 10:27:06.179 [main] INFO  [MessageListenerServiceImpl]: Initializing connections to tcp://localhost:62000 username=axedaadmin

    2014-03-26 10:27:06.346 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 1: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.351 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 2: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.356 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 3: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.365 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 4: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.369 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 5: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.381 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 6: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.388 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 7: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.402 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 8: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.411 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 9: queue=MachineStream.> numSessions=5

    2014-03-26 10:27:06.416 [main] INFO  [MessageListenerServiceImpl]: Initialized connection 10: queue=MachineStream.> numSessions=5

    If your Azure Service Bus configuration is correct, output similar to the following should appear, and no ERRORS should be shown:

    2014-10-01 16:51:30.114 [main] INFO [MessageListenerServiceImpl]: Initializing Connections to amqps://acme.servicebus.windows.net username=owner

    2014-10-01 16:51:31.613 [ConnectionRecovery-thread-6] INFO [MultiDestinationMessageListenerContainer]: Connection 6 created 0/10 queue consumers

    2014-10-01 16:51:31.614 [ConnectionRecovery-thread-8] INFO [MultiDestinationMessageListenerContainer]: Connection 8 created 0/10 queue consumers

    2014-10-01 16:51:31.614 [ConnectionRecovery-thread-10] INFO [MultiDestinationMessageListenerContainer]: Connection 10 created 0/9 queue consumers

    2014-10-01 16:51:31.614 [ConnectionRecovery-thread-2] INFO [MultiDestinationMessageListenerContainer]: Connection 2 created 0/10 queue consumers

    2014-10-01 16:51:31.614 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 0/10 queue consumers

    2014-10-01 16:51:31.614 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 0/10 queue consumers

    2014-10-01 16:51:31.615 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 0/10 queue consumers

    2014-10-01 16:51:31.615 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 0/10 queue consumers

    2014-10-01 16:51:31.621 [ConnectionRecovery-thread-7] INFO [MultiDestinationMessageListenerContainer]: Connection 7 created 0/10 queue consumers

    2014-10-01 16:51:31.756 [ConnectionRecovery-thread-1] INFO [MultiDestinationMessageListenerContainer]: Connection 1 created 0/10 queue consumers

    2014-10-01 16:51:32.613 [ConnectionRecovery-thread-6] INFO [MultiDestinationMessageListenerContainer]: Connection 6 created 9/10 queue consumers

    2014-10-01 16:51:32.614 [ConnectionRecovery-thread-8] INFO [MultiDestinationMessageListenerContainer]: Connection 8 created 9/10 queue consumers

    2014-10-01 16:51:32.614 [ConnectionRecovery-thread-10] INFO [MultiDestinationMessageListenerContainer]: Connection 10 created 7/9 queue consumers

    2014-10-01 16:51:32.614 [ConnectionRecovery-thread-2] INFO [MultiDestinationMessageListenerContainer]: Connection 2 created 10/10 queue consumers

    2014-10-01 16:51:32.615 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 9/10 queue consumers

    2014-10-01 16:51:32.615 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 0/10 queue consumers

    2014-10-01 16:51:32.615 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 7/10 queue consumers

    2014-10-01 16:51:32.615 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 9/10 queue consumers

    2014-10-01 16:51:32.623 [ConnectionRecovery-thread-7] INFO [MultiDestinationMessageListenerContainer]: Connection 7 created 9/10 queue consumers

    2014-10-01 16:51:32.756 [ConnectionRecovery-thread-1] INFO [MultiDestinationMessageListenerContainer]: Connection 1 created 10/10 queue consumers

    2014-10-01 16:51:32.833 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 1: numQueues=10 initTimeMillis=2631 millis

    2014-10-01 16:51:32.833 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 2: numQueues=10 initTimeMillis=2488 millis

    2014-10-01 16:51:33.613 [ConnectionRecovery-thread-6] INFO [MultiDestinationMessageListenerContainer]: Connection 6 created 10/10 queue consumers

    2014-10-01 16:51:33.614 [ConnectionRecovery-thread-8] INFO [MultiDestinationMessageListenerContainer]: Connection 8 created 10/10 queue consumers

    2014-10-01 16:51:33.614 [ConnectionRecovery-thread-10] INFO [MultiDestinationMessageListenerContainer]: Connection 10 created 9/9 queue consumers

    2014-10-01 16:51:33.615 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 9/10 queue consumers

    2014-10-01 16:51:33.615 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 0/10 queue consumers

    2014-10-01 16:51:33.615 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 8/10 queue consumers

    2014-10-01 16:51:33.615 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 9/10 queue consumers

    2014-10-01 16:51:33.623 [ConnectionRecovery-thread-7] INFO [MultiDestinationMessageListenerContainer]: Connection 7 created 10/10 queue consumers

    2014-10-01 16:51:34.615 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 0/10 queue consumers

    2014-10-01 16:51:34.615 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 9/10 queue consumers

    2014-10-01 16:51:34.615 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 8/10 queue consumers

    2014-10-01 16:51:34.615 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 9/10 queue consumers

    2014-10-01 16:51:35.615 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 9/10 queue consumers

    2014-10-01 16:51:35.615 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 9/10 queue consumers

    2014-10-01 16:51:35.615 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 8/10 queue consumers

    2014-10-01 16:51:35.616 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 9/10 queue consumers

    2014-10-01 16:51:36.616 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 9/10 queue consumers

    2014-10-01 16:51:36.616 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 9/10 queue consumers

    2014-10-01 16:51:36.616 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 9/10 queue consumers

    2014-10-01 16:51:36.616 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 8/10 queue consumers

    2014-10-01 16:51:37.616 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 9/10 queue consumers

    2014-10-01 16:51:37.616 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 9/10 queue consumers

    2014-10-01 16:51:37.616 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 9/10 queue consumers

    2014-10-01 16:51:37.616 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 8/10 queue consumers

    2014-10-01 16:51:38.616 [ConnectionRecovery-thread-3] INFO [MultiDestinationMessageListenerContainer]: Connection 3 created 10/10 queue consumers

    2014-10-01 16:51:38.617 [ConnectionRecovery-thread-9] INFO [MultiDestinationMessageListenerContainer]: Connection 9 created 10/10 queue consumers

    2014-10-01 16:51:38.616 [ConnectionRecovery-thread-4] INFO [MultiDestinationMessageListenerContainer]: Connection 4 created 10/10 queue consumers

    2014-10-01 16:51:38.616 [ConnectionRecovery-thread-5] INFO [MultiDestinationMessageListenerContainer]: Connection 5 created 10/10 queue consumers

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 3: numQueues=10 initTimeMillis=8491 millis

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 4: numQueues=10 initTimeMillis=8490 millis

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 5: numQueues=10 initTimeMillis=8490 millis

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 6: numQueues=10 initTimeMillis=3485 millis

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 7: numQueues=10 initTimeMillis=3495 millis

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 8: numQueues=10 initTimeMillis=3485 millis

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 9: numQueues=10 initTimeMillis=8488 millis

    2014-10-01 16:51:38.643 [main] INFO [MessageListenerServiceImpl]: Initialized Connection 10: numQueues=9 initTimeMillis=3485 millis
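    In the Azure Service Bus output above, connections 1 through 9 each report numQueues=10 and connection 10 reports numQueues=9, that is, 99 queues spread over 10 connections. How the project actually assigns queues to connections is not described in this article. The sketch below uses a simple round-robin assignment, which is only one scheme that happens to produce the same counts. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the project's real assignment strategy may differ.
class QueueAssignmentSketch {

    // Assign queue i to connection (i mod numConnections).
    static List<List<String>> assign(List<String> queues, int numConnections) {
        List<List<String>> perConnection = new ArrayList<List<String>>();
        for (int c = 0; c < numConnections; c++) {
            perConnection.add(new ArrayList<String>());
        }
        for (int i = 0; i < queues.size(); i++) {
            perConnection.get(i % numConnections).add(queues.get(i));
        }
        return perConnection;
    }

    // Number of queues each connection receives for a given total.
    static int[] queueCounts(int numQueues, int numConnections) {
        List<String> queues = new ArrayList<String>();
        for (int i = 1; i <= numQueues; i++) {
            queues.add(String.format("MachineStream.stream%02d", i));
        }
        List<List<String>> assigned = assign(queues, numConnections);
        int[] counts = new int[numConnections];
        for (int c = 0; c < numConnections; c++) {
            counts[c] = assigned.get(c).size();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = queueCounts(99, 10);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("Connection " + (c + 1) + ": numQueues=" + counts[c]);
        }
    }
}
```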

    7. To verify that messages are being streamed properly from the Axeda Platform, send DataItems from your connected Assets. You should see messages similar to the following. (Remember that each Asset you are testing must have an associated Machine Stream.)

    2014-03-26 10:45:16.309 [pool-1-thread-1] INFO  [LogMessageProcessor]: StreamedDataItem: Model,Asset1,799021d6-70a3-7c32-0000-00000000021d,false,Wed Mar 26 14:45:16 EDT 2014,temp,43,analog

    2014-03-26 10:45:21.137 [pool-1-thread-2] INFO  [LogMessageProcessor]: StreamedDataItem: Model,Asset2,799021d6-70a3-7c32-0000-000000000225,false,Wed Mar 26 14:45:21 EDT 2014,temp,43,analog

    2014-03-26 10:45:26.134 [pool-1-thread-3] INFO  [LogMessageProcessor]: StreamedDataItem: Model,Asset1,799021d6-70a3-7c32-0000-00000000022b,false,Wed Mar 26 14:45:26 EDT 2014,temp,44,analog

    2014-03-26 10:45:31.135 [pool-1-thread-4] INFO  [LogMessageProcessor]: StreamedDataItem: Model,Asset2,799021d6-70a3-7c32-0000-000000000231,false,Wed Mar 26 14:45:31 EDT 2014,temp,44,analog

    2014-03-26 10:45:36.142 [pool-1-thread-5] INFO  [LogMessageProcessor]: StreamedDataItem: Model,Asset1,799021d6-70a3-7c32-0000-000000000237,false,Wed Mar 26 14:45:36 EDT 2014,temp,45,analog

    2014-03-26 10:45:41.146 [pool-1-thread-6] INFO  [LogMessageProcessor]: StreamedDataItem: Model,Asset2,799021d6-70a3-7c32-0000-00000000023d,false,Wed Mar 26 14:45:41 EDT 2014,temp,45,analog

    Configuring a CustomMessageProcessor

    By default, the project is configured to use a LogMessageProcessor that logs each streamed message it receives to standard out. The project takes a StreamedMessage in either an XML or JSON format (as configured in the MachineStream SDKv2 object) and decodes the message into a StreamedMessage Java object. LogMessageProcessor.java implements the MessageProcessor interface. Here is the MessageProcessor.java interface:


    MessageProcessor.java

    package com.axeda.tools.streams.processor;

    import com.axeda.tools.streams.model.StreamedMessage;

    /**
     * This interface defines the message processor callback method that will be called for message processing.
     * Note that this method will be called by multiple threads concurrently.
     */
    public interface MessageProcessor {

        /**
         * Process a machine stream message. Note that this method will be called by multiple threads concurrently.
         * The number of concurrent processing threads is defined in MachineStreamsConfig.getNumProcessingThreads().
         * If you add code here that significantly slows down message processing, then there is the potential that
         * MessageListenerService threads will also block.  When the MessageListenerService threads block, this means that
         * messages will start to backup in the ActiveMQ or ASB message queues. If you are processing a large number of messages,
         * then you may need to adjust your configuration parameters or optimize your processMessage() code.
         *
         * @param message machine streams message to process
         */
        public void processMessage(StreamedMessage message);
    }
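    Because processMessage() is called by multiple threads concurrently, any state that an implementation shares between calls has to be thread-safe. The following sketch counts messages per type using ConcurrentHashMap and AtomicLong. It is self-contained, so StreamedMessage, MessageProcessor, DataItem, and Alarm are simplified stand-ins declared inside the example, not the project's real classes, and the simulate() driver is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class CountingProcessorSketch {

    // Stand-ins for the project's types so the sketch compiles on its own.
    interface StreamedMessage {}
    interface MessageProcessor { void processMessage(StreamedMessage message); }
    static class DataItem implements StreamedMessage {}
    static class Alarm implements StreamedMessage {}

    // Counts messages per type without locks; safe to call from many threads.
    static class CountingProcessor implements MessageProcessor {
        private final ConcurrentMap<String, AtomicLong> counts =
                new ConcurrentHashMap<String, AtomicLong>();

        @Override
        public void processMessage(StreamedMessage message) {
            String type = message.getClass().getSimpleName();
            AtomicLong counter = counts.get(type);
            if (counter == null) {
                AtomicLong fresh = new AtomicLong();
                counter = counts.putIfAbsent(type, fresh);
                if (counter == null) {
                    counter = fresh; // this thread installed the counter
                }
            }
            counter.incrementAndGet();
        }

        long count(String type) {
            AtomicLong counter = counts.get(type);
            return counter == null ? 0L : counter.get();
        }
    }

    // Drive one processor from several threads, similar to what the relay does.
    static CountingProcessor simulate(int threads, final int perThread) {
        final CountingProcessor processor = new CountingProcessor();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < perThread; i++) {
                        processor.processMessage(new DataItem());
                        processor.processMessage(new Alarm());
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processor;
    }

    public static void main(String[] args) {
        CountingProcessor processor = simulate(8, 1000);
        System.out.println("DataItem: " + processor.count("DataItem"));
        System.out.println("Alarm: " + processor.count("Alarm"));
    }
}
```

    A plain HashMap with long values would lose updates under the same load, which is the kind of defect the concurrency note in the Javadoc warns about.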

    An additional class named CustomMessageProcessor.java has been provided so that you can provide your own custom message processing logic:


    CustomMessageProcessor.java

    package com.axeda.tools.streams.processor;

    import org.springframework.stereotype.Component;

    import com.axeda.tools.streams.model.StreamedAlarm;
    import com.axeda.tools.streams.model.StreamedDataItemMessage;
    import com.axeda.tools.streams.model.StreamedMessage;
    import com.axeda.tools.streams.model.StreamedMobileLocation;
    import com.axeda.tools.streams.model.StreamedRegistrationMessage;

    /**
     * This class was provided for customers to implement their own message processing business
     * logic. To use this class, change the @Autowired messageProcessor qualifier in
     * MessageProcessingServiceImpl.java to @Qualifier("customMessageProcessor")
     */
    @Component("customMessageProcessor")
    public class CustomMessageProcessor implements MessageProcessor {

        /**
         * (non-Javadoc)
         * @see com.axeda.tools.streams.processor.MessageProcessor#processMessage(com.axeda.tools.streams.model.StreamedMessage)
         *
         * Process a machine stream message. Note that this method will be called by multiple threads
         * concurrently. The number of concurrent processing threads is defined in
         * MachineStreamsConfig.getNumProcessingThreads().
         * If you add code here that significantly slows down message processing, then there is the
         * potential that MessageListenerService threads will also block. When the MessageListenerService
         * threads block, this means that messages will start to backup in the ActiveMQ or Azure Service Bus message queues. If you
         * are processing a large number of messages, then you may need to adjust your configuration parameters
         * or optimize your processMessage() code.
         */
        @SuppressWarnings("unused")
        @Override
        public void processMessage(StreamedMessage message) {

            if (message instanceof StreamedDataItemMessage) {
                StreamedDataItemMessage dataItem = (StreamedDataItemMessage) message;
                // add your business logic here

            } else if (message instanceof StreamedAlarm) {
                StreamedAlarm alarm = (StreamedAlarm) message;
                // add your business logic here

            } else if (message instanceof StreamedMobileLocation) {
                StreamedMobileLocation mobileLocation = (StreamedMobileLocation) message;
                // add your business logic here

            } else if (message instanceof StreamedRegistrationMessage) {
                StreamedRegistrationMessage registration = (StreamedRegistrationMessage) message;
                // add your business logic here

            }
        }
    }

    The Axeda Platform Machine Streams feature currently supports four different message types:
    • StreamedDataItemMessage
    • StreamedAlarm
    • StreamedMobileLocation
    • StreamedRegistrationMessage

     

    For each of the different message types, you should add your message processing business logic. For example, you may want to write each message to your preferred NoSQL database or to a flat file.
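    As one possible shape for the flat-file option, the sketch below appends one line per message to a file. It is a minimal illustration under stated assumptions: StreamedMessage and MessageProcessor are simplified stand-ins declared inside the example, TextMessage and writeAndRead() are hypothetical, and the line format (the message's toString() value) is a placeholder for whatever fields you actually need. The write method is synchronized because processMessage() is called by multiple threads concurrently.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

class FlatFileSketch {

    // Stand-ins for the project's types so the sketch compiles on its own.
    interface StreamedMessage {}
    interface MessageProcessor { void processMessage(StreamedMessage message); }

    // Hypothetical message whose toString() is the line to be written.
    static class TextMessage implements StreamedMessage {
        private final String text;
        TextMessage(String text) { this.text = text; }
        @Override public String toString() { return text; }
    }

    // Appends one line per message; synchronized so concurrent callers do not interleave lines.
    static class FlatFileProcessor implements MessageProcessor, AutoCloseable {
        private final BufferedWriter writer;

        FlatFileProcessor(Path file) throws IOException {
            writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }

        @Override
        public synchronized void processMessage(StreamedMessage message) {
            try {
                writer.write(String.valueOf(message));
                writer.newLine();
            } catch (IOException e) {
                throw new IllegalStateException("could not write message", e);
            }
        }

        @Override
        public synchronized void close() throws IOException {
            writer.close();
        }
    }

    // Writes the given texts through the processor to a temp file and reads them back.
    static List<String> writeAndRead(String... texts) {
        try {
            Path file = Files.createTempFile("machine-streams", ".log");
            try {
                try (FlatFileProcessor processor = new FlatFileProcessor(file)) {
                    for (String text : texts) {
                        processor.processMessage(new TextMessage(text));
                    }
                }
                return Files.readAllLines(file, StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndRead("temp,43,analog", "temp,44,analog"));
    }
}
```

    The synchronized write serializes all processing threads on a single file, so with high message volumes it may become the kind of bottleneck the Javadoc warns about. Batching writes or using one file per thread are possible alternatives.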

     

    Once you have completed your changes to the CustomMessageProcessor, you must make one change in the MessageProcessingServiceImpl.java class to use this Spring bean.

    • Uncomment this line: // @Qualifier("customMessageProcessor")
    • Comment out this line: @Qualifier("logMessageProcessor")

     

    The following code snippet shows what your changes should look like when you are finished:


    MessageProcessingServiceImpl.java

    @Component("messageProcessingService")
    public class MessageProcessingServiceImpl implements MessageProcessingService {

        private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessingServiceImpl.class);

        @Autowired
        private MessageDecoder messageDecoder;

        @Autowired
        // If you want to use the CustomMessageProcessor instead of the default LogMessageProcessor
        // then change this Qualifier to
        @Qualifier("customMessageProcessor")
        //@Qualifier("logMessageProcessor")
        private MessageProcessor messageProcessor;

        private ExecutorService executorService;