在 PHP 中集成 Confluent Schema Registry 可通过以下步骤实现:
1. 安装依赖
使用 Composer 安装核心库:
composer require guzzlehttp/guzzle # HTTP 客户端
composer require flix-tech/avro-php # Avro 序列化
2. Schema Registry 客户端封装
use GuzzleHttp\Client;
class SchemaRegistryClient {
private $client;
private $baseUrl;
public function __construct(string $baseUrl) {
$this->baseUrl = $baseUrl;
$this->client = new Client();
}
// 获取 Schema ID
public function getSchemaId(string $subject, string $schema): int {
$response = $this->client->post("{$this->baseUrl}/subjects/{$subject}/versions", [
'json' => ['schema' => $schema]
]);
$data = json_decode($response->getBody(), true);
return $data['id'];
}
// 获取 Schema 定义
public function getSchema(int $schemaId): string {
$response = $this->client->get("{$this->baseUrl}/schemas/ids/{$schemaId}");
$data = json_decode($response->getBody(), true);
return $data['schema'];
}
}
3. Avro 序列化/反序列化
use FlixTech\Avro\Serializer\AvroSerializer;
use FlixTech\Avro\Serializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
// 初始化
$schemaRegistry = new BlockingRegistry(
new CachedRegistry(new SchemaRegistryClient('http://schema-registry:8081'))
);
$recordSerializer = new RecordSerializer($schemaRegistry);
4. 生产消息
// 定义 Avro Schema
$schema = '{
"type": "record",
"name": "User",
"fields": [{"name": "name", "type": "string"}]
}';
// 注册 Schema 并获取 ID
$schemaId = $schemaRegistry->getSchemaId('user-value', $schema);
// 序列化数据
$data = ['name' => 'Alice'];
$avroMessage = $recordSerializer->encodeRecord('user-value', $schema, $data);
// 发送到 Kafka(需配合 Kafka 客户端)
$producer->produce(TopicPartition::TOPIC_USER, null, $avroMessage);
5. 消费消息
// 接收 Kafka 消息
$message = $consumer->consume();
// 反序列化
$decodedData = $recordSerializer->decodeMessage($message->payload);
关键注意事项:
- Wire Format:Confluent 要求消息包含
0x00
魔法字节 + Schema ID(4字节) - 缓存策略:使用
CachedRegistry
减少 Schema Registry 请求 - 兼容性检查:更新 Schema 时需验证兼容性(可通过 Schema Registry API 实现)
如果需要完整代码示例或特定场景的解决方案,请进一步说明需求!
(www.nzw6.com)