Orders

Technical details for partners integrating with the Event Bridge system to receive real-time order event notifications via webhooks. Our service processes orders and sends each order event to the endpoint you provide.

Prerequisites

To integrate, our team will work with you to configure:

  1. Your Webhook URL: A secure HTTPS URL endpoint on your system capable of receiving HTTP POST requests with a JSON payload.

  2. Shared Secret (Generated by Us): We will generate a unique, secure secret string for your specific webhook configuration and provide it to you securely. You must use this secret to verify the HMAC signature on incoming requests.

  3. Custom Headers (Optional): You can specify any additional HTTP headers (key-value pairs) you require us to include in the webhook requests (e.g., for static authentication tokens or routing). We will store and send these with each request, provided they don't conflict with essential headers.

Webhook Request Details

Our service will send events to your configured Webhook URL using the following specifications:

  • Method: POST

  • Headers:

    • Content-Type: application/json

    • User-Agent: liquid-commerce-event-bridge-service/1.0

    • x-liquid-commerce-event-type: The type of event (e.g., order).

    • x-liquid-commerce-hmac-sha256: The HMAC signature (see Security section).

    • x-liquid-commerce-idempotency-key: A unique identifier for this specific webhook delivery attempt, generated by our service.

    • x-liquid-commerce-delivery-id: A unique identifier for this delivery, constructed from order identifiers and a timestamp.

    • x-liquid-commerce-timestamp: An ISO 8601 timestamp indicating when the webhook was sent by our service.

    • Any custom headers you provided during setup.
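As a rough illustration of how these headers might be read on the receiving side, the Python sketch below collects the informational ones into a small record for logging and tracing. The DeliveryMetadata class and the extract_delivery_metadata function are hypothetical names introduced here. The timestamp parsing assumes the header uses the same ISO 8601 shape as the payload timestamps (for example 2023-10-27T10:30:00.000Z); a value that cannot be parsed is kept as None rather than rejected.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Mapping, Optional

HEADER_PREFIX = "x-liquid-commerce-"


@dataclass(frozen=True)
class DeliveryMetadata:
    """Informational headers from one webhook request (hypothetical helper type)."""
    event_type: Optional[str]
    idempotency_key: Optional[str]
    delivery_id: Optional[str]
    sent_at: Optional[datetime]


def parse_iso_timestamp(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 string; a trailing 'Z' is rewritten for older Python versions."""
    if not value:
        return None
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        # Assumption: tolerate unexpected formats instead of failing the request.
        return None


def extract_delivery_metadata(headers: Mapping[str, str]) -> DeliveryMetadata:
    # HTTP header names are case-insensitive, so normalize them before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    return DeliveryMetadata(
        event_type=lowered.get(HEADER_PREFIX + "event-type"),
        idempotency_key=lowered.get(HEADER_PREFIX + "idempotency-key"),
        delivery_id=lowered.get(HEADER_PREFIX + "delivery-id"),
        sent_at=parse_iso_timestamp(lowered.get(HEADER_PREFIX + "timestamp")),
    )
```

Logging the resulting record alongside your own request ID is one way to correlate a delivery with our support team later; the signature header is deliberately left out because it belongs to verification, not tracing.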

Payload Structure (event-type: order)

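The request body is a complex JSON object representing the transformed and serialized order data. The following structure is representative but may be simplified for documentation clarity. The example uses unquoted keys and inline comments for readability; the actual body is standard JSON. Refer to the specific fields relevant to your integration.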

{
  referenceId: 'unique-order-ref-123', // Optional: Our internal unique order reference for new services orders
  legacyOrderNumber: 'RBR-LEGACY-456', // Optional: Legacy order number if applicable
  isHybrid: false, // Flag indicating if order involves legacy components
  partnerId: 'partner-id', // Your assigned Partner ID in our system
  createdAt: '2023-10-27T10:00:00.000Z', // ISO 8601 timestamp of order creation
  updatedAt: '2023-10-27T10:30:00.000Z', // ISO 8601 timestamp of last relevant update triggering webhook
  customer: {
    id: 'cust-id',
    firstName: 'Jane',
    lastName: 'Doe',
    email: '[email protected]',
    phone: '(212) 290-9820',
    birthdate: '1990-05-15', // YYYY-MM-DD
  },
  addresses: {
    shipping: {
      firstName: 'Jane',
      lastName: 'Doe',
      email: '[email protected]',
      phone: '+15551234567',
      company: null,
      one: '123 Main St',
      two: 'Apt 4B',
      city: 'Anytown',
      state: 'CA',
      zip: '90210',
      country: 'US',
    },
    billing: {
      // ... similar structure, may differ from shipping ...
      firstName: 'Jane',
      lastName: 'Doe',
      email: '[email protected]',
      phone: '+15551234567',
      company: null,
      one: '123 Main St',
      two: 'Apt 4B',
      city: 'Anytown',
      state: 'CA',
      zip: '90210',
      country: 'US',
    },
  },
  options: {
    isGift: true,
    giftMessage: 'Happy Birthday!',
    giftRecipient: {
      name: 'John Smith',
      email: '[email protected]',
      phone: '+15559876543',
    },
    hasVerifiedAge: true,
    allowsSubstitution: false,
    billingSameAsShipping: true,
    deliveryInstructions: 'Leave at front door.',
    marketingPreferences: {
      email: false,
      sms: false,
    },
  },
  amounts: {
    subtotal: 10000,
    shipping: 1000,
    platform: 599, // Fee charged by our platform
    tax: 850,
    engraving: 500,
    service: 200,
    delivery: 0, // Specific delivery fee component
    discounts: 1500, // Total discounts applied
    giftCards: 2500, // Amount paid via gift cards
    tip: 500,
    total: 9150, 
    taxDetails: {
      products: 700,
      shipping: 100,
      delivery: 0,
      bag: 0,
      bottleDeposits: 5,
      retailDelivery: 0,
    },
    discountDetails: {
      products: 1000,
      shipping: 500,
      delivery: 0,
      engraving: 0,
      service: 0,
    },
  },
  paymentMethods: [
    {
      type: 'card', // e.g., card, paypal
      card: 'Visa', // e.g., Visa, Mastercard
      last4: '1234',
      holder: 'Jane Doe',
      code: null, // If a gift card was used, a masked code will be available (e.g., 'TER*****FH9W48Y')
    },
  ],
  retailers: [
    {
      id: 'retailer-id-1',
      legacyId: 'legacy-retailer-abc',
      name: 'Best Drinks Store',
      system: 'LiquidCommerce OMS', // Actual values: LiquidCommerce OMS, ReserveBar OMS
      timezone: 'America/Los_Angeles',
      address: {
        one: '456 Side St',
        two: null,
        city: 'Anytown',
        state: 'CA',
        zip: '90211',
        country: 'US',
        coordinates: {
          latitude: 34.0522,
          longitude: -118.2437,
        },
      },
      amounts: {
        // Amounts specific to this retailer's fulfillment
        // ... structure similar to top-level amounts ...
        subtotal: 6000,
        shipping: 500,
        platform: 599,
        tax: 510,
        engraving: 5000,
        service: 120,
        delivery: 0,
        discounts: 1000,
        giftCards: 1000,
        tip: 250,
        total: 5940,
        taxDetails: {
          products: 700,
          shipping: 100,
          delivery: 0,
          bag: 0,
          bottleDeposits: 5,
          retailDelivery: 0,
        },
        discountDetails: {
          products: 1000,
          shipping: 500,
          delivery: 0,
          engraving: 0,
          service: 0,
        },
      },
      fulfillments: [
        {
          id: 'fulfillment-id-xyz',
          type: 'onDemand', // Actual values: shipping, onDemand, digital, bopis
          status: 'processing', // Actual values: created, processing, inTransit, canceled, delivered
          scheduledFor: null, // ISO 8601 if scheduled
          updatedAt: '2023-10-27T10:25:00.000Z',
          itemIds: ['item-id-1', 'item-id-2'],
          packages: [
            {
              id: 'package-id-123',
              carrier: 'UPS',
              trackingNumber: '1Z999AA10123456784',
              trackingUrl: 'https://wwwapps.ups.com/...',
              status: 'inTransit', // Actual values: created, processing, inTransit, canceled, delivered
              dateShipped: '2023-10-27T10:25:00.000Z',
            },
          ],
          timeline: [
            { status: 'created', timestamp: '2023-10-27T10:00:00.000Z' }, // Actual values for status: created, scheduled, onHold, pending, processing, inTransit, delivered, canceled
            { status: 'processing', timestamp: '2023-10-27T10:15:00.000Z' },
            // Potentially more status updates (inTransit, delivered, etc.)
          ],
        },
        // Potentially multiple fulfillments per retailer if split
      ],
    },
    // Potentially multiple retailers if multi-retailer order
  ],
  items: [
    {
      id: 'item-id-1',
      fulfillmentId: 'fulfillment-id-xyz',
      retailerId: 'retailer-id-1',
      variantId: 'variant-id-1',
      liquidId: 'liquid-id-123',
      legacyGrouping: null,
      legacyPid: 'pid-abc',
      customerPlacement: 'standard', // Actual values: standard, pre_sale, back_order
      isPresale: false,
      estimatedShipBy: null,
      product: {
        name: 'Example Wine 750ml',
        brand: 'Example Vineyards',
        upc: '012345678912',
        sku: 'EXWINE750',
        mskus: ['RETAILER123'],
        category: 'Red Wine',
        size: '750ml',
        volume: '750',
        uom: 'ml',
        proof: null,
        attributes: {
          pack: false,
          packDescription: null,
          abv: '13.5',
          container: 'Bottle',
          containerType: 'Glass',
        },
      },
      image: 'https://example.com/image.jpg',
      pricing: {
        price: 2000,
        unitPrice: 2000,
        quantity: 2,
        tax: 340, 
        bottleDeposits: 20, 
      },
      attributes: {
        engraving: {
          hasEngraving: true,
          fee: 5000,
          location: 'Front',
          lines: ['Line 1 Text', 'Line 2 Text'],
        },
        giftCard: {
          // Based on PartnerOrderItemGiftCardSerialization
          sender: 'Jane Doe',
          message: null,
          recipients: ['John Smith <[email protected]>'],
          sendDate: null,
        },
      },
    },
    // ... more items ...
  ],
}

Key Fields & Notes:

  • Identifiers: referenceId, partnerId, retailers[].id, items[].id, etc., are identifiers from our internal systems.

  • Timestamps: All timestamps are in ISO 8601 format (UTC).

  • Payload Granularity: The payload contains details potentially aggregated across multiple internal entities (orders, shipments, fulfillments, retailers). The retailers array allows for orders fulfilled by multiple retailers. Each retailer has its own fulfillments, packages, and amounts.

  • Status Fields: Note the different status enums used at different levels (e.g., fulfillments[].status vs. packages[].status). Use the status most relevant to your needs.

  • Amounts: Amounts are provided at the top level (order total) and broken down within amounts.taxDetails, amounts.discountDetails, and also within each retailers[].amounts. Item-level pricing is in items[].pricing.

  • Optional Fields: Many fields are optional (nullable) depending on the specific order details (e.g., giftMessage, deliveryInstructions, legacyOrderNumber, referenceId). At least one of referenceId or legacyOrderNumber is always included; for hybrid orders you will receive both.

  • Additional Headers: Our service includes a few additional headers for traceability and idempotency assistance:

    • x-liquid-commerce-idempotency-key: A unique UUID for each delivery attempt. While you must implement your own idempotency based on payload content (like referenceId and updatedAt), this key can be useful for logging and tracing specific delivery attempts on your end.

    • x-liquid-commerce-delivery-id: A unique identifier for this delivery, typically formed using order identifiers and a timestamp. Useful for tracing.

    • x-liquid-commerce-timestamp: An ISO 8601 timestamp marking when the webhook request was initiated from our service.
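The nesting described under Payload Granularity and Status Fields can be walked with a few loops. The Python sketch below is one possible way to do it, assuming the body has already been verified and parsed into a dict. The helper names are illustrative, and the sample is a trimmed-down order that reuses field names from the payload structure, with status values taken from the documented enums.

```python
from typing import Any, Dict, Iterator, List, Tuple

Order = Dict[str, Any]


def iter_package_statuses(order: Order) -> Iterator[Tuple[str, str, str, str, str]]:
    """Yield (retailer id, fulfillment id, fulfillment status, package id, package status)."""
    for retailer in order.get("retailers") or []:
        for fulfillment in retailer.get("fulfillments") or []:
            for package in fulfillment.get("packages") or []:
                yield (
                    retailer["id"],
                    fulfillment["id"],
                    fulfillment["status"],
                    package["id"],
                    package["status"],
                )


def item_ids_by_fulfillment(order: Order) -> Dict[str, List[str]]:
    """Group item ids under their fulfillment, using items[].fulfillmentId."""
    grouped: Dict[str, List[str]] = {}
    for item in order.get("items") or []:
        grouped.setdefault(item["fulfillmentId"], []).append(item["id"])
    return grouped


# Trimmed-down sample order (illustrative values only).
sample_order: Order = {
    "referenceId": "unique-order-ref-123",
    "retailers": [{
        "id": "retailer-id-1",
        "fulfillments": [{
            "id": "fulfillment-id-xyz",
            "status": "processing",
            "packages": [{"id": "package-id-123", "status": "inTransit"}],
        }],
    }],
    "items": [
        {"id": "item-id-1", "fulfillmentId": "fulfillment-id-xyz"},
        {"id": "item-id-2", "fulfillmentId": "fulfillment-id-xyz"},
    ],
}

for row in iter_package_statuses(sample_order):
    print(row)
print(item_ids_by_fulfillment(sample_order))
```

Keeping the fulfillment status and the package status side by side in each row makes it easier to decide which of the two levels your integration should react to.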

Headers (Recap)

  • Content-Type: application/json

  • User-Agent: liquid-commerce-event-bridge-service/1.0

  • x-liquid-commerce-event-type: e.g., order

  • x-liquid-commerce-hmac-sha256: (If secret provided) Base64 encoded HMAC-SHA256 signature.

  • x-liquid-commerce-idempotency-key: Unique identifier for the delivery attempt.

  • x-liquid-commerce-delivery-id: Unique identifier for the delivery.

  • x-liquid-commerce-timestamp: Timestamp of when the webhook was sent.

  • Any custom headers you specified.

Expected Partner Response

Your webhook endpoint must respond promptly to acknowledge receipt.

  • Success: Respond with an HTTP 2xx status code (e.g., 200 OK, 202 Accepted, 204 No Content).

    • Respond immediately after validating the signature (if applicable) and before performing complex/long-running business logic to avoid timeouts. Our service treats any 2xx as success for the delivery attempt.

  • Failure: Respond with an HTTP 4xx (Client Error) or 5xx (Server Error) status code if:

    • The signature verification fails (respond with 401 Unauthorized or 400 Bad Request).

    • Your service encounters an error preventing acceptance (e.g., temporary overload, validation error on your side).

Retry Mechanism & DLQ

Our service retries a delivery if your endpoint:

  1. Responds with a non-2xx status code.

  2. Times out (default: 5000ms).

  3. Experiences a network connectivity issue.

In any of these cases, our service automatically retries up to 3 times with exponential backoff delays of approximately 1000ms, 2000ms, and 4000ms.

If the event delivery fails after the initial attempt and all 3 retries (4 attempts in total), the original event payload and error details will be sent to our internal Dead Letter Queue (DLQ), which is purged after 7 days. No further delivery attempts will be made for that event. Consistent failures indicate an issue with your endpoint that needs investigation.
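To get a feel for how long an outage the retry schedule can absorb, the following Python sketch reproduces the documented delays and adds up a rough upper bound for the whole delivery window. It assumes the delays double exactly from 1000ms, that every attempt runs to the full 5000ms timeout, and that no jitter is applied. The actual delays are described as approximate, so treat the numbers as estimates. The function names are illustrative.

```python
from typing import List


def retry_delays_ms(retries: int = 3, base_ms: int = 1000, factor: int = 2) -> List[int]:
    """Backoff delay before each retry: base, base*factor, base*factor^2, ..."""
    return [base_ms * factor ** attempt for attempt in range(retries)]


def worst_case_window_ms(retries: int = 3, timeout_ms: int = 5000, base_ms: int = 1000) -> int:
    """Upper bound on elapsed time if every attempt times out (initial attempt + retries)."""
    attempts = retries + 1
    return attempts * timeout_ms + sum(retry_delays_ms(retries, base_ms))


print(retry_delays_ms())       # [1000, 2000, 4000]
print(worst_case_window_ms())  # 27000
```

Under these assumptions, an endpoint that stays unavailable for longer than roughly 27 seconds would see the event moved to the DLQ, which is worth keeping in mind when planning deployments or maintenance windows.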

Idempotency

While we have internal mechanisms, the nature of our Distributed Event Streaming System with at-least-once delivery guarantee means you might receive the same logical update via multiple webhook calls.

It is CRITICAL that your system implements idempotency checks to prevent duplicate processing. Use a combination of identifiers from the payload relevant to your use case. A good candidate is the top-level referenceId combined with a relevant timestamp like updatedAt, or specific fulfillment/package IDs if processing at that level.

Example Check: Before processing, check whether you have already successfully processed an update for the same referenceId with an updatedAt timestamp equal to or later than the incoming one. If so, respond with a 2xx status but skip processing.
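A minimal Python sketch of such a check follows. It keeps the newest updatedAt per order in memory, which is only suitable for illustration; a real integration would use a durable store with an atomic check-and-set. The ProcessedUpdates class is a hypothetical helper, and falling back to legacyOrderNumber when referenceId is absent is an assumption based on the note that at least one of the two is always present.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def parse_updated_at(value: str) -> datetime:
    """Parse the payload's ISO 8601 timestamp (trailing 'Z' rewritten for older Pythons)."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


class ProcessedUpdates:
    """Tracks the newest updatedAt processed per order. In-memory, for illustration only."""

    def __init__(self) -> None:
        self._latest: Dict[str, datetime] = {}

    @staticmethod
    def order_key(payload: Dict[str, Any]) -> Optional[str]:
        # Assumption: fall back to legacyOrderNumber when referenceId is absent.
        return payload.get("referenceId") or payload.get("legacyOrderNumber")

    def should_process(self, payload: Dict[str, Any]) -> bool:
        key = self.order_key(payload)
        if key is None:
            return True  # Nothing to deduplicate on; let the caller decide.
        seen = self._latest.get(key)
        return seen is None or parse_updated_at(payload["updatedAt"]) > seen

    def mark_processed(self, payload: Dict[str, Any]) -> None:
        key = self.order_key(payload)
        if key is not None:
            self._latest[key] = parse_updated_at(payload["updatedAt"])


store = ProcessedUpdates()
first = {"referenceId": "unique-order-ref-123", "updatedAt": "2023-10-27T10:30:00.000Z"}
duplicate = dict(first)
newer = {"referenceId": "unique-order-ref-123", "updatedAt": "2023-10-27T10:45:00.000Z"}

for event in (first, duplicate, newer):
    if store.should_process(event):
        store.mark_processed(event)
        print("processed", event["updatedAt"])
    else:
        print("skipped", event["updatedAt"])
```

Call mark_processed only after your business logic succeeds, so that a failed attempt can still be handled when the same update is delivered again.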

Security: HMAC Signature Verification

You must verify the x-liquid-commerce-hmac-sha256 header on every incoming request using the unique Shared Secret we provide.

  • Algorithm: HMAC-SHA256

  • Encoding: Base64

  • Data Signed: The raw, unparsed JSON request body (exactly as received, including whitespace).

Verification Steps:

  1. Retrieve the raw request body bytes.

  2. Compute the HMAC-SHA256 hash of the raw body using the Shared Secret we provided.

  3. Encode the resulting hash in Base64.

  4. Securely compare (using a timing-safe comparison function) your calculated signature with the value from the x-liquid-commerce-hmac-sha256 header.

Failure to implement and correctly verify this signature poses a significant security risk, potentially allowing attackers to send fraudulent data to your endpoint.
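The verification steps can be sketched in Python as two small functions. This is an illustration rather than a reference implementation: it assumes the Shared Secret is used as its UTF-8 bytes, the secret shown is a placeholder, and the header value is generated locally to stand in for what our service would send. The second check shows why the raw bytes matter: re-serializing the parsed JSON changes the whitespace and the signature no longer matches.

```python
import base64
import hashlib
import hmac
import json


def compute_signature(secret: str, raw_body: bytes) -> str:
    """Steps 2 and 3: HMAC-SHA256 over the raw bytes, then Base64."""
    digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    return base64.b64encode(digest).decode("ascii")


def is_valid_signature(secret: str, raw_body: bytes, header_value: str) -> bool:
    """Step 4: timing-safe comparison against the header value."""
    expected = compute_signature(secret, raw_body)
    return hmac.compare_digest(expected.encode("ascii"), header_value.encode("utf-8"))


secret = "example-shared-secret"  # Placeholder, not a real secret
# Note the two spaces after the comma: whitespace is part of the signed bytes.
raw_body = b'{"referenceId": "unique-order-ref-123",  "isHybrid": false}'
header_value = compute_signature(secret, raw_body)  # Stands in for the sender's header

# Parsing and re-serializing normalizes the whitespace, so the bytes differ.
reserialized = json.dumps(json.loads(raw_body)).encode("utf-8")

print(is_valid_signature(secret, raw_body, header_value))      # True
print(is_valid_signature(secret, reserialized, header_value))  # False
```

Comparing bytes rather than strings keeps hmac.compare_digest from raising on a header that contains unexpected non-ASCII characters.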

Example: Basic Webhook Receiver

Here are conceptual examples demonstrating the core logic (signature verification, immediate acknowledgment, asynchronous processing) in various languages. These examples focus on the essential security and request handling aspects. Refer to the "Webhook Request Details" section for a comprehensive list of all headers your endpoint will receive, including informational headers like x-liquid-commerce-idempotency-key. Adapt the business logic, error handling, logging, security practices, and asynchronous mechanisms to your specific framework and production needs.

Node.js (Express)

import express from 'express';
import crypto from 'crypto';

const app = express();

// IMPORTANT: Use express.raw middleware to access the raw request body for HMAC verification.
// Must run *before* express.json() or any other body-parsing middleware.
app.use(express.raw({ type: 'application/json', limit: '5mb' })); // Adjust limit as needed

const PORT = process.env.PORT || 3000;
// Store the unique secret WE PROVIDED securely (e.g., in environment variables)
const WEBHOOK_SECRET = process.env.EVENT_BRIDGE_WEBHOOK_SECRET;
const HMAC_HEADER = 'x-liquid-commerce-hmac-sha256'; // Use the correct header name

// Your Webhook Endpoint
app.post('/webhook/event-bridge', (req, res) => {
  console.log(`Received webhook request headers: ${JSON.stringify(req.headers)}`);

  // --- 1. Verify Signature (CRITICAL) ---
  if (!WEBHOOK_SECRET) {
    console.error(`${HMAC_HEADER} cannot be verified: WEBHOOK_SECRET not configured!`);
    // Fail closed: reject the request when the secret is missing
    return res.status(500).send('Internal Server Error: Webhook secret not configured.');
  }

  const receivedSignature = req.header(HMAC_HEADER);
  if (!receivedSignature) {
    console.warn(`Missing ${HMAC_HEADER} header`);
    return res.status(400).send('Bad Request: Missing signature header');
  }

  try {
    const expectedSignature = crypto
      .createHmac('sha256', WEBHOOK_SECRET)
      .update(req.body) // Use the raw body buffer from express.raw()
      .digest('base64');

    const receivedSigBuffer = Buffer.from(receivedSignature, 'base64');
    const expectedSigBuffer = Buffer.from(expectedSignature, 'base64');

    // Use timing-safe comparison
    if (
      receivedSigBuffer.length !== expectedSigBuffer.length ||
      !crypto.timingSafeEqual(receivedSigBuffer, expectedSigBuffer)
    ) {
      console.error('Invalid signature');
      return res.status(401).send('Unauthorized: Invalid signature');
    }
    console.log('Signature verified successfully');
  } catch (error) {
    console.error('Error during signature verification:', error);
    return res.status(500).send('Internal Server Error during signature verification');
  }

  // --- 2. Acknowledge Receipt Immediately (Before Parsing/Processing) ---
  // Send a 2xx response quickly.
  res.status(202).send('Accepted');

  // --- 3. Parse and Process Asynchronously ---
  // Wrap parsing and processing in a separate async function to avoid blocking.
  handleEventProcessing(req.body)
    // The promise settles only after handleEventProcessing has run to completion.
    .then(() => console.log('Event processing finished.'))
    .catch((err) => console.error('Event processing failed unexpectedly:', err));
});

// Asynchronous function to handle parsing and business logic
async function handleEventProcessing(rawBodyBuffer) {
  let eventPayload;
  try {
    // Parse the JSON from the raw body buffer
    eventPayload = JSON.parse(rawBodyBuffer.toString('utf-8'));
    // console.log('Parsed payload:', JSON.stringify(eventPayload, null, 2)); // Be careful logging full payload
    console.log(`Parsed payload for referenceId: ${eventPayload?.referenceId}`);
  } catch (error) {
    console.error('Failed to parse JSON payload after acknowledgment:', error);
    // Add monitoring/alerting here. The request was already acknowledged.
    return; // Stop processing if parsing fails
  }

  console.log(`Processing event referenceId: ${eventPayload.referenceId}, UpdatedAt: ${eventPayload.updatedAt}`);

  // --- Implement Idempotency Check Here ---
  // const alreadyProcessed = await checkIfEventProcessed(eventPayload.referenceId, eventPayload.updatedAt);
  // if (alreadyProcessed) {
  //   console.log(`Event referenceId ${eventPayload.referenceId} already processed or is older. Skipping.`);
  //   return;
  // }

  // --- Your Business Logic Here ---
  // e.g., update your database based on eventPayload details...
  // Use eventPayload.retailers[0].fulfillments[0].status or packages[0].status etc.
  try {
    // ... Simulate async work ...
    await new Promise((resolve) => setTimeout(resolve, 100));
    console.log(`Business logic completed for referenceId: ${eventPayload.referenceId}`);
  } catch (businessError) {
    console.error(`Error during business logic for referenceId ${eventPayload.referenceId}:`, businessError);
    // Add monitoring/alerting. Event already acknowledged. May need manual reconciliation.
    return;
  }

  // --- Mark event as processed (for idempotency) ---
  // await markEventAsProcessed(eventPayload.referenceId, eventPayload.updatedAt);

  console.log(`Successfully processed event referenceId: ${eventPayload.referenceId}`);
}

app.listen(PORT, () => {
  console.log(`Webhook receiver listening on port ${PORT}`);
  if (!WEBHOOK_SECRET) {
    console.warn(
      `Warning: EVENT_BRIDGE_WEBHOOK_SECRET environment variable not set. All webhook requests will be rejected with 500 until it is configured.`,
    );
  }
});

Python (Flask)

import os
import hmac
import hashlib
import base64
import json
import logging
from flask import Flask, request, abort, make_response
from threading import Thread

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = Flask(__name__)

# Store the unique secret WE PROVIDED securely (e.g., in environment variables)
WEBHOOK_SECRET = os.environ.get('EVENT_BRIDGE_WEBHOOK_SECRET')
HMAC_HEADER = 'x-liquid-commerce-hmac-sha256' # Use the correct header name

def verify_signature(req):
    """Verifies the HMAC signature of the request."""
    if not WEBHOOK_SECRET:
        logging.error(f"{HMAC_HEADER} cannot be verified: WEBHOOK_SECRET not configured!")
        # Fail closed if secret is expected but missing
        abort(500, description='Internal Server Error: Webhook secret not configured.')

    received_signature_b64 = req.headers.get(HMAC_HEADER)
    if not received_signature_b64:
        logging.warning(f"Missing {HMAC_HEADER} header")
        abort(400, description=f'Bad Request: Missing {HMAC_HEADER} header')

    # IMPORTANT: Access the raw request body bytes
    raw_body = req.get_data() # Flask specific way to get raw body

    try:
        hash_obj = hmac.new(WEBHOOK_SECRET.encode('utf-8'), raw_body, hashlib.sha256)
        expected_signature_b64 = base64.b64encode(hash_obj.digest()).decode('utf-8')
    except Exception as e:
        logging.error(f"Error during signature verification: {e}", exc_info=True)
        abort(500, description='Internal Server Error during signature verification')

    # Compare outside the try block: abort() raises an HTTPException, which a
    # broad "except Exception" would catch and turn the 401 into a 500.
    # Use timing-safe comparison on bytes (compare_digest rejects non-ASCII str).
    if not hmac.compare_digest(expected_signature_b64.encode('utf-8'),
                               received_signature_b64.encode('utf-8')):
        logging.error('Invalid signature')
        abort(401, description='Unauthorized: Invalid signature')

    logging.info('Signature verified successfully')
    return raw_body # Return raw body for later processing

def process_event_async(raw_body_bytes):
    """Parses and processes the event payload asynchronously."""
    try:
        payload_str = raw_body_bytes.decode('utf-8')
        event_payload = json.loads(payload_str)
        reference_id = event_payload.get('referenceId', 'N/A')
        updated_at = event_payload.get('updatedAt', 'N/A')
        logging.info(f"Parsed payload for referenceId: {reference_id}")
        # Careful logging full payload in production
        # logging.debug(f"Full payload: {payload_str}")

    except json.JSONDecodeError as e:
        logging.error(f"Failed to parse JSON payload after acknowledgment: {e}", exc_info=True)
        # Add monitoring/alerting here
        return # Stop processing if parsing fails
    except Exception as e:
        logging.error(f"Unexpected error during payload parsing: {e}", exc_info=True)
        return

    logging.info(f"Processing event referenceId: {reference_id}, UpdatedAt: {updated_at}")

    # --- Implement Idempotency Check Here ---
    # try:
    #     already_processed = check_if_event_processed(reference_id, updated_at)
    #     if already_processed:
    #         logging.info(f"Event referenceId {reference_id} already processed or is older. Skipping.")
    #         return
    # except Exception as e:
    #     logging.error(f"Error during idempotency check for referenceId {reference_id}: {e}", exc_info=True)
    #     # Decide how to handle idempotency check errors (e.g., retry, alert)
    #     return

    # --- Your Business Logic Here ---
    try:
        # e.g., update your database based on event_payload details...
        # Simulate work
        import time
        time.sleep(0.1) # Simulate async work
        logging.info(f"Business logic completed for referenceId: {reference_id}")
    except Exception as business_error:
        logging.error(f"Error during business logic for referenceId {reference_id}: {business_error}", exc_info=True)
        # Add monitoring/alerting. Event already acknowledged. May need manual reconciliation.
        return

    # --- Mark event as processed (for idempotency) ---
    # try:
    #     mark_event_as_processed(reference_id, updated_at)
    #     logging.info(f"Successfully marked referenceId {reference_id} as processed.")
    # except Exception as e:
    #     logging.error(f"Error marking referenceId {reference_id} as processed: {e}", exc_info=True)
    #     # Alerting is crucial here if marking fails

@app.route('/webhook/event-bridge', methods=['POST'])
def handle_webhook():
    logging.info(f"Received webhook request headers: {dict(request.headers)}")

    # --- 1. Verify Signature (CRITICAL) ---
    # This function calls abort() on failure
    raw_body = verify_signature(request)

    # --- 2. Acknowledge Receipt Immediately (Before Parsing/Processing) ---
    # Send a 2xx response quickly.
    response = make_response("Accepted", 202)

    # --- 3. Start Asynchronous Processing ---
    # Use threading for simple async background task (for production, consider Celery, RQ, etc.)
    thread = Thread(target=process_event_async, args=(raw_body,))
    thread.start()
    logging.info("Event processing initiated in background thread.")

    return response

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 3000))
    if not WEBHOOK_SECRET:
        logging.warning("Warning: EVENT_BRIDGE_WEBHOOK_SECRET environment variable not set. All webhook requests will be rejected with 500 until it is configured.")
    # Use a production-ready WSGI server like gunicorn or uWSGI instead of app.run in production
    app.run(host='0.0.0.0', port=port)

Go (net/http)

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"time"
)

const hmacHeader = "x-liquid-commerce-hmac-sha256" // Use the correct header name
var webhookSecret string                          // Store the unique secret WE PROVIDED

func init() {
	webhookSecret = os.Getenv("EVENT_BRIDGE_WEBHOOK_SECRET")
	if webhookSecret == "" {
		log.Printf("Warning: EVENT_BRIDGE_WEBHOOK_SECRET environment variable not set. %s verification will fail.", hmacHeader)
		// Consider os.Exit(1) if secret is mandatory for startup
	}
}

func verifySignature(r *http.Request, rawBody []byte) (bool, error) {
	if webhookSecret == "" {
		log.Printf("Error: cannot verify %s: webhook secret not configured!", hmacHeader)
		return false, fmt.Errorf("webhook secret not configured") // Fail closed
	}

	receivedSignatureB64 := r.Header.Get(hmacHeader)
	if receivedSignatureB64 == "" {
		log.Printf("Warning: Missing %s header", hmacHeader)
		return false, fmt.Errorf("missing signature header")
	}

	mac := hmac.New(sha256.New, []byte(webhookSecret))
	_, err := mac.Write(rawBody) // Use the raw body bytes
	if err != nil {
		log.Printf("Error writing body to HMAC: %v", err)
		return false, fmt.Errorf("HMAC calculation error")
	}
	expectedSignatureBytes := mac.Sum(nil)

	// Decode received signature for timing-safe comparison
	receivedSignatureBytes, err := base64.StdEncoding.DecodeString(receivedSignatureB64)
	if err != nil {
		log.Printf("Error decoding received signature: %v", err)
		return false, fmt.Errorf("invalid signature format")
	}

	// Use timing-safe comparison
	if subtle.ConstantTimeCompare(receivedSignatureBytes, expectedSignatureBytes) == 1 {
		log.Println("Signature verified successfully")
		return true, nil
	}

	log.Println("Invalid signature")
	// For debugging only (CAUTION: sensitive in prod logs), log the Base64 form:
	// log.Printf("Expected: %s, Received (b64): %s",
	//     base64.StdEncoding.EncodeToString(expectedSignatureBytes), receivedSignatureB64)
	return false, fmt.Errorf("invalid signature")
}

func handleEventProcessingAsync(rawBodyBytes []byte) {
	var eventPayload map[string]interface{} // Use a more specific struct in production

	err := json.Unmarshal(rawBodyBytes, &eventPayload)
	if err != nil {
		log.Printf("Failed to parse JSON payload after acknowledgment: %v", err)
		// Add monitoring/alerting here
		return // Stop processing if parsing fails
	}

	referenceID := "N/A"
	updatedAt := "N/A"
	if refID, ok := eventPayload["referenceId"].(string); ok {
		referenceID = refID
	}
	if updAt, ok := eventPayload["updatedAt"].(string); ok {
		updatedAt = updAt
	}

	log.Printf("Parsed payload for referenceId: %s", referenceID)
	// Careful logging full payload in production
	// log.Printf("Full payload: %s", string(rawBodyBytes))

	log.Printf("Processing event referenceId: %s, UpdatedAt: %s", referenceID, updatedAt)

	// --- Implement Idempotency Check Here ---
	// alreadyProcessed, err := checkIfEventProcessed(referenceID, updatedAt)
	// if err != nil {
	//     log.Printf("Error during idempotency check for referenceId %s: %v", referenceID, err)
	//     // Decide how to handle idempotency check errors
	//     return
	// }
	// if alreadyProcessed {
	//     log.Printf("Event referenceId %s already processed or is older. Skipping.", referenceID)
	//     return
	// }

	// --- Your Business Logic Here ---
	// e.g., update your database based on eventPayload details...
	// Simulate work
	time.Sleep(100 * time.Millisecond) // Simulate async work
	log.Printf("Business logic completed for referenceId: %s", referenceID)

	// --- Mark event as processed (for idempotency) ---
	// err = markEventAsProcessed(referenceID, updatedAt)
	// if err != nil {
	//     log.Printf("Error marking referenceId %s as processed: %v", referenceID, err)
	//     // Alerting is crucial here
	// } else {
	//     log.Printf("Successfully marked referenceId %s as processed.", referenceID)
	// }
}

func webhookHandler(w http.ResponseWriter, r *http.Request) {
	log.Printf("Received webhook request headers: %v", r.Header)

	if r.Method != http.MethodPost {
		log.Printf("Invalid method: %s", r.Method)
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	// IMPORTANT: Read the raw request body
	// Limit body size to prevent abuse (e.g., 5MB)
	r.Body = http.MaxBytesReader(w, r.Body, 5*1024*1024)
	rawBody, err := io.ReadAll(r.Body)
	if err != nil {
		if err.Error() == "http: request body too large" {
			log.Printf("Request body too large: %v", err)
			http.Error(w, "Request Entity Too Large", http.StatusRequestEntityTooLarge)
		} else {
			log.Printf("Error reading request body: %v", err)
			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		}
		return
	}
	// No explicit r.Body.Close() is needed here: the net/http server closes the
	// request body once the handler returns.


	// --- 1. Verify Signature (CRITICAL) ---
	valid, sigErr := verifySignature(r, rawBody)
	if !valid {
		// Determine appropriate status code based on error
		statusCode := http.StatusInternalServerError
		if sigErr != nil {
			errMsg := sigErr.Error()
			if errMsg == "missing signature header" {
				statusCode = http.StatusBadRequest
			} else if errMsg == "invalid signature format" || errMsg == "invalid signature" {
				statusCode = http.StatusUnauthorized
			} else if errMsg == "webhook secret not configured" {
				statusCode = http.StatusInternalServerError // Or 503 Service Unavailable
			}
		}
		http.Error(w, fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)), statusCode)
		return
	}

	// --- 2. Acknowledge Receipt Immediately (Before Parsing/Processing) ---
	// Send a 2xx response quickly.
	w.WriteHeader(http.StatusAccepted)
	fmt.Fprintln(w, "Accepted")

	// --- 3. Start Asynchronous Processing ---
	// Use a goroutine for async background task
	go handleEventProcessingAsync(rawBody)
	log.Println("Event processing initiated in background goroutine.")
}

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = "3000"
	}

	http.HandleFunc("/webhook/event-bridge", webhookHandler)

	log.Printf("Webhook receiver listening on port %s", port)
	err := http.ListenAndServe(":"+port, nil)
	if err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

Java (Spring Boot)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import jakarta.servlet.http.HttpServletRequest; // Use jakarta for Spring Boot 3+
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Objects;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map; // Or a specific DTO class

@SpringBootApplication
@EnableAsync // Enable asynchronous processing
public class WebhookReceiverApplication {

    private static final Logger log = LoggerFactory.getLogger(WebhookReceiverApplication.class);
    // Store the unique secret WE PROVIDED securely (e.g., via @Value or config server)
    private static final String WEBHOOK_SECRET = System.getenv("EVENT_BRIDGE_WEBHOOK_SECRET");
    private static final String HMAC_HEADER = "x-liquid-commerce-hmac-sha256"; // Use the correct header name
    private static final String HMAC_ALGORITHM = "HmacSHA256";

    public static void main(String[] args) {
        if (WEBHOOK_SECRET == null || WEBHOOK_SECRET.isEmpty()) {
             log.warn("Warning: EVENT_BRIDGE_WEBHOOK_SECRET environment variable not set. {} verification will fail.", HMAC_HEADER);
             // Potentially prevent application startup if secret is mandatory
        }
        SpringApplication.run(WebhookReceiverApplication.class, args);
    }

    @RestController
    public static class WebhookController {

        private final SignatureVerifier signatureVerifier;
        private final AsyncEventProcessor asyncEventProcessor;

        public WebhookController(SignatureVerifier signatureVerifier, AsyncEventProcessor asyncEventProcessor) {
            this.signatureVerifier = signatureVerifier;
            this.asyncEventProcessor = asyncEventProcessor;
        }

        // IMPORTANT: Receive raw body as byte array
        @PostMapping("/webhook/event-bridge")
        public ResponseEntity<String> handleWebhook(@RequestBody byte[] rawBody, HttpServletRequest request) {
            // getHeaderNames() returns an Enumeration, which does not print usefully; convert it to a list first.
            log.info("Received webhook request header names: {}", java.util.Collections.list(request.getHeaderNames()));

            String receivedSignature = request.getHeader(HMAC_HEADER);

            // --- 1. Verify Signature (CRITICAL) ---
            if (!signatureVerifier.isValidSignature(rawBody, receivedSignature)) {
                // SignatureVerifier logs details; determine status code based on internal state/error
                if (receivedSignature == null) {
                    return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Bad Request: Missing signature header");
                } else if (WEBHOOK_SECRET == null || WEBHOOK_SECRET.isEmpty()){
                     return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Internal Server Error: Webhook secret not configured.");
                }
                 else {
                    return ResponseEntity.status(HttpStatus.UNAUTHORIZED).body("Unauthorized: Invalid signature");
                }
            }

            // --- 2. Acknowledge Receipt Immediately (Before Parsing/Processing) ---
            ResponseEntity<String> response = ResponseEntity.status(HttpStatus.ACCEPTED).body("Accepted");

            // --- 3. Start Asynchronous Processing ---
            asyncEventProcessor.handleEventProcessing(rawBody);
            log.info("Event processing initiated asynchronously.");

            return response;
        }
    }

    @Component
    public static class SignatureVerifier {

        public boolean isValidSignature(byte[] rawBody, String receivedSignatureB64) {
             if (WEBHOOK_SECRET == null || WEBHOOK_SECRET.isEmpty()) {
                 log.error("{} verification failed: EVENT_BRIDGE_WEBHOOK_SECRET not configured!", HMAC_HEADER);
                 // Fail closed if secret is expected but missing
                 return false;
             }
            if (receivedSignatureB64 == null || receivedSignatureB64.isEmpty()) {
                log.warn("Missing {} header", HMAC_HEADER);
                return false;
            }

            try {
                Mac mac = Mac.getInstance(HMAC_ALGORITHM);
                SecretKeySpec secretKeySpec = new SecretKeySpec(WEBHOOK_SECRET.getBytes(StandardCharsets.UTF_8), HMAC_ALGORITHM);
                mac.init(secretKeySpec);
                byte[] expectedSignatureBytes = mac.doFinal(rawBody);
                // No need to Base64 encode expected here, decode received instead for comparison

                byte[] receivedSignatureBytes = Base64.getDecoder().decode(receivedSignatureB64);

                // Use timing-safe comparison
                boolean valid = MessageDigest.isEqual(receivedSignatureBytes, expectedSignatureBytes);
                if (valid) {
                    log.info("Signature verified successfully");
                } else {
                    log.error("Invalid signature");
                    // String expectedSignatureB64 = Base64.getEncoder().encodeToString(expectedSignatureBytes);
                    // log.debug("Expected B64: {}, Received B64: {}", expectedSignatureB64, receivedSignatureB64); // CAUTION in prod
                }
                return valid;

            } catch (NoSuchAlgorithmException | InvalidKeyException | IllegalArgumentException e) {
                log.error("Error during signature verification: {}", e.getMessage(), e);
                return false; // Treat crypto errors as invalid signature
            }
        }
    }

    @Service
    public static class AsyncEventProcessor {
        private final ObjectMapper objectMapper = new ObjectMapper(); // Jackson mapper

        @Async // Make this method run in a background thread pool
        public void handleEventProcessing(byte[] rawBodyBytes) {
             Map<String, Object> eventPayload; // Use a specific DTO class for better type safety
             String referenceId = "N/A";
             String updatedAt = "N/A";

             try {
                 eventPayload = objectMapper.readValue(rawBodyBytes, Map.class);
                 // Extract relevant fields safely
                 if(eventPayload.get("referenceId") instanceof String) {
                    referenceId = (String) eventPayload.get("referenceId");
                 }
                 if(eventPayload.get("updatedAt") instanceof String) {
                     updatedAt = (String) eventPayload.get("updatedAt");
                 }

                 log.info("Parsed payload for referenceId: {}", referenceId);
                 // Be careful logging full payload in production
                 // log.debug("Full payload: {}", new String(rawBodyBytes, StandardCharsets.UTF_8));

             } catch (JsonProcessingException e) {
                 log.error("Failed to parse JSON payload after acknowledgment: {}", e.getMessage(), e);
                 // Add monitoring/alerting here
                 return; // Stop processing if parsing fails
             } catch (Exception e) {
                 log.error("Unexpected error during payload parsing: {}", e.getMessage(), e);
                 return;
             }

             log.info("Processing event referenceId: {}, UpdatedAt: {}", referenceId, updatedAt);

             // --- Implement Idempotency Check Here ---
             // try {
             //     boolean alreadyProcessed = checkIfEventProcessed(referenceId, updatedAt); // Inject service dependency
             //     if (alreadyProcessed) {
             //         log.info("Event referenceId {} already processed or is older. Skipping.", referenceId);
             //         return;
             //     }
             // } catch (Exception e) {
             //     log.error("Error during idempotency check for referenceId {}: {}", referenceId, e.getMessage(), e);
             //     // Decide how to handle idempotency check errors
             //     return;
             // }

            // --- Your Business Logic Here ---
            try {
                // e.g., update your database based on eventPayload details...
                // Simulate work
                Thread.sleep(100); // Simulate async work
                log.info("Business logic completed for referenceId: {}", referenceId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Business logic interrupted for referenceId {}: {}", referenceId, e.getMessage());
                 return;
            } catch (Exception businessError) {
                log.error("Error during business logic for referenceId {}: {}", referenceId, businessError.getMessage(), businessError);
                // Add monitoring/alerting. Event already acknowledged. May need manual reconciliation.
                 return;
            }

             // --- Mark event as processed (for idempotency) ---
             // try {
             //     markEventAsProcessed(referenceId, updatedAt); // Inject service dependency
             //     log.info("Successfully marked referenceId {} as processed.", referenceId);
             // } catch (Exception e) {
             //     log.error("Error marking referenceId {} as processed: {}", referenceId, e.getMessage(), e);
             //     // Alerting is crucial here
             // }
        }
    }
}
PHP
<?php declare(strict_types=1);

// Basic error logging (use a proper logger like Monolog in production)
error_reporting(E_ALL);
ini_set('log_errors', '1');
ini_set('error_log', '/tmp/php_webhook_errors.log'); // Adjust path

// --- Configuration ---
$webhookSecret = getenv('EVENT_BRIDGE_WEBHOOK_SECRET') ?: ''; // Store the unique secret WE PROVIDED securely
$hmacHeader = 'x-liquid-commerce-hmac-sha256'; // Use the correct header name

// --- Request Handling ---
$requestMethod = $_SERVER['REQUEST_METHOD'] ?? 'GET';
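// Polyfill for SAPIs that lack getallheaders(). It has to be declared before the first call:
// PHP does not hoist functions that are declared inside a conditional block.
if (!function_exists('getallheaders')) {
    function getallheaders(): array {
        $headers = [];
        foreach ($_SERVER as $name => $value) {
            if (str_starts_with($name, 'HTTP_')) {
                $headerName = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($name, 5)))));
                $headers[$headerName] = $value;
            } elseif (in_array($name, ['CONTENT_TYPE', 'CONTENT_LENGTH'], true)) {
                $headerName = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', $name))));
                $headers[$headerName] = $value;
            }
        }
        return $headers;
    }
}

$headers = getallheaders(); // Get all request headers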

// Log headers (be careful in production)
// file_put_contents('php://stderr', 'Received headers: ' . json_encode($headers) . PHP_EOL);

if ($requestMethod !== 'POST') {
    error_log('Invalid method: ' . $requestMethod);
    http_response_code(405); // Method Not Allowed
    echo 'Method Not Allowed';
    exit;
}

// --- 1. Verify Signature (CRITICAL) ---
if (empty($webhookSecret)) {
    error_log("{$hmacHeader} verification failed: EVENT_BRIDGE_WEBHOOK_SECRET not configured!");
    http_response_code(500);
    echo 'Internal Server Error: Webhook secret not configured.';
    exit;
}

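// Header names are case-insensitive and servers may re-case them, so look the header up in lowercase.
$receivedSignatureB64 = array_change_key_case($headers, CASE_LOWER)[$hmacHeader] ?? '';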
if (empty($receivedSignatureB64)) {
    error_log("Missing {$hmacHeader} header");
    http_response_code(400);
    echo "Bad Request: Missing {$hmacHeader} header";
    exit;
}

// IMPORTANT: Read the raw request body
$rawBody = file_get_contents('php://input');
if ($rawBody === false) {
    error_log("Failed to read raw request body");
    http_response_code(500);
    echo 'Internal Server Error: Cannot read request body';
    exit;
}

try {
    $expectedSignature = hash_hmac('sha256', $rawBody, $webhookSecret, true); // true for raw binary output
    $expectedSignatureB64 = base64_encode($expectedSignature);

    // Use timing-safe comparison (hash_equals)
    if (!hash_equals($expectedSignatureB64, $receivedSignatureB64)) {
        error_log('Invalid signature');
        // Debugging (CAUTION in production):
        // error_log("Expected B64: " . $expectedSignatureB64);
        // error_log("Received B64: " . $receivedSignatureB64);
        http_response_code(401);
        echo 'Unauthorized: Invalid signature';
        exit;
    }
    error_log('Signature verified successfully'); // Use info level in proper logger

} catch (\Exception $e) {
    error_log('Error during signature verification: ' . $e->getMessage());
    http_response_code(500);
    echo 'Internal Server Error during signature verification';
    exit;
}

// --- 2. Acknowledge Receipt Immediately (Before Parsing/Processing) ---
// Send a 2xx Accepted response quickly.
http_response_code(202);
$ackBody = 'Accepted';

// Make sure the response reaches the client before potentially long processing starts
if (function_exists('fastcgi_finish_request')) {
    // Good practice for PHP-FPM to release connection
    echo $ackBody;
    fastcgi_finish_request();
} elseif ('cli' !== PHP_SAPI) {
    // Fallback for other SAPIs (might not work perfectly)
    // Headers must be set before any body output is sent.
    ignore_user_abort(true);
    header('Connection: close');
    header('Content-Length: ' . strlen($ackBody));
    echo $ackBody;
    while (ob_get_level() > 0) {
        ob_end_flush(); // Flush and close any active output buffers
    }
    flush();
} else {
    echo $ackBody;
}

// --- 3. Parse and Process (Ideally Asynchronously) ---
// WARNING: PHP typically processes requests synchronously. Long-running tasks here
// can block new requests depending on the server setup.
// For production, offload this processing to a background job queue (e.g., Redis Queue, Beanstalkd, RabbitMQ).

try {
    $eventPayload = json_decode($rawBody, true, 512, JSON_THROW_ON_ERROR);
    $referenceId = $eventPayload['referenceId'] ?? 'N/A';
    $updatedAt = $eventPayload['updatedAt'] ?? 'N/A';

    error_log("Parsed payload for referenceId: {$referenceId}");
    // Careful logging full payload in production
    // error_log("Full payload: {$rawBody}");

    error_log("Processing event referenceId: {$referenceId}, UpdatedAt: {$updatedAt}");

    // --- Implement Idempotency Check Here ---
    // $alreadyProcessed = checkIfEventProcessed($referenceId, $updatedAt); // Implement this function
    // if ($alreadyProcessed) {
    //     error_log("Event referenceId {$referenceId} already processed or is older. Skipping.");
    //     exit; // Exit script cleanly
    // }

    // --- Your Business Logic Here ---
    // e.g., update your database based on eventPayload details...
    // Simulate work
    sleep(1); // Simulate work (DON'T DO THIS IN PRODUCTION REQUEST HANDLER)
    error_log("Business logic completed for referenceId: {$referenceId}");


    // --- Mark event as processed (for idempotency) ---
    // markEventAsProcessed($referenceId, $updatedAt); // Implement this function
    // error_log("Successfully marked referenceId {$referenceId} as processed.");


} catch (\JsonException $e) {
    error_log('Failed to parse JSON payload after acknowledgment: ' . $e->getMessage());
    // Add monitoring/alerting here
    exit; // Stop processing if parsing fails
} catch (\Throwable $e) { // Catch any other errors during processing
     error_log("Error during business logic/idempotency for referenceId {$referenceId}: {$e->getMessage()} " . $e->getTraceAsString());
     // Add monitoring/alerting. Event already acknowledged. May need manual reconciliation.
     exit;
}

error_log("Successfully processed event referenceId: {$referenceId} (end of script)"); // Should match logged success above


?>

Support

If you encounter issues during integration or have questions about the webhook format or behavior, please contact [email protected].
